This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ba3f2bdea6 [FLINK-32052][table-runtime] Introduce left and right 
state retention time to StreamingJoinOperator
5ba3f2bdea6 is described below

commit 5ba3f2bdea6fc7c9e58b50200806ea341b7dd3d3
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Sun Apr 30 00:29:29 2023 +0800

    [FLINK-32052][table-runtime] Introduce left and right state retention time 
to StreamingJoinOperator
    
    This closes #22566
---
 .../plan/nodes/exec/stream/StreamExecJoin.java     |   2 +
 .../join/stream/AbstractStreamingJoinOperator.java |   9 +-
 .../join/stream/StreamingJoinOperator.java         |  14 +-
 .../join/stream/StreamingSemiAntiJoinOperator.java |  12 +-
 .../join/stream/StreamingJoinOperatorTest.java     | 656 +++++++++++++++++++++
 .../join/stream/StreamingJoinOperatorTestBase.java | 142 +++++
 .../stream/StreamingSemiAntiJoinOperatorTest.java  | 294 +++++++++
 .../operators/sink/SinkUpsertMaterializerTest.java |  62 +-
 .../table/runtime/util/RowDataHarnessAssertor.java |  32 +
 9 files changed, 1164 insertions(+), 59 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index da2800d246f..47544eeb6f8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -184,6 +184,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
                             leftInputSpec,
                             rightInputSpec,
                             joinSpec.getFilterNulls(),
+                            minRetentionTime,
                             minRetentionTime);
         } else {
             boolean leftIsOuter = joinType == FlinkJoinType.LEFT || joinType 
== FlinkJoinType.FULL;
@@ -199,6 +200,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
                             leftIsOuter,
                             rightIsOuter,
                             joinSpec.getFilterNulls(),
+                            minRetentionTime,
                             minRetentionTime);
         }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java
index 64ada0f0db4..c7dad646631 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java
@@ -60,7 +60,8 @@ public abstract class AbstractStreamingJoinOperator extends 
AbstractStreamOperat
 
     private final boolean[] filterNullKeys;
 
-    protected final long stateRetentionTime;
+    protected final long leftStateRetentionTime;
+    protected final long rightStateRetentionTime;
 
     protected transient JoinConditionWithNullFilters joinCondition;
     protected transient TimestampedCollector<RowData> collector;
@@ -72,13 +73,15 @@ public abstract class AbstractStreamingJoinOperator extends 
AbstractStreamOperat
             JoinInputSideSpec leftInputSideSpec,
             JoinInputSideSpec rightInputSideSpec,
             boolean[] filterNullKeys,
-            long stateRetentionTime) {
+            long leftStateRetentionTime,
+            long rightStateRetentionTime) {
         this.leftType = leftType;
         this.rightType = rightType;
         this.generatedJoinCondition = generatedJoinCondition;
         this.leftInputSideSpec = leftInputSideSpec;
         this.rightInputSideSpec = rightInputSideSpec;
-        this.stateRetentionTime = stateRetentionTime;
+        this.leftStateRetentionTime = leftStateRetentionTime;
+        this.rightStateRetentionTime = rightStateRetentionTime;
         this.filterNullKeys = filterNullKeys;
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
index d221c555996..308b98e2794 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
@@ -60,7 +60,8 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
             boolean leftIsOuter,
             boolean rightIsOuter,
             boolean[] filterNullKeys,
-            long stateRetentionTime) {
+            long leftStateRetentionTime,
+            long rightStateRetentionTime) {
         super(
                 leftType,
                 rightType,
@@ -68,7 +69,8 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
                 leftInputSideSpec,
                 rightInputSideSpec,
                 filterNullKeys,
-                stateRetentionTime);
+                leftStateRetentionTime,
+                rightStateRetentionTime);
         this.leftIsOuter = leftIsOuter;
         this.rightIsOuter = rightIsOuter;
     }
