wuchong commented on a change in pull request #8203: 
[FLINK-12208][table-planner-blink] Support translation from StreamExecSort / 
TemporalSort / SortLimit / Limit to StreamTransformation.
URL: https://github.com/apache/flink/pull/8203#discussion_r277502987
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSortLimit.scala
 ##########
 @@ -97,5 +114,113 @@ class StreamExecSortLimit(
     }
   }
 
-  // TODO check `fetch` is not null in translateToPlan
+  //~ ExecNode methods 
-----------------------------------------------------------
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = 
{
+    List(getInput.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override protected def translateToPlanInternal(
+      tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+    if (fetch == null) {
+      throw new TableException(
+        "FETCH is missed, which on streaming table is not supported currently")
+    }
+
+    val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+      .asInstanceOf[StreamTransformation[BaseRow]]
+    val inputRowTypeInfo = 
inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+    val fieldCollations = sortCollation.getFieldCollations
+    val (sortFields, sortDirections, nullsIsLast) = 
SortUtil.getKeysAndOrders(fieldCollations)
+    val sortKeySelector = KeySelectorUtil.getBaseRowSelector(sortFields, 
inputRowTypeInfo)
+    val sortKeyType = sortKeySelector.getProducedType
+    val tableConfig = tableEnv.getConfig
+    val sortKeyComparator = ComparatorCodeGenerator.gen(
+      tableConfig,
+      "StreamExecSortComparator",
+      sortFields.indices.toArray,
+      sortKeyType.getInternalTypes,
+      sortDirections,
+      nullsIsLast)
+    val generateRetraction = StreamExecRetractionRules.isAccRetract(this)
+    val cacheSize = 
tableConfig.getConf.getLong(TableConfigOptions.SQL_EXEC_TOPN_CACHE_SIZE)
+    val minIdleStateRetentionTime = tableConfig.getMinIdleStateRetentionTime
+    val maxIdleStateRetentionTime = tableConfig.getMaxIdleStateRetentionTime
+
+    // rankStart begin with 1
+    val rankRange = new ConstantRankRange(limitStart + 1, limitEnd)
+    val rankType = RankType.ROW_NUMBER
+    val outputRankNumber = false
+
+    // Use RankFunction underlying StreamExecSortLimit
+    val processFunction = getStrategy(true) match {
+      case AppendFastStrategy =>
+        new AppendOnlyTopNFunction(
+          minIdleStateRetentionTime,
+          maxIdleStateRetentionTime,
+          inputRowTypeInfo,
+          sortKeyComparator,
+          sortKeySelector,
+          rankType,
+          rankRange,
+          generateRetraction,
+          outputRankNumber,
+          cacheSize)
+
+      case UpdateFastStrategy(primaryKeys) =>
+        val rowKeySelector = KeySelectorUtil.getBaseRowSelector(primaryKeys, 
inputRowTypeInfo)
+        new UpdatableTopNFunction(
+          minIdleStateRetentionTime,
+          maxIdleStateRetentionTime,
+          inputRowTypeInfo,
+          rowKeySelector,
+          sortKeyComparator,
+          sortKeySelector,
+          rankType,
+          rankRange,
+          generateRetraction,
+          outputRankNumber,
+          cacheSize)
+
+      // TODO Use UnaryUpdateTopNFunction after SortedMapState is merged
+      case RetractStrategy | UnaryUpdateStrategy(_) =>
+        val equaliserCodeGen = new 
EqualiserCodeGenerator(inputRowTypeInfo.getInternalTypes)
+        val generatedEqualiser = 
equaliserCodeGen.generateRecordEqualiser("RankValueEqualiser")
+        new RetractableTopNFunction(
+          minIdleStateRetentionTime,
+          maxIdleStateRetentionTime,
+          inputRowTypeInfo,
+          sortKeyComparator,
+          sortKeySelector,
+          rankType,
+          rankRange,
+          generatedEqualiser,
+          generateRetraction,
+          outputRankNumber)
+    }
+    val operator = new KeyedProcessOperator(processFunction)
+    processFunction.setKeyContext(operator)
+
+    val outputRowTypeInfo = 
FlinkTypeFactory.toInternalRowType(getRowType).toTypeInfo
+
+    // sets parallelism to 1 since StreamExecSortLimit could only work in 
global mode.
+    val ret = new OneInputTransformation(
+      inputTransform,
+      getOperatorName,
+      operator,
+      outputRowTypeInfo,
+      1)
 
 Review comment:
   Should maxParallelism be forced to `1` as well, since StreamExecSortLimit only works in global mode?
   
   `ret.setMaxParallelism(1)`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to