Hi,
I am trying to use the groupBy operator in a simple example, but I get
strange results.
The "test" topic contains (Long, String) records like these:
1 Message_1
1 Message_1
2 Message_2
3 Message_3
4 Message_4
I want to get the count of each value, so I expect:
Message_1 2
Message_1 1
Message_2 1
Message_3 1
Message_4 1
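To make the expectation concrete, here is a minimal plain-Java sketch of the per-value counting I have in mind. It does not use Kafka at all; the class name ExpectedCounts and the method countByValue are made up for illustration, and it only computes the final count per value while ignoring the record keys.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExpectedCounts {
    // Counts how often each value occurs; record keys are ignored.
    // LinkedHashMap keeps the values in first-seen order.
    static Map<String, Long> countByValue(List<String> values) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String value : values) {
            counts.merge(value, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The values of the five records on the "test" topic.
        List<String> values = Arrays.asList(
                "Message_1", "Message_1", "Message_2", "Message_3", "Message_4");
        for (Map.Entry<String, Long> entry : countByValue(values).entrySet()) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
    }
}
```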
Because there is no operator like groupBy(fieldIndex), I assume that
groupBy always works on keys.
So, my program is:
KTable<Long, String> source = builder.table(longSerde, stringSerde, "test");
KTable<String, Long> counts = source.groupBy(
    new KeyValueMapper<Long, String, KeyValue<String, String>>() {
        @Override
        public KeyValue<String, String> apply(Long key, String value) {
            // Re-key each record by its value so that the count is per value.
            return KeyValue.pair(value, value);
        }
    }, Serdes.String(), Serdes.String()).count("count");
counts.print();
And I get this output as a result:
Message_1 1
Message_1 0
Message_1 1
Message_1 0
Message_2 1
Message_2 0
Message_3 1
Message_3 0
Message_4 1
Message_4 0
I don't understand this behavior.
Cheers
Davood