Repository: flink Updated Branches: refs/heads/master f00e1e7c5 -> 72e6b760f
[FLINK-4546] [table] Remove STREAM keyword in Stream SQL This closes #2454. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72e6b760 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72e6b760 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72e6b760 Branch: refs/heads/master Commit: 72e6b760fd951764c3ecc6fc191dc99a42d55e0b Parents: f00e1e7 Author: Jark Wu <wuchong...@alibaba-inc.com> Authored: Mon Aug 29 11:35:43 2016 +0800 Committer: twalthr <twal...@apache.org> Committed: Tue Oct 4 15:24:48 2016 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 8 +-- .../api/java/table/StreamTableEnvironment.scala | 8 +-- .../scala/table/StreamTableEnvironment.scala | 8 +-- .../api/table/StreamTableEnvironment.scala | 38 ++-------- .../flink/api/table/TableEnvironment.scala | 14 +--- .../datastream/StreamTableSourceScan.scala | 4 +- .../api/table/plan/rules/FlinkRuleSets.scala | 2 - .../plan/rules/datastream/RemoveDeltaRule.scala | 42 ----------- .../datastream/StreamTableSourceScanRule.scala | 6 +- .../api/table/plan/schema/DataStreamTable.scala | 11 --- .../schema/StreamableTableSourceTable.scala | 30 -------- .../table/plan/schema/TransStreamTable.scala | 73 -------------------- .../flink/api/java/stream/sql/SqlITCase.java | 8 +-- .../api/scala/stream/TableSourceITCase.scala | 4 +- .../flink/api/scala/stream/sql/SqlITCase.scala | 18 ++--- .../api/table/ExpressionReductionTest.scala | 10 +-- 16 files changed, 46 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 1d03b38..2d6d6ce 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1061,7 +1061,7 @@ Among others, the following SQL features are not supported, yet: ### SQL on Streaming Tables -SQL queries can be executed on streaming Tables (Tables backed by `DataStream` or `StreamTableSource`) by using the `SELECT STREAM` keywords instead of `SELECT`. Please refer to the [Apache Calcite's Streaming SQL documentation](https://calcite.apache.org/docs/stream.html) for more information on the Streaming SQL syntax. +SQL queries can be executed on streaming Tables (Tables backed by `DataStream` or `StreamTableSource`) like standard SQL. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -1075,7 +1075,7 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...); tableEnv.registerDataStream("Orders", ds, "user, product, amount"); // run a SQL query on the Table and retrieve the result as a new Table Table result = tableEnv.sql( - "SELECT STREAM product, amount FROM Orders WHERE product LIKE '%Rubber%'"); + "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); {% endhighlight %} </div> @@ -1090,7 +1090,7 @@ val ds: DataStream[(Long, String, Integer)] = env.addSource(...) tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount) // run a SQL query on the Table and retrieve the result as a new Table val result = tableEnv.sql( - "SELECT STREAM product, amount FROM Orders WHERE product LIKE '%Rubber%'") + "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") {% endhighlight %} </div> </div> @@ -1125,7 +1125,7 @@ orderItem: expression [ ASC | DESC ] select: - SELECT [ STREAM ] [ ALL | DISTINCT ] + SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala index add9486..f8dbc37 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala @@ -57,7 +57,7 @@ class StreamTableEnvironment( def fromDataStream[T](dataStream: DataStream[T]): Table = { val name = createUniqueTableName() - registerDataStreamInternal(name, dataStream, false) + registerDataStreamInternal(name, dataStream) ingest(name) } @@ -82,7 +82,7 @@ class StreamTableEnvironment( .toArray val name = createUniqueTableName() - registerDataStreamInternal(name, dataStream, exprs, false) + registerDataStreamInternal(name, dataStream, exprs) ingest(name) } @@ -101,7 +101,7 @@ class StreamTableEnvironment( def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = { checkValidTableName(name) - registerDataStreamInternal(name, dataStream, true) + registerDataStreamInternal(name, dataStream) } /** @@ -127,7 +127,7 @@ class StreamTableEnvironment( .toArray checkValidTableName(name) - registerDataStreamInternal(name, dataStream, exprs, true) + registerDataStreamInternal(name, dataStream, exprs) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala index ee8c56a..e106178 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala @@ -59,7 +59,7 @@ class StreamTableEnvironment( def fromDataStream[T](dataStream: DataStream[T]): Table = { val name = createUniqueTableName() - registerDataStreamInternal(name, dataStream.javaStream, false) + registerDataStreamInternal(name, dataStream.javaStream) ingest(name) } @@ -81,7 +81,7 @@ class StreamTableEnvironment( def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = { val name = createUniqueTableName() - registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, false) + registerDataStreamInternal(name, dataStream.javaStream, fields.toArray) ingest(name) } @@ -100,7 +100,7 @@ class StreamTableEnvironment( def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = { checkValidTableName(name) - registerDataStreamInternal(name, dataStream.javaStream, true) + registerDataStreamInternal(name, dataStream.javaStream) } /** @@ -123,7 +123,7 @@ class StreamTableEnvironment( def registerDataStream[T](name: String, dataStream: DataStream[T], fields: Expression*): Unit = { checkValidTableName(name) - registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, true) + registerDataStreamInternal(name, dataStream.javaStream, fields.toArray) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 44d90ac..15e3960 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -27,14 +27,12 @@ import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink} -import org.apache.flink.api.table.plan.schema. - {StreamableTableSourceTable, TransStreamTable, DataStreamTable} +import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataStreamTable} import org.apache.flink.api.table.sources.StreamTableSource import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -113,7 +111,7 @@ abstract class StreamTableEnvironment( def registerTableSource(name: String, tableSource: StreamTableSource[_]): Unit = { checkValidTableName(name) - registerTableInternal(name, new StreamableTableSourceTable(tableSource)) + registerTableInternal(name, new TableSourceTable(tableSource)) } /** @@ -167,14 +165,11 @@ abstract class StreamTableEnvironment( * * @param name The name under which the table is registered in the catalog. * @param dataStream The [[DataStream]] to register as table in the catalog. - * @param wrapper True if the registration has to wrap the datastreamTable - * into a [[org.apache.calcite.schema.StreamableTable]] * @tparam T the type of the [[DataStream]]. */ protected def registerDataStreamInternal[T]( name: String, - dataStream: DataStream[T], - wrapper: Boolean): Unit = { + dataStream: DataStream[T]): Unit = { val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType) val dataStreamTable = new DataStreamTable[T]( @@ -182,16 +177,7 @@ abstract class StreamTableEnvironment( fieldIndexes, fieldNames ) - // when registering a DataStream, we need to wrap it into a TransStreamTable - // so that the SQL validation phase won't fail - if (wrapper) { - registerTableInternal(name, dataStreamTable) - val t = ingest(name) - replaceRegisteredTable(name, new TransStreamTable(t.getRelNode, true)) - } - else { - registerTableInternal(name, dataStreamTable) - } + registerTableInternal(name, dataStreamTable) } /** @@ -201,15 +187,12 @@ abstract class StreamTableEnvironment( * @param name The name under which the table is registered in the catalog. * @param dataStream The [[DataStream]] to register as table in the catalog. * @param fields The field expressions to define the field names of the table. - * @param wrapper True if the registration has to wrap the datastreamTable - * into a [[org.apache.calcite.schema.StreamableTable]] * @tparam T The type of the [[DataStream]]. */ protected def registerDataStreamInternal[T]( name: String, dataStream: DataStream[T], - fields: Array[Expression], - wrapper: Boolean): Unit = { + fields: Array[Expression]): Unit = { val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray) val dataStreamTable = new DataStreamTable[T]( @@ -217,16 +200,7 @@ abstract class StreamTableEnvironment( fieldIndexes.toArray, fieldNames.toArray ) - // when registering a DataStream, we need to wrap it into a StreamableTable - // so that the SQL validation phase won't fail - if (wrapper) { - registerTableInternal(name, dataStreamTable) - val t = ingest(name) - replaceRegisteredTable(name, new TransStreamTable(t.getRelNode, true)) - } - else { - registerTableInternal(name, dataStreamTable) - } + registerTableInternal(name, dataStreamTable) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index 02204b1..c3b728b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -40,7 +40,7 @@ import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.api.table.functions.{ScalarFunction, UserDefinedFunction} import org.apache.flink.api.table.plan.cost.DataSetCostFactory -import org.apache.flink.api.table.plan.schema.{RelTable, TransStreamTable} +import org.apache.flink.api.table.plan.schema.RelTable import org.apache.flink.api.table.sinks.TableSink import org.apache.flink.api.table.validate.FunctionCatalog import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} @@ -130,16 +130,8 @@ abstract class TableEnvironment(val config: TableConfig) { } checkValidTableName(name) - - table.tableEnv match { - case e: BatchTableEnvironment => - val tableTable = new RelTable(table.getRelNode) - registerTableInternal(name, tableTable) - case e: StreamTableEnvironment => - val sTableTable = new TransStreamTable(table.getRelNode, true) - tables.add(name, sTableTable) - } - + val tableTable = new RelTable(table.getRelNode) + registerTableInternal(name, tableTable) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala index 2c7a584..21b8a63 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table.StreamTableEnvironment -import org.apache.flink.api.table.plan.schema.StreamableTableSourceTable +import org.apache.flink.api.table.plan.schema.TableSourceTable import org.apache.flink.api.table.sources.StreamTableSource import org.apache.flink.streaming.api.datastream.DataStream @@ -35,7 +35,7 @@ class StreamTableSourceScan( rowType: RelDataType) extends StreamScan(cluster, traitSet, table, rowType) { - val tableSourceTable = table.unwrap(classOf[StreamableTableSourceTable]) + val tableSourceTable = table.unwrap(classOf[TableSourceTable]) val tableSource = tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]] override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index 7d915e6..03cb68c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -114,8 +114,6 @@ object FlinkRuleSets { */ val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList( - RemoveDeltaRule.INSTANCE, - // convert a logical table scan to a relational expression TableScanRule.INSTANCE, EnumerableToLogicalTableScan.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala deleted file mode 100644 index 7b4720a..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.rules.datastream - -import org.apache.calcite.plan.RelOptRule._ -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.rel.stream.LogicalDelta - -/** - * Rule that converts an EnumerableTableScan into a LogicalTableScan. - * We need this rule because Calcite creates an EnumerableTableScan - * when parsing a SQL query. We convert it into a LogicalTableScan - * so we can merge the optimization process with any plan that might be created - * by the Table API. - */ -class RemoveDeltaRule extends RelOptRule(operand(classOf[LogicalDelta], any), "RemoveDeltaRule") { - - override def onMatch(call: RelOptRuleCall): Unit = { - val delta = call.rel(0).asInstanceOf[LogicalDelta] - call.transformTo(delta.getInput) - } -} - -object RemoveDeltaRule { - val INSTANCE = new RemoveDeltaRule() -} http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala index 8000cde..9d8075c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala @@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.logical.LogicalTableScan import org.apache.flink.api.table.plan.nodes.datastream. {StreamTableSourceScan, DataStreamConvention} -import org.apache.flink.api.table.plan.schema.StreamableTableSourceTable +import org.apache.flink.api.table.plan.schema.TableSourceTable import org.apache.flink.api.table.sources.StreamTableSource /** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */ @@ -40,9 +40,9 @@ class StreamTableSourceScanRule /** Rule must only match if TableScan targets a [[StreamTableSource]] */ override def matches(call: RelOptRuleCall): Boolean = { val scan: TableScan = call.rel(0).asInstanceOf[TableScan] - val dataSetTable = scan.getTable.unwrap(classOf[StreamableTableSourceTable]) + val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable]) dataSetTable match { - case tst: StreamableTableSourceTable => + case tst: TableSourceTable => tst.tableSource match { case _: StreamTableSource[_] => true http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala index 0fb5db9..570d723 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala @@ -18,8 +18,6 @@ package org.apache.flink.api.table.plan.schema -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.flink.api.table.FlinkTypeFactory import org.apache.flink.streaming.api.datastream.DataStream class DataStreamTable[T]( @@ -27,13 +25,4 @@ class DataStreamTable[T]( override val fieldIndexes: Array[Int], override val fieldNames: Array[String]) extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) { - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] - val builder = typeFactory.builder - fieldNames.zip(fieldTypes) - .foreach( f => - builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) ) - builder.build - } } http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala deleted file mode 100644 index 58214bc..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import org.apache.calcite.schema.{Table, StreamableTable} -import org.apache.flink.api.table.sources.TableSource - -/** Table which defines an external streamable table via a [[TableSource]] */ -class StreamableTableSourceTable(tableSource: TableSource[_]) - extends TableSourceTable(tableSource) - with StreamableTable { - - override def stream(): Table = new TableSourceTable(tableSource) -} http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala deleted file mode 100644 index 61f2598..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.plan.schema - -import org.apache.calcite.plan.RelOptTable -import org.apache.calcite.plan.RelOptTable.ToRelContext -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.calcite.schema.Schema.TableType -import org.apache.calcite.schema.impl.AbstractTable -import org.apache.calcite.schema.{StreamableTable, Table, TranslatableTable} - -/** - * A [[org.apache.calcite.schema.Table]] implementation for registering - * Streaming Table API Tables in the Calcite schema to be used by Flink SQL. - * It implements [[TranslatableTable]] so that its logical scan - * can be converted to a relational expression and [[StreamableTable]] - * so that it can be used in Streaming SQL queries. - * - * @see [[DataStreamTable]] - */ -class TransStreamTable(relNode: RelNode, wrapper: Boolean) - extends AbstractTable - with TranslatableTable - with StreamableTable { - - override def getJdbcTableType: TableType = ??? - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType - - override def stream(): Table = { - if (wrapper) { - // we need to return a wrapper non-streamable table, - // otherwise Calcite's rule-matching produces an infinite loop - new StreamTable(relNode) - } - else { - this - } - } - - override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = - relNode - - /** - * Wraps a [[TransStreamTable]]'s relNode - * to implement its stream() method. - */ - class StreamTable(relNode: RelNode) extends AbstractTable { - - override def getJdbcTableType: TableType = ??? - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - relNode.getRowType - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java index 1743981..10ae5d9 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java @@ -46,7 +46,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { Table in = tableEnv.fromDataStream(ds, "a,b,c"); tableEnv.registerTable("MyTable", in); - String sqlQuery = "SELECT STREAM * FROM MyTable"; + String sqlQuery = "SELECT * FROM MyTable"; Table result = tableEnv.sql(sqlQuery); DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); @@ -70,7 +70,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e"); - String sqlQuery = "SELECT STREAM a, b, e FROM MyTable WHERE c < 4"; + String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4"; Table result = tableEnv.sql(sqlQuery); DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); @@ -99,9 +99,9 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env); tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e"); - String sqlQuery = "SELECT STREAM * FROM T1 " + + String sqlQuery = "SELECT * FROM T1 " + "UNION ALL " + - "(SELECT STREAM a, b, c FROM T2 WHERE a < 3)"; + "(SELECT a, b, c FROM T2 WHERE a < 3)"; Table result = tableEnv.sql(sqlQuery); DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala index c82e6df..c14ad97 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala @@ -72,7 +72,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33)) tEnv.sql( - "SELECT STREAM amount * id, name FROM MyTestTable WHERE amount < 4") + "SELECT amount * id, name FROM MyTestTable WHERE amount < 4") .toDataStream[Row] .addSink(new StreamITCase.StringSink) @@ -128,7 +128,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { tEnv.registerTableSource("csvTable", csvTable) tEnv.sql( - "SELECT STREAM last, score, id FROM csvTable WHERE id < 4 ") + "SELECT last, score, id FROM csvTable WHERE id < 4 ") .toDataStream[Row] .addSink(new StreamITCase.StringSink) http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala index 26c701f..5b278c1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala @@ -39,7 +39,7 @@ class SqlITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val sqlQuery = "SELECT STREAM a * 2, b - 1 FROM MyTable" + val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable" val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) @@ -60,7 +60,7 @@ class SqlITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3" + val sqlQuery = "SELECT * FROM MyTable WHERE a = 3" val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("MyTable", t) @@ -81,7 +81,7 @@ class SqlITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val sqlQuery = "SELECT STREAM * FROM MyTable WHERE _1 = 3" + val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3" val t = StreamTestData.getSmall3TupleDataStream(env) tEnv.registerDataStream("MyTable", t) @@ -101,9 +101,9 @@ class SqlITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val sqlQuery = "SELECT STREAM * FROM T1 " + + val sqlQuery = "SELECT * FROM T1 " + "UNION ALL " + - "SELECT STREAM * FROM T2" + "SELECT * FROM T2" val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("T1", t1) @@ -128,9 +128,9 @@ class SqlITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val sqlQuery = "SELECT STREAM * FROM T1 WHERE a = 3 " + + val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " + "UNION ALL " + - "SELECT STREAM * FROM T2 WHERE a = 2" + "SELECT * FROM T2 WHERE a = 2" val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("T1", t1) @@ -154,9 +154,9 @@ class SqlITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val sqlQuery = "SELECT STREAM c FROM T1 WHERE a = 3 " + + val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " + "UNION ALL " + - "SELECT STREAM c FROM T2 WHERE a = 2" + "SELECT c FROM T2 WHERE a = 2" val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) tEnv.registerTable("T1", t1) http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala index 4830b75..925a818 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala @@ -229,7 +229,7 @@ class ExpressionReductionTest { def testReduceCalcExpressionForStreamSQL(): Unit = { val tEnv = mockStreamTableEnvironment() - val sqlQuery = "SELECT STREAM " + + val sqlQuery = "SELECT " + "(3+4)+a, " + "b+(1+2), " + "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + @@ -269,7 +269,7 @@ class ExpressionReductionTest { def testReduceProjectExpressionForStreamSQL(): Unit = { val tEnv = mockStreamTableEnvironment() - val sqlQuery = "SELECT STREAM " + + val sqlQuery = "SELECT " + "(3+4)+a, " + "b+(1+2), " + "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + @@ -289,8 +289,8 @@ class ExpressionReductionTest { val optimized = tEnv.optimize(table.getRelNode) val optimizedString = optimized.toString - assertTrue(optimizedString.contains("+(7, a) AS EXPR$0")) - assertTrue(optimizedString.contains("+(b, 3) AS EXPR$1")) + assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) + assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) assertTrue(optimizedString.contains("'b' AS EXPR$2")) assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) @@ -308,7 +308,7 @@ class ExpressionReductionTest { def testReduceFilterExpressionForStreamSQL(): Unit = { val tEnv = mockStreamTableEnvironment() - val sqlQuery = "SELECT STREAM " + + val sqlQuery = "SELECT " + "*" + "FROM MyTable WHERE a>(1+7)"