danny0405 commented on a change in pull request #11985:
URL: https://github.com/apache/flink/pull/11985#discussion_r421898493



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
##########
@@ -76,7 +73,7 @@ class FlinkLogicalTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("fields", tableSource.getTableSchema.getFieldNames.mkString(", "))
+      .item("fields", getRowType.getFieldNames.mkString(", "))
   }

Review comment:
       Is this because we have a new way to support field push-down?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
##########
@@ -92,177 +73,21 @@ class StreamExecTableSourceScan(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  override protected def translateToPlanInternal(
-      planner: StreamPlanner): Transformation[RowData] = {
-    val config = planner.getTableConfig
-    val inputTransform = getSourceTransformation(planner.getExecEnv)
-
-    val fieldIndexes = computeIndexMapping()
-
-    val inputDataType = inputTransform.getOutputType
-    val producedDataType = tableSource.getProducedDataType
-
-    // check that declared and actual type of table source DataStream are 
identical
-    if (inputDataType !=
-        TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
-      throw new TableException(s"TableSource of type 
${tableSource.getClass.getCanonicalName} " +
-        s"returned a DataStream of data type $inputDataType that does not 
match with the " +
-        s"data type $producedDataType declared by the 
TableSource.getProducedDataType() method. " +
-        s"Please validate the implementation of the TableSource.")
-    }
-
-    // get expression to extract rowtime attribute
-    val rowtimeExpression: Option[RexNode] = 
TableSourceUtil.getRowtimeAttributeDescriptor(
-      tableSource,
-      tableSourceTable.getRowType
-    ).map(desc =>
-      TableSourceUtil.getRowtimeExtractionExpression(
-        desc.getTimestampExtractor,
-        producedDataType,
-        planner.getRelBuilder,
-        nameMapping
-      )
-    )
-
-    val streamTransformation = if (needInternalConversion) {
-      // extract time if the index is -1 or -2.
-      val (extractElement, resetElement) =
-        if (ScanUtil.hasTimeAttributeField(fieldIndexes)) {
-          (s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
-        } else {
-          ("", "")
-        }
-      val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
-        classOf[AbstractProcessStreamOperator[RowData]])
-      // the produced type may not carry the correct precision user defined in 
DDL, because
-      // it may be converted from legacy type. Fix precision using logical 
schema from DDL.
-      // Code generation requires the correct precision of input fields.
-      val fixedProducedDataType = 
TableSourceUtil.fixPrecisionForProducedDataType(
-        tableSource,
-        FlinkTypeFactory.toLogicalRowType(tableSourceTable.getRowType))
-      val conversionTransform = ScanUtil.convertToInternalRow(
-        ctx,
-        inputTransform.asInstanceOf[Transformation[Any]],
-        fieldIndexes,
-        fixedProducedDataType,
-        getRowType,
-        getTable.getQualifiedName,
-        config,
-        rowtimeExpression,
-        beforeConvert = extractElement,
-        afterConvert = resetElement)
-      conversionTransform
-    } else {
-      inputTransform.asInstanceOf[Transformation[RowData]]
-    }
-
-    val ingestedTable = new DataStream(planner.getExecEnv, 
streamTransformation)
-
-    // generate watermarks for rowtime indicator
-    val rowtimeDescOption: Option[RowtimeAttributeDescriptor] =
-      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, 
tableSourceTable.getRowType)
-
-    val withWatermarks = rowtimeDescOption match {
-      case Some(rowtimeDesc) =>
-        val rowtimeFieldIdx = 
getRowType.getFieldNames.indexOf(rowtimeDesc.getAttributeName)
-        val watermarkStrategy = rowtimeDesc.getWatermarkStrategy
-        watermarkStrategy match {
-          case p: PeriodicWatermarkAssigner =>
-            val watermarkGenerator = new 
PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
-            ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
-          case p: PunctuatedWatermarkAssigner =>
-            val watermarkGenerator =
-              new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, 
producedDataType)
-            ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
-          case _: PreserveWatermarks =>
-            // The watermarks have already been provided by the underlying 
DataStream.
-            ingestedTable
-        }
-      case None =>
-        // No need to generate watermarks if no rowtime attribute is specified.
-        ingestedTable
-    }
-    withWatermarks.getTransformation
-  }
-
-  private def needInternalConversion: Boolean = {
-    val fieldIndexes = computeIndexMapping()
-    ScanUtil.hasTimeAttributeField(fieldIndexes) ||
-      ScanUtil.needsConversion(tableSource.getProducedDataType)
-  }
-
-  override def createInput[IN](
+  override protected def createInputFormatTransformation(
       env: StreamExecutionEnvironment,
-      format: InputFormat[IN, _ <: InputSplit],
-      t: TypeInformation[IN]): Transformation[IN] = {
-    // See StreamExecutionEnvironment.createInput, it is better to deal with 
checkpoint.
-    // The disadvantage is that streaming not support multi-paths.
-    env.createInput(format, 
t).name(tableSource.explainSource()).getTransformation
-  }
-
-  private def computeIndexMapping()
-    : Array[Int] = {
-    TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
-      tableSource,
-      FlinkTypeFactory.toTableSchema(getRowType).getTableColumns,
-      true,
-      nameMapping
-    )
-  }
-
-  private lazy val nameMapping: JFunction[String, String] = tableSource match {
-    case mapping: DefinedFieldMapping if mapping.getFieldMapping != null =>
-      new JFunction[String, String] {
-        override def apply(t: String): String = mapping.getFieldMapping.get(t)
-      }
-    case _ => JFunction.identity()
+      inputFormat: InputFormat[RowData, _],
+      name: String,
+      outTypeInfo: RowDataTypeInfo): Transformation[RowData] = {
+    // It's better to use StreamExecutionEnvironment.createInput()
+    // rather than addSource() for streaming, because it take care of 
checkpoint.
+    env
+      .createInput(inputFormat, outTypeInfo)
+      .name(name)
+      .getTransformation
   }
