[
https://issues.apache.org/jira/browse/KAFKA-8905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eduard Wirch resolved KAFKA-8905.
---------------------------------
Resolution: Not A Problem
> Stream DSL: tasks should take serdes from upstream tasks
> --------------------------------------------------------
>
> Key: KAFKA-8905
> URL: https://issues.apache.org/jira/browse/KAFKA-8905
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.3.0
> Reporter: Eduard Wirch
> Priority: Major
> Labels: usability
>
> {code:java}
> final Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> source = builder.stream(
>     "streams-plaintext-input",
>     Consumed.with(Serdes.String(), Serdes.String())
> );
> final KTable<String, Long> counts = source
>     .flatMapValues(value ->
>         Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
>     .groupBy(
>         (key, value) -> value
>     )
>     .count();
> // need to override value serde to Long type
> counts.toStream().to("streams-wordcount-output",
>     Produced.with(Serdes.String(), Serdes.Long()));
> final KafkaStreams streams = new KafkaStreams(builder.build(), props);{code}
> Original code taken from this code sample:
> [https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java]
> I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and
> {{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear. This
> application will fail:
> {code:java}
> Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B
>     at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
>     at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
> {code}
> Adjusting this part of the code:
> {code:java}
> .groupBy(
>     (key, value) -> value,
>     Grouped.with(Serdes.String(), Serdes.String())
> ) {code}
> Will make the application run properly.
> This explicit serde specification is unnecessary, since the serdes are
> already known from the upstream source task. Relying on the default serdes
> works in this simple example, but fails in more complex scenarios.
> Please make the DSL more usable by taking the serde configuration from
> upstream tasks.
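
The stack trace quoted above appears consistent with a sink that has no explicit serde and therefore falls back to a byte-array serializer, which then receives a String. The following is a minimal, Kafka-free sketch of that mechanism only. `SerdeFallbackSketch`, its nested `Serializer` interface, `send`, and `failsWithClassCast` are hypothetical stand-ins for illustration, not Kafka Streams classes, and the real resolution logic in Kafka Streams may differ.

```java
import java.nio.charset.StandardCharsets;

public class SerdeFallbackSketch {

    /** Hypothetical stand-in for a serializer interface; NOT the Kafka API. */
    public interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Models a default serializer that only accepts byte[] values. */
    public static final class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(byte[] data) {
            return data;
        }
    }

    /** Models an explicitly supplied String serializer. */
    public static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Uses the explicit serializer when one is given, otherwise the fallback.
     * The unchecked cast mirrors the fact that the type mismatch is only
     * detected at runtime, when the value reaches the serializer.
     */
    @SuppressWarnings("unchecked")
    public static byte[] send(Object value, Serializer<?> explicit, Serializer<?> fallback) {
        Serializer<Object> chosen =
            (Serializer<Object>) (explicit != null ? explicit : fallback);
        return chosen.serialize(value);
    }

    /** Returns true if sending the value with a byte-array fallback throws ClassCastException. */
    public static boolean failsWithClassCast(Object value, Serializer<?> explicit) {
        try {
            send(value, explicit, new ByteArraySerializer());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // No explicit serializer: the String hits the byte-array fallback.
        System.out.println(failsWithClassCast("hello", null)); // prints "true"
        // Explicit String serializer: the value is serialized normally.
        System.out.println(failsWithClassCast("hello", new StringSerializer())); // prints "false"
    }
}
```

In this sketch, passing an explicit serializer plays the role that `Grouped.with(Serdes.String(), Serdes.String())` plays in the description: it prevents the fallback from being chosen for a value type it cannot handle.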