Just to follow up, the queries worked against master and I got my whole flow 
rolling. Thanks for the suggestion! Now if only Spark 1.2 will come out with 
the next release of CDH5  :P

-Terry

From: Terry Siu <terry....@smartfocus.com>
Date: Monday, October 20, 2014 at 12:22 PM
To: Michael Armbrust <mich...@databricks.com>
Cc: user@spark.apache.org
Subject: Re: SparkSQL - TreeNodeException for unresolved attributes

Hi Michael,

Thanks again for the reply. Was hoping it was something I was doing wrong in 
1.1.0, but I’ll try master.

Thanks,
-Terry

From: Michael Armbrust <mich...@databricks.com>
Date: Monday, October 20, 2014 at 12:11 PM
To: Terry Siu <terry....@smartfocus.com>
Cc: user@spark.apache.org
Subject: Re: SparkSQL - TreeNodeException for unresolved attributes

Have you tried this on master?  There were several problems with resolution of 
complex queries that were registered as tables in the 1.1.0 release.

On Mon, Oct 20, 2014 at 10:33 AM, Terry Siu <terry....@smartfocus.com> wrote:
Hi all,

I’m getting a TreeNodeException for unresolved attributes when I do a simple 
select from a SchemaRDD generated by a join in Spark 1.1.0. A little background 
first. I am using a HiveContext (against Hive 0.12) to grab two tables, join 
them, and then perform multiple INSERT-SELECT statements with GROUP BY to write 
back out to a Hive rollup table that has two partitions. This task is an effort 
to simulate GROUPING SETS, which SparkSQL does not support.
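
A minimal sketch of the emulation described above, written as plain Scala that only builds the SQL text: one SELECT with GROUP BY per grouping set, with the dimensions outside the set emitted as NULL. The object name, the target table sales_rollup, the partition columns, and the choice of dimensions are all hypothetical and not taken from the thread, and the CAST to STRING assumes string-typed dimensions.

```scala
// Sketch only: generates SQL strings, does not touch Spark or Hive.
// All table, column, and partition names used with it are hypothetical.
object GroupingSetsSketch {

  /** Builds one SELECT for a single grouping set.
    * Dimensions outside the set are projected as typed NULLs so that every
    * generated query has the same column layout. */
  def selectFor(source: String,
                dims: Seq[String],
                groupingSet: Set[String],
                aggregates: Seq[String]): String = {
    val projected = dims.map { d =>
      if (groupingSet(d)) d else s"CAST(NULL AS STRING) AS $d"
    }
    val grouped = dims.filter(groupingSet)
    val select = s"SELECT ${(projected ++ aggregates).mkString(", ")} FROM $source"
    // An empty grouping set is the grand total, so it has no GROUP BY clause.
    if (grouped.isEmpty) select else s"$select GROUP BY ${grouped.mkString(", ")}"
  }

  /** Wraps a SELECT in a Hive-style INSERT with a static partition spec. */
  def insertFor(target: String,
                partition: Seq[(String, String)],
                select: String): String = {
    val spec = partition.map { case (k, v) => s"$k='$v'" }.mkString(", ")
    s"INSERT INTO TABLE $target PARTITION ($spec) $select"
  }
}
```

Each generated string would then be handed to the SQL API one at a time, for example once per grouping set such as Set("storeid"), Set("channel"), and Set.empty. Whether a given Spark release accepts the resulting INSERT for a partitioned table is not something this sketch checks.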

In my first attempt, I got really close using SchemaRDD.groupBy until I 
realized that the SchemaRDD.insertInto API does not support partitioned tables 
yet. This prompted my second attempt: passing SQL to the HiveContext.sql API 
instead.

Here’s a rundown of the commands I executed on the spark-shell:


val hc = new HiveContext(sc)
hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")
hc.setConf("spark.sql.parquet.compression.codec", "snappy")

// For implicit conversions to Expression
val sqlContext = new SQLContext(sc)
import sqlContext._

val segCusts = hc.hql("select …")
val segTxns = hc.hql("select …")

val sc = segCusts.as('sc)
val st = segTxns.as('st)

// Join the segCusts and segTxns tables
val rup = sc.join(st, Inner, Some("sc.segcustomerid".attr==="st.customerid".attr))
rup.registerAsTable("rupbrand")



If I do a printSchema on the rup, I get:

root
 |-- segcustomerid: string (nullable = true)
 |-- sales: double (nullable = false)
 |-- tx_count: long (nullable = false)
 |-- storeid: string (nullable = true)
 |-- transdate: long (nullable = true)
 |-- transdate_ts: string (nullable = true)
 |-- transdate_dt: string (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- translineitem: string (nullable = true)
 |-- offerid: string (nullable = true)
 |-- customerid: string (nullable = true)
 |-- customerkey: string (nullable = true)
 |-- sku: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- returnquantity: double (nullable = true)
 |-- channel: string (nullable = true)
 |-- unitcost: double (nullable = true)
 |-- transid: string (nullable = true)
 |-- productid: string (nullable = true)
 |-- id: string (nullable = true)
 |-- campaign_campaigncost: double (nullable = true)
 |-- campaign_begindate: long (nullable = true)
 |-- campaign_begindate_ts: string (nullable = true)
 |-- campaign_begindate_dt: string (nullable = true)
 |-- campaign_enddate: long (nullable = true)
 |-- campaign_enddate_ts: string (nullable = true)
 |-- campaign_enddate_dt: string (nullable = true)
 |-- campaign_campaigntitle: string (nullable = true)
 |-- campaign_campaignname: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- product_categoryid: string (nullable = true)
 |-- product_company: string (nullable = true)
 |-- product_brandname: string (nullable = true)
 |-- product_vendorid: string (nullable = true)
 |-- product_color: string (nullable = true)
 |-- product_brandid: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_size: string (nullable = true)
 |-- product_subcategoryid: string (nullable = true)
 |-- product_departmentid: string (nullable = true)
 |-- product_productname: string (nullable = true)
 |-- product_categoryname: string (nullable = true)
 |-- product_vendorname: string (nullable = true)
 |-- product_sku: string (nullable = true)
 |-- product_subcategoryname: string (nullable = true)
 |-- product_status: string (nullable = true)
 |-- product_departmentname: string (nullable = true)
 |-- product_style: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_lastname: string (nullable = true)
 |-- customer_familystatus: string (nullable = true)
 |-- customer_customertype: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_region: string (nullable = true)
 |-- customer_customergroup: string (nullable = true)
 |-- customer_maritalstatus: string (nullable = true)
 |-- customer_agerange: string (nullable = true)
 |-- customer_zip: string (nullable = true)
 |-- customer_age: double (nullable = true)
 |-- customer_address2: string (nullable = true)
 |-- customer_incomerange: string (nullable = true)
 |-- customer_gender: string (nullable = true)
 |-- customer_customerkey: string (nullable = true)
 |-- customer_address1: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_education: string (nullable = true)
 |-- customer_birthdate: long (nullable = true)
 |-- customer_birthdate_ts: string (nullable = true)
 |-- customer_birthdate_dt: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_firstname: string (nullable = true)
 |-- transnum: long (nullable = true)
 |-- transmonth: string (nullable = true)


Nothing but a flat schema with no duplicated column names. I then execute:
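
The "no duplicated column names" claim can be checked mechanically rather than by eye. The helper below is a minimal sketch in plain Scala with no Spark dependency; the object and method names are hypothetical, and the comparison is deliberately case-insensitive on the assumption that names differing only by case could also collide once the schema is lower-cased.

```scala
// Sketch only: a hypothetical helper, not part of any Spark API.
object SchemaNameCheck {

  /** Returns the lower-cased column names that occur more than once,
    * ignoring case, in sorted order. An empty result means no duplicates. */
  def duplicatedNames(names: Seq[String]): Seq[String] =
    names
      .groupBy(_.toLowerCase)
      .collect { case (name, group) if group.size > 1 => name }
      .toSeq
      .sorted
}
```

Called with the column names of the joined table (typed in by hand or pulled from its schema), an empty result would back up the statement above, while a result such as Seq("customerid") would point at a clash worth investigating.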


hc.sql("select transid from rupbrand")


and I get:


scala> hc.sql("select transid from rupbrand")
14/10/20 10:01:44 INFO ParseDriver: Parsing command: select transid from rupbrand
14/10/20 10:01:44 INFO ParseDriver: Parse Completed
res18: org.apache.spark.sql.SchemaRDD =
SchemaRDD[121] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'transid, tree:
Project ['transid]
 LowerCaseSchema
  Subquery rupbrand
   Join Inner, Some(('sc.segcustomerid = 'st.customerid))
    Subquery sc
     Filter CAST((COUNT(DISTINCT 't.transid) > 0), BooleanType)
      Aggregate ['c.customerid], ['c.customerId AS segcustomerid#5,SUM('t.sales) AS sales#6,COUNT(DISTINCT 't.transid) AS tx_count#7]
       Filter 'c.gender IN (Male)
        Join Inner, Some(('c.customerid = 't.customerid))
         Subquery t
          Aggregate [customerid#3259,transid#3266], ['d.customerId AS customerid#1,transid#3266 AS transid#2,SUM((quantity#3262 * …


I’m wondering whether the column I’m selecting from the joined table is somehow 
conflicting with columns from the two source tables the join is built from, 
since the plan shows a breakdown of the various pieces of the queries on those 
two source tables.


Help?


Thanks,

-Terry


