wuchong commented on a change in pull request #11985:
URL: https://github.com/apache/flink/pull/11985#discussion_r421914394



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
##########
@@ -226,4 +251,113 @@ class CatalogSourceTable[T](
     }
     factory.buildRelNodeRowType(fieldNames, fieldTypes)
   }
+
+  /**
+   * Returns true if there is any generated columns defined on the catalog 
table.
+   */
+  private def containsGeneratedColumns(catalogTable: CatalogTable): Boolean = {
+    catalogTable.getSchema.getTableColumns.exists(_.isGenerated)
+  }
+
+  /**
+   * Creates a new catalog table with the given hint options,
+   * but return the original catalog table if the given hint options is empty.
+   */
+  private def createCatalogTableWithHints(hintedOptions: JMap[String, 
String]): CatalogTable = {
+    if (hintedOptions.nonEmpty) {
+      catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, 
catalogTable.getOptions))
+    } else {
+      catalogTable
+    }
+  }
+
+  /**
+   * Infers whether the current options is using legacy [[TableSource]].
+   */
+  private def isLegacyOptions(
+      hintedOptions: JMap[String, String],
+      conf: ReadableConfig): Boolean = {
+    val newCatalogTable = createCatalogTableWithHints(hintedOptions)
+    if 
(newCatalogTable.getOptions.contains(ConnectorDescriptorValidator.CONNECTOR_TYPE))
 {
+      true
+    } else {
+      // try to create legacy table source using the options,
+      // some legacy factories uses the new 'connector' key
+      try {
+        findAndCreateLegacyTableSource(hintedOptions, conf)
+        // success, then we will use the legacy factories
+        true
+      } catch {
+        case _: Throwable =>
+          // can't create, then we will use new factories
+          false
+      }
+    }
+  }
+
+  private def validateTableSource(tableSource: DynamicTableSource): Unit = {
+    // validation
+    val unsupportedAbilities = List(
+      classOf[SupportsProjectionPushDown],
+      classOf[SupportsFilterPushDown],
+      classOf[SupportsLimitPushDown],
+      classOf[SupportsPartitionPushDown],
+      classOf[SupportsComputedColumnPushDown],
+      classOf[SupportsWatermarkPushDown]
+    )
+    val tableName = schemaTable.getTableIdentifier.asSummaryString
+    tableSource match {
+      case ts: ScanTableSource =>
+        val changelogMode = ts.getChangelogMode
+        if (!schemaTable.isStreamingMode) {
+          // batch only supports bounded source
+          val provider = 
ts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
+          if (!provider.isBounded) {
+            throw new ValidationException("Cannot query on an unbounded source 
in batch mode, " +
+              s"but '$tableName' is unbounded.")
+          }
+          // batch only supports INSERT only source
+          if (!changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, batch mode only supports INSERT only source, but " +
+              s"'$tableName' source produces not INSERT only messages")
+          }
+        } else {
+          // sanity check for produced ChangelogMode
+          val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
+          val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
+          (hasUpdateBefore, hasUpdateAfter) match {
+            case (true, true) =>
+              // UPDATE_BEFORE and UPDATE_AFTER, pass
+            case (false, true) =>
+              // only UPDATE_AFTER
+              throw new UnsupportedOperationException(
+                "Currently, ScanTableSource doesn't support producing 
ChangelogMode " +
+                  "which contains UPDATE_AFTER but no UPDATE_BEFORE. Please 
adapt the " +
+                  s"implementation of '${ts.asSummaryString()}' source.")
+            case (true, false) =>
+               // only UPDATE_BEFORE
+              throw new ValidationException(
+                s"'$tableName' source produces ChangelogMode which " +
+                  s"contains UPDATE_BEFORE but doesn't contain UPDATE_AFTER, 
this is invalid.")
+            case _ =>
+              // no updates, pass
+          }
+
+          // watermark defined on a changelog source is not supported
+          if (!catalogTable.getSchema.getWatermarkSpecs.isEmpty &&
+              !changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, defining WATERMARK on a changelog source is not 
supported.")
+          }
+        }
+      case _ =>
+        unsupportedAbilities.foreach { ability =>
+          if (tableSource.getClass.isAssignableFrom(ability)) {

Review comment:
       No. We don't support any of the ability interface. That's why we should 
throw exception to idnicate users they are not supported yet. Once one of the 
ability interface is supported, the developer should remove then entry from 
`unsupportedAbilities`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to