Because of how closures work in Scala, there is no support for nested map/rdd-based operations. Specifically, if you have
Context a { Context b { } } Operations within context b, when distributed across nodes, will no longer have visibility of variables specific to context a because that context is not distributed alongside that operation! To get around this you need to serialize your operations. For example , run a map job. Take the output of that and run a second map job to filter. Another option is to run two separate map jobs and join their results. Keep in mind that another useful technique is to execute the groupByKey routine , particularly if you want to operate on a particular variable. On Oct 11, 2014 11:09 AM, "arthur.hk.c...@gmail.com" < arthur.hk.c...@gmail.com> wrote: > Hi, > > My Spark version is v1.1.0 and Hive is 0.12.0, I need to use more than 1 > subquery in my Spark SQL, below are my sample table structures and a SQL > that contains more than 1 subquery. > > Question 1: How to load a HIVE table into Scala/Spark? > Question 2: How to implement a SQL_WITH_MORE_THAN_ONE_SUBQUERY in > SCALA/SPARK? > Question 3: What is the DATEADD function in Scala/Spark? or how to > implement "DATEADD(MONTH, 3, '2013-07-01')” and "DATEADD(YEAR, 1, ' > 2014-01-01')” in Spark or Hive? > I can find HIVE (date_add(string startdate, int days)) but it is in days > not MONTH / YEAR. > > Thanks. > > Regards > Arthur > > === > My sample SQL with more than 1 subquery: > SELECT S_NAME, > COUNT(*) AS NUMWAIT > FROM SUPPLIER, > LINEITEM L1, > ORDERS > WHERE S_SUPPKEY = L1.L_SUPPKEY > AND O_ORDERKEY = L1.L_ORDERKEY > AND O_ORDERSTATUS = 'F' > AND L1.L_RECEIPTDATE > L1.L_COMMITDATE > AND EXISTS (SELECT * > FROM LINEITEM L2 > WHERE L2.L_ORDERKEY = L1.L_ORDERKEY > AND L2.L_SUPPKEY <> L1.L_SUPPKEY) > AND NOT EXISTS (SELECT * > FROM LINEITEM L3 > WHERE L3.L_ORDERKEY = L1.L_ORDERKEY > AND L3.L_SUPPKEY <> L1.L_SUPPKEY > AND L3.L_RECEIPTDATE > L3.L_COMMITDATE) > GROUP BY S_NAME > ORDER BY NUMWAIT DESC, S_NAME > limit 100; > > > === > Supplier Table: > CREATE TABLE IF NOT EXISTS SUPPLIER ( > S_SUPPKEY INTEGER PRIMARY KEY, > S_NAME CHAR(25), > S_ADDRESS VARCHAR(40), > S_NATIONKEY BIGINT NOT NULL, > S_PHONE CHAR(15), > S_ACCTBAL DECIMAL, > S_COMMENT VARCHAR(101) > ) > > === > Order Table: > CREATE TABLE IF NOT EXISTS ORDERS ( > O_ORDERKEY INTEGER PRIMARY KEY, > O_CUSTKEY BIGINT NOT NULL, > O_ORDERSTATUS CHAR(1), > O_TOTALPRICE DECIMAL, > O_ORDERDATE CHAR(10), > O_ORDERPRIORITY CHAR(15), > O_CLERK CHAR(15), > O_SHIPPRIORITY INTEGER, > O_COMMENT VARCHAR(79) > > === > LineItem Table: > CREATE TABLE IF NOT EXISTS LINEITEM ( > L_ORDERKEY BIGINT not null, > L_PARTKEY BIGINT, > L_SUPPKEY BIGINT, > L_LINENUMBER INTEGER not null, > L_QUANTITY DECIMAL, > L_EXTENDEDPRICE DECIMAL, > L_DISCOUNT DECIMAL, > L_TAX DECIMAL, > L_SHIPDATE CHAR(10), > L_COMMITDATE CHAR(10), > L_RECEIPTDATE CHAR(10), > L_RETURNFLAG CHAR(1), > L_LINESTATUS CHAR(1), > L_SHIPINSTRUCT CHAR(25), > L_SHIPMODE CHAR(10), > L_COMMENT VARCHAR(44), > CONSTRAINT pk PRIMARY KEY (L_ORDERKEY, L_LINENUMBER ) > ) > >