Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129838980
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
 ---
    @@ -108,28 +108,25 @@ class DataStreamSort(
             case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType)  =>
    --- End diff --
    
    There is a lot code duplication in this class. All `createSort` methods 
look basically the same and mostly differ in the `SortUtil` methods they call. 
I think we don't need these methods and can do everything with a few if 
conditions directly in the `translateToPlan()` method.
    
    Basically:
    ```
    val returnTypeInfo = CRowTypeInfo(schema.physicalTypeInfo)
    
    val pFunc = if (FlinkTypeFactory.isProctimeIndicatorType(timeType) && 
sortCollation.getFieldCollations.size() == 1) {
      SortUtil.createProcTimeNoSortFunction(..., sortOffset, sortFetch)
    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
      SortUtil.createProcTimeSortFunction(..., sortOffset, sortFetch)
    } else if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
      SortUtil.createRowTimeSortFunction(..., sortOffset, sortFetch)
    } else {
      // error
    }
    
    inputDS.keyBy(new NullByteKeySelector[CRow])
      .process(processFunction).setParallelism(1).setMaxParallelism(1)
      .returns(returnTypeInfo)
      .asInstanceOf[DataStream[CRow]]
    ```
    
    We would have to change the `IdentityCRowMap` to a ProcessFunction but 
that's fine. `ORDER BY proctime` is a corner case that does not add 
functionality and is only supported for syntactical completeness. IMO it is not 
worth added code complexity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to