[
https://issues.apache.org/jira/browse/KAFKA-20264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
appchemist updated KAFKA-20264:
-------------------------------
Description:
When the MERGE_REPARTITION_TOPICS optimization is enabled, merging two streams
where only one branch contains a key-changing operation may fail to consolidate
repartition topics.
The result depends on the order in which parent branches are traversed: if the
branch with the key-changing operation is searched first and a subsequent
branch without one returns null, the earlier result is overwritten.
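The overwrite described above can be sketched in isolation. This is a minimal illustration, not the Kafka Streams source: {{Node}}, {{KeyChangingSearch}}, and {{TraversalOrderDemo}} are hypothetical names, and the {{firstMatch}} variant is only one possible order-independent search, not necessarily what the proposed PR does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a topology graph node; not Kafka's internal class.
final class Node {
    final String name;
    final boolean keyChanging;
    final List<Node> parents = new ArrayList<>();

    Node(String name, boolean keyChanging, Node... parents) {
        this.name = name;
        this.keyChanging = keyChanging;
        for (Node p : parents) {
            this.parents.add(p);
        }
    }
}

final class KeyChangingSearch {
    // Order-dependent: each parent's result replaces the previous one,
    // so a later branch returning null discards an earlier match.
    static Node overwriting(Node node) {
        if (node.keyChanging) {
            return node;
        }
        Node found = null;
        for (Node parent : node.parents) {
            found = overwriting(parent);
        }
        return found;
    }

    // Order-independent: return as soon as any branch yields a match.
    static Node firstMatch(Node node) {
        if (node.keyChanging) {
            return node;
        }
        for (Node parent : node.parents) {
            Node found = firstMatch(parent);
            if (found != null) {
                return found;
            }
        }
        return null;
    }
}

final class TraversalOrderDemo {
    // Builds topic-1 -> selectKey -> filter and topic-2, merged in either order.
    static Node merged(boolean keyChangingBranchFirst) {
        Node keyed = new Node("filter", false,
            new Node("selectKey", true, new Node("topic-1", false)));
        Node plain = new Node("topic-2", false);
        return keyChangingBranchFirst
            ? new Node("merge", false, keyed, plain)
            : new Node("merge", false, plain, keyed);
    }

    private static String name(Node n) {
        return n == null ? "null" : n.name;
    }

    public static void main(String[] args) {
        for (boolean first : new boolean[] {true, false}) {
            Node merge = merged(first);
            System.out.println("keyChangingBranchFirst=" + first
                + " overwriting=" + name(KeyChangingSearch.overwriting(merge))
                + " firstMatch=" + name(KeyChangingSearch.firstMatch(merge)));
        }
    }
}
```

In this sketch, {{overwriting}} loses the key-changing node only when that branch is visited first, which mirrors the two cases in the reproduction steps; {{firstMatch}} finds it in both orders.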
*Steps to Reproduce*
{code:java}
// Case 1: fails to optimize (key-changing operation on the left branch)
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final KStream<String, String> left =
    builder.stream(Collections.singleton("topic-1"), consumed)
        .selectKey((k, v) -> v)
        .filter((k, v) -> v != null);
final KStream<String, String> right =
    builder.stream(Collections.singleton("topic-2"), consumed);
final KStream<String, String> merged = left.merge(right);
final KGroupedStream<String, String> grouped = merged.groupByKey();
grouped.count(Materialized.as("count-store"));
grouped.aggregate(
    () -> null,
    (k, v, agg) -> k,
    Materialized.as("latest-store"));

// Case 2: optimizes successfully (key-changing operation on the right branch).
// This is a separate topology; build it with a fresh StreamsBuilder.
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final KStream<String, String> left =
    builder.stream(Collections.singleton("topic-1"), consumed);
final KStream<String, String> right =
    builder.stream(Collections.singleton("topic-2"), consumed)
        .selectKey((k, v) -> v)
        .filter((k, v) -> v != null);
final KStream<String, String> merged = left.merge(right);
final KGroupedStream<String, String> grouped = merged.groupByKey();
grouped.count(Materialized.as("count-store"));
grouped.aggregate(
    () -> null,
    (k, v, agg) -> k,
    Materialized.as("latest-store"));
{code}
*Expected Behavior*
With MERGE_REPARTITION_TOPICS enabled, count() and aggregate() should share a
single repartition topic — only 1 repartition topic should be created.
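One way to check this expectation is to count the distinct repartition sink topics in the topology description. The helper below is a sketch: {{RepartitionTopicCounter}} is a hypothetical name, and the pattern assumes that sink lines in the description look like {{Sink: <node> (topic: <name>-repartition)}}.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of Kafka Streams.
final class RepartitionTopicCounter {
    // Assumed line shape: "Sink: <node-name> (topic: <topic-name>-repartition)".
    private static final Pattern REPARTITION_SINK =
        Pattern.compile("Sink: \\S+ \\(topic: (\\S+-repartition)\\)");

    // Returns the distinct repartition topic names found in the description text.
    static Set<String> repartitionTopics(String topologyDescription) {
        Set<String> topics = new LinkedHashSet<>();
        Matcher m = REPARTITION_SINK.matcher(topologyDescription);
        while (m.find()) {
            topics.add(m.group(1));
        }
        return topics;
    }
}
```

Under these assumptions, passing {{builder.build(props).describe().toString()}} for either reproduction case should yield a set of size 1 once the optimization works. The properties must be passed to {{build}} for the optimization to be applied.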
*Actual Behavior*
Duplicate repartition topics are created. The optimization fails to recognize
the relationship between the merge node and the key-changing node.
*Proposed PR*
[https://github.com/apache/kafka/pull/21540]
was:
When the MERGE_REPARTITION_TOPICS optimization is enabled, merging two streams
where only one branch contains a key-changing operation may fail to consolidate
repartition topics.
The result depends on the order in which parent branches are traversed: if the
branch with the key-changing operation is searched first and a subsequent
branch without one returns null, the earlier result is overwritten.
*Steps to Reproduce*
{code:java}
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
final KStream<String, String> left =
builder.stream(Collections.singleton("topic-1"), consumed)
.selectKey((k, v) -> v)
.filter((k, v) -> v != null);
final KStream<String, String> right =
builder.stream(Collections.singleton("topic-2"), consumed);
final KStream<String, String> merged = left.merge(right);
final KGroupedStream<String, String> grouped = merged.groupByKey();
grouped.count(Materialized.as("count-store"));
grouped.aggregate(
() -> null,
(k, v, agg) -> k, Materialized.as("latest-store"));
{code}
*Repartition*
*Expected Behavior*
With MERGE_REPARTITION_TOPICS enabled, count() and aggregate() should share a
single repartition topic — only 1 repartition topic should be created.
*Actual Behavior*
Duplicate repartition topics are created. The optimization fails to recognize
the relationship between the merge node and the key-changing node.
*Proposed PR*
[https://github.com/apache/kafka/pull/21540]
> `MERGE_REPARTITION_TOPICS` optimization fails depending on which branch of a
> merged stream contains the key-changing operation
> ------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-20264
> URL: https://issues.apache.org/jira/browse/KAFKA-20264
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: appchemist
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)