Can you show the full stack trace?

How do you ingest the data into the topic? I also think you should read
the topic as a KStream (instead of a KTable).

What de-/serializers do you specify in props? (see
http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-serdes)
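
If the records in "topic1" were not written with a LongSerializer for the
key, a LongDeserializer will fail with exactly this "Size of data ... is
not 8" error. A minimal sketch of what I mean (application id and
bootstrap servers are placeholders; this follows the 0.10.0 API):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// the default serdes must match the actual wire format of your topic:
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KStreamBuilder builder = new KStreamBuilder();
// read as KStream; serdes can also be given explicitly per operator,
// overriding the defaults from props:
KStream<Long, String> source =
    builder.stream(Serdes.Long(), Serdes.String(), "topic1");
```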


-Matthias

On 06/19/2016 03:06 PM, Adrienne Kole wrote:
> Hi,
> 
> I want to implement wordcount example with reduce function in KTable.
> However, I get the error:
> 
> Exception in thread "StreamThread-1"
> org.apache.kafka.common.errors.SerializationException: Size of data
> received by LongDeserializer is not 8
> 
> 
> Here is my code:
> 
> 
>         // topic1 contains <WordID, Word> pairs
>         KTable<Long, String> source = builder.table("topic1");
> 
>         KTable<String, Long> counts = source.reduce(
>             new Reducer<Long>() { // adder
>                 @Override
>                 public Long apply(Long value1, Long value2) {
>                     return value1 + value2;
>                 }
>             },
>             new Reducer<Long>() { // subtractor
>                 @Override
>                 public Long apply(Long value1, Long value2) {
>                     return value1 - value2;
>                 }
>             },
>             new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
>                 @Override
>                 public KeyValue<String, Long> apply(Long key, String value) {
>                     // re-key by the word itself, with an initial count of 1
>                     return new KeyValue<String, Long>(value, 1L);
>                 }
>             },
>             stringSerde, longSerde, "count");
> 
>         counts.to(Serdes.String(), Serdes.Long(), "topic2");
> 
>         KafkaStreams streams = new KafkaStreams(builder, props);
>         streams.start();
> 
> 
> Moreover, I think the error messages should be more informative, to make
> such situations easier to diagnose.
> 
> 
> 
> - Adrienne
> 
