beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682345064



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
                 .collect(Collectors.toList());
     }
 
+    private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
+        RelNode newInput = convertAggInput(agg);
+        List<AggregateCall> updatedAggCalls = convertAggregateCalls(agg);
+        LogicalWindow oldWindow = agg.getWindow();
+        Seq<PlannerNamedWindowProperty> oldNamedProperties = 
agg.getNamedProperties();
+        FieldReferenceExpression oldTimeAttribute = 
agg.getWindow().timeAttribute();
+        LogicalType oldTimeAttributeType = 
oldTimeAttribute.getOutputDataType().getLogicalType();
+        boolean isRowtimeIndicator = 
LogicalTypeChecks.isRowtimeAttribute(oldTimeAttributeType);
+        boolean convertedToRowtimeTimestampLtz;
+        if (!isRowtimeIndicator) {
+            convertedToRowtimeTimestampLtz = false;
+        } else {
+            int timeIndicatorIdx = oldTimeAttribute.getFieldIndex();
+            RelDataType oldType =
+                    
agg.getInput().getRowType().getFieldList().get(timeIndicatorIdx).getType();
+            RelDataType newType =
+                    
newInput.getRowType().getFieldList().get(timeIndicatorIdx).getType();
+            convertedToRowtimeTimestampLtz =
+                    isTimestampLtzType(newType) && 
!isTimestampLtzType(oldType);
+        }
+        LogicalWindow newWindow;
+        Seq<PlannerNamedWindowProperty> newNamedProperties;
+        if (convertedToRowtimeTimestampLtz) {
+            // MATCH_ROWTIME may be converted from rowtime attribute to 
timestamp_ltz rowtime
+            // attribute, if time indicator of current window aggregate 
depends on input
+            // MATCH_ROWTIME, we should rewrite logicalWindow and 
namedProperties.
+            LogicalType newTimestampLtzType =
+                    new LocalZonedTimestampType(
+                            oldTimeAttributeType.isNullable(), 
TimestampKind.ROWTIME, 3);
+            FieldReferenceExpression newFieldRef =
+                    new FieldReferenceExpression(
+                            oldTimeAttribute.getName(),
+                            fromLogicalTypeToDataType(newTimestampLtzType),
+                            oldTimeAttribute.getInputIndex(),
+                            oldTimeAttribute.getFieldIndex());
+            PlannerWindowReference newAlias =
+                    new PlannerWindowReference(
+                            oldWindow.aliasAttribute().getName(), 
newTimestampLtzType);
+            if (oldWindow instanceof TumblingGroupWindow) {
+                TumblingGroupWindow window = (TumblingGroupWindow) oldWindow;
+                newWindow = new TumblingGroupWindow(newAlias, newFieldRef, 
window.size());
+            } else if (oldWindow instanceof SlidingGroupWindow) {
+                SlidingGroupWindow window = (SlidingGroupWindow) oldWindow;
+                newWindow =
+                        new SlidingGroupWindow(
+                                newAlias, newFieldRef, window.size(), 
window.slide());
+            } else if (oldWindow instanceof SessionGroupWindow) {
+                SessionGroupWindow window = (SessionGroupWindow) oldWindow;
+                newWindow = new SessionGroupWindow(newAlias, newFieldRef, 
window.gap());
+            } else {
+                throw new TableException(
+                        String.format(
+                                "This is a bug and should not happen. Please 
file an issue. Invalid window %s.",
+                                oldWindow.getClass().getSimpleName()));
+            }
+            List<PlannerNamedWindowProperty> newNamedPropertiesList =
+                    
JavaConverters.seqAsJavaListConverter(oldNamedProperties).asJava().stream()
+                            .map(
+                                    namedProperty -> {
+                                        if (namedProperty.getProperty()
+                                                instanceof 
PlannerRowtimeAttribute) {
+                                            return new 
PlannerNamedWindowProperty(
+                                                    namedProperty.getName(),
+                                                    new 
PlannerRowtimeAttribute(newAlias));
+                                        } else {
+                                            return namedProperty;
+                                        }
+                                    })
+                            .collect(Collectors.toList());
+            newNamedProperties =
+                    
JavaConverters.iterableAsScalaIterableConverter(newNamedPropertiesList)
+                            .asScala()
+                            .toSeq();
+        } else {
+            newWindow = oldWindow;
+            newNamedProperties = oldNamedProperties;
+        }
+        return new FlinkLogicalWindowAggregate(
+                agg.getCluster(),
+                agg.getTraitSet(),
+                newInput,
+                agg.getGroupSet(),
+                updatedAggCalls,
+                newWindow,
+                newNamedProperties);
+    }
+
+    private RelNode visitInvalidRel(RelNode node) {
+        throw new TableException(
+                String.format(
+                        "This is a bug and should not happen. Please file an 
issue. Unknown node %s.",
+                        node.getRelTypeName()));
+    }
+
     // 
----------------------------------------------------------------------------------------
     //                                       Utility
     // 
----------------------------------------------------------------------------------------
 
     private RelNode materializeProcTime(RelNode node) {
+        // If input is empty values, ignore materialize
+        if (node instanceof FlinkLogicalValues
+                && FlinkLogicalValues.isEmpty((FlinkLogicalValues) node)) {
+            return node;
+        }

Review comment:
       In the original version, after the time indicators were materialized, 
PRUNE_EMPTY_RULES would prune a project whose input node is empty values.
   After the refactor, if we add a calc to convert proc_time on empty values, 
there are no rules to reduce this newly added calc. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to