Kafka Beginner's Tutorial
Installation
To get started with Apache Kafka, you can follow these steps:
- Download Apache Kafka from the official website.
- Extract the downloaded archive to a suitable location on your system.
- Start the ZooKeeper server (required when Kafka runs in ZooKeeper mode rather than KRaft mode):
bin/zookeeper-server-start.sh config/zookeeper.properties
- Start the Kafka broker in a second terminal by running:
bin/kafka-server-start.sh config/server.properties
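Once both processes are running, it can help to confirm that they are accepting connections before moving on. The following is a minimal sketch using only the Java standard library. The class name BrokerPortCheck and the method isListening are illustrative, and it assumes the default ports from the stock configuration files (2181 for ZooKeeper, 9092 for the broker). A successful TCP connection only shows that something is listening on the port, not that Kafka is fully healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; adjust them if you edited the config files
        System.out.println("ZooKeeper (2181): " + isListening("localhost", 2181, 1000));
        System.out.println("Kafka broker (9092): " + isListening("localhost", 9092, 1000));
    }
}
```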
Writing Producers
To write a producer in Kafka, follow these steps:
- Create a new Java project in your preferred IDE.
- Add the Kafka client dependency, kafka-clients, to your project.
- Write code that configures the producer properties and sends messages:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // Set up producer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create a Kafka producer with String keys and String values
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Send messages to the Kafka topic
        String topic = "my-topic";
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
            // send() is asynchronous; the callback runs once the broker responds
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("Message sent successfully: " + metadata);
                    } else {
                        System.err.println("Error sending message: " + exception.getMessage());
                    }
                }
            });
        }

        // Close the producer; close() waits for buffered records to be sent
        producer.close();
    }
}
Writing Consumers
To write a consumer in Kafka, follow these steps:
- Create a new Java project in your preferred IDE.
- Add the Kafka client dependency, kafka-clients, to your project.
- Write code to configure Kafka consumer properties:
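// Imports needed by the consumer snippets in this section
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Inside your main method:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// Consumers that share a group.id split the topic's partitions between them
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);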
- Write code to subscribe to Kafka topics:
String topic = "my-topic";
consumer.subscribe(Collections.singletonList(topic));
- Write code to consume messages from Kafka topics:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Received message: key=%s, value=%s%n", record.key(), record.value());
    }
}
Make sure to handle exceptions in the poll loop and call consumer.close() once the loop exits, so that the consumer leaves its group cleanly.
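One way to make the loop stoppable is to replace while (true) with a flag that another thread can clear, and to close the consumer in a finally block. The sketch below illustrates that pattern with the standard library only, so it runs without a broker. The class PollLoop and the interface RecordSource are hypothetical stand-ins: RecordSource mimics just the poll and close calls used above, and in a real program the loop would hold a KafkaConsumer instead.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollLoop {

    /** Hypothetical stand-in for the two consumer calls the loop needs. */
    public interface RecordSource extends AutoCloseable {
        List<String> poll(Duration timeout);

        @Override
        void close();
    }

    // Cleared by stop() to ask the loop to finish after the current poll
    private final AtomicBoolean running = new AtomicBoolean(true);

    /** Safe to call from another thread. */
    public void stop() {
        running.set(false);
    }

    /** Polls until stop() is called, then closes the source. Returns the number of records handled. */
    public int run(RecordSource source) {
        int handled = 0;
        try {
            while (running.get()) {
                for (String value : source.poll(Duration.ofMillis(100))) {
                    System.out.println("Received message: value=" + value);
                    handled++;
                }
            }
        } catch (RuntimeException e) {
            // Log the failure and fall through to close the source
            System.err.println("Error while consuming: " + e.getMessage());
        } finally {
            // Always release the source, whether the loop stopped or failed
            source.close();
        }
        return handled;
    }
}
```

A JVM shutdown hook could call stop(), for example Runtime.getRuntime().addShutdownHook(new Thread(loop::stop)), though the hook would also need to wait for the polling thread to finish so that close() completes. KafkaConsumer additionally provides wakeup(), which interrupts a poll that is blocked on a long timeout.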