Repository: flink Updated Branches: refs/heads/master 83061ad0f -> db31ca3f8
[FLINK-4562] [table] Move table examples into a dedicated module in flink-examples. This closes #2460. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/46a950df Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/46a950df Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/46a950df Branch: refs/heads/master Commit: 46a950df8b311e0dc64c709a02c56b0497e47c21 Parents: 83061ad Author: shijinkui <shijin...@huawei.com> Authored: Fri Feb 10 16:42:29 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Apr 21 18:21:42 2017 +0200 ---------------------------------------------------------------------- flink-dist/pom.xml | 1 + flink-examples/flink-examples-table/pom.xml | 108 +++++++++++ .../flink/table/examples/java/WordCountSQL.java | 87 +++++++++ .../table/examples/java/WordCountTable.java | 85 +++++++++ .../table/examples/scala/StreamSQLExample.scala | 76 ++++++++ .../examples/scala/StreamTableExample.scala | 71 ++++++++ .../table/examples/scala/TPCHQuery3Table.scala | 180 +++++++++++++++++++ .../table/examples/scala/WordCountSQL.scala | 62 +++++++ .../table/examples/scala/WordCountTable.scala | 62 +++++++ flink-examples/pom.xml | 1 + .../flink/table/examples/java/WordCountSQL.java | 87 --------- .../table/examples/java/WordCountTable.java | 85 --------- .../table/examples/scala/StreamSQLExample.scala | 76 -------- .../examples/scala/StreamTableExample.scala | 72 -------- .../table/examples/scala/TPCHQuery3Table.scala | 180 ------------------- .../table/examples/scala/WordCountSQL.scala | 62 ------- .../table/examples/scala/WordCountTable.scala | 62 ------- .../scala/batch/table/AggregationsITCase.scala | 15 +- 18 files changed, 741 insertions(+), 631 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-dist/pom.xml 
---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index e42aea6..76df09e 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -338,6 +338,7 @@ under the License. <excludes> <exclude>org.apache.flink:flink-examples-batch</exclude> <exclude>org.apache.flink:flink-examples-streaming</exclude> + <exclude>org.apache.flink:flink-examples-table</exclude> <exclude>org.apache.flink:flink-python</exclude> <exclude>org.slf4j:slf4j-log4j12</exclude> <exclude>log4j:log4j</exclude> http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/flink-examples-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml new file mode 100644 index 0000000..de050d7 --- /dev/null +++ b/flink-examples/flink-examples-table/pom.xml @@ -0,0 +1,108 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-examples_2.10</artifactId> + <version>1.3-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <name>flink-examples-table</name> + <artifactId>flink-examples-table_2.10</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_2.10</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>flink-table-examples_2.10</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadeTestJar>false</shadeTestJar> + <shadedArtifactAttached>false</shadedArtifactAttached> + <createDependencyReducedPom>false</createDependencyReducedPom> + <finalName>TableExamples</finalName> + <outputFile>flink-examples-table-with-dependencies.jar</outputFile> + <filters> + <filter> + <artifact>*:*</artifact> + <includes> + <include>org.codehaus.commons.compiler.properties</include> + <include>org/codehaus/janino/**</include> + <include>org/codehaus/commons/**</include> + <include>org/apache/calcite/**</include> + 
<include>org/apache/flink/table/**</include> + <include>org/apache/flink/shaded/calcite/com/google/common/**</include> + <include>org/apache/flink/shaded/calcite/org/eigenbase/util/property/**</include> + </includes> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java new file mode 100644 index 0000000..9e1b45e --- /dev/null +++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.examples.java; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; + +/** + * Simple example that shows how the Batch SQL API is used in Java. + * + * This example shows how to: + * - Convert DataSets to Tables + * - Register a Table under a name + * - Run a SQL query on the registered Table + * + */ +public class WordCountSQL { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + // set up execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); + + DataSet<WC> input = env.fromElements( + new WC("Hello", 1), + new WC("Ciao", 1), + new WC("Hello", 1)); + + // register the DataSet as table "WordCount" + tEnv.registerDataSet("WordCount", input, "word, frequency"); + + // run a SQL query on the Table and retrieve the result as a new Table + Table table = tEnv.sql( + "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); + + DataSet<WC> result = tEnv.toDataSet(table, WC.class); + + result.print(); + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + public static class WC { + public String word; + public long frequency; + + // public constructor to make it a Flink POJO + public WC() { + } + + public WC(String word, long frequency) { + this.word = word; + this.frequency = frequency; + } + + @Override + public String toString() { + return "WC " + word + " " + frequency; + } + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java new file mode 100644 index 0000000..1ee8c12 --- /dev/null +++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.examples.java; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.BatchTableEnvironment; + +/** + * Simple example for demonstrating the use of the Table API for a Word Count in Java. 
+ * + * This example shows how to: + * - Convert DataSets to Tables + * - Apply group, aggregate, select, and filter operations + * + */ +public class WordCountTable { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); + BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); + + DataSet<WC> input = env.fromElements( + new WC("Hello", 1), + new WC("Ciao", 1), + new WC("Hello", 1)); + + Table table = tEnv.fromDataSet(input); + + Table filtered = table + .groupBy("word") + .select("word, frequency.sum as frequency") + .filter("frequency = 2"); + + DataSet<WC> result = tEnv.toDataSet(filtered, WC.class); + + result.print(); + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + public static class WC { + public String word; + public long frequency; + + // public constructor to make it a Flink POJO + public WC() { + + } + + public WC(String word, long frequency) { + this.word = word; + this.frequency = frequency; + } + + @Override + public String toString() { + return "WC " + word + " " + frequency; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala new file mode 100644 index 0000000..2cdd8b8 --- /dev/null +++ 
b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ + +/** + * Simple example for demonstrating the use of SQL on a Stream Table. 
+ * + * This example shows how to: + * - Convert DataStreams to Tables + * - Register a Table under a name + * - Run a StreamSQL query on the registered Table + * + */ +object StreamSQLExample { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val orderA: DataStream[Order] = env.fromCollection(Seq( + Order(1L, "beer", 3), + Order(1L, "diaper", 4), + Order(3L, "rubber", 2))) + + val orderB: DataStream[Order] = env.fromCollection(Seq( + Order(2L, "pen", 3), + Order(2L, "rubber", 3), + Order(4L, "beer", 1))) + + // register the DataStreams under the name "OrderA" and "OrderB" + tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount) + tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount) + + // union the two tables + val result = tEnv.sql( + "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + + "SELECT * FROM OrderB WHERE amount < 2") + + result.toDataStream[Order].print() + + env.execute() + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + case class Order(user: Long, product: String, amount: Int) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala new 
file mode 100644 index 0000000..5c5012b --- /dev/null +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ + +/** + * Simple example for demonstrating the use of Table API on a Stream Table. 
+ * + * This example shows how to: + * - Convert DataStreams to Tables + * - Apply union, select, and filter operations + */ +object StreamTableExample { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val orderA = env.fromCollection(Seq( + Order(1L, "beer", 3), + Order(1L, "diaper", 4), + Order(3L, "rubber", 2))).toTable(tEnv) + + val orderB = env.fromCollection(Seq( + Order(2L, "pen", 3), + Order(2L, "rubber", 3), + Order(4L, "beer", 1))).toTable(tEnv) + + // union the two tables + val result: DataStream[Order] = orderA.unionAll(orderB) + .select('user, 'product, 'amount) + .where('amount > 2) + .toDataStream[Order] + + result.print() + + env.execute() + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + case class Order(user: Long, product: String, amount: Int) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala new file mode 100644 index 0000000..74afb06 --- /dev/null +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ + +/** + * This program implements a modified version of the TPC-H query 3. The + * example demonstrates how to assign names to fields by extending the Tuple class. + * The original query can be found at + * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) + * (page 29). + * + * This program implements the following SQL equivalent: + * + * {{{ + * SELECT + * l_orderkey, + * SUM(l_extendedprice*(1-l_discount)) AS revenue, + * o_orderdate, + * o_shippriority + * FROM customer, + * orders, + * lineitem + * WHERE + * c_mktsegment = '[SEGMENT]' + * AND c_custkey = o_custkey + * AND l_orderkey = o_orderkey + * AND o_orderdate < date '[DATE]' + * AND l_shipdate > date '[DATE]' + * GROUP BY + * l_orderkey, + * o_orderdate, + * o_shippriority + * ORDER BY + * revenue desc, + * o_orderdate; + * }}} + * + * Input files are plain text CSV files using the pipe character ('|') as field separator + * as generated by the TPC-H data generator which is available at + * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/). 
+ * + * Usage: + * {{{ + * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path> + * }}} + * + * This example shows how to: + * - Convert DataSets to Tables + * - Use Table API expressions + * + */ +object TPCHQuery3Table { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + // set filter date + val date = "1995-03-12".toDate + + // get execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val lineitems = getLineitemDataSet(env) + .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate) + .filter('shipDate.toDate > date) + + val customers = getCustomerDataSet(env) + .toTable(tEnv, 'id, 'mktSegment) + .filter('mktSegment === "AUTOMOBILE") + + val orders = getOrdersDataSet(env) + .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio) + .filter('orderDate.toDate < date) + + val items = + orders.join(customers) + .where('custId === 'id) + .select('orderId, 'orderDate, 'shipPrio) + .join(lineitems) + .where('orderId === 'id) + .select( + 'orderId, + 'extdPrice * (1.0f.toExpr - 'discount) as 'revenue, + 'orderDate, + 'shipPrio) + + val result = items + .groupBy('orderId, 'orderDate, 'shipPrio) + .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio) + .orderBy('revenue.desc, 'orderDate.asc) + + // emit result + result.writeAsCsv(outputPath, "\n", "|") + + // execute program + env.execute("Scala TPCH Query 3 (Table API Expression) Example") + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String) + case class Customer(id: 
Long, mktSegment: String) + case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long) + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private var lineitemPath: String = _ + private var customerPath: String = _ + private var ordersPath: String = _ + private var outputPath: String = _ + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length == 4) { + lineitemPath = args(0) + customerPath = args(1) + ordersPath = args(2) + outputPath = args(3) + true + } else { + System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + + " Due to legal restrictions, we can not ship generated data.\n" + + " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + + " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " + + "<orders-csv path> <result path>") + false + } + } + + private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = { + env.readCsvFile[Lineitem]( + lineitemPath, + fieldDelimiter = "|", + includedFields = Array(0, 5, 6, 10) ) + } + + private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = { + env.readCsvFile[Customer]( + customerPath, + fieldDelimiter = "|", + includedFields = Array(0, 6) ) + } + + private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = { + env.readCsvFile[Order]( + ordersPath, + fieldDelimiter = "|", + includedFields = Array(0, 1, 4, 7) ) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala 
b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala new file mode 100644 index 0000000..a8b8268 --- /dev/null +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ + +/** + * Simple example that shows how the Batch SQL API is used in Scala. 
+ * + * This example shows how to: + * - Convert DataSets to Tables + * - Register a Table under a name + * - Run a SQL query on the registered Table + * + */ +object WordCountSQL { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + + // register the DataSet as table "WordCount" + tEnv.registerDataSet("WordCount", input, 'word, 'frequency) + + // run a SQL query on the Table and retrieve the result as a new Table + val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word") + + table.toDataSet[WC].print() + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + case class WC(word: String, frequency: Long) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala new file mode 100644 index 0000000..75ea8ce --- /dev/null +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ + +/** + * Simple example for demonstrating the use of the Table API for a Word Count in Scala. + * + * This example shows how to: + * - Convert DataSets to Tables + * - Apply group, aggregate, select, and filter operations + * + */ +object WordCountTable { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + val expr = input.toTable(tEnv) + val result = expr + .groupBy('word) + .select('word, 'frequency.sum as 'frequency) + .filter('frequency === 2) + .toDataSet[WC] + + result.print() + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + case class WC(word: String, 
frequency: Long) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml index d9c906a..a5939f5 100644 --- a/flink-examples/pom.xml +++ b/flink-examples/pom.xml @@ -68,5 +68,6 @@ under the License. <modules> <module>flink-examples-batch</module> <module>flink-examples-streaming</module> + <module>flink-examples-table</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-libraries/flink-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java deleted file mode 100644 index e3b345c..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.examples.java; - -import org.apache.flink.table.api.Table; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.table.api.TableEnvironment; - -/** - * Simple example that shows how the Batch SQL API is used in Java. - * - * This example shows how to: - * - Convert DataSets to Tables - * - Register a Table under a name - * - Run a SQL query on the registered Table - * - */ -public class WordCountSQL { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - // set up execution environment - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<WC> input = env.fromElements( - new WC("Hello", 1), - new WC("Ciao", 1), - new WC("Hello", 1)); - - // register the DataSet as table "WordCount" - tEnv.registerDataSet("WordCount", input, "word, frequency"); - - // run a SQL query on the Table and retrieve the result as a new Table - Table table = tEnv.sql( - "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); - - DataSet<WC> result = tEnv.toDataSet(table, WC.class); - - result.print(); - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - public static class WC { - public String word; - public long frequency; - - // public constructor to make it a Flink POJO - public WC() { - } - - public WC(String word, long frequency) { - this.word = word; - this.frequency = frequency; - } - - @Override - public String toString() { - return "WC " + word + " " + frequency; - } - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-libraries/flink-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java deleted file mode 100644 index bdb50f5..0000000 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.examples.java; - -import org.apache.flink.table.api.Table; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.table.api.java.BatchTableEnvironment; -import org.apache.flink.table.api.TableEnvironment; - -/** - * Simple example for demonstrating the use of the Table API for a Word Count in Java. 
- * - * This example shows how to: - * - Convert DataSets to Tables - * - Apply group, aggregate, select, and filter operations - * - */ -public class WordCountTable { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); - - DataSet<WC> input = env.fromElements( - new WC("Hello", 1), - new WC("Ciao", 1), - new WC("Hello", 1)); - - Table table = tEnv.fromDataSet(input); - - Table filtered = table - .groupBy("word") - .select("word, frequency.sum as frequency") - .filter("frequency = 2"); - - DataSet<WC> result = tEnv.toDataSet(filtered, WC.class); - - result.print(); - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - public static class WC { - public String word; - public long frequency; - - // public constructor to make it a Flink POJO - public WC() { - - } - - public WC(String word, long frequency) { - this.word = word; - this.frequency = frequency; - } - - @Override - public String toString() { - return "WC " + word + " " + frequency; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala deleted file mode 100644 index 6d16722..0000000 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.examples.scala - -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.scala._ -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import org.apache.flink.table.api.TableEnvironment - -/** - * Simple example for demonstrating the use of SQL on a Stream Table. 
- * - * This example shows how to: - * - Convert DataStreams to Tables - * - Register a Table under a name - * - Run a StreamSQL query on the registered Table - * - */ -object StreamSQLExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - def main(args: Array[String]): Unit = { - - // set up execution environment - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val orderA: DataStream[Order] = env.fromCollection(Seq( - Order(1L, "beer", 3), - Order(1L, "diaper", 4), - Order(3L, "rubber", 2))) - - val orderB: DataStream[Order] = env.fromCollection(Seq( - Order(2L, "pen", 3), - Order(2L, "rubber", 3), - Order(4L, "beer", 1))) - - // register the DataStreams under the name "OrderA" and "OrderB" - tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount) - tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount) - - // union the two tables - val result = tEnv.sql( - "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + - "SELECT * FROM OrderB WHERE amount < 2") - - result.toDataStream[Order].print() - - env.execute() - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class Order(user: Long, product: String, amount: Int) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala deleted file mode 100644 
index 6c1467f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.examples.scala - -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.scala._ -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import org.apache.flink.table.api.TableEnvironment - -/** - * Simple example for demonstrating the use of Table API on a Stream Table. 
- * - * This example shows how to: - * - Convert DataStreams to Tables - * - Apply union, select, and filter operations - * - */ -object StreamTableExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - def main(args: Array[String]): Unit = { - - // set up execution environment - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val orderA = env.fromCollection(Seq( - Order(1L, "beer", 3), - Order(1L, "diaper", 4), - Order(3L, "rubber", 2))).toTable(tEnv) - - val orderB = env.fromCollection(Seq( - Order(2L, "pen", 3), - Order(2L, "rubber", 3), - Order(4L, "beer", 1))).toTable(tEnv) - - // union the two tables - val result: DataStream[Order] = orderA.unionAll(orderB) - .select('user, 'product, 'amount) - .where('amount > 2) - .toDataStream[Order] - - result.print() - - env.execute() - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class Order(user: Long, product: String, amount: Int) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala deleted file mode 100644 index 74afb06..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.examples.scala - -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.TableEnvironment -import org.apache.flink.table.api.scala._ - -/** - * This program implements a modified version of the TPC-H query 3. The - * example demonstrates how to assign names to fields by extending the Tuple class. - * The original query can be found at - * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) - * (page 29). - * - * This program implements the following SQL equivalent: - * - * {{{ - * SELECT - * l_orderkey, - * SUM(l_extendedprice*(1-l_discount)) AS revenue, - * o_orderdate, - * o_shippriority - * FROM customer, - * orders, - * lineitem - * WHERE - * c_mktsegment = '[SEGMENT]' - * AND c_custkey = o_custkey - * AND l_orderkey = o_orderkey - * AND o_orderdate < date '[DATE]' - * AND l_shipdate > date '[DATE]' - * GROUP BY - * l_orderkey, - * o_orderdate, - * o_shippriority - * ORDER BY - * revenue desc, - * o_orderdate; - * }}} - * - * Input files are plain text CSV files using the pipe character ('|') as field separator - * as generated by the TPC-H data generator which is available at - * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
- * - * Usage: - * {{{ - * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path> - * }}} - * - * This example shows how to: - * - Convert DataSets to Tables - * - Use Table API expressions - * - */ -object TPCHQuery3Table { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - // set filter date - val date = "1995-03-12".toDate - - // get execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val lineitems = getLineitemDataSet(env) - .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate) - .filter('shipDate.toDate > date) - - val customers = getCustomerDataSet(env) - .toTable(tEnv, 'id, 'mktSegment) - .filter('mktSegment === "AUTOMOBILE") - - val orders = getOrdersDataSet(env) - .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio) - .filter('orderDate.toDate < date) - - val items = - orders.join(customers) - .where('custId === 'id) - .select('orderId, 'orderDate, 'shipPrio) - .join(lineitems) - .where('orderId === 'id) - .select( - 'orderId, - 'extdPrice * (1.0f.toExpr - 'discount) as 'revenue, - 'orderDate, - 'shipPrio) - - val result = items - .groupBy('orderId, 'orderDate, 'shipPrio) - .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio) - .orderBy('revenue.desc, 'orderDate.asc) - - // emit result - result.writeAsCsv(outputPath, "\n", "|") - - // execute program - env.execute("Scala TPCH Query 3 (Table API Expression) Example") - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String) - case class Customer(id: 
Long, mktSegment: String) - case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long) - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private var lineitemPath: String = _ - private var customerPath: String = _ - private var ordersPath: String = _ - private var outputPath: String = _ - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length == 4) { - lineitemPath = args(0) - customerPath = args(1) - ordersPath = args(2) - outputPath = args(3) - true - } else { - System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + - " Due to legal restrictions, we can not ship generated data.\n" + - " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + - " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " + - "<orders-csv path> <result path>") - false - } - } - - private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = { - env.readCsvFile[Lineitem]( - lineitemPath, - fieldDelimiter = "|", - includedFields = Array(0, 5, 6, 10) ) - } - - private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = { - env.readCsvFile[Customer]( - customerPath, - fieldDelimiter = "|", - includedFields = Array(0, 6) ) - } - - private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = { - env.readCsvFile[Order]( - ordersPath, - fieldDelimiter = "|", - includedFields = Array(0, 1, 4, 7) ) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala deleted file mode 100644 index a8b8268..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.examples.scala - -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.TableEnvironment -import org.apache.flink.table.api.scala._ - -/** - * Simple example that shows how the Batch SQL API is used in Scala. 
- * - * This example shows how to: - * - Convert DataSets to Tables - * - Register a Table under a name - * - Run a SQL query on the registered Table - * - */ -object WordCountSQL { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - def main(args: Array[String]): Unit = { - - // set up execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) - - // register the DataSet as table "WordCount" - tEnv.registerDataSet("WordCount", input, 'word, 'frequency) - - // run a SQL query on the Table and retrieve the result as a new Table - val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word") - - table.toDataSet[WC].print() - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class WC(word: String, frequency: Long) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala deleted file mode 100644 index 75ea8ce..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.examples.scala - -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.TableEnvironment -import org.apache.flink.table.api.scala._ - -/** - * Simple example for demonstrating the use of the Table API for a Word Count in Scala. - * - * This example shows how to: - * - Convert DataSets to Tables - * - Apply group, aggregate, select, and filter operations - * - */ -object WordCountTable { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - def main(args: Array[String]): Unit = { - - // set up execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - - val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) - val expr = input.toTable(tEnv) - val result = expr - .groupBy('word) - .select('word, 'frequency.sum as 'frequency) - .filter('frequency === 2) - .toDataSet[WC] - - result.print() - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class WC(word: String, 
frequency: Long) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/46a950df/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala index 22b7f0f..5ac09b9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala @@ -25,7 +25,6 @@ import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.types.Row import org.apache.flink.table.api.TableEnvironment -import org.apache.flink.table.examples.scala.WordCountTable.{WC => MyWC} import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith @@ -156,17 +155,17 @@ class AggregationsITCase( val tEnv = TableEnvironment.getTableEnvironment(env, config) val input = env.fromElements( - MyWC("hello", 1), - MyWC("hello", 1), - MyWC("ciao", 1), - MyWC("hola", 1), - MyWC("hola", 1)) + WC("hello", 1), + WC("hello", 1), + WC("ciao", 1), + WC("hola", 1), + WC("hola", 1)) val expr = input.toTable(tEnv) val result = expr .groupBy('word) .select('word, 'frequency.sum as 'frequency) .filter('frequency === 2) - .toDataSet[MyWC] + .toDataSet[WC] val mappedResult = result.map(w => (w.word, w.frequency * 10)).collect() val expected = "(hello,20)\n" + "(hola,20)" @@ -339,4 +338,6 @@ class AggregationsITCase( val results = t.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } + + case class WC(word: String, frequency: Long) }