Hi,

This is the first time that am using Kafka Stream. I would like to read
from input topic and write to output topic. However, I do not see the word
count when I try to run below example. Looks like that it does not connect
to Kafka. I do not see any error though. I tried my localhost kafka as well
as the container in a VM, same situation.

There are over 200 message in the input kafka topic.

Your input is appreciated!

Best regards,
Mina

import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.*;
import java.util.regex.*;

public class WordCountExample {


   public static void main(String [] args)   {
      final Properties streamsConfiguration = new Properties();
      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
"wordcount-streaming");
      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"<IPADDRESS>:9092");
      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
10 * 1000);

      final Serde<String> stringSerde = Serdes.String();
      final Serde<Long> longSerde = Serdes.Long();

      final KStreamBuilder builder = new KStreamBuilder();

      final KStream<String, String> textLines =
builder.stream(stringSerde, stringSerde, "wordcount-input");

      final Pattern pattern = Pattern.compile("\\W+",
Pattern.UNICODE_CHARACTER_CLASS);

      final KStream<String, Long> wordCounts = textLines
            .flatMapValues(value ->
Arrays.asList(pattern.split(value.toLowerCase())))
            .groupBy((key, word) -> word)
            .count("Counts")
            .toStream();


      wordCounts.to(stringSerde, longSerde, "wordcount-output");

      final KafkaStreams streams = new KafkaStreams(builder,
streamsConfiguration);
      streams.cleanUp();
      streams.start();

      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  }
}

Reply via email to