wuchong commented on a change in pull request #11985:
URL: https://github.com/apache/flink/pull/11985#discussion_r422013035



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
##########
@@ -226,4 +176,89 @@ class CatalogSourceTable[T](
     }
     factory.buildRelNodeRowType(fieldNames, fieldTypes)
   }
+
+  /**
+   * Returns true if there is any generated columns defined on the catalog 
table.
+   */
+  private def containsGeneratedColumns(catalogTable: CatalogTable): Boolean = {
+    catalogTable.getSchema.getTableColumns.exists(_.isGenerated)
+  }
+
+  /**
+   * Creates a new catalog table with the given hint options,
+   * but return the original catalog table if the given hint options is empty.
+   */
+  private def createCatalogTableWithHints(hintedOptions: JMap[String, 
String]): CatalogTable = {
+    if (hintedOptions.nonEmpty) {
+      catalogTable.copy(FlinkHints.mergeTableOptions(hintedOptions, 
catalogTable.getOptions))
+    } else {
+      catalogTable
+    }
+  }
+
+  private def validateTableSource(tableSource: DynamicTableSource): Unit = {
+    // validation
+    val unsupportedAbilities = List(
+      classOf[SupportsProjectionPushDown],
+      classOf[SupportsFilterPushDown],
+      classOf[SupportsLimitPushDown],
+      classOf[SupportsPartitionPushDown],
+      classOf[SupportsComputedColumnPushDown],
+      classOf[SupportsWatermarkPushDown]
+    )
+    val tableName = schemaTable.getTableIdentifier.asSummaryString
+    tableSource match {
+      case ts: ScanTableSource =>
+        val changelogMode = ts.getChangelogMode
+        if (!schemaTable.isStreamingMode) {
+          // batch only supports bounded source
+          val provider = 
ts.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE)
+          if (!provider.isBounded) {
+            throw new ValidationException("Cannot query on an unbounded source 
in batch mode, " +
+              s"but '$tableName' is unbounded.")
+          }
+          // batch only supports INSERT only source
+          if (!changelogMode.containsOnly(RowKind.INSERT)) {
+            throw new UnsupportedOperationException(
+              "Currently, batch mode only supports INSERT only source, but " +
+              s"'$tableName' source produces not INSERT only messages")
+          }
+        } else {
+          // sanity check for produced ChangelogMode
+          val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
+          val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
+          (hasUpdateBefore, hasUpdateAfter) match {
+            case (true, true) =>
+              // UPDATE_BEFORE and UPDATE_AFTER, pass
+            case (false, true) =>
+              // only UPDATE_AFTER

Review comment:
       Do you mean in `UpdateKindTrait`? It's not similar. There is not much 
code that can be shared between them. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to