[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-05 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r683078506



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
 .collect(Collectors.toList());
 }
 
+private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {

Review comment:
   The case cover the branch must contain a Match with Match_ROWTIME, the 
successor is a WindowAggregate, which is exactly what 
`MatchRecognizeTest#testMatchRecognizeOnRowtimeLTZ` do. I think we don't need 
to add extra tests.

##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[$5])
 
 
   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-05 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r683259475



##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[$5])
 
 
   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r683105222



##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[$5])
 
 
   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r683078506



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
 .collect(Collectors.toList());
 }
 
+private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {

Review comment:
   The case cover the branch must contain a Match with Match_ROWTIME, the 
successor is a WindowAggregate, which is exactly what 
`MatchRecognizeTest#testMatchRecognizeOnRowtimeLTZ` do. I think we don't need 
to add extra tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682437085



##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[$5])
 
 
   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682407823



##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
##
@@ -212,8 +212,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$
 
 
   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682407014



##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowRankTest.xml
##
@@ -212,8 +212,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
rowtime=[$5], proctime=[$
 
 
   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682406281



##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[$5])
 
 
   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682406281



##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[$5])
 
 
   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682347079



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
 .collect(Collectors.toList());
 }
 
+private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
+RelNode newInput = convertAggInput(agg);
+List updatedAggCalls = convertAggregateCalls(agg);
+LogicalWindow oldWindow = agg.getWindow();
+Seq oldNamedProperties = 
agg.getNamedProperties();
+FieldReferenceExpression oldTimeAttribute = 
agg.getWindow().timeAttribute();
+LogicalType oldTimeAttributeType = 
oldTimeAttribute.getOutputDataType().getLogicalType();
+boolean isRowtimeIndicator = 
LogicalTypeChecks.isRowtimeAttribute(oldTimeAttributeType);
+boolean convertedToRowtimeTimestampLtz;
+if (!isRowtimeIndicator) {
+convertedToRowtimeTimestampLtz = false;
+} else {
+int timeIndicatorIdx = oldTimeAttribute.getFieldIndex();
+RelDataType oldType =
+
agg.getInput().getRowType().getFieldList().get(timeIndicatorIdx).getType();
+RelDataType newType =
+
newInput.getRowType().getFieldList().get(timeIndicatorIdx).getType();
+convertedToRowtimeTimestampLtz =
+isTimestampLtzType(newType) && 
!isTimestampLtzType(oldType);
+}
+LogicalWindow newWindow;
+Seq newNamedProperties;
+if (convertedToRowtimeTimestampLtz) {
+// MATCH_ROWTIME may be converted from rowtime attribute to 
timestamp_ltz rowtime
+// attribute, if time indicator of current window aggregate 
depends on input
+// MATCH_ROWTIME, we should rewrite logicalWindow and 
namedProperties.
+LogicalType newTimestampLtzType =
+new LocalZonedTimestampType(
+oldTimeAttributeType.isNullable(), 
TimestampKind.ROWTIME, 3);
+FieldReferenceExpression newFieldRef =
+new FieldReferenceExpression(
+oldTimeAttribute.getName(),
+fromLogicalTypeToDataType(newTimestampLtzType),
+oldTimeAttribute.getInputIndex(),
+oldTimeAttribute.getFieldIndex());
+PlannerWindowReference newAlias =
+new PlannerWindowReference(
+oldWindow.aliasAttribute().getName(), 
newTimestampLtzType);
+if (oldWindow instanceof TumblingGroupWindow) {
+TumblingGroupWindow window = (TumblingGroupWindow) oldWindow;
+newWindow = new TumblingGroupWindow(newAlias, newFieldRef, 
window.size());
+} else if (oldWindow instanceof SlidingGroupWindow) {
+SlidingGroupWindow window = (SlidingGroupWindow) oldWindow;
+newWindow =
+new SlidingGroupWindow(
+newAlias, newFieldRef, window.size(), 
window.slide());
+} else if (oldWindow instanceof SessionGroupWindow) {
+SessionGroupWindow window = (SessionGroupWindow) oldWindow;
+newWindow = new SessionGroupWindow(newAlias, newFieldRef, 
window.gap());
+} else {
+throw new TableException(
+String.format(
+"This is a bug and should not happen. Please 
file an issue. Invalid window %s.",
+oldWindow.getClass().getSimpleName()));
+}
+List newNamedPropertiesList =
+
JavaConverters.seqAsJavaListConverter(oldNamedProperties).asJava().stream()
+.map(
+namedProperty -> {
+if (namedProperty.getProperty()
+instanceof 
PlannerRowtimeAttribute) {
+return new 
PlannerNamedWindowProperty(
+namedProperty.getName(),
+new 
PlannerRowtimeAttribute(newAlias));
+} else {
+return namedProperty;
+}
+})
+.collect(Collectors.toList());
+newNamedProperties =
+
JavaConverters.iterableAsScalaIterableConverter(newNamedPropertiesList)
+.asScala()
+.toSeq();
+} else {
+   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-04 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682345064



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
 .collect(Collectors.toList());
 }
 
+private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
+RelNode newInput = convertAggInput(agg);
+List updatedAggCalls = convertAggregateCalls(agg);
+LogicalWindow oldWindow = agg.getWindow();
+Seq oldNamedProperties = 
agg.getNamedProperties();
+FieldReferenceExpression oldTimeAttribute = 
agg.getWindow().timeAttribute();
+LogicalType oldTimeAttributeType = 
oldTimeAttribute.getOutputDataType().getLogicalType();
+boolean isRowtimeIndicator = 
LogicalTypeChecks.isRowtimeAttribute(oldTimeAttributeType);
+boolean convertedToRowtimeTimestampLtz;
+if (!isRowtimeIndicator) {
+convertedToRowtimeTimestampLtz = false;
+} else {
+int timeIndicatorIdx = oldTimeAttribute.getFieldIndex();
+RelDataType oldType =
+
agg.getInput().getRowType().getFieldList().get(timeIndicatorIdx).getType();
+RelDataType newType =
+
newInput.getRowType().getFieldList().get(timeIndicatorIdx).getType();
+convertedToRowtimeTimestampLtz =
+isTimestampLtzType(newType) && 
!isTimestampLtzType(oldType);
+}
+LogicalWindow newWindow;
+Seq newNamedProperties;
+if (convertedToRowtimeTimestampLtz) {
+// MATCH_ROWTIME may be converted from rowtime attribute to 
timestamp_ltz rowtime
+// attribute, if time indicator of current window aggregate 
depends on input
+// MATCH_ROWTIME, we should rewrite logicalWindow and 
namedProperties.
+LogicalType newTimestampLtzType =
+new LocalZonedTimestampType(
+oldTimeAttributeType.isNullable(), 
TimestampKind.ROWTIME, 3);
+FieldReferenceExpression newFieldRef =
+new FieldReferenceExpression(
+oldTimeAttribute.getName(),
+fromLogicalTypeToDataType(newTimestampLtzType),
+oldTimeAttribute.getInputIndex(),
+oldTimeAttribute.getFieldIndex());
+PlannerWindowReference newAlias =
+new PlannerWindowReference(
+oldWindow.aliasAttribute().getName(), 
newTimestampLtzType);
+if (oldWindow instanceof TumblingGroupWindow) {
+TumblingGroupWindow window = (TumblingGroupWindow) oldWindow;
+newWindow = new TumblingGroupWindow(newAlias, newFieldRef, 
window.size());
+} else if (oldWindow instanceof SlidingGroupWindow) {
+SlidingGroupWindow window = (SlidingGroupWindow) oldWindow;
+newWindow =
+new SlidingGroupWindow(
+newAlias, newFieldRef, window.size(), 
window.slide());
+} else if (oldWindow instanceof SessionGroupWindow) {
+SessionGroupWindow window = (SessionGroupWindow) oldWindow;
+newWindow = new SessionGroupWindow(newAlias, newFieldRef, 
window.gap());
+} else {
+throw new TableException(
+String.format(
+"This is a bug and should not happen. Please 
file an issue. Invalid window %s.",
+oldWindow.getClass().getSimpleName()));
+}
+List newNamedPropertiesList =
+
JavaConverters.seqAsJavaListConverter(oldNamedProperties).asJava().stream()
+.map(
+namedProperty -> {
+if (namedProperty.getProperty()
+instanceof 
PlannerRowtimeAttribute) {
+return new 
PlannerNamedWindowProperty(
+namedProperty.getName(),
+new 
PlannerRowtimeAttribute(newAlias));
+} else {
+return namedProperty;
+}
+})
+.collect(Collectors.toList());
+newNamedProperties =
+
JavaConverters.iterableAsScalaIterableConverter(newNamedPropertiesList)
+.asScala()
+.toSeq();
+} else {
+   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-03 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682258760



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
 .collect(Collectors.toList());
 }
 
+private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
+RelNode newInput = convertAggInput(agg);
+List updatedAggCalls = convertAggregateCalls(agg);
+LogicalWindow oldWindow = agg.getWindow();
+Seq oldNamedProperties = 
agg.getNamedProperties();
+FieldReferenceExpression oldTimeAttribute = 
agg.getWindow().timeAttribute();
+LogicalType oldTimeAttributeType = 
oldTimeAttribute.getOutputDataType().getLogicalType();
+boolean isRowtimeIndicator = 
LogicalTypeChecks.isRowtimeAttribute(oldTimeAttributeType);
+boolean convertedToRowtimeTimestampLtz;
+if (!isRowtimeIndicator) {
+convertedToRowtimeTimestampLtz = false;
+} else {
+int timeIndicatorIdx = oldTimeAttribute.getFieldIndex();
+RelDataType oldType =
+
agg.getInput().getRowType().getFieldList().get(timeIndicatorIdx).getType();
+RelDataType newType =
+
newInput.getRowType().getFieldList().get(timeIndicatorIdx).getType();
+convertedToRowtimeTimestampLtz =
+isTimestampLtzType(newType) && 
!isTimestampLtzType(oldType);
+}
+LogicalWindow newWindow;
+Seq newNamedProperties;
+if (convertedToRowtimeTimestampLtz) {
+// MATCH_ROWTIME may be converted from rowtime attribute to 
timestamp_ltz rowtime
+// attribute, if time indicator of current window aggregate 
depends on input
+// MATCH_ROWTIME, we should rewrite logicalWindow and 
namedProperties.
+LogicalType newTimestampLtzType =
+new LocalZonedTimestampType(
+oldTimeAttributeType.isNullable(), 
TimestampKind.ROWTIME, 3);
+FieldReferenceExpression newFieldRef =
+new FieldReferenceExpression(
+oldTimeAttribute.getName(),
+fromLogicalTypeToDataType(newTimestampLtzType),
+oldTimeAttribute.getInputIndex(),
+oldTimeAttribute.getFieldIndex());
+PlannerWindowReference newAlias =
+new PlannerWindowReference(
+oldWindow.aliasAttribute().getName(), 
newTimestampLtzType);
+if (oldWindow instanceof TumblingGroupWindow) {
+TumblingGroupWindow window = (TumblingGroupWindow) oldWindow;
+newWindow = new TumblingGroupWindow(newAlias, newFieldRef, 
window.size());
+} else if (oldWindow instanceof SlidingGroupWindow) {
+SlidingGroupWindow window = (SlidingGroupWindow) oldWindow;
+newWindow =
+new SlidingGroupWindow(
+newAlias, newFieldRef, window.size(), 
window.slide());
+} else if (oldWindow instanceof SessionGroupWindow) {
+SessionGroupWindow window = (SessionGroupWindow) oldWindow;
+newWindow = new SessionGroupWindow(newAlias, newFieldRef, 
window.gap());
+} else {
+throw new TableException(
+String.format(
+"This is a bug and should not happen. Please 
file an issue. Invalid window %s.",
+oldWindow.getClass().getSimpleName()));
+}
+List newNamedPropertiesList =
+
JavaConverters.seqAsJavaListConverter(oldNamedProperties).asJava().stream()
+.map(
+namedProperty -> {
+if (namedProperty.getProperty()
+instanceof 
PlannerRowtimeAttribute) {
+return new 
PlannerNamedWindowProperty(
+namedProperty.getName(),
+new 
PlannerRowtimeAttribute(newAlias));
+} else {
+return namedProperty;
+}
+})
+.collect(Collectors.toList());
+newNamedProperties =
+
JavaConverters.iterableAsScalaIterableConverter(newNamedPropertiesList)
+.asScala()
+.toSeq();
+} else {
+   

[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-08-03 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r682258450



##
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
 .collect(Collectors.toList());
 }
 
+private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {

Review comment:
   MatchRecognizeTest#testMatchRecognizeOnRowtimeLTZ




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] beyond1920 commented on a change in pull request #16620: [FLINK-23246][table-planner] Refactor the time indicator materialization

2021-07-28 Thread GitBox


beyond1920 commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r678085882



##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
##
@@ -327,9 +328,17 @@ abstract class LogicalWindowAggregateRuleBase(description: 
String)
   windowExpression: RexCall): RexNode
 
   /** Returns the expression that replaces the window expression after the 
aggregation. */
-  private[table] def getOutAggregateGroupExpression(
+  private def getOutAggregateGroupExpression(
   rexBuilder: RexBuilder,
-  windowExpression: RexCall): RexNode
+  windowExpression: RexCall): RexNode = {
+val zeroLiteral = rexBuilder.makeZeroLiteral(windowExpression.getType)
+if (isTimeIndicatorType(windowExpression.getType)) {
+  // cast zero literal to time indicator field

Review comment:
   In the previous version, this would not happen because the rowtime 
indicator in group key would be materialized to regular timestamp.
   But after we move time indicator materialize after logical_rewrite, the rule 
need to encounter the window expression in group key.
   It's safe to simply cast the literal to time indicator type, because the 
window expression column in group key would be projected out in the successor 
Project node.

##
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
##
@@ -255,7 +255,7 @@ LogicalProject(a=[$0], b=[$6])