Michael Viamari created KAFKA-9222:
--------------------------------------
Summary: StreamPartitioner for internal repartition topics does
not match defaults for to() operation
Key: KAFKA-9222
URL: https://issues.apache.org/jira/browse/KAFKA-9222
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.3.1
Reporter: Michael Viamari
When a KStream has a Windowed key, a different StreamPartitioner is selected
depending on how the stream's sink node is generated.
When using `KStream#to()`, the topology uses a `StreamSinkNode`, which selects
a `WindowedStreamPartitioner` for the resulting `SinkNode` if no partitioner is
explicitly provided.
{code:java}
// K = input key type, VR = aggregate value type
KTable<Windowed<K>, VR> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
aggResult.toStream().to(aggStreamTopic);
{code}
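A minimal JDK-only sketch of the two strategies, to make the difference concrete. It assumes that `WindowedStreamPartitioner` hashes only the serialized inner key, while the Producer's default partitioner hashes the whole serialized windowed key, modelled here (as an assumption) as the inner key bytes followed by an 8-byte window start. The murmur2 routine is written to mirror the one Kafka uses for keyed records; the class and method names are hypothetical.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WindowedPartitioningSketch {

    // Murmur2, written to mirror Kafka's Utils.murmur2 so the sketch has no dependencies.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int toPositive(int n) { return n & 0x7fffffff; }

    // Assumed WindowedStreamPartitioner behaviour: windowStart is deliberately ignored.
    static int partitionByBaseKey(String key, long windowStart, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    // Producer-default style: hash a simplified windowed key (inner key bytes + 8-byte window start).
    static int partitionByFullKey(String key, long windowStart, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] full = ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes).putLong(windowStart).array();
        return toPositive(murmur2(full)) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        for (long start = 0; start < 5 * 60_000L; start += 60_000L) {
            System.out.printf("window %d: base-key -> %d, full-key -> %d%n",
                    start,
                    partitionByBaseKey("user-42", start, numPartitions),
                    partitionByFullKey("user-42", start, numPartitions));
        }
    }
}
{code}

With the base-key strategy every window of `user-42` maps to one partition; with the full-key strategy the partition can change from window to window.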
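When an internal repartition topic is created before a stateful operation, an
`OptimizableRepartitionNode` is used, which adds a `SinkNode` to the topology.
This node is created with a null partitioner, so the Producer's default
partitioner is always used. This becomes an issue when attempting to join a
windowed KStream/KTable with a stream that was mapped to a windowed key: the
default partitioner hashes the full serialized windowed key, including the
window, whereas `WindowedStreamPartitioner` hashes only the underlying key, so
matching records on the two sides of the join can land in different partitions.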
{code:java}
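// K = input key type, V = input value type, VR = aggregate value type
KTable<Windowed<K>, VR> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
windowedAgg.toStream().to(aggStreamTopic);
KStream<Windowed<K>, V> windowedStream = inputStream.map((k, v) -> {
    // windows: presumably the TimeWindows passed to windowedBy(); getMinWindow is a user-defined helper (not shown)
    Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
    Window minW = getMinWindow(w.values());
    return KeyValue.pair(new Windowed<>(k, minW), v);
});
// map() changes the key, so Streams inserts a repartition topic before the join
windowedStream.leftJoin(windowedAgg, ...);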
{code}
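`getMinWindow(...)` is not shown above. Assuming it returns the earliest window that contains the record's timestamp, the start of that window can be computed directly for hopping windows of size `sizeMs` advancing by `advanceMs`, which makes the reproduction easier to rebuild without Kafka. The sketch below (hypothetical names, JDK only) computes it in closed form and cross-checks it against a brute-force enumeration.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class MinWindowSketch {

    // Start of the earliest window [start, start + sizeMs) containing timestampMs.
    // Assumes sizeMs >= advanceMs > 0 and timestampMs >= 0.
    static long minWindowStart(long timestampMs, long sizeMs, long advanceMs) {
        long candidate = Math.max(0L, timestampMs - sizeMs + advanceMs);
        return (candidate / advanceMs) * advanceMs; // align down to a multiple of advanceMs
    }

    // Brute force: every window start (multiple of advanceMs) whose window contains timestampMs.
    static List<Long> windowStartsBruteForce(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        for (long s = 0; s <= timestampMs; s += advanceMs) {
            if (timestampMs < s + sizeMs) {
                starts.add(s);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5-minute windows advancing every minute.
        long size = 5 * 60_000L, advance = 60_000L, ts = 7 * 60_000L + 30_000L;
        System.out.println("earliest window start: " + minWindowStart(ts, size, advance));
        System.out.println("all window starts:     " + windowStartsBruteForce(ts, size, advance));
    }
}
{code}

For 5-minute windows advancing every minute and a timestamp of 450000 ms, the earliest containing window starts at 180000 ms.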
The only workarounds I've found are either to use the default partitioner for
the `KStream#to()` operation, or to use `KStream#through()` for the repartition
operation.
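Both workarounds succeed because they make the two sides of the join use the same partitioning function, and a join task only sees records from its own partition. The following JDK-only sketch models that requirement: it routes each windowed key on the left and right side with a chosen strategy and counts keys whose two copies land on different partitions. `String#hashCode` and `Objects.hash` stand in for the real serializers and murmur2, so the exact partition numbers are illustrative only; all names are hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class CoPartitionSketch {

    // Simplified windowed key: inner key plus window start.
    static final class WKey {
        final String key;
        final long windowStart;
        WKey(String key, long windowStart) { this.key = key; this.windowStart = windowStart; }
    }

    @FunctionalInterface
    interface Partitioner { int partition(WKey key, int numPartitions); }

    // Stand-in for WindowedStreamPartitioner: only the inner key is hashed.
    static int byBaseKey(WKey k, int numPartitions) {
        return (k.key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Stand-in for the producer default on a serialized windowed key: key and window both hashed.
    static int byFullKey(WKey k, int numPartitions) {
        return (Objects.hash(k.key, k.windowStart) & 0x7fffffff) % numPartitions;
    }

    // Keys whose left and right copies land on different partitions, i.e. joins a
    // partition-local join task can never see.
    static int missedJoins(List<WKey> keys, int numPartitions, Partitioner left, Partitioner right) {
        int missed = 0;
        for (WKey k : keys) {
            if (left.partition(k, numPartitions) != right.partition(k, numPartitions)) {
                missed++;
            }
        }
        return missed;
    }

    // 20 users x 5 one-minute windows.
    static List<WKey> sampleKeys() {
        List<WKey> keys = new ArrayList<>();
        for (int user = 0; user < 20; user++) {
            for (long start = 0; start < 5 * 60_000L; start += 60_000L) {
                keys.add(new WKey("user-" + user, start));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<WKey> keys = sampleKeys();
        int n = 8;
        // Mismatched: to() with the windowed partitioner vs. repartition topic with the producer default.
        System.out.println("mismatched strategies, missed joins: "
                + missedJoins(keys, n, CoPartitionSketch::byBaseKey, CoPartitionSketch::byFullKey));
        // Workaround: both sides use the same strategy.
        System.out.println("same strategy, missed joins: "
                + missedJoins(keys, n, CoPartitionSketch::byFullKey, CoPartitionSketch::byFullKey));
    }
}
{code}

Whenever both sides use the same strategy the count is zero by construction; the mismatched pair is the situation described in this issue.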