Repository: flink
Updated Branches:
  refs/heads/master 2d393e882 -> 2b82578ab


[FLINK-7596] [table] Fix bug during Set Operation (Union, Minus ... ) with 
Any(GenericRelDataType)

This closes #4658.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62ebda3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62ebda3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62ebda3f

Branch: refs/heads/master
Commit: 62ebda3f99f0d311676e98d6ae1e48c2322a39cb
Parents: 2d393e8
Author: Xpray <leonxp...@gmail.com>
Authored: Fri Sep 15 08:24:17 2017 +0800
Committer: twalthr <twal...@apache.org>
Committed: Tue Sep 26 16:38:33 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 37 ++++++++++++++++++++
 .../stream/table/SetOperatorsITCase.scala       | 17 +++++++++
 2 files changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62ebda3f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 637e8cc..449b198 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.calcite
 
+import java.util
+import java.util.List
+
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
@@ -244,6 +247,40 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 
     canonize(newType)
   }
+
+  /**
+    * Resolves the common type of a list of operand types when at least one of them
+    * has the SQL type name ANY.
+    *
+    * All operands must be [[GenericRelDataType]]s; the first operand is used as the
+    * result type. The result is nullable if any operand is nullable or has the SQL
+    * type name NULL.
+    *
+    * @param types operand types, e.g. the field types of the inputs of a set operation
+    * @return the resolved type, or null if none of the operands is of type ANY (the
+    *         caller is then expected to fall back to the default resolution)
+    * @throws RuntimeException if ANY is present but not every operand is a
+    *                          [[GenericRelDataType]]
+    */
+  private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
+    val allTypes = types.asScala
+    val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)
+    if (hasAny) {
+      // Check every operand instead of only the first two: this avoids an
+      // IndexOutOfBoundsException for a single-element list and does not silently
+      // accept a non-generic operand at position 2 or later.
+      if (allTypes.forall(_.isInstanceOf[GenericRelDataType])) {
+        val nullable = allTypes.exists(
+          sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+        )
+        // hasAny guarantees that the list is non-empty, so head is safe here
+        createTypeWithNullability(allTypes.head, nullable)
+      } else {
+        throw new RuntimeException("only GenericRelDataType of ANY is supported")
+      }
+    } else {
+      null
+    }
+  }
+
+  /**
+    * Returns the least restrictive type of the given types.
+    *
+    * If the first type carries a SQL type name, the ANY-specific resolution is tried
+    * first; whenever it yields no result, the inherited implementation is used.
+    *
+    * @param types non-null, non-empty list of types to unify
+    * @return the ANY-resolved type if there is one, otherwise the result of the
+    *         inherited implementation
+    */
+  override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
+    assert(types != null)
+    assert(types.size >= 1)
+    // The ANY resolution is only attempted if the first type has a SQL type name;
+    // a null result from it means "no ANY involved".
+    val anyResolved: Option[RelDataType] =
+      if (types.get(0).getSqlTypeName == null) {
+        None
+      } else {
+        Option(resolveAnySqlType(types))
+      }
+    anyResolved match {
+      case Some(resolved) => resolved
+      case None => super.leastRestrictive(types)
+    }
+  }
+
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/62ebda3f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 688849e..2333c92 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -70,4 +70,21 @@ class SetOperatorsITCase extends 
StreamingMultipleProgramsTestBase {
     val expected = mutable.MutableList("Hi", "Hallo")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  /**
+    * Smoke test: runs a unionAll of two tables whose second field is a [[NODE]]
+    * and only checks that the job executes without throwing.
+    */
+  @Test
+  def testUnionWithAnyType(): Unit = {
+    val leftRows = List((1, new NODE), (2, new NODE))
+    val rightRows = List((3, new NODE), (4, new NODE))
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val leftTable = tEnv.fromDataStream(env.fromCollection(leftRows))
+    val rightTable = tEnv.fromDataStream(env.fromCollection(rightRows))
+    // No result assertion: the emitted rows are only collected by the sink.
+    leftTable
+      .unionAll(rightTable)
+      .toAppendStream[Row]
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  /** Plain helper class used as a field type by the tests; holds an empty map. */
+  class NODE {
+    val x: java.util.HashMap[String, String] = new java.util.HashMap[String, String]()
+  }
 }

Reply via email to