[FLINK-3794] Add checks for unsupported operations in streaming table API

This closes #1921


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b06b01e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b06b01e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b06b01e

Branch: refs/heads/master
Commit: 4b06b01ea42561b46e6c38b70eb217abee0ebdf2
Parents: 51ff4a0
Author: vasia <va...@apache.org>
Authored: Thu Apr 21 12:17:32 2016 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Apr 21 15:27:52 2016 +0200

----------------------------------------------------------------------
 .../scala/table/StreamTableEnvironment.scala    |  3 +-
 .../org/apache/flink/api/table/table.scala      | 74 +++++++++++++-------
 .../streaming/test/UnsupportedOpsTest.scala     | 62 ++++++++++++++++
 3 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index 1bbfaaa..3beedcc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -90,7 +90,8 @@ class StreamTableEnvironment(
     * [[org.apache.flink.api.table.TableEnvironment]]'s catalog.
     * Registered tables can be referenced in SQL queries.
     *
-    * The field names of the [[Table]] are automatically derived from the type of the [[DataStream]].
+    * The field names of the [[Table]] are automatically derived
+    * from the type of the [[DataStream]].
     *
     * @param name The name under which the [[DataStream]] is registered in the catalog.
     * @param dataStream The [[DataStream]] to register.

http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index f9536a1..6485139 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -102,8 +102,14 @@ class Table(
 
     // apply aggregations
     if (aggCalls.nonEmpty) {
-      val emptyKey: GroupKey = relBuilder.groupKey()
-      relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
+      // aggregation on stream table is not currently supported
+      tableEnv match {
+        case _: StreamTableEnvironment =>
+          throw new TableException("Aggregation on stream tables is currently not supported.")
+        case _ =>
+          val emptyKey: GroupKey = relBuilder.groupKey()
+          relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava)
+      }
     }
 
     // get selection expressions
@@ -262,11 +268,18 @@ class Table(
     */
   def groupBy(fields: Expression*): GroupedTable = {
 
-    relBuilder.push(relNode)
-    val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava
-    val groupKey = relBuilder.groupKey(groupExpr)
+    // group by on stream tables is currently not supported
+    tableEnv match {
+      case _: StreamTableEnvironment =>
+        throw new TableException("Group by on stream tables is currently not supported.")
+      case _ => {
+        relBuilder.push(relNode)
+        val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava
+        val groupKey = relBuilder.groupKey(groupExpr)
 
-    new GroupedTable(relBuilder.build(), tableEnv, groupKey)
+        new GroupedTable(relBuilder.build(), tableEnv, groupKey)
+      }
+    }
   }
 
   /**
@@ -294,9 +307,15 @@ class Table(
     * }}}
     */
   def distinct(): Table = {
-    relBuilder.push(relNode)
-    relBuilder.distinct()
-    new Table(relBuilder.build(), tableEnv)
+    // distinct on stream table is not currently supported
+    tableEnv match {
+      case _: StreamTableEnvironment =>
+        throw new TableException("Distinct on stream tables is currently not supported.")
+      case _ =>
+        relBuilder.push(relNode)
+        relBuilder.distinct()
+        new Table(relBuilder.build(), tableEnv)
+    }
   }
 
   /**
@@ -314,24 +333,31 @@ class Table(
     */
   def join(right: Table): Table = {
 
-    // check that right table belongs to the same TableEnvironment
-    if (right.tableEnv != this.tableEnv) {
-      throw new TableException("Only tables from the same TableEnvironment can be joined.")
-    }
+    // join on stream tables is currently not supported
+    tableEnv match {
+      case _: StreamTableEnvironment =>
+        throw new TableException("Join on stream tables is currently not supported.")
+      case _ => {
+        // check that right table belongs to the same TableEnvironment
+        if (right.tableEnv != this.tableEnv) {
+          throw new TableException("Only tables from the same TableEnvironment can be joined.")
+        }
 
-    // check that join inputs do not have overlapping field names
-    val leftFields = relNode.getRowType.getFieldNames.asScala.toSet
-    val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet
-    if (leftFields.intersect(rightFields).nonEmpty) {
-      throw new IllegalArgumentException("Overlapping fields names on join input.")
-    }
+        // check that join inputs do not have overlapping field names
+        val leftFields = relNode.getRowType.getFieldNames.asScala.toSet
+        val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet
+        if (leftFields.intersect(rightFields).nonEmpty) {
+          throw new IllegalArgumentException("Overlapping fields names on join input.")
+        }
 
-    relBuilder.push(relNode)
-    relBuilder.push(right.relNode)
+        relBuilder.push(relNode)
+        relBuilder.push(right.relNode)
 
-    relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
-    val join = relBuilder.build()
-    new Table(join, tableEnv)
+        relBuilder.join(JoinRelType.INNER, relBuilder.literal(true))
+        val join = relBuilder.build()
+        new Table(join, tableEnv)
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
new file mode 100644
index 0000000..f7bd0ff
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.streaming.test
+
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.api.table.{TableException, TableEnvironment}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Test
+
+import scala.collection.mutable
+
+class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testSelectWithAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGroupBy(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+      .groupBy('_1)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testDistinct(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
+    t1.join(t2)
+  }
+}

Reply via email to