This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0c4ab25aa14 [FLINK-39241][table] Fix MultiJoinStateViews by changing GenericRowData to BinaryRowData
0c4ab25aa14 is described below
commit 0c4ab25aa14cf28a3e99283b954cba9fabe4fddb
Author: Dmitriy Linevich <[email protected]>
AuthorDate: Tue Mar 31 16:21:33 2026 +0700
[FLINK-39241][table] Fix MultiJoinStateViews by changing GenericRowData to BinaryRowData
---
.../planner/runtime/stream/sql/JoinITCase.scala | 49 ++++++++++++++++++++++
.../join/stream/StreamingMultiJoinOperator.java | 7 ++--
.../join/stream/state/MultiJoinStateViews.java | 41 ++++++++++++++----
3 files changed, 87 insertions(+), 10 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index fa5ef6a2752..d65c35b78d9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -640,6 +640,55 @@ class JoinITCase(miniBatch: MiniBatchMode, state: StateBackendMode, enableAsyncS
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}
+ @TestTemplate
+ def testThreeWayMultiJoinWithoutPk(): Unit = {
+ env.setParallelism(1)
+ val data1 = new mutable.MutableList[(Int, Long)]
+ data1.+=((1, 1L))
+ data1.+=((1, 2L))
+ data1.+=((1, 2L))
+ data1.+=((1, 5L))
+ data1.+=((2, 7L))
+ data1.+=((1, 9L))
+ data1.+=((1, 8L))
+ data1.+=((3, 8L))
+
+ val data2 = new mutable.MutableList[(Int, Long)]
+ data2.+=((1, 1L))
+ data2.+=((2, 2L))
+ data2.+=((3, 2L))
+ data2.+=((1, 4L))
+
+ val data3 = new mutable.MutableList[(Int, Long)]
+ data3.+=((1, 1L))
+ data3.+=((2, 2L))
+ data3.+=((3, 2L))
+ data3.+=((2, 1L))
+
+ val a = failingDataSource(data1).toTable(tEnv, 'a1, 'a2)
+ val b = failingDataSource(data2).toTable(tEnv, 'b1, 'b2)
+ val c = failingDataSource(data3).toTable(tEnv, 'c1, 'c2)
+
+ tEnv.createTemporaryView("Atable", a)
+ tEnv.createTemporaryView("Btable", b)
+ tEnv.createTemporaryView("Ctable", c)
+
+ tEnv.getConfig.getConfiguration
+
.setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(),
"true")
+ val query1 = "SELECT SUM(a2) AS a2, a1 FROM Atable group by a1"
+ val query2 = "SELECT SUM(b2) AS b2, b1 FROM Btable group by b1"
+ val query3 = "SELECT SUM(c2) AS c2, c1 FROM Ctable group by c1"
+ val query =
+ s"SELECT a1, b1, c1 FROM ($query1) JOIN ($query2) ON a1 = b1 JOIN
($query3) ON c2 = b2"
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+
+ val expected = Seq("2,2,3", "3,3,3")
+ assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+ }
+
@TestTemplate
def testInnerJoinWithPk(): Unit = {
val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
index 77ab15e0b33..3ed160ab711 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
@@ -817,8 +817,8 @@ public class StreamingMultiJoinOperator extends AbstractStreamOperatorV2<RowData
"Keyed state store not found when initializing keyed state
store handlers.");
}
- boolean prohibitReuseRow = isHeapBackend();
- if (prohibitReuseRow) {
+ boolean requiresKeyDeepCopy = isHeapBackend();
+ if (requiresKeyDeepCopy) {
this.keyExtractor.requiresKeyDeepCopy();
}
@@ -835,7 +835,8 @@ public class StreamingMultiJoinOperator extends AbstractStreamOperatorV2<RowData
inputSpecs.get(i),
joinKeyType,
inputTypes.get(i),
- stateRetentionTime[i]);
+ stateRetentionTime[i],
+ requiresKeyDeepCopy);
stateHandlers.add(stateView);
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
index 6d3caba2ba3..8326dada55d 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
@@ -68,7 +68,8 @@ public final class MultiJoinStateViews {
RowType
joinKeyType, /* joinKeyType is null for inputId =
0, see {@link InputSideHasUniqueKey}*/
RowType recordType,
- long retentionTime) {
+ long retentionTime,
+ boolean requiresKeyDeepCopy) {
StateTtlConfig ttlConfig = createTtlConfig(retentionTime);
if (inputSideSpec.hasUniqueKey()) {
@@ -83,10 +84,12 @@ public final class MultiJoinStateViews {
recordType,
inputSideSpec.getUniqueKeyType(),
inputSideSpec.getUniqueKeySelector(),
- ttlConfig);
+ ttlConfig,
+ requiresKeyDeepCopy);
}
} else {
- return new InputSideHasNoUniqueKey(ctx, stateName, joinKeyType,
recordType, ttlConfig);
+ return new InputSideHasNoUniqueKey(
+ ctx, stateName, joinKeyType, recordType, ttlConfig,
requiresKeyDeepCopy);
}
}
@@ -182,7 +185,9 @@ public final class MultiJoinStateViews {
private final MapState<RowData, RowData> recordState;
private final KeySelector<RowData, RowData> uniqueKeySelector;
private RowDataSerializer joinKeySerializer; // Null if joinKeyType is
null
+ private RowDataSerializer stateKeySerializer; // Null if joinKeyType
is null
private int joinKeyFieldCount = 0; // 0 if joinKeyType is null
+ private final boolean requiresKeyDeepCopy;
private InputSideHasUniqueKey(
RuntimeContext ctx,
@@ -191,7 +196,8 @@ public final class MultiJoinStateViews {
final RowType recordType,
final InternalTypeInfo<RowData> uniqueKeyType,
final KeySelector<RowData, RowData> uniqueKeySelector,
- final StateTtlConfig ttlConfig) {
+ final StateTtlConfig ttlConfig,
+ final boolean requiresKeyDeepCopy) {
checkNotNull(uniqueKeyType);
checkNotNull(uniqueKeySelector);
this.uniqueKeySelector = uniqueKeySelector;
@@ -208,6 +214,7 @@ public final class MultiJoinStateViews {
// Composite key type: RowData with 2 fields (joinKey,
uniqueKey)
// The composite key is a RowData with joinKey at index 0 and
uniqueKey at index 1.
final RowType keyRowType = RowType.of(joinKeyType,
uniqueKeyType.toRowType());
+ this.stateKeySerializer = new RowDataSerializer(keyRowType);
keyStateType = InternalTypeInfo.of(keyRowType);
}
@@ -216,6 +223,7 @@ public final class MultiJoinStateViews {
stateName, keyStateType,
InternalTypeInfo.of(recordType), ttlConfig);
this.recordState = ctx.getMapState(recordStateDesc);
+ this.requiresKeyDeepCopy = requiresKeyDeepCopy;
}
private boolean joinKeysEqual(RowData joinKey, RowData
currentJoinKeyInState) {
@@ -231,7 +239,14 @@ public final class MultiJoinStateViews {
GenericRowData compositeKey = new GenericRowData(2);
compositeKey.setField(0, joinKey);
compositeKey.setField(1, uniqueKey);
- return compositeKey;
+
+ // need to make deep copy when heap state backend is used
+ // because generic row data and binary row data are not
equivalent
+ if (requiresKeyDeepCopy) {
+ return stateKeySerializer.toBinaryRow(compositeKey, true);
+ } else {
+ return compositeKey;
+ }
}
}
@@ -313,16 +328,19 @@ public final class MultiJoinStateViews {
private static final class InputSideHasNoUniqueKey implements
MultiJoinStateView {
private final MapState<RowData, Integer> recordState;
private RowDataSerializer joinKeySerializer; // Null if joinKeyType is
null
+ private RowDataSerializer stateKeySerializer; // Null if joinKeyType
is null
private int joinKeyFieldCount; // 0 if joinKeyType is null
private final int recordFieldCount;
@Nullable private final RowType joinKeyType; // Store to check for null
+ private final boolean requiresKeyDeepCopy;
private InputSideHasNoUniqueKey(
RuntimeContext ctx,
final String stateName,
@Nullable final RowType joinKeyType, // Can be null
final RowType recordType,
- final StateTtlConfig ttlConfig) {
+ final StateTtlConfig ttlConfig,
+ final boolean requiresKeyDeepCopy) {
this.joinKeyType = joinKeyType;
this.recordFieldCount = recordType.getFieldCount();
@@ -335,6 +353,7 @@ public final class MultiJoinStateViews {
this.joinKeyFieldCount = this.joinKeyType.getFieldCount();
// Composite key type: RowData with 2 fields (joinKey, record)
final RowType keyRowType = RowType.of(this.joinKeyType,
recordType);
+ this.stateKeySerializer = new RowDataSerializer(keyRowType);
keyStateType = InternalTypeInfo.of(keyRowType);
}
@@ -342,6 +361,7 @@ public final class MultiJoinStateViews {
createStateDescriptor(stateName, keyStateType, Types.INT,
ttlConfig);
this.recordState = ctx.getMapState(recordStateDesc);
+ this.requiresKeyDeepCopy = requiresKeyDeepCopy;
}
private boolean joinKeysEqual(RowData joinKeyToLookup, RowData
currentJoinKeyInState) {
@@ -357,7 +377,14 @@ public final class MultiJoinStateViews {
GenericRowData compositeKey = new GenericRowData(2);
compositeKey.setField(0, joinKey);
compositeKey.setField(1, record);
- return compositeKey;
+
+ // need to make deep copy when heap state backend is used
+ // because generic row data and binary row data are not
equivalent
+ if (requiresKeyDeepCopy) {
+ return stateKeySerializer.toBinaryRow(compositeKey, true);
+ } else {
+ return compositeKey;
+ }
}
}