Hi,

I want to implement the word count example using the reduce function on a KTable.
However, I get the following error:

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
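
As far as I can tell, the message means the deserializer was handed a byte array that is not exactly 8 bytes long, which is the size of a serialized long. Below is a minimal sketch of that kind of check in plain Java, with no Kafka classes involved. The names LongSizeCheck, encodeLong and decodeLong are hypothetical and only illustrate the idea; this is not the Kafka implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongSizeCheck {

    // Hypothetical helper: writes a long as 8 big-endian bytes.
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    // Hypothetical helper: rejects any payload that is not exactly 8 bytes,
    // similar in spirit to the check named in the error message.
    static long decodeLong(byte[] data) {
        if (data.length != 8) {
            throw new IllegalArgumentException(
                "Size of data received is not 8, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        // A value written as a long round-trips without problems.
        System.out.println(decodeLong(encodeLong(1L)));

        // Bytes written as a string (5 bytes here) fail the size check.
        byte[] text = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            decodeLong(text);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

So presumably somewhere in my topology bytes that were not written as a long are being read back as one, but I cannot tell from the message which topic or store is involved.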


Here is my code:


        // here we have WordID and Word itself
        KTable<Long, String> source = builder.table("topic1");

        KTable<String, Long> counts = source.reduce(new Reducer<Long>() {

            @Override
            public Long apply(Long value1, Long value2) {
                // adder: add the new value to the running count
                return value1+value2;
            }
        },

        new Reducer<Long>() {

            @Override
            public Long apply(Long value1, Long value2) {
                // subtractor: remove the old value from the running count
                return value1-value2;
            }
        }

        , new KeyValueMapper<Long, String, KeyValue<String,Long>>() {

            @Override
            public KeyValue<String, Long> apply(Long key, String value) {
                // selector: re-key each record by the word and emit a count of 1
                return new KeyValue<String, Long>(value, 1L);
            }
        }, stringSerde, longSerde, "count");

        counts.to(Serdes.String(), Serdes.Long(), "topic2");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();


Also, I think the error message should be more informative, so that such
situations are easier to diagnose.



- Adrienne
