【Kafka】KafkaのJava client APIでconsumerとproducerを実行してみる【Java】
Kafkaを触ったことがなかったので試しにlocalで実行してみます
kafkaを予めbrew でinstallしておきます
`brew install kafka`
installしたらkafkaを起動します
kafka-server-start /usr/local/etc/kafka/server.properties
kafkaにtopicを作成しておきます
今回はmy-topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
consumer起動
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic hello_kafka --from-beginning
producer起動
$ kafka-console-producer --broker-list localhost:9092 --topic hello_kafka
public class HozTest {
@Test
public void gettingStartedProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
ArrayList<String> integers = new ArrayList<>();
integers.add("hoge");
integers.add("huga");
try (KafkaProducer<Integer, String> producer =
new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {
IntStream
.rangeClosed(1, 10)
.forEach(i -> {
try {
producer.send(new ProducerRecord<>("my-topic", i, "value" + i)).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}
@Test
public void gettingStartedConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<Integer, String> consumer =
new KafkaConsumer<>(properties, new IntegerDeserializer(), new StringDeserializer());
List<ConsumerRecord<Integer, String>> received = new ArrayList<>();
consumer.subscribe(Arrays.asList("my-topic"));
//print each record.
received.forEach(record -> {
System.out.println("Record Key " + record.key());
System.out.println("Record value " + record.value());
System.out.println("Record partition " + record.partition());
System.out.println("Record offset " + record.offset());
}); }
}