This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 347316ea639 [FLINK-28787][table-planner] Rename getUniqueKeys to getUpsertKeys in CommonPhysicalJoin 347316ea639 is described below commit 347316ea6394b24c4471aa8616f2632e126f733d Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Tue Sep 6 16:58:08 2022 +0800 [FLINK-28787][table-planner] Rename getUniqueKeys to getUpsertKeys in CommonPhysicalJoin This closes #20763 --- .../plan/nodes/exec/stream/StreamExecJoin.java | 35 ++++++++++++---------- .../StreamNonDeterministicUpdatePlanVisitor.java | 4 +-- .../nodes/physical/common/CommonPhysicalJoin.scala | 3 +- .../nodes/physical/stream/StreamPhysicalJoin.scala | 10 +++---- .../JoinJsonPlanTest_jsonplan/testInnerJoin.out | 2 -- .../testInnerJoinWithEqualPk.out | 4 +-- .../testInnerJoinWithPk.out | 4 +-- .../testLeftJoinNonEqui.out | 2 -- 8 files changed, 31 insertions(+), 33 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java index b8a45b8d6f1..dbf399d8ae5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java @@ -48,6 +48,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; @@ -73,23 +74,25 @@ public class StreamExecJoin extends ExecNodeBase<RowData> public static final String JOIN_TRANSFORMATION = "join"; public static final String FIELD_NAME_JOIN_SPEC = "joinSpec"; - public static final String FIELD_NAME_LEFT_UNIQUE_KEYS = "leftUniqueKeys"; - public static final String FIELD_NAME_RIGHT_UNIQUE_KEYS = "rightUniqueKeys"; + public static final String FIELD_NAME_LEFT_UPSERT_KEYS = "leftUpsertKeys"; + public static final String FIELD_NAME_RIGHT_UPSERT_KEYS = "rightUpsertKeys"; @JsonProperty(FIELD_NAME_JOIN_SPEC) private final JoinSpec joinSpec; - @JsonProperty(FIELD_NAME_LEFT_UNIQUE_KEYS) - private final List<int[]> leftUniqueKeys; + @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final List<int[]> leftUpsertKeys; - @JsonProperty(FIELD_NAME_RIGHT_UNIQUE_KEYS) - private final List<int[]> rightUniqueKeys; + @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final List<int[]> rightUpsertKeys; public StreamExecJoin( ReadableConfig tableConfig, JoinSpec joinSpec, - List<int[]> leftUniqueKeys, - List<int[]> rightUniqueKeys, + List<int[]> leftUpsertKeys, + List<int[]> rightUpsertKeys, InputProperty leftInputProperty, InputProperty rightInputProperty, RowType outputType, @@ -99,8 +102,8 @@ public class StreamExecJoin extends ExecNodeBase<RowData> ExecNodeContext.newContext(StreamExecJoin.class), ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig), joinSpec, - leftUniqueKeys, - rightUniqueKeys, + leftUpsertKeys, + rightUpsertKeys, Lists.newArrayList(leftInputProperty, rightInputProperty), outputType, description); @@ -112,16 +115,16 @@ public class StreamExecJoin extends ExecNodeBase<RowData> @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context, @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig, @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec, - @JsonProperty(FIELD_NAME_LEFT_UNIQUE_KEYS) List<int[]> leftUniqueKeys, - @JsonProperty(FIELD_NAME_RIGHT_UNIQUE_KEYS) List<int[]> rightUniqueKeys, + @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]> leftUpsertKeys, + @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]> rightUpsertKeys, @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { super(id, context, persistedConfig, inputProperties, outputType, description); checkArgument(inputProperties.size() == 2); this.joinSpec = checkNotNull(joinSpec); - this.leftUniqueKeys = leftUniqueKeys; - this.rightUniqueKeys = rightUniqueKeys; + this.leftUpsertKeys = leftUpsertKeys; + this.rightUpsertKeys = rightUpsertKeys; } @Override @@ -149,7 +152,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData> planner.getFlinkContext().getClassLoader(), leftTypeInfo, leftJoinKey, - leftUniqueKeys); + leftUpsertKeys); final InternalTypeInfo<RowData> rightTypeInfo = InternalTypeInfo.of(rightType); final JoinInputSideSpec rightInputSpec = @@ -157,7 +160,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData> planner.getFlinkContext().getClassLoader(), rightTypeInfo, rightJoinKey, - rightUniqueKeys); + rightUpsertKeys); GeneratedJoinCondition generatedCondition = JoinUtil.generateConditionFunction( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java index 2caa2bb951b..9aff0a7a014 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java @@ -498,7 +498,7 @@ public class StreamNonDeterministicUpdatePlanVisitor { join.joinSpec().getLeftKeys(), // TODO remove this conversion when scala-free was total done. scala.collection.JavaConverters.seqAsJavaList( - join.getUniqueKeys(leftRel, join.joinSpec().getLeftKeys()))); + join.getUpsertKeys(leftRel, join.joinSpec().getLeftKeys()))); StreamPhysicalRel newRight = visitJoinChild( requireDeterminism, @@ -509,7 +509,7 @@ public class StreamNonDeterministicUpdatePlanVisitor { join.joinSpec().getRightKeys(), // TODO remove this conversion when scala-free was total done. scala.collection.JavaConverters.seqAsJavaList( - join.getUniqueKeys(rightRel, join.joinSpec().getRightKeys()))); + join.getUpsertKeys(rightRel, join.joinSpec().getRightKeys()))); return (StreamPhysicalRel) join.copy( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala index bee5b1b00f2..871eccf00ae 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala @@ -81,8 +81,7 @@ abstract class CommonPhysicalJoin( .item("select", getRowType.getFieldNames.mkString(", ")) } - def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = { - // TODO update method name in FLINK-28787 + def getUpsertKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = { val upsertKeys = FlinkRelMetadataQuery .reuseOrCreate(cluster.getMetadataQuery) .getUpsertKeysInKeyGroupRange(input, keys) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala index ed3e40b52ca..60eb40d14d9 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala @@ -67,7 +67,7 @@ class StreamPhysicalJoin( def inputUniqueKeyContainsJoinKey(inputOrdinal: Int): Boolean = { val input = getInput(inputOrdinal) val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else joinSpec.getRightKeys - val inputUniqueKeys = getUniqueKeys(input, joinKeys) + val inputUniqueKeys = getUpsertKeys(input, joinKeys) if (inputUniqueKeys != null) { inputUniqueKeys.exists(uniqueKey => joinKeys.forall(uniqueKey.contains(_))) } else { @@ -96,7 +96,7 @@ class StreamPhysicalJoin( unwrapClassLoader(left), InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(left.getRowType)), joinSpec.getLeftKeys, - getUniqueKeys(left, joinSpec.getLeftKeys) + getUpsertKeys(left, joinSpec.getLeftKeys) ) ) .item( @@ -105,7 +105,7 @@ class StreamPhysicalJoin( unwrapClassLoader(right), InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(right.getRowType)), joinSpec.getRightKeys, - getUniqueKeys(right, joinSpec.getRightKeys) + getUpsertKeys(right, joinSpec.getRightKeys) ) ) } @@ -119,8 +119,8 @@ class StreamPhysicalJoin( new StreamExecJoin( unwrapTableConfig(this), joinSpec, - getUniqueKeys(left, joinSpec.getLeftKeys), - getUniqueKeys(right, joinSpec.getRightKeys), + getUpsertKeys(left, joinSpec.getLeftKeys), + getUpsertKeys(right, joinSpec.getRightKeys), InputProperty.DEFAULT, InputProperty.DEFAULT, FlinkTypeFactory.toLogicalRowType(getRowType), diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out index 0355fb03137..243f4d203ec 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out @@ -116,8 +116,6 @@ "filterNulls" : [ true ], "nonEquiCondition" : null }, - "leftUniqueKeys" : [ ], - "rightUniqueKeys" : [ ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out index 5d80fb2baac..141b22ea160 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out @@ -184,8 +184,8 @@ "filterNulls" : [ true ], "nonEquiCondition" : null }, - "leftUniqueKeys" : [ [ 0 ] ], - "rightUniqueKeys" : [ [ 0 ] ], + "leftUpsertKeys" : [ [ 0 ] ], + "rightUpsertKeys" : [ [ 0 ] ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out index 149408c5e1c..b0980c85f52 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out @@ -246,8 +246,8 @@ "filterNulls" : [ true ], "nonEquiCondition" : null }, - "leftUniqueKeys" : [ [ 1 ] ], - "rightUniqueKeys" : [ [ 1 ] ], + "leftUpsertKeys" : [ [ 1 ] ], + "rightUpsertKeys" : [ [ 1 ] ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN" diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out index 27df7d82b1c..27ad53723c9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out @@ -130,8 +130,6 @@ "type" : "BOOLEAN" } }, - "leftUniqueKeys" : [ ], - "rightUniqueKeys" : [ ], "inputProperties" : [ { "requiredDistribution" : { "type" : "UNKNOWN"