[FLINK-1996] [tableApi] Add TableSink interface to emit tables to external 
storage.

- Add a CsvTableSink

This closes #1961


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13bce315
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13bce315
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13bce315

Branch: refs/heads/master
Commit: 13bce31507c431bcc20994bc8ce9dbeb7eba3b96
Parents: 8ec47f1
Author: Fabian Hueske <fhue...@apache.org>
Authored: Sat Apr 30 21:11:40 2016 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed May 4 21:21:49 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              | 47 +++++++++++
 .../flink/api/table/BatchTableEnvironment.scala | 25 ++++++
 .../api/table/StreamTableEnvironment.scala      | 25 ++++++
 .../flink/api/table/TableEnvironment.scala      | 11 +++
 .../flink/api/table/sinks/BatchTableSink.scala  | 31 +++++++
 .../flink/api/table/sinks/CsvTableSink.scala    | 86 ++++++++++++++++++++
 .../flink/api/table/sinks/StreamTableSink.scala | 31 +++++++
 .../flink/api/table/sinks/TableSink.scala       | 86 ++++++++++++++++++++
 .../org/apache/flink/api/table/table.scala      | 28 +++++++
 .../flink/api/scala/batch/TableSinkITCase.scala | 73 +++++++++++++++++
 .../api/scala/batch/TableSourceITCase.scala     |  8 +-
 .../api/scala/stream/TableSinkITCase.scala      | 67 +++++++++++++++
 12 files changed, 513 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 14f05aa..3defdbb 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -685,6 +685,53 @@ SQL queries can be executed on DataStream Tables by adding 
the `STREAM` SQL keyw
 
 {% top %}
 
+Emit a Table to external sinks
+----
+
+A `Table` can be emitted to a `TableSink`, which is a generic interface to 
support a wide variaty of file formats (e.g., CSV, Apache Parquet, Apache 
Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, 
Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ), and 
others. A batch `Table` can only be emitted by a `BatchTableSink`, a streaming 
table requires a `StreamTableSink` (a `TableSink` can implement both 
interfaces). 
+
+Currently, Flink only provides a `CsvTableSink` that writes a batch or 
streaming `Table` to CSV-formatted files. A custom `TableSource` can be defined 
by implementing the `BatchTableSink` and/or `StreamTableSink` interface. 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// compute the result Table using Table API operators and/or SQL queries
+Table result = ... 
+
+// create a TableSink
+TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
+// add a TableSink to emit the result Table
+result.toSink(sink);
+
+// execute the program
+env.execute();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// compute the result Table using Table API operators and/or SQL queries
+val result: Table = ... 
+
+// create a TableSink
+val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
+// add a TableSink to emit the result Table
+result.toSink(sink)
+
+// execute the program
+env.execute()
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 Runtime Configuration
 ----
 The Table API provides a configuration (the so-called `TableConfig`) to modify 
