Eduard Wirch created KAFKA-8905:
-----------------------------------

             Summary: Stream DSL: tasks should take serdes from upstream tasks
                 Key: KAFKA-8905
                 URL: https://issues.apache.org/jira/browse/KAFKA-8905
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.3.0
            Reporter: Eduard Wirch


{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> source = builder.stream(
      "streams-plaintext-input",
      Consumed.with(Serdes.String(), Serdes.String())
);

final KTable<String, Long> counts = source
      .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
      .groupBy(
            (key, value) -> value
      )
      .count();

// need to override value serde to Long type
counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);{code}
Original code taken from code sample 
[https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java]

I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and 
{{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear. This 
application will fail:
{code:java}
Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
{code}
Adding an explicit {{Grouped}} to the {{groupBy}} call makes the application run properly:
{code:java}
.groupBy(
      (key, value) -> value,
      Grouped.with(Serdes.String(), Serdes.String())
) {code}

This explicit serde specification is unnecessary, since the serdes are already 
known from the upstream source task. Relying on the default serdes works in this 
simple example, but fails in more complex scenarios.

Please make the DSL more usable by taking the serde configuration from upstream 
tasks.
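
As an illustration of the requested behavior only, the following is a minimal sketch of an upstream serde lookup. It does not use or describe the Kafka Streams internals: the {{Node}} class and {{resolveValueSerde}} method are hypothetical, and serdes are represented by plain strings. The sketch also deliberately ignores operations that change the key or value type, where an upstream serde would no longer apply.

```java
// Hypothetical model of serde inheritance; not part of the Kafka Streams API.
class SerdeInheritanceSketch {

    /** Stand-in for a DSL node that may or may not carry an explicit value serde. */
    static final class Node {
        final String name;
        final Node upstream;             // null for a source node
        final String explicitValueSerde; // null when the user did not specify one

        Node(String name, Node upstream, String explicitValueSerde) {
            this.name = name;
            this.upstream = upstream;
            this.explicitValueSerde = explicitValueSerde;
        }
    }

    /**
     * Walks from the given node towards the source and returns the first
     * explicit serde found; falls back to the configured default otherwise.
     */
    static String resolveValueSerde(Node node, String defaultSerde) {
        for (Node n = node; n != null; n = n.upstream) {
            if (n.explicitValueSerde != null) {
                return n.explicitValueSerde;
            }
        }
        return defaultSerde;
    }

    public static void main(String[] args) {
        // Mirrors the shape of the word-count example: only the source has a serde.
        Node source = new Node("source", null, "StringSerde");
        Node flatMapValues = new Node("flatMapValues", source, null);
        Node groupBy = new Node("groupBy", flatMapValues, null);

        // Prints "StringSerde" rather than the "ByteArraySerde" default.
        System.out.println(resolveValueSerde(groupBy, "ByteArraySerde"));
    }
}
```

With a lookup of this kind, the {{groupBy}} step in the example above would pick up the String serde declared on the source instead of falling back to the byte-array default.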



--
This message was sent by Atlassian Jira
(v8.3.2#803003)