Kurt Ostfeld created KAFKA-4612:
-----------------------------------

             Summary: Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot be cast to [B"
                 Key: KAFKA-4612
                 URL: https://issues.apache.org/jira/browse/KAFKA-4612
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.1.1
         Environment: Virtual Machine using Debian 8 + Confluent Platform 3.1.1.
            Reporter: Kurt Ostfeld
         Attachments: KafkaIsolatedBug.tar.gz

I've attached a minimal, single-source-file project that reliably reproduces this issue.

This project does the following:

1) Creates test input data: produces a single random (String, String) record into two different topics, "topicInput" and "topicTable" (see the sketch after the topology code below).

2) Creates and runs a Kafka Streams application:

    val kafkaTable: KTable[String, String] = builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
    val incomingRecords: KStream[String, String] = builder.stream(Serdes.String, Serdes.String, "topicInput")
    val reKeyedRecords: KStream[String, String] = incomingRecords.selectKey((k, _) => k)
    val joinedRecords: KStream[String, String] = reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
    joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")
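
For reference, here is a minimal sketch of the full reproduction in one file. The application id, bootstrap address, and random test values are placeholder assumptions rather than the exact code in the attachment, and the lambdas above are spelled out as explicit KeyValueMapper/ValueJoiner instances so the sketch compiles without Scala 2.12 SAM conversion:

    import java.util.{Properties, UUID}

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.{Serdes, StringSerializer}
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
    import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable, KeyValueMapper, ValueJoiner}

    object KafkaIsolatedBug extends App {
      val bootstrapServers = "localhost:9092" // assumption: local broker from Confluent Platform 3.1.1

      // Step 1: produce one random (String, String) record into each input topic.
      val producerProps = new Properties()
      producerProps.put("bootstrap.servers", bootstrapServers)
      val producer = new KafkaProducer[String, String](producerProps, new StringSerializer, new StringSerializer)
      val key = UUID.randomUUID().toString
      val value = UUID.randomUUID().toString
      producer.send(new ProducerRecord("topicInput", key, value)).get()
      producer.send(new ProducerRecord("topicTable", key, value)).get()
      producer.close()

      // Step 2: the topology quoted above.
      val builder = new KStreamBuilder()
      val kafkaTable: KTable[String, String] =
        builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
      val incomingRecords: KStream[String, String] =
        builder.stream(Serdes.String, Serdes.String, "topicInput")
      // selectKey marks the stream for repartitioning before the join.
      val reKeyedRecords: KStream[String, String] = incomingRecords.selectKey[String](
        new KeyValueMapper[String, String, String] {
          override def apply(k: String, v: String): String = k
        })
      val joinedRecords: KStream[String, String] = reKeyedRecords.leftJoin(kafkaTable,
        new ValueJoiner[String, String, String] {
          override def apply(s1: String, s2: String): String = s1
        })
      joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")

      val streamsProps = new Properties()
      streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-isolated-bug") // assumption
      streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
      // No default key/value serdes are set here (an assumption about the attachment),
      // so Kafka Streams' byte-array defaults apply wherever an explicit serde is not passed.
      val streams = new KafkaStreams(builder, new StreamsConfig(streamsProps))
      streams.start()
    }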

This reliably generates the following error:

[error] (StreamThread-1) java.lang.ClassCastException: java.lang.String cannot be cast to [B
java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:63)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
        at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

One caveat: I'm running this against a Confluent Platform 3.1.1 instance, which ships Kafka 0.10.1.0, since no newer Confluent Platform is available yet. The Kafka Streams project itself is built against "kafka-clients" and "kafka-streams" version 0.10.1.1; if I build against 0.10.1.0 instead, I reliably hit KAFKA-4355 (https://issues.apache.org/jira/browse/KAFKA-4355). I am not sure whether running the 0.10.1.1 client libraries against a broker on Kafka 0.10.1.0 introduces any issue of its own. I will obviously retry with the next Confluent Platform binary when it is available.
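
For reference, the dependency declarations in question (sbt syntax; the Scala version here is just a placeholder, and the attached project's exact settings may differ):

    scalaVersion := "2.11.8" // placeholder
    libraryDependencies ++= Seq(
      "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
      "org.apache.kafka" % "kafka-streams" % "0.10.1.1"
    )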



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
