Repository: flink Updated Branches: refs/heads/master e5b65a7fc -> fe4e96a72
[FLINK-6033] [table] Add support for SQL UNNEST. This closes #3793. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4e96a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4e96a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4e96a7 Branch: refs/heads/master Commit: fe4e96a726dd32fb948db050b975312e120e2461 Parents: 9f2293c Author: Shuyi Chen <sh...@uber.com> Authored: Fri Apr 21 23:48:28 2017 -0700 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sun May 7 13:32:26 2017 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 2 + .../flink/table/calcite/FlinkTypeFactory.scala | 5 +- .../utils/UserDefinedFunctionUtils.scala | 6 +- .../flink/table/plan/nodes/FlinkRelNode.scala | 3 +- .../flink/table/plan/rules/FlinkRuleSets.scala | 3 + .../plan/rules/logical/LogicalUnnestRule.scala | 134 +++++++++++++++++++ .../table/plan/util/ExplodeFunctionUtil.scala | 91 +++++++++++++ .../flink/table/typeutils/TypeCheckUtils.scala | 4 +- .../table/api/scala/batch/sql/JoinITCase.scala | 54 ++++++++ .../table/api/scala/stream/sql/SqlITCase.scala | 90 +++++++++++++ 10 files changed, 388 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index a77d994..d105188 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1482,6 +1482,7 @@ val result2 = tableEnv.sql( #### Limitations Joins, set operations, and non-windowed aggregations are not supported yet. +`UNNEST` supports only arrays and does not support `WITH ORDINALITY` yet. {% top %} @@ -1690,6 +1691,7 @@ tableReference: tablePrimary: [ TABLE ] [ [ catalogName . ] schemaName . 
] tableName | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')' + | UNNEST '(' expression ')' values: VALUES expression [, expression ]* http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 9281ad8..eba1623 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -26,7 +26,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName} import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo._ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.ValueTypeInfo._ import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo} @@ -180,6 +180,9 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp case pa: PrimitiveArrayTypeInfo[_] => new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false) + case ba: BasicArrayTypeInfo[_, _] => + new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true) + case oa: ObjectArrayTypeInfo[_, _] => new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true) 
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala index 689bf0e..11174de 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala @@ -436,7 +436,11 @@ object UserDefinedFunctionUtils { expected.isPrimitive && Primitives.wrap(expected) == candidate || candidate == classOf[Date] && (expected == classOf[Int] || expected == classOf[JInt]) || candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) || - candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) + candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) || + (candidate.isArray && + expected.isArray && + candidate.getComponentType.isInstanceOf[Object] && + expected.getComponentType == classOf[Object]) @throws[Exception] def serialize(function: UserDefinedFunction): String = { http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala index 0b244e9..8509a8e 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala @@ -61,7 +61,8 @@ trait FlinkRelNode extends RelNode { val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable) val field = fa.getField.getName s"$referenceExpr.$field" - + case cv: RexCorrelVariable => + cv.toString case _ => throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr") } http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index fad60fd..980dfd3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -100,6 +100,9 @@ object FlinkRuleSets { PushProjectIntoTableSourceScanRule.INSTANCE, PushFilterIntoTableSourceScanRule.INSTANCE, + // Unnest rule + LogicalUnnestRule.INSTANCE, + // translate to flink logical rel nodes FlinkLogicalAggregate.CONVERTER, FlinkLogicalWindowAggregate.CONVERTER, http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala new file mode 
100644 index 0000000..f2d9f2a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import java.util.Collections + +import com.google.common.collect.ImmutableList +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} +import org.apache.calcite.rel.`type`.{RelDataTypeFieldImpl, RelRecordType, StructKind} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.Uncollect +import org.apache.calcite.rel.logical._ +import org.apache.calcite.sql.`type`.AbstractSqlType +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.plan.schema.ArrayRelDataType +import org.apache.flink.table.plan.util.ExplodeFunctionUtil + +class LogicalUnnestRule( + operand: RelOptRuleOperand, + description: String) + extends RelOptRule(operand, description) { + + 
override def matches(call: RelOptRuleCall): Boolean = { + + val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate] + val right = join.getRight.asInstanceOf[RelSubset].getOriginal + + right match { + // a filter is pushed above the table function + case filter: LogicalFilter => + filter.getInput.asInstanceOf[RelSubset].getOriginal match { + case u: Uncollect => !u.withOrdinality + case _ => false + } + case u: Uncollect => !u.withOrdinality + case _ => false + } + } + + override def onMatch(call: RelOptRuleCall): Unit = { + val correlate = call.rel(0).asInstanceOf[LogicalCorrelate] + + val outer = correlate.getLeft.asInstanceOf[RelSubset].getOriginal + val array = correlate.getRight.asInstanceOf[RelSubset].getOriginal + + def convert(relNode: RelNode): RelNode = { + relNode match { + case rs: RelSubset => + convert(rs.getRelList.get(0)) + + case f: LogicalFilter => + f.copy( + f.getTraitSet, + ImmutableList.of(convert(f.getInput.asInstanceOf[RelSubset].getOriginal))) + + case uc: Uncollect => + // convert Uncollect into TableFunctionScan + val cluster = correlate.getCluster + + val arrayType = + uc.getInput.getRowType.getFieldList.get(0).getValue.asInstanceOf[ArrayRelDataType] + val componentType = arrayType.getComponentType + + // create table function + val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunctions( + "explode", + ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo), + FlinkTypeFactory.toTypeInfo(arrayType.getComponentType), + cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]).head + + // create table function call + val rexCall = cluster.getRexBuilder.makeCall( + explodeTableFunc, + uc.getInput.asInstanceOf[RelSubset] + .getOriginal.asInstanceOf[LogicalProject].getChildExps + ) + + // determine rel data type of unnest + val rowType = componentType match { + case _: AbstractSqlType => + new RelRecordType( + StructKind.FULLY_QUALIFIED, + ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, componentType))) + 
case _: RelRecordType => componentType + case _ => throw TableException( + s"Unsupported array component type in UNNEST: ${componentType.toString}") + } + + // create table function scan + new LogicalTableFunctionScan( + cluster, + correlate.getTraitSet, + Collections.emptyList(), + rexCall, + classOf[Array[Object]], + rowType, + null) + } + } + + // convert unnest into table function scan + val tableFunctionScan = convert(array) + // create correlate with table function scan as input + val newCorrelate = + correlate.copy(correlate.getTraitSet, ImmutableList.of(outer, tableFunctionScan)) + call.transformTo(newCorrelate) + } +} + +object LogicalUnnestRule { + val INSTANCE = new LogicalUnnestRule( + operand(classOf[LogicalCorrelate], any), + "LogicalUnnestRule") +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala new file mode 100644 index 0000000..1bcc6d9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.table.functions.TableFunction + +class ObjectExplodeTableFunc extends TableFunction[Object] { + def eval(arr: Array[Object]): Unit = { + arr.foreach(collect) + } +} + +class FloatExplodeTableFunc extends TableFunction[Float] { + def eval(arr: Array[Float]): Unit = { + arr.foreach(collect) + } +} + +class ShortExplodeTableFunc extends TableFunction[Short] { + def eval(arr: Array[Short]): Unit = { + arr.foreach(collect) + } +} +class IntExplodeTableFunc extends TableFunction[Int] { + def eval(arr: Array[Int]): Unit = { + arr.foreach(collect) + } +} + +class LongExplodeTableFunc extends TableFunction[Long] { + def eval(arr: Array[Long]): Unit = { + arr.foreach(collect) + } +} + +class DoubleExplodeTableFunc extends TableFunction[Double] { + def eval(arr: Array[Double]): Unit = { + arr.foreach(collect) + } +} + +class ByteExplodeTableFunc extends TableFunction[Byte] { + def eval(arr: Array[Byte]): Unit = { + arr.foreach(collect) + } +} + +class BooleanExplodeTableFunc extends TableFunction[Boolean] { + def eval(arr: Array[Boolean]): Unit = { + arr.foreach(collect) + } +} + +object ExplodeFunctionUtil { + def explodeTableFuncFromType(ti: TypeInformation[_]):TableFunction[_] = { + ti match { + case pat: PrimitiveArrayTypeInfo[_] => { + pat.getComponentType match { + case BasicTypeInfo.INT_TYPE_INFO => 
new IntExplodeTableFunc + case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc + case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc + case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc + case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc + case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc + case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc + // no explode function exists for the remaining primitive component types (e.g. char); + // fail with the same exception as the outer match instead of a scala.MatchError + case _ => throw new UnsupportedOperationException(ti.toString + " IS NOT supported") + } + } + case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc + case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc + case _ => throw new UnsupportedOperationException(ti.toString + " IS NOT supported") + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala index 9896a8c..fea8c2a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala @@ -68,7 +68,9 @@ object TypeCheckUtils { def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO def isArray(dataType: TypeInformation[_]): Boolean = dataType match { - case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true + case _: ObjectArrayTypeInfo[_, _] | + _: PrimitiveArrayTypeInfo[_] | + _: BasicArrayTypeInfo[_, _] => true case _ => false } http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala index 9df17ad..8a8c0ce 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException} import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row +import org.junit.Assert.assertEquals import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -375,6 +376,59 @@ class JoinITCase( Assert.assertEquals(0, result) } + @Test + def testCrossWithUnnest(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val data = List( + (1, 1L, Array("Hi", "w")), + (2, 2L, Array("Hello", "k")), + (3, 2L, Array("Hello world", "x")) + ) + val stream = env.fromCollection(data) + tEnv.registerDataSet("T", stream, 'a, 'b, 'c) + + val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)" + + val result = tEnv.sql(sqlQuery) + + val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x") + val results = result.toDataSet[Row].collect().toList + assertEquals(expected.toString(), results.sortWith(_.toString < _.toString).toString()) + } + + @Test + def testJoinWithUnnestOfTuple(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env, config) + + val data = List( + (1, Array((12, "45.6"), (2, "45.612"))), + (2, Array((13, "41.6"), (1, "45.2136"))), + (3, Array((18, "42.6"))) + ) + val stream = env.fromCollection(data) + tEnv.registerDataSet("T", 
stream, 'a, 'b) + + val sqlQuery = "" + + "SELECT a, b, x, y " + + "FROM " + + " (SELECT a, b FROM T WHERE a < 3) as tf, " + + " UNNEST(tf.b) as A (x, y) " + + "WHERE x > a" + + val result = tEnv.sql(sqlQuery) + + val expected = List( + "1,[(12,45.6), (2,45.612)],12,45.6", + "1,[(12,45.6), (2,45.612)],2,45.612", + "2,[(13,41.6), (1,45.2136)],13,41.6").mkString(", ") + val results = result.toDataSet[Row].collect().map(_.toString) + assertEquals(expected, results.sorted.mkString(", ")) + } + @Test(expected = classOf[TableException]) def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index 95366e1..4147358 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -190,4 +190,94 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testUnnestPrimitiveArrayFromTable(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val data = List( + (1, Array(12, 45), Array(Array(12, 45))), + (2, Array(41, 5), Array(Array(18), Array(87))), + (3, Array(18, 42), Array(Array(1), Array(45))) + ) + val stream = env.fromCollection(data) + tEnv.registerDataStream("T", stream, 'a, 'b, 'c) + + val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)" + + val result = 
tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = List( + "1,[12, 45],12", + "1,[12, 45],45", + "2,[41, 5],41", + "2,[41, 5],5", + "3,[18, 42],18", + "3,[18, 42],42" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testUnnestArrayOfArrayFromTable(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val data = List( + (1, Array(12, 45), Array(Array(12, 45))), + (2, Array(41, 5), Array(Array(18), Array(87))), + (3, Array(18, 42), Array(Array(1), Array(45))) + ) + val stream = env.fromCollection(data) + tEnv.registerDataStream("T", stream, 'a, 'b, 'c) + + val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)" + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = List( + "1,[12, 45]", + "2,[18]", + "2,[87]", + "3,[1]", + "3,[45]") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testUnnestObjectArrayFromTableWithFilter(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val data = List( + (1, Array((12, "45.6"), (12, "45.612"))), + (2, Array((13, "41.6"), (14, "45.2136"))), + (3, Array((18, "42.6"))) + ) + val stream = env.fromCollection(data) + tEnv.registerDataStream("T", stream, 'a, 'b) + + val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13" + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = List( + "2,[(13,41.6), (14,45.2136)],14,45.2136", + "3,[(18,42.6)],18,42.6") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + } +