Davood, you are reading the input topic into a KTable, which means that subsequent records for the same key (such as the key `1`, which appears twice in the input messages/records) will be considered as updates to any previous records for that key. So I think what you actually want to do is read the input as a KStream instead of a KTable?
The following code works for me, it looks like what you're trying to do. Note that I am reading the input data into a KStream, not a KTable. Input: new KeyValue<>(1, "message1"), new KeyValue<>(1, "message1"), new KeyValue<>(2, "message2"), new KeyValue<>(3, "message3"), new KeyValue<>(4, "message4") Streams topology: KStream<Integer, String> input = builder.stream(Serdes.Integer(), Serdes.String(), inputTopic); KTable<String, Long> counted = input .map((key, value) -> KeyValue.pair(value, value)) .countByKey(Serdes.String(), "counted"); counted.to(Serdes.String(), Serdes.Long(), outputTopic); Output: new KeyValue<>("message1", 1L), new KeyValue<>("message1", 2L), new KeyValue<>("message2", 1L), new KeyValue<>("message3", 1L), new KeyValue<>("message4", 1L) Does that help? Michael On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei <rafieidavo...@gmail.com> wrote: > Hi, > > > I am trying to use groupby operator in simple example. However, I get > strange results. > > I have inputs on "test" topic like: (Long, String) > 1 Message_1 > 1 Message_1 > 2 Message_2 > 3 Message_3 > 4 Message_4 > > I want to get counts of each value. So: > Message_1 2 > Message_1 1 > Message_2 1 > Message_3 1 > Message_4 1 > > Because there is not any operator like groupby (fieldIndex), I assume that > groupby works always on keys. > > So, my program is: > > KTable<Long, String> source = builder.table(longSerde, stringSerde, > "test"); > KTable<String,Long> counts = source.groupBy(new KeyValueMapper<Long, > String, KeyValue<String, String>>() { > > @Override > public KeyValue<String, String> apply(Long key, String value) { > // TODO Auto-generated method stub > return KeyValue.pair(value, value); > } > },Serdes.String(), Serdes.String()).count("count"); > counts.print();; > > And I get this output as a result: > > Message_1 1 > Message_1 0 > Message_1 1 > Message_1 0 > Message_2 1 > Message_2 0 > Message_3 1 > Message_3 0 > Message_4 1 > Message_4 0 > > I couldn't understand this behavior. > > > Cheers > Davood >