Spring Cloud Stream — Functional Programming!
Spring Cloud Stream with functional programming.
Introduction
This story covers using Spring Cloud Stream to build a binder-agnostic, event-driven application with Spring Boot in a functional programming style. You may have used annotations to define binding channels (input/output) and stream listeners; that annotation-based model is now deprecated. I will not go into its details and will focus instead on building the application in the functional style. We will use Kafka as the broker to publish messages and subscribe to them.
Functional Programming Style
- Producer: a Supplier provides values, and Spring Cloud Stream binds the function's name to an output so that those values are published to the broker (Kafka, in our case).
- Processor: a Function<T,R> takes the produced messages, processes them, and publishes the results to another topic.
- Consumer: a Consumer takes those messages and performs whatever action is required.
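To make the three roles concrete before bringing in Spring, here is a minimal plain-Java sketch that wires a Supplier, a Function and a Consumer together by hand. The class name `FunctionalRoles` and the helper `runOnce` are hypothetical and exist only for this illustration; in a real application Spring Cloud Stream and the broker do this wiring for you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionalRoles {

    // Hypothetical helper: passes one value from the producer, through the
    // processor, to the consumer. In Spring Cloud Stream the broker sits
    // between these steps; here they are simply called in order.
    static <T, R> void runOnce(Supplier<T> producer, Function<T, R> processor, Consumer<R> consumer) {
        T produced = producer.get();
        R processed = processor.apply(produced);
        consumer.accept(processed);
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        Supplier<Integer> producer = () -> 21;                        // produces a value
        Function<Integer, String> processor = n -> "value=" + (n * 2); // transforms it
        Consumer<String> consumer = received::add;                    // acts on the result

        runOnce(producer, processor, consumer);
        System.out.println(received); // prints [value=42]
    }
}
```

None of the three functions knows about the others, which is the property that lets the framework place a topic between any two of them.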
Deep Dive into Code
- Our supplier is as follows:
@Bean
public Supplier<Flux<Integer>> randomProducer() {
    return () -> Flux.interval(Duration.ofSeconds(5))
            .map(value -> random.nextInt(500 - 1) + 1)
            .log();
}
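The supplier above emits a random value every 5 seconds. Because random.nextInt(500 - 1) + 1 is the same as random.nextInt(499) + 1, the values range from 1 to 499 inclusive.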
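The bounds of that expression are easy to get wrong, so here is a small stand-alone check. It is only a sketch: the class name `RandomRange`, the method `nextValue` and the fixed seed are assumptions made for the example, while the expression itself is copied from the supplier.

```java
import java.util.Random;

public class RandomRange {

    // Same expression as in the supplier: nextInt(499) returns 0..498,
    // so adding 1 shifts the result to 1..499.
    static int nextValue(Random random) {
        return random.nextInt(500 - 1) + 1;
    }

    public static void main(String[] args) {
        Random random = new Random(42L); // fixed seed (an assumption) for repeatable runs
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < 100_000; i++) {
            int v = nextValue(random);
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        // Both numbers stay within 1..499, whatever the seed.
        System.out.println("min=" + min + ", max=" + max);
    }
}
```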
- We also use a processor, which transforms messages via a Function<T,R>, as shown below:
@Bean
public Function<Flux<Integer>, Flux<Integer>> evenProcessor() {
    return integerFlux -> integerFlux.map(this::getEvenNumber).log();
}
The function above takes each produced message and checks whether it is an even number: an even value is passed through unchanged, and an odd value is replaced with 0. A sample implementation of the even check:
private int getEvenNumber(int i) {
    if (i % 2 == 0) {
        return i;
    } else {
        return 0;
    }
}
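Note that this is a mapping, not a filter: every input still produces an output, and odd inputs simply become 0. The sketch below shows this on a small sample list using plain Java streams instead of Reactor; the class name `EvenMappingDemo`, the method `process` and the sample values are made up for the illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

public class EvenMappingDemo {

    // Mirrors the even check above: even values pass through, odd values become 0.
    static int getEvenNumber(int i) {
        return (i % 2 == 0) ? i : 0;
    }

    // Applies the check to every element; the output has the same length as the input.
    static List<Integer> process(List<Integer> input) {
        return input.stream()
                .map(EvenMappingDemo::getEvenNumber)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(process(List.of(3, 8, 15, 42))); // prints [0, 8, 0, 42]
    }
}
```

If dropping odd values were the goal instead, a filter step would be the usual choice, but that would change the behaviour described in this story.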
- Finally, our consumer is a plain consumer that prints the values to the console:
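@Bean
public Consumer<Flux<Integer>> numberConsumer() {
    // Subscribe so that each value is printed; handing the Flux itself to
    // System.out::println would print the Flux object, not its elements.
    return flux -> flux.subscribe(System.out::println);
}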
If you want to look at the whole code, see the GitHub link.
Configuration
- The application YAML lists the function names and their in and out bindings (in is the binding a function reads from; out is the binding it produces to).
- It also maps each binding to a destination, that is, the topic name to read from or produce to.
- Last but not least, it specifies the binder details, which in our case is Kafka.
server:
  port: 9002
spring:
  cloud:
    stream:
      function:
        definition: randomProducer;evenProcessor;numberConsumer
      bindings:
        randomProducer-out-0:
          destination: numbers
        evenProcessor-in-0:
          destination: numbers
        evenProcessor-out-0:
          destination: even
        numberConsumer-in-0:
          destination: even
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: true
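The binding keys in this configuration follow the pattern <functionName>-in-<index> for inputs and <functionName>-out-<index> for outputs, where the index is 0 for a function with a single input or output. The sketch below just rebuilds the four keys used above from that pattern; the class `BindingNames` and the helper `bindingName` are hypothetical and are not part of Spring Cloud Stream.

```java
public class BindingNames {

    // Hypothetical helper: builds "<functionName>-in-<index>" or
    // "<functionName>-out-<index>", the key used under "bindings" in the YAML.
    static String bindingName(String functionName, boolean input, int index) {
        return functionName + (input ? "-in-" : "-out-") + index;
    }

    public static void main(String[] args) {
        // The supplier only has an output, the consumer only has an input,
        // and the processor has one of each.
        System.out.println(bindingName("randomProducer", false, 0)); // randomProducer-out-0
        System.out.println(bindingName("evenProcessor", true, 0));   // evenProcessor-in-0
        System.out.println(bindingName("evenProcessor", false, 0));  // evenProcessor-out-0
        System.out.println(bindingName("numberConsumer", true, 0));  // numberConsumer-in-0
    }
}
```

Reading the configuration with this pattern in mind shows how the pipeline is chained: randomProducer writes to numbers, evenProcessor reads from numbers and writes to even, and numberConsumer reads from even.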
Final Words
Spring Cloud Stream provides an easy way to integrate and build event-based applications, letting you focus on business requirements.
The functional style keeps things simple: you write plain functions and wire them together through configuration, rather than remembering and searching for annotations :)
Keep writing and reading! Thank you :)