lincoln-lil commented on code in PR #20393:
URL: https://github.com/apache/flink/pull/20393#discussion_r935658039
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala:
##########
@@ -62,14 +62,19 @@ class StreamPhysicalIncrementalGroupAggregate(
inputRel: RelNode,
val partialAggGrouping: Array[Int],
val partialAggCalls: Array[AggregateCall],
- val finalAggGrouping: Array[Int],
Review Comment:
Do you mean the attribute name in `StreamPhysicalGroupAggregateBase`?
I think of partial/final as an internal concept of the two-phase aggregate, so I tend to omit the 'final' prefix in the base class. That way the name can represent the grouping in a one-phase agg and also be distinguished from partialGroupings in a two-phase agg.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -85,6 +87,19 @@ class RelTreeWriterImpl(
case _ => // ignore
}
+ if (withUpsertKey) rel match {
+ case streamRel: StreamPhysicalRel =>
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(rel.getCluster.getMetadataQuery)
+ val upsertKeys = fmq.getUpsertKeys(streamRel)
+ if (null != upsertKeys && !upsertKeys.isEmpty) {
+ printValues.add(
+ Pair.of("upsertKeys", upsertKeys.map(bitset => bitset.toString).mkString(", ")))
Review Comment:
The previous idea was to use indexes for simplicity, since the field names are already available, but upsert keys are usually not too long, so I'll update it.
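To illustrate the two printing options discussed here, a minimal sketch that renders upsert keys either as raw indexes or as field names. It assumes `scala.collection.immutable.BitSet` as a stand-in for Calcite's `ImmutableBitSet`; `UpsertKeyFormat` and its brace/bracket styles are hypothetical, not the actual `RelTreeWriterImpl` output.

```scala
import scala.collection.immutable.BitSet

// Hypothetical helper; BitSet stands in for Calcite's ImmutableBitSet.
object UpsertKeyFormat {

  // Prints each upsert key as column indexes, e.g. "{0}, {1, 3}".
  def byIndex(keys: Set[BitSet]): String =
    keys.toSeq.map(_.mkString("{", ", ", "}")).sorted.mkString(", ")

  // Prints each upsert key with the row type's field names, e.g. "[id], [name, ts]".
  // An IndexedSeq[String] is also an Int => String, so it can be passed to map.
  def byName(keys: Set[BitSet], fieldNames: IndexedSeq[String]): String =
    keys.toSeq
      .map(key => key.toSeq.map(fieldNames).mkString("[", ", ", "]"))
      .sorted
      .mkString(", ")
}
```

With keys `{0}` and `{1, 3}` over fields `(id, name, price, ts)`, `byName` yields `[id], [name, ts]`, which reads more easily in a plan dump than the bare indexes.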
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -585,6 +585,50 @@ object FlinkRexUtil {
}
false
}
+
+ /**
+ * Returns whether a given expression is deterministic in streaming scenario, differs from
+ * calcite's [[RexUtil]], it considers both non-deterministic and dynamic functions.
+ */
+ def isDeterministicInStreaming(e: RexNode): Boolean = try {
Review Comment:
I want to make the streaming scope explicit here to keep someone from using it in batch processing, because dynamic functions have different semantics there.
I'll add separate tests for it.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
join: CommonPhysicalLookupJoin,
mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
val left = join.getInput
- val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
val leftType = left.getRowType
val leftJoinKeys = join.joinInfo.leftSet
+ // differs from regular join, here we do not filterKeys because there's no shuffle on join keys
+ // by default.
+ val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+ val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+ val rightUpsertKeys =
+ if (
+ (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(
+ join.remainingCondition.get))
+ || (join.calcOnTemporalTable.isDefined && !FlinkRexUtil.isDeterministicInStreaming(
Review Comment:
Good catch! I'll re-split the commits in the next update.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
}
+ /** Check if lookup key contains primary key, include constant lookup keys. */
+ def lookupKeyContainsPrimaryKey(): Boolean = {
+ val outputPkIdx = getOutputPrimaryKeyIndexes
+ // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+ // lookup key(s) which are not included in joinInfo.rightKeys.
+ outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+ }
+
+ /** Get final output pk indexes if exists, otherwise will get empty. */
+ def getOutputPrimaryKeyIndexes: Array[Int] = {
Review Comment:
These output indexes are the final ones, which may go through a projection (the calc on the temporal table) rather than come directly from the temporal table, so I didn't add 'temporalTable' to the name.
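A minimal sketch of why the final output indexes can differ from the temporal table's own pk indexes. It assumes a simplified model in which the calc is reduced to its projection list (`Some(i)` = plain reference to input column `i`, `None` = any other expression) and in which lookup keys and pk indexes share one index space; `LookupPk` is hypothetical and not the planner's API.

```scala
// Hypothetical model of remapping temporal-table pk indexes through a projection.
object LookupPk {

  def outputPrimaryKeyIndexes(
      temporalPk: Seq[Int],
      projection: Option[Seq[Option[Int]]]): Seq[Int] =
    projection match {
      // No calc on the temporal table: pk indexes are already output indexes.
      case None => temporalPk
      case Some(projects) =>
        // input column index -> output position that forwards it unchanged
        val inputToOutput: Map[Int, Int] =
          projects.zipWithIndex.collect { case (Some(in), out) => in -> out }.toMap
        if (temporalPk.nonEmpty && temporalPk.forall(inputToOutput.contains))
          temporalPk.map(inputToOutput) // remap every pk column
        else
          Seq.empty // some pk column is projected away or computed: no pk survives
    }

  // The pk is covered only if every pk index is also a lookup key index.
  def lookupKeyContainsPrimaryKey(outputPk: Seq[Int], allLookupKeys: Set[Int]): Boolean =
    outputPk.nonEmpty && outputPk.forall(allLookupKeys.contains)
}
```

For a temporal table `(id, name, price)` with pk `id` and a calc projecting `(name, id)`, the pk moves from index 0 to output index 1; if the calc replaces `id` with a computed expression, no output pk remains.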
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala:
##########
@@ -80,7 +81,8 @@ object FlinkRelOptUtil {
withIdPrefix,
withChangelogTraits,
withRowType,
- withTreeStyle = true)
+ withTreeStyle = true,
+ withUpsertKey)
Review Comment:
I had the same feeling at first, but all the parameters of `RelTreeWriterImpl` have default values, so I appended the new `withUpsertKey` as the last parameter of `RelTreeWriterImpl`.
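The reasoning relies on Scala default arguments: a new parameter with a default, appended last, leaves every existing positional call site valid. A minimal sketch with a hypothetical `TreeWriterSketch` class (not the real `RelTreeWriterImpl` signature):

```scala
// Hypothetical stand-in for a writer whose flags all have defaults.
class TreeWriterSketch(
    val withIdPrefix: Boolean = false,
    val withRowType: Boolean = false,
    val withTreeStyle: Boolean = true,
    // New flag appended last with a default, so older positional calls still compile.
    val withUpsertKey: Boolean = false)

object TreeWriterSketchDemo {
  // A call site written before withUpsertKey existed compiles unchanged.
  val legacy = new TreeWriterSketch(true, false, true)
  // New call sites can opt in by name and keep the other defaults.
  val withKeys = new TreeWriterSketch(withTreeStyle = true, withUpsertKey = true)
}
```

Inserting the flag in the middle instead would silently shift the meaning of positional Boolean arguments at existing call sites.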
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -142,4 +142,33 @@ public class OptimizerConfigOptions {
.withDescription(
"When it is true, the optimizer will merge the operators with pipelined shuffling "
+ "into a multiple input operator to reduce shuffling and improve performance. Default value is true.");
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+ public static final ConfigOption<NonDeterministicUpdateHandling>
+ TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_HANDLING =
+ key("table.optimizer.non-deterministic-update.handling")
Review Comment:
This option aims to resolve all NDU (non-deterministic update) problems in the long run; lookup join is the first priority.
For the naming, my main consideration is the analogy with error handling: an NDU is also a kind of error, so I prefer 'handling' over 'strategy'. WDYT?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -85,6 +87,19 @@ class RelTreeWriterImpl(
case _ => // ignore
}
+ if (withUpsertKey) rel match {
Review Comment:
Ok
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala:
##########
@@ -80,4 +81,14 @@ abstract class CommonPhysicalJoin(
.item("select", getRowType.getFieldNames.mkString(", "))
}
+ def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = {
Review Comment:
This method was moved from `StreamPhysicalJoin`, so I kept its original name (there are several related methods named `xxUniqueKeys`; it may be better to refactor them in another PR).
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
}
+ /** Check if lookup key contains primary key, include constant lookup keys. */
+ def lookupKeyContainsPrimaryKey(): Boolean = {
+ val outputPkIdx = getOutputPrimaryKeyIndexes
+ // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+ // lookup key(s) which are not included in joinInfo.rightKeys.
+ outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+ }
+
+ /** Get final output pk indexes if exists, otherwise will get empty. */
+ def getOutputPrimaryKeyIndexes: Array[Int] = {
+ val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+ val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+ if (calcOnTemporalTable.isDefined) {
+ val program = calcOnTemporalTable.get
+ val outputMapping = program.getProjectList.asScala.zipWithIndex
+ .map { case (ref, index) => (index, program.expandLocalRef(ref)) }
+ .map {
+ case (outIndex, ref) =>
+ ref match {
+ case inputRef: RexInputRef => (inputRef.getIndex, outIndex)
+ case _ => (-1, -1)
+ }
+ }
+ .toMap
+ val outputPk = temporalPkIdxs.forall(outputMapping.contains)
+ if (outputPk) {
+ // remapping pk index
+ temporalPkIdxs.map(outputMapping.get(_).get)
+ } else {
+ Array[Int]()
+ }
+ } else {
+ temporalPkIdxs
+ }
+ } else {
+ // temporal table has no pk, no uk produces
+ Array[Int]()
+ }
+
+ outputPkIdx
+ }
+
+ private def getPrimaryKeyIndexesOfTemporalTable: Array[Int] = {
+ // get primary key columns of lookup table if exists
+ val pkColumns = getPrimaryKeyColumnsOfTemporalTable
+ if (pkColumns.isDefined) {
+ val newSchema = temporalTable.getRowType.getFieldNames
+ pkColumns.get.toArray().map(col => newSchema.indexOf(col))
+ } else {
+ Array[Int]()
+ }
+ }
+
+ private def getPrimaryKeyColumnsOfTemporalTable: Option[util.List[String]] = {
+ val schema = temporalTable match {
+ case t: TableSourceTable =>
+ TableSchema.fromResolvedSchema(t.contextResolvedTable.getResolvedSchema)
Review Comment:
This is mainly because I wanted to obtain the primary key in a uniform way, but I didn't find a proper method on `LegacyTableSourceTable`. We can switch to `ResolvedSchema` once `LegacyTableSourceTable` is removed.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
join: CommonPhysicalLookupJoin,
mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
val left = join.getInput
- val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
val leftType = left.getRowType
val leftJoinKeys = join.joinInfo.leftSet
+ // differs from regular join, here we do not filterKeys because there's no shuffle on join keys
+ // by default.
+ val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+ val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+ val rightUpsertKeys =
+ if (
+ (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(
Review Comment:
Makes sense to me.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -488,16 +488,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
val left = join.getInput
val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
- val leftType = left.getRowType
- getJoinUniqueKeys(
- join.joinType,
- leftType,
- leftUniqueKeys,
- null,
- mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
- // TODO get uniqueKeys from TableSchema of TableSource
+
+ if (leftUniqueKeys != null) {
Review Comment:
ok
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -488,16 +488,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
val left = join.getInput
val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
- val leftType = left.getRowType
- getJoinUniqueKeys(
- join.joinType,
- leftType,
- leftUniqueKeys,
- null,
- mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
- // TODO get uniqueKeys from TableSchema of TableSource
+
+ if (leftUniqueKeys != null) {
+ val rightUniqueKeys = getUniqueKeysOfTemporalTable(join)
+
+ getJoinUniqueKeys(
+ join.joinType,
+ left.getRowType,
+ leftUniqueKeys,
+ rightUniqueKeys,
+ mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
+ rightUniqueKeys != null)
+ } else {
null
- )
+ }
+ }
+
+ private[flink] def getUniqueKeysOfTemporalTable(
Review Comment:
The expected return values differ here: if this were unified into this method, using `JSet[ImmutableBitSet] != null` to represent `lookupKeyContainsPrimaryKey` of the lookup join would be a little obscure.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
}
+ /** Check if lookup key contains primary key, include constant lookup keys. */
+ def lookupKeyContainsPrimaryKey(): Boolean = {
+ val outputPkIdx = getOutputPrimaryKeyIndexes
+ // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+ // lookup key(s) which are not included in joinInfo.rightKeys.
+ outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+ }
+
+ /** Get final output pk indexes if exists, otherwise will get empty. */
+ def getOutputPrimaryKeyIndexes: Array[Int] = {
+ val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+ val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+ if (calcOnTemporalTable.isDefined) {
Review Comment:
Makes sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]