[FLINK-7596] [table] Restrict equality and improve tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b82578a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b82578a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b82578a

Branch: refs/heads/master
Commit: 2b82578abec20eb97bc0d641ca19977cc3805c9e
Parents: 62ebda3
Author: twalthr <twal...@apache.org>
Authored: Tue Sep 26 16:37:31 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Sep 26 16:49:10 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 45 +++++++++-----------
 .../stream/table/SetOperatorsITCase.scala       | 16 ++++---
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b82578a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 449b198..1cc9f6b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.calcite
 
 import java.util
-import java.util.List
 
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
@@ -248,39 +247,35 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     canonize(newType)
   }
 
-  private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
-    val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY)
-    val nullable = types.asScala.exists(
-      sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
-    )
-    if (hasAny) {
-      if (types.get(0).isInstanceOf[GenericRelDataType] &&
-        types.get(1).isInstanceOf[GenericRelDataType]) {
-        createTypeWithNullability(types.get(0), nullable)
-      } else {
-        throw new RuntimeException("only GenericRelDataType of ANY is supported")
-      }
-    } else {
-      null
-    }
-  }
-
   override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
-    assert(types != null)
-    assert(types.size >= 1)
     val type0 = types.get(0)
     if (type0.getSqlTypeName != null) {
-      val resultType = resolveAnySqlType(types)
+      val resultType = resolveAny(types)
       if (resultType != null) {
-        resultType
+        return resultType
+      }
+    }
+    super.leastRestrictive(types)
+  }
+
+  private def resolveAny(types: util.List[RelDataType]): RelDataType = {
+    val allTypes = types.asScala
+    val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)
+    if (hasAny) {
+      val head = allTypes.head
+      // only allow ANY with exactly the same GenericRelDataType for all types
+      if (allTypes.forall(_ == head)) {
+        val nullable = allTypes.exists(
+          sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+        )
+        createTypeWithNullability(head, nullable)
       } else {
-        super.leastRestrictive(types)
+        throw TableException("Generic ANY types must have a common type information.")
       }
     } else {
-      super.leastRestrictive(types)
+      null
     }
   }
-
 }
 
 object FlinkTypeFactory {
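
For context, the rewritten resolveAny above only resolves a common type when
every ANY-typed input carries exactly the same generic type; mixed generic
types now fail with a TableException instead of blindly taking the first
GenericRelDataType as the old resolveAnySqlType did. A loose, self-contained
Scala sketch of that rule (SimpleType and resolveAnySketch are made-up
stand-ins, not Flink or Calcite API):

  // Stand-in for Calcite's RelDataType; purely illustrative.
  case class SimpleType(name: String, isAny: Boolean, isNullable: Boolean = false)

  // Sketch of the rule: if any input is ANY, all inputs must share the same
  // underlying type; the result becomes nullable if any input is nullable.
  def resolveAnySketch(types: Seq[SimpleType]): Option[SimpleType] = {
    if (types.exists(_.isAny)) {
      val head = types.head
      if (types.forall(_.name == head.name)) {
        Some(head.copy(isNullable = types.exists(_.isNullable)))
      } else {
        throw new RuntimeException("Generic ANY types must have a common type information.")
      }
    } else {
      None // fall back to the default leastRestrictive behaviour
    }
  }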

http://git-wip-us.apache.org/repos/asf/flink/blob/2b82578a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index 2333c92..cf195a5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -73,18 +73,24 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
   def testUnionWithAnyType(): Unit = {
-    val list = List((1, new NODE), (2, new NODE))
-    val list2 = List((3, new NODE), (4, new NODE))
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    val s1 = tEnv.fromDataStream(env.fromCollection(list))
-    val s2 = tEnv.fromDataStream(env.fromCollection(list2))
+
+    StreamITCase.testResults = mutable.MutableList()
+    val s1 = env.fromElements((1, new NonPojo), (2, new NonPojo)).toTable(tEnv, 'a, 'b)
+    val s2 = env.fromElements((3, new NonPojo), (4, new NonPojo)).toTable(tEnv, 'a, 'b)
+
     val result = s1.unionAll(s2).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
+
+    val expected = mutable.MutableList("1,{}", "2,{}", "3,{}", "4,{}")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  class NODE {
+  class NonPojo {
     val x = new java.util.HashMap[String, String]()
+
+    override def toString: String = x.toString
   }
 }
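
For context on the new assertions: NonPojo does not follow POJO conventions,
so Flink handles it as a generic type, which is what surfaces as the SQL ANY
type exercised by this test; its toString delegates to an empty
java.util.HashMap, which prints as "{}", hence expected rows like "1,{}".
A tiny stand-alone check of that string form (illustrative only, not part of
the commit):

  // An empty java.util.HashMap renders as "{}", so every unioned row
  // collected by the StringSink prints as "<int>,{}".
  val emptyMap = new java.util.HashMap[String, String]()
  assert(emptyMap.toString == "{}")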
