This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 27e6ac83617 [FLINK-34166][table] Fix KeyedLookupJoinWrapper
incorrectly processing delete messages for inner join when the previous lookup result
is empty
27e6ac83617 is described below
commit 27e6ac836171c5c5539ceeb234a806be661cc30a
Author: lincoln lee <[email protected]>
AuthorDate: Mon Jan 22 14:28:17 2024 +0800
[FLINK-34166][table] Fix KeyedLookupJoinWrapper incorrectly processing delete
messages for inner join when the previous lookup result is empty
This closes #24152
---
.../join/lookup/KeyedLookupJoinWrapper.java | 65 +++++++++++++---------
.../operators/join/lookup/LookupJoinRunner.java | 18 +++---
.../operators/join/KeyedLookupJoinHarnessTest.java | 9 ++-
3 files changed, 57 insertions(+), 35 deletions(-)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
index 3913a435498..5fd5ed16ffe 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/KeyedLookupJoinWrapper.java
@@ -129,40 +129,55 @@ public class KeyedLookupJoinWrapper extends
KeyedProcessFunction<RowData, RowDat
// do lookup for acc msg
if (RowDataUtil.isAccumulateMsg(in)) {
- // clear local state first
- deleteState();
+ if (lookupJoinRunner.preFilter(in)) {
+ // clear local state first
+ deleteState();
- // fetcher has copied the input field when object reuse is enabled
- lookupJoinRunner.doFetch(in);
+ // fetcher has copied the input field when object reuse is
enabled
+ lookupJoinRunner.doFetch(in);
- // update state with empty row if lookup miss or pre-filtered
- if (!collectListener.collected) {
- updateState(emptyRow);
+ // update state with empty row if join condition unsatisfied
+ if (!collectListener.collected) {
+ updateState(emptyRow);
+ }
}
-
lookupJoinRunner.padNullForLeftJoin(in, out);
} else {
- // do state access for non-acc msg
- if (lookupKeyContainsPrimaryKey) {
- RowData rightRow = uniqueState.value();
- // should distinguish null from empty(lookup miss)
- if (null == rightRow) {
- stateStaledErrorHandle(in, out);
- } else {
- collectDeleteRow(in, rightRow, out);
- }
- } else {
- List<RowData> rightRows = state.value();
- if (null == rightRows) {
- stateStaledErrorHandle(in, out);
+ boolean collected = false;
+ if (lookupJoinRunner.preFilter(in)) {
+ // do state access for non-acc msg
+ if (lookupKeyContainsPrimaryKey) {
+ RowData rightRow = uniqueState.value();
+ // should distinguish null from empty(join condition
unsatisfied)
+ if (null == rightRow) {
+ stateStaledErrorHandle(in, out);
+ } else if (!emptyRow.equals(rightRow)) {
+ collectDeleteRow(in, rightRow, out);
+ collected = true;
+ }
} else {
- for (RowData row : rightRows) {
- collectDeleteRow(in, row, out);
+ List<RowData> rightRows = state.value();
+ if (null == rightRows) {
+ stateStaledErrorHandle(in, out);
+ } else {
+ for (RowData row : rightRows) {
+ if (!emptyRow.equals(row)) {
+ collectDeleteRow(in, row, out);
+ collected = true;
+ }
+ }
}
}
+ // clear state at last
+ deleteState();
+ }
+
+ // pad null for left join if no delete row collected from state,
here we can't use the
+ // collector's status to determine whether the row is collected or
not, because data
+ // fetched from state is not collected by the collector
+ if (lookupJoinRunner.isLeftOuterJoin && !collected) {
+ collectDeleteRow(in, lookupJoinRunner.nullRow, out);
}
- // clear state at last
- deleteState();
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
index cb818452894..718962c5c8b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
@@ -46,7 +46,7 @@ public class LookupJoinRunner extends
ProcessFunction<RowData, RowData> {
protected transient ListenableCollector<RowData> collector;
protected transient JoinedRowData outRow;
protected transient FilterCondition preFilterCondition;
- private transient GenericRowData nullRow;
+ protected transient GenericRowData nullRow;
public LookupJoinRunner(
GeneratedFunction<FlatMapFunction<RowData, RowData>>
generatedFetcher,
@@ -87,7 +87,10 @@ public class LookupJoinRunner extends
ProcessFunction<RowData, RowData> {
prepareCollector(in, out);
- doFetch(in);
+ // apply local filter first
+ if (preFilter(in)) {
+ doFetch(in);
+ }
padNullForLeftJoin(in, out);
}
@@ -98,12 +101,13 @@ public class LookupJoinRunner extends
ProcessFunction<RowData, RowData> {
collector.reset();
}
+ public boolean preFilter(RowData in) throws Exception {
+ return preFilterCondition.apply(in);
+ }
+
public void doFetch(RowData in) throws Exception {
- // apply local filter first
- if (preFilterCondition.apply(in)) {
- // fetcher has copied the input field when object reuse is enabled
- fetcher.flatMap(in, getFetcherCollector());
- }
+ // fetcher has copied the input field when object reuse is enabled
+ fetcher.flatMap(in, getFetcherCollector());
}
public void padNullForLeftJoin(RowData in, Collector<RowData> out) {
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
index ade839d6426..19505da9b07 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/KeyedLookupJoinHarnessTest.java
@@ -200,7 +200,6 @@ public class KeyedLookupJoinHarnessTest {
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(insertRecord(1, "a", 1, "Julian"));
expectedOutput.add(insertRecord(4, "d", 4, "Fabian"));
- expectedOutput.add(deleteRecord(3, "c", null, null));
expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
@@ -288,6 +287,7 @@ public class KeyedLookupJoinHarnessTest {
testHarness.processElement(updateAfterRecord(3, "c2"));
testHarness.processElement(deleteRecord(3, "c2"));
testHarness.processElement(insertRecord(3, "c3"));
+ testHarness.processElement(deleteRecord(4, "d"));
testHarness.processElement(insertRecord(4, null));
List<Object> expectedOutput = new ArrayList<>();
@@ -305,6 +305,7 @@ public class KeyedLookupJoinHarnessTest {
expectedOutput.add(deleteRecord(3, "c2", 6, "Jackson-2"));
expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
expectedOutput.add(insertRecord(3, "c3", 9, "Jackson-3"));
+ expectedOutput.add(deleteRecord(4, "d", 4, "Fabian"));
expectedOutput.add(insertRecord(4, null, null, null));
assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
@@ -329,6 +330,7 @@ public class KeyedLookupJoinHarnessTest {
testHarness.processElement(updateAfterRecord(3, "c2"));
testHarness.processElement(deleteRecord(3, "c2"));
testHarness.processElement(insertRecord(3, "c3"));
+ testHarness.processElement(deleteRecord(4, "d"));
testHarness.processElement(insertRecord(4, null));
List<Object> expectedOutput = new ArrayList<>();
@@ -343,6 +345,7 @@ public class KeyedLookupJoinHarnessTest {
expectedOutput.add(insertRecord(3, "c2", 6, "Jark-2"));
expectedOutput.add(deleteRecord(3, "c2", 6, "Jark-2"));
expectedOutput.add(insertRecord(3, "c3", 9, "Jark-3"));
+ expectedOutput.add(deleteRecord(4, "d", 4, "Fabian"));
expectedOutput.add(insertRecord(4, null, null, null));
assertor.assertOutputEquals("output wrong.", expectedOutput,
testHarness.getOutput());
@@ -494,8 +497,8 @@ public class KeyedLookupJoinHarnessTest {
int currentCnt = counter(id);
List<GenericRowData> rows = lookup(id);
if (rows != null) {
- for (int i = 0; i < rows.size(); i++) {
- collectUpdatedRow(rows.get(i), currentCnt, out);
+ for (GenericRowData row : rows) {
+ collectUpdatedRow(row, currentCnt, out);
}
} else if (currentCnt > 1) {
// return a default value for which lookup miss at 1st time