bvarghese1 commented on code in PR #25753:
URL: https://github.com/apache/flink/pull/25753#discussion_r2012509495


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java:
##########
@@ -309,73 +320,145 @@ private KeyedProcessFunction<RowData, RowData, RowData> createUnboundedOverProce
                         aggInputRowType,
                         JavaScalaConversionUtil.toScala(aggCalls),
                         new boolean[aggCalls.size()],
-                        false, // needRetraction
+                        false, // needInputCount
                         true, // isStateBackendDataViews
                         true); // needDistinctInfo
 
         LogicalType[] fieldTypes = inputRowType.getChildren().toArray(new 
LogicalType[0]);
-        AggsHandlerCodeGenerator generator =
+
+        AggsHandlerCodeGenerator aggsGenerator =
                 new AggsHandlerCodeGenerator(
                         ctx,
                         relBuilder,
                         
JavaScalaConversionUtil.toScala(Arrays.asList(fieldTypes)),
                         false); // copyInputField
 
-        GeneratedAggsHandleFunction genAggsHandler =
-                generator
+        aggsGenerator =
+                aggsGenerator
                         .needAccumulate()
                         // over agg code gen must pass the constants
-                        
.withConstants(JavaScalaConversionUtil.toScala(constants))
-                        .generateAggsHandler("UnboundedOverAggregateHelper", 
aggInfoList);
+                        
.withConstants(JavaScalaConversionUtil.toScala(constants));
+
+        GeneratedAggsHandleFunction genAggsHandler =
+                
aggsGenerator.generateAggsHandler("UnboundedOverAggregateHelper", aggInfoList);
 
         LogicalType[] flattenAccTypes =
                 Arrays.stream(aggInfoList.getAccTypes())
                         
.map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
                         .toArray(LogicalType[]::new);
 