-}
-
-/**
-  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
-  *
-  * @param timeFieldIdx the index of the rowtime attribute.
-  * @param assigner the watermark assigner.
-  */
-private class PeriodicWatermarkAssignerWrapper(
-    timeFieldIdx: Int,
-    assigner: PeriodicWatermarkAssigner)
-  extends AssignerWithPeriodicWatermarks[RowData] {
-
-  override def getCurrentWatermark: Watermark = assigner.getWatermark
-
-  override def extractTimestamp(row: RowData, previousElementTimestamp: Long): 
Long = {
-    val timestamp: Long = row.getTimestamp(timeFieldIdx, 3).getMillisecond
-    assigner.nextTimestamp(timestamp)
-    0L
-  }
-}
 
-/**
-  * Generates periodic watermarks based on a [[PunctuatedWatermarkAssigner]].
-  *
-  * @param timeFieldIdx the index of the rowtime attribute.
-  * @param assigner the watermark assigner.
-  */
-private class PunctuatedWatermarkAssignerWrapper(
-    timeFieldIdx: Int,
-    assigner: PunctuatedWatermarkAssigner,
-    sourceType: DataType)
-  extends AssignerWithPunctuatedWatermarks[RowData] {
-
-  private val converter =
-    DataFormatConverters.getConverterForDataType((sourceType match {
-      case _: FieldsDataType => sourceType
-      case _ => DataTypes.ROW(DataTypes.FIELD("f0", sourceType))
-    }).bridgedTo(classOf[Row])).asInstanceOf[DataFormatConverter[RowData, Row]]
-
-  override def checkAndGetNextWatermark(row: RowData, ts: Long): Watermark = {
-    val timestamp: Long = row.getLong(timeFieldIdx)
-    assigner.getWatermark(converter.toExternal(row), timestamp)
-  }
-
-  override def extractTimestamp(element: RowData, previousElementTimestamp: 
Long): Long = {
-    0L
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    createSourceTransformation(planner.getExecEnv, getRelDetailedDescription)
   }

Review comment:
       The old code uses `tableSource.explainSource()` as the name; is this 
change intentional?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/trait/UpdateKindTrait.scala
##########
@@ -95,4 +98,17 @@ object UpdateKindTrait {
     }
     new UpdateKindTrait(updateKind)
   }
+
+  /**
+   * Creates an instance of [[UpdateKindTrait]] from the given 
[[ChangelogMode]].
+   */
+  def fromChangelogMode(changelogMode: ChangelogMode): UpdateKindTrait = {
+    val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
+    val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
+    (hasUpdateBefore, hasUpdateAfter) match {
+      case (true, true) => BEFORE_AND_AFTER
+      case (false, true) => ONLY_UPDATE_AFTER
+      case _ => NONE
+    }

Review comment:
       Should we throw an exception for `NONE`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
##########
@@ -226,4 +251,113 @@ class CatalogSourceTable[T](
     }
     factory.buildRelNodeRowType(fieldNames, fieldTypes)
   }
