Hi there,

I noticed in your example that you are using localhost:9092 to produce but 
localhost:29092 to consume? Or is that a typo? Is zookeeper, kafka, and the 
Kafka Streams app all running within one docker container, or in different 
containers?

I just tested the WordCountLambdaExample and it works for me. This might not 
have anything to do with streams, but rather with the Kafka configuration and 
whether streams (that is just an app) can reach Kafka at all. If you provide 
the above information we can look further.



Thanks
Eno

> On 14 Mar 2017, at 18:42, Mina Aslani <aslanim...@gmail.com> wrote:
> 
> I reset and still not working!
> 
> My env is setup using
> http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html
> 
> I just tried using
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
> with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic) created
> from scratch as went through the steps as directed.
> 
> When I stopped the java program and check the topics below are the data in
> each topic.
> 
> docker run \
> 
>  --net=host \
> 
>  --rm \
> 
>  confluentinc/cp-kafka:3.2.0 \
> 
>  kafka-console-consumer --bootstrap-server localhost:29092 --topic
> TextLinesTopic --new-consumer --from-beginning
> 
> 
> SHOWS
> 
> hello kafka streams
> 
> all streams lead to kafka
> 
> join kafka summit
> 
> test1
> 
> test2
> 
> test3
> 
> test4
> 
> FOR WordsWithCountsTopic nothing is shown
> 
> 
> I am new to the Kafka/Kafka Stream and still do not understand why a simple
> example does not work!
> 
> On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>>>> So, when I check the number of messages in wordCount-input I see the
>> same
>>>> messages. However, when I run below code I do not see any message/data
>> in
>>>> wordCount-output.
>> 
>> Did you reset your application?
>> 
>> Each time you run you app and restart it, it will resume processing
>> where it left off. Thus, if something went wrong in you first run but
>> you got committed offsets, the app will not re-read the whole topic.
>> 
>> You can check committed offset via bin/kafka-consumer-groups.sh. The
>> application-id from StreamConfig is used a group.id.
>> 
>> Thus, resetting you app would be required to consumer the input topic
>> from scratch. Of you just write new data to you input topic.
>> 
>>>> Can I connect to kafka in VM/docker container using below code or do I
>> need
>>>> to change/add other parameters? How can I submit the code to
>>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>>>> code(e.g. jar file)?
>> 
>> A Streams app is a regular Java application and can run anywhere --
>> there is no notion of a processing cluster and you don't "submit" your
>> code -- you just run your app.
>> 
>> Thus, if your console consumer can connect to the cluster, your Streams
>> app should also be able to connect to the cluster.
>> 
>> 
>> Maybe, the short runtime of 5 seconds could be a problem (even if it
>> seems log to process just a few records). But you might need to put
>> startup delay into account. I would recommend to register a shutdown
>> hook: see
>> https://github.com/confluentinc/examples/blob/3.
>> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
>> WordCountLambdaExample.java#L178-L181
>> 
>> 
>> Hope this helps.
>> 
>> -Matthias
>> 
>> 
>> On 3/13/17 7:30 PM, Mina Aslani wrote:
>>> Hi Matthias,
>>> 
>>> Thank you for the quick response, appreciate it!
>>> 
>>> I created the topics wordCount-input and wordCount-output. Pushed some
>> data
>>> to wordCount-input using
>>> 
>>> docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
>>> /bin/kafka-console-producer --broker-list localhost:9092 --topic
>>> wordCount-input
>>> 
>>> test
>>> 
>>> new
>>> 
>>> word
>>> 
>>> count
>>> 
>>> wordcount
>>> 
>>> word count
>>> 
>>> So, when I check the number of messages in wordCount-input I see the same
>>> messages. However, when I run below code I do not see any message/data in
>>> wordCount-output.
>>> 
>>> Can I connect to kafka in VM/docker container using below code or do I
>> need
>>> to change/add other parameters? How can I submit the code to
>>> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
>>> code(e.g. jar file)?
>>> 
>>> I really appreciate your input as I am blocked and cannot run even below
>>> simple example.
>>> 
>>> Best regards,
>>> Mina
>>> 
>>> I changed the code to be as below:
>>> 
>>> Properties props = new Properties();
>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> Serdes.String().getClass().getName());
>>> 
>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>> 
>>> // setting offset reset to earliest so that we can re-run the demo
>>> code with the same pre-loaded data
>>> // Note: To re-run the demo, you need to use the offset reset tool:
>>> // https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Streams+Application+Reset+Tool
>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> 
>>> KStreamBuilder builder = new KStreamBuilder();
>>> 
>>> KStream<String, String> source = builder.stream("wordCount-input");
>>> 
>>> KTable<String, Long> counts = source
>>>      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>>>         @Override
>>>         public Iterable<String> apply(String value) {
>>>            return
>>> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>>>         }
>>>      }).map(new KeyValueMapper<String, String, KeyValue<String,
>> String>>() {
>>>         @Override
>>>         public KeyValue<String, String> apply(String key, String value)
>> {
>>>            return new KeyValue<>(value, value);
>>>         }
>>>      })
>>>      .groupByKey()
>>>      .count("Counts");
>>> 
>>> // need to override value serde to Long type
>>> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
>>> 
>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>> streams.start();
>>> 
>>> // usually the stream application would be running forever,
>>> // in this example we just let it run for some time and stop since the
>>> input data is finite.
>>> Thread.sleep(5000L);
>>> 
>>> streams.close();
>>> 
>>> 
>>> 
>>> 
>>> On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>> 
>>>> Maybe you need to reset your application using the reset tool:
>>>> http://docs.confluent.io/current/streams/developer-
>>>> guide.html#application-reset-tool
>>>> 
>>>> Also keep in mind, that KTables buffer internally, and thus, you might
>>>> only see data on commit.
>>>> 
>>>> Try to reduce commit interval or disable caching by setting
>>>> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 3/13/17 12:29 PM, Mina Aslani wrote:
>>>>> Hi,
>>>>> 
>>>>> This is the first time that am using Kafka Stream. I would like to read
>>>>> from input topic and write to output topic. However, I do not see the
>>>> word
>>>>> count when I try to run below example. Looks like that it does not
>>>> connect
>>>>> to Kafka. I do not see any error though. I tried my localhost kafka as
>>>> well
>>>>> as the container in a VM, same situation.
>>>>> 
>>>>> There are over 200 message in the input kafka topic.
>>>>> 
>>>>> Your input is appreciated!
>>>>> 
>>>>> Best regards,
>>>>> Mina
>>>>> 
>>>>> import org.apache.kafka.common.serialization.*;
>>>>> import org.apache.kafka.streams.*;
>>>>> import org.apache.kafka.streams.kstream.*;
>>>>> 
>>>>> import java.util.*;
>>>>> import java.util.regex.*;
>>>>> 
>>>>> public class WordCountExample {
>>>>> 
>>>>> 
>>>>>   public static void main(String [] args)   {
>>>>>      final Properties streamsConfiguration = new Properties();
>>>>>      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>>> "wordcount-streaming");
>>>>>      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>> "<IPADDRESS>:9092");
>>>>>      streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>> Serdes.String().getClass().getName());
>>>>>      streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>> Serdes.String().getClass().getName());
>>>>>      streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
>> MS_CONFIG,
>>>>> 10 * 1000);
>>>>> 
>>>>>      final Serde<String> stringSerde = Serdes.String();
>>>>>      final Serde<Long> longSerde = Serdes.Long();
>>>>> 
>>>>>      final KStreamBuilder builder = new KStreamBuilder();
>>>>> 
>>>>>      final KStream<String, String> textLines =
>>>>> builder.stream(stringSerde, stringSerde, "wordcount-input");
>>>>> 
>>>>>      final Pattern pattern = Pattern.compile("\\W+",
>>>>> Pattern.UNICODE_CHARACTER_CLASS);
>>>>> 
>>>>>      final KStream<String, Long> wordCounts = textLines
>>>>>            .flatMapValues(value ->
>>>>> Arrays.asList(pattern.split(value.toLowerCase())))
>>>>>            .groupBy((key, word) -> word)
>>>>>            .count("Counts")
>>>>>            .toStream();
>>>>> 
>>>>> 
>>>>>      wordCounts.to(stringSerde, longSerde, "wordcount-output");
>>>>> 
>>>>>      final KafkaStreams streams = new KafkaStreams(builder,
>>>>> streamsConfiguration);
>>>>>      streams.cleanUp();
>>>>>      streams.start();
>>>>> 
>>>>>      Runtime.getRuntime().addShutdownHook(new
>>>> Thread(streams::close));  }
>>>>> }
>>>>> 
>>>> 
>>>> 
>>> 
>> 
>> 

Reply via email to