Repository: flink
Updated Branches:
  refs/heads/master f00e1e7c5 -> 72e6b760f


[FLINK-4546] [table] Remove STREAM keyword in Stream SQL

This closes #2454.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72e6b760
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72e6b760
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72e6b760

Branch: refs/heads/master
Commit: 72e6b760fd951764c3ecc6fc191dc99a42d55e0b
Parents: f00e1e7
Author: Jark Wu <wuchong...@alibaba-inc.com>
Authored: Mon Aug 29 11:35:43 2016 +0800
Committer: twalthr <twal...@apache.org>
Committed: Tue Oct 4 15:24:48 2016 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  8 +--
 .../api/java/table/StreamTableEnvironment.scala |  8 +--
 .../scala/table/StreamTableEnvironment.scala    |  8 +--
 .../api/table/StreamTableEnvironment.scala      | 38 ++--------
 .../flink/api/table/TableEnvironment.scala      | 14 +---
 .../datastream/StreamTableSourceScan.scala      |  4 +-
 .../api/table/plan/rules/FlinkRuleSets.scala    |  2 -
 .../plan/rules/datastream/RemoveDeltaRule.scala | 42 -----------
 .../datastream/StreamTableSourceScanRule.scala  |  6 +-
 .../api/table/plan/schema/DataStreamTable.scala | 11 ---
 .../schema/StreamableTableSourceTable.scala     | 30 --------
 .../table/plan/schema/TransStreamTable.scala    | 73 --------------------
 .../flink/api/java/stream/sql/SqlITCase.java    |  8 +--
 .../api/scala/stream/TableSourceITCase.scala    |  4 +-
 .../flink/api/scala/stream/sql/SqlITCase.scala  | 18 ++---
 .../api/table/ExpressionReductionTest.scala     | 10 +--
 16 files changed, 46 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 1d03b38..2d6d6ce 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1061,7 +1061,7 @@ Among others, the following SQL features are not 
supported, yet:
 
 ### SQL on Streaming Tables
 
