This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new d903480  [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
d903480 is described below

commit d903480ef7abc95cd419e6a194db2276dd7eb4cb
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue Jul 2 16:55:08 2019 +0200

    [FLINK-13045][table] Move Scala expression DSL to flink-table-api-scala
    
    This moves the Scala expression DSL to flink-table-api-scala.
    
    Users of pure table programs should define their imports like:
    
      import org.apache.flink.table.api._
    
      TableEnvironment.create(...)
    
    Users of the DataStream API should define their imports like:
    
      import org.apache.flink.table.api._
      import org.apache.flink.table.api.scala._
    
      StreamTableEnvironment.create(...)
    
    This commit does not yet split the package object org.apache.flink.table.api.scala._
    into two parts, because we want to give users the chance to update their imports.
    
    This closes #8945.
---
 docs/dev/table/tableApi.md                         |   4 +
 docs/dev/table/tableApi.zh.md                      |   4 +
 flink-table/flink-table-api-scala-bridge/pom.xml   |   9 +-
 .../flink/table/api/scala/DataSetConversions.scala |   6 +-
 .../table/api/scala/DataStreamConversions.scala    |   6 +-
 .../flink/table/api/scala/TableConversions.scala   |  25 +-
 .../org/apache/flink/table/api/scala/package.scala |  89 ++++++
 flink-table/flink-table-api-scala/pom.xml          |  47 +++
 .../apache/flink/table/api}/expressionDsl.scala    | 319 ++++++---------------
 .../scala/org/apache/flink/table/api/package.scala |  48 ++++
 .../scala/org/apache/flink/table/api/package.scala |  34 ---
 .../org/apache/flink/table/api/scala/package.scala |  93 ------
 12 files changed, 307 insertions(+), 377 deletions(-)
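For illustration, the two import styles look as follows in a program skeleton (a minimal sketch; the `EnvironmentSettings` builder calls are an assumption based on the API available in this release):

{% highlight scala %}
// Pure table program: the unified entry point only requires the base API package.
import org.apache.flink.table.api._

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

// DataStream program: additionally import the bridging package for its implicits.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = StreamTableEnvironment.create(env)
{% endhighlight %}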
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a0bd51a..bd6bd38 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API
 The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.java.*;
+
 // environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 
 // environment configuration
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 7a75c0a..e4b8a55 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -44,6 +44,9 @@ The following example shows the differences between the Scala and Java Table API
 The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
 
 {% highlight java %}
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.java.*;
+
 // environment configuration
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
@@ -73,6 +76,7 @@ The following example shows how a Scala Table API program is constructed. Table
 
 {% highlight scala %}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api._
 import org.apache.flink.table.api.scala._
 
 // environment configuration
diff --git a/flink-table/flink-table-api-scala-bridge/pom.xml b/flink-table/flink-table-api-scala-bridge/pom.xml
index 2a7f2ec..f49f71c 100644
--- a/flink-table/flink-table-api-scala-bridge/pom.xml
+++ b/flink-table/flink-table-api-scala-bridge/pom.xml
@@ -61,19 +61,18 @@ under the License.
 				<groupId>net.alchim31.maven</groupId>
 				<artifactId>scala-maven-plugin</artifactId>
 				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
+					<!-- Run Scala compiler in the process-resources phase, so that dependencies on
+						Scala classes can be resolved later in the (Java) compile phase -->
 					<execution>
 						<id>scala-compile-first</id>
 						<phase>process-resources</phase>
 						<goals>
-							<goal>add-source</goal>
 							<goal>compile</goal>
 						</goals>
 					</execution>
 
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) test-compile phase -->
+					<!-- Run Scala compiler in the process-test-resources phase, so that dependencies on
+						Scala classes can be resolved later in the (Java) test-compile phase -->
 					<execution>
 						<id>scala-test-compile</id>
 						<phase>process-test-resources</phase>
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
index 4b92bdb..4d80e75 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataSetConversions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Table
@@ -29,16 +30,17 @@ import org.apache.flink.table.expressions.Expression
   * @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
   * @tparam T The type of the [[DataSet]].
   */
+@PublicEvolving
 class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
 
   /**
     * Converts the [[DataSet]] into a [[Table]].
     *
-    * The field name of the new [[Table]] can be specified like this:
+    * The field names of the new [[Table]] can be specified like this:
     *
     * {{{
     *   val env = ExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *   val tEnv = BatchTableEnvironment.create(env)
     *
     *   val set: DataSet[(String, Int)] = ...
     *   val table = set.toTable(tEnv, 'name, 'amount)
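As a usage sketch of the conversion above (the example data is made up, following the updated Scaladoc):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

// DataSetConversions.toTable assigns the field names 'name and 'amount.
val set: DataSet[(String, Int)] = env.fromElements(("ACME", 10), ("Lux", 20))
val table = set.toTable(tEnv, 'name, 'amount)
{% endhighlight %}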
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
index 3c31859..1360f5c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.Table
@@ -29,16 +30,17 @@ import org.apache.flink.table.expressions.Expression
   * @param inputType The [[TypeInformation]] for the type of the [[DataStream]].
   * @tparam T The type of the [[DataStream]].
   */
+@PublicEvolving
 class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInformation[T]) {
 
   /**
     * Converts the [[DataStream]] into a [[Table]].
     *
-    * The field name of the new [[Table]] can be specified like this:
+    * The field names of the new [[Table]] can be specified like this:
     *
     * {{{
     *   val env = StreamExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = TableEnvironment.getTableEnvironment(env)
+    *   val tEnv = StreamTableEnvironment.create(env)
     *
     *   val stream: DataStream[(String, Int)] = ...
     *   val table = stream.toTable(tEnv, 'name, 'amount)
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
index 0f3c8eb..ff74c78 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -18,18 +18,19 @@
 
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException}
+import org.apache.flink.table.api.{BatchQueryConfig, StreamQueryConfig, Table, TableException, ValidationException}
 
 /**
   * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
   *
   * @param table The table to convert.
   */
+@PublicEvolving
 class TableConversions(table: Table) {
 
   private val internalTable = table.asInstanceOf[TableImpl]
@@ -48,10 +49,10 @@ class TableConversions(table: Table) {
 
   def toDataSet[T: TypeInformation]: DataSet[T] = {
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaBatchTableEnv =>
+      case tEnv: BatchTableEnvironment =>
         tEnv.toDataSet(table)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
     }
   }
@@ -71,10 +72,10 @@ class TableConversions(table: Table) {
 
   def toDataSet[T: TypeInformation](queryConfig: BatchQueryConfig): DataSet[T] = {
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaBatchTableEnv =>
+      case tEnv: BatchTableEnvironment =>
         tEnv.toDataSet(table, queryConfig)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataSets can be converted to Scala DataSets.")
     }
   }
@@ -96,10 +97,10 @@ class TableConversions(table: Table) {
 
   def toAppendStream[T: TypeInformation]: DataStream[T] = {
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toAppendStream(table)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataStreams " +
             "can be converted to Scala DataStreams.")
     }
@@ -122,10 +123,10 @@ class TableConversions(table: Table) {
    */
   def toAppendStream[T: TypeInformation](queryConfig: StreamQueryConfig): DataStream[T] = {
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toAppendStream(table, queryConfig)
       case _ =>
-        throw new TableException(
+        throw new ValidationException(
           "Only tables that originate from Scala DataStreams " +
             "can be converted to Scala DataStreams.")
     }
@@ -141,7 +142,7 @@ class TableConversions(table: Table) {
 
   def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = {
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toRetractStream(table)
       case _ =>
         throw new TableException(
@@ -163,7 +164,7 @@ class TableConversions(table: Table) {
       queryConfig: StreamQueryConfig): DataStream[(Boolean, T)] = {
 
     internalTable.getTableEnvironment match {
-      case tEnv: ScalaStreamTableEnv =>
+      case tEnv: StreamTableEnvironment =>
         tEnv.toRetractStream(table, queryConfig)
       case _ =>
         throw new TableException(
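And the reverse direction on the streaming side, as a sketch (illustrative data; per the hunks above, `toDataSet` and `toAppendStream` now report a mismatched environment with a ValidationException instead of a TableException):

{% highlight scala %}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val orders = env.fromElements(("Alice", 10), ("Bob", 20)).toTable(tEnv, 'name, 'amount)

// TableConversions.toAppendStream / toRetractStream convert the Table back.
val appendStream = orders.select('name, 'amount).toAppendStream[(String, Int)]
val retractStream = orders.groupBy('name).select('name, 'amount.sum).toRetractStream[(String, Int)]
{% endhighlight %}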
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala
new file mode 100644
index 0000000..556c657
--- /dev/null
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.internal.TableImpl
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+import org.apache.flink.types.Row
+
+import _root_.scala.language.implicitConversions
+
+/**
+ * == Table & SQL API with Flink's DataStream API ==
+ *
+ * This package contains the API of the Table & SQL API that bridges to Flink's [[DataStream]] API
+ * for the Scala programming language. Users can create [[Table]]s from [[DataStream]]s on which
+ * relational operations can be performed. Tables can also be converted back to [[DataStream]]s for
+ * further processing.
+ *
+ * For accessing all API classes and implicit conversions, use the following imports:
+ *
+ * {{{
+ *   import org.apache.flink.table.api._
+ *   import org.apache.flink.table.api.scala._
+ * }}}
+ *
+ * More information about the entry points of the API can be found in [[StreamTableEnvironment]].
+ *
+ * Available implicit expressions are listed in [[ImplicitExpressionConversions]] and
+ * [[ImplicitExpressionOperations]].
+ *
+ * Available implicit table-to-stream conversions are listed in this package object.
+ *
+ * Please refer to the website documentation about how to construct and run table programs that are
+ * connected to the DataStream API.
+ */
+package object scala extends ImplicitExpressionConversions {
+
+  // This package object should not extend ImplicitExpressionConversions, but moving the
+  // conversions now would clash with "org.apache.flink.table.api._". Therefore, we postpone
+  // splitting the package object into two and let users update their imports first. All users
+  // should import both `api._` and `api.scala._`.
+
+  implicit def tableConversions(table: Table): TableConversions = {
+    new TableConversions(table.asInstanceOf[TableImpl])
+  }
+
+  implicit def dataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+    new DataSetConversions[T](set, set.getType())
+  }
+
+  implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](set, set.dataType)
+  }
+
+  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
+    val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
+    if (!tableEnv.isInstanceOf[BatchTableEnvironment]) {
+      throw new ValidationException("Table cannot be converted into a DataSet. " +
+        "It is not part of a batch table environment.")
+    }
+    tableEnv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
+  }
+
+  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
+    val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
+    if (!tableEnv.isInstanceOf[StreamTableEnvironment]) {
+      throw new ValidationException("Table cannot be converted into a DataStream. " +
+        "It is not part of a stream table environment.")
+    }
+    tableEnv.asInstanceOf[StreamTableEnvironment].toAppendStream[Row](table)
+  }
+}
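A short sketch of what the package object's implicits enable once both packages are imported (contents illustrative; the expected type on the left-hand side triggers `table2RowDataStream`):

{% highlight scala %}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val words = env.fromElements(("hello", 1L), ("world", 2L)).toTable(tEnv, 'word, 'frequency)

// Implicitly converted via table2RowDataStream because a DataStream[Row] is expected.
val rows: DataStream[Row] = words.select('word, 'frequency)
{% endhighlight %}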
" + + "It is not part of a stream table environment.") + } + tableEnv.asInstanceOf[StreamTableEnvironment].toAppendStream[Row](table) + } +} diff --git a/flink-table/flink-table-api-scala/pom.xml b/flink-table/flink-table-api-scala/pom.xml index 7fd6f0a..37a1bcf 100644 --- a/flink-table/flink-table-api-scala/pom.xml +++ b/flink-table/flink-table-api-scala/pom.xml @@ -37,6 +37,7 @@ under the License. <packaging>jar</packaging> <dependencies> + <!-- Flink dependencies --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> @@ -47,5 +48,51 @@ under the License. <artifactId>flink-table-api-java</artifactId> <version>${project.version}</version> </dependency> + + <!-- External dependencies --> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <!-- Run Scala compiler in the process-resources phase, so that dependencies on + Scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + + <!-- Run Scala compiler in the process-test-resources phase, so that dependencies on + Scala classes can be resolved later in the (Java) test-compile phase --> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- Scala Code Style --> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <configuration> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + </configuration> + </plugin> + </plugins> + </build> </project> diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala similarity index 87% rename from flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala rename to flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala index c84d5de..2248f8e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/expressionDsl.scala @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.table.api.scala +package org.apache.flink.table.api import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort} import java.math.{BigDecimal => JBigDecimal} import java.sql.{Date, Time, Timestamp} -import java.time.{LocalDate, LocalDateTime} +import java.time.{LocalDate, LocalDateTime, LocalTime} +import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} -import org.apache.flink.table.api.{DataTypes, Over, Table, ValidationException} -import org.apache.flink.table.expressions.utils.ApiExpressionUtils._ import org.apache.flink.table.expressions._ -import org.apache.flink.table.functions.BuiltInFunctionDefinitions.{RANGE_TO, WITH_COLUMNS, E => FDE, UUID => FDUUID, _} +import org.apache.flink.table.expressions.utils.ApiExpressionUtils._ +import org.apache.flink.table.functions.BuiltInFunctionDefinitions._ import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedAggregateFunction, UserFunctionsTypeHelper, _} import org.apache.flink.table.types.DataType import org.apache.flink.table.types.utils.TypeConversions @@ -41,6 +41,7 @@ import _root_.scala.language.implicitConversions * These operations must be kept in sync with the parser in * [[org.apache.flink.table.expressions.ExpressionParser]]. */ +@PublicEvolving trait ImplicitExpressionOperations { private[flink] def expr: Expression @@ -189,7 +190,9 @@ trait ImplicitExpressionOperations { /** * Indicates the range from left to right, i.e. [left, right], which can be used in columns - * selection, e.g.: withColumns(1 to 3). + * selection. + * + * e.g. withColumns(1 to 3) */ def to (other: Expression): Expression = unresolvedCall(RANGE_TO, expr, other) @@ -885,7 +888,7 @@ trait ImplicitExpressionOperations { * @param name name of the field (similar to Flink's field expressions) * @return value of the field */ - def get(name: String): Expression = unresolvedCall(GET, expr, name) + def get(name: String): Expression = unresolvedCall(GET, expr, valueLiteral(name)) /** * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and @@ -894,7 +897,7 @@ trait ImplicitExpressionOperations { * @param index position of the field * @return value of the field */ - def get(index: Int): Expression = unresolvedCall(GET, expr, index) + def get(index: Int): Expression = unresolvedCall(GET, expr, valueLiteral(index)) /** * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes @@ -997,8 +1000,13 @@ trait ImplicitExpressionOperations { * Implicit conversions from Scala literals to [[Expression]] and from [[Expression]] * to [[ImplicitExpressionOperations]]. */ +@PublicEvolving trait ImplicitExpressionConversions { + // ---------------------------------------------------------------------------------------------- + // Implicit values + // ---------------------------------------------------------------------------------------------- + /** * Offset constant to be used in the `preceding` clause of unbounded [[Over]] windows. Use this * constant for a time interval. Unbounded over windows start with the first row of a partition. 
@@ -1026,6 +1034,10 @@ trait ImplicitExpressionConversions {
    */
   implicit val CURRENT_RANGE: Expression = unresolvedCall(BuiltInFunctionDefinitions.CURRENT_RANGE)
 
+  // ----------------------------------------------------------------------------------------------
+  // Implicit conversions
+  // ----------------------------------------------------------------------------------------------
+
   implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
     def expr: Expression = e
   }
@@ -1244,101 +1256,56 @@ trait ImplicitExpressionConversions {
     convertArray(array)
   }
-}
-
-// ------------------------------------------------------------------------------------------------
-// Expressions with no parameters
-// ------------------------------------------------------------------------------------------------
-
-// we disable the object checker here as it checks for capital letters of objects
-// but we want that objects look like functions in certain cases e.g. array(1, 2, 3)
-// scalastyle:off object.name
-
-/**
- * Returns the current SQL date in UTC time zone.
- */
-object currentDate {
+
+  // ----------------------------------------------------------------------------------------------
+  // Implicit expressions in prefix notation
+  // ----------------------------------------------------------------------------------------------
 
   /**
    * Returns the current SQL date in UTC time zone.
    */
-  def apply(): Expression = {
+  def currentDate(): Expression = {
     unresolvedCall(CURRENT_DATE)
   }
-}
-
-/**
- * Returns the current SQL time in UTC time zone.
- */
-object currentTime {
 
   /**
    * Returns the current SQL time in UTC time zone.
    */
-  def apply(): Expression = {
+  def currentTime(): Expression = {
     unresolvedCall(CURRENT_TIME)
   }
-}
-
-/**
- * Returns the current SQL timestamp in UTC time zone.
- */
-object currentTimestamp {
 
   /**
    * Returns the current SQL timestamp in UTC time zone.
    */
-  def apply(): Expression = {
+  def currentTimestamp(): Expression = {
     unresolvedCall(CURRENT_TIMESTAMP)
   }
-}
-
-/**
- * Returns the current SQL time in local time zone.
- */
-object localTime {
 
   /**
    * Returns the current SQL time in local time zone.
    */
-  def apply(): Expression = {
+  def localTime(): Expression = {
     unresolvedCall(LOCAL_TIME)
   }
-}
-
-/**
- * Returns the current SQL timestamp in local time zone.
- */
-object localTimestamp {
 
   /**
    * Returns the current SQL timestamp in local time zone.
    */
-  def apply(): Expression = {
+  def localTimestamp(): Expression = {
     unresolvedCall(LOCAL_TIMESTAMP)
   }
-}
-
-/**
- * Determines whether two anchored time intervals overlap. Time point and temporal are
- * transformed into a range defined by two time points (start, end). The function
- * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
- *
- * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
- *
- * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
- */
-object temporalOverlaps {
 
   /**
    * Determines whether two anchored time intervals overlap. Time point and temporal are
-   * transformed into a range defined by two time points (start, end).
+   * transformed into a range defined by two time points (start, end). The function
+   * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>.
    *
    * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
    *
    * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true
    */
-  def apply(
+  def temporalOverlaps(
       leftTimePoint: Expression,
       leftTemporal: Expression,
       rightTimePoint: Expression,
@@ -1346,17 +1313,6 @@ object temporalOverlaps {
     : Expression = {
     unresolvedCall(TEMPORAL_OVERLAPS, leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
   }
-}
-
-/**
- * Formats a timestamp as a string using a specified format.
- * The format must be compatible with MySQL's date formatting syntax as used by the
- * date_parse function.
- *
- * For example <code>dataFormat('time, "%Y, %d %M")</code> results in strings
- * formatted as "2017, 05 May".
- */
-object dateFormat {
 
   /**
    * Formats a timestamp as a string using a specified format.
@@ -1369,21 +1325,12 @@ object dateFormat {
    * @param format The format of the string.
    * @return The formatted timestamp as string.
    */
-  def apply(
+  def dateFormat(
       timestamp: Expression,
       format: Expression)
     : Expression = {
     unresolvedCall(DATE_FORMAT, timestamp, format)
   }
-}
-
-/**
- * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
- *
- * For example, timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate leads
- * to 3.
- */
-object timestampDiff {
 
   /**
    * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2.
@@ -1396,89 +1343,53 @@ object timestampDiff {
    * @param timePoint2 The second point in time.
    * @return The number of intervals as integer value.
    */
-  def apply(
+  def timestampDiff(
       timePointUnit: TimePointUnit,
       timePoint1: Expression,
       timePoint2: Expression)
     : Expression = {
     unresolvedCall(TIMESTAMP_DIFF, timePointUnit, timePoint1, timePoint2)
   }
-}
-
-/**
- * Creates an array of literals. The array will be an array of objects (not primitives).
- */
-object array {
 
   /**
-   * Creates an array of literals. The array will be an array of objects (not primitives).
+   * Creates an array of literals.
    */
-  def apply(head: Expression, tail: Expression*): Expression = {
+  def array(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(ARRAY, head +: tail: _*)
   }
-}
-
-/**
- * Creates a row of expressions.
- */
-object row {
 
   /**
    * Creates a row of expressions.
    */
-  def apply(head: Expression, tail: Expression*): Expression = {
+  def row(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(ROW, head +: tail: _*)
   }
-}
-
-/**
- * Creates a map of expressions. The map will be a map between two objects (not primitives).
- */
-object map {
 
   /**
-   * Creates a map of expressions. The map will be a map between two objects (not primitives).
+   * Creates a map of expressions.
    */
-  def apply(key: Expression, value: Expression, tail: Expression*): Expression = {
+  def map(key: Expression, value: Expression, tail: Expression*): Expression = {
     unresolvedCall(MAP, key +: value +: tail: _*)
   }
-}
-
-/**
- * Returns a value that is closer than any other value to pi.
- */
-object pi {
 
   /**
    * Returns a value that is closer than any other value to pi.
   */
-  def apply(): Expression = {
+  def pi(): Expression = {
     unresolvedCall(PI)
   }
-}
-
-/**
- * Returns a value that is closer than any other value to e.
- */
-object e {
 
   /**
    * Returns a value that is closer than any other value to e.
   */
-  def apply(): Expression = {
-    unresolvedCall(FDE)
+  def e(): Expression = {
+    unresolvedCall(E)
   }
-}
-
-/**
- * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
- */
-object rand {
 
   /**
    * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive).
   */
-  def apply(): Expression = {
+  def rand(): Expression = {
     unresolvedCall(RAND)
   }
 
@@ -1487,22 +1398,15 @@ object rand {
    * initial seed. Two rand() functions will return identical sequences of numbers if they
    * have same initial seed.
   */
-  def apply(seed: Expression): Expression = {
+  def rand(seed: Expression): Expression = {
     unresolvedCall(RAND, seed)
   }
-}
-
-/**
- * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
- * value (exclusive).
- */
-object randInteger {
 
   /**
    * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified
    * value (exclusive).
   */
-  def apply(bound: Expression): Expression = {
+  def randInteger(bound: Expression): Expression = {
     unresolvedCall(RAND_INTEGER, bound)
   }
 
@@ -1511,59 +1415,35 @@ object randInteger {
    * (exclusive) with a initial seed. Two randInteger() functions will return identical sequences
    * of numbers if they have same initial seed and same bound.
   */
-  def apply(seed: Expression, bound: Expression): Expression = {
+  def randInteger(seed: Expression, bound: Expression): Expression = {
     unresolvedCall(RAND_INTEGER, seed, bound)
   }
-}
-
-/**
- * Returns the string that results from concatenating the arguments.
- * Returns NULL if any argument is NULL.
- */
-object concat {
 
   /**
    * Returns the string that results from concatenating the arguments.
    * Returns NULL if any argument is NULL.
   */
-  def apply(string: Expression, strings: Expression*): Expression = {
+  def concat(string: Expression, strings: Expression*): Expression = {
     unresolvedCall(CONCAT, string +: strings: _*)
   }
-}
-
-/**
- * Calculates the arc tangent of a given coordinate.
- */
-object atan2 {
 
   /**
    * Calculates the arc tangent of a given coordinate.
   */
-  def apply(y: Expression, x: Expression): Expression = {
+  def atan2(y: Expression, x: Expression): Expression = {
     unresolvedCall(ATAN2, y, x)
   }
-}
 
-/**
- * Returns the string that results from concatenating the arguments and separator.
- * Returns NULL If the separator is NULL.
- *
- * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
- * values after the separator argument.
- **/
-object concat_ws {
-  def apply(separator: Expression, string: Expression, strings: Expression*): Expression = {
+  /**
+   * Returns the string that results from concatenating the arguments and separator.
+   * Returns NULL if the separator is NULL.
+   *
+   * Note: this user-defined function does not skip empty strings. However, it does skip any NULL
+   * values after the separator argument.
+   **/
+  def concat_ws(separator: Expression, string: Expression, strings: Expression*): Expression = {
     unresolvedCall(CONCAT_WS, separator +: string +: strings: _*)
   }
-}
-
-/**
- * Returns an UUID (Universally Unique Identifier) string (e.g.,
- * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly
- * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
- * generator.
- */
-object uuid {
 
   /**
    * Returns an UUID (Universally Unique Identifier) string (e.g.,
@@ -1571,66 +1451,43 @@ object uuid {
    * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number
    * generator.
   */
-  def apply(): Expression = {
-    unresolvedCall(FDUUID)
+  def uuid(): Expression = {
+    unresolvedCall(UUID)
   }
-}
-
-/**
- * Returns a null literal value of a given data type.
- *
- * e.g. nullOf(DataTypes.INT())
- */
-object nullOf {
 
   /**
    * Returns a null literal value of a given data type.
   *
   * e.g. nullOf(DataTypes.INT())
   */
-  def apply(dataType: DataType): Expression = {
+  def nullOf(dataType: DataType): Expression = {
     valueLiteral(null, dataType)
   }
 
   /**
-   * @deprecated This method will be removed in future versions as it uses the old type system. It
-   *             is recommended to use [[apply(DataType)]] instead which uses the new type system
-   *             based on [[DataTypes]]. Please make sure to use either the old or the new type
-   *             system consistently to avoid unintended behavior. See the website documentation
-   *             for more information.
+   * @deprecated This method will be removed in future versions as it uses the old type system.
+   *             It is recommended to use [[nullOf(DataType)]] instead which uses the new type
+   *             system based on [[DataTypes]]. Please make sure to use either the old or the new
+   *             type system consistently to avoid unintended behavior. See the website
+   *             documentation for more information.
   */
-  def apply(typeInfo: TypeInformation[_]): Expression = {
-    apply(TypeConversions.fromLegacyInfoToDataType(typeInfo))
+  def nullOf(typeInfo: TypeInformation[_]): Expression = {
+    nullOf(TypeConversions.fromLegacyInfoToDataType(typeInfo))
   }
-}
-
-/**
- * Calculates the logarithm of the given value.
- */
-object log {
 
   /**
-   * Calculates the natural logarithm of the given value.
+   * Calculates the logarithm of the given value.
   */
-  def apply(value: Expression): Expression = {
+  def log(value: Expression): Expression = {
     unresolvedCall(LOG, value)
   }
 
   /**
    * Calculates the logarithm of the given value to the given base.
   */
-  def apply(base: Expression, value: Expression): Expression = {
+  def log(base: Expression, value: Expression): Expression = {
     unresolvedCall(LOG, base, value)
   }
-}
-
-/**
- * Ternary conditional operator that decides which of two other expressions should be evaluated
- * based on a evaluated boolean condition.
- *
- * e.g. ifThenElse(42 > 5, "A", "B") leads to "A"
- */
-object ifThenElse {
 
   /**
    * Ternary conditional operator that decides which of two other expressions should be evaluated
@@ -1642,30 +1499,34 @@ object ifThenElse {
   * @param ifTrue expression to be evaluated if condition holds
   * @param ifFalse expression to be evaluated if condition does not hold
   */
-  def apply(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
+  def ifThenElse(condition: Expression, ifTrue: Expression, ifFalse: Expression): Expression = {
     unresolvedCall(IF, condition, ifTrue, ifFalse)
   }
-}
 
-/**
- * Creates a withColumns expression.
- */
-object withColumns {
-
-  def apply(head: Expression, tail: Expression*): Expression = {
+  /**
+   * Creates an expression that selects a range of columns. It can be used wherever an array of
+   * expressions is accepted such as function calls, projections, or groupings.
+   *
+   * A range can either be index-based or name-based. Indices start at 1 and boundaries are
+   * inclusive.
+   *
+   * e.g. withColumns('b to 'c) or withColumns('*)
+   */
+  def withColumns(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(WITH_COLUMNS, head +: tail: _*)
   }
-}
 
-/**
- * Creates a withoutColumns expression.
- */
-object withoutColumns {
-
-  def apply(head: Expression, tail: Expression*): Expression = {
+  /**
+   * Creates an expression that selects all columns except for the given range of columns. It can
+   * be used wherever an array of expressions is accepted such as function calls, projections, or
+   * groupings.
+   *
+   * A range can either be index-based or name-based. Indices start at 1 and boundaries are
+   * inclusive.
+   *
+   * e.g. withoutColumns('b to 'c) or withoutColumns('c)
+   */
+  def withoutColumns(head: Expression, tail: Expression*): Expression = {
     unresolvedCall(WITHOUT_COLUMNS, head +: tail: _*)
   }
 }
-
-
-// scalastyle:on object.name
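Since the former standalone objects are now methods on ImplicitExpressionConversions, the whole expression DSL is pulled in by a single import. A combined sketch of the functions touched above (column names are illustrative, and a Table named `orders` with fields 'a, 'b, 'c is assumed):

{% highlight scala %}
import org.apache.flink.table.api._

// Prefix-notation functions now resolve through ImplicitExpressionConversions.
val overlaps = temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour)
val label = ifThenElse('amount > 10, "high", "low")

// Composite field access; both overloads now wrap their argument in a value literal.
val byName = 'user.get("name")

// Column ranges: indices start at 1 and boundaries are inclusive.
val projected = orders.select(withColumns('b to 'c))
val remaining = orders.select(withoutColumns('c))
{% endhighlight %}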
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala
new file mode 100644
index 0000000..2fb4f32
--- /dev/null
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/package.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.table.api.{ImplicitExpressionConversions, ImplicitExpressionOperations, Table, TableEnvironment}
+
+/**
+ * == Table & SQL API ==
+ *
+ * This package contains the API of the Table & SQL API. Users create [[Table]]s on which
+ * relational operations can be performed.
+ *
+ * For accessing all API classes and implicit conversions, use the following imports:
+ *
+ * {{{
+ *   import org.apache.flink.table.api._
+ * }}}
+ *
+ * More information about the entry points of the API can be found in [[TableEnvironment]].
+ *
+ * Available implicit expressions are listed in [[ImplicitExpressionConversions]] and
+ * [[ImplicitExpressionOperations]].
+ *
+ * Please refer to the website documentation about how to construct and run table programs.
+ */
+package object api /* extends ImplicitExpressionConversions */ {
+
+  // This package object should extend ImplicitExpressionConversions, but that would clash with
+  // "org.apache.flink.table.api.scala._". Therefore, we postpone splitting the package object
+  // into two and let users update their imports first.
+}
+
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala
deleted file mode 100644
index 64e7801..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table
-
-/**
- * == Table API ==
- *
- * This package contains the API of the Table API. It can be used with Flink Streaming
- * and Flink Batch. From Scala as well as from Java.
- *
- * When using the Table API, as user creates a [[org.apache.flink.table.api.Table]] from
- * a DataSet or DataStream. On this relational operations can be performed. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.table.api.scala]] and [[org.apache.flink.table.api.java]] contain
- * the language specific part of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package object api
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala
deleted file mode 100644
index 8e4855b..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api
-
-import org.apache.flink.types.Row
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.table.api.internal.TableImpl
-import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
-import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
-
-import _root_.scala.language.implicitConversions
-
-/**
- * == Table API (Scala) ==
- *
- * Importing this package with:
- *
- * {{{
- *   import org.apache.flink.table.api.scala._
- * }}}
- *
- * imports implicit conversions for converting a [[DataSet]] and a [[DataStream]] to a
- * [[Table]]. This can be used to perform SQL-like queries on data. Please have
- * a look at [[Table]] to see which operations are supported and
- * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]] to see how an
- * expression can be specified.
- *
- * When writing a query you can use Scala Symbols to refer to field names. One would
- * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
- * Scala literal to an expression literal, in those cases use `.toExpr`, as in `3.toExpr`.
- *
- * Example:
- *
- * {{{
- *   import org.apache.flink.api.scala._
- *   import org.apache.flink.table.api.scala._
- *
- *   val env = ExecutionEnvironment.getExecutionEnvironment
- *   val tEnv = TableEnvironment.getTableEnvironment(env)
- *
- *   val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
- *   val result = input
- *         .toTable(tEnv, 'word, 'count)
- *         .groupBy('word)
- *         .select('word, 'count.avg)
- *
- *   result.print()
- * }}}
- *
- */
-package object scala extends ImplicitExpressionConversions {
-
-  implicit def table2TableConversions(table: Table): TableConversions = {
-    new TableConversions(table.asInstanceOf[TableImpl])
-  }
-
-  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
-    new DataSetConversions[T](set, set.getType())
-  }
-
-  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
-    val tableEnv =
-      table.asInstanceOf[TableImpl].getTableEnvironment.asInstanceOf[ScalaBatchTableEnv]
-    tableEnv.toDataSet[Row](table)
-  }
-
-  implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
-    new DataStreamConversions[T](set, set.dataType)
-  }
-
-  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
-    val tableEnv =
-      table.asInstanceOf[TableImpl].getTableEnvironment.asInstanceOf[ScalaStreamTableEnv]
-    tableEnv.toAppendStream[Row](table)
-  }
-}
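For reference, the word-count example from the deleted Scaladoc above, updated to the new imports and entry points (a sketch; printing goes through an explicit toDataSet conversion rather than the old `result.print()`):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

val input: DataSet[(String, Int)] = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
val result = input
  .toTable(tEnv, 'word, 'count)
  .groupBy('word)
  .select('word, 'count.avg)

result.toDataSet[Row].print()
{% endhighlight %}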