Repository: flink
Updated Branches:
  refs/heads/release-1.3 9a8b515c0 -> 314ca04b3

[FLINK-7596] [table] Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType)

This closes #4658.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/701be5d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/701be5d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/701be5d5

Branch: refs/heads/release-1.3
Commit: 701be5d53c9a6531089d2bba2b4633513713e5ec
Parents: 9a8b515
Author: Xpray <leonxp...@gmail.com>
Authored: Fri Sep 15 08:24:17 2017 +0800
Committer: twalthr <twal...@apache.org>
Committed: Tue Sep 26 16:52:17 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 37 ++++++++++++++++++++
 .../datastream/DataStreamCalcITCase.scala       | 17 +++++++++
 2 files changed, 54 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/701be5d5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 6c23b9e..82dd390 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.calcite
 
+import java.util
+import java.util.List
+
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
@@ -269,6 +272,40 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
 
     canonize(newType)
   }
+
+  private def resolveAnySqlType(types: java.util.List[RelDataType]): RelDataType = {
+    val hasAny = types.asScala.map(_.getSqlTypeName).exists(_ == SqlTypeName.ANY)
+    val nullable = types.asScala.exists(
+      sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
+    )
+    if (hasAny) {
+      if (types.get(0).isInstanceOf[GenericRelDataType] &&
+        types.get(1).isInstanceOf[GenericRelDataType]) {
+        createTypeWithNullability(types.get(0), nullable)
+      } else {
+        throw new RuntimeException("only GenericRelDataType of ANY is supported")
+      }
+    } else {
+      null
+    }
+  }
+
+  override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
+    assert(types != null)
+    assert(types.size >= 1)
+    val type0 = types.get(0)
+    if (type0.getSqlTypeName != null) {
+      val resultType = resolveAnySqlType(types)
+      if (resultType != null) {
+        resultType
+      } else {
+        super.leastRestrictive(types)
+      }
+    } else {
+      super.leastRestrictive(types)
+    }
+  }
+
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/701be5d5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
index dcd7800..c37a728 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamCalcITCase.scala
@@ -78,4 +78,21 @@ class DataStreamCalcITCase extends StreamingMultipleProgramsTestBase {
     val expected = mutable.MutableList("Hello", "Hello world")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testUnionWithAnyType(): Unit = {
+    val list = List((1, new NODE), (2, new NODE))
+    val list2 = List((3, new NODE), (4, new NODE))
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val s1 = tEnv.fromDataStream(env.fromCollection(list))
+    val s2 = tEnv.fromDataStream(env.fromCollection(list2))
+    val result = s1.unionAll(s2).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+  }
+
+  class NODE {
+    val x = new java.util.HashMap[String, String]()
+  }
 }