Davood,

you are reading the input topic into a KTable, which means that subsequent
records for the same key (such as the key `1`, which appears twice in the
input messages/records) will be considered as updates to any previous
records for that key.  So I think what you actually want to do is read the
input as a KStream instead of a KTable?

The following code works for me, it looks like what you're trying to do.
Note that I am reading the input data into a KStream, not a KTable.

Input:
  new KeyValue<>(1, "message1"),
  new KeyValue<>(1, "message1"),
  new KeyValue<>(2, "message2"),
  new KeyValue<>(3, "message3"),
  new KeyValue<>(4, "message4")

Streams topology:

  KStream<Integer, String> input = builder.stream(Serdes.Integer(),
Serdes.String(), inputTopic);
  KTable<String, Long> counted = input
      .map((key, value) -> KeyValue.pair(value, value))
      .countByKey(Serdes.String(), "counted");
  counted.to(Serdes.String(), Serdes.Long(), outputTopic);

Output:
  new KeyValue<>("message1", 1L),
  new KeyValue<>("message1", 2L),
  new KeyValue<>("message2", 1L),
  new KeyValue<>("message3", 1L),
  new KeyValue<>("message4", 1L)

Does that help?
Michael




On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei <rafieidavo...@gmail.com>
wrote:

> Hi,
>
>
> I am trying to use groupby operator in simple example. However, I get
> strange results.
>
> I have inputs  on "test" topic like: (Long, String)
> 1    Message_1
> 1    Message_1
> 2    Message_2
> 3    Message_3
> 4    Message_4
>
> I want to get counts of each value. So:
> Message_1 2
> Message_1 1
> Message_2 1
> Message_3 1
> Message_4 1
>
> Because there is not any operator like groupby (fieldIndex), I assume that
> groupby works always on keys.
>
> So, my program is:
>
>       KTable<Long, String> source = builder.table(longSerde, stringSerde,
> "test");
>       KTable<String,Long> counts =  source.groupBy(new KeyValueMapper<Long,
> String, KeyValue<String, String>>() {
>
>         @Override
>         public KeyValue<String, String> apply(Long key, String value) {
>             // TODO Auto-generated method stub
>              return  KeyValue.pair(value, value);
>         }
>     },Serdes.String(), Serdes.String()).count("count");
>       counts.print();;
>
> And I get this output as a result:
>
> Message_1    1
> Message_1    0
> Message_1    1
> Message_1    0
> Message_2    1
> Message_2    0
> Message_3    1
> Message_3    0
> Message_4    1
> Message_4    0
>
> I couldn't  understand this behavior.
>
>
> Cheers
> Davood
>

Reply via email to