This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0563a35abe9f7506566e7bdb59af6da18547e662
Author: Benoît Paris <benoit.paris....@gmail.com>
AuthorDate: Tue Jan 21 11:59:49 2020 +0100

    [FLINK-15726] [Blink Planner] [hotfix] Fix error message in
    StreamExecTableSourceScan & BatchExecTableSourceScan
    
    This closes #10914
---
 .../planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala | 2 +-
 .../plan/nodes/physical/stream/StreamExecTableSourceScan.scala       | 5 +++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index 38387c4..f5a222b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -103,7 +103,7 @@ class BatchExecTableSourceScan(
     // check that declared and actual type of table source DataStream are 
identical
     if (inputDataType != 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
       throw new TableException(s"TableSource of type 
${tableSource.getClass.getCanonicalName} " +
-        s"returned a DataStream of data type $producedDataType that does not 
match with the " +
+        s"returned a DataStream of data type $inputDataType that does not 
match with the " +
         s"data type $producedDataType declared by the 
TableSource.getProducedDataType() method. " +
         s"Please validate the implementation of the TableSource.")
     }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
index fa9d505..4963e52 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala
@@ -106,13 +106,14 @@ class StreamExecTableSourceScan(
 
     val fieldIndexes = computeIndexMapping()
 
+    val inputDataType = inputTransform.getOutputType
     val producedDataType = tableSource.getProducedDataType
 
     // check that declared and actual type of table source DataStream are 
identical
-    if (inputTransform.getOutputType !=
+    if (inputDataType !=
         TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(producedDataType)) {
       throw new TableException(s"TableSource of type 
${tableSource.getClass.getCanonicalName} " +
-        s"returned a DataStream of data type $producedDataType that does not 
match with the " +
+        s"returned a DataStream of data type $inputDataType that does not 
match with the " +
         s"data type $producedDataType declared by the 
TableSource.getProducedDataType() method. " +
         s"Please validate the implementation of the TableSource.")
     }

Reply via email to