Repository: flink Updated Branches: refs/heads/master 5eb0e38fb -> 185b5f6c6
[FLINK-4179] [table] Additional TPCHQuery3Table example improvements Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/185b5f6c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/185b5f6c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/185b5f6c Branch: refs/heads/master Commit: 185b5f6c6b91e4ae61ff77c7d22d1296153a320c Parents: ec4c9be Author: twalthr <twal...@apache.org> Authored: Thu Jul 28 09:52:46 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Thu Jul 28 09:56:01 2016 +0200 ---------------------------------------------------------------------- .../flink/examples/scala/TPCHQuery3Table.scala | 126 ++++++++++--------- 1 file changed, 65 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/185b5f6c/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala index 776f2fb..a761f4f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala @@ -17,57 +17,60 @@ */ package org.apache.flink.examples.scala -import org.apache.flink.api.table.TableEnvironment -import org.apache.flink.api.table.expressions.Literal import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.TableEnvironment /** - * This program implements a modified version of the TPC-H query 3. The - * example demonstrates how to assign names to fields by extending the Tuple class. 
- * The original query can be found at - * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) - * (page 29). - * - * This program implements the following SQL equivalent: - * - * {{{ - * SELECT - * l_orderkey, - * SUM(l_extendedprice*(1-l_discount)) AS revenue, - * o_orderdate, - * o_shippriority - * FROM customer, - * orders, - * lineitem - * WHERE - * c_mktsegment = '[SEGMENT]' - * AND c_custkey = o_custkey - * AND l_orderkey = o_orderkey - * AND o_orderdate < date '[DATE]' - * AND l_shipdate > date '[DATE]' - * GROUP BY - * l_orderkey, - * o_orderdate, - * o_shippriority; - * }}} - * - * Compared to the original TPC-H query this version does not sort the result by revenue - * and orderdate. - * - * Input files are plain text CSV files using the pipe character ('|') as field separator - * as generated by the TPC-H data generator which is available at - * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/). - * - * Usage: - * {{{ - * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path> - * }}} - * - * This example shows how to use: - * - Table API expressions - * - */ + * This program implements a modified version of the TPC-H query 3. The + * example demonstrates how to assign names to fields when converting a DataSet to a Table. + * The original query can be found at + * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) + * (page 29).
+ * + * This program implements the following SQL equivalent: + * + * {{{ + * SELECT + * l_orderkey, + * SUM(l_extendedprice*(1-l_discount)) AS revenue, + * o_orderdate, + * o_shippriority + * FROM customer, + * orders, + * lineitem + * WHERE + * c_mktsegment = '[SEGMENT]' + * AND c_custkey = o_custkey + * AND l_orderkey = o_orderkey + * AND o_orderdate < date '[DATE]' + * AND l_shipdate > date '[DATE]' + * GROUP BY + * l_orderkey, + * o_orderdate, + * o_shippriority + * ORDER BY + * revenue desc, + * o_orderdate; + * }}} + * + * The result is sorted by revenue in descending order and then by orderdate in + * ascending order. + * + * Input files are plain text CSV files using the pipe character ('|') as field separator + * as generated by the TPC-H data generator which is available at + * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/). + * + * Usage: + * {{{ + * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path> + * }}} + * + * This example shows how to: + * - Convert DataSets to Tables + * - Use Table API expressions + * + */ object TPCHQuery3Table { def main(args: Array[String]) { @@ -76,23 +79,23 @@ object TPCHQuery3Table { } // set filter date - val date = java.sql.Date.valueOf("1995-03-12") + val date = "1995-03-12".toDate // get execution environment val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val lineitems = getLineitemDataSet(env) - .filter( l => java.sql.Date.valueOf(l.shipDate).after(date) ).toTable(tEnv) - .as('id, 'extdPrice, 'discount, 'shipDate) + .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate) + .filter('shipDate.toDate > date) - val customers = getCustomerDataSet(env).toTable(tEnv) - .as('id, 'mktSegment) - .filter( 'mktSegment === "AUTOMOBILE" ) + val customers = getCustomerDataSet(env) + .toTable(tEnv, 'id, 'mktSegment) + .filter('mktSegment === "AUTOMOBILE") val orders = getOrdersDataSet(env) - .filter( o => 
java.sql.Date.valueOf(o.orderDate).before(date) ).toTable(tEnv) - .as('orderId, 'custId, 'orderDate, 'shipPrio) + .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio) + .filter('orderDate.toDate < date) val items = orders.join(customers) @@ -102,19 +105,20 @@ object TPCHQuery3Table { .where('orderId === 'id) .select( 'orderId, - 'extdPrice * (Literal(1.0f) - 'discount) as 'revenue, + 'extdPrice * (1.0f.toExpr - 'discount) as 'revenue, 'orderDate, 'shipPrio) val result = items .groupBy('orderId, 'orderDate, 'shipPrio) - .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio) + .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio) + .orderBy('revenue.desc, 'orderDate.asc) // emit result result.writeAsCsv(outputPath, "\n", "|") // execute program - env.execute("Scala TPCH Query 3 (Expression) Example") + env.execute("Scala TPCH Query 3 (Table API Expression) Example") } // ************************************************************************* @@ -145,12 +149,12 @@ object TPCHQuery3Table { System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + " Due to legal restrictions, we can not ship generated data.\n" + " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + - " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + + " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " + "<orders-csv path> <result path>") false } } - + private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = { env.readCsvFile[Lineitem]( lineitemPath, @@ -164,7 +168,7 @@ object TPCHQuery3Table { fieldDelimiter = "|", includedFields = Array(0, 6) ) } - + private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = { env.readCsvFile[Order]( ordersPath,