Just to follow up, the queries worked against master and I got my whole flow rolling. Thanks for the suggestion! Now if only Spark 1.2 would come out with the next release of CDH5 :P
-Terry

From: Terry Siu <terry....@smartfocus.com>
Date: Monday, October 20, 2014 at 12:22 PM
To: Michael Armbrust <mich...@databricks.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL - TreeNodeException for unresolved attributes

Hi Michael,

Thanks again for the reply. I was hoping it was something I was doing wrong in 1.1.0, but I'll try master.

Thanks,
-Terry

From: Michael Armbrust <mich...@databricks.com>
Date: Monday, October 20, 2014 at 12:11 PM
To: Terry Siu <terry....@smartfocus.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: SparkSQL - TreeNodeException for unresolved attributes

Have you tried this on master? There were several problems with the resolution of complex queries that were registered as tables in the 1.1.0 release.

On Mon, Oct 20, 2014 at 10:33 AM, Terry Siu <terry....@smartfocus.com> wrote:

Hi all,

I'm getting a TreeNodeException for unresolved attributes when I do a simple select from a SchemaRDD generated by a join in Spark 1.1.0. A little background first. I am using a HiveContext (against Hive 0.12) to grab two tables, join them, and then perform multiple INSERT-SELECT statements with GROUP BY to write back out to a Hive rollup table that has two partitions. This task is an effort to simulate the GROUPING SETS functionality that SparkSQL does not yet support. In my first attempt, I got really close using SchemaRDD.groupBy, until I realized that the SchemaRDD.insertInto API does not support partitioned tables yet. That prompted my second attempt: passing SQL to the HiveContext.sql API instead.
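Roughly, the kind of INSERT-SELECTs I want to issue per grouping set against the joined table (registered below as rupbrand) look like this. This is only a simplified sketch: the rollup table name "sales_rollup", its partition column "rollup_level", and the short column list are placeholders for the real ones.

  // Simplified sketch only: "sales_rollup" and "rollup_level" are placeholder
  // names, and the real column list is longer. One INSERT-SELECT with GROUP BY
  // per grouping set, each written to its own partition of the rollup table.
  hc.sql("""
    INSERT OVERWRITE TABLE sales_rollup PARTITION (rollup_level = 'brand')
    SELECT product_brandname, CAST(NULL AS STRING), SUM(sales), SUM(quantity)
    FROM rupbrand
    GROUP BY product_brandname""")

  hc.sql("""
    INSERT OVERWRITE TABLE sales_rollup PARTITION (rollup_level = 'brand_month')
    SELECT product_brandname, transmonth, SUM(sales), SUM(quantity)
    FROM rupbrand
    GROUP BY product_brandname, transmonth""")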
Here's a rundown of the commands I executed on the spark-shell:

  val hc = new HiveContext(sc)
  hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")
  hc.setConf("spark.sql.parquet.compression.codec", "snappy")

  // For implicit conversions to Expression
  val sqlContext = new SQLContext(sc)
  import sqlContext._

  val segCusts = hc.hql("select ...")
  val segTxns = hc.hql("select ...")

  val sc = segCusts.as('sc)
  val st = segTxns.as('st)

  // Join the segCusts and segTxns tables
  val rup = sc.join(st, Inner, Some("sc.segcustomerid".attr === "st.customerid".attr))
  rup.registerAsTable("rupbrand")

If I do a printSchema on rup, I get:

root
 |-- segcustomerid: string (nullable = true)
 |-- sales: double (nullable = false)
 |-- tx_count: long (nullable = false)
 |-- storeid: string (nullable = true)
 |-- transdate: long (nullable = true)
 |-- transdate_ts: string (nullable = true)
 |-- transdate_dt: string (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- translineitem: string (nullable = true)
 |-- offerid: string (nullable = true)
 |-- customerid: string (nullable = true)
 |-- customerkey: string (nullable = true)
 |-- sku: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- returnquantity: double (nullable = true)
 |-- channel: string (nullable = true)
 |-- unitcost: double (nullable = true)
 |-- transid: string (nullable = true)
 |-- productid: string (nullable = true)
 |-- id: string (nullable = true)
 |-- campaign_campaigncost: double (nullable = true)
 |-- campaign_begindate: long (nullable = true)
 |-- campaign_begindate_ts: string (nullable = true)
 |-- campaign_begindate_dt: string (nullable = true)
 |-- campaign_enddate: long (nullable = true)
 |-- campaign_enddate_ts: string (nullable = true)
 |-- campaign_enddate_dt: string (nullable = true)
 |-- campaign_campaigntitle: string (nullable = true)
 |-- campaign_campaignname: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- product_categoryid: string (nullable = true)
 |-- product_company: string (nullable = true)
 |-- product_brandname: string (nullable = true)
 |-- product_vendorid: string (nullable = true)
 |-- product_color: string (nullable = true)
 |-- product_brandid: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_size: string (nullable = true)
 |-- product_subcategoryid: string (nullable = true)
 |-- product_departmentid: string (nullable = true)
 |-- product_productname: string (nullable = true)
 |-- product_categoryname: string (nullable = true)
 |-- product_vendorname: string (nullable = true)
 |-- product_sku: string (nullable = true)
 |-- product_subcategoryname: string (nullable = true)
 |-- product_status: string (nullable = true)
 |-- product_departmentname: string (nullable = true)
 |-- product_style: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_lastname: string (nullable = true)
 |-- customer_familystatus: string (nullable = true)
 |-- customer_customertype: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_region: string (nullable = true)
 |-- customer_customergroup: string (nullable = true)
 |-- customer_maritalstatus: string (nullable = true)
 |-- customer_agerange: string (nullable = true)
 |-- customer_zip: string (nullable = true)
 |-- customer_age: double (nullable = true)
 |-- customer_address2: string (nullable = true)
 |-- customer_incomerange: string (nullable = true)
 |-- customer_gender: string (nullable = true)
 |-- customer_customerkey: string (nullable = true)
 |-- customer_address1: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_education: string (nullable = true)
 |-- customer_birthdate: long (nullable = true)
 |-- customer_birthdate_ts: string (nullable = true)
 |-- customer_birthdate_dt: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_firstname: string (nullable = true)
 |-- transnum: long (nullable = true)
 |-- transmonth: string (nullable = true)

Nothing but a flat schema with no duplicated column names. I then execute hc.sql("select transid from rupbrand") and I get:

scala> hc.sql("select transid from rupbrand")
14/10/20 10:01:44 INFO ParseDriver: Parsing command: select transid from rupbrand
14/10/20 10:01:44 INFO ParseDriver: Parse Completed
res18: org.apache.spark.sql.SchemaRDD =
SchemaRDD[121] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'transid, tree:
Project ['transid]
 LowerCaseSchema
  Subquery rupbrand
   Join Inner, Some(('sc.segcustomerid = 'st.customerid))
    Subquery sc
     Filter CAST((COUNT(DISTINCT 't.transid) > 0), BooleanType)
      Aggregate ['c.customerid], ['c.customerId AS segcustomerid#5,SUM('t.sales) AS sales#6,COUNT(DISTINCT 't.transid) AS tx_count#7]
       Filter 'c.gender IN (Male)
        Join Inner, Some(('c.customerid = 't.customerid))
         Subquery t
          Aggregate [customerid#3259,transid#3266], ['d.customerId AS customerid#1,transid#3266 AS transid#2,SUM((quantity#3262 * ...

I'm wondering whether the query for the column in my join table is somehow conflicting with the columns of the two tables from which the join table is constructed, since the plan shows a breakdown of the various pieces of the queries on my two source tables. Help?

Thanks,
-Terry
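P.S. One variation I can try as a sanity check is to register the two source query results as tables and express the join itself in SQL rather than through the DSL, in case the DSL attributes are what's tripping up resolution. A trimmed-down sketch (the real column list is much longer, and "rupbrand_sql" is just a scratch name):

  // Sketch only: register the two source SchemaRDDs and do the join in SQL
  // instead of with the Catalyst DSL. Column list is trimmed for brevity.
  segCusts.registerAsTable("segcusts")
  segTxns.registerAsTable("segtxns")
  val rup2 = hc.sql("""
    SELECT c.segcustomerid, c.sales, c.tx_count, t.transid, t.customerid
    FROM segcusts c JOIN segtxns t ON c.segcustomerid = t.customerid""")
  rup2.registerAsTable("rupbrand_sql")
  hc.sql("select transid from rupbrand_sql")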