This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d903480  [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
d903480 is described below

commit d903480ef7abc95cd419e6a194db2276dd7eb4cb
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Jul 2 16:55:08 2019 +0200

    [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
    
    This moves the Scala expression DSL to flink-table-api-scala.
    
    Users of pure table programs should define their imports like:
    
    import org.apache.flink.table.api._
    TableEnvironment.create(...)
    
    Users of the DataStream API should define their imports like:
    
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.scala._
    StreamTableEnvironment.create(...)
    
    This commit did not split the package object org.apache.flink.table.api.scala._
    into two parts yet because we want to give users the chance to update their imports.
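
    For illustration, a minimal sketch of a streaming table program under the new
    import structure (assuming the API as of this commit; names like
    WordCountExample are hypothetical):

        import org.apache.flink.streaming.api.scala._
        import org.apache.flink.table.api._
        import org.apache.flink.table.api.scala._
        import org.apache.flink.types.Row

        object WordCountExample {
          def main(args: Array[String]): Unit = {
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            val tEnv = StreamTableEnvironment.create(env)

            // the implicit DataStream-to-Table conversion comes from api.scala._,
            // the 'symbol expression DSL from the moved expressionDsl.scala
            val words: DataStream[(String, Int)] = env.fromElements(("hello", 1), ("hello", 2))
            val counts = words
              .toTable(tEnv, 'word, 'count)
              .groupBy('word)
              .select('word, 'count.sum)

            counts.toRetractStream[Row].print()
            env.execute()
          }
        }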
    
    This closes #8945.
---
 docs/dev/table/tableApi.md                         |   4 +
 docs/dev/table/tableApi.zh.md                      |   4 +
 flink-table/flink-table-api-scala-bridge/pom.xml   |   9 +-
 .../flink/table/api/scala/DataSetConversions.scala |   6 +-
 .../table/api/scala/DataStreamConversions.scala    |   6 +-
 .../flink/table/api/scala/TableConversions.scala   |  25 +-
 .../org/apache/flink/table/api/scala/package.scala |  89 ++++++
 flink-table/flink-table-api-scala/pom.xml          |  47 +++
 .../apache/flink/table/api}/expressionDsl.scala    | 319 ++++++---------------
 .../scala/org/apache/flink/table/api/package.scala |  48 ++++
 .../scala/org/apache/flink/table/api/package.scala |  34 ---
 .../org/apache/flink/table/api/scala/package.scala |  93 ------
 12 files changed, 307 insertions(+), 377 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a0bd51a..bd6bd38 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API
 The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.java.*;
+
 // environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 
 // environment configuration
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 7a75c0a..e4b8a55 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API
 The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.java.*;
+
 // environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 
 // environment configuration
diff --git a/flink-table/flink-table-api-scala-bridge/pom.xml b/flink-table/flink-table-api-scala-bridge/pom.xml
index 2a7f2ec..f49f71c 100644
--- a/flink-table/flink-table-api-scala-bridge/pom.xml
+++ b/flink-table/flink-table-api-scala-bridge/pom.xml
@@ -61,19 +61,18 @@ under the License.
                                <groupId>net.alchim31.maven</groupId>
                                <artifactId>scala-maven-plugin</artifactId>
                                <executions>
-                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
-                                               scala classes can be resolved later in the (Java) compile phase -->
+                                       <!-- Run Scala compiler in the process-resources phase, so that dependencies on
+                                               Scala classes can be resolved later in the (Java) compile phase -->
                                        <execution>
                                                <id>scala-compile-first</id>
                                                <phase>process-resources</phase>
                                                <goals>
-                                                       <goal>add-source</goal>
                                                        <goal>compile</goal>
                                                </goals>
                                        </execution>
 
-                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-                                                scala classes can be resolved later in the (Java) test-compile phase -->
+                                       <!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
+                                                Scala classes can be resolved later in the (Java) test-compile phase -->
                                        <execution>
                                               <id>scala-test-compile</id>
                                               <phase>process-test-resources</phase>
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
index 4b92bdb..4d80e75 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Table
@@ -29,16 +30,17 @@ import org.apache.flink.table.expressions.Expression
   * @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
   * @tparam T The type of the [[DataSet]].
   */
+@PublicEvolving
 class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
 
   /**
     * Converts the [[DataSet]] into a [[Table]].
     *
-    * The field name of the new [[Table]] can be specified like this:
+    * The field names of the new [[Table]] can be specified like this:
     *
     * {{{
     *   val env = ExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *   val tEnv = BatchTableEnvironment.create(env)
     *
     *   val set: DataSet[(String, Int)] = ...
     *   val table = set.toTable(tEnv, 'name, 'amount)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
index 3c31859..1360f5c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.Table
@@ -29,16 +30,17 @@ import org.apache.flink.table.expressions.Expression
  * @param inputType The [[TypeInformation]] for the type of the [[DataStream]].
   * @tparam T The type of the [[DataStream]].
   */
+@PublicEvolving
 class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
 
   /**
     * Converts the [[DataStream]] into a [[Table]].
     *
-    * The field name of the new [[Table]] can be specified like this:
+    * The field names of the new [[Table]] can be specified like this:
     *
     * {{{
     *   val env = StreamExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *   val tEnv = StreamTableEnvironment.create(env)
     *
     *   val stream: DataStream[(String, Int)] = ...
     *   val table = stream.toTable(tEnv, 'name, 'amount)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 0f3c8eb..ff74c78 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -18,18 +18,19 @@
 
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException}
+import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException, ValidationException}
 
 /**
  * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
   *
   * @param table The table to convert.
   */
+@PublicEvolving
 class TableConversions(table: Table) {
 
   private val internalTable = table.asInstanceOf[TableImpl]
@@ -48,10 +49,10 @@ class TableConversions(table: Table) {
   def toDataSet[T: TypeInformation]: DataSet[T] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaBatchTableEnv =>
+      case tEnv: BatchTableEnvironment =>
         tEnv.toDataSet(table)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataSets can be converted to 
Scala DataSets.")
     }
   }
@@ -71,10 +72,10 @@ class TableConversions(table: Table) {
  def toDataSet[T: TypeInformation](queryConfig: BatchQueryConfig): DataSet[T] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaBatchTableEnv =>
+      case tEnv: BatchTableEnvironment =>
         tEnv.toDataSet(table, queryConfig)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataSets can be converted to 
Scala DataSets.")
     }
   }
@@ -96,10 +97,10 @@ class TableConversions(table: Table) {
   def toAppendStream[T: TypeInformation]: DataStream[T] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toAppendStream(table)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataStreams " +
             "can be converted to Scala DataStreams.")
     }
@@ -122,10 +123,10 @@ class TableConversions(table: Table) {
     */
  def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toAppendStream(table, queryConfig)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataStreams " +
             "can be converted to Scala DataStreams.")
     }
@@ -141,7 +142,7 @@ class TableConversions(table: Table) {
   def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toRetractStream(table)
       case _ =>
         throw new TableException(
@@ -163,7 +164,7 @@ class TableConversions(table: Table) {
       queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toRetractStream(table, queryConfig)
       case _ =>
         throw new TableException(
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala
new file mode 100644
index 0000000..556c657
--- /dev/null
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api
+
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.internal.TableImpl
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+import org.apache.flink.types.Row
+
+import _root_.scala.language.implicitConversions
+
+/**
+  * == Table & SQL API with Flink's DataStream API ==
+  *
+  * This package contains the API of the Table & SQL API that bridges to Flink's [[DataStream]] API
+  * for the Scala programming language. Users can create [[Table]]s from [[DataStream]]s on which
+  * relational operations can be performed. Tables can also be converted back to [[DataStream]]s for
+  * further processing.
+  *
+  * For accessing all API classes and implicit conversions, use the following imports:
+  *
+  * {{{
+  *   import org.apache.flink.table.api._
+  *   import org.apache.flink.table.api.scala._
+  * }}}
+  *
+  * More information about the entry points of the API can be found in [[StreamTableEnvironment]].
+  *
+  * Available implicit expressions are listed in [[ImplicitExpressionConversions]] and
+  * [[ImplicitExpressionOperations]].
+  *
+  * Available implicit table-to-stream conversions are listed in this package object.
+  *
+  * Please refer to the website documentation about how to construct and run table programs that are
+  * connected to the DataStream API.
+  */
+package object scala extends ImplicitExpressionConversions {
+
+  // This package object should not extend from ImplicitExpressionConversions, as that would
+  // clash with "org.apache.flink.table.api._". Therefore, we postpone splitting the package
+  // object into two and let users update their imports first. All users should import both
+  // `api._` and `api.scala._`.
+
+  implicit def tableConversions(table: Table): TableConversions = {
+    new TableConversions(table.asInstanceOf[TableImpl])
+  }
+
+  implicit def dataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+    new DataSetConversions[T](set, set.getType())
+  }
+
+  implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](set, set.dataType)
+  }
+
+  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
+    val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
+    if (!tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw new ValidationException("Table cannot be converted into a DataSet. " +
+        "It is not part of a batch table environment.")
+    }
+    tableEnv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
+  }
+
+  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
+    val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
+    if (!tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new ValidationException("Table cannot be converted into a DataStream. " +
+        "It is not part of a stream table environment.")
+    }
+    tableEnv.asInstanceOf[StreamTableEnvironment].toAppendStream[Row](table)
+  }
+}
diff --git a/flink-table/flink-table-api-scala/pom.xml b/flink-table/flink-table-api-scala/pom.xml
index 7fd6f0a..37a1bcf 100644
--- a/flink-table/flink-table-api-scala/pom.xml
+++ b/flink-table/flink-table-api-scala/pom.xml
@@ -37,6 +37,7 @@ under the License.
        <packaging>jar</packaging>
 
        <dependencies>
+               <!-- Flink dependencies -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-common</artifactId>
@@ -47,5 +48,51 @@ under the License.
                        <artifactId>flink-table-api-java</artifactId>
                        <version>${project.version}</version>
                </dependency>
+
+               <!-- External dependencies -->
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-library</artifactId>
+               </dependency>
        </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <executions>
+                                       <!-- Run Scala compiler in the process-resources phase, so that dependencies on
+                                               Scala classes can be resolved later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
+                                                Scala classes can be resolved later in the (Java) test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               <phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Scala Code Style -->
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <configuration>
+                                       <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
 </project>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
similarity index 87%
rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
rename to flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
index c84d5de..2248f8e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala
@@ -15,18 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.table.api.scala
+package org.apache.flink.table.api
 
 import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
-import java.time.{LocalDate, LocalDateTime}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.api.{DataTypes, Over, Table, ValidationException}
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{RANGE_TO, WITH_COLUMNS, E => FDE, UUID => FDUUID, _}
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils._
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions._
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserFunctionsTypeHelper, _}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.utils.TypeConversions
@@ -41,6 +41,7 @@ import _root_.scala.language.implicitConversions
   * These operations must be kept in sync with the parser in
   * [[org.apache.flink.table.expressions.ExpressionParser]].
   */
+@PublicEvolving
 trait ImplicitExpressionOperations {
   private[flink] def expr: Expression
 
@@ -189,7 +190,9 @@ trait ImplicitExpressionOperations {
 
   /**
    * Indicates the range from left to right, i.e. [left, right], which can be used in columns
-    * selection, e.g.: withColumns(1 to 3).
+    * selection.
+    *
+    * e.g. withColumns(1 to 3)
     */
  def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, other)
 
@@ -885,7 +888,7 @@ trait ImplicitExpressionOperations {
     * @param name name of the field (similar to Flink's field expressions)
     * @return value of the field
     */
-  def get(name: String): Expression = unresolvedCall(GET, expr, name)
+  def get(name: String): Expression = unresolvedCall(GET, expr, valueLiteral(name))
 
   /**
    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
@@ -894,7 +897,7 @@ trait ImplicitExpressionOperations {
     * @param index position of the field
     * @return value of the field
     */
-  def get(index: Int): Expression = unresolvedCall(GET, expr, index)
+  def get(index: Int): Expression = unresolvedCall(GET, expr, valueLiteral(index))
 
   /**
    * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
@@ -997,8 +1000,13 @@ trait ImplicitExpressionOperations {
  * Implicit conversions from Scala literals to [[Expression]] and from [[Expression]]
   * to [[ImplicitExpressionOperations]].
   */
+@PublicEvolving
 trait ImplicitExpressionConversions {
 
+  // ----------------------------------------------------------------------------------------------
+  // Implicit values
+  // ----------------------------------------------------------------------------------------------
+
   /**
    * Offset constant to be used in the `preceding` clause of unbounded [[Over]] windows. Use this
    * constant for a time interval. Unbounded over windows start with the first row of a partition.
@@ -1026,6 +1034,10 @@ trait ImplicitExpressionConversions {
     */
  implicit val CURRENT_RANGE: Expression = unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE)
 
+  // ----------------------------------------------------------------------------------------------
+  // Implicit conversions
+  // ----------------------------------------------------------------------------------------------
+
  implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
     def expr: Expression = e
   }
@@ -1244,101 +1256,56 @@ trait ImplicitExpressionConversions {
 
     convertArray(array)
   }
-}
-
-// ------------------------------------------------------------------------------------------------
-// Expressions with no parameters
-// ------------------------------------------------------------------------------------------------
-
-// we disable the object checker here as it checks for capital letters of objects
-// but we want that objects look like functions in certain cases e.g. array(1, 2, 3)
-// scalastyle:off object.name
 
-/**
-  * Returns the current SQL date in UTC time zone.
-  */
-object currentDate {
+  // ----------------------------------------------------------------------------------------------
+  // Implicit expressions in prefix notation
+  // ----------------------------------------------------------------------------------------------
 
   /**
     * Returns the current SQL date in UTC time zone.
     */
-  def apply(): Expression = {
+  def currentDate(): Expression = {
     unresolvedCall(CURRENT_DATE)
   }
-}
-
-/**
-  * Returns the current SQL time in UTC time zone.
-  */
-object currentTime {
 
   /**
     * Returns the current SQL time in UTC time zone.
     */
-  def apply(): Expression = {
+  def currentTime(): Expression = {
     unresolvedCall(CURRENT_TIME)
   }
-}
-
-/**
-  * Returns the current SQL timestamp in UTC time zone.
-  */
-object currentTimestamp {
 
   /**
     * Returns the current SQL timestamp in UTC time zone.
     */
-  def apply(): Expression = {
+  def currentTimestamp(): Expression = {
     unresolvedCall(CURRENT_TIMESTAMP)
   }
-}
-
-/**
-  * Returns the current SQL time in local time zone.
-  */
-object localTime {
 
   /**
     * Returns the current SQL time in local time zone.
     */
-  def apply(): Expression = {
+  def localTime(): Expression = {
     unresolvedCall(LOCAL_TIME)
   }
-}
-
-/**
-  * Returns the current SQL timestamp in local time zone.
-  */
-object localTimestamp {
 
   /**
     * Returns the current SQL timestamp in local time zone.
     */
-  def apply(): Expression = {
+  def localTimestamp(): Expression = {
     unresolvedCall(LOCAL_TIMESTAMP)
   }
-}
-
-/**
-  * Determines whether two anchored time intervals overlap. Time point and temporal are
-  * transformed into a range defined by two time points (start, end). The function
-  * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
-  *
-  * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
-  *
-  * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
-  */
-object temporalOverlaps {
 
   /**
    * Determines whether two anchored time intervals overlap. Time point and temporal are
-    * transformed into a range defined by two time points (start, end).
+    * transformed into a range defined by two time points (start, end). The function
+    * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
     *
     * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
     *
    * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
     */
-  def apply(
+  def temporalOverlaps(
       leftTimePoint: Expression,
       leftTemporal: Expression,
       rightTimePoint: Expression,
@@ -1346,17 +1313,6 @@ object temporalOverlaps {
     : Expression = {
    unresolvedCall(TEMPORAL_OVERLAPS, leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
   }
-}
-
-/**
-  * Formats a timestamp as a string using a specified format.
-  * The format must be compatible with MySQL's date formatting syntax as used by the
-  * date_parse function.
-  *
-  * For example <code>dataFormat('time, "%Y, %d %M")</code> results in strings
-  * formatted as "2017, 05 May".
-  */
-object dateFormat {
 
   /**
     * Formats a timestamp as a string using a specified format.
@@ -1369,21 +1325,12 @@ object dateFormat {
     * @param format The format of the string.
     * @return The formatted timestamp as string.
     */
-  def apply(
+  def dateFormat(
       timestamp: Expression,
       format: Expression)
     : Expression = {
     unresolvedCall(DATE_FORMAT, timestamp, format)
   }
-}
-
-/**
-  * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
-  *
-  * For example, timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate leads
-  * to 3.
-  */
-object timestampDiff {
 
   /**
    * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
@@ -1396,89 +1343,53 @@ object timestampDiff {
     * @param timePoint2 The second point in time.
     * @return The number of intervals as integer value.
     */
-  def apply(
+  def timestampDiff(
       timePointUnit: TimePointUnit,
       timePoint1: Expression,
       timePoint2: Expression)
     : Expression = {
     unresolvedCall(TIMESTAMP_DIFF, timePointUnit, timePoint1, timePoint2)
   }
-}
-
-/**
-  * Creates an array of literals. The array will be an array of objects (not primitives).
-  */
-object array {
 
   /**
-    * Creates an array of literals. The array will be an array of objects (not primitives).
+    * Creates an array of literals.
     */
-  def apply(head: Expression, tail: Expression*): Expression = {
+  def array(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(ARRAY, head +: tail: _*)
   }
-}
-
-/**
-  * Creates a row of expressions.
-  */
-object row {
 
   /**
     * Creates a row of expressions.
     */
-  def apply(head: Expression, tail: Expression*): Expression = {
+  def row(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(ROW, head +: tail: _*)
   }
-}
-
-/**
-  * Creates a map of expressions. The map will be a map between two objects (not primitives).
-  */
-object map {
 
   /**
-    * Creates a map of expressions. The map will be a map between two objects (not primitives).
+    * Creates a map of expressions.
     */
-  def apply(key: Expression, value: Expression, tail: Expression*): Expression = {
+  def map(key: Expression, value: Expression, tail: Expression*): Expression = {
     unresolvedCall(MAP, key +: value +: tail: _*)
   }
-}
-
-/**
-  * Returns a value that is closer than any other value to pi.
-  */
-object pi {
 
   /**
     * Returns a value that is closer than any other value to pi.
     */
-  def apply(): Expression = {
+  def pi(): Expression = {
     unresolvedCall(PI)
   }
-}
-
-/**
-  * Returns a value that is closer than any other value to e.
-  */
-object e {
 
   /**
     * Returns a value that is closer than any other value to e.
     */
-  def apply(): Expression = {
-    unresolvedCall(FDE)
+  def e(): Expression = {
+    unresolvedCall(E)
   }
-}
-
-/**
-  * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
-  */
-object rand {
 
   /**
    * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
     */
-  def apply(): Expression = {
+  def rand(): Expression = {
     unresolvedCall(RAND)
   }
 
@@ -1487,22 +1398,15 @@ object rand {
    * initial seed. Two rand() functions will return identical sequences of numbers if they
     * have same initial seed.
     */
-  def apply(seed: Expression): Expression = {
+  def rand(seed: Expression): Expression = {
     unresolvedCall(RAND, seed)
   }
-}
-
-/**
-  * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
-  * value (exclusive).
-  */
-object randInteger {
 
   /**
    * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
     * value (exclusive).
     */
-  def apply(bound: Expression): Expression = {
+  def randInteger(bound: Expression): Expression = {
     unresolvedCall(RAND_INTEGER, bound)
   }
 
@@ -1511,59 +1415,35 @@ object randInteger {
    * (exclusive) with a initial seed. Two randInteger() functions will return identical sequences
     * of numbers if they have same initial seed and same bound.
     */
-  def apply(seed: Expression, bound: Expression): Expression = {
+  def randInteger(seed: Expression, bound: Expression): Expression = {
     unresolvedCall(RAND_INTEGER, seed, bound)
   }
-}
-
-/**
-  * Returns the string that results from concatenating the arguments.
-  * Returns NULL if any argument is NULL.
-  */
-object concat {
 
   /**
     * Returns the string that results from concatenating the arguments.
     * Returns NULL if any argument is NULL.
     */
-  def apply(string: Expression, strings: Expression*): Expression = {
+  def concat(string: Expression, strings: Expression*): Expression = {
     unresolvedCall(CONCAT, string +: strings: _*)
   }
-}
-
-/**
-  * Calculates the arc tangent of a given coordinate.
-  */
-object atan2 {
 
   /**
     * Calculates the arc tangent of a given coordinate.
     */
-  def apply(y: Expression, x: Expression): Expression = {
+  def atan2(y: Expression, x: Expression): Expression = {
     unresolvedCall(ATAN2, y, x)
   }
-}
 
-/**
-  * Returns the string that results from concatenating the arguments and separator.
-  * Returns NULL If the separator is NULL.
-  *
-  * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
-  * values after the separator argument.
-  **/
-object concat_ws {
-  def apply(separator: Expression, string: Expression, strings: Expression*): Expression = {
+  /**
+    * Returns the string that results from concatenating the arguments and separator.
+    * Returns NULL if the separator is NULL.
+    *
+    * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
+    * values after the separator argument.
+    **/
+  def concat_ws(separator: Expression, string: Expression, strings: Expression*): Expression = {
     unresolvedCall(CONCAT_WS, separator +: string +: strings: _*)
   }
-}
-
-/**
-  * Returns an UUID (Universally Unique Identifier) string (e.g.,
-  * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly
-  * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
-  * generator.
-  */
-object uuid {
 
   /**
     * Returns an UUID (Universally Unique Identifier) string (e.g.,
@@ -1571,66 +1451,43 @@ object uuid {
    * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
     * generator.
     */
-  def apply(): Expression = {
-    unresolvedCall(FDUUID)
+  def uuid(): Expression = {
+    unresolvedCall(UUID)
   }
-}
-
-/**
-  * Returns a null literal value of a given data type.
-  *
-  * e.g. nullOf(DataTypes.INT())
-  */
-object nullOf {
 
   /**
     * Returns a null literal value of a given data type.
     *
     * e.g. nullOf(DataTypes.INT())
     */
-  def apply(dataType: DataType): Expression = {
+  def nullOf(dataType: DataType): Expression = {
     valueLiteral(null, dataType)
   }
 
   /**
-    * @deprecated This method will be removed in future versions as it uses the old type system. It
-    *             is recommended to use [[apply(DataType)]] instead which uses the new type system
-    *             based on [[DataTypes]]. Please make sure to use either the old or the new type
-    *             system consistently to avoid unintended behavior. See the website documentation
-    *             for more information.
+    * @deprecated This method will be removed in future versions as it uses the old type system.
+    *             It is recommended to use [[nullOf(DataType)]] instead which uses the new type
+    *             system based on [[DataTypes]]. Please make sure to use either the old or the new
+    *             type system consistently to avoid unintended behavior. See the website
+    *             documentation for more information.
     */
-  def apply(typeInfo: TypeInformation[_]): Expression = {
-    apply(TypeConversions.fromLegacyInfoToDataType(typeInfo))
+  def nullOf(typeInfo: TypeInformation[_]): Expression = {
+    nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo))
   }
-}
-
-/**
-  * Calculates the logarithm of the given value.
-  */
-object log {
 
   /**
-    * Calculates the natural logarithm of the given value.
+    * Calculates the logarithm of the given value.
     */
-  def apply(value: Expression): Expression = {
+  def log(value: Expression): Expression = {
     unresolvedCall(LOG, value)
   }
 
   /**
     * Calculates the logarithm of the given value to the given base.
     */
-  def apply(base: Expression, value: Expression): Expression = {
+  def log(base: Expression, value: Expression): Expression = {
     unresolvedCall(LOG, base, value)
   }
-}
-
-/**
-  * Ternary conditional operator that decides which of two other expressions should be evaluated
-  * based on a evaluated boolean condition.
-  *
-  * e.g. ifThenElse(42 > 5, "A", "B") leads to "A"
-  */
-object ifThenElse {
 
   /**
    * Ternary conditional operator that decides which of two other expressions should be evaluated
@@ -1642,30 +1499,34 @@ object ifThenElse {
     * @param ifTrue expression to be evaluated if condition holds
     * @param ifFalse expression to be evaluated if condition does not hold
     */
-  def apply(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
+  def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
     unresolvedCall(IF, condition, ifTrue, ifFalse)
   }
-}
 
-/**
-  * Creates a withColumns expression.
-  */
-object withColumns {
-
-  def apply(head: Expression, tail: Expression*): Expression = {
+  /**
+    * Creates an expression that selects a range of columns. It can be used wherever an array of
+    * expressions is accepted, such as function calls, projections, or groupings.
+    *
+    * A range can either be index-based or name-based. Indices start at 1 and boundaries are
+    * inclusive.
+    *
+    * e.g. withColumns('b to 'c) or withColumns('*)
+    */
+  def withColumns(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(WITH_COLUMNS, head +: tail: _*)
   }
-}
 
-/**
-  * Creates a withoutColumns expression.
-  */
-object withoutColumns {
-
-  def apply(head: Expression, tail: Expression*): Expression = {
+  /**
+    * Creates an expression that selects all columns except for the given range of columns. It can
+    * be used wherever an array of expressions is accepted, such as function calls, projections, or
+    * groupings.
+    *
+    * A range can either be index-based or name-based. Indices start at 1 and boundaries are
+    * inclusive.
+    *
+    * e.g. withoutColumns('b to 'c) or withoutColumns('c)
+    */
+  def withoutColumns(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(WITHOUT_COLUMNS, head +: tail: _*)
   }
 }
-
-
-// scalastyle:on object.name
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala
new file mode 100644
index 0000000..2fb4f32
--- /dev/null
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.api.{ImplicitExpressionConversions, ImplicitExpressionOperations, Table, TableEnvironment}
+
+/**
+  * == Table & SQL API ==
+  *
+  * This package contains the API of the Table & SQL API. Users create [[Table]]s on which
+  * relational operations can be performed.
+  *
+  * For accessing all API classes and implicit conversions, use the following imports:
+  *
+  * {{{
+  *   import org.apache.flink.table.api._
+  * }}}
+  *
+  * More information about the entry points of the API can be found in [[TableEnvironment]].
+  *
+  * Available implicit expressions are listed in [[ImplicitExpressionConversions]] and
+  * [[ImplicitExpressionOperations]].
+  *
+  * Please refer to the website documentation about how to construct and run table programs.
+  */
+package object api /* extends ImplicitExpressionConversions */ {
+
+  // This package object should extend from ImplicitExpressionConversions, but that would clash
+  // with "org.apache.flink.table.api.scala._". Therefore, we postpone splitting the package
+  // object into two and let users update their imports first.
+}
+
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala
deleted file mode 100644
index 64e7801..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table
-
-/**
- * == Table API ==
- *
- * This package contains the API of the Table API. It can be used with Flink Streaming
- * and Flink Batch. From Scala as well as from Java.
- *
- * When using the Table API, as user creates a [[org.apache.flink.table.api.Table]] from
- * a DataSet or DataStream. On this relational operations can be performed. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.table.api.scala]] and [[org.apache.flink.table.api.java]] contain
- * the language specific part of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package object api
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala
deleted file mode 100644
index 8e4855b..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api
-
-import org.apache.flink.types.Row
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
-
-import _root_.scala.language.implicitConversions
-
-/**
-  * == Table API (Scala) ==
-  *
-  * Importing this package with:
-  *
-  * {{{
-  *   import org.apache.flink.table.api.scala._
-  * }}}
-  *
-  * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
-  * [[Table]]. This can be used to perform SQL-like queries on data. Please have
-  * a look at [[Table]] to see which operations are supported and
-  * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]] to see how an
-  * expression can be specified.
-  *
-  * When writing a query you can use Scala Symbols to refer to field names. One would
-  * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
-  * Scala literal to an expression literal, in those cases use `.toExpr`, as in `3.toExpr`.
-  *
-  * Example:
-  *
-  * {{{
-  *   import org.apache.flink.api.scala._
-  *   import org.apache.flink.table.api.scala._
-  *
-  *   val env = ExecutionEnvironment.getExecutionEnvironment
-  *   val tEnv = TableEnvironment.getTableEnvironment(env)
-  *
-  *   val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
-  *   val result = input
-  *         .toTable(tEnv, 'word, 'count)
-  *         .groupBy('word)
-  *         .select('word, 'count.avg)
-  *
-  *   result.print()
-  * }}}
-  *
-  */
-package object scala extends ImplicitExpressionConversions {
-
-  implicit def table2TableConversions(table: Table): TableConversions = {
-    new TableConversions(table.asInstanceOf[TableImpl])
-  }
-
-  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
-    new DataSetConversions[T](set, set.getType())
-  }
-
-  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
-    val tableEnv =
-      table.asInstanceOf[TableImpl].getTableEnvironment.asInstanceOf[ScalaBatchTableEnv]
-    tableEnv.toDataSet[Row](table)
-  }
-
-  implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
-    new DataStreamConversions[T](set, set.dataType)
-  }
-
-  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
-    val tableEnv =
-      table.asInstanceOf[TableImpl].getTableEnvironment.asInstanceOf[ScalaStreamTableEnv]
-    tableEnv.toAppendStream[Row](table)
-  }
-}

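For readers scanning the expressionDsl.scala hunks above: the standalone expression
objects (currentDate, temporalOverlaps, ifThenElse, withColumns, ...) were folded into
ImplicitExpressionConversions as prefix-notation methods, so they stay available through
the same wildcard imports. A minimal usage sketch, assuming the API as of this commit
(field names such as 'amount are hypothetical):

    import org.apache.flink.table.api._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.expressions.TimePointUnit

    object PrefixDslSketch {
      // formerly: object ifThenElse { def apply(...) }, now a method on the trait
      val condition = ifThenElse('amount > 5, "high", "low")

      // index- or name-based column ranges; boundaries are inclusive
      val selected = withColumns('b to 'c)

      // formerly: object timestampDiff { def apply(...) }
      val days = timestampDiff(TimePointUnit.DAY, "2016-06-15".toDate, "2016-06-18".toDate)
    }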