Repository: flink Updated Branches: refs/heads/master 2d393e882 -> 2b82578ab
[FLINK-7596] [table] Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) This closes #4658. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62ebda3f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62ebda3f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62ebda3f Branch: refs/heads/master Commit: 62ebda3f99f0d311676e98d6ae1e48c2322a39cb Parents: 2d393e8 Author: Xpray <leonxp...@gmail.com> Authored: Fri Sep 15 08:24:17 2017 +0800 Committer: twalthr <twal...@apache.org> Committed: Tue Sep 26 16:38:33 2017 +0200 ---------------------------------------------------------------------- .../flink/table/calcite/FlinkTypeFactory.scala | 37 ++++++++++++++++++++ .../stream/table/SetOperatorsITCase.scala | 17 +++++++++ 2 files changed, 54 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/62ebda3f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 637e8cc..449b198 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -18,6 +18,9 @@ package org.apache.flink.table.calcite +import java.util +import java.util.List + import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.jdbc.JavaTypeFactoryImpl import org.apache.calcite.rel.`type`._ @@ -244,6 +247,40 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp canonize(newType) } + + private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = { + val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY) + val nullable = types.asScala.exists( + sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL + ) + if (hasAny) { + if (types.get(0).isInstanceOf[GenericRelDataType] && + types.get(1).isInstanceOf[GenericRelDataType]) { + createTypeWithNullability(types.get(0), nullable) + } else { + throw new RuntimeException("only GenericRelDataType of ANY is supported") + } + } else { + null + } + } + + override def leastRestrictive(types: util.List[RelDataType]): RelDataType = { + assert(types != null) + assert(types.size >= 1) + val type0 = types.get(0) + if (type0.getSqlTypeName != null) { + val resultType = resolveAnySqlType(types) + if (resultType != null) { + resultType + } else { + super.leastRestrictive(types) + } + } else { + super.leastRestrictive(types) + } + } + } object FlinkTypeFactory { http://git-wip-us.apache.org/repos/asf/flink/blob/62ebda3f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala index 688849e..2333c92 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala @@ -70,4 +70,21 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase { val expected = mutable.MutableList("Hi", "Hallo") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testUnionWithAnyType(): Unit = { + val list = List((1, new NODE), (2, new NODE)) + val list2 = List((3, new NODE), (4, new NODE)) + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val s1 = tEnv.fromDataStream(env.fromCollection(list)) + val s2 = tEnv.fromDataStream(env.fromCollection(list2)) + val result = s1.unionAll(s2).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + } + + class NODE { + val x = new java.util.HashMap[String, String]() + } }