"Welcome To Ashok IT"
"Spring Boot and MicroServices"

Topic : Apache Kafka
Date  : 18/02/2025 (Session - 127)
_____________________________________________________________________________________________________________________________

Today's Session
===============
* Kafka is a messaging system.

  Example : p1,p2 (producers) --------------> Kafka System --------------> c1,c2,c3 (consumers)

* It is a distributed platform/application.
  - In a production environment, Kafka is referred to as a Kafka cluster.
  - A cluster is made up of more than one Kafka server.
  - Each Kafka server is referred to as a broker.

* Kafka is "Fault Tolerant"
  - Fault tolerance is the ability of a system to continue operating without interruption when one or more of its components fail.
  - In a Kafka cluster, messages are replicated across multiple brokers.
  - Replication Factor: when we publish a message, it is replicated to multiple brokers.

* Kafka is a "Scalable System"
  - We can add new brokers at any time.
  - We can increase the number of consumers.

* Kafka is an open-source, distributed streaming platform.
* It was originally developed by LinkedIn and later open-sourced as an Apache Software Foundation project.

Kafka Components
================
1. Producer
===========
* Producers are responsible for publishing data to Kafka topics.
* They send records (messages) to Kafka brokers.
* Producers can be implemented in various programming languages.

2. Broker
=========
* Brokers are the Kafka server instances within a Kafka cluster.
* They receive records from producers, store these records in topics, and serve records to consumers.

3. Consumer
===========
* Consumers are responsible for subscribing to Kafka topics and reading records from those topics.
* They can be part of a consumer group.
* Each consumer reads from one or more partitions.

4. Cluster
==========
* Kafka clusters can have multiple brokers to achieve scalability and fault tolerance.

5. Topic
========
* Topics are logical channels or categories for data streams.
* They are used to organize and categorize data.
* Each topic can have one or more partitions.

6. Partitions
=============
* Partitions are the basic unit of parallelism and distribution in Kafka.
* Each topic is divided into partitions, allowing for parallel processing.
* Partitions are stored on individual broker nodes.
* Data within a partition is ordered and immutable.

7. Offset
=========
* Each message in a partition is assigned a unique offset, which represents the message's position in the partition.
* Consumers keep track of their current offset, which allows them to resume reading from where they left off.

8. Consumer Groups
==================
* A consumer group is a logical grouping of consumers that work together to consume records from one or more partitions of a topic.
* Kafka ensures that each record is consumed by only one consumer within a group.

9. ZooKeeper
============
* In earlier versions of Kafka, ZooKeeper was used for managing cluster metadata and leader election.
* In newer versions (since Kafka 2.8), ZooKeeper is not required; Kafka can use its own internal metadata management.

10. Producer API and Consumer API
=================================
* Kafka provides producer and consumer APIs in various programming languages to interact with Kafka clusters.
* These APIs are used to publish and consume records.

Kafka Installation
==================
Step-1 : Download the Apache Kafka software from the below URL

         https://kafka.apache.org/downloads

         Binary downloads: kafka_2.12-3.6.0.tgz (asc, sha512)

Step-2 : After the download completes, extract the archive into a normal folder.
The following folders are available in the extracted folder:

kafka_2.12-3.6.0
|
|-------------------> bin
|                        |----------------> *.bat
|
|-------------------> config
|                        |----------------> *.properties
|
|-------------------> libs
|                        |----------------> *.jar
|
|-------------------> logs
|                        |----------------> *.log
|
|-------------------> site-docs
                         |----------------> documentation

Step-3 : Install Offset Explorer for viewing the Apache Kafka components.

         https://www.kafkatool.com/download.html

         Complete the installation process just like any normal software.

Step-4 : Interact with Apache Kafka through the CLI using the below commands.

******************************
Starting the ZooKeeper Server
******************************
-> C:\kafka_2.12-3.6.0>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties    >>>>>>> runs on port 2181

*******************
Apache Kafka Server
*******************
-> C:\kafka_2.12-3.6.0>.\bin\windows\kafka-server-start.bat .\config\server.properties           >>>>>>> runs on port 9092

******************************
Creating Topic in Apache Kafka
******************************
-> C:\kafka_2.12-3.6.0>.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic sampletopic3 --partitions 3 --replication-factor 1

*******
OUTPUT
*******
Created topic sampletopic3.
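The topic above was created with 3 partitions. As a rough illustration of how a keyed record is routed to one of those partitions: Kafka's real default partitioner hashes the serialized key bytes with murmur2 and takes the result modulo the partition count. The sketch below substitutes `String.hashCode()` for murmur2 purely for illustration; it is not Kafka's actual implementation.

```java
public class PartitionSketch {

    // Simplified stand-in for Kafka's default partitioner: hash the key and
    // take it modulo the number of partitions. Real Kafka applies murmur2 to
    // the serialized key bytes; String.hashCode() here is only illustrative.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // sampletopic3 was created with --partitions 3
        for (String key : new String[] {"order-1", "order-2", "order-3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

The important property is that the same key always maps to the same partition, which is what gives Kafka per-key ordering; records published without a key are spread across partitions instead.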
*******************************
Listing the Apache Kafka Topics
*******************************
-> C:\kafka_2.12-3.6.0>.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list

*******
OUTPUT
*******
sampletopic
sampletopic2
sampletopic3

**********************************************
Viewing the Apache Kafka Partitions in a Topic
**********************************************
-> C:\kafka_2.12-3.6.0>.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic order-topic

**********************************
Creating the Apache Kafka Producer
**********************************
-> C:\kafka_2.12-3.6.0>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic sampletopic3

(Note: in newer Kafka versions, --broker-list is deprecated in favour of --bootstrap-server.)

**********************************
Creating the Apache Kafka Consumer
**********************************
-> C:\kafka_2.12-3.6.0>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic sampletopic3 --from-beginning

Event-Driven Architecture (EDA)
===============================
* An event is defined as a change of state of some key business system. For instance, somebody buys a product, someone else checks in for a flight, or a bus arrives somewhere late. Events exist everywhere and are constantly happening, no matter what industry.
* Event-driven architecture (EDA) is a software design pattern in which decoupled applications can asynchronously publish and subscribe to events via an event broker/message broker.
* In an event-driven architecture, applications communicate with each other by sending and/or receiving events or messages.
* Event-driven architecture is often referred to as "asynchronous" communication: the sender and the receiver don't have to wait for each other before moving on to their next task, and systems are not blocked on any one message.
* Event-driven apps can be created in any programming language, because event-driven is a programming approach, not a language. An event-driven architecture is loosely coupled.

Example
=======
                              Kafka Broker / Kafka Server
         OrderEvent                                              OrderEvent (listening)
Order Service (Producer) >>>>>>>>>>>>> Topic (orders-topic) <<<<<<<<<<<<< Stock Service (Consumer)

Producer Application Steps
==========================
1) Create a new Spring Boot project with the below dependencies
   * Spring Web
   * Spring for Apache Kafka
   * Lombok

2) Create the different packages in the project
   com.ashokit
   com.ashokit.config
   com.ashokit.controller
   com.ashokit.service
   com.ashokit.dto
   com.ashokit.events

3) Develop the source code for the above packages

application.properties
======================
server.port=1122
spring.kafka.topic.name=ashokit-orders-topic

KafkaProducerConfig.java
========================
package com.ashokit.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.ashokit.events.OrderEvent;

@Configuration
public class KafkaProducerConfig {

    // collecting the topic name from the application.properties file
    @Value("${spring.kafka.topic.name}")
    private String topicName;

    // Spring bean definition for creating the topic programmatically
    @Bean
    public NewTopic creatingNewTopic() {
        // creates the new topic in the Kafka broker with 1 partition by default
        return TopicBuilder.name(topicName).build();
    }

    @Bean
    public ProducerFactory<String, OrderEvent> producerFactory() {
        // map object for storing Kafka configurations
        Map<String, Object> configProps = new HashMap<>();
        // specifying the location of the Kafka broker
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // serializer for the key while publishing messages to the Kafka broker
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // serializer for the value while publishing messages to the Kafka broker
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, OrderEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Order.java
==========
package com.ashokit.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@NoArgsConstructor
@AllArgsConstructor
@Setter
@Getter
@Builder
@ToString
public class Order {
    private String orderId;
    private String name;
    private int quantity;
    private double price;
}

OrderEvent.java
===============
package com.ashokit.events;

import com.ashokit.dto.Order;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
    private String message;
    private String status;
    private Order order;
}

OrderProducer.java
==================
package com.ashokit.producer;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import com.ashokit.events.OrderEvent;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class OrderProducer {

    @Autowired
    private NewTopic topic;

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public String sendMessage(OrderEvent orderEvent) {
        log.info("*******OrderEvent Started************");
        log.info(String.format("OrderEvent Data.....%s", orderEvent.toString()));
        // preparing the message to send to the Kafka broker
        Message<OrderEvent> kafkaMessage = MessageBuilder
                .withPayload(orderEvent)                     // payload is the OrderEvent
                .setHeader(KafkaHeaders.TOPIC, topic.name()) // setting the topic name
                .build();
        try {
            // sending the message to the Kafka broker
            kafkaTemplate.send(kafkaMessage);
            log.info("*******OrderEvent Completed************");
            return "Order Placed Successfully....";
        } catch (Exception e) {
            e.printStackTrace();
            return "Problem Occurred While Placing Order....";
        }
    }
}

OrderController.java
====================
package com.ashokit.controller;

import java.util.UUID;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.ashokit.dto.Order;
import com.ashokit.events.OrderEvent;
import com.ashokit.producer.OrderProducer;

@RestController
@RequestMapping("/api/v1")
public class OrderController {

    private final OrderProducer orderProducer;

    // constructor injection
    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    @PostMapping("/createNewOrder")
    public ResponseEntity<String> placingOrder(@RequestBody Order order) {
        // setting the orderId on the Order payload
        order.setOrderId(UUID.randomUUID().toString());
        // creating the OrderEvent to send
        OrderEvent orderEvent = new OrderEvent();
        orderEvent.setStatus("PENDING");
        orderEvent.setMessage("Order Status is Pending Status....");
        orderEvent.setOrder(order);
        // calling the producer method
        return new ResponseEntity<>(orderProducer.sendMessage(orderEvent), HttpStatus.CREATED);
    }
}

4) Executing the Application

* Start the ZooKeeper server and the Kafka server from the command prompt with the below commands

  C:\kafka_2.12-3.6.0>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties    >>>>>>> 2181
  C:\kafka_2.12-3.6.0>.\bin\windows\kafka-server-start.bat .\config\server.properties           >>>>>>> 9092

NOTE
====
If any antivirus software is blocking the firewall, disable it for a few minutes.

* Open Postman and send a request to place the order with the below URL

  http://localhost:1122/api/v1/createNewOrder

API Request Body (POST)
=======================
{
  "name": "Samsung",
  "quantity": 25,
  "price": 50000
}

API Response Body
=================
Order Placed Successfully....

* Open Offset Explorer and view the produced messages.

Consumer Application Development
================================
1) Create a new Spring Boot project with the below dependencies
   * Spring Web
   * Spring for Apache Kafka
   * Lombok

2) Create the different packages in the project
   com.ashokit
   com.ashokit.config
   com.ashokit.controller
   com.ashokit.service
   com.ashokit.dto
   com.ashokit.events

3) Develop the source code

application.properties
======================
# server port number
server.port=1133
# topic to read messages from on the Kafka server
spring.kafka.topic.name=ashokit-orders-topic
# consumer group
spring.kafka.consumer.group-id=stock
#spring.kafka.consumer.properties.spring.json.trusted.packages=*

KafkaConsumerConfig.java
========================
package com.ashokit.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import com.ashokit.events.OrderEvent;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, OrderEvent> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // read from the earliest available offset when no committed offset exists
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(),
                new JsonDeserializer<>(OrderEvent.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListnerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Order.java
==========
package com.ashokit.dto;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@NoArgsConstructor
@AllArgsConstructor
@Setter
@Getter
@Builder
@ToString
public class Order {
    private String orderId;
    private String name;
    private int quantity;
    private double price;
}

OrderEvent.java
===============
package com.ashokit.events;

import com.ashokit.dto.Order;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
    private String message;
    private String status;
    private Order order;
}

OrderConsumer.java
==================
package com.ashokit.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.ashokit.dto.Order;
import com.ashokit.events.OrderEvent;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class OrderConsumer {

    @KafkaListener(topics = "${spring.kafka.topic.name}",
                   groupId = "${spring.kafka.consumer.group-id}",
                   containerFactory = "kafkaListnerFactory")
    public void orderReceivedDetails(OrderEvent orderEvent) {
        // {} placeholder is required for SLF4J to log the argument
        log.info("Test::::{}", orderEvent);
        Order order = orderEvent.getOrder();
        System.out.println("Tested Order:::::" + order.toString());
        // save the data into the database for processing the order
    }
}

4) Application Execution

Run the main program of each application, publish some messages from the producer, and you will be able to see those messages in the consumer application's logs.
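The decoupling between the producer application (port 1122) and the consumer application (port 1133) can be seen in miniature with a plain in-memory queue standing in for the Kafka topic: the producer publishes and moves on, and the consumer reads whenever it is ready. This is only a sketch of the asynchronous hand-off described in the EDA section, not Kafka itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EdaSketch {

    // The queue plays the role of the Kafka topic: the producer never talks
    // to the consumer directly, it only hands the event to the "broker".
    static final BlockingQueue<String> ordersTopic = new ArrayBlockingQueue<>(10);

    // "OrderProducer": publish the event and return immediately (fire-and-forget).
    static void publish(String event) throws InterruptedException {
        ordersTopic.put(event);
    }

    // "OrderConsumer" / StockService: pull the next event when ready.
    static String consume() throws InterruptedException {
        return ordersTopic.take();
    }

    public static void main(String[] args) throws InterruptedException {
        publish("OrderEvent{status=PENDING, name=Samsung}");
        System.out.println("Consumed: " + consume());
    }
}
```

Unlike this sketch, Kafka persists events, replicates them across brokers, and lets multiple consumer groups each read the same event independently; the shared idea is only that sender and receiver are decoupled in time.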