+
+  /**
+   * Returns true if there is any generated columns defined on the catalog 
table.
+   */
+  private def containsGeneratedColumns(catalogTable: CatalogTable): Boolean = {
+    catalogTable.getSchema.getTableColumns.exists(_.isGenerated)
+  }
+
+  /**
+   * Creates a new catalog table with the given hint options,
+   * but return the original catalog table if the given hint options is empty.
+   */
+  private def createCatalogTableWithHints(hintedOptions: JMap[String, 
String]): CatalogTable = {
+    if (hintedOptions.nonEmpty) {
+      catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, 
catalogTable.getOptions))
+    } else {
+      catalogTable
+    }
+  }
+
+  /**
+   * Infers whether the current options is using legacy [[TableSource]].
+   */
+  private def isLegacyOptions(
+      hintedOptions: JMap[String, String],
+      conf: ReadableConfig): Boolean = {
+    val newCatalogTable = createCatalogTableWithHints(hintedOptions)
+    if 
(newCatalogTable.getOptions.contains(ConnectorDescriptorValidator.CONNECTOR_TYPE))
 {
+      true
+    } else {
+      // try to create legacy table source using the options,
+      // some legacy factories uses the new 'connector' key
+      try {
+        findAndCreateLegacyTableSource(hintedOptions, conf)
+        // success, then we will use the legacy factories
+        true
+      } catch {
+        case _: Throwable =>
+          // can't create, then we will use new factories
+          false
+      }
+    }
+  }
+
+  private def validateTableSource(tableSource: DynamicTableSource): Unit = {
+    // validation
+    val unsupportedAbilities = List(
+      classOf[SupportsProjectionPushDown],
+      classOf[SupportsFilterPushDown],
+      classOf[SupportsLimitPushDown],
+      classOf[SupportsPartitionPushDown],
+      classOf[SupportsComputedColumnPushDown],
+      classOf[SupportsWatermarkPushDown]
+    )
+    val tableName = schemaTable.getTableIdentifier.asSummaryString
+    tableSource match {
+      case ts: ScanTableSource =>
+        val changelogMode = ts.getChangelogMode
+        if (!schemaTable.isStreamingMode) {
+          // batch only supports bounded source
+          val provider = 
ts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
+          if (!provider.isBounded) {
+            throw new ValidationException("Cannot query on an unbounded source 
in batch mode, " +
+              s"but '$tableName' is unbounded.")
+          }
+          // batch only supports INSERT only source
+          if (!changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, batch mode only supports INSERT only source, but " +
+              s"'$tableName' source produces not INSERT only messages")
+          }
+        } else {
+          // sanity check for produced ChangelogMode
+          val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
+          val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
+          (hasUpdateBefore, hasUpdateAfter) match {
+            case (true, true) =>
+              // UPDATE_BEFORE and UPDATE_AFTER, pass
+            case (false, true) =>
+              // only UPDATE_AFTER
+              throw new UnsupportedOperationException(
+                "Currently, ScanTableSource doesn't support producing 
ChangelogMode " +
+                  "which contains UPDATE_AFTER but no UPDATE_BEFORE. Please 
adapt the " +
+                  s"implementation of '${ts.asSummaryString()}' source.")
+            case (true, false) =>
+               // only UPDATE_BEFORE
+              throw new ValidationException(
+                s"'$tableName' source produces ChangelogMode which " +
+                  s"contains UPDATE_BEFORE but doesn't contain UPDATE_AFTER, 
this is invalid.")
+            case _ =>
+              // no updates, pass
+          }
+
+          // watermark defined on a changelog source is not supported
+          if (!catalogTable.getSchema.getWatermarkSpecs.isEmpty &&
+              !changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, defining WATERMARK on a changelog source is not 
supported.")
+          }
+        }
+      case _ =>
+        unsupportedAbilities.foreach { ability =>
+          if (tableSource.getClass.isAssignableFrom(ability)) {
+            throw new UnsupportedOperationException("Currently, a 
DynamicTableSource with " +
+              s"${ability.getSimpleName} ability is not supported.")
+          }
+        }
+    }

Review comment:
       How about we move this to a new CatalogSourceTable and keep the old one as 
`LegacyCatalogSourceTable`? The code is a mess right now.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
##########
@@ -226,4 +251,113 @@ class CatalogSourceTable[T](
     }
     factory.buildRelNodeRowType(fieldNames, fieldTypes)
   }
