pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247926320
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ##########
 @@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
     materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, 
indicesToMaterialize)
   }
 
+  def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
 
 Review comment:
   I think this code is incorrect. 
   
   First, you didn't recursively call the `RelTimeIndicatorConverter` on the 
`upsertToRetraction` input:
   ```
   val rewrittenInput = upsertToRetraction.accept(this)
   ```
   
   Secondly, `LogicalUpsertToRetraction` doesn't have to materialize any 
fields: it doesn't invalidate watermarks guarantees (all records after the 
watermark have a rowtime value above the watermark). In other words, it 
preserves rowtime fields/watermark guarantees from it's input. A rowtime field 
must be materialized if this contract between rowtime & watermark is violated, 
like for example in the non windowed joins (rowtime fields in the join result 
can be older than the watermark).
   
   For `LogicalUpsertToRetraction` that's not the case, isn't it? If I'm 
correct (and please correct me if I'm wrong), the code should look just like 
this:
   ```
   def visit(...):
     val rewrittenInput = upsertToRetraction.accept(this)
     return upsertToRetraction.copy(upsertToRetraction.getTraitSet, 
Seq(rewrittenInput))
   ```
   
   I think the second issues would be caught by a test that uses time windowed 
join/aggregation on the upsert source (this should work, but in the current 
version of this PR I would expect it to fail). 
   ```
   SELECT 
     key, max(value) 
   FROM
     UpsertTable
   GROUP BY 
     TUMBLE(rowTime1, INTERVAL '1' DAY), key
   ```
   
   First issue is probably difficult to test/trigger since at the moment when 
`RelTimeIndicatorConverter` is executed, `LogicalUpsertToRetraction` is always 
just after the `TableScan` node. However if we changed the order of 
`CalcUpsertToRetractionTransposeRule` and `RelTimeIndicatorConverter` current 
code would fail to materialize fields for queries like:
   
   ```
   SELECT 
     key, max(value) 
   FROM (
     SELECT rowTime1 + 1 as rowTime2, key, value FROM UpsertTable)
   GROUP BY 
     TUMBLE(rowTime2, INTERVAL '1' DAY), key
   ```
   
   Even if I'm wrong, could you add those two unit tests?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to