xuyangzhong commented on code in PR #27372:
URL: https://github.com/apache/flink/pull/27372#discussion_r3192916808


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala:
##########
@@ -815,4 +815,303 @@ class JoinTest extends TableTestBase {
        "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: []) must be > 0.")
       .isInstanceOf[IllegalArgumentException]
   }
+
+  // Tests for FLINK-38753: Enrich upsert keys by equiv expressions in joins

Review Comment:
   nit: remove this comment



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -44,6 +44,7 @@ import scala.collection.JavaConversions._
  * for the standard logical algebra.
  */
 class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
+  private val MaxGeneratedEnrichedKeys = 128

Review Comment:
   In fact, as long as a node contains a filter, the upsert keys can be enriched through equivalence relations. However, the biggest issue is that this leads to too many combinations of upsert keys. I imagine you've considered this problem as well, which is why you introduced `MaxGeneratedEnrichedKeys` here to prevent the set of upsert keys from exploding.
   
   Perhaps we could take a different approach: introduce a new metadata handler, `FlinkRelMdEquivalenceColumns`, and leverage the equivalence-column information at the places where upsert keys are consumed for optimization (see `FlinkChangelogModeInferenceProgram#canUpsertKeysWithImmutableColsSatisfyPk`). While this would add some complexity to the call sites that use upsert keys, it would not be subject to the limitation of upsert key explosion. What do you think?
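To make the suggestion concrete, here is a rough sketch of the idea. All names and shapes are hypothetical: `FlinkRelMdEquivalenceColumns` does not exist in Flink today, and plain `Set[Int]` stands in for `ImmutableBitSet`. The point is that equivalence classes of columns are kept as separate metadata, and both sides are canonicalized at the consumption site, so no enriched key set is ever enumerated.

```scala
// Hypothetical sketch of the suggested alternative, not Flink's actual code.
// Instead of materializing every enriched upsert key, keep equivalence classes
// of columns and rewrite keys to class representatives only where consumed.
object EquivalenceColumnsSketch {

  // Map a column to the representative (minimum) of its equivalence class;
  // a column outside all classes is its own representative.
  def canonicalize(col: Int, classes: Seq[Set[Int]]): Int =
    classes.find(_.contains(col)).map(_.min).getOrElse(col)

  // A primary key is satisfied if some upsert key, rewritten to class
  // representatives, is a subset of the rewritten primary key -- roughly the
  // check behind canUpsertKeysWithImmutableColsSatisfyPk, but without ever
  // enumerating enriched key combinations.
  def keysSatisfyPk(
      upsertKeys: Set[Set[Int]],
      pk: Set[Int],
      classes: Seq[Set[Int]]): Boolean = {
    val pkCanon = pk.map(canonicalize(_, classes))
    upsertKeys.exists(_.map(canonicalize(_, classes)).subsetOf(pkCanon))
  }
}
```

With equivalence class `{1, 5}`, the upsert key `{5, 6}` satisfies a primary key on `{1, 6}` even though no enriched key `{1, 6}` was ever generated.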



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala:
##########
@@ -399,23 +399,23 @@ class FlinkRelMdUpsertKeysTest extends FlinkRelMdHandlerTestBase {
   @Test
   def testGetUpsertKeysOnJoin(): Unit = {
     assertEquals(
-      toBitSet(Array(1), Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)),
+      toBitSet(Array(1), Array(5), Array(1, 5), Array(1, 6), Array(5, 6), Array(1, 5, 6)),
       mq.getUpsertKeys(logicalInnerJoinOnUniqueKeys).toSet)
     assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinNotOnUniqueKeys).toSet)
     assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinOnRHSUniqueKeys).toSet)
     assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinWithoutEquiCond).toSet)
     assertEquals(toBitSet(), mq.getUpsertKeys(logicalInnerJoinWithEquiAndNonEquiCond).toSet)
 
     assertEquals(
-      toBitSet(Array(1), Array(1, 5), Array(1, 5, 6)),
+      toBitSet(Array(1), Array(1, 5), Array(1, 6), Array(1, 5, 6)),
       mq.getUpsertKeys(logicalLeftJoinOnUniqueKeys).toSet)
     assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinNotOnUniqueKeys).toSet)
     assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinOnRHSUniqueKeys).toSet)
     assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinWithoutEquiCond).toSet)
     assertEquals(toBitSet(), mq.getUpsertKeys(logicalLeftJoinWithEquiAndNonEquiCond).toSet)
 
     assertEquals(
-      toBitSet(Array(5), Array(1, 5), Array(5, 6), Array(1, 5, 6)),
+      toBitSet(Array(5), Array(5, 6), Array(1, 5), Array(1, 5, 6)),

Review Comment:
   revert this change
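For reference, the enriched key sets asserted in this test can be reproduced with a small stand-alone model of the enrichment. This is a simplified illustration, not the planner's implementation: plain `Set[Int]` replaces `ImmutableBitSet`, and the cap mirrors the `MaxGeneratedEnrichedKeys` guard from the PR.

```scala
// Hypothetical sketch (not the planner's actual code): enrich upsert keys
// through equi-join column pairs. Every column of a base key may be replaced
// by its equi-join partner, and all variants are collected, up to a cap that
// guards against combinatorial explosion.
object UpsertKeyEnrichmentSketch {

  def enrich(
      keys: Set[Set[Int]],
      equiv: Set[(Int, Int)],
      cap: Int = 128): Set[Set[Int]] = {
    // Substitution map in both directions: col -> columns it is equal to.
    val subst: Map[Int, Set[Int]] = equiv.toSeq
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (c, pairs) => c -> pairs.map(_._2).toSet }

    // All keys obtainable by independently substituting each column.
    def variants(key: Set[Int]): Set[Set[Int]] =
      key.foldLeft(Set(Set.empty[Int])) { (acc, col) =>
        val choices = subst.getOrElse(col, Set.empty[Int]) + col
        acc.flatMap(partial => choices.map(partial + _))
      }

    val enriched = keys.flatMap(variants)
    // Fall back to the base keys if enrichment would explode.
    if (enriched.size > cap) keys else enriched
  }
}
```

With base keys `{1}, {5}, {1,5}, {5,6}, {1,5,6}` and the equivalence `1 = 5`, enrichment adds exactly `{1, 6}`, matching the updated expectation above.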



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -359,19 +360,73 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
     val rightKeys = fmq.getUpsertKeys(right)
     val leftImmutableColumns = fmq.getImmutableColumns(left)
     val rightImmutableColumns = fmq.getImmutableColumns(right)
+    val leftFieldCount = left.getRowType.getFieldCount
 
-    FlinkRelMdUniqueKeys.INSTANCE.getJoinUniqueKeys(
+    val baseKeys = FlinkRelMdUniqueKeys.INSTANCE.getJoinUniqueKeys(
       joinRelType,
-      left.getRowType.getFieldCount,
-      // Retain only keys whose columns are contained in the join's equi-join columns

Review Comment:
   No need to remove these comments.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala:
##########
@@ -815,4 +815,303 @@ class JoinTest extends TableTestBase {
        "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: []) must be > 0.")
       .isInstanceOf[IllegalArgumentException]
   }
+
+  // Tests for FLINK-38753: Enrich upsert keys by equiv expressions in joins
+  @Test
+  def testJoinUpsertKeyEnrichmentInnerJoinBasic(): Unit = {
+    // Basic case from FLINK-38753: src1 pk {a1}, src2 pk {a2, b2}, join a1 = a2

Review Comment:
   nit: remove `Basic case from FLINK-38753:` from the comment



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala:
##########
@@ -815,4 +815,303 @@ class JoinTest extends TableTestBase {
        "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: []) must be > 0.")
       .isInstanceOf[IllegalArgumentException]
   }
+
+  // Tests for FLINK-38753: Enrich upsert keys by equiv expressions in joins
+  @Test
+  def testJoinUpsertKeyEnrichmentInnerJoinBasic(): Unit = {

Review Comment:
   nit: extract the logic shared by these new tests into a common helper



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
