Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/4710#discussion_r140775690 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -47,34 +46,18 @@ class FlinkLogicalTableSourceScan( } override def deriveRowType(): RelDataType = { - val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - - val fieldNames = TableEnvironment.getFieldNames(tableSource).toList - val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList - val fields = fieldNames.zip(fieldTypes) - - val withRowtime = tableSource match { - case timeSource: DefinedRowtimeAttribute if timeSource.getRowtimeAttribute != null => - val rowtimeAttribute = timeSource.getRowtimeAttribute - fields :+ (rowtimeAttribute, TimeIndicatorTypeInfo.ROWTIME_INDICATOR) - case _ => - fields - } + val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - val withProctime = tableSource match { - case timeSource: DefinedProctimeAttribute if timeSource.getProctimeAttribute != null => - val proctimeAttribute = timeSource.getProctimeAttribute - withRowtime :+ (proctimeAttribute, TimeIndicatorTypeInfo.PROCTIME_INDICATOR) - case _ => - withRowtime + tableSource match { + case s: StreamTableSource[_] => + StreamTableSourceTable.deriveRowTypeOfTableSource(s, flinkTypeFactory) + case b: BatchTableSource[_] => --- End diff -- Replace `b` with `_` to remove IDE warning.
---