-        if (rowTimeIdx >= 0) {
-            switch (unboundedOverVersion) {
-                // Currently there is no migration path between first and 
second versions.
-                case AbstractRowTimeUnboundedPrecedingOver.FIRST_OVER_VERSION:
-                    if (isRowsClause) {
-                        // ROWS unbounded over process function
-                        return new RowTimeRowsUnboundedPrecedingFunction<>(
+        switch (timeAttribute) {
+            case ROW_TIME:
+                final int rowTimeIdx = orderKeys[0];
+                switch (unboundedOverVersion) {
+                    // Currently there is no migration path between first and 
second versions.
+                    case 
AbstractRowTimeUnboundedPrecedingOver.FIRST_OVER_VERSION:
+                        if (isRowsClause) {
+                            // ROWS unbounded over process function
+                            return new RowTimeRowsUnboundedPrecedingFunction<>(
+                                    config.getStateRetentionTime(),
+                                    
TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                                    genAggsHandler,
+                                    flattenAccTypes,
+                                    fieldTypes,
+                                    rowTimeIdx);
+                        } else {
+                            // RANGE unbounded over process function
+                            return new 
RowTimeRangeUnboundedPrecedingFunction<>(
+                                    config.getStateRetentionTime(),
+                                    
TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                                    genAggsHandler,
+                                    flattenAccTypes,
+                                    fieldTypes,
+                                    rowTimeIdx);
+                        }
+                    case 
RowTimeUnboundedPrecedingOverFunctionV2.SECOND_OVER_VERSION:
+                        return new RowTimeUnboundedPrecedingOverFunctionV2<>(
+                                isRowsClause,
                                 config.getStateRetentionTime(),
                                 
TableConfigUtils.getMaxIdleStateRetentionTime(config),
                                 genAggsHandler,
                                 flattenAccTypes,
                                 fieldTypes,
                                 rowTimeIdx);
-                    } else {
-                        // RANGE unbounded over process function
-                        return new RowTimeRangeUnboundedPrecedingFunction<>(
-                                config.getStateRetentionTime(),
-                                
TableConfigUtils.getMaxIdleStateRetentionTime(config),
-                                genAggsHandler,
-                                flattenAccTypes,
-                                fieldTypes,
-                                rowTimeIdx);
-                    }
-                case 
RowTimeUnboundedPrecedingOverFunctionV2.SECOND_OVER_VERSION:
-                    return new RowTimeUnboundedPrecedingOverFunctionV2<>(
-                            isRowsClause,
-                            config.getStateRetentionTime(),
-                            
TableConfigUtils.getMaxIdleStateRetentionTime(config),
-                            genAggsHandler,
-                            flattenAccTypes,
-                            fieldTypes,
-                            rowTimeIdx);
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unsupported unbounded over version: "
-                                    + unboundedOverVersion
-                                    + ". Valid versions are 1 and 2.");
-            }
-        } else {
-            return new ProcTimeUnboundedPrecedingFunction<>(
-                    
StateConfigUtil.createTtlConfig(config.getStateRetentionTime()),
-                    genAggsHandler,
-                    flattenAccTypes);
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unsupported unbounded over version: "
+                                        + unboundedOverVersion
+                                        + ". Valid versions are 1 and 2.");
+                }
+            case PROC_TIME:
+                return new ProcTimeUnboundedPrecedingFunction<>(
+                        
StateConfigUtil.createTtlConfig(config.getStateRetentionTime()),
+                        genAggsHandler,
+                        flattenAccTypes);
+            case NON_TIME:
+                if (isRowsClause) {
+                    // Non-Time Rows Unbounded Preceding Function
+                    throw new TableException(
+                            "Non-Time Rows Unbounded Preceding Function not 
supported yet.");

Review Comment:
   Fixed as suggested



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java:
##########
@@ -439,33 +524,44 @@ private KeyedProcessFunction<RowData, RowData, RowData> createBoundedOverProcess
                         
.map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
                         .toArray(LogicalType[]::new);
 
-        if (rowTimeIdx >= 0) {
-            if (isRowsClause) {
-                return new RowTimeRowsBoundedPrecedingFunction<>(
-                        config.getStateRetentionTime(),
-                        TableConfigUtils.getMaxIdleStateRetentionTime(config),
-                        genAggsHandler,
-                        flattenAccTypes,
-                        fieldTypes,
-                        precedingOffset,
-                        rowTimeIdx);
-            } else {
-                return new RowTimeRangeBoundedPrecedingFunction<>(
-                        genAggsHandler, flattenAccTypes, fieldTypes, 
precedingOffset, rowTimeIdx);
-            }
-        } else {
-            if (isRowsClause) {
-                return new ProcTimeRowsBoundedPrecedingFunction<>(
-                        config.getStateRetentionTime(),
-                        TableConfigUtils.getMaxIdleStateRetentionTime(config),
-                        genAggsHandler,
-                        flattenAccTypes,
-                        fieldTypes,
-                        precedingOffset);
-            } else {
-                return new ProcTimeRangeBoundedPrecedingFunction<>(
-                        genAggsHandler, flattenAccTypes, fieldTypes, 
precedingOffset);
-            }
+        switch (timeAttribute) {
+            case ROW_TIME:
+                final int rowTimeIdx = orderKeys[0];
+                if (isRowsClause) {
+                    return new RowTimeRowsBoundedPrecedingFunction<>(
+                            config.getStateRetentionTime(),
+                            
TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                            genAggsHandler,
+                            flattenAccTypes,
+                            fieldTypes,
+                            precedingOffset,
+                            rowTimeIdx);
+                } else {
+                    return new RowTimeRangeBoundedPrecedingFunction<>(
+                            genAggsHandler,
+                            flattenAccTypes,
+                            fieldTypes,
+                            precedingOffset,
+                            rowTimeIdx);
+                }
+            case PROC_TIME:
+                if (isRowsClause) {
+                    return new ProcTimeRowsBoundedPrecedingFunction<>(
+                            config.getStateRetentionTime(),
+                            
TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                            genAggsHandler,
+                            flattenAccTypes,
+                            fieldTypes,
+                            precedingOffset);
+                } else {
+                    return new ProcTimeRangeBoundedPrecedingFunction<>(
+                            genAggsHandler, flattenAccTypes, fieldTypes, 
precedingOffset);
+                }
+            case NON_TIME:
+                throw new TableException(
+                        "Non-time attribute sort is not supported for bounded 
over aggregate");
+            default:
+                throw new TableException("Unsupported bounded operation");

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to