Repository: flink
Updated Branches:
  refs/heads/master 5eb0e38fb -> 185b5f6c6


[FLINK-4179] [table] Additional TPCHQuery3Table example improvements


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/185b5f6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/185b5f6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/185b5f6c

Branch: refs/heads/master
Commit: 185b5f6c6b91e4ae61ff77c7d22d1296153a320c
Parents: ec4c9be
Author: twalthr <twal...@apache.org>
Authored: Thu Jul 28 09:52:46 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Thu Jul 28 09:56:01 2016 +0200

----------------------------------------------------------------------
 .../flink/examples/scala/TPCHQuery3Table.scala  | 126 ++++++++++---------
 1 file changed, 65 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/185b5f6c/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
index 776f2fb..a761f4f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -17,57 +17,60 @@
  */
 package org.apache.flink.examples.scala
 
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.TableEnvironment
 
 /**
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple 
class.
- * The original query can be found at
- * 
[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
- * (page 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT 
- *      l_orderkey, 
- *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate, 
- *      o_shippriority 
- * FROM customer, 
- *      orders, 
- *      lineitem 
- * WHERE
- *      c_mktsegment = '[SEGMENT]' 
- *      AND c_custkey = o_custkey
- *      AND l_orderkey = o_orderkey
- *      AND o_orderdate < date '[DATE]'
- *      AND l_shipdate > date '[DATE]'
- * GROUP BY
- *      l_orderkey, 
- *      o_orderdate, 
- *      o_shippriority;
- * }}}
- *
- * Compared to the original TPC-H query this version does not sort the result 
by revenue
- * and orderdate.
- *
- * Input files are plain text CSV files using the pipe character ('|') as 
field separator 
- * as generated by the TPC-H data generator which is available at 
- * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/).
- *
- * Usage: 
- * {{{
- * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv 
path> <result path>
- * }}}
- *  
- * This example shows how to use:
- *  - Table API expressions
- *
- */
+  * This program implements a modified version of the TPC-H query 3. The
+  * example demonstrates how to assign names to fields when converting DataSets to Tables.
+  * The original query can be found at
+  * 
[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+  * (page 29).
+  *
+  * This program implements the following SQL equivalent:
+  *
+  * {{{
+  * SELECT
+  *      l_orderkey,
+  *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+  *      o_orderdate,
+  *      o_shippriority
+  * FROM customer,
+  *      orders,
+  *      lineitem
+  * WHERE
+  *      c_mktsegment = '[SEGMENT]'
+  *      AND c_custkey = o_custkey
+  *      AND l_orderkey = o_orderkey
+  *      AND o_orderdate < date '[DATE]'
+  *      AND l_shipdate > date '[DATE]'
+  * GROUP BY
+  *      l_orderkey,
+  *      o_orderdate,
+  *      o_shippriority
+  * ORDER BY
+  *      revenue desc,
+  *      o_orderdate;
+  * }}}
+  *
+  * Like the SQL above, this version sorts the result by revenue (descending) and
+  * order date (ascending).
+  *
+  * Input files are plain text CSV files using the pipe character ('|') as field separator
+  * as generated by the TPC-H data generator which is available at
+  * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
+  *
+  * Usage:
+  * {{{
+  * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+  * }}}
+  *
+  * This example shows how to:
+  *  - Convert DataSets to Tables
+  *  - Use Table API expressions
+  *
+  */
 object TPCHQuery3Table {
 
   def main(args: Array[String]) {
@@ -76,23 +79,23 @@ object TPCHQuery3Table {
     }
 
     // set filter date
-    val date = java.sql.Date.valueOf("1995-03-12")
+    val date = "1995-03-12".toDate
 
     // get execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
     val lineitems = getLineitemDataSet(env)
-      .filter( l => java.sql.Date.valueOf(l.shipDate).after(date) 
).toTable(tEnv)
-      .as('id, 'extdPrice, 'discount, 'shipDate)
+      .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate)
+      .filter('shipDate.toDate > date)
 
-    val customers = getCustomerDataSet(env).toTable(tEnv)
-      .as('id, 'mktSegment)
-      .filter( 'mktSegment === "AUTOMOBILE" )
+    val customers = getCustomerDataSet(env)
+      .toTable(tEnv, 'id, 'mktSegment)
+      .filter('mktSegment === "AUTOMOBILE")
 
     val orders = getOrdersDataSet(env)
-      .filter( o => java.sql.Date.valueOf(o.orderDate).before(date) 
).toTable(tEnv)
-      .as('orderId, 'custId, 'orderDate, 'shipPrio)
+      .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio)
+      .filter('orderDate.toDate < date)
 
     val items =
       orders.join(customers)
@@ -102,19 +105,20 @@ object TPCHQuery3Table {
         .where('orderId === 'id)
         .select(
           'orderId,
-          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
+          'extdPrice * (1.0f.toExpr - 'discount) as 'revenue,
           'orderDate,
           'shipPrio)
 
     val result = items
       .groupBy('orderId, 'orderDate, 'shipPrio)
-      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
+      .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio)
+      .orderBy('revenue.desc, 'orderDate.asc)
 
     // emit result
     result.writeAsCsv(outputPath, "\n", "|")
 
     // execute program
-    env.execute("Scala TPCH Query 3 (Expression) Example")
+    env.execute("Scala TPCH Query 3 (Table API Expression) Example")
   }
   
   // *************************************************************************
@@ -145,12 +149,12 @@ object TPCHQuery3Table {
       System.err.println("This program expects data from the TPC-H benchmark 
as input data.\n" +
           " Due to legal restrictions, we can not ship generated data.\n" +
           " You can find the TPC-H data generator at 
http://www.tpc.org/tpch/.\n"; +
-          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
+          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
                              "<orders-csv path> <result path>")
       false
     }
   }
-  
+
   private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] 
= {
     env.readCsvFile[Lineitem](
         lineitemPath,
@@ -164,7 +168,7 @@ object TPCHQuery3Table {
         fieldDelimiter = "|",
         includedFields = Array(0, 6) )
   }
-  
+
   private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
     env.readCsvFile[Order](
         ordersPath,

Reply via email to