This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5543195eb06 [FLINK-38511][table] Join that consumes cdc source without
delete may be converted to delta join (#27111)
5543195eb06 is described below
commit 5543195eb06599727d55a85efe89dda45e421597
Author: Xuyang <[email protected]>
AuthorDate: Mon Oct 27 09:38:52 2025 +0800
[FLINK-38511][table] Join that consumes cdc source without delete may be
converted to delta join (#27111)
---
.../nodes/exec/stream/StreamExecDeltaJoin.java | 33 +-
.../physical/stream/StreamPhysicalDeltaJoin.java | 64 +-
.../physical/stream/DeltaJoinRewriteRule.java | 7 +-
.../table/planner/plan/utils/DeltaJoinUtil.java | 71 +-
.../planner/factories/TestValuesTableFactory.java | 13 +-
.../planner/plan/stream/sql/DeltaJoinTest.xml | 254 ++++++-
.../planner/plan/stream/sql/DeltaJoinTest.scala | 162 ++++-
.../runtime/stream/sql/DeltaJoinITCase.scala | 602 ++++++++++------
.../join/deltajoin/StreamingDeltaJoinOperator.java | 5 +-
.../deltajoin/StreamingDeltaJoinOperatorTest.java | 782 +++++++++++++++------
10 files changed, 1461 insertions(+), 532 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java
index 962cc550541..07ae35a6e82 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java
@@ -80,6 +80,8 @@ import org.apache.calcite.tools.RelBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -117,6 +119,9 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
"lookupRightTableJoinSpec";
private static final String FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC =
"lookupLeftTableJoinSpec";
+ private static final String FIELD_NAME_LEFT_UPSERT_KEY = "leftUpsertKey";
+ private static final String FIELD_NAME_RIGHT_UPSERT_KEY = "rightUpsertKey";
+
private static final String FIELD_NAME_JOIN_TYPE = "joinType";
public static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
@@ -135,6 +140,11 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS)
private final int[] leftJoinKeys;
+ @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final int[] leftUpsertKeys;
+
// left (streaming) side join right (lookup) side
@JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
private final DeltaJoinSpec lookupRightTableJoinSpec;
@@ -144,6 +154,11 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS)
private final int[] rightJoinKeys;
+ @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final int[] rightUpsertKeys;
+
// right (streaming) side join left (lookup) side
@JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
private final DeltaJoinSpec lookupLeftTableJoinSpec;
@@ -153,9 +168,11 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
FlinkJoinType flinkJoinType,
// delta join args related with the left side
int[] leftJoinKeys,
+ @Nullable int[] leftUpsertKeys,
DeltaJoinSpec lookupRightTableJoinSpec,
// delta join args related with the right side
int[] rightJoinKeys,
+ @Nullable int[] rightUpsertKeys,
DeltaJoinSpec lookupLeftTableJoinSpec,
InputProperty leftInputProperty,
InputProperty rightInputProperty,
@@ -168,8 +185,10 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
ExecNodeContext.newPersistedConfig(StreamExecDeltaJoin.class,
tableConfig),
flinkJoinType,
leftJoinKeys,
+ leftUpsertKeys,
lookupRightTableJoinSpec,
rightJoinKeys,
+ rightUpsertKeys,
lookupLeftTableJoinSpec,
Lists.newArrayList(leftInputProperty, rightInputProperty),
outputType,
@@ -184,9 +203,11 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
@JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType flinkJoinType,
@JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS) int[] leftJoinKeys,
+ @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY) @Nullable int[]
leftUpsertKeys,
@JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
DeltaJoinSpec lookupRightTableJoinSpec,
@JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS) int[] rightJoinKeys,
+ @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY) @Nullable int[]
rightUpsertKeys,
@JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
DeltaJoinSpec lookupLeftTableJoinSpec,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
@@ -197,8 +218,10 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
this.flinkJoinType = flinkJoinType;
this.leftJoinKeys = leftJoinKeys;
+ this.leftUpsertKeys = leftUpsertKeys;
this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
this.rightJoinKeys = rightJoinKeys;
+ this.rightUpsertKeys = rightUpsertKeys;
this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
this.asyncLookupOptions = asyncLookupOptions;
}
@@ -237,17 +260,15 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
RowDataKeySelector leftJoinKeySelector =
KeySelectorUtil.getRowDataSelector(
classLoader, leftJoinKeys,
InternalTypeInfo.of(leftStreamType));
- // currently, delta join only supports consuming INSERT-ONLY stream
RowDataKeySelector leftUpsertKeySelector =
- getUpsertKeySelector(new int[0], leftStreamType, classLoader);
+ getUpsertKeySelector(leftUpsertKeys, leftStreamType,
classLoader);
// right side selector
RowDataKeySelector rightJoinKeySelector =
KeySelectorUtil.getRowDataSelector(
classLoader, rightJoinKeys,
InternalTypeInfo.of(rightStreamType));
- // currently, delta join only supports consuming INSERT-ONLY stream
RowDataKeySelector rightUpsertKeySelector =
- getUpsertKeySelector(new int[0], rightStreamType, classLoader);
+ getUpsertKeySelector(rightUpsertKeys, rightStreamType,
classLoader);
StreamOperatorFactory<RowData> operatorFactory =
createAsyncLookupDeltaJoin(
@@ -485,9 +506,9 @@ public class StreamExecDeltaJoin extends
ExecNodeBase<RowData>
}
private RowDataKeySelector getUpsertKeySelector(
- int[] upsertKey, RowType rowType, ClassLoader classLoader) {
+ @Nullable int[] upsertKey, RowType rowType, ClassLoader
classLoader) {
final int[] rightUpsertKeys;
- if (upsertKey.length > 0) {
+ if (upsertKey != null && upsertKey.length > 0) {
rightUpsertKeys = upsertKey;
} else {
rightUpsertKeys = IntStream.range(0,
rowType.getFields().size()).toArray();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java
index 7c0f11cdc61..217ad55a570 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java
@@ -22,39 +22,35 @@ import
org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeltaJoin;
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
+import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
/** Stream physical RelNode for delta join. */
-public class StreamPhysicalDeltaJoin extends BiRel implements
StreamPhysicalRel, Hintable {
-
- private final FlinkJoinType joinType;
-
- private final RexNode originalJoinCondition;
-
- private final com.google.common.collect.ImmutableList<RelHint> hints;
+public class StreamPhysicalDeltaJoin extends Join implements StreamPhysicalRel
{
private final RelDataType rowType;
@@ -70,15 +66,20 @@ public class StreamPhysicalDeltaJoin extends BiRel
implements StreamPhysicalRel,
List<RelHint> hints,
RelNode left,
RelNode right,
- FlinkJoinType joinType,
+ JoinRelType joinType,
RexNode originalJoinCondition,
DeltaJoinSpec lookupRightTableJoinSpec,
DeltaJoinSpec lookupLeftTableJoinSpec,
RelDataType rowType) {
- super(cluster, traitSet, left, right);
- this.hints = com.google.common.collect.ImmutableList.copyOf(hints);
- this.joinType = joinType;
- this.originalJoinCondition = originalJoinCondition;
+ super(
+ cluster,
+ traitSet,
+ hints,
+ left,
+ right,
+ originalJoinCondition,
+ Collections.emptySet(),
+ joinType);
this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
this.rowType = rowType;
@@ -97,15 +98,20 @@ public class StreamPhysicalDeltaJoin extends BiRel
implements StreamPhysicalRel,
// scenarios to enhance throughput as much as possible.
true,
AsyncDataStream.OutputMode.ORDERED);
+ FlinkRelMetadataQuery fmq =
+
FlinkRelMetadataQuery.reuseOrCreate(this.getCluster().getMetadataQuery());
- JoinInfo joinInfo = JoinInfo.of(left, right, originalJoinCondition);
+ int[] leftUpsertKey =
UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(left)).orElse(null);
+ int[] rightUpsertKey =
UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(right)).orElse(null);
return new StreamExecDeltaJoin(
config,
- joinType,
+ JoinTypeUtil.getFlinkJoinType(joinType),
joinInfo.leftKeys.toIntArray(),
+ leftUpsertKey,
lookupRightTableJoinSpec,
joinInfo.rightKeys.toIntArray(),
+ rightUpsertKey,
lookupLeftTableJoinSpec,
InputProperty.DEFAULT,
InputProperty.DEFAULT,
@@ -120,16 +126,21 @@ public class StreamPhysicalDeltaJoin extends BiRel
implements StreamPhysicalRel,
}
@Override
- public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- assert inputs.size() == 2;
+ public Join copy(
+ RelTraitSet traitSet,
+ RexNode conditionExpr,
+ RelNode left,
+ RelNode right,
+ JoinRelType joinType,
+ boolean semiJoinDone) {
return new StreamPhysicalDeltaJoin(
getCluster(),
traitSet,
hints,
- inputs.get(0),
- inputs.get(1),
+ left,
+ right,
joinType,
- originalJoinCondition,
+ conditionExpr,
lookupRightTableJoinSpec,
lookupLeftTableJoinSpec,
rowType);
@@ -147,12 +158,13 @@ public class StreamPhysicalDeltaJoin extends BiRel
implements StreamPhysicalRel,
@Override
public RelWriter explainTerms(RelWriter pw) {
- return super.explainTerms(pw)
- .item("joinType", joinType.toString())
+ return pw.input("left", left)
+ .input("right", right)
+ .item("joinType",
JoinTypeUtil.getFlinkJoinType(joinType).toString())
.item(
"where",
getExpressionString(
- originalJoinCondition,
+ condition,
JavaScalaConversionUtil.toScala(this.getRowType().getFieldNames())
.toList(),
JavaScalaConversionUtil.toScala(Optional.empty()),
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java
index e4c90fea9a2..14529f69ab8 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java
@@ -25,7 +25,6 @@ import
org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil;
-import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
@@ -44,7 +43,9 @@ import static
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConf
* <li>The join is INNER join.
* <li>There is at least one join key pair in the join.
* <li>The downstream nodes of this join can accept duplicate changes.
- * <li>All join inputs are insert only streams.
+ * <li>All join inputs only produce changelog "I" or "I, UA" (no UPDATE_BEFORE or DELETE).
+ * <li>If this join outputs update records, all columns referenced by the non-equi
+ * condition must belong to a single upsert key of this join.
* <li>All upstream nodes of this join are in {@code
* DeltaJoinUtil#ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES}
* <li>The join keys include at least one complete index in each source
table of the join input.
@@ -92,7 +93,7 @@ public class DeltaJoinRewriteRule extends
RelRule<DeltaJoinRewriteRule.Config> {
join.getHints(),
join.getLeft(),
join.getRight(),
- JoinTypeUtil.getFlinkJoinType(join.getJoinType()),
+ join.getJoinType(),
join.getCondition(),
lookupRightTableJoinSpec,
lookupLeftTableJoinSpec,
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
index e0920a4d1c1..a2e356e714f 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
@@ -25,9 +25,12 @@ import
org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import
org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
@@ -53,10 +56,12 @@ import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.mapping.IntPair;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -75,7 +80,10 @@ public class DeltaJoinUtil {
* <p>More physical nodes can be added to support more patterns for delta
join.
*/
private static final Set<Class<?>> ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES
=
- Sets.newHashSet(StreamPhysicalTableSourceScan.class,
StreamPhysicalExchange.class);
+ Sets.newHashSet(
+ StreamPhysicalTableSourceScan.class,
+ StreamPhysicalExchange.class,
+ StreamPhysicalDropUpdateBefore.class);
private DeltaJoinUtil() {}
@@ -95,8 +103,8 @@ public class DeltaJoinUtil {
return false;
}
- // currently, only join with append-only inputs is supported
- if (!areAllInputsInsertOnly(join)) {
+ // currently, only join that consumes +I and +U is supported
+ if (!areAllInputsInsertOrUpdateAfter(join)) {
return false;
}
@@ -240,7 +248,29 @@ public class DeltaJoinUtil {
private static boolean areJoinConditionsSupported(StreamPhysicalJoin join)
{
JoinInfo joinInfo = join.analyzeCondition();
// there must be one pair of join key
- return !joinInfo.pairs().isEmpty();
+ if (joinInfo.pairs().isEmpty()) {
+ return false;
+ }
+
+ // if this join outputs cdc records and has a non-equi condition, all columns
+ // referenced by the non-equi condition must belong to a single upsert key of the join
+ ChangelogMode changelogMode = getChangelogMode(join);
+ if (changelogMode.containsOnly(RowKind.INSERT)) {
+ return true;
+ }
+ JoinSpec joinSpec = join.joinSpec();
+ Optional<RexNode> nonEquiCond = joinSpec.getNonEquiCondition();
+ if (nonEquiCond.isEmpty()) {
+ return true;
+ }
+ ImmutableBitSet fieldRefIndices =
+ ImmutableBitSet.of(
+ RexNodeExtractor.extractRefInputFields(
+ Collections.singletonList(nonEquiCond.get())));
+ FlinkRelMetadataQuery fmq =
+
FlinkRelMetadataQuery.reuseOrCreate(join.getCluster().getMetadataQuery());
+ Set<ImmutableBitSet> upsertKeys = fmq.getUpsertKeys(join);
+ return upsertKeys.stream().anyMatch(uk ->
uk.contains(fieldRefIndices));
}
private static boolean areAllJoinTableScansSupported(StreamPhysicalJoin
join) {
@@ -301,8 +331,9 @@ public class DeltaJoinUtil {
node = unwrapNode(node, true);
// support to get table across more nodes if we support more nodes in
// `ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES`
- if (node instanceof StreamPhysicalExchange) {
- return getTableScan(((StreamPhysicalExchange) node).getInput());
+ if (node instanceof StreamPhysicalExchange
+ || node instanceof StreamPhysicalDropUpdateBefore) {
+ return getTableScan(node.getInput(0));
}
Preconditions.checkState(node instanceof TableScan);
@@ -340,25 +371,29 @@ public class DeltaJoinUtil {
return DuplicateChanges.ALLOW.equals(duplicateChanges);
}
- private static boolean areAllInputsInsertOnly(StreamPhysicalJoin join) {
+ private static boolean areAllInputsInsertOrUpdateAfter(StreamPhysicalJoin
join) {
for (RelNode input : join.getInputs()) {
- if (!isInsertOnly(unwrapNode(input, false))) {
+ if (!onlyProduceInsertOrUpdateAfter(unwrapNode(input, false))) {
return false;
}
}
return true;
}
- private static boolean isInsertOnly(StreamPhysicalRel node) {
- ChangelogMode changelogMode =
-
JavaScalaConversionUtil.toJava(ChangelogPlanUtils.getChangelogMode(node))
- .orElseThrow(
- () ->
- new IllegalStateException(
- String.format(
- "Unable to derive
changelog mode from node %s. This is a bug.",
- node)));
- return changelogMode.containsOnly(RowKind.INSERT);
+ private static boolean onlyProduceInsertOrUpdateAfter(StreamPhysicalRel
node) {
+ ChangelogMode changelogMode = getChangelogMode(node);
+ Set<RowKind> allKinds = changelogMode.getContainedKinds();
+ return !allKinds.contains(RowKind.UPDATE_BEFORE) &&
!allKinds.contains(RowKind.DELETE);
+ }
+
+ private static ChangelogMode getChangelogMode(StreamPhysicalRel node) {
+ return
JavaScalaConversionUtil.toJava(ChangelogPlanUtils.getChangelogMode(node))
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ String.format(
+ "Unable to derive changelog
mode from node %s. This is a bug.",
+ node)));
}
private static StreamPhysicalRel unwrapNode(RelNode node, boolean
transposeToChildBlock) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 9f05cafb49f..4e12f5f72d8 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -1977,8 +1977,9 @@ public final class TestValuesTableFactory
Row pk = extractPk(row);
RowKind originalRowKind = row.getKind();
if (originalRowKind == RowKind.INSERT || originalRowKind ==
RowKind.UPDATE_AFTER) {
- row.setKind(RowKind.INSERT);
- pkMap.put(pk, row);
+ Row copiedRow = copyRow(row);
+ copiedRow.setKind(RowKind.INSERT);
+ pkMap.put(pk, copiedRow);
} else {
pkMap.remove(pk);
}
@@ -1986,6 +1987,14 @@ public final class TestValuesTableFactory
return new ArrayList<>(pkMap.values());
}
+ private Row copyRow(Row oldRow) {
+ Row newRow = new Row(oldRow.getKind(), oldRow.getArity());
+ for (int i = 0; i < newRow.getArity(); i++) {
+ newRow.setField(i, oldRow.getField(i));
+ }
+ return newRow;
+ }
+
private Row extractPk(Row row) {
Object[] pk = new Object[primaryKeyIndices.length];
for (int i = 0; i < primaryKeyIndices.length; i++) {
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index c37d0b83d1c..55249bedc21 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -37,30 +37,6 @@ Sink(table=[default_catalog.default_database.append_snk],
fields=[a0, a1, a2, a3
: +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testCdcSource">
- <Resource name="sql">
- <![CDATA[insert into snk select * from src1 join cdc_src on src1.a1 =
cdc_src.b1 and src1.a2 = cdc_src.b2]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
-+- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
- +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
- :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
- +- LogicalTableScan(table=[[default_catalog, default_database, cdc_src]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0,
b2, b1])
-+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[a1, a2]])
- : +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
- +- Exchange(distribution=[hash[b1, b2]])
- +- TableSourceScan(table=[[default_catalog, default_database, cdc_src]],
fields=[b0, b2, b1])
]]>
</Resource>
</TestCase>
@@ -298,6 +274,84 @@ Sink(table=[default_catalog.default_database.snk],
fields=[a0, a1, a2, a3, b0, b
: +- TableSourceScan(table=[[default_catalog, default_database, src1,
filter=[]]], fields=[a0, a1, a2, a3])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testFilterOnNonUpsertKeysAfterJoinWithCdcSourceWithoutDelete">
+ <Resource name="sql">
+ <![CDATA[insert into snk_for_cdc_src select * from no_delete_src1 join
no_delete_src2 on a1 = b1 and a2 = b2 where a3 > b0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src],
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalFilter(condition=[>($3, $4)])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1,
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), >(a3, b0))],
select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA])
+ :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
+ : +- DropUpdateBefore(changelogMode=[I,UA])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+ +- DropUpdateBefore(changelogMode=[I,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterOnUpsertKeysAfterJoinWithCdcSourceWithoutDelete">
+ <Resource name="sql">
+ <![CDATA[insert into snk_for_cdc_src select * from no_delete_src1 join
no_delete_src2 on a1 = b1 and a2 = b2 where a0 > b1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src],
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalFilter(condition=[>($0, $6)])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1,
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), >(a0,
b1))], select=[a0, a1, a2, a3, b0, b2, b1], changelogMode=[I,UA])
+ :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
+ : +- DropUpdateBefore(changelogMode=[I,UA])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+ +- DropUpdateBefore(changelogMode=[I,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinAppendOnlySourceAndSourceWithoutDelete">
+ <Resource name="sql">
+ <![CDATA[insert into tmp_snk select * from src1 join no_delete_src2 on
a1 = b1 and a2 = b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1,
a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1, a2, a3,
b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA])
+ :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I])
+ : +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3], changelogMode=[I])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UB,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
]]>
</Resource>
</TestCase>
@@ -600,6 +654,86 @@ Sink(table=[default_catalog.default_database.snk],
fields=[a0, a1, a2, a3, b0, b
Sink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2, a3,
b0, b2, b1])
+- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPKContainJoinKeyAndOnlyOneSourceNoDelete">
+ <Resource name="sql">
+ <![CDATA[insert into snk_for_cdc_src select * from all_changelog_src
join no_delete_src2 on a1 = b1 and a2 = b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src],
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
all_changelog_src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1,
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
+ :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA,D])
+ : +- DropUpdateBefore(changelogMode=[I,UA,D])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
all_changelog_src]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+ +- DropUpdateBefore(changelogMode=[I,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPKContainsJoinKeyAndSourceNoUBAndD">
+ <Resource name="sql">
+ <![CDATA[insert into snk_for_cdc_src select * from
no_delete_and_update_before_src1 join no_delete_and_update_before_src2 on a1 =
b1 and a2 = b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src],
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_and_update_before_src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_and_update_before_src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1,
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
+ :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA,D])
+ : +- ChangelogNormalize(key=[a0, a1, a2], changelogMode=[I,UA,D])
+ : +- Exchange(distribution=[hash[a0, a1, a2]], changelogMode=[I,UA])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_and_update_before_src1]], fields=[a0, a1, a2, a3],
changelogMode=[I,UA])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA,D])
+ +- ChangelogNormalize(key=[b1, b2], changelogMode=[I,UA,D])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_and_update_before_src2]], fields=[b0, b2, b1], changelogMode=[I,UA])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPKContainsJoinKeyAndTwoSourcesNoDelete">
+ <Resource name="sql">
+ <![CDATA[insert into snk_for_cdc_src select * from no_delete_src1 join
no_delete_src2 on a1 = b1 and a2 = b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src],
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1,
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))],
select=[a0, a1, a2, a3, b0, b2, b1], changelogMode=[I,UA])
+ :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
+ : +- DropUpdateBefore(changelogMode=[I,UA])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+ +- DropUpdateBefore(changelogMode=[I,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
]]>
</Resource>
</TestCase>
@@ -651,6 +785,54 @@ Sink(table=[default_catalog.default_database.snk],
targetColumns=[[0],[1],[2],[4
: +- TableSourceScan(table=[[default_catalog, default_database, src1,
project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
+- Exchange(distribution=[hash[b1, b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRetractSink">
+ <Resource name="sql">
+ <![CDATA[insert into retract_snk select * from src1 join src2 on src1.a1
= src2.b1 and src1.a2 = src2.b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.retract_snk], fields=[a0,
a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.retract_snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSourceWithAllRowKinds">
+ <Resource name="sql">
+ <![CDATA[insert into snk select * from src1 join cdc_src on src1.a1 =
cdc_src.b1 and src1.a2 = cdc_src.b2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, cdc_src]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0,
b2, b1])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, cdc_src]],
fields=[b0, b2, b1])
]]>
</Resource>
</TestCase>
@@ -708,27 +890,27 @@ Sink(table=[default_catalog.default_database.tmp_snk],
fields=[a0, a1, a2, a3, b
]]>
</Resource>
</TestCase>
- <TestCase name="testRetractSink">
+ <TestCase name="testPKNotContainJoinKeyAndTwoSourcesNoDelete">
<Resource name="sql">
- <![CDATA[insert into retract_snk select * from src1 join src2 on src1.a1
= src2.b1 and src1.a2 = src2.b2]]>
+ <![CDATA[insert into snk_for_cdc_src select * from no_delete_src1 join
no_delete_src2 on a0 = b0 and a1 = b1 and a2 = b2]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalSink(table=[default_catalog.default_database.retract_snk], fields=[a0,
a1, a2, a3, b0, b2, b1])
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src],
fields=[a0, a1, a2, a3, b0, b2, b1])
+- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
- +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
- :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
- +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+ +- LogicalJoin(condition=[AND(=($0, $4), =($1, $6), =($2, $5))],
joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
no_delete_src2]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Sink(table=[default_catalog.default_database.retract_snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
-+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[a1, a2]])
- : +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
- +- Exchange(distribution=[hash[b1, b2]])
- +- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1,
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a0, b0), =(a1, b1), =(a2, b2))],
select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA])
+ :- Exchange(distribution=[hash[a0, a1, a2]], changelogMode=[I,UB,UA])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[b0, b1, b2]], changelogMode=[I,UB,UA])
+ +- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index 9ed9ddaee8d..7cb04376bba 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.JMap
import org.apache.flink.table.planner.utils.{TableTestBase,
TestingTableEnvironment}
import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.assertj.core.util.Maps
import org.junit.jupiter.api.{BeforeEach, Test}
import java.util.{Collections, HashMap => JHashMap}
@@ -76,6 +77,33 @@ class DeltaJoinTest extends TableTestBase {
.build()
)
+ addTable(
+ "no_delete_src1",
+ Schema
+ .newBuilder()
+ .column("a0", DataTypes.INT.notNull)
+ .column("a1", DataTypes.DOUBLE.notNull)
+ .column("a2", DataTypes.STRING.notNull)
+ .column("a3", DataTypes.INT)
+ .primaryKey("a0", "a1", "a2")
+ .index("a1", "a2")
+ .build(),
+ Maps.newHashMap("changelog-mode", "I,UA,UB")
+ )
+
+ addTable(
+ "no_delete_src2",
+ Schema
+ .newBuilder()
+ .column("b0", DataTypes.INT)
+ .column("b2", DataTypes.STRING.notNull)
+ .column("b1", DataTypes.DOUBLE.notNull)
+ .primaryKey("b1", "b2")
+ .index("b2")
+ .build(),
+ Maps.newHashMap("changelog-mode", "I,UA,UB")
+ )
+
addTable(
"snk",
Schema
@@ -90,6 +118,21 @@ class DeltaJoinTest extends TableTestBase {
.primaryKey("l0", "r0")
.build()
)
+
+ addTable(
+ "snk_for_cdc_src",
+ Schema
+ .newBuilder()
+ .column("l0", DataTypes.INT.notNull)
+ .column("l1", DataTypes.DOUBLE.notNull)
+ .column("l2", DataTypes.STRING.notNull)
+ .column("l3", DataTypes.INT)
+ .column("r0", DataTypes.INT)
+ .column("r2", DataTypes.STRING.notNull)
+ .column("r1", DataTypes.DOUBLE.notNull)
+ .primaryKey("l0", "l1", "l2", "r1", "r2")
+ .build()
+ )
}
@Test
@@ -188,6 +231,30 @@ class DeltaJoinTest extends TableTestBase {
"where a3 > b0")
}
+ @Test
+ def testFilterOnNonUpsertKeysAfterJoinWithCdcSourceWithoutDelete(): Unit = {
+ util.verifyRelPlanInsert(
+ "insert into snk_for_cdc_src select * from no_delete_src1 " +
+ "join no_delete_src2 " +
+ "on a1 = b1 " +
+ "and a2 = b2 " +
+ "where a3 > b0",
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testFilterOnUpsertKeysAfterJoinWithCdcSourceWithoutDelete(): Unit = {
+ util.verifyRelPlanInsert(
+ "insert into snk_for_cdc_src select * from no_delete_src1 " +
+ "join no_delete_src2 " +
+ "on a1 = b1 " +
+ "and a2 = b2 " +
+ "where a0 > b1",
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
@Test
def testMultiRootsWithReusingJoinView(): Unit = {
util.tableConfig.set(
@@ -372,7 +439,7 @@ class DeltaJoinTest extends TableTestBase {
}
@Test
- def testCdcSource(): Unit = {
+ def testSourceWithAllRowKinds(): Unit = {
util.tableConfig.set(
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
UpsertMaterialize.NONE)
@@ -387,6 +454,99 @@ class DeltaJoinTest extends TableTestBase {
"and src1.a2 = cdc_src.b2")
}
+ @Test
+ def testPKContainsJoinKeyAndTwoSourcesNoDelete(): Unit = {
+ util.verifyRelPlanInsert(
+ "insert into snk_for_cdc_src " +
+ "select * from no_delete_src1 join no_delete_src2 " +
+ "on a1 = b1 " +
+ "and a2 = b2",
+ ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testPKNotContainJoinKeyAndTwoSourcesNoDelete(): Unit = {
+ util.tableConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+ UpsertMaterialize.NONE)
+
+ util.verifyRelPlanInsert(
+ "insert into snk_for_cdc_src " +
+ "select * from no_delete_src1 join no_delete_src2 " +
+ "on a0 = b0 " +
+ "and a1 = b1 " +
+ "and a2 = b2",
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testPKContainJoinKeyAndOnlyOneSourceNoDelete(): Unit = {
+ util.tableEnv.executeSql("""
+ |create table all_changelog_src with (
+ | 'changelog-mode' = 'I,UA,UB,D'
+ |) like no_delete_src1
+ |""".stripMargin)
+
+ util.verifyRelPlanInsert(
+ "insert into snk_for_cdc_src " +
+ "select * from all_changelog_src join no_delete_src2 " +
+ "on a1 = b1 " +
+ "and a2 = b2",
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testPKContainsJoinKeyAndSourceNoUBAndD(): Unit = {
+ // FLINK-38489 Currently, ChangelogNormalize will always generate
changelog mode with D,
+ // and Join with D cannot be optimized into Delta Join
+ util.tableEnv.executeSql("""
+ |create table no_delete_and_update_before_src1
with (
+ | 'changelog-mode' = 'I,UA'
+ |) like no_delete_src1
+ |""".stripMargin)
+
+ util.tableEnv.executeSql("""
+ |create table no_delete_and_update_before_src2
with (
+ | 'changelog-mode' = 'I,UA'
+ |) like no_delete_src2
+ |""".stripMargin)
+
+ util.verifyRelPlanInsert(
+ "insert into snk_for_cdc_src " +
+ "select * from no_delete_and_update_before_src1 " +
+ "join no_delete_and_update_before_src2 " +
+ "on a1 = b1 " +
+ "and a2 = b2",
+ ExplainDetail.CHANGELOG_MODE
+ )
+ }
+
+ @Test
+ def testJoinAppendOnlySourceAndSourceWithoutDelete(): Unit = {
+ util.tableConfig.set(
+ ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+ UpsertMaterialize.NONE)
+
+ util.tableEnv.executeSql("""
+ |create table tmp_snk (
+ | primary key (r1, r2) not enforced
+ |) like snk_for_cdc_src (
+ | EXCLUDING CONSTRAINTS
+ |)
+ |""".stripMargin)
+
+ // the join could not be converted into the delta join
+ // because the upsert key of the join is `null`
+ util.verifyRelPlanInsert(
+ "insert into tmp_snk " +
+ "select * from src1 join no_delete_src2 " +
+ "on a1 = b1 " +
+ "and a2 = b2",
+ ExplainDetail.CHANGELOG_MODE)
+ }
+
@Test
def testSourceWithSourceAbilities(): Unit = {
util.tableEnv.executeSql(
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
index 063fe973c67..0c5d881d660 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
@@ -30,13 +30,14 @@ import
org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, St
import
org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension,
Parameters}
import org.apache.flink.types.Row
-import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.{BeforeEach, TestTemplate}
import org.junit.jupiter.api.extension.ExtendWith
import javax.annotation.Nullable
import java.time.LocalDateTime
+import java.util.Objects.requireNonNull
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
@@ -81,7 +82,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
"+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:01]",
"+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:22]"
)
- testUpsertResult(List("a1"), List("b1"), data1, data2, "a1 = b1",
expected, 6)
+
+ testUpsertResult(
+ newTestSpecBuilder()
+ .withLeftIndex(List("a1"))
+ .withRightIndex(List("b1"))
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withJoinCondition("a1 = b1")
+ .withSinkPk(List("l0", "r0"))
+ .withExpectedData(expected)
+ .withExpectedLookupFunctionInvokeCount(6)
+ .build())
}
@TestTemplate
@@ -106,34 +118,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
"+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:02]",
"+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:01]"
)
- testUpsertResult(List("a1"), List("b1"), data1, data2, "a1 = b1 and a2 =
b2", expected, 6)
- }
-
- @TestTemplate
- def testJoinKeyNotContainsIndex(): Unit = {
- val data1 = List(
- changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
- changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
- // mismatch
- changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2023,
3, 3, 3, 3, 3))
- )
- val data2 = List(
- changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021,
1, 1, 1, 1, 11)),
- changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
- // mismatch
- changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099,
2, 2, 2, 2, 2))
- )
-
- // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the
RowKind from
- // "+U" to "+I"
- val expected = List("+I[1.0, 1, 2022-02-02T02:02:02, 1, 1.0,
2022-02-02T02:02:22]")
-
- // could not optimize into delta join because join keys do not contain
indexes strictly
- assertThatThrownBy(
- () =>
- testUpsertResult(List("a0", "a1"), List("b0", "b1"), data1, data2, "a1
= b1", expected, 6))
- .hasMessageContaining("The current sql doesn't support to do delta join
optimization.")
+ testUpsertResult(
+ newTestSpecBuilder()
+ .withLeftIndex(List("a1"))
+ .withRightIndex(List("b1"))
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withJoinCondition("a1 = b1 and a2 = b2")
+ .withSinkPk(List("l0", "r0"))
+ .withExpectedData(expected)
+ .withExpectedLookupFunctionInvokeCount(6)
+ .build())
}
@TestTemplate
@@ -157,14 +153,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
val expected = List(
"+I[1.0, 1, 2022-02-02T02:02:02, 1, 1.0, 2022-02-02T02:02:22]"
)
+
testUpsertResult(
- List("a1"),
- List("b1"),
- data1,
- data2,
- "a1 = b1",
- expected,
- if (enableCache) 4 else 6)
+ newTestSpecBuilder()
+ .withLeftIndex(List("a1"))
+ .withRightIndex(List("b1"))
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withJoinCondition("a1 = b1")
+ .withSinkPk(List("l0", "r0"))
+ .withExpectedData(expected)
+ .withExpectedLookupFunctionInvokeCount(if (enableCache) 4 else 6)
+ .build())
}
@TestTemplate
@@ -192,13 +192,17 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
"+I[1.0, 2, 2021-01-01T01:01:01, 2, 1.0, 2021-01-01T01:01:01]"
)
testUpsertResult(
- List("a1"),
- List("b1"),
- data1,
- data2,
- "a1 = b1 and a2 = b2",
- expected,
- if (enableCache) 4 else 6)
+ newTestSpecBuilder()
+ .withLeftIndex(List("a1"))
+ .withRightIndex(List("b1"))
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withJoinCondition("a1 = b1 and a2 = b2")
+ .withSinkPk(List("l0", "r0"))
+ .withExpectedData(expected)
+ .withExpectedLookupFunctionInvokeCount(if (enableCache) 4 else 6)
+ .build()
+ )
}
@TestTemplate
@@ -222,99 +226,147 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
val expected = List("+I[2.0, 2, 2023-03-03T03:03:03, 2, 2.0,
2022-02-02T02:02:22]")
testUpsertResult(
- List("a0"),
- List("b0"),
- data1,
- data2,
- "a0 = b0 and a1 = b1 and a2 > b2",
- expected,
- 6)
+ newTestSpecBuilder()
+ .withLeftIndex(List("a0"))
+ .withRightIndex(List("b0"))
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withJoinCondition("a0 = b0 and a1 = b1 and a2 > b2")
+ .withSinkPk(List("l0", "r0"))
+ .withExpectedData(expected)
+ .withExpectedLookupFunctionInvokeCount(6)
+ .build())
}
@TestTemplate
- def testWithNonEquiCondition2(): Unit = {
+ def testCdcSourceWithoutDelete(): Unit = {
val data1 = List(
+ // pk1
changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
- changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2023,
3, 3, 3, 3, 3)),
+ changelogRow("-U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
+ changelogRow("+U", Double.box(11.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 11)),
+ // pk2
+ changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
+ changelogRow("-U", Double.box(2.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
+ changelogRow("+U", Double.box(22.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
+ // mismatch
+ changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2023,
3, 3, 3, 3, 3))
+ )
+
+ val data2 = List(
+ // pk1
+ changelogRow("+I", Int.box(1), Double.box(12.0), LocalDateTime.of(2021,
1, 1, 1, 1, 12)),
+ changelogRow("-U", Int.box(1), Double.box(12.0), LocalDateTime.of(2021,
1, 1, 1, 1, 12)),
+ changelogRow("+U", Int.box(1), Double.box(13.0), LocalDateTime.of(2021,
1, 1, 1, 1, 13)),
+ // pk2
+ changelogRow("+I", Int.box(2), Double.box(22.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
+ changelogRow("-U", Int.box(2), Double.box(22.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
+ changelogRow("+U", Int.box(2), Double.box(23.0), LocalDateTime.of(2022,
2, 2, 2, 2, 23)),
+ // mismatch
+ changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099,
2, 2, 2, 2, 2))
+ )
+
+ // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the
RowKind from
+ // "+U" to "+I"
+ val expected = List(
+ "+I[11.0, 1, 2021-01-01T01:01:11, 1, 13.0, 2021-01-01T01:01:13]",
+ "+I[22.0, 2, 2022-02-02T02:02:22, 2, 23.0, 2022-02-02T02:02:23]"
+ )
+
+ testUpsertResult(
+ newTestSpecBuilder()
+ .withLeftIndex(List("a0"))
+ .withRightIndex(List("b0"))
+ .withLeftPk(List("a0"))
+ .withRightPk(List("b0"))
+ .withSinkPk(List("l0", "r0"))
+ .withLeftChangelogMode("I,UA,UB")
+ .withRightChangelogMode("I,UA,UB")
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withJoinCondition("a0 = b0")
+ .withExpectedData(expected)
+ .withExpectedLookupFunctionInvokeCount(if (enableCache) 6 else 10)
+ .build())
+ }
+
+ @TestTemplate
+ def testFilterFieldsAfterJoin(): Unit = {
+ val data1 = List(
+ changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
+ changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
// mismatch
changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033,
3, 3, 3, 3, 3))
)
val data2 = List(
changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021,
1, 1, 1, 1, 11)),
- changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
+ changelogRow("+I", Int.box(2), Double.box(3.0), LocalDateTime.of(2022,
2, 2, 2, 2, 33)),
// mismatch
changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099,
2, 2, 2, 2, 2))
)
// TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the
RowKind from
// "+U" to "+I"
- val expected = List("+I[2.0, 2, 2023-03-03T03:03:03, 2, 2.0,
2022-02-02T02:02:22]")
+ val expected = List("+I[2.0, 2, 2022-02-02T02:02:02, 2, 3.0,
2022-02-02T02:02:33]")
- // could not optimize into delta join because there is calc between join
and source
- assertThatThrownBy(
- () =>
- testUpsertResult(
- List("a0"),
- List("b0"),
- data1,
- data2,
- "a0 = b0 and a1 = b1 and a2 > TO_TIMESTAMP('2021-01-01 01:01:11')",
- expected,
- 6))
- .hasMessageContaining("The current sql doesn't support to do delta join
optimization.")
-
- // could not optimize into delta join because there is calc between join
and source
- assertThatThrownBy(
- () =>
- testUpsertResult(
- List("a0"),
- List("b0"),
- data1,
- data2,
- "a0 = b0 and b1 > 1.0",
- expected,
- 12))
- .hasMessageContaining("The current sql doesn't support to do delta join
optimization.")
+ testUpsertResult(
+ newTestSpecBuilder()
+ .withLeftIndex(List("a0"))
+ .withRightIndex(List("b0"))
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withJoinCondition("a0 = b0")
+ .withFilterAfterJoin("a1 <> b1")
+ .withSinkPk(List("l0", "r0"))
+ .withExpectedData(expected)
+ .withExpectedLookupFunctionInvokeCount(6)
+ .build())
}
@TestTemplate
- def testFilterFieldsBeforeJoin(): Unit = {
+ def testFilterFieldsAfterJoinWithCdcSourceWithoutDelete(): Unit = {
val data1 = List(
+ // pk1
changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
- changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
+ changelogRow("-U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
+ changelogRow("+U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 2)),
+ // pk2
+ changelogRow("+I", Double.box(2.0), Int.box(3), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
+ changelogRow("-U", Double.box(2.0), Int.box(3), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
+ changelogRow("+U", Double.box(2.0), Int.box(3), LocalDateTime.of(2022,
2, 2, 2, 2, 3)),
// mismatch
changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033,
3, 3, 3, 3, 3))
)
val data2 = List(
+ // pk1
changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021,
1, 1, 1, 1, 11)),
- changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
+ changelogRow("+I", Int.box(3), Double.box(2.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
// mismatch
changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099,
2, 2, 2, 2, 2))
)
// TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the
RowKind from
// "+U" to "+I"
- val expected = List("+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0,
2021-01-01T01:01:11]")
-
- // could not optimize into delta join because there is calc between join
and source
- assertThatThrownBy(
- () =>
- testUpsertResult(
- List("a0"),
- List("b0"),
- data1,
- data2,
- "a0 = b0 and a1 = b1 and a2 = TO_TIMESTAMP('2021-01-01 01:01:01')",
- expected,
- 6))
- .hasMessageContaining("The current sql doesn't support to do delta join
optimization.")
-
- // could not optimize into delta join because there is calc between join
and source
- assertThatThrownBy(
- () => testUpsertResult(List(), List(), data1, data2, "a0 = b0 and b1 =
1.0", expected, 12))
- .hasMessageContaining("The current sql doesn't support to do delta join
optimization.")
+ val expected = List("+I[2.0, 3, 2022-02-02T02:02:03, 3, 2.0,
2022-02-02T02:02:22]")
+
+ testUpsertResult(
+ newTestSpecBuilder()
+ .withLeftIndex(List("a0"))
+ .withRightIndex(List("b0"))
+ .withLeftPk(List("a0", "a1"))
+ .withRightPk(List("b0", "b1"))
+ .withSinkPk(List("l0", "r0", "l1", "r1"))
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withLeftChangelogMode("I,UA,UB")
+ .withRightChangelogMode("I,UA,UB")
+ .withJoinCondition("a0 = b0")
+ .withFilterAfterJoin("a1 < b0")
+ .withExpectedData(expected)
+ .withExpectedLookupFunctionInvokeCount(if (enableCache) 5 else 8)
+ .build())
}
@TestTemplate
@@ -363,85 +415,6 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6)
}
- @TestTemplate
- def testProjectFieldsBeforeJoin(): Unit = {
- val data1 = List(
- changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
- changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
- // mismatch
- changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033,
3, 3, 3, 3, 3))
- )
-
- val data2 = List(
- changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021,
1, 1, 1, 1, 11)),
- changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
- // mismatch
- changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099,
2, 2, 2, 2, 2))
- )
-
- prepareTable(List("a0"), List("b0"), data1, data2)
- tEnv.executeSql(s"""
- |create table projectedSink(
- | l0 int,
- | r0 int,
- | l1 double,
- | l2 timestamp(3),
- | primary key(l0, r0) not enforced
- |) with (
- | 'connector' = 'values',
- | 'bounded' = 'false',
- | 'sink-insert-only' = 'false'
- |)
- |""".stripMargin)
-
- // could not optimize into delta join
- // because there is ProjectPushDownSpec between join and source
- assertThatThrownBy(() => tEnv.executeSql("""
- |insert into projectedSink
- | select
- | testLeft.a0,
- | testRight.b0,
- | testLeft.a1,
- | testLeft.a2
- | from testLeft
- | join testRight
- | on a0 = b0
- |""".stripMargin))
- .hasMessageContaining("The current sql doesn't support to do delta join
optimization.")
- }
-
- @TestTemplate
- def testProjectFieldsBeforeJoin2(): Unit = {
- val data1 = List(
- changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
- changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
- // mismatch
- changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033,
3, 3, 3, 3, 3))
- )
-
- val data2 = List(
- changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021,
1, 1, 1, 1, 11)),
- changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
- // mismatch
- changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099,
2, 2, 2, 2, 2))
- )
-
- prepareTable(List("a0"), List("b0"), data1, data2)
-
- // could not optimize into delta join because there is calc between join
and source
- assertThatThrownBy(
- () =>
- tEnv
- .executeSql(
- s"insert into testSnk(l0, l1, l2, r0, r1, r2) " +
- "select * from ( " +
- " select a0, a1, a2 from testLeft" +
- ") join testRight " +
- "on a1 = b1")
- .await())
- .hasMessageContaining("The current sql doesn't support to do delta join
optimization.")
- }
-
@TestTemplate
def testFailOverAndRestore(): Unit = {
// enable checkpoint, we are using failing source to force have a complete
checkpoint
@@ -468,14 +441,16 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
val expected = List("+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0,
2022-02-02T02:02:22]")
testUpsertResult(
- List("a0"),
- List("b0"),
- data1,
- data2,
- "a0 = b0 and a1 = b1",
- expected,
- null,
- testFailingSource = true)
+ newTestSpecBuilder()
+ .withLeftIndex(List("a0"))
+ .withRightIndex(List("b0"))
+ .withLeftData(data1)
+ .withRightData(data2)
+ .withJoinCondition("a0 = b0 and a1 = b1")
+ .withSinkPk(List("l0", "r0"))
+ .withExpectedData(expected)
+ .withTestFailingSource(true)
+ .build())
}
/** TODO add index in DDL. */
@@ -510,26 +485,35 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
catalog.createTable(tablePath, newResolvedTable, false)
}
- private def testUpsertResult(
- leftIndex: List[String],
- rightIndex: List[String],
- leftData: List[Row],
- rightData: List[Row],
- joinKeyStr: String,
- expected: List[String],
- @Nullable expectedLookupFunctionInvokeCount: Integer,
- testFailingSource: Boolean = false): Unit = {
- prepareTable(leftIndex, rightIndex, leftData, rightData, testFailingSource)
+ private def testUpsertResult(testSpec: TestSpec): Unit = {
+ prepareTable(
+ testSpec.leftIndex,
+ testSpec.rightIndex,
+ testSpec.leftPk.orNull,
+ testSpec.rightPk.orNull,
+ testSpec.sinkPk,
+ testSpec.leftData,
+ testSpec.rightData,
+ testSpec.testFailingSource,
+ testSpec.leftChangelogMode,
+ testSpec.rightChangelogMode
+ )
+ val sql =
+ s"""
+ | insert into testSnk
+ | select * from testLeft join testRight on ${testSpec.joinCondition}
+ | ${if (testSpec.filterAfterJoin.isEmpty) "" else s"where
${testSpec.filterAfterJoin.get}"}
+ |""".stripMargin
tEnv
- .executeSql(s"insert into testSnk select * from testLeft join testRight
on $joinKeyStr")
+ .executeSql(sql)
.await(60, TimeUnit.SECONDS)
val result = TestValuesTableFactory.getResultsAsStrings("testSnk")
- assertThat(result.sorted).isEqualTo(expected.sorted)
- if (expectedLookupFunctionInvokeCount != null) {
+ assertThat(result.sorted).isEqualTo(testSpec.expected.sorted)
+ if (testSpec.expectedLookupFunctionInvokeCount.isDefined) {
assertThat(AsyncTestValueLookupFunction.invokeCount.get())
- .isEqualTo(expectedLookupFunctionInvokeCount)
+ .isEqualTo(testSpec.expectedLookupFunctionInvokeCount.get)
}
}
@@ -537,40 +521,67 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
leftIndex: List[String],
rightIndex: List[String],
leftData: List[Row],
+ rightData: List[Row]): Unit = {
+ prepareTable(
+ leftIndex,
+ rightIndex,
+ null,
+ null,
+ List("l0", "r0"),
+ leftData,
+ rightData,
+ testFailingSource = false,
+ "I",
+ "I")
+ }
+
+ private def prepareTable(
+ leftIndex: List[String],
+ rightIndex: List[String],
+ @Nullable leftPk: List[String],
+ @Nullable rightPk: List[String],
+ sinkPk: List[String],
+ leftData: List[Row],
rightData: List[Row],
- testFailingSource: Boolean = false): Unit = {
+ testFailingSource: Boolean,
+ leftChangelogMode: String,
+ rightChangelogMode: String): Unit = {
tEnv.executeSql("drop table if exists testLeft")
- tEnv.executeSql(s"""
- |create table testLeft(
- | a1 double,
- | a0 int,
- | a2 timestamp(3)
- |) with (
- | 'connector' = 'values',
- | 'bounded' = 'false',
- | 'changelog-mode' = 'I',
- | 'data-id' =
'${TestValuesTableFactory.registerData(leftData)}',
- | 'async' = 'true',
- | 'failing-source' = '$testFailingSource'
- |)
- |""".stripMargin)
+ tEnv.executeSql(
+ s"""
+ |create table testLeft(
+ | a1 double,
+ | a0 int,
+ | a2 timestamp(3)
+ | ${if (leftPk == null) "" else s", primary key
(${leftPk.mkString(",")}) not enforced"}
+ |) with (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'changelog-mode' = '$leftChangelogMode',
+ | 'data-id' = '${TestValuesTableFactory.registerData(leftData)}',
+ | 'async' = 'true',
+ | 'failing-source' = '$testFailingSource'
+ |)
+ |""".stripMargin)
addIndex("testLeft", leftIndex)
tEnv.executeSql("drop table if exists testRight")
- tEnv.executeSql(s"""
- |create table testRight(
- | b0 int,
- | b1 double,
- | b2 timestamp(3)
- |) with (
- | 'connector' = 'values',
- | 'bounded' = 'false',
- | 'changelog-mode' = 'I',
- | 'data-id' =
'${TestValuesTableFactory.registerData(rightData)}',
- | 'async' = 'true',
- | 'failing-source' = '$testFailingSource'
- |)
- |""".stripMargin)
+ tEnv.executeSql(
+ s"""
+ |create table testRight(
+ | b0 int,
+ | b1 double,
+ | b2 timestamp(3)
+ | ${if (rightPk == null) "" else s", primary key
(${rightPk.mkString(",")}) not enforced"}
+ |) with (
+ | 'connector' = 'values',
+ | 'bounded' = 'false',
+ | 'changelog-mode' = '$rightChangelogMode',
+ | 'data-id' = '${TestValuesTableFactory.registerData(rightData)}',
+ | 'async' = 'true',
+ | 'failing-source' = '$testFailingSource'
+ |)
+ |""".stripMargin)
addIndex("testRight", rightIndex)
tEnv.executeSql("drop table if exists testSnk")
@@ -582,7 +593,7 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
| r0 int,
| r1 double,
| r2 timestamp(3),
- | primary key(l0, r0) not enforced
+ | primary key(${sinkPk.mkString(",")}) not enforced
|) with (
| 'connector' = 'values',
| 'bounded' = 'false',
@@ -591,6 +602,133 @@ class DeltaJoinITCase(enableCache: Boolean) extends
StreamingTestBase {
|""".stripMargin)
}
+  /** Starts a new [[TestSpecBuilder]] whose settings are all at their initial values. */
+  private def newTestSpecBuilder(): TestSpecBuilder = new TestSpecBuilder()
+
+  /**
+   * A fully specified delta join IT scenario, assembled by [[TestSpecBuilder]].
+   *
+   * Settings that a scenario may leave out are wrapped in `Option`.
+   */
+  private case class TestSpec(
+      // indexes declared on the left/right source tables
+      leftIndex: List[String],
+      rightIndex: List[String],
+      // primary keys of the sources (optional) and of the sink
+      leftPk: Option[List[String]],
+      rightPk: Option[List[String]],
+      sinkPk: List[String],
+      // rows registered as the source data
+      leftData: List[Row],
+      rightData: List[Row],
+      // the query shape
+      joinCondition: String,
+      filterAfterJoin: Option[String],
+      // expectations
+      expected: List[String],
+      expectedLookupFunctionInvokeCount: Option[Int],
+      // source behaviour
+      testFailingSource: Boolean,
+      leftChangelogMode: String,
+      rightChangelogMode: String)
+
+  /**
+   * Fluent builder for [[TestSpec]].
+   *
+   * Left/right index, sink pk, left/right data, join condition and expected data are mandatory and
+   * must be set before [[build]] is called; a missing one fails fast with a message naming it. All
+   * other settings are optional: the source pks, the post-join filter and the expected lookup
+   * invoke count start as `None`, the failing source is disabled, and both changelog modes start
+   * as "I".
+   */
+  private class TestSpecBuilder {
+    private var leftIndex: Option[List[String]] = None
+    private var rightIndex: Option[List[String]] = None
+    private var leftPk: Option[List[String]] = None
+    private var rightPk: Option[List[String]] = None
+    private var sinkPk: Option[List[String]] = None
+    private var joinCondition: Option[String] = None
+    private var filterAfterJoin: Option[String] = None
+    private var leftData: Option[List[Row]] = None
+    private var rightData: Option[List[Row]] = None
+    private var expectedData: Option[List[String]] = None
+    private var expectedLookupFunctionInvokeCount: Option[Int] = None
+    private var testFailingSource: Boolean = false
+    private var leftChangelogMode: String = "I"
+    private var rightChangelogMode: String = "I"
+
+    def withLeftIndex(index: List[String]): TestSpecBuilder = {
+      leftIndex = Some(requireNonNull(index))
+      this
+    }
+
+    def withRightIndex(index: List[String]): TestSpecBuilder = {
+      rightIndex = Some(requireNonNull(index))
+      this
+    }
+
+    def withLeftPk(pk: List[String]): TestSpecBuilder = {
+      leftPk = Some(requireNonNull(pk))
+      this
+    }
+
+    def withRightPk(pk: List[String]): TestSpecBuilder = {
+      rightPk = Some(requireNonNull(pk))
+      this
+    }
+
+    def withSinkPk(pk: List[String]): TestSpecBuilder = {
+      sinkPk = Some(requireNonNull(pk))
+      this
+    }
+
+    def withLeftData(data: List[Row]): TestSpecBuilder = {
+      leftData = Some(requireNonNull(data))
+      this
+    }
+
+    def withRightData(data: List[Row]): TestSpecBuilder = {
+      rightData = Some(requireNonNull(data))
+      this
+    }
+
+    def withJoinCondition(condition: String): TestSpecBuilder = {
+      joinCondition = Some(requireNonNull(condition))
+      this
+    }
+
+    def withFilterAfterJoin(filter: String): TestSpecBuilder = {
+      filterAfterJoin = Some(requireNonNull(filter))
+      this
+    }
+
+    def withExpectedData(expected: List[String]): TestSpecBuilder = {
+      expectedData = Some(requireNonNull(expected))
+      this
+    }
+
+    // Int and Boolean are value types and can never be null, so the two setters below store the
+    // argument directly instead of boxing it just for a no-op null check.
+    def withExpectedLookupFunctionInvokeCount(count: Int): TestSpecBuilder = {
+      expectedLookupFunctionInvokeCount = Some(count)
+      this
+    }
+
+    def withTestFailingSource(flag: Boolean): TestSpecBuilder = {
+      testFailingSource = flag
+      this
+    }
+
+    def withLeftChangelogMode(mode: String): TestSpecBuilder = {
+      leftChangelogMode = requireNonNull(mode)
+      this
+    }
+
+    def withRightChangelogMode(mode: String): TestSpecBuilder = {
+      rightChangelogMode = requireNonNull(mode)
+      this
+    }
+
+    def build(): TestSpec = {
+      TestSpec(
+        required(leftIndex, "leftIndex"),
+        required(rightIndex, "rightIndex"),
+        leftPk,
+        rightPk,
+        required(sinkPk, "sinkPk"),
+        required(leftData, "leftData"),
+        required(rightData, "rightData"),
+        required(joinCondition, "joinCondition"),
+        filterAfterJoin,
+        required(expectedData, "expectedData"),
+        expectedLookupFunctionInvokeCount,
+        testFailingSource,
+        leftChangelogMode,
+        rightChangelogMode
+      )
+    }
+
+    /** Unwraps a mandatory setting, failing with a message that names the missing one. */
+    private def required[T <: AnyRef](value: Option[T], name: String): T =
+      requireNonNull(value.orNull, s"$name must be set before build()")
+  }
}
object DeltaJoinITCase {
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java
index 45fc04b239c..2505550104b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java
@@ -357,8 +357,9 @@ public class StreamingDeltaJoinOperator
private void processElement(StreamRecord<RowData> element, int inputIndex)
throws Exception {
Preconditions.checkArgument(
- RowKind.INSERT == element.getValue().getRowKind(),
- "Currently, delta join only supports to consume append only
stream.");
+ RowKind.INSERT == element.getValue().getRowKind()
+ || RowKind.UPDATE_AFTER ==
element.getValue().getRowKind(),
+ "Currently, delta join only supports to consume insert record
or update after record.");
tryProcess();
StreamRecord<RowData> record;
boolean isLeft = isLeft(inputIndex);
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java
index 17b286e9819..de52c8f2be2 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java
@@ -35,7 +35,7 @@ import
org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.collector.TableFunctionCollector;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
-import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.generated.GeneratedResultFutureWrapper;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import
org.apache.flink.table.runtime.operators.join.lookup.keyordered.AecRecord;
@@ -67,7 +67,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -83,6 +82,7 @@ import java.util.stream.Stream;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryrow;
import static
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -94,77 +94,9 @@ public class StreamingDeltaJoinOperatorTest {
private static final int CACHE_SIZE = 10;
// the data snapshot of the left/right table when joining
- private static final LinkedList<RowData> leftTableCurrentData = new
LinkedList<>();
- private static final LinkedList<RowData> rightTableCurrentData = new
LinkedList<>();
-
- /**
- * Mock sql like the following.
- *
- * <pre>
- * CREATE TABLE leftSrc(
- * left_value INT,
- * left_jk1 BOOLEAN,
- * left_jk2_lk STRING,
- * INDEX(left_jk2_lk)
- * )
- * </pre>
- *
- * <pre>
- * CREATE TABLE rightSrc(
- * right_jk2 STRING,
- * right_value INT,
- * right_jk1_lk BOOLEAN,
- * INDEX(right_jk1_lk)
- * )
- * </pre>
- *
- * <pre>
- * select * from leftSrc join rightSrc
- * on leftSrc.left_jk1 = rightSrc.right_jk1_lk
- * and leftSrc.left_jk2_lk = rightSrc.right_jk2
- * </pre>
- *
- * <p>For right lookup table(left stream side delta join right table), the
join key is
- * [right_jk1_lk, right_jk2], and it will be split into lookup key
[right_jk1_lk] and {@code
- * DeltaJoinSpec#remainingCondition} [right_jk2].
- *
- * <p>For left lookup table(right stream side delta join left table), the
join key is [left_jk1,
- * left_jk2_lk], and it will be split into lookup key [left_jk2_lk] and
{@code
- * DeltaJoinSpec#remainingCondition} [left_jk1].
- */
-
- // left join key: <left_jk1, left_jk2_lk>
- // left lookup key: <left_jk2_lk>
- private static final RowType leftRowType =
- RowType.of(
- new LogicalType[] {new IntType(), new BooleanType(),
VarCharType.STRING_TYPE},
- new String[] {"left_value", "left_jk1", "left_jk2_lk"});
-
- private static final InternalTypeInfo<RowData> leftTypeInfo =
InternalTypeInfo.of(leftRowType);
-
- private static final int[] leftJoinKeyIndices = new int[] {1, 2};
-
- // right join key: <right_jk1_lk, right_jk2>
- // right lookup key: <right_jk1_lk>
- private static final RowType rightRowType =
- RowType.of(
- new LogicalType[] {VarCharType.STRING_TYPE, new IntType(),
new BooleanType()},
- new String[] {"right_jk2", "right_value", "right_jk1_lk"});
-
- private static final InternalTypeInfo<RowData> rightTypeInfo =
- InternalTypeInfo.of(rightRowType);
-
- private static final int[] rightJoinKeyIndices = new int[] {2, 0};
-
- private static final RowDataKeySelector leftJoinKeySelector =
- HandwrittenSelectorUtil.getRowDataSelector(
- leftJoinKeyIndices, leftRowType.getChildren().toArray(new
LogicalType[0]));
- private static final RowDataKeySelector rightJoinKeySelector =
- HandwrittenSelectorUtil.getRowDataSelector(
- rightJoinKeyIndices,
rightRowType.getChildren().toArray(new LogicalType[0]));
-
- private static final int[] outputFieldIndices =
- IntStream.range(0, leftTypeInfo.getArity() +
rightTypeInfo.getArity()).toArray();
+ // <upsert key, data>
+ private static final HashMap<RowData, RowData> leftTableCurrentData = new
HashMap<>();
+ private static final HashMap<RowData, RowData> rightTableCurrentData = new
HashMap<>();
@Parameters(name = "EnableCache = {0}")
public static List<Boolean> parameters() {
@@ -181,46 +113,6 @@ public class StreamingDeltaJoinOperatorTest {
@BeforeEach
public void beforeEach() throws Exception {
- testHarness = createDeltaJoinOperatorTestHarness();
- testHarness.setup();
- testHarness.open();
- StreamingDeltaJoinOperator operator = unwrapOperator(testHarness);
- // set external failure cause consumer to prevent hang
- testHarness
- .getEnvironment()
- .setExternalFailureCauseConsumer(
- error -> {
- latestException = Optional.of(error);
- // DO NOT throw exception up again to avoid hang
- });
- operator.setAsyncExecutionController(
- new
MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController()));
- prepareOperatorRuntimeInfo(operator);
-
- assertor =
- new RowDataHarnessAssertor(
- getOutputType().getChildren().toArray(new
LogicalType[0]),
- // sort the result by the output upsert key
- (o1, o2) -> {
- for (int keyIndex : outputFieldIndices) {
- LogicalType type =
getOutputType().getChildren().get(keyIndex);
- RowData.FieldGetter getter =
- RowData.createFieldGetter(type,
keyIndex);
-
- int compareResult =
-
Objects.requireNonNull(getter.getFieldOrNull(o1))
- .toString()
- .compareTo(
- Objects.requireNonNull(
-
getter.getFieldOrNull(o2))
- .toString());
-
- if (compareResult != 0) {
- return compareResult;
- }
- }
- return o1.toString().compareTo(o2.toString());
- });
MyAsyncFunction.leftInvokeCount.set(0);
MyAsyncFunction.rightInvokeCount.set(0);
MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = true;
@@ -228,7 +120,9 @@ public class StreamingDeltaJoinOperatorTest {
@AfterEach
public void afterEach() throws Exception {
- testHarness.close();
+ if (assertor != null) {
+ testHarness.close();
+ }
leftTableCurrentData.clear();
rightTableCurrentData.clear();
latestException = Optional.empty();
@@ -236,7 +130,11 @@ public class StreamingDeltaJoinOperatorTest {
}
@TestTemplate
- void testJoinBothAppendOnlyTables() throws Exception {
+ void testJoinBothLogTables() throws Exception {
+ LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+ initTestHarness(testSpec);
+ initAssertor(testSpec);
+
StreamRecord<RowData> leftRecord1 = insertRecord(100, true, "jklk1");
StreamRecord<RowData> leftRecord2 = insertRecord(100, false, "jklk2");
testHarness.processElement1(leftRecord1);
@@ -293,6 +191,8 @@ public class StreamingDeltaJoinOperatorTest {
DeltaJoinCache cache = unwrapCache(testHarness);
if (enableCache) {
+ RowType leftRowType = testSpec.getLeftInputRowType();
+ RowType rightRowType = testSpec.getRightInputRowType();
Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
newHashMap(
binaryrow(true, "jklk1"),
@@ -339,8 +239,102 @@ public class StreamingDeltaJoinOperatorTest {
}
}
+ /**
+ * Verifies delta join between two primary-key tables whose inputs carry both INSERT and
+ * UPDATE_AFTER records.
+ *
+ * <p>Records are named {@code <side>RecordK<n>V<m>}: key n of that side in its m-th version.
+ * Every joined output takes the row kind of the input record that triggered the lookup.
+ */
+ @TestTemplate
+ void testJoinBothPkTables() throws Exception {
+ PkPkTableJoinTestSpec testSpec = PkPkTableJoinTestSpec.INSTANCE;
+ initTestHarness(testSpec);
+ initAssertor(testSpec);
+
+ // no right-side record has been processed yet, so these left records produce no output
+ StreamRecord<RowData> leftRecordK1V1 = insertRecord(100, true, "Tom");
+ StreamRecord<RowData> leftRecordK2V1 = insertRecord(101, false, "Tom");
+ // mismatch
+ StreamRecord<RowData> leftRecordK3V1 = insertRecord(1999, false,
"Jim");
+ testHarness.processElement1(leftRecordK1V1);
+ testHarness.processElement1(leftRecordK2V1);
+ testHarness.processElement1(leftRecordK3V1);
+
+ waitAllDataProcessed();
+ final ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+ assertor.assertOutputEqualsSorted(
+ "result mismatch", expectedOutput, testHarness.getOutput());
+
+ // each "Tom" right record joins both "Tom" left rows; "Sam" has no left match
+ StreamRecord<RowData> rightRecordK1V1 = insertRecord("Tom", 200, true);
+ StreamRecord<RowData> rightRecordK2V1 = insertRecord("Tom", 201,
false);
+ // mismatch
+ StreamRecord<RowData> rightRecordK3V1 = insertRecord("Sam", 2999,
false);
+ testHarness.processElement2(rightRecordK1V1);
+ testHarness.processElement2(rightRecordK2V1);
+ testHarness.processElement2(rightRecordK3V1);
+
+ waitAllDataProcessed();
+ expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 200, true));
+ expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 200, true));
+ expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 201, false));
+ expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false));
+ assertor.assertOutputEqualsSorted(
+ "result mismatch", expectedOutput, testHarness.getOutput());
+
+ // new version of left key K1: joins both "Tom" right rows and emits UPDATE_AFTER rows
+ StreamRecord<RowData> leftRecordK1V2 = updateAfterRecord(1000, true,
"Tom");
+ testHarness.processElement1(leftRecordK1V2);
+
+ waitAllDataProcessed();
+ expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 200,
true));
+ expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 201,
false));
+ assertor.assertOutputEqualsSorted(
+ "result mismatch", expectedOutput, testHarness.getOutput());
+
+ // new versions of right keys K1 and K2: they join the latest left versions (K1V2, K2V1)
+ StreamRecord<RowData> rightRecordK1V2 = updateAfterRecord("Tom", 2000,
true);
+ StreamRecord<RowData> rightRecordK2V2 = updateAfterRecord("Tom", 2001,
false);
+ testHarness.processElement2(rightRecordK1V2);
+ testHarness.processElement2(rightRecordK2V2);
+
+ waitAllDataProcessed();
+ expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2000,
true));
+ expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2000,
true));
+ expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2001,
false));
+ expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2001,
false));
+ assertor.assertOutputEqualsSorted(
+ "result mismatch", expectedOutput, testHarness.getOutput());
+
+ DeltaJoinCache cache = unwrapCache(testHarness);
+ if (enableCache) {
+ // the caches hold only the latest version per key; join keys without a match ("Sam",
+ // "Jim") are cached as empty. Only the first lookup per join key ("Tom" plus one
+ // mismatching key on each side) reaches the async function.
+ Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
+ newHashMap(
+ binaryrow("Tom"),
+ newHashMap(
+ binaryrow(true, "Tom"),
+ leftRecordK1V2.getValue(),
+ binaryrow(false, "Tom"),
+ leftRecordK2V1.getValue()),
+ binaryrow("Sam"),
+ Collections.emptyMap());
+
+ Map<RowData, Map<RowData, Object>> expectedRightCacheData =
+ newHashMap(
+ binaryrow("Tom"),
+ newHashMap(
+ binaryrow("Tom", true),
+ rightRecordK1V2.getValue(),
+ binaryrow("Tom", false),
+ rightRecordK2V2.getValue()),
+ binaryrow("Jim"),
+ Collections.emptyMap());
+ verifyCacheData(cache, expectedLeftCacheData,
expectedRightCacheData, 5, 3, 4, 2);
+ assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(2);
+ assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(2);
+ } else {
+ // without a cache every input record triggers one lookup: 4 left and 5 right records
+ verifyCacheData(cache, Collections.emptyMap(),
Collections.emptyMap(), 0, 0, 0, 0);
+ assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(4);
+ assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(5);
+ }
+ }
+
@TestTemplate
void testBlockingWithSameJoinKey() throws Exception {
+ LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+ initTestHarness(testSpec);
+ initAssertor(testSpec);
+
// block the async function
MyAsyncFunction.block();
@@ -377,6 +371,9 @@ public class StreamingDeltaJoinOperatorTest {
assertThat(recordsBuffer.getActiveBuffer().size()).isEqualTo(3);
assertThat(recordsBuffer.getBlockingBuffer().size()).isEqualTo(2);
+ RowDataKeySelector leftJoinKeySelector =
testSpec.getLeftJoinKeySelector();
+ RowDataKeySelector rightJoinKeySelector =
testSpec.getRightJoinKeySelector();
+
RowData joinKey1 = leftJoinKeySelector.getKey(insertRecord(100, true,
"jklk1").getValue());
RowData joinKey2 = leftJoinKeySelector.getKey(insertRecord(100, false,
"jklk2").getValue());
RowData joinKey3 =
@@ -411,6 +408,8 @@ public class StreamingDeltaJoinOperatorTest {
DeltaJoinCache cache = unwrapCache(testHarness);
if (enableCache) {
+ RowType leftRowType = testSpec.getLeftInputRowType();
+ RowType rightRowType = testSpec.getRightInputRowType();
Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
newHashMap(
binaryrow(true, "jklk1"),
@@ -457,29 +456,33 @@ public class StreamingDeltaJoinOperatorTest {
* source and delta-join).
*/
@TestTemplate
- void testTableDataVisibleBeforeJoin() throws Exception {
+ void testLogTableDataVisibleBeforeJoin() throws Exception {
+ LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+ initTestHarness(testSpec);
+ initAssertor(testSpec);
+
MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = false;
// prepare the data first to mock all following requests were in
flight between source and
// delta-join
final StreamRecord<RowData> leftRecord1 = insertRecord(100, true,
"jklk1");
- insertLeftTable(leftRecord1);
+ insertLeftTable(testSpec, leftRecord1);
final StreamRecord<RowData> leftRecord2 = insertRecord(200, true,
"jklk1");
- insertLeftTable(leftRecord2);
+ insertLeftTable(testSpec, leftRecord2);
final StreamRecord<RowData> rightRecord1 = insertRecord("jklk1", 300,
true);
- insertRightTable(rightRecord1);
+ insertRightTable(testSpec, rightRecord1);
// mismatch
final StreamRecord<RowData> rightRecord2 = insertRecord("jklk2", 500,
false);
- insertRightTable(rightRecord2);
+ insertRightTable(testSpec, rightRecord2);
final StreamRecord<RowData> leftRecord3 = insertRecord(800, true,
"jklk1");
- insertLeftTable(leftRecord3);
+ insertLeftTable(testSpec, leftRecord3);
final StreamRecord<RowData> rightRecord3 = insertRecord("jklk1", 1000,
true);
- insertRightTable(rightRecord3);
+ insertRightTable(testSpec, rightRecord3);
testHarness.processElement1(leftRecord1);
testHarness.processElement1(leftRecord2);
@@ -525,6 +528,8 @@ public class StreamingDeltaJoinOperatorTest {
DeltaJoinCache cache = unwrapCache(testHarness);
if (enableCache) {
+ RowType leftRowType = testSpec.getLeftInputRowType();
+ RowType rightRowType = testSpec.getRightInputRowType();
Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
newHashMap(
binaryrow(true, "jklk1"),
@@ -557,8 +562,134 @@ public class StreamingDeltaJoinOperatorTest {
}
}
+ /**
+ * Tests primary-key tables whose lookups observe rows that have not reached the delta-join
+ * operator yet (e.g. rows still in flight between source and delta-join), on both stream
+ * sides.
+ *
+ * <p>All records are written to the mocked tables before any of them is processed. Because the
+ * mocked pk tables keep only the latest row per upsert key, every lookup sees the final version
+ * of each key (left: K1V2, K2V1, K3V2; right: K1V3, K2V1, K3V1), no matter which input version
+ * triggered it.
+ */
+ @TestTemplate
+ void testPkTableDataVisibleBeforeJoin() throws Exception {
+ PkPkTableJoinTestSpec testSpec = PkPkTableJoinTestSpec.INSTANCE;
+ initTestHarness(testSpec);
+ initAssertor(testSpec);
+
+ // NOTE(review): presumably stops the AEC delegate from inserting emitted records into the
+ // mocked tables, since the tables are fully pre-populated below — confirm
+ MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = false;
+
+ // prepare the data first, to mock that all following requests were in flight between
+ // source and delta-join
+ final StreamRecord<RowData> leftRecordK1V1 = insertRecord(100, true,
"Tom");
+ insertLeftTable(testSpec, leftRecordK1V1);
+ final StreamRecord<RowData> leftRecordK1V2 = updateAfterRecord(1000,
true, "Tom");
+ insertLeftTable(testSpec, leftRecordK1V2);
+
+ final StreamRecord<RowData> leftRecordK2V1 = insertRecord(101, false,
"Tom");
+ insertLeftTable(testSpec, leftRecordK2V1);
+
+ // mismatch
+ final StreamRecord<RowData> leftRecordK3V1 = insertRecord(101, false,
"Jim");
+ insertLeftTable(testSpec, leftRecordK3V1);
+ final StreamRecord<RowData> leftRecordK3V2 = updateAfterRecord(1001,
false, "Jim");
+ insertLeftTable(testSpec, leftRecordK3V2);
+
+ final StreamRecord<RowData> rightRecordK1V1 = insertRecord("Tom", 200,
true);
+ insertRightTable(testSpec, rightRecordK1V1);
+ final StreamRecord<RowData> rightRecordK1V2 = updateAfterRecord("Tom",
2000, true);
+ insertRightTable(testSpec, rightRecordK1V2);
+ final StreamRecord<RowData> rightRecordK1V3 = updateAfterRecord("Tom",
20000, true);
+ insertRightTable(testSpec, rightRecordK1V3);
+
+ final StreamRecord<RowData> rightRecordK2V1 = insertRecord("Tom", 201,
false);
+ insertRightTable(testSpec, rightRecordK2V1);
+
+ // mismatch
+ final StreamRecord<RowData> rightRecordK3V1 = insertRecord("Sam", 999,
false);
+ insertRightTable(testSpec, rightRecordK3V1);
+
+ final ConcurrentLinkedQueue<Object> expectedOutput = new
ConcurrentLinkedQueue<>();
+
+ // left records already see the final right versions: K1V3 (20000) and K2V1 (201)
+ testHarness.processElement1(leftRecordK1V1);
+ expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 20000, true));
+ expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 201, false));
+
+ testHarness.processElement1(leftRecordK1V2);
+ expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 20000,
true));
+ expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 201,
false));
+
+ testHarness.processElement1(leftRecordK2V1);
+ expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 20000,
true));
+ expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false));
+
+ // "Jim" has no right match
+ testHarness.processElement1(leftRecordK3V1);
+ testHarness.processElement1(leftRecordK3V2);
+
+ // right records already see the final left versions: K1V2 (1000) and K2V1 (101)
+ testHarness.processElement2(rightRecordK1V1);
+ expectedOutput.add(insertRecord(1000, true, "Tom", "Tom", 200, true));
+ expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 200, true));
+
+ testHarness.processElement2(rightRecordK1V2);
+ expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2000,
true));
+ expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2000,
true));
+
+ testHarness.processElement2(rightRecordK1V3);
+ expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 20000,
true));
+ expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 20000,
true));
+
+ testHarness.processElement2(rightRecordK2V1);
+ expectedOutput.add(insertRecord(1000, true, "Tom", "Tom", 201, false));
+ expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false));
+
+ // "Sam" has no left match
+ testHarness.processElement2(rightRecordK3V1);
+
+ waitAllDataProcessed();
+
+ assertor.assertOutputEqualsSorted(
+ "result mismatch", expectedOutput, testHarness.getOutput());
+
+ // the async execution controller must be fully drained
+ TableAsyncExecutionController<RowData, RowData, RowData> aec =
unwrapAEC(testHarness);
+ assertThat(aec.getBlockingSize()).isEqualTo(0);
+ assertThat(aec.getInFlightSize()).isEqualTo(0);
+ assertThat(aec.getFinishSize()).isEqualTo(0);
+
+ DeltaJoinCache cache = unwrapCache(testHarness);
+ if (enableCache) {
+ // the caches hold only the latest version per key; join keys without a match ("Sam",
+ // "Jim") are cached as empty
+ Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
+ newHashMap(
+ binaryrow("Tom"),
+ newHashMap(
+ binaryrow(true, "Tom"),
+ leftRecordK1V2.getValue(),
+ binaryrow(false, "Tom"),
+ leftRecordK2V1.getValue()),
+ binaryrow("Sam"),
+ Collections.emptyMap());
+
+ Map<RowData, Map<RowData, Object>> expectedRightCacheData =
+ newHashMap(
+ binaryrow("Tom"),
+ newHashMap(
+ binaryrow("Tom", true),
+ rightRecordK1V3.getValue(),
+ binaryrow("Tom", false),
+ rightRecordK2V1.getValue()),
+ binaryrow("Jim"),
+ Collections.emptyMap());
+
+ verifyCacheData(cache, expectedLeftCacheData,
expectedRightCacheData, 5, 3, 5, 3);
+ assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(2);
+ assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(2);
+ } else {
+ // without a cache every input record triggers one lookup: 5 left and 5 right records
+ verifyCacheData(cache, Collections.emptyMap(),
Collections.emptyMap(), 0, 0, 0, 0);
+ assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(5);
+ assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(5);
+ }
+ }
+
@TestTemplate
void testCheckpointAndRestore() throws Exception {
+ LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+ initTestHarness(testSpec);
+ initAssertor(testSpec);
+
// block the async function
MyAsyncFunction.block();
@@ -599,19 +730,19 @@ public class StreamingDeltaJoinOperatorTest {
MyAsyncFunction.block();
// restoring
- testHarness = createDeltaJoinOperatorTestHarness();
+ testHarness = createDeltaJoinOperatorTestHarness(testSpec);
testHarness.setup();
StreamingDeltaJoinOperator operator = unwrapOperator(testHarness);
operator.setAsyncExecutionController(
- new
MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController()));
+ new MyAsyncExecutionControllerDelegate(
+ testSpec, operator.getAsyncExecutionController()));
latestException = Optional.empty();
testHarness.initializeState(snapshot);
testHarness.open();
- prepareOperatorRuntimeInfo(operator);
aec = unwrapAEC(testHarness);
assertThat(aec.getBlockingSize()).isEqualTo(2);
@@ -641,6 +772,8 @@ public class StreamingDeltaJoinOperatorTest {
DeltaJoinCache cache = unwrapCache(testHarness);
if (enableCache) {
+ RowType leftRowType = testSpec.getLeftInputRowType();
+ RowType rightRowType = testSpec.getRightInputRowType();
Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
newHashMap(
binaryrow(true, "jklk1"),
@@ -671,6 +804,10 @@ public class StreamingDeltaJoinOperatorTest {
@TestTemplate
void testClearLegacyStateWhenCheckpointing() throws Exception {
+ LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+ initTestHarness(testSpec);
+ initAssertor(testSpec);
+
// block the async function
MyAsyncFunction.block();
@@ -722,6 +859,10 @@ public class StreamingDeltaJoinOperatorTest {
@TestTemplate
void testMeetExceptionWhenLookup() throws Exception {
+ LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+ initTestHarness(testSpec);
+ initAssertor(testSpec);
+
Throwable expectedException = new IllegalStateException("Mock to
fail");
MyAsyncFunction.setExpectedThrownException(expectedException);
@@ -739,6 +880,52 @@ public class StreamingDeltaJoinOperatorTest {
.isEqualTo(expectedException);
}
+    /** Creates, sets up and opens the harness for {@code testSpec} and wires in the test AEC. */
+    private void initTestHarness(AbstractTestSpec testSpec) throws Exception {
+        testHarness = createDeltaJoinOperatorTestHarness(testSpec);
+        testHarness.setup();
+        testHarness.open();
+
+        final StreamingDeltaJoinOperator deltaJoinOperator = unwrapOperator(testHarness);
+
+        // Remember external failures instead of rethrowing them: rethrowing from this consumer
+        // would make the test hang.
+        testHarness
+                .getEnvironment()
+                .setExternalFailureCauseConsumer(error -> latestException = Optional.of(error));
+
+        // Replace the operator's async execution controller with the test delegate wrapping it.
+        deltaJoinOperator.setAsyncExecutionController(
+                new MyAsyncExecutionControllerDelegate(
+                        testSpec, deltaJoinOperator.getAsyncExecutionController()));
+    }
+
+    /**
+     * Creates the {@link RowDataHarnessAssertor} for the output type of {@code testSpec}.
+     *
+     * <p>Its comparator orders rows by the string form of the fields listed in {@code
+     * testSpec.getOutputFieldIndices()} (in that order), falling back to the string form of the
+     * whole row. Field values are expected to be non-null.
+     */
+    private void initAssertor(AbstractTestSpec testSpec) {
+        RowType outputRowType = testSpec.getOutputRowType();
+
+        // The field getters do not depend on the rows being compared, so build them once here
+        // instead of on every comparator invocation.
+        List<RowData.FieldGetter> sortFieldGetters = new java.util.ArrayList<>();
+        for (int fieldIndex : testSpec.getOutputFieldIndices()) {
+            sortFieldGetters.add(
+                    RowData.createFieldGetter(
+                            outputRowType.getChildren().get(fieldIndex), fieldIndex));
+        }
+
+        assertor =
+                new RowDataHarnessAssertor(
+                        outputRowType.getChildren().toArray(new LogicalType[0]),
+                        (o1, o2) -> {
+                            for (RowData.FieldGetter getter : sortFieldGetters) {
+                                int compareResult =
+                                        Objects.requireNonNull(getter.getFieldOrNull(o1))
+                                                .toString()
+                                                .compareTo(
+                                                        Objects.requireNonNull(
+                                                                        getter.getFieldOrNull(o2))
+                                                                .toString());
+                                if (compareResult != 0) {
+                                    return compareResult;
+                                }
+                            }
+                            return o1.toString().compareTo(o2.toString());
+                        });
+    }
+
private void verifyCacheData(
DeltaJoinCache actualCache,
Map<RowData, Map<RowData, Object>> expectedLeftCacheData,
@@ -820,58 +1007,68 @@ public class StreamingDeltaJoinOperatorTest {
}
private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData,
RowData>
- createDeltaJoinOperatorTestHarness() throws Exception {
+ createDeltaJoinOperatorTestHarness(AbstractTestSpec testSpec)
throws Exception {
TaskMailbox mailbox = new TaskMailboxImpl();
MailboxProcessor mailboxProcessor =
new MailboxProcessor(controller -> {}, mailbox,
StreamTaskActionExecutor.IMMEDIATE);
DataStructureConverter<RowData, Object> leftFetcherConverter =
(DataStructureConverter)
-
DataStructureConverters.getConverter(leftTypeInfo.getDataType());
-
- RowDataKeySelector leftUpsertKeySelector =
getUpsertKeySelector(leftRowType, null);
- RowDataKeySelector rightUpsertKeySelector =
getUpsertKeySelector(rightRowType, null);
+ DataStructureConverters.getConverter(
+ testSpec.getLeftTypeInfo().getDataType());
AsyncDeltaJoinRunner leftAsyncFunction =
new AsyncDeltaJoinRunner(
- new GeneratedFunctionWrapper<>(new MyAsyncFunction()),
+ new GeneratedFunction<>("", "", new Object[0]) {
+ @Override
+ public MyAsyncFunction newInstance(ClassLoader
classLoader) {
+ return new MyAsyncFunction(testSpec, false);
+ }
+ },
leftFetcherConverter,
new GeneratedResultFutureWrapper<>(new
TestingFetcherResultFuture()),
- leftTypeInfo.toRowSerializer(),
- leftJoinKeySelector,
- leftUpsertKeySelector,
- rightJoinKeySelector,
- rightUpsertKeySelector,
+ testSpec.getLeftTypeInfo().toRowSerializer(),
+ testSpec.getLeftJoinKeySelector(),
+ testSpec.getLeftUpsertKeySelector(),
+ testSpec.getRightJoinKeySelector(),
+ testSpec.getRightUpsertKeySelector(),
AEC_CAPACITY,
false,
enableCache);
DataStructureConverter<RowData, Object> rightFetcherConverter =
(DataStructureConverter)
-
DataStructureConverters.getConverter(rightTypeInfo.getDataType());
+ DataStructureConverters.getConverter(
+ testSpec.getRightTypeInfo().getDataType());
AsyncDeltaJoinRunner rightAsyncFunction =
new AsyncDeltaJoinRunner(
- new GeneratedFunctionWrapper<>(new MyAsyncFunction()),
+ new GeneratedFunction<>("", "", new Object[0]) {
+ @Override
+ public MyAsyncFunction newInstance(ClassLoader
classLoader) {
+ return new MyAsyncFunction(testSpec, true);
+ }
+ },
rightFetcherConverter,
new GeneratedResultFutureWrapper<>(new
TestingFetcherResultFuture()),
- rightTypeInfo.toRowSerializer(),
- leftJoinKeySelector,
- leftUpsertKeySelector,
- rightJoinKeySelector,
- rightUpsertKeySelector,
+ testSpec.getRightTypeInfo().toRowSerializer(),
+ testSpec.getLeftJoinKeySelector(),
+ testSpec.getLeftUpsertKeySelector(),
+ testSpec.getRightJoinKeySelector(),
+ testSpec.getRightUpsertKeySelector(),
AEC_CAPACITY,
true,
enableCache);
- InternalTypeInfo<RowData> joinKeyTypeInfo =
leftJoinKeySelector.getProducedType();
+ InternalTypeInfo<RowData> joinKeyTypeInfo =
+ testSpec.getLeftJoinKeySelector().getProducedType();
StreamingDeltaJoinOperator operator =
new StreamingDeltaJoinOperator(
rightAsyncFunction,
leftAsyncFunction,
- leftJoinKeySelector,
- rightJoinKeySelector,
+ testSpec.getLeftJoinKeySelector(),
+ testSpec.getRightJoinKeySelector(),
-1L,
AEC_CAPACITY,
new TestProcessingTimeService(),
@@ -879,41 +1076,19 @@ public class StreamingDeltaJoinOperatorTest {
mailbox, 0,
StreamTaskActionExecutor.IMMEDIATE, mailboxProcessor),
CACHE_SIZE,
CACHE_SIZE,
- leftRowType,
- rightRowType);
+ testSpec.getLeftInputRowType(),
+ testSpec.getRightInputRowType());
return new KeyedTwoInputStreamOperatorTestHarness<>(
operator,
- leftJoinKeySelector,
- rightJoinKeySelector,
+ testSpec.getLeftJoinKeySelector(),
+ testSpec.getRightJoinKeySelector(),
joinKeyTypeInfo,
1,
1,
0,
- leftTypeInfo.toSerializer(),
- rightTypeInfo.toSerializer());
- }
-
- private RowDataKeySelector getUpsertKeySelector(RowType rowType, @Nullable
int[] upsertKey) {
- if (upsertKey == null) {
- upsertKey = IntStream.range(0, rowType.getFieldCount()).toArray();
- }
- return HandwrittenSelectorUtil.getRowDataSelector(
- upsertKey, rowType.getChildren().toArray(new LogicalType[0]));
- }
-
- private void prepareOperatorRuntimeInfo(StreamingDeltaJoinOperator
operator) {
- unwrapAsyncFunction(operator, true).tagInvokingSideDuringRuntime(true);
- unwrapAsyncFunction(operator,
false).tagInvokingSideDuringRuntime(false);
- }
-
- private MyAsyncFunction unwrapAsyncFunction(
- StreamingDeltaJoinOperator operator, boolean unwrapLeft) {
- if (unwrapLeft) {
- return (MyAsyncFunction)
operator.getLeftTriggeredUserFunction().getFetcher();
- } else {
- return (MyAsyncFunction)
operator.getRightTriggeredUserFunction().getFetcher();
- }
+ testSpec.getLeftTypeInfo().toSerializer(),
+ testSpec.getRightTypeInfo().toSerializer());
}
private TableAsyncExecutionController<RowData, RowData, RowData> unwrapAEC(
@@ -941,36 +1116,27 @@ public class StreamingDeltaJoinOperatorTest {
return cacheInLeftRunner;
}
- private RowType getOutputType() {
- return RowType.of(
- Stream.concat(
- leftRowType.getChildren().stream(),
- rightRowType.getChildren().stream())
- .toArray(LogicalType[]::new),
- Stream.concat(
- leftRowType.getFieldNames().stream(),
- rightRowType.getFieldNames().stream())
- .toArray(String[]::new));
+ private void insertLeftTable(AbstractTestSpec testSpec,
StreamRecord<RowData> record) {
+ insertTableData(testSpec, record, true);
}
- private void insertLeftTable(StreamRecord<RowData> record) {
- insertTableData(record, true);
+ private void insertRightTable(AbstractTestSpec testSpec,
StreamRecord<RowData> record) {
+ insertTableData(testSpec, record, false);
}
- private void insertRightTable(StreamRecord<RowData> record) {
- insertTableData(record, false);
- }
-
- private static void insertTableData(StreamRecord<RowData> record, boolean
insertLeftTable) {
+ private static void insertTableData(
+ AbstractTestSpec testSpec, StreamRecord<RowData> record, boolean
insertLeftTable) {
RowData rowData = record.getValue();
try {
if (insertLeftTable) {
synchronized (leftTableCurrentData) {
- leftTableCurrentData.add(rowData);
+ RowData upsertKey =
testSpec.getLeftUpsertKeySelector().getKey(rowData);
+ leftTableCurrentData.put(upsertKey, rowData);
}
} else {
synchronized (rightTableCurrentData) {
- rightTableCurrentData.add(rowData);
+ RowData upsertKey =
testSpec.getRightUpsertKeySelector().getKey(rowData);
+ rightTableCurrentData.put(upsertKey, rowData);
}
}
} catch (Exception e) {
@@ -1018,11 +1184,12 @@ public class StreamingDeltaJoinOperatorTest {
private static Optional<Throwable> expectedThrownException =
Optional.empty();
- // ===== runtime info =====
- private Boolean treatRightAsLookupTable;
+ private final AbstractTestSpec testSpec;
+ private final boolean treatRightAsLookupTable;
- public void tagInvokingSideDuringRuntime(boolean isLeftInvoking) {
- this.treatRightAsLookupTable = isLeftInvoking;
+ private MyAsyncFunction(AbstractTestSpec testSpec, boolean
treatRightAsLookupTable) {
+ this.testSpec = testSpec;
+ this.treatRightAsLookupTable = treatRightAsLookupTable;
}
public static void block() throws Exception {
@@ -1054,29 +1221,29 @@ public class StreamingDeltaJoinOperatorTest {
lock.await();
}
- LinkedList<RowData> lookupTableData;
+ HashMap<RowData, RowData> lookupTableData;
RowDataKeySelector streamSideJoinKeySelector;
RowDataKeySelector lookupSideJoinKeySelector;
- if
(Objects.requireNonNull(treatRightAsLookupTable)) {
+ if (treatRightAsLookupTable) {
synchronized (rightTableCurrentData) {
- lookupTableData = new
LinkedList<>(rightTableCurrentData);
+ lookupTableData = new
HashMap<>(rightTableCurrentData);
}
- streamSideJoinKeySelector =
leftJoinKeySelector.copy();
- lookupSideJoinKeySelector =
rightJoinKeySelector.copy();
+ streamSideJoinKeySelector =
testSpec.getLeftJoinKeySelector();
+ lookupSideJoinKeySelector =
testSpec.getRightJoinKeySelector();
leftInvokeCount.incrementAndGet();
} else {
synchronized (leftTableCurrentData) {
- lookupTableData = new
LinkedList<>(leftTableCurrentData);
+ lookupTableData = new
HashMap<>(leftTableCurrentData);
}
- streamSideJoinKeySelector =
rightJoinKeySelector.copy();
- lookupSideJoinKeySelector =
leftJoinKeySelector.copy();
+ streamSideJoinKeySelector =
testSpec.getRightJoinKeySelector();
+ lookupSideJoinKeySelector =
testSpec.getLeftJoinKeySelector();
rightInvokeCount.incrementAndGet();
}
List<Object> results = new ArrayList<>();
- for (RowData row : lookupTableData) {
+ for (RowData row : lookupTableData.values()) {
if (streamSideJoinKeySelector
.getKey(input)
.equals(lookupSideJoinKeySelector.getKey(row))) {
@@ -1139,6 +1306,7 @@ public class StreamingDeltaJoinOperatorTest {
private static boolean insertTableDataAfterEmit = true;
public MyAsyncExecutionControllerDelegate(
+ AbstractTestSpec testSpec,
TableAsyncExecutionController<RowData, RowData, RowData>
innerAec) {
super(
innerAec.getAsyncInvoke(),
@@ -1153,6 +1321,7 @@ public class StreamingDeltaJoinOperatorTest {
int inputIndex =
inputIndexAwareEntry.getInputIndex();
//noinspection unchecked
insertTableData(
+ testSpec,
(StreamRecord<RowData>)
inputIndexAwareEntry.getInputElement(),
inputIndex == 0);
}
@@ -1178,4 +1347,205 @@ public class StreamingDeltaJoinOperatorTest {
getResultFuture().complete((Collection) result);
}
}
+
+ private abstract static class AbstractTestSpec {
+
+ abstract RowType getLeftInputRowType();
+
+ final InternalTypeInfo<RowData> getLeftTypeInfo() {
+ return InternalTypeInfo.of(getLeftInputRowType());
+ }
+
+ abstract Optional<int[]> getLeftUpsertKey();
+
+ final RowDataKeySelector getLeftUpsertKeySelector() {
+ return getUpsertKeySelector(getLeftInputRowType(),
getLeftUpsertKey().orElse(null));
+ }
+
+ abstract RowType getRightInputRowType();
+
+ final InternalTypeInfo<RowData> getRightTypeInfo() {
+ return InternalTypeInfo.of(getRightInputRowType());
+ }
+
+ abstract Optional<int[]> getRightUpsertKey();
+
+ final RowDataKeySelector getRightUpsertKeySelector() {
+ return getUpsertKeySelector(getRightInputRowType(),
getRightUpsertKey().orElse(null));
+ }
+
+ abstract int[] getLeftJoinKeyIndices();
+
+ final RowDataKeySelector getLeftJoinKeySelector() {
+ return HandwrittenSelectorUtil.getRowDataSelector(
+ getLeftJoinKeyIndices(),
+ getLeftInputRowType().getChildren().toArray(new
LogicalType[0]));
+ }
+
+ abstract int[] getRightJoinKeyIndices();
+
+ final RowDataKeySelector getRightJoinKeySelector() {
+ return HandwrittenSelectorUtil.getRowDataSelector(
+ getRightJoinKeyIndices(),
+ getRightInputRowType().getChildren().toArray(new
LogicalType[0]));
+ }
+
+ final RowType getOutputRowType() {
+ return RowType.of(
+ Stream.concat(
+
getLeftInputRowType().getChildren().stream(),
+
getRightInputRowType().getChildren().stream())
+ .toArray(LogicalType[]::new),
+ Stream.concat(
+
getLeftInputRowType().getFieldNames().stream(),
+
getRightInputRowType().getFieldNames().stream())
+ .toArray(String[]::new));
+ }
+
+ final int[] getOutputFieldIndices() {
+ return IntStream.range(0,
getOutputRowType().getFieldCount()).toArray();
+ }
+
+ private RowDataKeySelector getUpsertKeySelector(
+ RowType rowType, @Nullable int[] upsertKey) {
+ if (upsertKey == null) {
+ upsertKey = IntStream.range(0,
rowType.getFieldCount()).toArray();
+ }
+ return HandwrittenSelectorUtil.getRowDataSelector(
+ upsertKey, rowType.getChildren().toArray(new
LogicalType[0]));
+ }
+ }
+
+ /**
+ * Mock sql like the following.
+ *
+ * <pre>
+ * CREATE TABLE leftSrc(
+ * left_value INT,
+ * left_jk1 BOOLEAN,
+ * left_jk2_index STRING,
+ * INDEX(left_jk2_index)
+ * )
+ * </pre>
+ *
+ * <pre>
+ * CREATE TABLE rightSrc(
+ * right_jk2 STRING,
+ * right_value INT,
+ * right_jk1_index BOOLEAN,
+ * INDEX(right_jk1_index)
+ * )
+ * </pre>
+ *
+ * <pre>
+ * select * from leftSrc join rightSrc
+ * on leftSrc.left_jk1 = rightSrc.right_jk1_index
+ * and leftSrc.left_jk2_index = rightSrc.right_jk2
+ * </pre>
+ */
+ private static class LogLogTableJoinTestSpec extends AbstractTestSpec {
+
+ private static final LogLogTableJoinTestSpec INSTANCE = new
LogLogTableJoinTestSpec();
+
+ @Override
+ RowType getLeftInputRowType() {
+ return RowType.of(
+ new LogicalType[] {new IntType(), new BooleanType(),
VarCharType.STRING_TYPE},
+ new String[] {"left_value", "left_jk1", "left_jk2_index"});
+ }
+
+ @Override
+ RowType getRightInputRowType() {
+ return RowType.of(
+ new LogicalType[] {VarCharType.STRING_TYPE, new IntType(),
new BooleanType()},
+ new String[] {"right_jk2", "right_value",
"right_jk1_index"});
+ }
+
+ @Override
+ Optional<int[]> getLeftUpsertKey() {
+ return Optional.empty();
+ }
+
+ @Override
+ Optional<int[]> getRightUpsertKey() {
+ return Optional.empty();
+ }
+
+ @Override
+ int[] getLeftJoinKeyIndices() {
+ return new int[] {1, 2};
+ }
+
+ @Override
+ int[] getRightJoinKeyIndices() {
+ return new int[] {2, 0};
+ }
+ }
+
+ /**
+ * Mock sql like the following.
+ *
+ * <pre>
+ * CREATE TABLE leftSrc(
+ * left_value INT,
+ * left_pk1 BOOLEAN,
+ * left_pk2_jk_index STRING,
+ * PRIMARY KEY (left_pk1, left_pk2_jk_index) NOT ENFORCED
+ * INDEX(left_pk2_jk_index)
+ * )
+ * </pre>
+ *
+ * <pre>
+ * CREATE TABLE rightSrc(
+ * right_pk2_jk_index STRING,
+ * right_value INT,
+ * right_pk1 BOOLEAN,
+ * PRIMARY KEY (right_pk2_jk_index, right_pk1) NOT ENFORCED
+ * INDEX(right_pk2_jk_index)
+ * )
+ * </pre>
+ *
+ * <pre>
+ * select * from leftSrc join rightSrc
+ * on leftSrc.left_pk2_jk_index = rightSrc.right_pk2_jk_index
+ * </pre>
+ */
+ private static class PkPkTableJoinTestSpec extends AbstractTestSpec {
+
+ private static final PkPkTableJoinTestSpec INSTANCE = new
PkPkTableJoinTestSpec();
+
+ @Override
+ RowType getLeftInputRowType() {
+ return RowType.of(
+ new LogicalType[] {new IntType(), new BooleanType(),
VarCharType.STRING_TYPE},
+ new String[] {"left_value", "left_pk1",
"left_pk2_jk_index"});
+ }
+
+ @Override
+ RowType getRightInputRowType() {
+ return RowType.of(
+ new LogicalType[] {VarCharType.STRING_TYPE, new IntType(),
new BooleanType()},
+ new String[] {"right_pk2_jk_index", "right_value",
"right_pk1"});
+ }
+
+ @Override
+ Optional<int[]> getLeftUpsertKey() {
+ return Optional.of(new int[] {1, 2});
+ }
+
+ @Override
+ Optional<int[]> getRightUpsertKey() {
+ return Optional.of(new int[] {0, 2});
+ }
+
+ @Override
+ int[] getLeftJoinKeyIndices() {
+ return new int[] {2};
+ }
+
+ @Override
+ int[] getRightJoinKeyIndices() {
+ return new int[] {0};
+ }
+ }
}