wuchong commented on a change in pull request #11985:
URL: https://github.com/apache/flink/pull/11985#discussion_r421913683



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
##########
@@ -92,177 +73,21 @@ class StreamExecTableSourceScan(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  override protected def translateToPlanInternal(
-      planner: StreamPlanner): Transformation[RowData] = {
-    val config = planner.getTableConfig
-    val inputTransform = getSourceTransformation(planner.getExecEnv)
-
-    val fieldIndexes = computeIndexMapping()
-
-    val inputDataType = inputTransform.getOutputType
-    val producedDataType = tableSource.getProducedDataType
-
-    // check that declared and actual type of table source DataStream are 
identical
-    if (inputDataType !=
-        TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
-      throw new TableException(s"TableSource of type 
${tableSource.getClass.getCanonicalName} " +
-        s"returned a DataStream of data type $inputDataType that does not 
match with the " +
-        s"data type $producedDataType declared by the 
TableSource.getProducedDataType() method. " +
-        s"Please validate the implementation of the TableSource.")
-    }
-
-    // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = 
TableSourceUtil.getRowtimeAttributeDescriptor(
-      tableSource,
-      tableSourceTable.getRowType
-    ).map(desc =>
-      TableSourceUtil.getRowtimeExtractionExpression(
-        desc.getTimestampExtractor,
-        producedDataType,
-        planner.getRelBuilder,
-        nameMapping
-      )
-    )
-
-    val streamTransformation = if (needInternalConversion) {
-      // extract time if the index is -1 or -2.
-      val (extractElement, resetElement) =
-        if (ScanUtil.hasTimeAttributeField(fieldIndexes)) {
-          (s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
-        } else {
-          ("", "")
-        }
-      val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
-        classOf[AbstractProcessStreamOperator[RowData]])
-      // the produced type may not carry the correct precision user defined in 
DDL, because
-      // it may be converted from legacy type. Fix precision using logical 
schema from DDL.
-      // Code generation requires the correct precision of input fields.
-      val fixedProducedDataType = 
TableSourceUtil.fixPrecisionForProducedDataType(
-        tableSource,
-        FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType))
-      val conversionTransform = ScanUtil.convertToInternalRow(
-        ctx,
-        inputTransform.asInstanceOf[Transformation[Any]],
-        fieldIndexes,
-        fixedProducedDataType,
-        getRowType,
-        getTable.getQualifiedName,
-        config,
-        rowtimeExpression,
-        beforeConvert = extractElement,
-        afterConvert = resetElement)
-      conversionTransform
-    } else {
-      inputTransform.asInstanceOf[Transformation[RowData]]
-    }
-
-    val ingestedTable = new DataStream(planner.getExecEnv, 
streamTransformation)
-
-    // generate watermarks for rowtime indicator
-    val rowtimeDescOption: Option[RowtimeAttributeDescriptor] =
-      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, 
tableSourceTable.getRowType)
-
-    val withWatermarks = rowtimeDescOption match {
-      case Some(rowtimeDesc) =>
-        val rowtimeFieldIdx = 
getRowType.getFieldNames.indexOf(rowtimeDesc.getAttributeName)
-        val watermarkStrategy = rowtimeDesc.getWatermarkStrategy
-        watermarkStrategy match {
-          case p: PeriodicWatermarkAssigner =>
-            val watermarkGenerator = new 
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
-            ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
-          case p: PunctuatedWatermarkAssigner =>
-            val watermarkGenerator =
-              new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, 
producedDataType)
-            ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
-          case _: PreserveWatermarks =>
-            // The watermarks have already been provided by the underlying 
DataStream.
-            ingestedTable
-        }
-      case None =>
-        // No need to generate watermarks if no rowtime attribute is specified.
-        ingestedTable
-    }
-    withWatermarks.getTransformation
-  }
-
-  private def needInternalConversion: Boolean = {
-    val fieldIndexes = computeIndexMapping()
-    ScanUtil.hasTimeAttributeField(fieldIndexes) ||
-      ScanUtil.needsConversion(tableSource.getProducedDataType)
-  }
-
-  override def createInput[IN](
+  override protected def createInputFormatTransformation(
       env: StreamExecutionEnvironment,
-      format: InputFormat[IN, _ <: InputSplit],
-      t: TypeInformation[IN]): Transformation[IN] = {
-    // See StreamExecutionEnvironment.createInput, it is better to deal with 
checkpoint.
-    // The disadvantage is that streaming not support multi-paths.
-    env.createInput(format, 
t).name(tableSource.explainSource()).getTransformation
-  }
-
-  private def computeIndexMapping()
-    : Array[Int] = {
-    TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
-      tableSource,
-      FlinkTypeFactory.toTableSchema(getRowType).getTableColumns,
-      true,
-      nameMapping
-    )
-  }
-
-  private lazy val nameMapping: JFunction[String, String] = tableSource match {
-    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
-      new JFunction[String, String] {
-        override def apply(t: String): String = mapping.getFieldMapping.get(t)
-      }
-    case _ => JFunction.identity()
+      inputFormat: InputFormat[RowData, _],
+      name: String,
+      outTypeInfo: RowDataTypeInfo): Transformation[RowData] = {
+    // It's better to use StreamExecutionEnvironment.createInput()
+    // rather than addSource() for streaming, because it take care of 
checkpoint.
+    env
+      .createInput(inputFormat, outTypeInfo)
+      .name(name)
+      .getTransformation
   }