@@ -89,7 +91,7 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
                             "left-records",
                             leftInputSideSpec,
                             leftType,
-                            stateRetentionTime);
+                            leftStateRetentionTime);
         } else {
             this.leftRecordStateView =
                     JoinRecordStateViews.create(
@@ -97,7 +99,7 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
                             "left-records",
                             leftInputSideSpec,
                             leftType,
-                            stateRetentionTime);
+                            leftStateRetentionTime);
         }
 
         if (rightIsOuter) {
@@ -107,7 +109,7 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
                             "right-records",
                             rightInputSideSpec,
                             rightType,
-                            stateRetentionTime);
+                            rightStateRetentionTime);
         } else {
             this.rightRecordStateView =
                     JoinRecordStateViews.create(
@@ -115,7 +117,7 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
                             "right-records",
                             rightInputSideSpec,
                             rightType,
-                            stateRetentionTime);
+                            rightStateRetentionTime);
         }
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java
index 3c841d7a008..63f5203fda7 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java
@@ -35,7 +35,7 @@ public class StreamingSemiAntiJoinOperator extends 
AbstractStreamingJoinOperator
 
     private static final long serialVersionUID = -3135772379944924519L;
 
-    // true if it is anti join, otherwise is semi joinp
+    // true if it is anti join, otherwise is semi join
     private final boolean isAntiJoin;
 
     // left join state
@@ -51,7 +51,8 @@ public class StreamingSemiAntiJoinOperator extends 
AbstractStreamingJoinOperator
             JoinInputSideSpec leftInputSideSpec,
             JoinInputSideSpec rightInputSideSpec,
             boolean[] filterNullKeys,
-            long stateRetentionTime) {
+            long leftStateRetentionTime,
+            long rightStateRetentionTIme) {
         super(
                 leftType,
                 rightType,
@@ -59,7 +60,8 @@ public class StreamingSemiAntiJoinOperator extends 
AbstractStreamingJoinOperator
                 leftInputSideSpec,
                 rightInputSideSpec,
                 filterNullKeys,
-                stateRetentionTime);
+                leftStateRetentionTime,
+                rightStateRetentionTIme);
         this.isAntiJoin = isAntiJoin;
     }
 
@@ -73,7 +75,7 @@ public class StreamingSemiAntiJoinOperator extends 
AbstractStreamingJoinOperator
                         LEFT_RECORDS_STATE_NAME,
                         leftInputSideSpec,
                         leftType,
