Thank you for your thorough explanation, Michael. It helped a lot.

Cheers
Davood

On Thu, Jun 16, 2016 at 5:01 PM, Michael Noll <mich...@confluent.io> wrote:

> Davood,
>
> you are reading the input topic into a KTable, which means that subsequent
> records for the same key (such as the key `1`, which appears twice in the
> input messages/records) will be considered as updates to any previous
> records for that key. So I think what you actually want to do is read the
> input as a KStream instead of a KTable?
>
> The following code works for me, it looks like what you're trying to do.
> Note that I am reading the input data into a KStream, not a KTable.
>
> Input:
>     new KeyValue<>(1, "message1"),
>     new KeyValue<>(1, "message1"),
>     new KeyValue<>(2, "message2"),
>     new KeyValue<>(3, "message3"),
>     new KeyValue<>(4, "message4")
>
> Streams topology:
>
>     KStream<Integer, String> input = builder.stream(Serdes.Integer(), Serdes.String(), inputTopic);
>     KTable<String, Long> counted = input
>         .map((key, value) -> KeyValue.pair(value, value))
>         .countByKey(Serdes.String(), "counted");
>     counted.to(Serdes.String(), Serdes.Long(), outputTopic);
>
> Output:
>     new KeyValue<>("message1", 1L),
>     new KeyValue<>("message1", 2L),
>     new KeyValue<>("message2", 1L),
>     new KeyValue<>("message3", 1L),
>     new KeyValue<>("message4", 1L)
>
> Does that help?
> Michael
>
> On Thu, Jun 16, 2016 at 4:20 PM, Davood Rafiei <rafieidavo...@gmail.com> wrote:
>
> > Hi,
> >
> > I am trying to use groupby operator in simple example. However, I get
> > strange results.
> >
> > I have inputs on "test" topic like: (Long, String)
> >     1 Message_1
> >     1 Message_1
> >     2 Message_2
> >     3 Message_3
> >     4 Message_4
> >
> > I want to get counts of each value. So:
> >     Message_1 2
> >     Message_1 1
> >     Message_2 1
> >     Message_3 1
> >     Message_4 1
> >
> > Because there is not any operator like groupby (fieldIndex), I assume that
> > groupby works always on keys.
> >
> > So, my program is:
> >
> >     KTable<Long, String> source = builder.table(longSerde, stringSerde, "test");
> >     KTable<String,Long> counts = source.groupBy(new KeyValueMapper<Long, String, KeyValue<String, String>>() {
> >
> >         @Override
> >         public KeyValue<String, String> apply(Long key, String value) {
> >             // TODO Auto-generated method stub
> >             return KeyValue.pair(value, value);
> >         }
> >     },Serdes.String(), Serdes.String()).count("count");
> >     counts.print();;
> >
> > And I get this output as a result:
> >
> >     Message_1 1
> >     Message_1 0
> >     Message_1 1
> >     Message_1 0
> >     Message_2 1
> >     Message_2 0
> >     Message_3 1
> >     Message_3 0
> >     Message_4 1
> >     Message_4 0
> >
> > I couldn't understand this behavior.
> >
> > Cheers
> > Davood
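
For anyone finding this thread later: the difference Michael describes can be sketched in plain Java, without Kafka on the classpath. This is only a simplified model of the two semantics, not the Kafka Streams API. The class and method names (`TableVsStreamCount`, `countAsStream`, `countAsTable`) are made up for illustration, and the sketch does not try to reproduce the exact lines printed by `counts.print()` above. It only shows why a second record for key `1` increments a count under stream semantics but replaces the earlier record under table semantics.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class: a plain-Java model of the semantics, not Kafka Streams code.
class TableVsStreamCount {

    // The five (key, value) records from the original question.
    static List<Map.Entry<Long, String>> sampleInput() {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        records.add(new SimpleImmutableEntry<>(1L, "Message_1"));
        records.add(new SimpleImmutableEntry<>(1L, "Message_1"));
        records.add(new SimpleImmutableEntry<>(2L, "Message_2"));
        records.add(new SimpleImmutableEntry<>(3L, "Message_3"));
        records.add(new SimpleImmutableEntry<>(4L, "Message_4"));
        return records;
    }

    // Stream semantics: every record is an independent event,
    // so each record adds 1 to the count of its value.
    static Map<String, Long> countAsStream(List<Map.Entry<Long, String>> records) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<Long, String> r : records) {
            counts.merge(r.getValue(), 1L, Long::sum);
        }
        return counts;
    }

    // Table semantics: a record replaces the previous record with the same key.
    // The old value's count is decremented before the new value's count is incremented.
    static Map<String, Long> countAsTable(List<Map.Entry<Long, String>> records) {
        Map<Long, String> latestByKey = new HashMap<>();
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<Long, String> r : records) {
            String previous = latestByKey.put(r.getKey(), r.getValue());
            if (previous != null) {
                counts.merge(previous, -1L, Long::sum); // retract the replaced value
            }
            counts.merge(r.getValue(), 1L, Long::sum);  // count the new value
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("stream: " + countAsStream(sampleInput()));
        System.out.println("table:  " + countAsTable(sampleInput()));
    }
}
```

Under these assumptions the stream-style count ends with Message_1 at 2, while the table-style count ends with Message_1 at 1, because the second record for key `1` is treated as an update rather than a new event.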