Eduard Wirch created KAFKA-8905:
-----------------------------------
Summary: Stream DSL: tasks should take serdes from upstream tasks
Key: KAFKA-8905
URL: https://issues.apache.org/jira/browse/KAFKA-8905
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 2.3.0
Reporter: Eduard Wirch
{code:java}
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream(
    "streams-plaintext-input",
    Consumed.with(Serdes.String(), Serdes.String())
);

final KTable<String, Long> counts = source
    .flatMapValues(value ->
        Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
    .groupBy(
        (key, value) -> value
    )
    .count();

// need to override value serde to Long type
counts.toStream().to("streams-wordcount-output",
    Produced.with(Serdes.String(), Serdes.Long()));

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
{code}
The original code is taken from the WordCountDemo code sample:
[https://github.com/apache/kafka/blob/2.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java]
I removed the {{DEFAULT_KEY_SERDE_CLASS_CONFIG}} and
{{DEFAULT_VALUE_SERDE_CLASS_CONFIG}} settings to make my point clear. This
application will fail:
{code:java}
Caused by: java.lang.ClassCastException: java.lang.String incompatible with [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:102)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
{code}
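For readers unfamiliar with the message: {{[B}} is the JVM's name for {{byte[]}}, so a {{String}} record reached a serializer that expects raw bytes. The following is a minimal stand-alone sketch of that failure mode, assuming the sink fell back to a byte-array serializer because no serde was given (which is what the {{ByteArraySerializer}} frame in the trace suggests). The {{Serializer}} interface and {{PassThroughSerializer}} class are hypothetical stand-ins with the same shape, not the Kafka classes.

```java
// Hypothetical stand-in with the same shape as a Kafka serializer; NOT the Kafka interface.
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical pass-through serializer for byte arrays.
class PassThroughSerializer implements Serializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}

class SerdeCastDemo {
    // Calls the serializer through a raw (type-erased) reference, so the
    // compiler cannot reject a record of the wrong type. The cast to byte[]
    // then fails at runtime inside the compiler-generated bridge method.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySerialize(Object record) {
        Serializer erased = new PassThroughSerializer();
        try {
            erased.serialize("demo-topic", record);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new byte[] {1, 2, 3})); // prints "ok"
        System.out.println(trySerialize("hello"));              // prints "ClassCastException"
    }
}
```

The compiler cannot catch the mismatch because the serializer is selected at runtime, so the error only surfaces when the first record is sent.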
Adjusting this part of the code:
{code:java}
.groupBy(
    (key, value) -> value,
    Grouped.with(Serdes.String(), Serdes.String())
)
{code}
will make the application run properly.
This explicit serde specification is unnecessary, since the serdes are already
known from the upstream source task. Relying on the default serdes works in this
simple example, but fails for more complex scenarios.
Please make the DSL more usable by taking the serde configuration from upstream
tasks.
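To make the request concrete, here is a rough sketch of the lookup order I have in mind: an explicitly given serde wins, otherwise the nearest upstream serde is used, and only then the configured default. The {{DslNode}} class and its string-valued serde names are purely hypothetical and not part of the Kafka Streams API. The sketch also ignores operations that change the key or value type, where an upstream serde would not apply.

```java
// Hypothetical model of a DSL node; NOT part of the Kafka Streams API.
class DslNode {
    final String name;
    final DslNode upstream;      // null for a source node
    final String explicitSerde;  // serde named by the user, or null if none was given

    DslNode(String name, DslNode upstream, String explicitSerde) {
        this.name = name;
        this.upstream = upstream;
        this.explicitSerde = explicitSerde;
    }

    // Explicit serde first, then the nearest upstream serde, then the default.
    String resolveSerde(String defaultSerde) {
        if (explicitSerde != null) {
            return explicitSerde;
        }
        if (upstream != null) {
            return upstream.resolveSerde(defaultSerde);
        }
        return defaultSerde;
    }

    public static void main(String[] args) {
        DslNode source = new DslNode("source", null, "String");
        DslNode flatMap = new DslNode("flatMapValues", source, null);
        DslNode groupBy = new DslNode("groupBy", flatMap, null);
        // groupBy has no serde of its own, so it picks up the source's serde.
        System.out.println(groupBy.resolveSerde("ByteArray")); // prints "String"
    }
}
```

With this order, the {{groupBy}} step in the example above would pick up the String serde from the source instead of falling back to the default.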
--
This message was sent by Atlassian Jira
(v8.3.2#803003)