-}
-
-/**
-  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
-  *
-  * @param timeFieldIdx the index of the rowtime attribute.
-  * @param assigner the watermark assigner.
-  */
-private class PeriodicWatermarkAssignerWrapper(
-    timeFieldIdx: Int,
-    assigner: PeriodicWatermarkAssigner)
-  extends AssignerWithPeriodicWatermarks[RowData] {
-
-  override def getCurrentWatermark: Watermark = assigner.getWatermark
-
-  override def extractTimestamp(row: RowData, previousElementTimestamp: Long): 
Long = {
-    val timestamp: Long = row.getTimestamp(timeFieldIdx, 3).getMillisecond
-    assigner.nextTimestamp(timestamp)
-    0L
-  }
-}
 
-/**
-  * Generates periodic watermarks based on a [[PunctuatedWatermarkAssigner]].
-  *
-  * @param timeFieldIdx the index of the rowtime attribute.
-  * @param assigner the watermark assigner.
-  */
-private class PunctuatedWatermarkAssignerWrapper(
-    timeFieldIdx: Int,
-    assigner: PunctuatedWatermarkAssigner,
-    sourceType: DataType)
-  extends AssignerWithPunctuatedWatermarks[RowData] {
-
-  private val converter =
-    DataFormatConverters.getConverterForDataType((sourceType match {
-      case _: FieldsDataType => sourceType
-      case _ => DataTypes.ROW(DataTypes.FIELD("f0", sourceType))
-    }).bridgedTo(classOf[Row])).asInstanceOf[DataFormatConverter[RowData, Row]]
-
-  override def checkAndGetNextWatermark(row: RowData, ts: Long): Watermark = {
-    val timestamp: Long = row.getLong(timeFieldIdx)
-    assigner.getWatermark(converter.toExternal(row), timestamp)
-  }
-
-  override def extractTimestamp(element: RowData, previousElementTimestamp: 
Long): Long = {
-    0L
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    createSourceTransformation(planner.getExecEnv, getRelDetailedDescription)
   }

Review comment:
       Yes. This is on purpose. We should prefer to use 
`getRelDetailedDescription` which have more detailed and formated information 
about this node. Besides, the `DynamicTableSource#asSummaryString` is not the 
same meaning with `TableSource#explainSource`. `asSummaryString` is just used 
for logging (such as `Kafka_0.9`), it doesn't contain detailed information of 
this node.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to