A basic example of using Apache Kafka for communication between microservices.
We will integrate Apache Kafka with Spring Boot microservices and use it as their mode of communication.
We will have two microservices:
- Publisher (the service that will send out the message)
- Subscriber (the service that will consume the message)
Pre-requisites & Setup :
Step 1: Download and run ZooKeeper. Kafka uses it to track the status of the Kafka nodes and to store basic metadata such as information about topics and brokers. (Older Kafka consumers also kept their offsets in ZooKeeper; since Kafka 0.9, consumer offsets are stored in Kafka's internal __consumer_offsets topic.)
Note: If ZooKeeper fails to start with an error about the config file, try renaming the config file to “zoo.cfg”.
In Bash, from the ZooKeeper root directory:
sh bin/zkServer.sh start
Step 2: Download and run the Kafka server (broker).
In Bash, from the Kafka root directory:
bin/kafka-server-start.sh config/server.properties
Now we are ready to build our microservices, which will act as publisher and subscriber using this Kafka broker.
Publisher Spring Boot Application :
I’ve created a REST endpoint, which we will use to publish a message to the Kafka broker.
Dependencies :
application.properties :
server.port=9041
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.bootstrap-servers=localhost:9092
Our publisher app will run on port 9041. The property to note here is “spring.kafka.producer.bootstrap-servers”, which is the host and port where our broker is running.
You can check your broker config in the server.properties file, in the config folder of the Kafka directory.
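Before starting the publisher, it can help to confirm that something is actually listening at the configured host and port. The following is a minimal sketch using only the JDK. The class name BrokerCheck and its methods are hypothetical helpers, not part of Kafka or Spring, and an open TCP port only shows that some process is listening there, not that it is a healthy Kafka broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Kafka or Spring.
class BrokerCheck {

    // Splits a "host:port" string (the bootstrap-servers format) into its parts.
    public static InetSocketAddress parse(String bootstrapServer) {
        int idx = bootstrapServer.lastIndexOf(':');
        if (idx <= 0 || idx == bootstrapServer.length() - 1) {
            throw new IllegalArgumentException("Expected host:port, got: " + bootstrapServer);
        }
        String host = bootstrapServer.substring(0, idx);
        int port = Integer.parseInt(bootstrapServer.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    public static boolean isReachable(String bootstrapServer, int timeoutMillis) {
        InetSocketAddress target = parse(bootstrapServer);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumes the broker address used in application.properties above.
        boolean open = isReachable("localhost:9092", 1000);
        System.out.println(open ? "Something is listening on localhost:9092"
                                : "Nothing is listening on localhost:9092");
    }
}
```

If the check reports that nothing is listening, the broker from Step 2 is probably not running or is bound to a different port in server.properties.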
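The REST Controller :
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/archive")
public class Controller {
    private final Publisher producer;

    // The Publisher service is injected through the constructor
    @Autowired
    public Controller(Publisher producer) {
        this.producer = producer;
    }

    // GET /archive?message=... hands the message to the publisher
    @GetMapping
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}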
The Publisher Service :
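import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Publisher {
    private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
    private static final String TOPIC = "archived_docs";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Logs the message and sends it to TOPIC; send() is asynchronous
    public void sendMessage(String message) {
        logger.info("$$ -> Producing message --> {}", message);
        this.kafkaTemplate.send(TOPIC, message);
    }
}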
As we can see, the topic that the message will be published to is “archived_docs”.
Subscriber Spring Boot Application :
Dependencies : same as publisher.
application.properties :
server.port = 9042
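We can directly create a service that listens for messages from the Kafka broker. No Kafka properties are set here, so Spring Boot falls back to its defaults (a broker at localhost:9092 and String deserializers).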
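import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Subscriber {
    private final Logger logger = LoggerFactory.getLogger(Subscriber.class);

    // Called for each message on "archived_docs", as part of consumer group "archived"
    @KafkaListener(topics = "archived_docs", groupId = "archived")
    public void consume(String message) {
        logger.info("$$ -> Consumed Message -> {}", message);
    }
}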
This service listens for incoming messages on the topic “archived_docs”. As soon as it consumes a message, it logs it to the console.
Let’s publish some messages now :
Our publisher is running on port 9041, so let’s call the API to publish a message.
http://localhost:9041/archive?message=kuttiStory
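The same request can also be sent from code instead of a browser. This is a minimal sketch using the JDK's built-in HTTP client (Java 11+). The class PublishClient and its method names are hypothetical, and it assumes the publisher app from this example is running on localhost:9041.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for illustration; assumes the publisher app is on localhost:9041.
class PublishClient {
    static final String BASE_URL = "http://localhost:9041/archive";

    // Builds the request URI, URL-encoding the message so spaces and symbols are safe.
    public static URI buildUri(String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + "?message=" + encoded);
    }

    // Sends GET /archive?message=... and returns the HTTP status code.
    public static int publish(String message) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildUri(message)).GET().build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("HTTP status: " + publish("kuttiStory"));
    }
}
```

Encoding the message matters as soon as it contains spaces or characters such as & that would otherwise break the query string.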
Now, if we check the subscriber logs, we should see the consumed message.
PS: One of the major merits of Kafka is that it keeps the microservices loosely coupled. One microservice can simply publish the message without caring when or if the other service gets it; its job (publishing the message) is done. Say the consumer application is down. Not a problem: the broker retains the message, and the consumer app will receive it once it is up and running again. This holds when the consumer group has already committed an offset, i.e. the consumer has run at least once before; a brand-new group starts from the latest offset by default unless spring.kafka.consumer.auto-offset-reset is set to earliest.
Try it yourself: stop the consumer app, hit the producer API, and when you start the consumer app again, it will consume the message.
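To see why this works, here is a toy, in-memory model of a single-partition topic with per-group offsets. It is only a sketch of the idea and not Kafka's API: the class ToyTopic and its methods are made up for illustration, and as a simplification a new group starts reading from the beginning of the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for a single-partition topic; NOT Kafka's API.
class ToyTopic {
    private final List<String> log = new ArrayList<>();             // append-only message log
    private final Map<String, Integer> committed = new HashMap<>(); // next offset to read, per group

    // Publisher side: append and return immediately; no consumer is involved.
    public void publish(String message) {
        log.add(message);
    }

    // Consumer side: return everything the group has not read yet, then commit the new offset.
    // Simplification: a group with no committed offset starts from offset 0.
    public List<String> poll(String groupId) {
        int from = committed.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        committed.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("first");
        System.out.println(topic.poll("archived")); // consumer is up: prints [first]

        // Consumer is "down": the publisher keeps publishing without noticing.
        topic.publish("second");
        topic.publish("third");

        System.out.println(topic.poll("archived")); // consumer is back: prints [second, third]
    }
}
```

The publisher never waits for or knows about the consumer. The log and the committed offset are what let the consumer catch up after a restart.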
That’s all for the basic Kafka workflow.