pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r247926320
########## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ##########
@@ -215,6 +217,19 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
       materializerUtils.projectAndMaterializeFields(rewrittenTemporalJoin, indicesToMaterialize)
     }
+  def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {

Review comment:
I think this code is incorrect.

First, you didn't recursively call the `RelTimeIndicatorConverter` on the input of `upsertToRetraction`:
```
val rewrittenInput = upsertToRetraction.getInput(0).accept(this)
```

Second, `LogicalUpsertToRetraction` doesn't have to materialize any fields: it doesn't invalidate the watermark guarantees (all records after a watermark have a rowtime value above that watermark). In other words, it preserves the rowtime fields and watermark guarantees of its input. A rowtime field must be materialized if this contract between rowtime and watermark is violated, for example in non-windowed joins (rowtime fields in the join result can be older than the watermark). For `LogicalUpsertToRetraction` that's not the case, is it?

If I'm correct (and please correct me if I'm wrong), the code should look just like this:
```
def visit(upsertToRetraction: LogicalUpsertToRetraction): RelNode = {
  val rewrittenInput = upsertToRetraction.getInput(0).accept(this)
  // RelNode.copy expects a java.util.List of inputs
  upsertToRetraction.copy(upsertToRetraction.getTraitSet, java.util.Collections.singletonList(rewrittenInput))
}
```

I think the second issue would be caught by a test that uses a time-windowed join/aggregation on the upsert source (this should work, but with the current version of this PR I would expect it to fail):
```
SELECT key, max(value)
FROM UpsertTable
GROUP BY TUMBLE(rowTime1, INTERVAL '1' DAY), key
```

The first issue is probably difficult to test/trigger, since at the moment when `RelTimeIndicatorConverter` is executed, `LogicalUpsertToRetraction` always comes directly after the `TableScan` node.
However, if we changed the order of `CalcUpsertToRetractionTransposeRule` and `RelTimeIndicatorConverter`, the current code would fail to materialize fields for queries like:
```
SELECT key, max(value)
FROM (
  SELECT rowTime1 + 1 as rowTime2, key, value
  FROM UpsertTable)
GROUP BY TUMBLE(rowTime2, INTERVAL '1' DAY), key
```

Even if I'm wrong, could you add those two unit tests?
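To illustrate why the missing recursion matters, here is a minimal, self-contained sketch of the shuttle pattern on a toy plan tree. `Plan`, `Scan`, `Calc`, `UpsertToRetraction`, `convert`, `convertWithoutRecursion` and `calcFlags` are hypothetical stand-ins, not Calcite or Flink classes; the `converted` flag only marks that the toy converter reached a node. It assumes a `Calc` can end up below the upsert node (as could happen if `CalcUpsertToRetractionTransposeRule` ran before the converter).

```scala
object ShuttleSketch {
  // Hypothetical toy plan nodes standing in for Calcite RelNodes (not Flink classes).
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  // `converted` records whether the toy converter has visited this Calc.
  final case class Calc(expr: String, input: Plan, converted: Boolean = false) extends Plan
  final case class UpsertToRetraction(input: Plan) extends Plan

  // Correct shuttle: rewrite the input first, then copy the node with the new input.
  def convert(plan: Plan): Plan = plan match {
    case s: Scan               => s
    case c: Calc               => c.copy(input = convert(c.input), converted = true)
    case u: UpsertToRetraction => u.copy(input = convert(u.input))
  }

  // Buggy shuttle: returns the upsert node unchanged, so its subtree is never visited.
  def convertWithoutRecursion(plan: Plan): Plan = plan match {
    case s: Scan               => s
    case c: Calc               => c.copy(input = convertWithoutRecursion(c.input), converted = true)
    case u: UpsertToRetraction => u
  }

  // Collects the `converted` flag of every Calc in the tree, top-down.
  def calcFlags(plan: Plan): List[Boolean] = plan match {
    case _: Scan               => Nil
    case c: Calc               => c.converted :: calcFlags(c.input)
    case u: UpsertToRetraction => calcFlags(u.input)
  }
}
```

For `UpsertToRetraction(Calc("rowTime1 + 1 AS rowTime2", Scan("UpsertTable")))`, `calcFlags(convert(plan))` is `List(true)`, while `calcFlags(convertWithoutRecursion(plan))` is `List(false)`: the `Calc` below the upsert node is silently skipped.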
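The rowtime/watermark contract from the review can be sketched as a simple check over an interleaved stream of records and watermarks. This is a toy model under stated assumptions: `Element`, `Record`, `Watermark`, `holds`, `forward` and `nonWindowedJoin` are hypothetical names, a record counts as valid only if its rowtime is above the last watermark seen, and the toy join keeps the rowtime of the buffered left row. It shows that an operator which only forwards records and watermarks in order keeps the contract, while a non-windowed join can emit a result with an old rowtime after a newer watermark, which is the case where rowtime fields need materializing.

```scala
object WatermarkContractSketch {
  // Hypothetical toy stream elements: a record with a rowtime, or a watermark.
  sealed trait Element
  final case class Record(key: String, rowtime: Long) extends Element
  final case class Watermark(ts: Long) extends Element

  // Contract assumed here: every record must have a rowtime above the last watermark seen.
  def holds(stream: Seq[Element]): Boolean =
    stream.foldLeft((Long.MinValue, true)) {
      case ((wm, ok), Watermark(ts))      => (math.max(wm, ts), ok)
      case ((wm, ok), Record(_, rowtime)) => (wm, ok && rowtime > wm)
    }._2

  // Forwards records (optionally re-keyed) and watermarks in order; rowtimes are untouched.
  def forward(stream: Seq[Element])(relabel: String => String): Seq[Element] =
    stream.map {
      case Record(key, rowtime) => Record(relabel(key), rowtime)
      case w: Watermark         => w
    }

  // Toy non-windowed join: `left` rows are already buffered; each probe record emits one
  // result per buffered left row with the same key, carrying the LEFT row's rowtime.
  def nonWindowedJoin(left: Seq[Record], probe: Seq[Element]): Seq[Element] =
    probe.flatMap {
      case Record(key, _) => left.filter(_.key == key).map(l => Record(key, l.rowtime))
      case w: Watermark   => Seq(w)
    }
}
```

For example, forwarding `Record("a", 1), Watermark(5), Record("a", 7)` keeps `holds` true, whereas joining a buffered `Record("a", 1)` against a probe `Record("a", 7)` that arrives after `Watermark(5)` emits `Record("a", 1)` behind the watermark, so `holds` returns false.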