Hi, I want to implement the word count example using the reduce function on a KTable. However, I get the following error:
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

Here is my code:

    // here we have WordID and Word itself
    KTable<Long, String> source = builder.table("topic1");

    KTable<String, Long> counts = source.reduce(
        new Reducer<Long>() {
            @Override
            public Long apply(Long value1, Long value2) {
                // add the incoming value to the running count
                return value1 + value2;
            }
        },
        new Reducer<Long>() {
            @Override
            public Long apply(Long value1, Long value2) {
                // subtract the outgoing value from the running count
                return value1 - value2;
            }
        },
        new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
            @Override
            public KeyValue<String, Long> apply(Long key, String value) {
                // re-key by the word itself, with an initial count of 1
                return new KeyValue<String, Long>(value, new Long(1));
            }
        },
        stringSerde, longSerde, "count");

    counts.to(Serdes.String(), Serdes.Long(), "topic2");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

Moreover, I think the error message should be more informative, so that situations like this are easier to diagnose.

- Adrienne
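
To illustrate what the error message seems to be saying, here is a minimal, self-contained sketch in plain Java (no Kafka classes). It assumes the check behind the message simply compares the payload length to 8 bytes. LongSizeCheck, encodeLong and decodeLong are hypothetical names used only for illustration; they are not part of the Kafka API. The sketch shows that bytes written as a String will generally not pass a length check meant for a Long, which may be what happens when a String-serialized key or value is read with a Long deserializer. Whether that is the actual cause in the code above is not confirmed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongSizeCheck {

    // Encode a long as 8 big-endian bytes (a typical fixed-width encoding).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Hypothetical stand-in for the check behind the error message:
    // reject any payload that is not exactly 8 bytes long.
    static long decodeLong(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException(
                "Size of data is " + data.length + ", expected " + Long.BYTES);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] asLong = encodeLong(1L);
        byte[] asString = "hello".getBytes(StandardCharsets.UTF_8);

        // 8 bytes: decodes without error
        System.out.println(decodeLong(asLong));

        // 5 bytes: rejected by the length check
        try {
            decodeLong(asString);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

If this assumption holds, the places to look would be the serdes used for each topic: the bytes stored in "topic1" and in any internal topic must have been written with the same type that is later used to read them.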