[FLINK-2207] Fix TableAPI conversion documentation and further renamings for consistency.
This closes #829 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af0fee51 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af0fee51 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af0fee51 Branch: refs/heads/release-0.9 Commit: af0fee512bde4a5dc5c08a3cc17da788a06cd113 Parents: e513be7 Author: Fabian Hueske <fhue...@apache.org> Authored: Fri Jun 12 11:36:03 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Jun 12 14:27:03 2015 +0200 ---------------------------------------------------------------------- docs/libs/table.md | 8 ++++---- .../flink/api/scala/table/TableConversions.scala | 4 ++-- .../main/scala/org/apache/flink/api/table/Table.scala | 2 +- .../apache/flink/examples/scala/PageRankTable.scala | 2 +- .../flink/examples/scala/StreamingTableFilter.scala | 2 +- .../flink/api/scala/table/test/FilterITCase.scala | 6 +++--- .../flink/api/scala/table/test/JoinITCase.scala | 14 +++++++------- 7 files changed, 19 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/docs/libs/table.md ---------------------------------------------------------------------- diff --git a/docs/libs/table.md b/docs/libs/table.md index bcd2cb1..829c9cf 100644 --- a/docs/libs/table.md +++ b/docs/libs/table.md @@ -52,7 +52,7 @@ import org.apache.flink.api.scala.table._ case class WC(word: String, count: Int) val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) val expr = input.toTable -val result = expr.groupBy('word).select('word, 'count.sum as 'count).toSet[WC] +val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC] {% endhighlight %} The expression DSL uses Scala symbols to refer to field names and we use code generation to @@ -69,7 +69,7 @@ case class MyResult(a: String, d: Int) val input1 = env.fromElements(...).toTable('a, 
'b) val input2 = env.fromElements(...).toTable('c, 'd) -val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toSet[MyResult] +val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toDataSet[MyResult] {% endhighlight %} Notice, how a DataSet can be converted to a Table by using `as` and specifying new @@ -108,14 +108,14 @@ DataSet<WC> input = env.fromElements( new WC("Ciao", 1), new WC("Hello", 1)); -Table table = tableEnv.toTable(input); +Table table = tableEnv.fromDataSet(input); Table filtered = table .groupBy("word") .select("word.count as count, word") .filter("count = 2"); -DataSet<WC> result = tableEnv.toSet(filtered, WC.class); +DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class); {% endhighlight %} When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala index b9c0a5e..4f2172e 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala @@ -33,14 +33,14 @@ class TableConversions(table: Table) { /** * Converts the [[Table]] to a [[DataSet]]. */ - def toSet[T: TypeInformation]: DataSet[T] = { + def toDataSet[T: TypeInformation]: DataSet[T] = { new ScalaBatchTranslator().translate[T](table.operation) } /** * Converts the [[Table]] to a [[DataStream]]. 
*/ - def toStream[T: TypeInformation]: DataStream[T] = { + def toDataStream[T: TypeInformation]: DataStream[T] = { new ScalaStreamingTranslator().translate[T](table.operation) } } http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala index 83d5239..fdb125b 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala @@ -39,7 +39,7 @@ import org.apache.flink.api.table.plan._ * val table = set.toTable('a, 'b) * ... * val table2 = ... - * val set = table2.toSet[MyType] + * val set = table2.toDataSet[MyType] * }}} */ case class Table(private[flink] val operation: PlanNode) { http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala index 7a26e0e..dda6265 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala @@ -101,7 +101,7 @@ object PageRankTable { val newRanks = currentRanks.toTable // distribute ranks to target pages .join(adjacencyLists).where('pageId === 'sourceId) - .select('rank, 'targetIds).toSet[RankOutput] + .select('rank, 'targetIds).toDataSet[RankOutput] .flatMap { (in, out: Collector[(Long, Double)]) => val 
targets = in.targetIds http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala index 4aa5653..63dddc9 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala @@ -42,7 +42,7 @@ object StreamingTableFilter { val cars = genCarStream().toTable .filter('carId === 0) .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time) - .toStream[CarEvent] + .toDataStream[CarEvent] cars.print() http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala index bc51a7e..75cd728 100644 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala +++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala @@ -61,7 +61,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val filterDs = ds.filter( Literal(false) ) - filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "\n" } @@ -76,7 +76,7 @@ class 
FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val filterDs = ds.filter( Literal(true) ) - filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + @@ -109,7 +109,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val filterDs = ds.filter( 'a % 2 === 0 ) - filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala index b3baa56..8c3d1ca 100644 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala +++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala @@ -57,7 +57,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g) - joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + 
"Hello world,Hallo Welt\n" } @@ -70,7 +70,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g) - joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "Hi,Hallo\n" } @@ -83,7 +83,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g) - joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n" @@ -97,7 +97,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g) - joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "" } @@ -110,7 +110,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g) - joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "" } @@ -123,7 +123,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g) - joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "" } @@ -136,7 +136,7 @@ class JoinITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) val joinDs = ds1.join(ds2).where('a === 'd).select('g.count) - joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) + joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE) env.execute() expected = "6" }