-SQL queries can be executed on streaming Tables (Tables backed by `DataStream` 
or `StreamTableSource`) by using the `SELECT STREAM` keywords instead of 
`SELECT`. Please refer to the [Apache Calcite's Streaming SQL 
documentation](https://calcite.apache.org/docs/stream.html) for more 
information on the Streaming SQL syntax.
+SQL queries can be executed on streaming Tables (Tables backed by `DataStream` 
or `StreamTableSource`) like standard SQL.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1075,7 +1075,7 @@ DataStream<Tuple3<Long, String, Integer>> ds = 
env.addSource(...);
 tableEnv.registerDataStream("Orders", ds, "user, product, amount");
 // run a SQL query on the Table and retrieve the result as a new Table
 Table result = tableEnv.sql(
-  "SELECT STREAM product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
 
@@ -1090,7 +1090,7 @@ val ds: DataStream[(Long, String, Integer)] = 
env.addSource(...)
 tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
 // run a SQL query on the Table and retrieve the result as a new Table
 val result = tableEnv.sql(
-  "SELECT STREAM product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 </div>
@@ -1125,7 +1125,7 @@ orderItem:
   expression [ ASC | DESC ]
 
 select:
-  SELECT [ STREAM ] [ ALL | DISTINCT ]
+  SELECT [ ALL | DISTINCT ]
   { * | projectItem [, projectItem ]* }
   FROM tableExpression
   [ WHERE booleanExpression ]

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index add9486..f8dbc37 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -57,7 +57,7 @@ class StreamTableEnvironment(
   def fromDataStream[T](dataStream: DataStream[T]): Table = {
 
     val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream, false)
+    registerDataStreamInternal(name, dataStream)
     ingest(name)
   }
 
@@ -82,7 +82,7 @@ class StreamTableEnvironment(
       .toArray
 
     val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream, exprs, false)
+    registerDataStreamInternal(name, dataStream, exprs)
     ingest(name)
   }
 
@@ -101,7 +101,7 @@ class StreamTableEnvironment(
   def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
 
     checkValidTableName(name)
-    registerDataStreamInternal(name, dataStream, true)
+    registerDataStreamInternal(name, dataStream)
   }
 
   /**
@@ -127,7 +127,7 @@ class StreamTableEnvironment(
       .toArray
 
     checkValidTableName(name)
-    registerDataStreamInternal(name, dataStream, exprs, true)
+    registerDataStreamInternal(name, dataStream, exprs)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index ee8c56a..e106178 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -59,7 +59,7 @@ class StreamTableEnvironment(
   def fromDataStream[T](dataStream: DataStream[T]): Table = {
 
     val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream.javaStream, false)
+    registerDataStreamInternal(name, dataStream.javaStream)
     ingest(name)
   }
 
@@ -81,7 +81,7 @@ class StreamTableEnvironment(
   def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table 
= {
 
     val name = createUniqueTableName()
-    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, 
false)
+    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
     ingest(name)
   }
 
@@ -100,7 +100,7 @@ class StreamTableEnvironment(
   def registerDataStream[T](name: String, dataStream: DataStream[T]): Unit = {
 
     checkValidTableName(name)
-    registerDataStreamInternal(name, dataStream.javaStream, true)
+    registerDataStreamInternal(name, dataStream.javaStream)
   }
 
   /**
@@ -123,7 +123,7 @@ class StreamTableEnvironment(
   def registerDataStream[T](name: String, dataStream: DataStream[T], fields: 
Expression*): Unit = {
 
     checkValidTableName(name)
-    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray, 
true)
+    registerDataStreamInternal(name, dataStream.javaStream, fields.toArray)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 44d90ac..15e3960 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -27,14 +27,12 @@ import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.Programs
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, 
DataStreamRel}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
-import org.apache.flink.api.table.plan.schema.
-  {StreamableTableSourceTable, TransStreamTable, DataStreamTable}
+import org.apache.flink.api.table.plan.schema.{TableSourceTable, 
DataStreamTable}
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -113,7 +111,7 @@ abstract class StreamTableEnvironment(
   def registerTableSource(name: String, tableSource: StreamTableSource[_]): 
Unit = {
 
     checkValidTableName(name)
-    registerTableInternal(name, new StreamableTableSourceTable(tableSource))
+    registerTableInternal(name, new TableSourceTable(tableSource))
   }
 
   /**
@@ -167,14 +165,11 @@ abstract class StreamTableEnvironment(
     *
     * @param name The name under which the table is registered in the catalog.
     * @param dataStream The [[DataStream]] to register as table in the catalog.
-    * @param wrapper True if the registration has to wrap the datastreamTable
-    *                into a [[org.apache.calcite.schema.StreamableTable]]
     * @tparam T the type of the [[DataStream]].
     */
   protected def registerDataStreamInternal[T](
     name: String,
-    dataStream: DataStream[T],
-    wrapper: Boolean): Unit = {
+    dataStream: DataStream[T]): Unit = {
 
     val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType)
     val dataStreamTable = new DataStreamTable[T](
@@ -182,16 +177,7 @@ abstract class StreamTableEnvironment(
       fieldIndexes,
       fieldNames
     )
-    // when registering a DataStream, we need to wrap it into a 
TransStreamTable
-    // so that the SQL validation phase won't fail
-    if (wrapper) {
-      registerTableInternal(name, dataStreamTable)
-      val t = ingest(name)
-      replaceRegisteredTable(name, new TransStreamTable(t.getRelNode, true))
-    }
-    else {
-      registerTableInternal(name, dataStreamTable)
-    }
+    registerTableInternal(name, dataStreamTable)
   }
 
   /**
@@ -201,15 +187,12 @@ abstract class StreamTableEnvironment(
     * @param name The name under which the table is registered in the catalog.
     * @param dataStream The [[DataStream]] to register as table in the catalog.
     * @param fields The field expressions to define the field names of the 
table.
-    * @param wrapper True if the registration has to wrap the datastreamTable
-    *                into a [[org.apache.calcite.schema.StreamableTable]]
     * @tparam T The type of the [[DataStream]].
     */
   protected def registerDataStreamInternal[T](
     name: String,
     dataStream: DataStream[T],
-    fields: Array[Expression],
-    wrapper: Boolean): Unit = {
+    fields: Array[Expression]): Unit = {
 
     val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, 
fields.toArray)
     val dataStreamTable = new DataStreamTable[T](
@@ -217,16 +200,7 @@ abstract class StreamTableEnvironment(
       fieldIndexes.toArray,
       fieldNames.toArray
     )
-    // when registering a DataStream, we need to wrap it into a StreamableTable
-    // so that the SQL validation phase won't fail
-    if (wrapper) {
-      registerTableInternal(name, dataStreamTable)
-      val t = ingest(name)
-      replaceRegisteredTable(name, new TransStreamTable(t.getRelNode, true))
-    }
-    else {
-      registerTableInternal(name, dataStreamTable)
-    }
+    registerTableInternal(name, dataStreamTable)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 02204b1..c3b728b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -40,7 +40,7 @@ import org.apache.flink.api.scala.{ExecutionEnvironment => 
ScalaBatchExecEnv}
 import org.apache.flink.api.table.expressions.{Alias, Expression, 
UnresolvedFieldReference}
 import org.apache.flink.api.table.functions.{ScalarFunction, 
UserDefinedFunction}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.plan.schema.{RelTable, TransStreamTable}
+import org.apache.flink.api.table.plan.schema.RelTable
 import org.apache.flink.api.table.sinks.TableSink
 import org.apache.flink.api.table.validate.FunctionCatalog
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaStreamExecEnv}
@@ -130,16 +130,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     }
 
     checkValidTableName(name)
-
-    table.tableEnv match {
-      case e: BatchTableEnvironment =>
-        val tableTable = new RelTable(table.getRelNode)
-        registerTableInternal(name, tableTable)
-      case e: StreamTableEnvironment =>
-        val sTableTable = new TransStreamTable(table.getRelNode, true)
-        tables.add(name, sTableTable)
-    }
-
+    val tableTable = new RelTable(table.getRelNode)
+    registerTableInternal(name, tableTable)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 2c7a584..21b8a63 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.StreamTableEnvironment
-import org.apache.flink.api.table.plan.schema.StreamableTableSourceTable
+import org.apache.flink.api.table.plan.schema.TableSourceTable
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 
@@ -35,7 +35,7 @@ class StreamTableSourceScan(
     rowType: RelDataType)
   extends StreamScan(cluster, traitSet, table, rowType) {
 
-  val tableSourceTable = table.unwrap(classOf[StreamableTableSourceTable])
+  val tableSourceTable = table.unwrap(classOf[TableSourceTable])
   val tableSource = 
tableSourceTable.tableSource.asInstanceOf[StreamTableSource[_]]
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 7d915e6..03cb68c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -114,8 +114,6 @@ object FlinkRuleSets {
   */
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
 
-      RemoveDeltaRule.INSTANCE,
-
       // convert a logical table scan to a relational expression
       TableScanRule.INSTANCE,
       EnumerableToLogicalTableScan.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala
deleted file mode 100644
index 7b4720a..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/RemoveDeltaRule.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.rules.datastream
-
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.stream.LogicalDelta
-
-/**
- * Rule that converts an EnumerableTableScan into a LogicalTableScan.
- * We need this rule because Calcite creates an EnumerableTableScan
- * when parsing a SQL query. We convert it into a LogicalTableScan
- * so we can merge the optimization process with any plan that might be created
- * by the Table API.
- */
-class RemoveDeltaRule extends RelOptRule(operand(classOf[LogicalDelta], any), 
"RemoveDeltaRule") {
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val delta = call.rel(0).asInstanceOf[LogicalDelta]
-    call.transformTo(delta.getInput)
-  }
-}
-
-object RemoveDeltaRule {
-  val INSTANCE = new RemoveDeltaRule()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 8000cde..9d8075c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.flink.api.table.plan.nodes.datastream.
   {StreamTableSourceScan, DataStreamConvention}
-import org.apache.flink.api.table.plan.schema.StreamableTableSourceTable
+import org.apache.flink.api.table.plan.schema.TableSourceTable
 import org.apache.flink.api.table.sources.StreamTableSource
 
 /** Rule to convert a [[LogicalTableScan]] into a [[StreamTableSourceScan]]. */
@@ -40,9 +40,9 @@ class StreamTableSourceScanRule
   /** Rule must only match if TableScan targets a [[StreamTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = 
scan.getTable.unwrap(classOf[StreamableTableSourceTable])
+    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
     dataSetTable match {
-      case tst: StreamableTableSourceTable =>
+      case tst: TableSourceTable =>
         tst.tableSource match {
           case _: StreamTableSource[_] =>
             true

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
index 0fb5db9..570d723 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.table.plan.schema
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.flink.api.table.FlinkTypeFactory
 import org.apache.flink.streaming.api.datastream.DataStream
 
 class DataStreamTable[T](
@@ -27,13 +25,4 @@ class DataStreamTable[T](
     override val fieldIndexes: Array[Int],
     override val fieldNames: Array[String])
   extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
-    val builder = typeFactory.builder
-    fieldNames.zip(fieldTypes)
-      .foreach( f =>
-        builder.add(f._1, 
flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) )
-    builder.build
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala
deleted file mode 100644
index 58214bc..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/StreamableTableSourceTable.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.schema.{Table, StreamableTable}
-import org.apache.flink.api.table.sources.TableSource
-
-/** Table which defines an external streamable table via a [[TableSource]] */
-class StreamableTableSourceTable(tableSource: TableSource[_])
-  extends TableSourceTable(tableSource)
-  with StreamableTable {
-
-  override def stream(): Table = new TableSourceTable(tableSource)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
deleted file mode 100644
index 61f2598..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TransStreamTable.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.schema
-
-import org.apache.calcite.plan.RelOptTable
-import org.apache.calcite.plan.RelOptTable.ToRelContext
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.Schema.TableType
-import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.schema.{StreamableTable, Table, TranslatableTable}
-
-/**
-  * A [[org.apache.calcite.schema.Table]] implementation for registering
-  * Streaming Table API Tables in the Calcite schema to be used by Flink SQL.
-  * It implements [[TranslatableTable]] so that its logical scan
-  * can be converted to a relational expression and [[StreamableTable]]
-  * so that it can be used in Streaming SQL queries.
-  *
-  * @see [[DataStreamTable]]
-  */
-class TransStreamTable(relNode: RelNode, wrapper: Boolean)
-  extends AbstractTable
-  with TranslatableTable
-  with StreamableTable {
-
-  override def getJdbcTableType: TableType = ???
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = 
relNode.getRowType
-
-  override def stream(): Table = {
-    if (wrapper) {
-      // we need to return a wrapper non-streamable table,
-      // otherwise Calcite's rule-matching produces an infinite loop
-      new StreamTable(relNode)
-    }
-    else {
-      this
-    }
-  }
-
-  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode 
=
-    relNode
-
-  /**
-    * Wraps a [[TransStreamTable]]'s relNode
-    * to implement its stream() method.
-    */
-  class StreamTable(relNode: RelNode) extends AbstractTable {
-
-    override def getJdbcTableType: TableType = ???
-
-    override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-      relNode.getRowType
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
index 1743981..10ae5d9 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
@@ -46,7 +46,7 @@ public class SqlITCase extends 
StreamingMultipleProgramsTestBase {
                Table in = tableEnv.fromDataStream(ds, "a,b,c");
                tableEnv.registerTable("MyTable", in);
 
-               String sqlQuery = "SELECT STREAM * FROM MyTable";
+               String sqlQuery = "SELECT * FROM MyTable";
                Table result = tableEnv.sql(sqlQuery);
 
                DataStream<Row> resultSet = tableEnv.toDataStream(result, 
Row.class);
@@ -70,7 +70,7 @@ public class SqlITCase extends 
StreamingMultipleProgramsTestBase {
                DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
                tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
 
-               String sqlQuery = "SELECT STREAM a, b, e FROM MyTable WHERE c < 
4";
+               String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
                Table result = tableEnv.sql(sqlQuery);
 
                DataStream<Row> resultSet = tableEnv.toDataStream(result, 
Row.class);
@@ -99,9 +99,9 @@ public class SqlITCase extends 
StreamingMultipleProgramsTestBase {
                DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
StreamTestData.get5TupleDataStream(env);
                tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
 
-               String sqlQuery = "SELECT STREAM * FROM T1 " +
+               String sqlQuery = "SELECT * FROM T1 " +
                                                        "UNION ALL " +
-                                                       "(SELECT STREAM a, b, c 
FROM T2 WHERE a < 3)";
+                                                       "(SELECT a, b, c FROM 
T2 WHERE a < 3)";
                Table result = tableEnv.sql(sqlQuery);
 
                DataStream<Row> resultSet = tableEnv.toDataStream(result, 
Row.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
index c82e6df..c14ad97 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
@@ -72,7 +72,7 @@ class TableSourceITCase extends 
StreamingMultipleProgramsTestBase {
 
     tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
     tEnv.sql(
-      "SELECT STREAM amount * id, name FROM MyTestTable WHERE amount < 4")
+      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
       .toDataStream[Row]
       .addSink(new StreamITCase.StringSink)
 
@@ -128,7 +128,7 @@ class TableSourceITCase extends 
StreamingMultipleProgramsTestBase {
 
     tEnv.registerTableSource("csvTable", csvTable)
     tEnv.sql(
-      "SELECT STREAM last, score, id FROM csvTable WHERE id < 4 ")
+      "SELECT last, score, id FROM csvTable WHERE id < 4 ")
       .toDataStream[Row]
       .addSink(new StreamITCase.StringSink)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
index 26c701f..5b278c1 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
@@ -39,7 +39,7 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val sqlQuery = "SELECT STREAM a * 2, b - 1 FROM MyTable"
+    val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
 
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("MyTable", t)
@@ -60,7 +60,7 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3"
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
 
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("MyTable", t)
@@ -81,7 +81,7 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val sqlQuery = "SELECT STREAM * FROM MyTable WHERE _1 = 3"
+    val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
 
     val t = StreamTestData.getSmall3TupleDataStream(env)
     tEnv.registerDataStream("MyTable", t)
@@ -101,9 +101,9 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val sqlQuery = "SELECT STREAM * FROM T1 " +
+    val sqlQuery = "SELECT * FROM T1 " +
       "UNION ALL " +
-      "SELECT STREAM * FROM T2"
+      "SELECT * FROM T2"
 
     val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("T1", t1)
@@ -128,9 +128,9 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val sqlQuery = "SELECT STREAM * FROM T1 WHERE a = 3 " +
+    val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
       "UNION ALL " +
-      "SELECT STREAM * FROM T2 WHERE a = 2"
+      "SELECT * FROM T2 WHERE a = 2"
 
     val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("T1", t1)
@@ -154,9 +154,9 @@ class SqlITCase extends StreamingMultipleProgramsTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    val sqlQuery = "SELECT STREAM c FROM T1 WHERE a = 3 " +
+    val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
       "UNION ALL " +
-      "SELECT STREAM c FROM T2 WHERE a = 2"
+      "SELECT c FROM T2 WHERE a = 2"
 
     val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 
'b, 'c)
     tEnv.registerTable("T1", t1)

http://git-wip-us.apache.org/repos/asf/flink/blob/72e6b760/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
index 4830b75..925a818 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/ExpressionReductionTest.scala
@@ -229,7 +229,7 @@ class ExpressionReductionTest {
   def testReduceCalcExpressionForStreamSQL(): Unit = {
     val tEnv = mockStreamTableEnvironment()
 
-    val sqlQuery = "SELECT STREAM " +
+    val sqlQuery = "SELECT " +
       "(3+4)+a, " +
       "b+(1+2), " +
       "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
@@ -269,7 +269,7 @@ class ExpressionReductionTest {
   def testReduceProjectExpressionForStreamSQL(): Unit = {
     val tEnv = mockStreamTableEnvironment()
 
-    val sqlQuery = "SELECT STREAM " +
+    val sqlQuery = "SELECT " +
       "(3+4)+a, " +
       "b+(1+2), " +
       "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
@@ -289,8 +289,8 @@ class ExpressionReductionTest {
 
     val optimized = tEnv.optimize(table.getRelNode)
     val optimizedString = optimized.toString
-    assertTrue(optimizedString.contains("+(7, a) AS EXPR$0"))
-    assertTrue(optimizedString.contains("+(b, 3) AS EXPR$1"))
+    assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0"))
+    assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1"))
     assertTrue(optimizedString.contains("'b' AS EXPR$2"))
     assertTrue(optimizedString.contains("'STRING' AS EXPR$3"))
     assertTrue(optimizedString.contains("'teststring' AS EXPR$4"))
@@ -308,7 +308,7 @@ class ExpressionReductionTest {
   def testReduceFilterExpressionForStreamSQL(): Unit = {
     val tEnv = mockStreamTableEnvironment()
 
-    val sqlQuery = "SELECT STREAM " +
+    val sqlQuery = "SELECT " +
       "*" +
       "FROM MyTable WHERE a>(1+7)"
 

Reply via email to