-                        stateRetentionTime);
+                        leftStateRetentionTime);
 
         this.rightRecordStateView =
                 JoinRecordStateViews.create(
@@ -81,7 +83,7 @@ public class StreamingSemiAntiJoinOperator extends 
AbstractStreamingJoinOperator
                         RIGHT_RECORDS_STATE_NAME,
                         rightInputSideSpec,
                         rightType,
-                        stateRetentionTime);
+                        rightStateRetentionTime);
     }
 
     /**
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTest.java
new file mode 100644
index 00000000000..b2e609a3ddb
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTest.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+
+/** Harness tests for {@link StreamingJoinOperator}. */
+public class StreamingJoinOperatorTest extends StreamingJoinOperatorTestBase {
+
+    @Override
+    protected StreamingJoinOperator createJoinOperator(TestInfo testInfo) {
+        Boolean[] joinTypeSpec = 
JOIN_TYPE_EXTRACTOR.apply(testInfo.getDisplayName());
+        Long[] ttl = STATE_RETENTION_TIME_EXTRACTOR.apply(testInfo.getTags());
+        return new StreamingJoinOperator(
+                leftTypeInfo,
+                rightTypeInfo,
+                joinCondition,
+                leftInputSpec,
+                rightInputSpec,
+                joinTypeSpec[0],
+                joinTypeSpec[1],
+                new boolean[] {true},
+                ttl[0],
+                ttl[1]);
+    }
+
+    @Override
+    protected RowType getOutputType() {
+        return RowType.of(
+                Stream.concat(
+                                
leftTypeInfo.toRowType().getChildren().stream(),
+                                
rightTypeInfo.toRowType().getChildren().stream())
+                        .toArray(LogicalType[]::new),
+                Stream.concat(
+                                
leftTypeInfo.toRowType().getFieldNames().stream(),
+                                
rightTypeInfo.toRowType().getFieldNames().stream())
+                        .toArray(String[]::new));
+    }
+
+    /**
+     * The equivalent SQL as follows.
+     *
+     * <p>{@code SELECT a.order_id, a.line_order_id, a.shipping_address, 
b.line_order_id,
+     * b.line_order_ship_mode FROM orders a JOIN line_orders b ON 
a.line_order_id = b.line_order_id}
+     */
+    @Tag("leftStateRetentionTime=4000")
+    @Tag("rightStateRetentionTime=1000")
+    @Test
+    public void testInnerJoinWithDifferentStateRetentionTime() throws 
Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#2",
+                        "AIR"));
+
+        // the right side state of LineOrd#2 has expired
+        testHarness.setStateTtlProcessingTime(3000);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#2", "68 Manor Station Street, 
Honolulu, HI 96815"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"));
+
+        // the left side state of LineOrd#1 has expired
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(deleteRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"));
+
+        // the left side state of LineOrd#2 has expired
+        testHarness.setStateTtlProcessingTime(7000);
+        testHarness.processElement2(insertRecord("LineOrd#2", "RAIL"));
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    /**
+     * The equivalent SQL is same with {@link 
#testInnerJoinWithDifferentStateRetentionTime}. The
+     * only difference is that the state retention is disabled.
+     */
+    @Test
+    public void testInnerJoinWithStateRetentionDisabled() throws Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#2",
+                        "AIR"));
+
+        testHarness.setStateTtlProcessingTime(3000);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#2", "68 Manor Station Street, 
Honolulu, HI 96815"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "AIR"));
+
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"));
+
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#1",
+                        "TRUCK"));
+
+        testHarness.processElement2(deleteRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"));
+
+        testHarness.setStateTtlProcessingTime(7000);
+        testHarness.processElement2(insertRecord("LineOrd#2", "RAIL"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "RAIL"));
+    }
+
+    /**
+     * The equivalent SQL is same with 
testInnerJoinWithDifferentStateRetentionTime. The only
+     * difference is that the left and right state retention time are same.
+     */
+    @Tag("leftStateRetentionTime=4000")
+    @Tag("rightStateRetentionTime=4000")
+    @Test
+    public void testInnerJoinWithSameStateRetentionTime() throws Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#2",
+                        "AIR"));
+
+        // extend the expired time to 8000 for LineOrd#2
+        testHarness.setStateTtlProcessingTime(4000);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#2", "68 Manor Station Street, 
Honolulu, HI 96815"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "AIR"));
+
+        // the state of LineOrd#1 has expired
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK"));
+        assertor.shouldEmitNothing(testHarness);
+
+        // the expired time for left and right state of LineOrd#2 is 8000
+        testHarness.setStateTtlProcessingTime(7999);
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "TRUCK"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "TRUCK"));
+
+        testHarness.setStateTtlProcessingTime(8000);
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "RAIL"));
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    /**
+     * The equivalent SQL as follows.
+     *
+     * <p>{@code SELECT a.order_id, a.line_order_id, a.shipping_address, 
b.line_order_id,
+     * b.line_order_ship_mode FROM orders a LEFT JOIN line_orders b ON 
a.line_order_id =
+     * b.line_order_id}
+     */
+    @Tag("leftStateRetentionTime=4000")
+    @Tag("rightStateRetentionTime=1000")
+    @Test
+    public void testLeftOuterJoinWithDifferentStateRetentionTime() throws 
Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null));
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null));
+
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#2",
+                        "AIR"));
+
+        // the right side state of LineOrd#2 has expired
+        testHarness.setStateTtlProcessingTime(3000);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#2", "68 Manor Station Street, 
Honolulu, HI 96815"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        null,
+                        null));
+
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"));
+
+        // the left side state of LineOrd#1 has expired
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(deleteRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        null,
+                        null));
+
+        // the left side state of LineOrd#2 has expired
+        testHarness.setStateTtlProcessingTime(8001);
+        testHarness.processElement2(insertRecord("LineOrd#2", "RAIL"));
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    /**
+     * The equivalent SQL is the same as {@link
+     * #testLeftOuterJoinWithDifferentStateRetentionTime()}. The only 
difference is that the state
+     * retention is disabled.
+     */
+    @Test
+    public void testLeftOuterJoinWithStateRetentionDisabled() throws Exception 
{
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null));
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null));
+
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#2",
+                        "AIR"));
+
+        testHarness.setStateTtlProcessingTime(3000);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#2", "68 Manor Station Street, 
Honolulu, HI 96815"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "AIR"));
+
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"));
+
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#1",
+                        "TRUCK"));
+
+        testHarness.setStateTtlProcessingTime(8001);
+        testHarness.processElement2(deleteRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"));
+    }
+
+    /**
+     * The equivalent SQL as follows.
+     *
+     * <p>{@code SELECT a.order_id, a.line_order_id, a.shipping_address, 
b.line_order_id,
+     * b.line_order_ship_mode FROM orders a RIGHT JOIN line_orders b ON 
a.line_order_id =
+     * b.line_order_id}
+     */
+    @Tag("leftStateRetentionTime=4000")
+    @Tag("rightStateRetentionTime=1000")
+    @Test
+    public void testRightOuterJoinWithDifferentStateRetentionTime() throws 
Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        // left side state is expired
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness, rowOfKind(RowKind.INSERT, null, null, null, 
"LineOrd#2", "AIR"));
+
+        testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK"));
+        assertor.shouldEmit(
+                testHarness, rowOfKind(RowKind.INSERT, null, null, null, 
"LineOrd#1", "TRUCK"));
+
+        // the right side state has expired
+        testHarness.setStateTtlProcessingTime(5001);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#2", "68 Manor Station Street, 
Honolulu, HI 96815"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "SHIP"));
+
+        testHarness.processElement2(updateAfterRecord("LineOrd#1", "RAIL"));
+        assertor.shouldEmit(
+                testHarness, rowOfKind(RowKind.INSERT, null, null, null, 
"LineOrd#1", "RAIL"));
+
+        testHarness.setStateTtlProcessingTime(6000);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#1", "3 North Winchester Drive, 
Haines City, FL 33844"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.DELETE, null, null, null, "LineOrd#1", 
"RAIL"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 North Winchester Drive, Haines City, FL 33844",
+                        "LineOrd#1",
+                        "RAIL"));
+
+        // right side state has expired
+        testHarness.setStateTtlProcessingTime(7000);
+        testHarness.processElement1(
+                deleteRecord(
+                        "Ord#1", "LineOrd#1", "3 North Winchester Drive, 
Haines City, FL 33844"));
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    /**
+     * The equivalent SQL is the same as {@link
+     * #testRightOuterJoinWithDifferentStateRetentionTime()}. The only 
difference is that the state
+     * retention is disabled.
+     */
+    @Test
+    public void testRightOuterJoinWithDStateRetentionDisabled() throws 
Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#2",
+                        "AIR"));
+
+        testHarness.setStateTtlProcessingTime(10000);
+        testHarness.processElement2(insertRecord("LineOrd#1", "TRUCK"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "LineOrd#1",
+                        "TRUCK"));
+
+        testHarness.setStateTtlProcessingTime(20000);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#2", "68 Manor Station Street, 
Honolulu, HI 96815"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "68 Manor Station Street, Honolulu, HI 96815",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    private static final Function<String, Boolean[]> JOIN_TYPE_EXTRACTOR =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("InnerJoin")) {
+                    return new Boolean[] {false, false};
+                } else if (testDisplayName.contains("LeftOuterJoin")) {
+                    return new Boolean[] {true, false};
+                } else {
+                    return new Boolean[] {false, true};
+                }
+            };
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
new file mode 100644
index 00000000000..48cc9884e66
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+
+import java.util.Set;
+import java.util.function.Function;
+
+/** Base test class for {@link AbstractStreamingJoinOperator}. */
+public abstract class StreamingJoinOperatorTestBase {
+
+    protected final InternalTypeInfo<RowData> leftTypeInfo =
+            InternalTypeInfo.of(
+                    RowType.of(
+                            new LogicalType[] {
+                                new CharType(false, 20),
+                                new CharType(false, 20),
+                                VarCharType.STRING_TYPE
+                            },
+                            new String[] {"order_id", "line_order_id", 
"shipping_address"}));
+
+    protected final InternalTypeInfo<RowData> rightTypeInfo =
+            InternalTypeInfo.of(
+                    RowType.of(
+                            new LogicalType[] {new CharType(false, 20), new 
CharType(true, 10)},
+                            new String[] {"line_order_id0", 
"line_order_ship_mode"}));
+
+    protected final RowDataKeySelector leftKeySelector =
+            HandwrittenSelectorUtil.getRowDataSelector(
+                    new int[] {1},
+                    leftTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
+    protected final RowDataKeySelector rightKeySelector =
+            HandwrittenSelectorUtil.getRowDataSelector(
+                    new int[] {0},
+                    rightTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
+
+    protected final JoinInputSideSpec leftInputSpec =
+            JoinInputSideSpec.withUniqueKeyContainedByJoinKey(leftTypeInfo, 
leftKeySelector);
+    protected final JoinInputSideSpec rightInputSpec =
+            JoinInputSideSpec.withUniqueKeyContainedByJoinKey(rightTypeInfo, 
rightKeySelector);
+
+    protected final InternalTypeInfo<RowData> joinKeyTypeInfo =
+            InternalTypeInfo.of(new CharType(false, 20));
+
+    protected final String funcCode =
+            "public class ConditionFunction extends 
org.apache.flink.api.common.functions.AbstractRichFunction "
+                    + "implements 
org.apache.flink.table.runtime.generated.JoinCondition {\n"
+                    + "\n"
+                    + "    public ConditionFunction(Object[] reference) {\n"
+                    + "    }\n"
+                    + "\n"
+                    + "    @Override\n"
+                    + "    public boolean 
apply(org.apache.flink.table.data.RowData in1, 
org.apache.flink.table.data.RowData in2) {\n"
+                    + "        return true;\n"
+                    + "    }\n"
+                    + "\n"
+                    + "    @Override\n"
+                    + "    public void close() throws Exception {\n"
+                    + "        super.close();\n"
+                    + "    }"
+                    + "}\n";
+    protected final GeneratedJoinCondition joinCondition =
+            new GeneratedJoinCondition("ConditionFunction", funcCode, new 
Object[0]);
+
+    protected final RowDataHarnessAssertor assertor =
+            new 
RowDataHarnessAssertor(getOutputType().getChildren().toArray(new 
LogicalType[0]));
+
+    protected KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, 
RowData, RowData>
+            testHarness;
+
+    @BeforeEach
+    public void beforeEach(TestInfo testInfo) throws Exception {
+        testHarness =
+                new KeyedTwoInputStreamOperatorTestHarness<>(
+                        createJoinOperator(testInfo),
+                        leftKeySelector,
+                        rightKeySelector,
+                        joinKeyTypeInfo);
+        testHarness.open();
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        testHarness.close();
+    }
+
+    protected static final Function<Set<String>, Long[]> 
STATE_RETENTION_TIME_EXTRACTOR =
+            (tags) -> {
+                if (tags.isEmpty()) {
+                    return new Long[] {0L, 0L};
+                }
+                Long[] ttl = new Long[2];
+                for (String tag : tags) {
+                    String[] splits = tag.split("=");
+                    long value = Long.parseLong(splits[1].trim());
+                    if (splits[0].trim().startsWith("left")) {
+                        ttl[0] = value;
+                    } else {
+                        ttl[1] = value;
+                    }
+                }
+                return ttl;
+            };
+
+    /** Create streaming join operator according to {@link TestInfo}. */
+    protected abstract AbstractStreamingJoinOperator 
createJoinOperator(TestInfo testInfo);
+
+    /** Get the output row type of join operator. */
+    protected abstract RowType getOutputType();
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperatorTest.java
new file mode 100644
index 00000000000..5aa112c9fe1
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperatorTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.util.function.Predicate;
+
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+
+/** Test for {@link StreamingSemiAntiJoinOperator}. */
+public class StreamingSemiAntiJoinOperatorTest extends 
StreamingJoinOperatorTestBase {
+    @Override
+    protected StreamingSemiAntiJoinOperator createJoinOperator(TestInfo 
testInfo) {
+        Long[] ttl = STATE_RETENTION_TIME_EXTRACTOR.apply(testInfo.getTags());
+        return new StreamingSemiAntiJoinOperator(
+                ANTI_JOIN_CHECKER.test(testInfo.getDisplayName()),
+                leftTypeInfo,
+                rightTypeInfo,
+                joinCondition,
+                leftInputSpec,
+                rightInputSpec,
+                new boolean[] {true},
+                ttl[0],
+                ttl[1]);
+    }
+
+    @Override
+    protected RowType getOutputType() {
+        return leftTypeInfo.toRowType();
+    }
+
+    /**
+     * The equivalent SQL as follows.
+     *
+     * <p>{@code SELECT a.order_id, a.line_order_id, a.shipping_address FROM 
orders a WHERE
+     * a.line_order_id IN (SELECT b.line_order_id FROM line_orders b)}
+     */
+    @Tag("leftStateRetentionTime=4000")
+    @Tag("rightStateRetentionTime=1000")
+    @Test
+    public void testLeftSemiJoinWithDifferentStateRetentionTime() throws 
Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.setStateTtlProcessingTime(3001);
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "TRUCK"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(deleteRecord("LineOrd#2", "TRUCK"));
+        assertor.shouldEmitNothing(testHarness);
+
+        // numOfAssociations is reduced to 1, retract the record
+        testHarness.processElement2(deleteRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        // the left side state of LineOrd#1 has expired
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "SHIP"));
+        assertor.shouldEmitNothing(testHarness);
+
+        // the right side state of LineOrd#1 has expired
+        testHarness.setStateTtlProcessingTime(5001);
+        testHarness.processElement1(
+                updateAfterRecord("Ord#1", "LineOrd#1", "7238 Marsh St., 
Birmingham, AL 35209"));
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    /**
+     * The equivalent SQL is the same as {@link 
#testLeftSemiJoinWithDifferentStateRetentionTime()}.
+     * The only difference is that the state retention is disabled.
+     */
+    @Test
+    public void testLeftSemiJoinWithStateRetentionDisabled() throws Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.setStateTtlProcessingTime(3001);
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.processElement2(updateAfterRecord("LineOrd#2", "TRUCK"));
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(deleteRecord("LineOrd#2", "TRUCK"));
+        assertor.shouldEmitNothing(testHarness);
+
+        // numOfAssociations is reduced to 1, retract the record
+        testHarness.processElement2(deleteRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "SHIP"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        // the right side state of LineOrd#1 has expired
+        testHarness.setStateTtlProcessingTime(5001);
+        testHarness.processElement1(
+                updateAfterRecord("Ord#1", "LineOrd#1", "7238 Marsh St., 
Birmingham, AL 35209"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "7238 Marsh St., Birmingham, AL 35209"));
+    }
+
+    /**
+     * The equivalent SQL as follows.
+     *
+     * <p>{@code SELECT a.order_id, a.line_order_id, a.shipping_address FROM 
orders a WHERE
+     * a.line_order_id NOT IN (SELECT b.line_order_id FROM line_orders b)}
+     */
+    @Tag("leftStateRetentionTime=4000")
+    @Tag("rightStateRetentionTime=1000")
+    @Test
+    public void testLeftAntiJoinWithDifferentStateRetentionTime() throws 
Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.setStateTtlProcessingTime(3001);
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        // left side state of LineOrd#1 has expired
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "RAIL"));
+        assertor.shouldEmitNothing(testHarness);
+
+        // right side state of LineOrd#1 has expired
+        testHarness.setStateTtlProcessingTime(5001);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#1", "23 W. River Avenue, Port 
Orange, FL 32127"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "23 W. River Avenue, Port Orange, FL 32127"));
+    }
+
+    /**
+     * The equivalent SQL is the same as {@link 
#testLeftAntiJoinWithDifferentStateRetentionTime()}.
+     * The only difference is that the state retention is disabled.
+     */
+    @Test
+    public void testLeftAntiJoinWithStateRetentionTimeDisabled() throws 
Exception {
+        testHarness.setStateTtlProcessingTime(1);
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "3 Bellevue Drive, 
Pottstown, PA 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.setStateTtlProcessingTime(3001);
+        testHarness.processElement2(insertRecord("LineOrd#2", "AIR"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.setStateTtlProcessingTime(4001);
+        testHarness.processElement2(insertRecord("LineOrd#1", "RAIL"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+
+        testHarness.setStateTtlProcessingTime(5001);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#1", "LineOrd#1", "23 W. River Avenue, Port 
Orange, FL 32127"));
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    private static final Predicate<String> ANTI_JOIN_CHECKER =
+            (testDisplayName) -> testDisplayName.contains("Anti");
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index 671b2ec07d6..372318c0476 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -19,15 +19,13 @@
 package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
@@ -38,14 +36,10 @@ import org.apache.flink.types.RowKind;
 
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
-import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link SinkUpsertMaterializer}. */
 public class SinkUpsertMaterializerTest {
@@ -56,6 +50,7 @@ public class SinkUpsertMaterializerTest {
     private final RowDataSerializer serializer = new RowDataSerializer(types);
     private final RowDataKeySelector keySelector =
             HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types);
+    private final RowDataHarnessAssertor assertor = new 
RowDataHarnessAssertor(types);
 
     private final GeneratedRecordEqualiser equaliser =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
@@ -89,30 +84,30 @@ public class SinkUpsertMaterializerTest {
         testHarness.setStateTtlProcessingTime(1);
 
         testHarness.processElement(insertRecord(1L, 1, "a1"));
-        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, 
"a1"));
 
         testHarness.processElement(insertRecord(2L, 1, "a2"));
-        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 
1, "a2"));
 
         testHarness.processElement(insertRecord(3L, 1, "a3"));
-        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 
1, "a3"));
 
         testHarness.processElement(deleteRecord(2L, 1, "a2"));
-        shouldEmitNothing(testHarness);
+        assertor.shouldEmitNothing(testHarness);
 
         testHarness.processElement(deleteRecord(3L, 1, "a3"));
-        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 
1, "a1"));
 
         testHarness.processElement(deleteRecord(1L, 1, "a1"));
-        shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, 
"a1"));
 
         testHarness.processElement(insertRecord(4L, 1, "a4"));
-        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, 
"a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
 
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        shouldEmitNothing(testHarness);
+        assertor.shouldEmitNothing(testHarness);
 
         testHarness.close();
     }
@@ -131,54 +126,31 @@ public class SinkUpsertMaterializerTest {
         testHarness.setStateTtlProcessingTime(1);
 
         testHarness.processElement(insertRecord(1L, 1, "a1"));
-        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, 
"a1"));
 
         testHarness.processElement(updateAfterRecord(1L, 1, "a11"));
