Kurt Ostfeld created KAFKA-4612:
-----------------------------------

             Summary: Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"
                 Key: KAFKA-4612
                 URL: https://issues.apache.org/jira/browse/KAFKA-4612
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.1.1
         Environment: Virtual Machine using Debian 8 + Confluent Platform 3.1.1.
            Reporter: Kurt Ostfeld
         Attachments: KafkaIsolatedBug.tar.gz

I've attached a minimal single-source-file project that reliably reproduces this issue.

This project does the following:

1) Creates test input data: produces a single random (String, String) record into two different topics, "topicInput" and "topicTable".

2) Creates and runs a Kafka Streams application:

    val kafkaTable: KTable[String, String] = builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
    val incomingRecords: KStream[String, String] = builder.stream(Serdes.String, Serdes.String, "topicInput")
    val reKeyedRecords: KStream[String, String] = incomingRecords.selectKey((k, _) => k)
    val joinedRecords: KStream[String, String] = reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
    joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")

This reliably generates the following error:

    [error] (StreamThread-1) java.lang.ClassCastException: java.lang.String cannot be cast to [B
    java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:63)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

One caveat: I'm running this on a Confluent Platform 3.1.1 instance, which uses Kafka 0.10.1.0, since no newer Confluent Platform is available. The Kafka Streams project is built using "kafka-clients" and "kafka-streams" version 0.10.1.1. If I use 0.10.1.0, I reliably hit bug https://issues.apache.org/jira/browse/KAFKA-4355. I am not sure whether there is any issue with using 0.10.1.1 libraries against a Confluent Platform running Kafka 0.10.1.0. I will try the next Confluent Platform binary when it is available.
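
The top frame of the trace is ByteArraySerializer.serialize, reached from a SinkNode. One plausible reading is that some sink in the topology is serializing with a byte-array serializer while being handed String data. This is an assumption and has not been confirmed for this report. The sketch below only illustrates that general mechanism in plain Scala, with no Kafka dependency. The names Serializer, PassThroughSerializer, CastDemo, and misconfigured are hypothetical stand-ins, not Kafka classes.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Kafka's classes.
trait Serializer[T] {
  def serialize(topic: String, data: T): Array[Byte]
}

// Passes bytes through unchanged, as a byte-array serializer would.
object PassThroughSerializer extends Serializer[Array[Byte]] {
  def serialize(topic: String, data: Array[Byte]): Array[Byte] = data
}

object CastDemo {
  // Mimics a serializer chosen at runtime (for example from configuration),
  // where the compiler cannot check that it matches the record's key type.
  // The unchecked cast succeeds because generics are erased on the JVM.
  val misconfigured: Serializer[String] =
    PassThroughSerializer.asInstanceOf[Serializer[String]]

  // Generic send path: the type mismatch only surfaces inside serialize().
  def send[K](key: K, serializer: Serializer[K]): Array[Byte] =
    serializer.serialize("topicOutput", key)

  // Returns true if handing a String to the byte-array serializer fails
  // with a ClassCastException.
  def failsWithClassCast(): Boolean =
    try { send("some-key", misconfigured); false }
    catch { case _: ClassCastException => true }

  def main(args: Array[String]): Unit =
    println(s"ClassCastException thrown: ${failsWithClassCast()}")
}
```

The failure appears at the serializer call, not where the mismatched serializer was chosen. That matches the shape of the trace above, where the exception is raised inside serialize rather than in the application code. The exact exception message depends on the JVM version.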