This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c4ab25aa14 [FLINK-39241][table] Fix MultiJoinStateViews by changing GenericRowData to BinaryRowData
0c4ab25aa14 is described below

commit 0c4ab25aa14cf28a3e99283b954cba9fabe4fddb
Author: Dmitriy Linevich <[email protected]>
AuthorDate: Tue Mar 31 16:21:33 2026 +0700

    [FLINK-39241][table] Fix MultiJoinStateViews by changing GenericRowData to BinaryRowData
---
 .../planner/runtime/stream/sql/JoinITCase.scala    | 49 ++++++++++++++++++++++
 .../join/stream/StreamingMultiJoinOperator.java    |  7 ++--
 .../join/stream/state/MultiJoinStateViews.java     | 41 ++++++++++++++----
 3 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index fa5ef6a2752..d65c35b78d9 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -640,6 +640,55 @@ class JoinITCase(miniBatch: MiniBatchMode, state: StateBackendMode, enableAsyncS
     assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
 
+  @TestTemplate
+  def testThreeWayMultiJoinWithoutPk(): Unit = {
+    env.setParallelism(1)
+    val data1 = new mutable.MutableList[(Int, Long)]
+    data1.+=((1, 1L))
+    data1.+=((1, 2L))
+    data1.+=((1, 2L))
+    data1.+=((1, 5L))
+    data1.+=((2, 7L))
+    data1.+=((1, 9L))
+    data1.+=((1, 8L))
+    data1.+=((3, 8L))
+
+    val data2 = new mutable.MutableList[(Int, Long)]
+    data2.+=((1, 1L))
+    data2.+=((2, 2L))
+    data2.+=((3, 2L))
+    data2.+=((1, 4L))
+
+    val data3 = new mutable.MutableList[(Int, Long)]
+    data3.+=((1, 1L))
+    data3.+=((2, 2L))
+    data3.+=((3, 2L))
+    data3.+=((2, 1L))
+
+    val a = failingDataSource(data1).toTable(tEnv, 'a1, 'a2)
+    val b = failingDataSource(data2).toTable(tEnv, 'b1, 'b2)
+    val c = failingDataSource(data3).toTable(tEnv, 'c1, 'c2)
+
+    tEnv.createTemporaryView("Atable", a)
+    tEnv.createTemporaryView("Btable", b)
+    tEnv.createTemporaryView("Ctable", c)
+
+    tEnv.getConfig.getConfiguration
+      .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")
+    val query1 = "SELECT SUM(a2) AS a2, a1 FROM Atable group by a1"
+    val query2 = "SELECT SUM(b2) AS b2, b1 FROM Btable group by b1"
+    val query3 = "SELECT SUM(c2) AS c2, c1 FROM Ctable group by c1"
+    val query =
+      s"SELECT a1, b1, c1 FROM ($query1) JOIN ($query2) ON a1 = b1 JOIN ($query3) ON c2 = b2"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("2,2,3", "3,3,3")
+    assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
+  }
+
   @TestTemplate
   def testInnerJoinWithPk(): Unit = {
     val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
index 77ab15e0b33..3ed160ab711 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
@@ -817,8 +817,8 @@ public class StreamingMultiJoinOperator extends AbstractStreamOperatorV2<RowData
                     "Keyed state store not found when initializing keyed state store handlers.");
         }
 
-        boolean prohibitReuseRow = isHeapBackend();
-        if (prohibitReuseRow) {
+        boolean requiresKeyDeepCopy = isHeapBackend();
+        if (requiresKeyDeepCopy) {
             this.keyExtractor.requiresKeyDeepCopy();
         }
 
@@ -835,7 +835,8 @@ public class StreamingMultiJoinOperator extends AbstractStreamOperatorV2<RowData
                             inputSpecs.get(i),
                             joinKeyType,
                             inputTypes.get(i),
-                            stateRetentionTime[i]);
+                            stateRetentionTime[i],
+                            requiresKeyDeepCopy);
             stateHandlers.add(stateView);
         }
     }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
index 6d3caba2ba3..8326dada55d 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
@@ -68,7 +68,8 @@ public final class MultiJoinStateViews {
                     RowType
                             joinKeyType, /* joinKeyType is null for inputId = 0, see {@link InputSideHasUniqueKey}*/
             RowType recordType,
-            long retentionTime) {
+            long retentionTime,
+            boolean requiresKeyDeepCopy) {
         StateTtlConfig ttlConfig = createTtlConfig(retentionTime);
 
         if (inputSideSpec.hasUniqueKey()) {
@@ -83,10 +84,12 @@ public final class MultiJoinStateViews {
                         recordType,
                         inputSideSpec.getUniqueKeyType(),
                         inputSideSpec.getUniqueKeySelector(),
-                        ttlConfig);
+                        ttlConfig,
+                        requiresKeyDeepCopy);
             }
         } else {
-            return new InputSideHasNoUniqueKey(ctx, stateName, joinKeyType, recordType, ttlConfig);
+            return new InputSideHasNoUniqueKey(
+                    ctx, stateName, joinKeyType, recordType, ttlConfig, requiresKeyDeepCopy);
         }
     }
 