runtime behavior. It can be accessed either through `TableEnvironment` or 
passed to the `toDataSet`/`toDataStream` method when using Scala implicit 
conversion.

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index b38c7f0..39e3105 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -34,6 +34,7 @@ import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, 
DataSetConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataSetTable}
+import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.api.table.sources.BatchTableSource
 
 /**
@@ -136,6 +137,30 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+    * Emits a [[Table]] to a [[TableSink]].
+    *
+    * Internally, the [[Table]] is translated into a [[DataSet]] and handed 
over to the
+    * [[TableSink]] to emit it.
+    *
+    * @param table The [[Table]] to emit.
+    * @param sink The [[TableSink]] to emit the [[Table]] to.
+    * @tparam T The expected type of the [[DataSet]] which represents the 
[[Table]].
+    */
+  override private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): 
Unit = {
+
+    sink match {
+      case batchSink: BatchTableSink[T] =>
+        val outputType = sink.getOutputType
+        // translate the Table into a DataSet and provide the type that the 
TableSink expects.
+        val result: DataSet[T] = translate(table)(outputType)
+        // Give the DataSet to the TableSink to emit it.
+        batchSink.emitDataSet(result)
+      case _ =>
+        throw new TableException("BatchTableSink required to emit batch Table")
+    }
+  }
+
+  /**
     * Returns the AST of the specified Table API and SQL queries and the 
execution plan to compute
     * the result of the given [[Table]].
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 81f0d67..918a65f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -30,6 +30,7 @@ import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, 
DataStreamConvention}
 import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.{TableSourceTable, 
TransStreamTable, DataStreamTable}
+import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -134,6 +135,30 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Emits a [[Table]] to a [[TableSink]].
+    *
+    * Internally, the [[Table]] is translated into a [[DataStream]] and handed 
over to the
+    * [[TableSink]] to emit it.
+    *
+    * @param table The [[Table]] to emit.
+    * @param sink The [[TableSink]] to emit the [[Table]] to.
+    * @tparam T The expected type of the [[DataStream]] which represents the 
[[Table]].
+    */
+  override private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): 
Unit = {
+
+    sink match {
+      case streamSink: StreamTableSink[T] =>
+        val outputType = sink.getOutputType
+        // translate the Table into a DataStream and provide the type that the 
TableSink expects.
+        val result: DataStream[T] = translate(table)(outputType)
+        // Give the DataSet to the TableSink to emit it.
+        streamSink.emitDataStream(result)
+      case _ =>
+        throw new TableException("StreamTableSink required to emit streaming 
Table")
+    }
+  }
+
+  /**
     * Registers a [[DataStream]] as a table under a given name in the 
[[TableEnvironment]]'s
     * catalog.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 633b54f..6ccde47 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -38,6 +38,8 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.expressions.{Naming, 
UnresolvedFieldReference, Expression}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
 import org.apache.flink.api.table.plan.schema.{TransStreamTable, RelTable}
+import org.apache.flink.api.table.sinks.TableSink
+import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment 
=> JavaStreamExecEnv}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => 
ScalaStreamExecEnv}
 
@@ -137,6 +139,15 @@ abstract class TableEnvironment(val config: TableConfig) {
   def sql(query: String): Table
 
   /**
+    * Emits a [[Table]] to a [[TableSink]].
+    *
+    * @param table The [[Table]] to emit.
+    * @param sink The [[TableSink]] to emit the [[Table]] to.
+    * @tparam T The data type that the [[TableSink]] expects.
+    */
+  private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit
+
+  /**
     * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
     *
     * @param name The name under which the table is registered.

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
new file mode 100644
index 0000000..27dbe8e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.sinks
+
+import org.apache.flink.api.java.DataSet
+
+/** Defines an external [[TableSink]] to emit a batch 
[[org.apache.flink.api.table.Table]].
+  *
+  * @tparam T Type of [[DataSet]] that this [[TableSink]] expects and supports.
+  */
+trait BatchTableSink[T] extends TableSink[T] {
+
+  /** Emits the DataSet. */
+  def emitDataSet(dataSet: DataSet[T]): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
new file mode 100644
index 0000000..ed05caf
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.sinks
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+  * A simple [[TableSink]] to emit data as CSV files.
+  *
+  * @param path The output path to write the Table to.
+  * @param fieldDelim The field delimiter, ',' by default.
+  */
+class CsvTableSink(
+    path: String,
+    fieldDelim: String = ",")
+  extends BatchTableSink[Row] with StreamTableSink[Row] {
+
+  override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+    dataSet
+      .map(new CsvFormatter(fieldDelim))
+      .writeAsText(path)
+  }
+
+  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+    dataStream
+      .map(new CsvFormatter(fieldDelim))
+      .writeAsText(path)
+  }
+
+  override protected def copy: TableSink[Row] = {
+    new CsvTableSink(path, fieldDelim)
+  }
+
+  override def getOutputType: TypeInformation[Row] = {
+    new RowTypeInfo(getFieldTypes)
+  }
+}
+
+/**
+  * Formats a [[Row]] into a [[String]] with fields separated by the field 
delimiter.
+  *
+  * @param fieldDelim The field delimiter.
+  */
+class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
+  override def map(row: Row): String = {
+
+    val builder = new StringBuilder
+
+    // write first value
+    val v = row.productElement(0)
+    if (v != null) {
+      builder.append(v.toString)
+    }
+
+    // write following values
+    for (i <- 1 until row.productArity) {
+      builder.append(fieldDelim)
+      val v = row.productElement(i)
+      if (v != null) {
+        builder.append(v.toString)
+      }
+    }
+    builder.mkString
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
new file mode 100644
index 0000000..61ef3b2
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.sinks
+
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/** Defines an external [[TableSink]] to emit a batch 
[[org.apache.flink.api.table.Table]].
+  *
+  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and 
supports.
+  */
+trait StreamTableSink[T] extends TableSink[T] {
+
+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[T]): Unit
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
new file mode 100644
index 0000000..12e57de
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/** A [[TableSink]] specifies how to emit a 
[[org.apache.flink.api.table.Table]] to an external
+  * system or location.
+  *
+  * The interface is generic such that it can support different storage 
locations and formats.
+  *
+  * @tparam T The return type of the [[TableSink]].
+  */
+trait TableSink[T] {
+
+  private var fieldNames: Option[Array[String]] = None
+  private var fieldTypes: Option[Array[TypeInformation[_]]] = None
+
+  /**
+    * Return the type expected by this [[TableSink]].
+    *
+    * This type should depend on the types returned by [[getFieldNames]].
+    *
+    * @return The type expected by this [[TableSink]].
+    */
+  def getOutputType: TypeInformation[T]
+
+  /** Return a deep copy of the [[TableSink]]. */
+  protected def copy: TableSink[T]
+
+  /**
+    * Return the field names of the [[org.apache.flink.api.table.Table]] to 
emit. */
+  protected final def getFieldNames: Array[String] = {
+    fieldNames match {
+      case Some(n) => n
+      case None => throw new IllegalStateException(
+        "TableSink must be configured to retrieve field names.")
+    }
+  }
+
+  /** Return the field types of the [[org.apache.flink.api.table.Table]] to 
emit. */
+  protected final def getFieldTypes: Array[TypeInformation[_]] = {
+    fieldTypes match {
+      case Some(t) => t
+      case None => throw new IllegalStateException(
+        "TableSink must be configured to retrieve field types.")
+    }
+  }
+
+  /**
+    * Return a copy of this [[TableSink]] configured with the field names and 
types of the
+    * [[org.apache.flink.api.table.Table]] to emit.
+    *
+    * @param fieldNames The field names of the table to emit.
+    * @param fieldTypes The field types of the table to emit.
+    * @return A copy of this [[TableSink]] configured with the field names and 
types of the
+    *         [[org.apache.flink.api.table.Table]] to emit.
+    */
+  private[flink] final def configure(
+    fieldNames: Array[String],
+    fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
+
+    val configuredSink = this.copy
+    configuredSink.fieldNames = Some(fieldNames)
+    configuredSink.fieldTypes = Some(fieldTypes)
+
+    configuredSink
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index ed166bc..5356a9d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -25,9 +25,12 @@ import org.apache.calcite.rex.{RexCall, RexInputRef, 
RexLiteral, RexNode}
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
 import org.apache.calcite.util.NlsString
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggCalls
 import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.sinks.TableSink
+import org.apache.flink.api.table.typeutils.TypeConverter
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -444,6 +447,31 @@ class Table(
     orderBy(parsedFields: _*)
   }
 
+  /**
+    * Emits the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an 
external storage location.
+    *
+    * A batch [[Table]] can only be emitted by a
+    * [[org.apache.flink.api.table.sinks.BatchTableSink]], a streaming 
[[Table]] requires a
+    * [[org.apache.flink.api.table.sinks.StreamTableSink]].
+    *
+    * @param sink The [[TableSink]] to which the [[Table]] is emitted.
+    * @tparam T The data type that the [[TableSink]] expects.
+    */
+  def toSink[T](sink: TableSink[T]): Unit = {
+
+    // get schema information of table
+    val rowType = relNode.getRowType
+    val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
+    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+      .map(f => 
TypeConverter.sqlTypeToTypeInfo(f.getType.getSqlTypeName)).toArray
+
+    // configure the table sink
+    val configuredSink = sink.configure(fieldNames, fieldTypes)
+
+    // emit the table to the configured table sink
+    tableEnv.emitToSink(this, configuredSink)
+  }
+
   private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = {
 
     val names = exprs.map{ e =>

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
new file mode 100644
index 0000000..39684ff
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch
+
+import java.io.File
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.sinks.{CsvTableSink, TableSink, 
BatchTableSink}
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+
+@RunWith(classOf[Parameterized])
+class TableSinkITCase(
+    mode: TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  @Test
+  def testBatchTableSink(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = "file:///" + tmpFile.getAbsolutePath
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(4)
+
+    val input = CollectionDataSets.get3TupleDataSet(env)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    val results = input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .toSink(new CsvTableSink(path, fieldDelim = "|"))
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi|1", "Hello|2", "Hello world|2", "Hello world, how are you?|3",
+      "Comment#12|6", "Comment#13|6", "Comment#14|6", 
"Comment#15|6").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
index def6fb6..6fd0d13 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
@@ -31,7 +31,7 @@ import org.apache.flink.api.table.sources.{BatchTableSource, 
CsvTableSource}
 import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.apache.flink.api.table.{Row, TableEnvironment}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -39,10 +39,8 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class TableSourceITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
+class TableSourceITCase(mode: TestExecutionMode)
+  extends MultipleProgramsTestBase(mode) {
 
   @Test
   def testBatchTableSourceTableAPI(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
new file mode 100644
index 0000000..66cb9bf
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream
+
+import java.io.File
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.stream.utils.StreamTestData
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.sinks.{CsvTableSink, StreamTableSink, 
TableSink}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+
+class TableSinkITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testStreamTableSink(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = "file:///" + tmpFile.getAbsolutePath
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(4)
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    val results = input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .toSink(new CsvTableSink(path))
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
+      "Comment#12,6", "Comment#13,6", "Comment#14,6", 
"Comment#15,6").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
+
+}

Reply via email to