I reset and still not working!

My env is setup using
http://docs.confluent.io/3.2.0/cp-docker-images/docs/quickstart.html

I just tried using
https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java#L178-L181
with all the topics(e.g. TextLinesTopic and WordsWithCountsTopic) created
from scratch as went through the steps as directed.

When I stopped the java program and check the topics below are the data in
each topic.

docker run \

  --net=host \

  --rm \

  confluentinc/cp-kafka:3.2.0 \

  kafka-console-consumer --bootstrap-server localhost:29092 --topic
TextLinesTopic --new-consumer --from-beginning


SHOWS

hello kafka streams

all streams lead to kafka

join kafka summit

test1

test2

test3

test4

FOR WordsWithCountsTopic nothing is shown


I am new to the Kafka/Kafka Stream and still do not understand why a simple
example does not work!

On Tue, Mar 14, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> >> So, when I check the number of messages in wordCount-input I see the
> same
> >> messages. However, when I run below code I do not see any message/data
> in
> >> wordCount-output.
>
> Did you reset your application?
>
> Each time you run you app and restart it, it will resume processing
> where it left off. Thus, if something went wrong in you first run but
> you got committed offsets, the app will not re-read the whole topic.
>
> You can check committed offset via bin/kafka-consumer-groups.sh. The
> application-id from StreamConfig is used a group.id.
>
> Thus, resetting you app would be required to consumer the input topic
> from scratch. Of you just write new data to you input topic.
>
> >> Can I connect to kafka in VM/docker container using below code or do I
> need
> >> to change/add other parameters? How can I submit the code to
> >> kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> >> code(e.g. jar file)?
>
> A Streams app is a regular Java application and can run anywhere --
> there is no notion of a processing cluster and you don't "submit" your
> code -- you just run your app.
>
> Thus, if your console consumer can connect to the cluster, your Streams
> app should also be able to connect to the cluster.
>
>
> Maybe, the short runtime of 5 seconds could be a problem (even if it
> seems log to process just a few records). But you might need to put
> startup delay into account. I would recommend to register a shutdown
> hook: see
> https://github.com/confluentinc/examples/blob/3.
> 2.x/kafka-streams/src/main/java/io/confluent/examples/streams/
> WordCountLambdaExample.java#L178-L181
>
>
> Hope this helps.
>
> -Matthias
>
>
> On 3/13/17 7:30 PM, Mina Aslani wrote:
> > Hi Matthias,
> >
> > Thank you for the quick response, appreciate it!
> >
> > I created the topics wordCount-input and wordCount-output. Pushed some
> data
> > to wordCount-input using
> >
> > docker exec -it $(docker ps -f "name=kafka\\." --format "{{.Names}}")
> > /bin/kafka-console-producer --broker-list localhost:9092 --topic
> > wordCount-input
> >
> > test
> >
> > new
> >
> > word
> >
> > count
> >
> > wordcount
> >
> > word count
> >
> > So, when I check the number of messages in wordCount-input I see the same
> > messages. However, when I run below code I do not see any message/data in
> > wordCount-output.
> >
> > Can I connect to kafka in VM/docker container using below code or do I
> need
> > to change/add other parameters? How can I submit the code to
> > kafka/kafka-connect? Do we have similar concept as SPARK to submit the
> > code(e.g. jar file)?
> >
> > I really appreciate your input as I am blocked and cannot run even below
> > simple example.
> >
> > Best regards,
> > Mina
> >
> > I changed the code to be as below:
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount-streaming");
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<ipAddress>:9092");
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass().getName());
> >
> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >
> > // setting offset reset to earliest so that we can re-run the demo
> > code with the same pre-loaded data
> > // Note: To re-run the demo, you need to use the offset reset tool:
> > // https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Application+Reset+Tool
> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >
> > KStreamBuilder builder = new KStreamBuilder();
> >
> > KStream<String, String> source = builder.stream("wordCount-input");
> >
> > KTable<String, Long> counts = source
> >       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
> >          @Override
> >          public Iterable<String> apply(String value) {
> >             return
> > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
> >          }
> >       }).map(new KeyValueMapper<String, String, KeyValue<String,
> String>>() {
> >          @Override
> >          public KeyValue<String, String> apply(String key, String value)
> {
> >             return new KeyValue<>(value, value);
> >          }
> >       })
> >       .groupByKey()
> >       .count("Counts");
> >
> > // need to override value serde to Long type
> > counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> > streams.start();
> >
> > // usually the stream application would be running forever,
> > // in this example we just let it run for some time and stop since the
> > input data is finite.
> > Thread.sleep(5000L);
> >
> > streams.close();
> >
> >
> >
> >
> > On Mon, Mar 13, 2017 at 4:09 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Maybe you need to reset your application using the reset tool:
> >> http://docs.confluent.io/current/streams/developer-
> >> guide.html#application-reset-tool
> >>
> >> Also keep in mind, that KTables buffer internally, and thus, you might
> >> only see data on commit.
> >>
> >> Try to reduce commit interval or disable caching by setting
> >> "cache.max.bytes.buffering" to zero in your StreamsConfig.
> >>
> >>
> >> -Matthias
> >>
> >> On 3/13/17 12:29 PM, Mina Aslani wrote:
> >>> Hi,
> >>>
> >>> This is the first time that am using Kafka Stream. I would like to read
> >>> from input topic and write to output topic. However, I do not see the
> >> word
> >>> count when I try to run below example. Looks like that it does not
> >> connect
> >>> to Kafka. I do not see any error though. I tried my localhost kafka as
> >> well
> >>> as the container in a VM, same situation.
> >>>
> >>> There are over 200 message in the input kafka topic.
> >>>
> >>> Your input is appreciated!
> >>>
> >>> Best regards,
> >>> Mina
> >>>
> >>> import org.apache.kafka.common.serialization.*;
> >>> import org.apache.kafka.streams.*;
> >>> import org.apache.kafka.streams.kstream.*;
> >>>
> >>> import java.util.*;
> >>> import java.util.regex.*;
> >>>
> >>> public class WordCountExample {
> >>>
> >>>
> >>>    public static void main(String [] args)   {
> >>>       final Properties streamsConfiguration = new Properties();
> >>>       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> >>> "wordcount-streaming");
> >>>       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >>> "<IPADDRESS>:9092");
> >>>       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> Serdes.String().getClass().getName());
> >>>       streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_
> MS_CONFIG,
> >>> 10 * 1000);
> >>>
> >>>       final Serde<String> stringSerde = Serdes.String();
> >>>       final Serde<Long> longSerde = Serdes.Long();
> >>>
> >>>       final KStreamBuilder builder = new KStreamBuilder();
> >>>
> >>>       final KStream<String, String> textLines =
> >>> builder.stream(stringSerde, stringSerde, "wordcount-input");
> >>>
> >>>       final Pattern pattern = Pattern.compile("\\W+",
> >>> Pattern.UNICODE_CHARACTER_CLASS);
> >>>
> >>>       final KStream<String, Long> wordCounts = textLines
> >>>             .flatMapValues(value ->
> >>> Arrays.asList(pattern.split(value.toLowerCase())))
> >>>             .groupBy((key, word) -> word)
> >>>             .count("Counts")
> >>>             .toStream();
> >>>
> >>>
> >>>       wordCounts.to(stringSerde, longSerde, "wordcount-output");
> >>>
> >>>       final KafkaStreams streams = new KafkaStreams(builder,
> >>> streamsConfiguration);
> >>>       streams.cleanUp();
> >>>       streams.start();
> >>>
> >>>       Runtime.getRuntime().addShutdownHook(new
> >> Thread(streams::close));  }
> >>> }
> >>>
> >>
> >>
> >
>
>

Reply via email to