Hi Matthias,

Thank you for the quick response, appreciate it!

I created the topics wordCount-input and wordCount-output, and pushed some data
to wordCount-input using:

docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
  /bin/kafka-console-producer --broker-list localhost:9092 \
  --topic wordCount-input

test

new

word

count

wordcount

word count
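To check wordCount-output I have been running something like the command below (assuming the same container name and broker address as above; the value deserializer has to be LongDeserializer, since the counts are written as Longs):

```shell
docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}") \
  /bin/kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic wordCount-output --from-beginning \
  --property print.key=true \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```

Without the deserializer property the consumer tries to print the Long values as UTF-8 strings, which shows up as garbage characters.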

So, when I check the messages in wordCount-input, I see the ones I pushed.
However, when I run the code below, I do not see any messages/data in
wordCount-output.

Can I connect to Kafka running in a VM/Docker container using the code below,
or do I need to change/add other parameters? How can I submit the code to
Kafka/Kafka Connect? Is there a concept similar to Spark's job submission
(e.g. submitting a jar file)?

I really appreciate your input, as I am blocked and cannot run even this
simple example.

Best regards,
Mina

I changed the code to the following:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> source = builder.stream("wordCount-input");

KTable<String, Long> counts = source
      .flatMapValues(new ValueMapper<String, Iterable<String>>() {
         @Override
         public Iterable<String> apply(String value) {
            return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
         }
      }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
         @Override
         public KeyValue<String, String> apply(String key, String value) {
            return new KeyValue<>(value, value);
         }
      })
      .groupByKey()
      .count("Counts");

// need to override value serde to Long type
counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// usually the stream application would be running forever;
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(5000L);

streams.close();
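As a side note on the tokenization: splitting on a single space (as in the flatMapValues above) keeps punctuation attached to the words, whereas the `\W+` regex split from my earlier version does not. A small standalone check of the two approaches (plain Java, no Kafka needed):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

public class TokenizeCheck {
    public static void main(String[] args) {
        String line = "Word count, word-count!";

        // split on a single space, as in the flatMapValues above:
        // punctuation stays attached to the tokens
        List<String> bySpace =
                Arrays.asList(line.toLowerCase(Locale.getDefault()).split(" "));

        // split on runs of non-word characters, as in the earlier version
        Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
        List<String> byRegex = Arrays.asList(pattern.split(line.toLowerCase()));

        System.out.println(bySpace); // [word, count,, word-count!]
        System.out.println(byRegex); // [word, count, word, count]
    }
}
```

So with the space split, "count," and "count" would be counted as different words.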




On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Maybe you need to reset your application using the reset tool:
> http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
>
> Also keep in mind, that KTables buffer internally, and thus, you might
> only see data on commit.
>
> Try to reduce commit interval or disable caching by setting
> "cache.max.bytes.buffering" to zero in your StreamsConfig.
>
>
> -Matthias
>
> On 3/13/17 12:29 PM, Mina Aslani wrote:
> > Hi,
> >
> > This is the first time that I am using Kafka Streams. I would like to read
> > from an input topic and write to an output topic. However, I do not see the
> > word count when I try to run the example below. It looks like it does not
> > connect to Kafka. I do not see any error though. I tried my localhost Kafka
> > as well as the container in a VM; same situation.
> >
> > There are over 200 messages in the input Kafka topic.
> >
> > Your input is appreciated!
> >
> > Best regards,
> > Mina
> >
> > import org.apache.kafka.common.serialization.*;
> > import org.apache.kafka.streams.*;
> > import org.apache.kafka.streams.kstream.*;
> >
> > import java.util.*;
> > import java.util.regex.*;
> >
> > public class WordCountExample {
> >
> >
> >    public static void main(String [] args)   {
> >       final Properties streamsConfiguration = new Properties();
> >       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > "wordcount-streaming");
> >       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > "<IPADDRESS>:9092");
> >       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> >       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> >       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> > 10 * 1000);
> >
> >       final Serde<String> stringSerde = Serdes.String();
> >       final Serde<Long> longSerde = Serdes.Long();
> >
> >       final KStreamBuilder builder = new KStreamBuilder();
> >
> >       final KStream<String, String> textLines =
> > builder.stream(stringSerde, stringSerde, "wordcount-input");
> >
> >       final Pattern pattern = Pattern.compile("\\W+",
> > Pattern.UNICODE_CHARACTER_CLASS);
> >
> >       final KStream<String, Long> wordCounts = textLines
> >             .flatMapValues(value ->
> > Arrays.asList(pattern.split(value.toLowerCase())))
> >             .groupBy((key, word) -> word)
> >             .count("Counts")
> >             .toStream();
> >
> >
> >       wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >
> >       final KafkaStreams streams = new KafkaStreams(builder,
> > streamsConfiguration);
> >       streams.cleanUp();
> >       streams.start();
> >
> >       Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> >    }
> > }
> >
>
>