+
+  /**
+   * Returns true if there is any generated columns defined on the catalog 
table.
+   */
+  private def containsGeneratedColumns(catalogTable: CatalogTable): Boolean = {
+    catalogTable.getSchema.getTableColumns.exists(_.isGenerated)
+  }
+
+  /**
+   * Creates a new catalog table with the given hint options,
+   * but return the original catalog table if the given hint options is empty.
+   */
+  private def createCatalogTableWithHints(hintedOptions: JMap[String, 
String]): CatalogTable = {
+    if (hintedOptions.nonEmpty) {
+      catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, 
catalogTable.getOptions))
+    } else {
+      catalogTable
+    }
+  }
+
+  /**
+   * Infers whether the current options is using legacy [[TableSource]].
+   */
+  private def isLegacyOptions(
+      hintedOptions: JMap[String, String],
+      conf: ReadableConfig): Boolean = {
+    val newCatalogTable = createCatalogTableWithHints(hintedOptions)
+    if 
(newCatalogTable.getOptions.contains(ConnectorDescriptorValidator.CONNECTOR_TYPE))
 {
+      true
+    } else {
+      // try to create legacy table source using the options,
+      // some legacy factories uses the new 'connector' key
+      try {
+        findAndCreateLegacyTableSource(hintedOptions, conf)
+        // success, then we will use the legacy factories
+        true
+      } catch {
+        case _: Throwable =>
+          // can't create, then we will use new factories
+          false
+      }
+    }
+  }
+
+  private def validateTableSource(tableSource: DynamicTableSource): Unit = {
+    // validation
+    val unsupportedAbilities = List(
+      classOf[SupportsProjectionPushDown],
+      classOf[SupportsFilterPushDown],
+      classOf[SupportsLimitPushDown],
+      classOf[SupportsPartitionPushDown],
+      classOf[SupportsComputedColumnPushDown],
+      classOf[SupportsWatermarkPushDown]
+    )
+    val tableName = schemaTable.getTableIdentifier.asSummaryString
+    tableSource match {
+      case ts: ScanTableSource =>
+        val changelogMode = ts.getChangelogMode
+        if (!schemaTable.isStreamingMode) {
+          // batch only supports bounded source
+          val provider = 
ts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
+          if (!provider.isBounded) {
+            throw new ValidationException("Cannot query on an unbounded source 
in batch mode, " +
+              s"but '$tableName' is unbounded.")
+          }
+          // batch only supports INSERT only source
+          if (!changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, batch mode only supports INSERT only source, but " +
+              s"'$tableName' source produces not INSERT only messages")
+          }
+        } else {
+          // sanity check for produced ChangelogMode
+          val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
+          val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
+          (hasUpdateBefore, hasUpdateAfter) match {
+            case (true, true) =>
+              // UPDATE_BEFORE and UPDATE_AFTER, pass
+            case (false, true) =>
+              // only UPDATE_AFTER
+              throw new UnsupportedOperationException(
+                "Currently, ScanTableSource doesn't support producing 
ChangelogMode " +
+                  "which contains UPDATE_AFTER but no UPDATE_BEFORE. Please 
adapt the " +
+                  s"implementation of '${ts.asSummaryString()}' source.")
+            case (true, false) =>
+               // only UPDATE_BEFORE
+              throw new ValidationException(
+                s"'$tableName' source produces ChangelogMode which " +
+                  s"contains UPDATE_BEFORE but doesn't contain UPDATE_AFTER, 
this is invalid.")
+            case _ =>
+              // no updates, pass
+          }
+
+          // watermark defined on a changelog source is not supported
+          if (!catalogTable.getSchema.getWatermarkSpecs.isEmpty &&
+              !changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, defining WATERMARK on a changelog source is not 
supported.")
+          }
+        }
+      case _ =>
+        unsupportedAbilities.foreach { ability =>
+          if (tableSource.getClass.isAssignableFrom(ability)) {

Review comment:
       Does the `ScanTableSource` support all these features now?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
##########
@@ -226,4 +251,113 @@ class CatalogSourceTable[T](
     }
     factory.buildRelNodeRowType(fieldNames, fieldTypes)
   }
+
+  /**
+   * Returns true if there is any generated columns defined on the catalog 
table.
+   */
+  private def containsGeneratedColumns(catalogTable: CatalogTable): Boolean = {
+    catalogTable.getSchema.getTableColumns.exists(_.isGenerated)
+  }
+
+  /**
+   * Creates a new catalog table with the given hint options,
+   * but return the original catalog table if the given hint options is empty.
+   */
+  private def createCatalogTableWithHints(hintedOptions: JMap[String, 
String]): CatalogTable = {
+    if (hintedOptions.nonEmpty) {
+      catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, 
catalogTable.getOptions))
+    } else {
+      catalogTable
+    }
+  }
+
+  /**
+   * Infers whether the current options is using legacy [[TableSource]].
+   */
+  private def isLegacyOptions(
+      hintedOptions: JMap[String, String],
+      conf: ReadableConfig): Boolean = {
+    val newCatalogTable = createCatalogTableWithHints(hintedOptions)
+    if 
(newCatalogTable.getOptions.contains(ConnectorDescriptorValidator.CONNECTOR_TYPE))
 {
+      true
+    } else {
+      // try to create legacy table source using the options,
+      // some legacy factories uses the new 'connector' key
+      try {
+        findAndCreateLegacyTableSource(hintedOptions, conf)
+        // success, then we will use the legacy factories
+        true
+      } catch {
+        case _: Throwable =>

Review comment:
       Should we use `connector.property-version` to identify whether the 
connector is new or not?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to