[FLINK-1996] [tableApi] Add TableSink interface to emit tables to external storage.
- Add a CsvTableSink This closes #1961 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13bce315 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13bce315 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13bce315 Branch: refs/heads/master Commit: 13bce31507c431bcc20994bc8ce9dbeb7eba3b96 Parents: 8ec47f1 Author: Fabian Hueske <fhue...@apache.org> Authored: Sat Apr 30 21:11:40 2016 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed May 4 21:21:49 2016 +0200 ---------------------------------------------------------------------- docs/apis/table.md | 47 +++++++++++ .../flink/api/table/BatchTableEnvironment.scala | 25 ++++++ .../api/table/StreamTableEnvironment.scala | 25 ++++++ .../flink/api/table/TableEnvironment.scala | 11 +++ .../flink/api/table/sinks/BatchTableSink.scala | 31 +++++++ .../flink/api/table/sinks/CsvTableSink.scala | 86 ++++++++++++++++++++ .../flink/api/table/sinks/StreamTableSink.scala | 31 +++++++ .../flink/api/table/sinks/TableSink.scala | 86 ++++++++++++++++++++ .../org/apache/flink/api/table/table.scala | 28 +++++++ .../flink/api/scala/batch/TableSinkITCase.scala | 73 +++++++++++++++++ .../api/scala/batch/TableSourceITCase.scala | 8 +- .../api/scala/stream/TableSinkITCase.scala | 67 +++++++++++++++ 12 files changed, 513 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/docs/apis/table.md ---------------------------------------------------------------------- diff --git a/docs/apis/table.md b/docs/apis/table.md index 14f05aa..3defdbb 100644 --- a/docs/apis/table.md +++ b/docs/apis/table.md @@ -685,6 +685,53 @@ SQL queries can be executed on DataStream Tables by adding the `STREAM` SQL keyw {% top %} +Emit a Table to external sinks +---- + +A `Table` can be emitted to a `TableSink`, which is a generic interface to support a wide variety of file 
formats (e.g., CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ), and others. A batch `Table` can only be emitted by a `BatchTableSink`, a streaming table requires a `StreamTableSink` (a `TableSink` can implement both interfaces). + +Currently, Flink only provides a `CsvTableSink` that writes a batch or streaming `Table` to CSV-formatted files. A custom `TableSink` can be defined by implementing the `BatchTableSink` and/or `StreamTableSink` interface. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + +// compute the result Table using Table API operators and/or SQL queries +Table result = ... + +// create a TableSink +TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); +// add a TableSink to emit the result Table +result.toSink(sink); + +// execute the program +env.execute(); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment +val tableEnv = TableEnvironment.getTableEnvironment(env) + +// compute the result Table using Table API operators and/or SQL queries +val result: Table = ... + +// create a TableSink +val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") +// add a TableSink to emit the result Table +result.toSink(sink) + +// execute the program +env.execute() +{% endhighlight %} +</div> +</div> + +{% top %} + Runtime Configuration ---- The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed either through `TableEnvironment` or passed to the `toDataSet`/`toDataStream` method when using Scala implicit conversion. 
http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index b38c7f0..39e3105 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -34,6 +34,7 @@ import org.apache.flink.api.table.plan.PlanGenException import org.apache.flink.api.table.plan.nodes.dataset.{DataSetRel, DataSetConvention} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataSetTable} +import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.api.table.sources.BatchTableSource /** @@ -136,6 +137,30 @@ abstract class BatchTableEnvironment( } /** + * Emits a [[Table]] to a [[TableSink]]. + * + * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the + * [[TableSink]] to emit it. + * + * @param table The [[Table]] to emit. + * @param sink The [[TableSink]] to emit the [[Table]] to. + * @tparam T The expected type of the [[DataSet]] which represents the [[Table]]. + */ + override private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit = { + + sink match { + case batchSink: BatchTableSink[T] => + val outputType = sink.getOutputType + // translate the Table into a DataSet and provide the type that the TableSink expects. + val result: DataSet[T] = translate(table)(outputType) + // Give the DataSet to the TableSink to emit it. 
+ batchSink.emitDataSet(result) + case _ => + throw new TableException("BatchTableSink required to emit batch Table") + } + } + + /** * Returns the AST of the specified Table API and SQL queries and the execution plan to compute * the result of the given [[Table]]. * http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index 81f0d67..918a65f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -30,6 +30,7 @@ import org.apache.flink.api.table.plan.PlanGenException import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamRel, DataStreamConvention} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.plan.schema.{TableSourceTable, TransStreamTable, DataStreamTable} +import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink} import org.apache.flink.api.table.sources.StreamTableSource import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -134,6 +135,30 @@ abstract class StreamTableEnvironment( } /** + * Emits a [[Table]] to a [[TableSink]]. + * + * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the + * [[TableSink]] to emit it. + * + * @param table The [[Table]] to emit. + * @param sink The [[TableSink]] to emit the [[Table]] to. + * @tparam T The expected type of the [[DataStream]] which represents the [[Table]]. 
+ */ + override private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit = { + + sink match { + case streamSink: StreamTableSink[T] => + val outputType = sink.getOutputType + // translate the Table into a DataStream and provide the type that the TableSink expects. + val result: DataStream[T] = translate(table)(outputType) + // Give the DataStream to the TableSink to emit it. + streamSink.emitDataStream(result) + case _ => + throw new TableException("StreamTableSink required to emit streaming Table") + } + } + + /** * Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s * catalog. * http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index 633b54f..6ccde47 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -38,6 +38,8 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression} import org.apache.flink.api.table.plan.cost.DataSetCostFactory import org.apache.flink.api.table.plan.schema.{TransStreamTable, RelTable} +import org.apache.flink.api.table.sinks.TableSink +import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} @@ -137,6 +139,15 @@ abstract class TableEnvironment(val config: TableConfig) { def 
sql(query: String): Table /** + * Emits a [[Table]] to a [[TableSink]]. + * + * @param table The [[Table]] to emit. + * @param sink The [[TableSink]] to emit the [[Table]] to. + * @tparam T The data type that the [[TableSink]] expects. + */ + private[flink] def emitToSink[T](table: Table, sink: TableSink[T]): Unit + + /** * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog. * * @param name The name under which the table is registered. http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala new file mode 100644 index 0000000..27dbe8e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.sinks + +import org.apache.flink.api.java.DataSet + +/** Defines an external [[TableSink]] to emit a batch [[org.apache.flink.api.table.Table]]. + * + * @tparam T Type of [[DataSet]] that this [[TableSink]] expects and supports. + */ +trait BatchTableSink[T] extends TableSink[T] { + + /** Emits the DataSet. */ + def emitDataSet(dataSet: DataSet[T]): Unit +} http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala new file mode 100644 index 0000000..ed05caf --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.sinks + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream + +/** + * A simple [[TableSink]] to emit data as CSV files. + * + * @param path The output path to write the Table to. + * @param fieldDelim The field delimiter, ',' by default. + */ +class CsvTableSink( + path: String, + fieldDelim: String = ",") + extends BatchTableSink[Row] with StreamTableSink[Row] { + + override def emitDataSet(dataSet: DataSet[Row]): Unit = { + dataSet + .map(new CsvFormatter(fieldDelim)) + .writeAsText(path) + } + + override def emitDataStream(dataStream: DataStream[Row]): Unit = { + dataStream + .map(new CsvFormatter(fieldDelim)) + .writeAsText(path) + } + + override protected def copy: TableSink[Row] = { + new CsvTableSink(path, fieldDelim) + } + + override def getOutputType: TypeInformation[Row] = { + new RowTypeInfo(getFieldTypes) + } +} + +/** + * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter. + * + * @param fieldDelim The field delimiter. 
+ */ +class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] { + override def map(row: Row): String = { + + val builder = new StringBuilder + + // write first value + val v = row.productElement(0) + if (v != null) { + builder.append(v.toString) + } + + // write following values + for (i <- 1 until row.productArity) { + builder.append(fieldDelim) + val v = row.productElement(i) + if (v != null) { + builder.append(v.toString) + } + } + builder.mkString + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala new file mode 100644 index 0000000..61ef3b2 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.sinks + +import org.apache.flink.streaming.api.datastream.DataStream + +/** Defines an external [[TableSink]] to emit a streaming [[org.apache.flink.api.table.Table]]. + * + * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports. + */ +trait StreamTableSink[T] extends TableSink[T] { + + /** Emits the DataStream. */ + def emitDataStream(dataStream: DataStream[T]): Unit +} http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala new file mode 100644 index 0000000..12e57de --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.sinks + +import org.apache.flink.api.common.typeinfo.TypeInformation + +/** A [[TableSink]] specifies how to emit a [[org.apache.flink.api.table.Table]] to an external + * system or location. + * + * The interface is generic such that it can support different storage locations and formats. + * + * @tparam T The return type of the [[TableSink]]. + */ +trait TableSink[T] { + + private var fieldNames: Option[Array[String]] = None + private var fieldTypes: Option[Array[TypeInformation[_]]] = None + + /** + * Return the type expected by this [[TableSink]]. + * + * This type should depend on the types returned by [[getFieldTypes]]. + * + * @return The type expected by this [[TableSink]]. + */ + def getOutputType: TypeInformation[T] + + /** Return a deep copy of the [[TableSink]]. */ + protected def copy: TableSink[T] + + /** + * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */ + protected final def getFieldNames: Array[String] = { + fieldNames match { + case Some(n) => n + case None => throw new IllegalStateException( + "TableSink must be configured to retrieve field names.") + } + } + + /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */ + protected final def getFieldTypes: Array[TypeInformation[_]] = { + fieldTypes match { + case Some(t) => t + case None => throw new IllegalStateException( + "TableSink must be configured to retrieve field types.") + } + } + + /** + * Return a copy of this [[TableSink]] configured with the field names and types of the + * [[org.apache.flink.api.table.Table]] to emit. + * + * @param fieldNames The field names of the table to emit. + * @param fieldTypes The field types of the table to emit. + * @return A copy of this [[TableSink]] configured with the field names and types of the + * [[org.apache.flink.api.table.Table]] to emit. 
+ */ + private[flink] final def configure( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]): TableSink[T] = { + + val configuredSink = this.copy + configuredSink.fieldNames = Some(fieldNames) + configuredSink.fieldTypes = Some(fieldTypes) + + configuredSink + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index ed166bc..5356a9d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -25,9 +25,12 @@ import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode} import org.apache.calcite.sql.SqlKind import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} import org.apache.calcite.util.NlsString +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table.plan.PlanGenException import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggCalls import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.sinks.TableSink +import org.apache.flink.api.table.typeutils.TypeConverter import scala.collection.mutable import scala.collection.JavaConverters._ @@ -444,6 +447,31 @@ class Table( orderBy(parsedFields: _*) } + /** + * Emits the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location. + * + * A batch [[Table]] can only be emitted by a + * [[org.apache.flink.api.table.sinks.BatchTableSink]], a streaming [[Table]] requires a + * [[org.apache.flink.api.table.sinks.StreamTableSink]]. + * + * @param sink The [[TableSink]] to which the [[Table]] is emitted. 
+ * @tparam T The data type that the [[TableSink]] expects. + */ + def toSink[T](sink: TableSink[T]): Unit = { + + // get schema information of table + val rowType = relNode.getRowType + val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray + val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala + .map(f => TypeConverter.sqlTypeToTypeInfo(f.getType.getSqlTypeName)).toArray + + // configure the table sink + val configuredSink = sink.configure(fieldNames, fieldTypes) + + // emit the table to the configured table sink + tableEnv.emitToSink(this, configuredSink) + } + private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = { val names = exprs.map{ e => http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala new file mode 100644 index 0000000..39684ff --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.batch + +import java.io.File + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.api.table.sinks.{CsvTableSink, TableSink, BatchTableSink} +import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + + +@RunWith(classOf[Parameterized]) +class TableSinkITCase( + mode: TestExecutionMode) + extends MultipleProgramsTestBase(mode) { + + @Test + def testBatchTableSink(): Unit = { + + val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp") + tmpFile.deleteOnExit() + val path = "file:///" + tmpFile.getAbsolutePath + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(4) + + val input = CollectionDataSets.get3TupleDataSet(env) + .map(x => x).setParallelism(4) // increase DOP to 4 + + val results = input.toTable(tEnv, 'a, 'b, 'c) + .where('a < 5 || 'a > 17) + .select('c, 'b) + .toSink(new 
CsvTableSink(path, fieldDelim = "|")) + + env.execute() + + val expected = Seq( + "Hi|1", "Hello|2", "Hello world|2", "Hello world, how are you?|3", + "Comment#12|6", "Comment#13|6", "Comment#14|6", "Comment#15|6").mkString("\n") + + TestBaseUtils.compareResultsByLinesInMemory(expected, path) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala index def6fb6..6fd0d13 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala @@ -31,7 +31,7 @@ import org.apache.flink.api.table.sources.{BatchTableSource, CsvTableSource} import org.apache.flink.api.table.typeutils.RowTypeInfo import org.apache.flink.api.table.{Row, TableEnvironment} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode -import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -39,10 +39,8 @@ import org.junit.runners.Parameterized import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) -class TableSourceITCase( - mode: TestExecutionMode, - configMode: TableConfigMode) - extends TableProgramsTestBase(mode, configMode) { +class TableSourceITCase(mode: TestExecutionMode) + extends MultipleProgramsTestBase(mode) { @Test def testBatchTableSourceTableAPI(): Unit = { 
http://git-wip-us.apache.org/repos/asf/flink/blob/13bce315/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala new file mode 100644 index 0000000..66cb9bf --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream + +import java.io.File + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.stream.utils.StreamTestData +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.sinks.{CsvTableSink, StreamTableSink, TableSink} +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Row, TableEnvironment} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.test.util.TestBaseUtils +import org.junit.Test + +class TableSinkITCase extends StreamingMultipleProgramsTestBase { + + @Test + def testStreamTableSink(): Unit = { + + val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp") + tmpFile.deleteOnExit() + val path = "file:///" + tmpFile.getAbsolutePath + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(4) + + val input = StreamTestData.get3TupleDataStream(env) + .map(x => x).setParallelism(4) // increase DOP to 4 + + val results = input.toTable(tEnv, 'a, 'b, 'c) + .where('a < 5 || 'a > 17) + .select('c, 'b) + .toSink(new CsvTableSink(path)) + + env.execute() + + val expected = Seq( + "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3", + "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n") + + TestBaseUtils.compareResultsByLinesInMemory(expected, path) + } + +}