Davood,
you are reading the input topic into a KTable, which means that subsequent
records for the same key (such as the key `1`, which appears twice in the
input records) will be considered as updates to any previous
records for that key. So I think what you actually want to do is read the
input as a KStream instead of a KTable?
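To illustrate what the KTable semantics do to your data, here is a minimal plain-Java simulation. The class and method names are hypothetical, and this is not the Kafka Streams API. It assumes that a count over a re-grouped KTable handles an update by first retracting the old row from its group (-1) and then adding the new row (+1). Under that assumption, the second record for key 1 makes the count for Message_1 go 1 -> 0 -> 1, which is one plausible source of the 0 counts in your output. It is not meant to reproduce your printed output line for line.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of "KTable -> groupBy(value) -> count".
// Not the Kafka Streams API; it only models the table semantics.
public class TableCountSimulation {

    // Returns the sequence of "value=count" updates the grouped count emits.
    // Assumption: an update to an existing key first retracts the old row
    // from its group (-1) and then adds the new row to its group (+1).
    public static List<String> tableCountUpdates(List<Map.Entry<Long, String>> records) {
        Map<Long, String> table = new HashMap<>();  // latest value per input key (upsert)
        Map<String, Long> counts = new HashMap<>(); // current count per grouped key
        List<String> updates = new ArrayList<>();
        for (Map.Entry<Long, String> r : records) {
            String old = table.put(r.getKey(), r.getValue());
            if (old != null) {
                // The key already existed: retract its previous value first.
                long retracted = counts.merge(old, -1L, Long::sum);
                updates.add(old + "=" + retracted);
            }
            // Then count the new value.
            long added = counts.merge(r.getValue(), 1L, Long::sum);
            updates.add(r.getValue() + "=" + added);
        }
        return updates;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> input = List.of(
                Map.entry(1L, "Message_1"),
                Map.entry(1L, "Message_1"), // same key: an update, not a new row
                Map.entry(2L, "Message_2"),
                Map.entry(3L, "Message_3"),
                Map.entry(4L, "Message_4"));
        // Prints [Message_1=1, Message_1=0, Message_1=1, Message_2=1, Message_3=1, Message_4=1]
        System.out.println(tableCountUpdates(input));
    }
}
```

Note that the final count for Message_1 is 1, not 2: in a table the second record for key 1 replaces the first one instead of adding to it.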
The following Kafka Streams code works for me and seems to do what you're
trying to do. Note that I am reading the input data into a KStream, not a KTable.
Input:
new KeyValue<>(1, "message1"),
new KeyValue<>(1, "message1"),
new KeyValue<>(2, "message2"),
new KeyValue<>(3, "message3"),
new KeyValue<>(4, "message4")
Streams topology:
KStream<Integer, String> input = builder.stream(Serdes.Integer(),
Serdes.String(), inputTopic);
KTable<String, Long> counted = input
.map((key, value) -> KeyValue.pair(value, value))
.countByKey(Serdes.String(), "counted");
counted.to(Serdes.String(), Serdes.Long(), outputTopic);
Output:
new KeyValue<>("message1", 1L),
new KeyValue<>("message1", 2L),
new KeyValue<>("message2", 1L),
new KeyValue<>("message3", 1L),
new KeyValue<>("message4", 1L)
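For contrast with a table-based count, the map + countByKey topology above amounts to a simple fold over the stream. Here is a sketch in plain Java; the class and method names are hypothetical and this is not the Kafka Streams API. Every record is treated as an independent event, so the duplicate message1 is counted twice, and the emitted updates match the output listed above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of "KStream -> map(v, v) -> count per key".
// Not the Kafka Streams API; it only models the stream semantics.
public class StreamCountSimulation {

    // Returns the sequence of "value=count" updates; counts only ever increase
    // because a stream never replaces or retracts earlier records.
    public static List<String> streamCountUpdates(List<Map.Entry<Integer, String>> records) {
        Map<String, Long> counts = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (Map.Entry<Integer, String> r : records) {
            String newKey = r.getValue();                     // re-key each record by its value
            long count = counts.merge(newKey, 1L, Long::sum); // increment the count for that key
            updates.add(newKey + "=" + count);
        }
        return updates;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> input = List.of(
                Map.entry(1, "message1"),
                Map.entry(1, "message1"),
                Map.entry(2, "message2"),
                Map.entry(3, "message3"),
                Map.entry(4, "message4"));
        // Prints [message1=1, message1=2, message2=1, message3=1, message4=1]
        System.out.println(streamCountUpdates(input));
    }
}
```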
Does that help?
Michael
On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei wrote:
> Hi,
>
>
> I am trying to use the groupBy operator in a simple example. However, I get
> strange results.
>
> I have input records of type (Long, String) on the "test" topic, like:
> 1 Message_1
> 1 Message_1
> 2 Message_2
> 3 Message_3
> 4 Message_4
>
> I want to get the count of each value, like this:
> Message_1 2
> Message_1 1
> Message_2 1
> Message_3 1
> Message_4 1
>
> Because there is no operator like groupBy(fieldIndex), I assume that
> groupBy always works on keys.
>
> So, my program is:
>
> KTable<Long, String> source = builder.table(longSerde, stringSerde, "test");
> KTable<String, Long> counts = source.groupBy(
>     new KeyValueMapper<Long, String, KeyValue<String, String>>() {
>         @Override
>         public KeyValue<String, String> apply(Long key, String value) {
>             // Re-key each record by its value so that counting groups by value
>             return KeyValue.pair(value, value);
>         }
>     },
>     Serdes.String(), Serdes.String()).count("count");
> counts.print();
>
> And I get this output as a result:
>
> Message_1 1
> Message_1 0
> Message_1 1
> Message_1 0
> Message_2 1
> Message_2 0
> Message_3 1
> Message_3 0
> Message_4 1
> Message_4 0
>
> I don't understand this behavior.
>
>
> Cheers
> Davood
>