[FLINK-7596] [table] Restrict equality and improve tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b82578a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b82578a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b82578a Branch: refs/heads/master Commit: 2b82578abec20eb97bc0d641ca19977cc3805c9e Parents: 62ebda3 Author: twalthr <twal...@apache.org> Authored: Tue Sep 26 16:37:31 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Tue Sep 26 16:49:10 2017 +0200 ---------------------------------------------------------------------- .../flink/table/calcite/FlinkTypeFactory.scala | 45 +++++++++----------- .../stream/table/SetOperatorsITCase.scala | 16 ++++--- 2 files changed, 31 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2b82578a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 449b198..1cc9f6b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.calcite import java.util -import java.util.List import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.jdbc.JavaTypeFactoryImpl @@ -248,39 +247,35 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp canonize(newType) } - private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = { - val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY) - val nullable = types.asScala.exists( - sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL - ) - if (hasAny) { - if (types.get(0).isInstanceOf[GenericRelDataType] && - types.get(1).isInstanceOf[GenericRelDataType]) { - createTypeWithNullability(types.get(0), nullable) - } else { - throw new RuntimeException("only GenericRelDataType of ANY is supported") - } - } else { - null - } - } - override def leastRestrictive(types: util.List[RelDataType]): RelDataType = { - assert(types != null) - assert(types.size >= 1) val type0 = types.get(0) if (type0.getSqlTypeName != null) { - val resultType = resolveAnySqlType(types) + val resultType = resolveAny(types) if (resultType != null) { - resultType + return resultType + } + } + super.leastRestrictive(types) + } + + private def resolveAny(types: util.List[RelDataType]): RelDataType = { + val allTypes = types.asScala + val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY) + if (hasAny) { + val head = allTypes.head + // only allow ANY with exactly the same GenericRelDataType for all types + if (allTypes.forall(_ == head)) { + val nullable = allTypes.exists( + sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL + ) + createTypeWithNullability(head, nullable) } else { - super.leastRestrictive(types) + throw TableException("Generic ANY types must have a common type information.") } } else { - super.leastRestrictive(types) + null } } - } object FlinkTypeFactory { http://git-wip-us.apache.org/repos/asf/flink/blob/2b82578a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala index 2333c92..cf195a5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala @@ -73,18 +73,24 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase { @Test def testUnionWithAnyType(): Unit = { - val list = List((1, new NODE), (2, new NODE)) - val list2 = List((3, new NODE), (4, new NODE)) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) - val s1 = tEnv.fromDataStream(env.fromCollection(list)) - val s2 = tEnv.fromDataStream(env.fromCollection(list2)) + + StreamITCase.testResults = mutable.MutableList() + val s1 = env.fromElements((1, new NonPojo), (2, new NonPojo)).toTable(tEnv, 'a, 'b) + val s2 = env.fromElements((3, new NonPojo), (4, new NonPojo)).toTable(tEnv, 'a, 'b) + val result = s1.unionAll(s2).toAppendStream[Row] result.addSink(new StreamITCase.StringSink[Row]) env.execute() + + val expected = mutable.MutableList("1,{}", "2,{}", "3,{}", "4,{}") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) } - class NODE { + class NonPojo { val x = new java.util.HashMap[String, String]() + + override def toString: String = x.toString } }