Repository: flink
Updated Branches:
  refs/heads/release-1.3 9a8b515c0 -> 314ca04b3


[FLINK-7596] [table] Fix bug during Set Operation (Union, Minus ... ) with 
Any(GenericRelDataType)

This closes #4658.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/701be5d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/701be5d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/701be5d5

Branch: refs/heads/release-1.3
Commit: 701be5d53c9a6531089d2bba2b4633513713e5ec
Parents: 9a8b515
Author: Xpray <leonxp...@gmail.com>
Authored: Fri Sep 15 08:24:17 2017 +0800
Committer: twalthr <twal...@apache.org>
Committed: Tue Sep 26 16:52:17 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 37 ++++++++++++++++++++
 .../datastream/DataStreamCalcITCase.scala       | 17 +++++++++
 2 files changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/701be5d5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 6c23b9e..82dd390 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.calcite
 
+import java.util
+import java.util.List
+
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
@@ -269,6 +272,40 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 
     canonize(newType)
   }
+
+  /**
+    * Resolves the common type of `types` when at least one of them has SQL type ANY.
+    *
+    * Every element of the list is validated, so the method works for lists of any
+    * non-zero length (including a single element) and rejects a non-generic type at
+    * any position, not only at the first two.
+    *
+    * @param types the candidate types; expected to be non-empty
+    * @return the first type with nullability set if any input is nullable or of SQL type
+    *         NULL, or `null` if none of the types is ANY
+    * @throws RuntimeException if ANY occurs together with a type that is not a
+    *                          GenericRelDataType
+    */
+  private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
+    val allTypes = types.asScala
+    if (!allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)) {
+      // no ANY involved: `null` tells the caller that nothing was resolved here
+      null
+    } else if (allTypes.forall(_.isInstanceOf[GenericRelDataType])) {
+      // NOTE(review): the generic types are not compared with each other; the first one
+      // is used as the result type. Confirm whether differing generic types must be rejected.
+      val nullable = allTypes.exists(
+        sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+      )
+      createTypeWithNullability(allTypes.head, nullable)
+    } else {
+      throw new RuntimeException("only GenericRelDataType of ANY is supported")
+    }
+  }
+
+  /**
+    * Returns the least restrictive type of `types`.
+    *
+    * If the first type carries a SQL type name, ANY types are resolved by
+    * `resolveAnySqlType` first; whenever that yields no result (or the first type has
+    * no SQL type name) the decision is delegated to the parent type factory.
+    *
+    * @param types the candidate types; must be non-null and non-empty
+    * @return the resolved common type
+    */
+  override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
+    assert(types != null)
+    assert(types.size >= 1)
+    // Option(...) turns the `null` ("not resolved") result of the helper into None
+    val anyResolved: Option[RelDataType] =
+      if (types.get(0).getSqlTypeName == null) None else Option(resolveAnySqlType(types))
+    anyResolved match {
+      case Some(resolved) => resolved
+      case None => super.leastRestrictive(types)
+    }
+  }
+
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/701be5d5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index dcd7800..c37a728 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -78,4 +78,21 @@ class DataStreamCalcITCase extends 
StreamingMultipleProgramsTestBase {
     val expected = mutable.MutableList("Hello", "Hello world")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /**
+    * Runs a UNION ALL over two streams whose second field is a `NODE` instance.
+    *
+    * The test makes no assertions on the emitted rows; it passes as long as building
+    * and executing the job does not throw.
+    */
+  @Test
+  def testUnionWithAnyType(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val left = tEnv.fromDataStream(env.fromCollection(List((1, new NODE), (2, new NODE))))
+    val right = tEnv.fromDataStream(env.fromCollection(List((3, new NODE), (4, new NODE))))
+    left
+      .unionAll(right)
+      .toAppendStream[Row]
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  /** Test fixture whose only state is an initially empty string-to-string hash map. */
+  class NODE {
+    val x: java.util.HashMap[String, String] = new java.util.HashMap[String, String]()
+  }
 }

Reply via email to