@@ -182,7 +185,9 @@ public final class MultiJoinStateViews {
         private final MapState<RowData, RowData> recordState;
         private final KeySelector<RowData, RowData> uniqueKeySelector;
         private RowDataSerializer joinKeySerializer; // Null if joinKeyType is null
+        private RowDataSerializer stateKeySerializer; // Null if joinKeyType is null
         private int joinKeyFieldCount = 0; // 0 if joinKeyType is null
+        private final boolean requiresKeyDeepCopy;
 
         private InputSideHasUniqueKey(
                 RuntimeContext ctx,
@@ -191,7 +196,8 @@ public final class MultiJoinStateViews {
                 final RowType recordType,
                 final InternalTypeInfo<RowData> uniqueKeyType,
                 final KeySelector<RowData, RowData> uniqueKeySelector,
-                final StateTtlConfig ttlConfig) {
+                final StateTtlConfig ttlConfig,
+                final boolean requiresKeyDeepCopy) {
             checkNotNull(uniqueKeyType);
             checkNotNull(uniqueKeySelector);
             this.uniqueKeySelector = uniqueKeySelector;
@@ -208,6 +214,7 @@ public final class MultiJoinStateViews {
                 // Composite key type: RowData with 2 fields (joinKey, uniqueKey)
                 // The composite key is a RowData with joinKey at index 0 and uniqueKey at index 1.
                 final RowType keyRowType = RowType.of(joinKeyType, uniqueKeyType.toRowType());
+                this.stateKeySerializer = new RowDataSerializer(keyRowType);
                 keyStateType = InternalTypeInfo.of(keyRowType);
             }
 
@@ -216,6 +223,7 @@ public final class MultiJoinStateViews {
                             stateName, keyStateType, InternalTypeInfo.of(recordType), ttlConfig);
 
             this.recordState = ctx.getMapState(recordStateDesc);
+            this.requiresKeyDeepCopy = requiresKeyDeepCopy;
         }
 
         private boolean joinKeysEqual(RowData joinKey, RowData currentJoinKeyInState) {
@@ -231,7 +239,14 @@ public final class MultiJoinStateViews {
                 GenericRowData compositeKey = new GenericRowData(2);
                 compositeKey.setField(0, joinKey);
                 compositeKey.setField(1, uniqueKey);
-                return compositeKey;
+
+                // need to make deep copy when heap state backend is used
+                // because generic row data and binary row data are not equivalent
+                if (requiresKeyDeepCopy) {
+                    return stateKeySerializer.toBinaryRow(compositeKey, true);
+                } else {
+                    return compositeKey;
+                }
             }
         }
 
@@ -313,16 +328,19 @@ public final class MultiJoinStateViews {
     private static final class InputSideHasNoUniqueKey implements MultiJoinStateView {
         private final MapState<RowData, Integer> recordState;
         private RowDataSerializer joinKeySerializer; // Null if joinKeyType is null
+        private RowDataSerializer stateKeySerializer; // Null if joinKeyType is null
         private int joinKeyFieldCount; // 0 if joinKeyType is null
         private final int recordFieldCount;
         @Nullable private final RowType joinKeyType; // Store to check for null
+        private final boolean requiresKeyDeepCopy;
 
         private InputSideHasNoUniqueKey(
                 RuntimeContext ctx,
                 final String stateName,
                 @Nullable final RowType joinKeyType, // Can be null
                 final RowType recordType,
-                final StateTtlConfig ttlConfig) {
+                final StateTtlConfig ttlConfig,
+                final boolean requiresKeyDeepCopy) {
             this.joinKeyType = joinKeyType;
             this.recordFieldCount = recordType.getFieldCount();
 
@@ -335,6 +353,7 @@ public final class MultiJoinStateViews {
                 this.joinKeyFieldCount = this.joinKeyType.getFieldCount();
                 // Composite key type: RowData with 2 fields (joinKey, record)
                 final RowType keyRowType = RowType.of(this.joinKeyType, recordType);
+                this.stateKeySerializer = new RowDataSerializer(keyRowType);
                 keyStateType = InternalTypeInfo.of(keyRowType);
             }
 
@@ -342,6 +361,7 @@ public final class MultiJoinStateViews {
                     createStateDescriptor(stateName, keyStateType, Types.INT, ttlConfig);
 
             this.recordState = ctx.getMapState(recordStateDesc);
+            this.requiresKeyDeepCopy = requiresKeyDeepCopy;
         }
 
         private boolean joinKeysEqual(RowData joinKeyToLookup, RowData currentJoinKeyInState) {
@@ -357,7 +377,14 @@ public final class MultiJoinStateViews {
                 GenericRowData compositeKey = new GenericRowData(2);
                 compositeKey.setField(0, joinKey);
                 compositeKey.setField(1, record);
-                return compositeKey;
+
+                // need to make deep copy when heap state backend is used
+                // because generic row data and binary row data are not equivalent
+                if (requiresKeyDeepCopy) {
+                    return stateKeySerializer.toBinaryRow(compositeKey, true);
+                } else {
+                    return compositeKey;
+                }
             }
         }
 

Reply via email to