I reset and it is still not working! My env is set up using
http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
I just tried using
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
with all the topics (e.g. TextLinesTopic and WordsWithCountsTopic) created
from scratch, and went through the steps as directed. When I stopped the
Java program and checked the topics, below is the data in each topic.

docker run \
  --net=host \
  --rm \
  confluentinc/cp-kafka:3.2.0 \
  kafka-console-consumer --bootstrap-server localhost:29092 \
  --topic TextLinesTopic --new-consumer --from-beginning

shows:

hello kafka streams
all streams lead to kafka
join kafka summit
test1 test2 test3 test4

For WordsWithCountsTopic, nothing is shown.

I am new to Kafka/Kafka Streams and still do not understand why a simple
example does not work!

On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> >> So, when I check the number of messages in wordCount-input I see the
> >> same messages. However, when I run the below code I do not see any
> >> message/data in wordCount-output.
>
> Did you reset your application?
>
> Each time you run your app and restart it, it will resume processing
> where it left off. Thus, if something went wrong in your first run but
> you got committed offsets, the app will not re-read the whole topic.
>
> You can check committed offsets via bin/kafka-consumer-groups.sh. The
> application-id from StreamsConfig is used as the group.id.
>
> Thus, resetting your app would be required to consume the input topic
> from scratch. Or you can just write new data to your input topic.
>
> >> Can I connect to kafka in VM/docker container using the below code or
> >> do I need to change/add other parameters? How can I submit the code to
> >> kafka/kafka-connect? Do we have a similar concept as SPARK to submit
> >> the code (e.g. jar file)?
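To make the offset-check and reset advice above concrete: with the docker quickstart setup used in this thread, the commands might look like the following sketch. The application id "wordcount-streaming", the broker port 29092, and the zookeeper port 32181 are assumptions taken from the examples in this thread and the CP 3.2 quickstart; adjust them to your own values.

```shell
# The application.id from StreamsConfig doubles as the consumer group.id,
# so describing that group shows how far the Streams app has read each
# input partition (compare CURRENT-OFFSET against LOG-END-OFFSET):
docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-consumer-groups --bootstrap-server localhost:29092 \
  --new-consumer --describe --group wordcount-streaming

# To re-process the input topic from scratch, stop the app first, then
# run the application reset tool (ports assumed per the docker quickstart):
docker run --net=host --rm confluentinc/cp-kafka:3.2.0 \
  kafka-streams-application-reset --application-id wordcount-streaming \
  --bootstrap-servers localhost:29092 \
  --zookeeper localhost:32181 \
  --input-topics wordCount-input
```

If the lag for the group is already zero, the app has consumed everything and will not re-read old records on restart; after a reset, local state can additionally be wiped by calling streams.cleanUp() before streams.start().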
> A Streams app is a regular Java application and can run anywhere --
> there is no notion of a processing cluster and you don't "submit" your
> code -- you just run your app.
>
> Thus, if your console consumer can connect to the cluster, your Streams
> app should also be able to connect to the cluster.
>
> Maybe the short runtime of 5 seconds could be a problem (even if it
> seems long enough to process just a few records). But you might need to
> take startup delay into account. I would recommend registering a
> shutdown hook: see
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
>
> Hope this helps.
>
> -Matthias
>
> On 3/13/17 7:30 PM, Mina Aslani wrote:
> > Hi Matthias,
> >
> > Thank you for the quick response, appreciate it!
> >
> > I created the topics wordCount-input and wordCount-output. Pushed some
> > data to wordCount-input using
> >
> > docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
> >   /bin/kafka-console-producer --broker-list localhost:9092 \
> >   --topic wordCount-input
> >
> > test
> > new
> > word
> > count
> > wordcount
> > word count
> >
> > So, when I check the number of messages in wordCount-input I see the
> > same messages. However, when I run the below code I do not see any
> > message/data in wordCount-output.
> >
> > Can I connect to kafka in VM/docker container using the below code or
> > do I need to change/add other parameters? How can I submit the code to
> > kafka/kafka-connect? Do we have a similar concept as SPARK to submit
> > the code (e.g. jar file)?
> >
> > I really appreciate your input as I am blocked and cannot run even the
> > below simple example.
> >
> > Best regards,
> > Mina
> >
> > I changed the code to be as below:
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >         Serdes.String().getClass().getName());
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >         Serdes.String().getClass().getName());
> >
> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >
> > // setting offset reset to earliest so that we can re-run the demo
> > // code with the same pre-loaded data
> > // Note: To re-run the demo, you need to use the offset reset tool:
> > // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >
> > KStreamBuilder builder = new KStreamBuilder();
> >
> > KStream<String, String> source = builder.stream("wordCount-input");
> >
> > KTable<String, Long> counts = source
> >         .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >             @Override
> >             public Iterable<String> apply(String value) {
> >                 return Arrays.asList(
> >                         value.toLowerCase(Locale.getDefault()).split(" "));
> >             }
> >         })
> >         .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
> >             @Override
> >             public KeyValue<String, String> apply(String key, String value) {
> >                 return new KeyValue<>(value, value);
> >             }
> >         })
> >         .groupByKey()
> >         .count("Counts");
> >
> > // need to override value serde to Long type
> > counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> > streams.start();
> >
> > // usually the stream application would be running forever,
> > // in this example we just let it run for some time and stop since the
> > // input data is finite.
> > Thread.sleep(5000L);
> >
> > streams.close();
> >
> > On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Maybe you need to reset your application using the reset tool:
> >> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
> >>
> >> Also keep in mind that KTables buffer internally, and thus, you might
> >> only see data on commit.
> >>
> >> Try to reduce the commit interval or disable caching by setting
> >> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>
> >> -Matthias
> >>
> >> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>> Hi,
> >>>
> >>> This is the first time that I am using Kafka Streams. I would like to
> >>> read from an input topic and write to an output topic. However, I do
> >>> not see the word count when I try to run the below example. Looks
> >>> like it does not connect to Kafka. I do not see any error though. I
> >>> tried my localhost kafka as well as the container in a VM, same
> >>> situation.
> >>>
> >>> There are over 200 messages in the input kafka topic.
> >>>
> >>> Your input is appreciated!
> >>>
> >>> Best regards,
> >>> Mina
> >>>
> >>> import org.apache.kafka.common.serialization.*;
> >>> import org.apache.kafka.streams.*;
> >>> import org.apache.kafka.streams.kstream.*;
> >>>
> >>> import java.util.*;
> >>> import java.util.regex.*;
> >>>
> >>> public class WordCountExample {
> >>>
> >>>     public static void main(String[] args) {
> >>>         final Properties streamsConfiguration = new Properties();
> >>>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>>                 "wordcount-streaming");
> >>>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>>                 "<IPADDRESS>:9092");
> >>>         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>>                 Serdes.String().getClass().getName());
> >>>         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>>                 Serdes.String().getClass().getName());
> >>>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> >>>                 10 * 1000);
> >>>
> >>>         final Serde<String> stringSerde = Serdes.String();
> >>>         final Serde<Long> longSerde = Serdes.Long();
> >>>
> >>>         final KStreamBuilder builder = new KStreamBuilder();
> >>>
> >>>         final KStream<String, String> textLines =
> >>>                 builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>
> >>>         final Pattern pattern =
> >>>                 Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
> >>>
> >>>         final KStream<String, Long> wordCounts = textLines
> >>>                 .flatMapValues(value ->
> >>>                         Arrays.asList(pattern.split(value.toLowerCase())))
> >>>                 .groupBy((key, word) -> word)
> >>>                 .count("Counts")
> >>>                 .toStream();
> >>>
> >>>         wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>
> >>>         final KafkaStreams streams =
> >>>                 new KafkaStreams(builder, streamsConfiguration);
> >>>         streams.cleanUp();
> >>>         streams.start();
> >>>
> >>>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >>>     }
> >>> }
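As a plain-Java sanity check (no Kafka involved), the flatMapValues/groupBy/count chain in the example above is equivalent to the following sketch. It shows what should eventually appear in the output topic once records are flushed on commit; the class and method names here are illustrative, not part of any Kafka API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class WordCountSanityCheck {

    // Same split pattern as the Streams example above.
    private static final Pattern WORD_SPLIT =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Mirrors the topology: lower-case each line, split it into words,
    // and keep a running count per word.
    static Map<String, Long> count(List<String> lines) {
        final Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : WORD_SPLIT.split(line.toLowerCase())) {
                if (!word.isEmpty()) {          // split() may yield an empty leading token
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        final List<String> input = Arrays.asList(
                "hello kafka streams",
                "all streams lead to kafka");
        // "kafka" and "streams" each appear twice across the two lines.
        System.out.println(count(input));
    }
}
```

If running this against the sample input produces the expected counts but the output topic stays empty, the problem is in the Kafka side (offsets, commit/caching, connectivity), not in the per-record logic.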