Re: How To Implement More Than One Subquery in Scala/Spark
Hi,

Thank you so much! By the way, what is the DATEADD function in Scala/Spark? Or how can I implement DATEADD(MONTH, 3, '2013-07-01') and DATEADD(YEAR, 1, '2014-01-01') in Spark or Hive?

Regards,
Arthur

On 12 Oct, 2014, at 12:03 pm, Ilya Ganelin ilgan...@gmail.com wrote:

Because of how closures work in Scala, there is no support for nested map/RDD-based operations. Specifically, if you have

Context a {
  Context b { }
}

operations within context b, when distributed across nodes, will no longer have visibility of variables specific to context a, because that context is not distributed alongside that operation. To get around this you need to serialize your operations: for example, run a map job, take its output, and run a second map job to filter. Another option is to run two separate map jobs and join their results. Keep in mind that another useful technique is the groupByKey routine, particularly if you want to operate on a particular variable.
Re: How To Implement More Than One Subquery in Scala/Spark
Question 1: Please check http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#hive-tables.

Question 2: One workaround is to re-write it. You can use LEFT SEMI JOIN to implement the subquery with EXISTS, and LEFT OUTER JOIN + IS NULL to implement the subquery with NOT EXISTS:

SELECT S_NAME, COUNT(*) AS NUMWAIT
FROM LINEITEM L1
JOIN SUPPLIER ON (S_SUPPKEY = L1.L_SUPPKEY)
JOIN ORDERS ON (O_ORDERKEY = L1.L_ORDERKEY)
LEFT SEMI JOIN LINEITEM L2
  ON (L1.L_ORDERKEY = L2.L_ORDERKEY AND L1.L_SUPPKEY <> L2.L_SUPPKEY)
LEFT OUTER JOIN LINEITEM L3
  ON (L1.L_ORDERKEY = L3.L_ORDERKEY AND L3.L_SUPPKEY <> L1.L_SUPPKEY
      AND L3.L_RECEIPTDATE > L3.L_COMMITDATE)
WHERE O_ORDERSTATUS = 'F'
  AND L1.L_RECEIPTDATE > L1.L_COMMITDATE
  AND L3.L_ORDERKEY IS NULL
GROUP BY S_NAME
ORDER BY NUMWAIT DESC, S_NAME
LIMIT 100;

Question 3: It seems you need to convert your predicates to use Hive's date_add at the moment.

Thanks,
Yin

On Mon, Oct 13, 2014 at 6:07 AM, arthur.hk.c...@gmail.com wrote:

Hi,

Thank you so much! By the way, what is the DATEADD function in Scala/Spark? Or how can I implement DATEADD(MONTH, 3, '2013-07-01') and DATEADD(YEAR, 1, '2014-01-01') in Spark or Hive?

Regards,
Arthur
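For reference, here is a minimal, untested Scala sketch that ties the three answers above together. It assumes the SUPPLIER, ORDERS and LINEITEM tables already exist in a Hive metastore visible to Spark (hive-site.xml on the classpath) and uses Spark 1.1's HiveContext as described in the linked programming guide. The object name and the shift helper are invented for illustration; the query is the LEFT SEMI JOIN / LEFT OUTER JOIN rewrite above, with the L2 columns referenced only in the ON clause because Hive restricts semi-joined columns to the join condition. The month/year-shifted date strings are pre-computed on the driver as a stand-in for DATEADD, since Hive 0.12 only offers date_add in days.

import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SubqueryRewriteSketch {

  // Stand-in for DATEADD(MONTH/YEAR, n, 'yyyy-MM-dd'): shift the date on the
  // driver and splice the resulting literal into the HiveQL string. Because the
  // date columns are CHAR(10) in 'yyyy-MM-dd' form, plain string comparison works.
  def shift(date: String, field: Int, amount: Int): String = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd")
    val cal = Calendar.getInstance()
    cal.setTime(fmt.parse(date))
    cal.add(field, amount)
    fmt.format(cal.getTime)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SubqueryRewriteSketch"))

    // HiveContext picks up table definitions from the Hive metastore, so
    // SUPPLIER, ORDERS and LINEITEM are queryable by name (Question 1).
    val hiveContext = new HiveContext(sc)

    // Question 3: equivalents of DATEADD(MONTH, 3, '2013-07-01') and
    // DATEADD(YEAR, 1, '2014-01-01'), computed on the driver.
    val plusThreeMonths = shift("2013-07-01", Calendar.MONTH, 3) // "2013-10-01"
    val plusOneYear     = shift("2014-01-01", Calendar.YEAR, 1)  // "2015-01-01"
    println(s"date literals to splice into predicates: $plusThreeMonths, $plusOneYear")

    // Question 2: the EXISTS / NOT EXISTS query rewritten with
    // LEFT SEMI JOIN and LEFT OUTER JOIN ... IS NULL.
    val query = """
      |SELECT S_NAME, COUNT(*) AS NUMWAIT
      |FROM LINEITEM L1
      |JOIN SUPPLIER ON (S_SUPPKEY = L1.L_SUPPKEY)
      |JOIN ORDERS ON (O_ORDERKEY = L1.L_ORDERKEY)
      |LEFT SEMI JOIN LINEITEM L2
      |  ON (L1.L_ORDERKEY = L2.L_ORDERKEY AND L1.L_SUPPKEY <> L2.L_SUPPKEY)
      |LEFT OUTER JOIN LINEITEM L3
      |  ON (L1.L_ORDERKEY = L3.L_ORDERKEY AND L3.L_SUPPKEY <> L1.L_SUPPKEY
      |      AND L3.L_RECEIPTDATE > L3.L_COMMITDATE)
      |WHERE O_ORDERSTATUS = 'F'
      |  AND L1.L_RECEIPTDATE > L1.L_COMMITDATE
      |  AND L3.L_ORDERKEY IS NULL
      |GROUP BY S_NAME
      |ORDER BY NUMWAIT DESC, S_NAME
      |LIMIT 100""".stripMargin

    hiveContext.sql(query).collect().foreach(println)
    sc.stop()
  }
}

Whether HiveContext.sql accepts this exact HiveQL in 1.1.0 is worth verifying locally; the older hql() entry point is still available (deprecated) if sql() complains about the dialect.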
How To Implement More Than One Subquery in Scala/Spark
Hi,

My Spark version is v1.1.0 and Hive is 0.12.0. I need to use more than one subquery in my Spark SQL; below are my sample table structures and a SQL statement that contains more than one subquery.

Question 1: How to load a Hive table into Scala/Spark?

Question 2: How to implement a SQL with more than one subquery in Scala/Spark?

Question 3: What is the DATEADD function in Scala/Spark, or how to implement DATEADD(MONTH, 3, '2013-07-01') and DATEADD(YEAR, 1, '2014-01-01') in Spark or Hive? I can find Hive's date_add(string startdate, int days), but it works in days, not MONTH / YEAR.

Thanks.

Regards,
Arthur

===
My sample SQL with more than one subquery:

SELECT S_NAME, COUNT(*) AS NUMWAIT
FROM SUPPLIER, LINEITEM L1, ORDERS
WHERE S_SUPPKEY = L1.L_SUPPKEY
  AND O_ORDERKEY = L1.L_ORDERKEY
  AND O_ORDERSTATUS = 'F'
  AND L1.L_RECEIPTDATE > L1.L_COMMITDATE
  AND EXISTS (SELECT *
              FROM LINEITEM L2
              WHERE L2.L_ORDERKEY = L1.L_ORDERKEY
                AND L2.L_SUPPKEY <> L1.L_SUPPKEY)
  AND NOT EXISTS (SELECT *
                  FROM LINEITEM L3
                  WHERE L3.L_ORDERKEY = L1.L_ORDERKEY
                    AND L3.L_SUPPKEY <> L1.L_SUPPKEY
                    AND L3.L_RECEIPTDATE > L3.L_COMMITDATE)
GROUP BY S_NAME
ORDER BY NUMWAIT DESC, S_NAME
LIMIT 100;

===
Supplier Table:

CREATE TABLE IF NOT EXISTS SUPPLIER (
  S_SUPPKEY   INTEGER PRIMARY KEY,
  S_NAME      CHAR(25),
  S_ADDRESS   VARCHAR(40),
  S_NATIONKEY BIGINT NOT NULL,
  S_PHONE     CHAR(15),
  S_ACCTBAL   DECIMAL,
  S_COMMENT   VARCHAR(101)
)

===
Order Table:

CREATE TABLE IF NOT EXISTS ORDERS (
  O_ORDERKEY      INTEGER PRIMARY KEY,
  O_CUSTKEY       BIGINT NOT NULL,
  O_ORDERSTATUS   CHAR(1),
  O_TOTALPRICE    DECIMAL,
  O_ORDERDATE     CHAR(10),
  O_ORDERPRIORITY CHAR(15),
  O_CLERK         CHAR(15),
  O_SHIPPRIORITY  INTEGER,
  O_COMMENT       VARCHAR(79)
)

===
LineItem Table:

CREATE TABLE IF NOT EXISTS LINEITEM (
  L_ORDERKEY      BIGINT NOT NULL,
  L_PARTKEY       BIGINT,
  L_SUPPKEY       BIGINT,
  L_LINENUMBER    INTEGER NOT NULL,
  L_QUANTITY      DECIMAL,
  L_EXTENDEDPRICE DECIMAL,
  L_DISCOUNT      DECIMAL,
  L_TAX           DECIMAL,
  L_SHIPDATE      CHAR(10),
  L_COMMITDATE    CHAR(10),
  L_RECEIPTDATE   CHAR(10),
  L_RETURNFLAG    CHAR(1),
  L_LINESTATUS    CHAR(1),
  L_SHIPINSTRUCT  CHAR(25),
  L_SHIPMODE      CHAR(10),
  L_COMMENT       VARCHAR(44),
  CONSTRAINT pk PRIMARY KEY (L_ORDERKEY, L_LINENUMBER)
)
Re: How To Implement More Than One Subquery in Scala/Spark
Because of how closures work in Scala, there is no support for nested map/RDD-based operations. Specifically, if you have

Context a {
  Context b { }
}

operations within context b, when distributed across nodes, will no longer have visibility of variables specific to context a, because that context is not distributed alongside that operation. To get around this you need to serialize your operations: for example, run a map job, take its output, and run a second map job to filter. Another option is to run two separate map jobs and join their results. Keep in mind that another useful technique is the groupByKey routine, particularly if you want to operate on a particular variable.

On Oct 11, 2014 11:09 AM, arthur.hk.c...@gmail.com wrote:

Hi,

My Spark version is v1.1.0 and Hive is 0.12.0. I need to use more than one subquery in my Spark SQL; below are my sample table structures and a SQL statement that contains more than one subquery.

Question 1: How to load a Hive table into Scala/Spark?

Question 2: How to implement a SQL with more than one subquery in Scala/Spark?

Question 3: What is the DATEADD function in Scala/Spark, or how to implement DATEADD(MONTH, 3, '2013-07-01') and DATEADD(YEAR, 1, '2014-01-01') in Spark or Hive? I can find Hive's date_add(string startdate, int days), but it works in days, not MONTH / YEAR.

Thanks.

Regards,
Arthur
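To make the shape of that workaround concrete, here is a small self-contained Scala sketch of the chained-jobs pattern described above (toy data, and all names invented for illustration; it is not code from this thread):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD operations (join, groupByKey) in Spark 1.1

object ChainedJobsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ChainedJobsSketch"))

    // Two toy keyed datasets standing in for data you might be tempted to
    // cross-reference from inside another RDD's map function.
    val receipts = sc.parallelize(Seq((1L, "1998-03-05"), (2L, "1998-01-01")))
    val commits  = sc.parallelize(Seq((1L, "1998-02-01"), (2L, "1998-02-01")))

    // Step 1: run the first map job on its own.
    val normalized = receipts.mapValues(_.trim)

    // Step 2: instead of nesting, join the two results by key and then filter,
    // keeping only rows whose receipt date is after the commit date.
    val late = normalized.join(commits)
      .filter { case (_, (receipt, commit)) => receipt > commit }

    // groupByKey is another option when every value for a key must be seen together.
    val grouped = late.groupByKey()

    grouped.collect().foreach(println)
    sc.stop()
  }
}

The same idea applies to the EXISTS / NOT EXISTS logic in the original question: express each side as its own RDD or SQL stage and combine the results with a join, rather than reaching into one RDD from inside another's map.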