プログラミングと旅と映画の日々

普段はスマホ決済サービスの会社でバッグエンドを担当しているエンジニアです。プログラミングと趣味の映画、株、時々うどんに関してブログを書いていこうと思います。海外ドラマ、クロスバイクも好きです。

【Kafka】KafkaのJava client APIでconsumerとproducerを実行してみる【Java】

Kafkaを触ったことがなかったので試しにlocalで実行してみます

 

kafkaを予めbrew でinstallしておきます

`brew install kafka`

 

installしたらkafkaを起動します

 

kafka-server-start /usr/local/etc/kafka/server.properties

 

kafkaにtopicを作成しておきます

今回はmy-topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
 

consumer起動

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic hello_kafka --from-beginning

 

producer起動

$ kafka-console-producer --broker-list localhost:9092 --topic hello_kafka

 

以降はmavenjava clientAPIを試します

public class HozTest {

@Test
public void gettingStartedProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
ArrayList<String> integers = new ArrayList<>();
integers.add("hoge");
integers.add("huga");


try (KafkaProducer<Integer, String> producer =
new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {
IntStream
.rangeClosed(1, 10)
.forEach(i -> {
try {
producer.send(new ProducerRecord<>("my-topic", i, "value" + i)).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}

@Test
public void gettingStartedConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<Integer, String> consumer =
new KafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());

List<ConsumerRecord<Integer, String>> received = new ArrayList<>();

consumer.subscribe(Arrays.asList("my-topic"));


//print each record.
received.forEach(record -> {
System.out.println("Record Key " + record.key());
System.out.println("Record value " + record.value());
System.out.println("Record partition " + record.partition());
System.out.println("Record offset " + record.offset());
}); }
}

 https://github.com/takanorihozumi/kafka-java/blob/master/src/test/java/com/example/chapter4/HozTest.java