This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0563a35abe9f7506566e7bdb59af6da18547e662 Author: Benoît Paris <benoit.paris....@gmail.com> AuthorDate: Tue Jan 21 11:59:49 2020 +0100 [FLINK-15726] [Blink Planner] [hotfix] Fix error message in StreamExecTableSourceScan & BatchExecTableSourceScan This closes #10914 --- .../planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala | 2 +- .../plan/nodes/physical/stream/StreamExecTableSourceScan.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala index 38387c4..f5a222b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala @@ -103,7 +103,7 @@ class BatchExecTableSourceScan( // check that declared and actual type of table source DataStream are identical if (inputDataType != TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) { throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " + - s"returned a DataStream of data type $producedDataType that does not match with the " + + s"returned a DataStream of data type $inputDataType that does not match with the " + s"data type $producedDataType declared by the TableSource.getProducedDataType() method. " + s"Please validate the implementation of the TableSource.") } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala index fa9d505..4963e52 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala @@ -106,13 +106,14 @@ class StreamExecTableSourceScan( val fieldIndexes = computeIndexMapping() + val inputDataType = inputTransform.getOutputType val producedDataType = tableSource.getProducedDataType // check that declared and actual type of table source DataStream are identical - if (inputTransform.getOutputType != + if (inputDataType != TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) { throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " + - s"returned a DataStream of data type $producedDataType that does not match with the " + + s"returned a DataStream of data type $inputDataType that does not match with the " + s"data type $producedDataType declared by the TableSource.getProducedDataType() method. " + s"Please validate the implementation of the TableSource.") }