GitHub user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4380#discussion_r129838980

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala ---
@@ -108,28 +108,25 @@ class DataStreamSort(
       case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
--- End diff --

There is a lot of code duplication in this class. All `createSort` methods look basically the same and mostly differ in the `SortUtil` methods they call. I think we don't need these methods and can do everything with a few if conditions directly in the `translateToPlan()` method. Basically:

```
val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)

val pFunc =
  if (FlinkTypeFactory.isProctimeIndicatorType(timeType) &&
      sortCollation.getFieldCollations.size() == 1) {
    // ORDER BY proctime only: nothing to sort
    SortUtil.createProcTimeNoSortFunction(..., sortOffset, sortFetch)
  } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
    SortUtil.createProcTimeSortFunction(..., sortOffset, sortFetch)
  } else if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
    SortUtil.createRowTimeSortFunction(..., sortOffset, sortFetch)
  } else {
    // error
  }

inputDS.keyBy(new NullByteKeySelector[CRow])
  .process(pFunc).setParallelism(1).setMaxParallelism(1)
  .returns(returnTypeInfo)
  .asInstanceOf[DataStream[CRow]]
```

We would have to change the `IdentityCRowMap` to a `ProcessFunction`, but that's fine. `ORDER BY proctime` is a corner case that does not add functionality and is only supported for syntactic completeness. IMO it is not worth the added code complexity.
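To make the proposed branching easier to check in isolation, here is a minimal, self-contained sketch of just the selection logic. It assumes stand-in types: `TimeKind`, `SortStrategy`, and `SortDispatch.chooseStrategy` are hypothetical names for illustration only, not Flink classes, and the real code would return the `SortUtil` functions instead of marker objects.

```scala
// Hypothetical stand-ins for the time indicator checks done via FlinkTypeFactory.
sealed trait TimeKind
case object ProcTime extends TimeKind
case object RowTime extends TimeKind
case object NoTimeIndicator extends TimeKind

// Hypothetical markers for which SortUtil factory would be called.
sealed trait SortStrategy
case object ProcTimeNoSort extends SortStrategy // ORDER BY proctime only
case object ProcTimeSort extends SortStrategy   // ORDER BY proctime plus other fields
case object RowTimeSort extends SortStrategy    // ORDER BY rowtime, with or without other fields

object SortDispatch {
  // Mirrors the if/else chain: the first sort field decides the time kind,
  // and the number of collation fields decides whether sorting is needed at all.
  def chooseStrategy(
      timeKind: TimeKind,
      collationFieldCount: Int): Either[String, SortStrategy] =
    timeKind match {
      case ProcTime if collationFieldCount == 1 => Right(ProcTimeNoSort)
      case ProcTime                             => Right(ProcTimeSort)
      case RowTime                              => Right(RowTimeSort)
      case NoTimeIndicator =>
        Left("first sort field must be a time attribute")
    }
}
```

With a single selection point like this, the keyed `process(...)` call only has to be written once, whichever branch is taken.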