Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6323#discussion_r202507057 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -52,12 +52,9 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val streamingTable = table.unwrap(classOf[TableSourceSinkTable[_, _]]) match { - case t: TableSourceSinkTable[_, _] => t.tableSourceTableOpt match { - case Some(_: StreamTableSourceTable[_]) => true - case Some(_: BatchTableSourceTable[_]) => false - case _ => throw TableException(s"Unknown Table type ${t.getClass}.") - } - case t => throw TableException(s"Unknown Table type ${t.getClass}.") + case t: TableSourceSinkTable[_, _] if t.isStreamSourceTable => true + // null --- End diff -- This information is useful.
---