Repository: flink Updated Branches: refs/heads/master 343d05a40 -> 78f551194
[FLINK-3955] [tableAPI] Rename Table.toSink() to Table.writeToSink(). This closes #2023 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/829c75c4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/829c75c4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/829c75c4 Branch: refs/heads/master Commit: 829c75c49531c64b5c73acd199d3d2a87388d54f Parents: 343d05a Author: Fabian Hueske <fhue...@apache.org> Authored: Mon May 23 16:49:52 2016 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue May 24 09:43:26 2016 +0200 ---------------------------------------------------------------------- docs/apis/table.md | 16 ++++++++-------- .../flink/api/table/BatchTableEnvironment.scala | 10 +++++----- .../flink/api/table/StreamTableEnvironment.scala | 10 +++++----- .../apache/flink/api/table/TableEnvironment.scala | 8 ++++---- .../scala/org/apache/flink/api/table/table.scala | 10 +++++----- .../flink/api/scala/batch/TableSinkITCase.scala | 2 +- .../flink/api/scala/stream/TableSinkITCase.scala | 2 +- 7 files changed, 29 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/829c75c4/docs/apis/table.md ---------------------------------------------------------------------- diff --git a/docs/apis/table.md b/docs/apis/table.md index 276341d..f33ae59 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -685,12 +685,12 @@ SQL queries can be executed on DataStream Tables by adding the `STREAM` SQL keyw {% top %} -Emit a Table to external sinks +Write Tables to external sinks ---- -A `Table` can be emitted to a `TableSink`, which is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ), and others. A batch `Table` can only be emitted by a `BatchTableSink`, a streaming table requires a `StreamTableSink` (a `TableSink` can implement both interfaces). +A `Table` can be written to a `TableSink`, which is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). A batch `Table` can only be written to a `BatchTableSink`, a streaming table requires a `StreamTableSink`. A `TableSink` can implement both interfaces at the same time. -Currently, Flink only provides a `CsvTableSink` that writes a batch or streaming `Table` to CSV-formatted files. A custom `TableSource` can be defined by implementing the `BatchTableSink` and/or `StreamTableSink` interface. +Currently, Flink only provides a `CsvTableSink` that writes a batch or streaming `Table` to CSV-formatted files. A custom `TableSink` can be defined by implementing the `BatchTableSink` and/or `StreamTableSink` interface. <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -703,8 +703,8 @@ Table result = ... // create a TableSink TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); -// add a TableSink to emit the result Table -result.toSink(sink); +// write the result Table to the TableSink +result.writeToSink(sink); // execute the program env.execute(); @@ -721,8 +721,8 @@ val result: Table = ... // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") -// add a TableSink to emit the result Table -result.toSink(sink) +// write the result Table to the TableSink +result.writeToSink(sink) // execute the program env.execute() @@ -737,5 +737,5 @@ Runtime Configuration The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed either through `TableEnvironment` or passed to the `toDataSet`/`toDataStream` method when using Scala implicit conversion. ### Null Handling -By default, the Table API supports `null` values. Null handling can be disabled by setting the `nullCheck` property in the `TableConfig` to `false`. +By default, the Table API supports `null` values. Null handling can be disabled to improve preformance by setting the `nullCheck` property in the `TableConfig` to `false`. http://git-wip-us.apache.org/repos/asf/flink/blob/829c75c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index b25c940..4c8b370 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -136,16 +136,16 @@ abstract class BatchTableEnvironment( } /** - * Emits a [[Table]] to a [[TableSink]]. + * Writes a [[Table]] to a [[TableSink]]. * * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the - * [[TableSink]] to emit it. + * [[TableSink]] to write it. * - * @param table The [[Table]] to emit. - * @param sink The [[TableSink]] to emit the [[Table]] to. + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. * @tparam T The expected type of the [[DataSet]] which represents the [[Table]]. */ - override private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit = { + override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { sink match { case batchSink: BatchTableSink[T] => http://git-wip-us.apache.org/repos/asf/flink/blob/829c75c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 8ba3000..bacb587 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -137,16 +137,16 @@ abstract class StreamTableEnvironment( } /** - * Emits a [[Table]] to a [[TableSink]]. + * Writes a [[Table]] to a [[TableSink]]. * * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the - * [[TableSink]] to emit it. + * [[TableSink]] to write it. * - * @param table The [[Table]] to emit. - * @param sink The [[TableSink]] to emit the [[Table]] to. + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. */ - override private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit = { + override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = { sink match { case streamSink: StreamTableSink[T] => http://git-wip-us.apache.org/repos/asf/flink/blob/829c75c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index 1c592f9..7debb65 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -147,13 +147,13 @@ abstract class TableEnvironment(val config: TableConfig) { def sql(query: String): Table /** - * Emits a [[Table]] to a [[TableSink]]. + * Writes a [[Table]] to a [[TableSink]]. * - * @param table The [[Table]] to emit. - * @param sink The [[TableSink]] to emit the [[Table]] to. + * @param table The [[Table]] to write. + * @param sink The [[TableSink]] to write the [[Table]] to. * @tparam T The data type that the [[TableSink]] expects. */ - private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit + private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit /** * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog. http://git-wip-us.apache.org/repos/asf/flink/blob/829c75c4/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 4f111c9..1e558c5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -315,16 +315,16 @@ class Table( } /** - * Emits the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location. + * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location. * - * A batch [[Table]] can only be emitted by a + * A batch [[Table]] can only be written to a * [[org.apache.flink.api.table.sinks.BatchTableSink]], a streaming [[Table]] requires a * [[org.apache.flink.api.table.sinks.StreamTableSink]]. * - * @param sink The [[TableSink]] to which the [[Table]] is emitted. + * @param sink The [[TableSink]] to which the [[Table]] is written. * @tparam T The data type that the [[TableSink]] expects. */ - def toSink[T](sink: TableSink[T]): Unit = { + def writeToSink[T](sink: TableSink[T]): Unit = { // get schema information of table val rowType = getRelNode.getRowType @@ -336,7 +336,7 @@ class Table( val configuredSink = sink.configure(fieldNames, fieldTypes) // emit the table to the configured table sink - tableEnv.emitToSink(this, configuredSink) + tableEnv.writeToSink(this, configuredSink) } } http://git-wip-us.apache.org/repos/asf/flink/blob/829c75c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala index 39684ff..dd0668c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala @@ -59,7 +59,7 @@ class TableSinkITCase( val results = input.toTable(tEnv, 'a, 'b, 'c) .where('a < 5 || 'a > 17) .select('c, 'b) - .toSink(new CsvTableSink(path, fieldDelim = "|")) + .writeToSink(new CsvTableSink(path, fieldDelim = "|")) env.execute() http://git-wip-us.apache.org/repos/asf/flink/blob/829c75c4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala index 66cb9bf..160d88a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala @@ -53,7 +53,7 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { val results = input.toTable(tEnv, 'a, 'b, 'c) .where('a < 5 || 'a > 17) .select('c, 'b) - .toSink(new CsvTableSink(path)) + .writeToSink(new CsvTableSink(path)) env.execute()