Repository: flink
Updated Branches:
  refs/heads/master e5b65a7fc -> fe4e96a72


[FLINK-6033] [table] Add support for SQL UNNEST.

This closes #3793.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4e96a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4e96a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4e96a7

Branch: refs/heads/master
Commit: fe4e96a726dd32fb948db050b975312e120e2461
Parents: 9f2293c
Author: Shuyi Chen <sh...@uber.com>
Authored: Fri Apr 21 23:48:28 2017 -0700
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Sun May 7 13:32:26 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |   2 +
 .../flink/table/calcite/FlinkTypeFactory.scala  |   5 +-
 .../utils/UserDefinedFunctionUtils.scala        |   6 +-
 .../flink/table/plan/nodes/FlinkRelNode.scala   |   3 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   3 +
 .../plan/rules/logical/LogicalUnnestRule.scala  | 134 +++++++++++++++++++
 .../table/plan/util/ExplodeFunctionUtil.scala   |  91 +++++++++++++
 .../flink/table/typeutils/TypeCheckUtils.scala  |   4 +-
 .../table/api/scala/batch/sql/JoinITCase.scala  |  54 ++++++++
 .../table/api/scala/stream/sql/SqlITCase.scala  |  90 +++++++++++++
 10 files changed, 388 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index a77d994..d105188 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1482,6 +1482,7 @@ val result2 = tableEnv.sql(
 #### Limitations
 
 Joins, set operations, and non-windowed aggregations are not supported yet.
+`UNNEST` supports only arrays and does not support `WITH ORDINALITY` yet.
 
 {% top %}
 
@@ -1690,6 +1691,7 @@ tableReference:
 tablePrimary:
   [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
   | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
+  | UNNEST '(' expression ')'
 
 values:
   VALUES expression [, expression ]*

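For illustration, a minimal sketch of the new syntax in use (a hedged example: assumes a table "T" with an array column 'c has been registered, as in the test cases added below):

    // sketch only: tEnv and table "T" are assumed from the surrounding tests
    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
    val result = tEnv.sql(sqlQuery)  // emits one row per element of array column c
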
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 9281ad8..eba1623 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
@@ -180,6 +180,9 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl
     case pa: PrimitiveArrayTypeInfo[_] =>
      new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
 
+    case ba: BasicArrayTypeInfo[_, _] =>
+      new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true)
+
     case oa: ObjectArrayTypeInfo[_, _] =>
      new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
 

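The new case maps arrays of boxed element types to a nullable array type. A sketch of where the two array type infos come from (both constants exist in flink-core; the nullability mirrors the true/false flags above):

    import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo}

    // Array[java.lang.Integer]: elements may be null, hence the nullable = true flag
    val boxed = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
    // Array[Int]: elements cannot be null, hence nullable = false
    val primitive = PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
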
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 689bf0e..11174de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -436,7 +436,11 @@ object UserDefinedFunctionUtils {
     expected.isPrimitive && Primitives.wrap(expected) == candidate ||
    candidate == classOf[Date] && (expected == classOf[Int] || expected == classOf[JInt])  ||
    candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
-    candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])
+    candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) ||
+    (candidate.isArray &&
+      expected.isArray &&
+      candidate.getComponentType.isInstanceOf[Object] &&
+      expected.getComponentType == classOf[Object])
 
   @throws[Exception]
   def serialize(function: UserDefinedFunction): String = {

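The added clause lets a concrete object-array argument (e.g. an array of rows) match an eval method declared with an Object[] parameter, as the ObjectExplodeTableFunc below declares. A simplified sketch of the idea, with Row[] as a hypothetical candidate type:

    import org.apache.flink.types.Row

    val candidate = classOf[Array[Row]]     // actual argument class
    val expected = classOf[Array[Object]]   // declared eval() parameter class
    // both are arrays and the expected component type is Object, so they match
    val matches = candidate.isArray && expected.isArray &&
      expected.getComponentType == classOf[Object]
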
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index 0b244e9..8509a8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -61,7 +61,8 @@ trait FlinkRelNode extends RelNode {
        val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable)
         val field = fa.getField.getName
         s"$referenceExpr.$field"
-
+      case cv: RexCorrelVariable =>
+        cv.toString
       case _ =>
        throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index fad60fd..980dfd3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -100,6 +100,9 @@ object FlinkRuleSets {
     PushProjectIntoTableSourceScanRule.INSTANCE,
     PushFilterIntoTableSourceScanRule.INSTANCE,
 
+    // Unnest rule
+    LogicalUnnestRule.INSTANCE,
+
     // translate to flink logical rel nodes
     FlinkLogicalAggregate.CONVERTER,
     FlinkLogicalWindowAggregate.CONVERTER,

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
new file mode 100644
index 0000000..f2d9f2a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util.Collections
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.`type`.{RelDataTypeFieldImpl, RelRecordType, StructKind}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.Uncollect
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.sql.`type`.AbstractSqlType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.plan.util.ExplodeFunctionUtil
+
+class LogicalUnnestRule(
+    operand: RelOptRuleOperand,
+    description: String)
+  extends RelOptRule(operand, description) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+
+    val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+    val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+    right match {
+      // a filter is pushed above the table function
+      case filter: LogicalFilter =>
+        filter.getInput.asInstanceOf[RelSubset].getOriginal match {
+          case u: Uncollect => !u.withOrdinality
+          case _ => false
+        }
+      case u: Uncollect => !u.withOrdinality
+      case _ => false
+    }
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val correlate = call.rel(0).asInstanceOf[LogicalCorrelate]
+
+    val outer = correlate.getLeft.asInstanceOf[RelSubset].getOriginal
+    val array = correlate.getRight.asInstanceOf[RelSubset].getOriginal
+
+    def convert(relNode: RelNode): RelNode = {
+      relNode match {
+        case rs: RelSubset =>
+          convert(rs.getRelList.get(0))
+
+        case f: LogicalFilter =>
+          f.copy(
+            f.getTraitSet,
+            ImmutableList.of(convert(f.getInput.asInstanceOf[RelSubset].getOriginal)))
+
+        case uc: Uncollect =>
+          // convert Uncollect into TableFunctionScan
+          val cluster = correlate.getCluster
+
+          val arrayType =
+            uc.getInput.getRowType.getFieldList.get(0).getValue.asInstanceOf[ArrayRelDataType]
+          val componentType = arrayType.getComponentType
+
+          // create table function
+          val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunctions(
+            "explode",
+            ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
+            FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
+            cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]).head
+
+          // create table function call
+          val rexCall = cluster.getRexBuilder.makeCall(
+            explodeTableFunc,
+            uc.getInput.asInstanceOf[RelSubset]
+              .getOriginal.asInstanceOf[LogicalProject].getChildExps
+          )
+
+          // determine rel data type of unnest
+          val rowType = componentType match {
+            case _: AbstractSqlType =>
+              new RelRecordType(
+                StructKind.FULLY_QUALIFIED,
+                ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, componentType)))
+            case _: RelRecordType => componentType
+            case _ => throw TableException(
+              s"Unsupported array component type in UNNEST: 
${componentType.toString}")
+          }
+
+          // create table function scan
+          new LogicalTableFunctionScan(
+            cluster,
+            correlate.getTraitSet,
+            Collections.emptyList(),
+            rexCall,
+            classOf[Array[Object]],
+            rowType,
+            null)
+      }
+    }
+
+    // convert unnest into table function scan
+    val tableFunctionScan = convert(array)
+    // create correlate with table function scan as input
+    val newCorrelate =
+      correlate.copy(correlate.getTraitSet, ImmutableList.of(outer, tableFunctionScan))
+    call.transformTo(newCorrelate)
+  }
+}
+
+object LogicalUnnestRule {
+  val INSTANCE = new LogicalUnnestRule(
+    operand(classOf[LogicalCorrelate], any),
+    "LogicalUnnestRule")
+}

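In effect (a conceptual sketch, not the literal planner output), the rule rewrites UNNEST over an array column into a correlate over a generated "explode" table function:

    // before the rule fires
    val before = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
    // after LogicalUnnestRule: roughly equivalent to a lateral table function call
    val after = "SELECT a, s FROM T, LATERAL TABLE(explode(T.c)) AS A (s)"
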
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
new file mode 100644
index 0000000..1bcc6d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.functions.TableFunction
+
+class ObjectExplodeTableFunc extends TableFunction[Object] {
+  def eval(arr: Array[Object]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class FloatExplodeTableFunc extends TableFunction[Float] {
+  def eval(arr: Array[Float]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class ShortExplodeTableFunc extends TableFunction[Short] {
+  def eval(arr: Array[Short]): Unit = {
+    arr.foreach(collect)
+  }
+}
+class IntExplodeTableFunc extends TableFunction[Int] {
+  def eval(arr: Array[Int]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class LongExplodeTableFunc extends TableFunction[Long] {
+  def eval(arr: Array[Long]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class DoubleExplodeTableFunc extends TableFunction[Double] {
+  def eval(arr: Array[Double]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class ByteExplodeTableFunc extends TableFunction[Byte] {
+  def eval(arr: Array[Byte]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+class BooleanExplodeTableFunc extends TableFunction[Boolean] {
+  def eval(arr: Array[Boolean]): Unit = {
+    arr.foreach(collect)
+  }
+}
+
+object ExplodeFunctionUtil {
+  def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = {
+    ti match {
+      case pat: PrimitiveArrayTypeInfo[_] => {
+        pat.getComponentType match {
+          case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
+          case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
+          case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
+          case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
+          case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
+          case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
+          case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc
+        }
+      }
+      case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+      case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+      case _ => throw new UnsupportedOperationException(ti.toString + " is not supported")
+    }
+  }
+}

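A usage sketch of the factory above, resolving the explode function for an int[] column (the type-info constant exists in flink-core):

    import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo
    import org.apache.flink.table.plan.util.ExplodeFunctionUtil

    // returns an IntExplodeTableFunc, which collects one row per array element
    val explode = ExplodeFunctionUtil.explodeTableFuncFromType(
      PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
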
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 9896a8c..fea8c2a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -68,7 +68,9 @@ object TypeCheckUtils {
  def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO
 
   def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
-    case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
+    case _: ObjectArrayTypeInfo[_, _] |
+         _: PrimitiveArrayTypeInfo[_] |
+         _: BasicArrayTypeInfo[_, _] => true
     case _ => false
   }
 

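A sketch of the widened check (constants and factory methods as provided by flink-core):

    import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo}
    import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
    import org.apache.flink.table.typeutils.TypeCheckUtils.isArray

    isArray(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)            // true
    isArray(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)                       // true (newly covered)
    isArray(ObjectArrayTypeInfo.getInfoFor(BasicTypeInfo.STRING_TYPE_INFO))  // true
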
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
index 9df17ad..8a8c0ce 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -375,6 +376,59 @@ class JoinITCase(
     Assert.assertEquals(0, result)
   }
 
+  @Test
+  def testCrossWithUnnest(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      (1, 1L, Array("Hi", "w")),
+      (2, 2L, Array("Hello", "k")),
+      (3, 2L, Array("Hello world", "x"))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataSet("T", stream, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", 
"3,x")
+    val results = result.toDataSet[Row].collect().toList
+    assertEquals(expected.toString(), results.sortWith(_.toString < _.toString).toString())
+  }
+
+  @Test
+  def testJoinWithUnnestOfTuple(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      (1, Array((12, "45.6"), (2, "45.612"))),
+      (2, Array((13, "41.6"), (1, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataSet("T", stream, 'a, 'b)
+
+    val sqlQuery = "" +
+      "SELECT a, b, x, y " +
+      "FROM " +
+      "  (SELECT a, b FROM T WHERE a < 3) as tf, " +
+      "  UNNEST(tf.b) as A (x, y) " +
+      "WHERE x > a"
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = List(
+      "1,[(12,45.6), (2,45.612)],12,45.6",
+      "1,[(12,45.6), (2,45.612)],2,45.612",
+      "2,[(13,41.6), (1,45.2136)],13,41.6").mkString(", ")
+    val results = result.toDataSet[Row].collect().map(_.toString)
+    assertEquals(expected, results.sorted.mkString(", "))
+  }
+
   @Test(expected = classOf[TableException])
   def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 95366e1..4147358 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -190,4 +190,94 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testUnnestPrimitiveArrayFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array(12, 45), Array(Array(12, 45))),
+      (2, Array(41, 5), Array(Array(18), Array(87))),
+      (3, Array(18, 42), Array(Array(1), Array(45)))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,[12, 45],12",
+      "1,[12, 45],45",
+      "2,[41, 5],41",
+      "2,[41, 5],5",
+      "3,[18, 42],18",
+      "3,[18, 42],42"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnnestArrayOfArrayFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array(12, 45), Array(Array(12, 45))),
+      (2, Array(41, 5), Array(Array(18), Array(87))),
+      (3, Array(18, 42), Array(Array(1), Array(45)))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "1,[12, 45]",
+      "2,[18]",
+      "2,[87]",
+      "3,[1]",
+      "3,[45]")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnnestObjectArrayFromTableWithFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = List(
+      (1, Array((12, "45.6"), (12, "45.612"))),
+      (2, Array((13, "41.6"), (14, "45.2136"))),
+      (3, Array((18, "42.6")))
+    )
+    val stream = env.fromCollection(data)
+    tEnv.registerDataStream("T", stream, 'a, 'b)
+
+    val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s 
> 13"
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = List(
+      "2,[(13,41.6), (14,45.2136)],14,45.2136",
+      "3,[(18,42.6)],18,42.6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
+
