Thank you for your thorough explanation, Michael. It helped a lot.

Cheers
Davood

On Thu, Jun 16, 2016 at 5:01 PM, Michael Noll <mich...@confluent.io> wrote:

> Davood,
>
> you are reading the input topic into a KTable, which means that subsequent
> records for the same key (such as the key `1`, which appears twice in the
> input messages/records) will be considered as updates to any previous
> records for that key.  So I think what you actually want to do is read the
> input as a KStream instead of a KTable?
>
> The following code works for me and looks like what you're trying to do.
> Note that I am reading the input data into a KStream, not a KTable.
>
> Input:
>   new KeyValue<>(1, "message1"),
>   new KeyValue<>(1, "message1"),
>   new KeyValue<>(2, "message2"),
>   new KeyValue<>(3, "message3"),
>   new KeyValue<>(4, "message4")
>
> Streams topology:
>
>   KStream<Integer, String> input =
>       builder.stream(Serdes.Integer(), Serdes.String(), inputTopic);
>   KTable<String, Long> counted = input
>       .map((key, value) -> KeyValue.pair(value, value))
>       .countByKey(Serdes.String(), "counted");
>   counted.to(Serdes.String(), Serdes.Long(), outputTopic);
>
> Output:
>   new KeyValue<>("message1", 1L),
>   new KeyValue<>("message1", 2L),
>   new KeyValue<>("message2", 1L),
>   new KeyValue<>("message3", 1L),
>   new KeyValue<>("message4", 1L)
>
> Does that help?
> Michael
>
>
>
>
> On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei <rafieidavo...@gmail.com>
> wrote:
>
> > Hi,
> >
> >
> > I am trying to use the groupBy operator in a simple example. However, I
> > get strange results.
> >
> > I have inputs on the "test" topic like this: (Long, String)
> > 1    Message_1
> > 1    Message_1
> > 2    Message_2
> > 3    Message_3
> > 4    Message_4
> >
> > I want to get counts of each value. So:
> > Message_1 2
> > Message_1 1
> > Message_2 1
> > Message_3 1
> > Message_4 1
> >
> > Because there is no operator like groupby(fieldIndex), I assume that
> > groupby always works on keys.
> >
> > So, my program is:
> >
> >       KTable<Long, String> source = builder.table(longSerde, stringSerde, "test");
> >       KTable<String, Long> counts = source.groupBy(
> >           new KeyValueMapper<Long, String, KeyValue<String, String>>() {
> >             @Override
> >             public KeyValue<String, String> apply(Long key, String value) {
> >               // Re-key each record by its value so that counting is per value.
> >               return KeyValue.pair(value, value);
> >             }
> >           }, Serdes.String(), Serdes.String()).count("count");
> >       counts.print();
> >
> > And I get this output as a result:
> >
> > Message_1    1
> > Message_1    0
> > Message_1    1
> > Message_1    0
> > Message_2    1
> > Message_2    0
> > Message_3    1
> > Message_3    0
> > Message_4    1
> > Message_4    0
> >
> > I couldn't understand this behavior.
> >
> >
> > Cheers
> > Davood
> >
>
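To illustrate Michael's point about KTable versus KStream semantics, here is a
minimal plain-Java sketch. It does not use the Kafka Streams API: the class and
method names (TableVsStreamCount, countAsStream, countAsTable) are made up for
illustration, and it assumes that an update to an existing table key first
retracts the old value's contribution and then adds the new one. It is not
meant to reproduce the exact output printed in the original question, only the
difference in final counts.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Kafka Streams code.
class TableVsStreamCount {

    // Stream semantics: every record is an independent event,
    // so each occurrence of a value adds one to that value's count.
    static Map<String, Long> countAsStream(String[] values) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String value : values) {
            counts.merge(value, 1L, Long::sum);
        }
        return counts;
    }

    // Table semantics: a record for an existing key replaces the previous
    // record for that key. Assumption: the old value is retracted from its
    // group before the new value is added to its group.
    static Map<String, Long> countAsTable(int[] keys, String[] values) {
        Map<Integer, String> latestByKey = new HashMap<>();
        Map<String, Long> counts = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            String previous = latestByKey.put(keys[i], values[i]);
            if (previous != null) {
                counts.merge(previous, -1L, Long::sum); // retract old value
            }
            counts.merge(values[i], 1L, Long::sum);     // add new value
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same shape as the input in this thread: key 1 appears twice.
        int[] keys = {1, 1, 2, 3, 4};
        String[] values = {"message1", "message1", "message2", "message3", "message4"};

        System.out.println("stream: " + countAsStream(values));
        System.out.println("table:  " + countAsTable(keys, values));
    }
}
```

With this input, the stream-style count ends at 2 for "message1", while the
table-style count ends at 1, because the second record for key 1 is treated as
an update of the first rather than as a new event.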
