This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 27e6ac83617 [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty
27e6ac83617 is described below

commit 27e6ac836171c5c5539ceeb234a806be661cc30a
Author: lincoln lee <[email protected]>
AuthorDate: Mon Jan 22 14:28:17 2024 +0800

    [FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly process delete message for inner join when previous lookup result is empty
    
    This closes #24152
---
 .../join/lookup/KeyedLookupJoinWrapper.java        | 65 +++++++++++++---------
 .../operators/join/lookup/LookupJoinRunner.java    | 18 +++---
 .../operators/join/KeyedLookupJoinHarnessTest.java |  9 ++-
 3 files changed, 57 insertions(+), 35 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
index 3913a435498..5fd5ed16ffe 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
@@ -129,40 +129,55 @@ public class KeyedLookupJoinWrapper extends KeyedProcessFunction<RowData, RowDat
 
         // do lookup for acc msg
         if (RowDataUtil.isAccumulateMsg(in)) {
-            // clear local state first
-            deleteState();
+            if (lookupJoinRunner.preFilter(in)) {
+                // clear local state first
+                deleteState();
 
-            // fetcher has copied the input field when object reuse is enabled
-            lookupJoinRunner.doFetch(in);
+                // fetcher has copied the input field when object reuse is enabled
+                lookupJoinRunner.doFetch(in);
 
-            // update state with empty row if lookup miss or pre-filtered
-            if (!collectListener.collected) {
-                updateState(emptyRow);
+                // update state with empty row if join condition unsatisfied
+                if (!collectListener.collected) {
+                    updateState(emptyRow);
+                }
             }
-
             lookupJoinRunner.padNullForLeftJoin(in, out);
         } else {
-            // do state access for non-acc msg
-            if (lookupKeyContainsPrimaryKey) {
-                RowData rightRow = uniqueState.value();
-                // should distinguish null from empty(lookup miss)
-                if (null == rightRow) {
-                    stateStaledErrorHandle(in, out);
-                } else {
-                    collectDeleteRow(in, rightRow, out);
-                }
-            } else {
-                List<RowData> rightRows = state.value();
-                if (null == rightRows) {
-                    stateStaledErrorHandle(in, out);
+            boolean collected = false;
+            if (lookupJoinRunner.preFilter(in)) {
+                // do state access for non-acc msg
+                if (lookupKeyContainsPrimaryKey) {
+                    RowData rightRow = uniqueState.value();
+                    // should distinguish null from empty(join condition unsatisfied)
+                    if (null == rightRow) {
+                        stateStaledErrorHandle(in, out);
+                    } else if (!emptyRow.equals(rightRow)) {
+                        collectDeleteRow(in, rightRow, out);
+                        collected = true;
+                    }
                 } else {
-                    for (RowData row : rightRows) {
-                        collectDeleteRow(in, row, out);
+                    List<RowData> rightRows = state.value();
+                    if (null == rightRows) {
+                        stateStaledErrorHandle(in, out);
+                    } else {
+                        for (RowData row : rightRows) {
+                            if (!emptyRow.equals(row)) {
+                                collectDeleteRow(in, row, out);
+                                collected = true;
+                            }
+                        }
                     }
                 }
+                // clear state at last
+                deleteState();
+            }
+
+            // pad null for left join if no delete row collected from state, here we can't use the
+            // collector's status to determine whether the row is collected or not, because data
+            // fetched from state is not collected by the collector
+            if (lookupJoinRunner.isLeftOuterJoin && !collected) {
+                collectDeleteRow(in, lookupJoinRunner.nullRow, out);
             }
-            // clear state at last
-            deleteState();
         }
     }
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
index cb818452894..718962c5c8b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
@@ -46,7 +46,7 @@ public class LookupJoinRunner extends ProcessFunction<RowData, RowData> {
     protected transient ListenableCollector<RowData> collector;
     protected transient JoinedRowData outRow;
     protected transient FilterCondition preFilterCondition;
-    private transient GenericRowData nullRow;
+    protected transient GenericRowData nullRow;
 
     public LookupJoinRunner(
             GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher,
@@ -87,7 +87,10 @@ public class LookupJoinRunner extends ProcessFunction<RowData, RowData> {
 
         prepareCollector(in, out);
 
-        doFetch(in);
+        // apply local filter first
+        if (preFilter(in)) {
+            doFetch(in);
+        }
 
         padNullForLeftJoin(in, out);
     }
@@ -98,12 +101,13 @@ public class LookupJoinRunner extends ProcessFunction<RowData, RowData> {
         collector.reset();
     }
 
+    public boolean preFilter(RowData in) throws Exception {
+        return preFilterCondition.apply(in);
+    }
+
     public void doFetch(RowData in) throws Exception {
-        // apply local filter first
-        if (preFilterCondition.apply(in)) {
-            // fetcher has copied the input field when object reuse is enabled
-            fetcher.flatMap(in, getFetcherCollector());
-        }
+        // fetcher has copied the input field when object reuse is enabled
+        fetcher.flatMap(in, getFetcherCollector());
     }
 
     public void padNullForLeftJoin(RowData in, Collector<RowData> out) {
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
index ade839d6426..19505da9b07 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
@@ -200,7 +200,6 @@ public class KeyedLookupJoinHarnessTest {
         List<Object> expectedOutput = new ArrayList<>();
         expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
         expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
-        expectedOutput.add(deleteRecord(3, "c", null, null));
         expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
         expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
         expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
@@ -288,6 +287,7 @@ public class KeyedLookupJoinHarnessTest {
         testHarness.processElement(updateAfterRecord(3, "c2"));
         testHarness.processElement(deleteRecord(3, "c2"));
         testHarness.processElement(insertRecord(3, "c3"));
+        testHarness.processElement(deleteRecord(4, "d"));
         testHarness.processElement(insertRecord(4, null));
 
         List<Object> expectedOutput = new ArrayList<>();
@@ -305,6 +305,7 @@ public class KeyedLookupJoinHarnessTest {
         expectedOutput.add(deleteRecord(3, "c2", 6, "Jackson-2"));
         expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
         expectedOutput.add(insertRecord(3, "c3", 9, "Jackson-3"));
+        expectedOutput.add(deleteRecord(4, "d", 4, "Fabian"));
         expectedOutput.add(insertRecord(4, null, null, null));
 
         assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
@@ -329,6 +330,7 @@ public class KeyedLookupJoinHarnessTest {
         testHarness.processElement(updateAfterRecord(3, "c2"));
         testHarness.processElement(deleteRecord(3, "c2"));
         testHarness.processElement(insertRecord(3, "c3"));
+        testHarness.processElement(deleteRecord(4, "d"));
         testHarness.processElement(insertRecord(4, null));
 
         List<Object> expectedOutput = new ArrayList<>();
@@ -343,6 +345,7 @@ public class KeyedLookupJoinHarnessTest {
         expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
         expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
         expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+        expectedOutput.add(deleteRecord(4, "d", 4, "Fabian"));
         expectedOutput.add(insertRecord(4, null, null, null));
 
         assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput());
@@ -494,8 +497,8 @@ public class KeyedLookupJoinHarnessTest {
             int currentCnt = counter(id);
             List<GenericRowData> rows = lookup(id);
             if (rows != null) {
-                for (int i = 0; i < rows.size(); i++) {
-                    collectUpdatedRow(rows.get(i), currentCnt, out);
+                for (GenericRowData row : rows) {
+                    collectUpdatedRow(row, currentCnt, out);
                 }
             } else if (currentCnt > 1) {
                 // return a default value for which lookup miss at 1st time

Reply via email to