-        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 
1, "a11"));
 
         testHarness.processElement(insertRecord(3L, 1, "a3"));
-        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 
1, "a3"));
 
         testHarness.processElement(deleteRecord(1L, 1, "a111"));
-        shouldEmitNothing(testHarness);
+        assertor.shouldEmitNothing(testHarness);
 
         testHarness.processElement(deleteRecord(3L, 1, "a33"));
-        shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, 
"a33"));
 
         testHarness.processElement(insertRecord(4L, 1, "a4"));
-        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, 
"a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
 
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        shouldEmitNothing(testHarness);
+        assertor.shouldEmitNothing(testHarness);
 
         testHarness.close();
     }
 
-    private void shouldEmitNothing(OneInputStreamOperatorTestHarness<RowData, 
RowData> harness) {
-        assertThat(getEmittedRows(harness)).isEmpty();
-    }
-
-    private void shouldEmit(
-            OneInputStreamOperatorTestHarness<RowData, RowData> harness, 
RowData expected) {
-        assertThat(getEmittedRows(harness)).containsExactly(expected);
-    }
-
-    private static List<RowData> getEmittedRows(
-            OneInputStreamOperatorTestHarness<RowData, RowData> harness) {
-        final List<RowData> rows = new ArrayList<>();
-        Object o;
-        while ((o = harness.getOutput().poll()) != null) {
-            RowData value = (RowData) ((StreamRecord<?>) o).getValue();
-            GenericRowData newRow =
-                    GenericRowData.of(value.getLong(0), value.getInt(1), 
value.getString(2));
-            newRow.setRowKind(value.getRowKind());
-            rows.add(newRow);
-        }
-        return rows;
-    }
-
     private static class TestRecordEqualiser implements RecordEqualiser {
         @Override
         public boolean equals(RowData row1, RowData row2) {
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
index c60967349a9..05bfed74d6b 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.util;
 
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -49,6 +50,17 @@ public class RowDataHarnessAssertor {
         this(types, new StringComparator());
     }
 
+    /** Assert the test harness should not emit any records. */
+    public void shouldEmitNothing(AbstractStreamOperatorTestHarness<RowData> 
harness) {
+        assertThat(getEmittedRows(harness)).isEmpty();
+    }
+
+    /** Assert the test harness should emit records exactly same as the 
expected records. */
+    public void shouldEmit(
+            AbstractStreamOperatorTestHarness<RowData> harness, RowData... 
expected) {
+        assertThat(getEmittedRows(harness)).containsExactly(expected);
+    }
+
     /**
      * Compare the two queues containing operator/task output by converting 
them to an array first.
      * Asserts two converted array should be same.
@@ -67,6 +79,26 @@ public class RowDataHarnessAssertor {
         assertOutputEquals(message, expected, actual, true);
     }
 
+    private List<RowData> 
getEmittedRows(AbstractStreamOperatorTestHarness<RowData> harness) {
+        final RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[types.length];
+        for (int i = 0; i < types.length; i++) {
+            fieldGetters[i] = RowData.createFieldGetter(types[i], i);
+        }
+        final List<RowData> rows = new ArrayList<>();
+        Object o;
+        while ((o = harness.getOutput().poll()) != null) {
+            RowData value = (RowData) ((StreamRecord<?>) o).getValue();
+            Object[] row = new Object[types.length];
+            for (int i = 0; i < types.length; i++) {
+                row[i] = fieldGetters[i].getFieldOrNull(value);
+            }
+            GenericRowData newRow = GenericRowData.of(row);
+            newRow.setRowKind(value.getRowKind());
+            rows.add(newRow);
+        }
+        return rows;
+    }
+
     private void assertOutputEquals(
             String message,
             Collection<Object> expected,

Reply via email to