Maybe you need to reset your application using the reset tool:
http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool

Also keep in mind, that KTables buffer internally, and thus, you might
only see data on commit.

Try to reduce commit interval or disable caching by setting
"cache.max.bytes.buffering" to zero in your StreamsConfig.


-Matthias

On 3/13/17 12:29 PM, Mina Aslani wrote:
> Hi,
> 
> This is the first time that am using Kafka Stream. I would like to read
> from input topic and write to output topic. However, I do not see the word
> count when I try to run below example. Looks like that it does not connect
> to Kafka. I do not see any error though. I tried my localhost kafka as well
> as the container in a VM, same situation.
> 
> There are over 200 message in the input kafka topic.
> 
> Your input is appreciated!
> 
> Best regards,
> Mina
> 
> import org.apache.kafka.common.serialization.*;
> import org.apache.kafka.streams.*;
> import org.apache.kafka.streams.kstream.*;
> 
> import java.util.*;
> import java.util.regex.*;
> 
> public class WordCountExample {
> 
> 
>    public static void main(String [] args)   {
>       final Properties streamsConfiguration = new Properties();
>       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "wordcount-streaming");
>       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "<IPADDRESS>:9092");
>       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
> 10 * 1000);
> 
>       final Serde<String> stringSerde = Serdes.String();
>       final Serde<Long> longSerde = Serdes.Long();
> 
>       final KStreamBuilder builder = new KStreamBuilder();
> 
>       final KStream<String, String> textLines =
> builder.stream(stringSerde, stringSerde, "wordcount-input");
> 
>       final Pattern pattern = Pattern.compile("\\W+",
> Pattern.UNICODE_CHARACTER_CLASS);
> 
>       final KStream<String, Long> wordCounts = textLines
>             .flatMapValues(value ->
> Arrays.asList(pattern.split(value.toLowerCase())))
>             .groupBy((key, word) -> word)
>             .count("Counts")
>             .toStream();
> 
> 
>       wordCounts.to(stringSerde, longSerde, "wordcount-output");
> 
>       final KafkaStreams streams = new KafkaStreams(builder,
> streamsConfiguration);
>       streams.cleanUp();
>       streams.start();
> 
>       Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  }
> }
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to