[FLINK-3794] Add checks for unsupported operations in streaming table API This closes #1921
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b06b01e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b06b01e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b06b01e Branch: refs/heads/master Commit: 4b06b01ea42561b46e6c38b70eb217abee0ebdf2 Parents: 51ff4a0 Author: vasia <va...@apache.org> Authored: Thu Apr 21 12:17:32 2016 +0200 Committer: vasia <va...@apache.org> Committed: Thu Apr 21 15:27:52 2016 +0200 ---------------------------------------------------------------------- .../scala/table/StreamTableEnvironment.scala | 3 +- .../org/apache/flink/api/table/table.scala | 74 +++++++++++++------- .../streaming/test/UnsupportedOpsTest.scala | 62 ++++++++++++++++ 3 files changed, 114 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala index 1bbfaaa..3beedcc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala @@ -90,7 +90,8 @@ class StreamTableEnvironment( * [[org.apache.flink.api.table.TableEnvironment]]'s catalog. * Registered tables can be referenced in SQL queries. * - * The field names of the [[Table]] are automatically derived from the type of the [[DataStream]]. + * The field names of the [[Table]] are automatically derived + * from the type of the [[DataStream]]. * * @param name The name under which the [[DataStream]] is registered in the catalog. * @param dataStream The [[DataStream]] to register. http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index f9536a1..6485139 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -102,8 +102,14 @@ class Table( // apply aggregations if (aggCalls.nonEmpty) { - val emptyKey: GroupKey = relBuilder.groupKey() - relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava) + // aggregation on stream table is not currently supported + tableEnv match { + case _: StreamTableEnvironment => + throw new TableException("Aggregation on stream tables is currently not supported.") + case _ => + val emptyKey: GroupKey = relBuilder.groupKey() + relBuilder.aggregate(emptyKey, aggCalls.toIterable.asJava) + } } // get selection expressions @@ -262,11 +268,18 @@ class Table( */ def groupBy(fields: Expression*): GroupedTable = { - relBuilder.push(relNode) - val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava - val groupKey = relBuilder.groupKey(groupExpr) + // group by on stream tables is currently not supported + tableEnv match { + case _: StreamTableEnvironment => + throw new TableException("Group by on stream tables is currently not supported.") + case _ => { + relBuilder.push(relNode) + val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava + val groupKey = relBuilder.groupKey(groupExpr) - new GroupedTable(relBuilder.build(), tableEnv, groupKey) + new GroupedTable(relBuilder.build(), tableEnv, groupKey) + } + } } /** @@ -294,9 +307,15 @@ class Table( * }}} */ def distinct(): Table = { - relBuilder.push(relNode) - relBuilder.distinct() - new Table(relBuilder.build(), tableEnv) + // distinct on stream table is not currently supported + tableEnv match { + case _: StreamTableEnvironment => + throw new TableException("Distinct on stream tables is currently not supported.") + case _ => + relBuilder.push(relNode) + relBuilder.distinct() + new Table(relBuilder.build(), tableEnv) + } } /** @@ -314,24 +333,31 @@ class Table( */ def join(right: Table): Table = { - // check that right table belongs to the same TableEnvironment - if (right.tableEnv != this.tableEnv) { - throw new TableException("Only tables from the same TableEnvironment can be joined.") - } + // join on stream tables is currently not supported + tableEnv match { + case _: StreamTableEnvironment => + throw new TableException("Join on stream tables is currently not supported.") + case _ => { + // check that right table belongs to the same TableEnvironment + if (right.tableEnv != this.tableEnv) { + throw new TableException("Only tables from the same TableEnvironment can be joined.") + } - // check that join inputs do not have overlapping field names - val leftFields = relNode.getRowType.getFieldNames.asScala.toSet - val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet - if (leftFields.intersect(rightFields).nonEmpty) { - throw new IllegalArgumentException("Overlapping fields names on join input.") - } + // check that join inputs do not have overlapping field names + val leftFields = relNode.getRowType.getFieldNames.asScala.toSet + val rightFields = right.relNode.getRowType.getFieldNames.asScala.toSet + if (leftFields.intersect(rightFields).nonEmpty) { + throw new IllegalArgumentException("Overlapping fields names on join input.") + } - relBuilder.push(relNode) - relBuilder.push(right.relNode) + relBuilder.push(relNode) + relBuilder.push(right.relNode) - relBuilder.join(JoinRelType.INNER, relBuilder.literal(true)) - val join = relBuilder.build() - new Table(join, tableEnv) + relBuilder.join(JoinRelType.INNER, relBuilder.literal(true)) + val join = relBuilder.build() + new Table(join, tableEnv) + } + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/4b06b01e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala new file mode 100644 index 0000000..f7bd0ff --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/streaming/test/UnsupportedOpsTest.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.streaming.test + +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData} +import org.apache.flink.api.table.{TableException, TableEnvironment} +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Test + +import scala.collection.mutable + +class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[TableException]) + def testSelectWithAggregation(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min) + } + + @Test(expected = classOf[TableException]) + def testGroupBy(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + .groupBy('_1) + } + + @Test(expected = classOf[TableException]) + def testDistinct(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct() + } + + @Test(expected = classOf[TableException]) + def testJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv) + t1.join(t2) + } +}