Hi, I am using the code below to read from a topic, count words, and write the counts to another topic. The example is the one on GitHub. My Kafka container runs in a VM. I do not get any errors, but I do not see any output in my wordCount-output topic either. The program also does not stop!
Any idea?

Best regards,
Mina

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cloudtrail-events-streaming");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.102:29092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> source = builder.stream("wordCount-input");

KTable<String, Long> counts = source
    .flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
        }
    }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
        @Override
        public KeyValue<String, String> apply(String key, String value) {
            return new KeyValue<>(value, value);
        }
    })
    .groupByKey()
    .count("Counts");

// need to override value serde to Long type
counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");

LOGGER.info("counts:::::::::" + counts);

KafkaStreams streams = new KafkaStreams(builder, props);
streams.cleanUp();
streams.start();

// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(5000L);

streams.close();