Please help: why is an error thrown in org.apache.spark.sql.catalyst.expressions.BindReferences on Spark 3?

2023-03-26 Thread zhangliyun



Hi all,

I have the following query:

```
// 1
spark.sql("""
  select distinct
    cust_id,
    cast(b.device_name as varchar(200)) as devc_name_cast,
    prmry_reside_cntry_code
  from (select * from ${model_db}.crs_recent_30d_SF_dim_cust_info where dt='${today}') a
  join fact_rsk_magnes_txn b on a.cust_id = b.customer_id
  where b.device_name <> '#'
    and b.device_name is not null
    and b.device_name <> '~'
""").registerTempTable("device_driver_info_0")

// 2
spark.sql("""
  select
    *,
    lower(regexp_replace(devc_name_cast, 'â', 'a')) as devc_name_norm
  from device_driver_info_0
""").registerTempTable("device_driver_info_1")

// 3
spark.sql("""
  select
    cust_id,
    devc_name_norm || '_' || prmry_reside_cntry_code as Device_Name_Country
  from device_driver_info_1
  where dt='${today}'
""").registerTempTable("device_driver_info")

// 4
spark.sql("""
  select
    cust_id,
    Device_Name_Country
  from device_driver_info
  where Device_Name_Country is not null
  group by 1, 2
""").registerTempTable("device_name_SF_final_acct_info")

// 5
spark.sql("""
  select
    Device_Name_Country,
    count(distinct cust_id) as cust_cnt
  from device_name_SF_final_acct_info
  group by 1
""").registerTempTable("device_count_1")

spark.sql("select * from device_count_1 where cust_cnt between 5 and 5000")
  .registerTempTable("device_count")

// 6
spark.sql("""
  select
    b.cust_id,
    cast('Device_Name_Country' as varchar(100)) network_source,
    cast(a.Device_Name_Country as varchar(100)) as network_source_value
  from device_count a
  left join device_name_SF_final_acct_info b
    on a.Device_Name_Country = b.Device_Name_Country
""").write
  .mode(SaveMode.Overwrite)
  .insertInto(s"$databaseName.$tableName")
```
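For reference, steps 2 and 3 can be expressed once in the DataFrame API. This is only a sketch assuming the same temp-view names as above; `concat` with `lit("_")` matches the `||` semantics (null if either side is null):

```
import org.apache.spark.sql.functions.{col, concat, lit, lower, regexp_replace}

// Sketch of steps 2-3 in the DataFrame API, assuming the temp view
// device_driver_info_0 from step 1 already exists. concat(...) behaves
// like '||': the result is null when any argument is null.
val deviceDriver = spark.table("device_driver_info_0")
  .withColumn("devc_name_norm",
    lower(regexp_replace(col("devc_name_cast"), "â", "a")))
  .withColumn("Device_Name_Country",
    concat(col("devc_name_norm"), lit("_"), col("prmry_reside_cntry_code")))
deviceDriver.createOrReplaceTempView("device_driver_info")
```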




The problem: in SQL #3, Device_Name_Country is composed as `devc_name_norm || '_' || prmry_reside_cntry_code`, but that projection never shows up in the physical plan below, and the query then throws an error. The same SQL runs successfully on Spark 2, while on Spark 3.1.2 it fails. Please help.
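To see at which step the projection disappears, each intermediate view's plan can be printed. A small sketch; `Dataset.explain(mode)` with the "formatted" mode is available since Spark 3.0:

```
// Print the formatted physical plan of the view that should carry the
// derived Device_Name_Country column (available since Spark 3.0).
spark.table("device_driver_info").explain("formatted")
```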

```
ShuffleQueryStage 6
+- Exchange hashpartitioning(cust_id#4030, Device_Name_Country#4099, 3001), ENSURE_REQUIREMENTS, [id=#2669]
   +- *(5) HashAggregate(keys=[cust_id#4030, Device_Name_Country#4099], functions=[], output=[cust_id#4030, Device_Name_Country#4099])
      +- CustomShuffleReader coalesced
         +- ShuffleQueryStage 3
            +- Exchange hashpartitioning(cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036, 3001), ENSURE_REQUIREMENTS, [id=#2376]
               +- *(3) HashAggregate(keys=[cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036], functions=[], output=[cust_id#4030, devc_name_cast#4029, prmry_reside_cntry_code#4036])
                  +- *(3) Project [cust_id#4030, device_name#3453 AS devc_name_cast#4029, prmry_reside_cntry_code#4036]
                     +- *(3) BroadcastHashJoin [cust_id#4030], [customer_id#3431], Inner, BuildLeft, isnotnull(concat(concat(lower(regexp_replace(device_name#3453, â, a, 1)), _), prmry_reside_cntry_code#4036)), false
                        :- BroadcastQueryStage 0
                        :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#2132]
                        :     +- *(1) Filter isnotnull(cust_id#4030)
                        :        +- Scan hive unified_group_review_cri_group.crs_recent_30d_sf_dim_cust_info [cust_id#4030, prmry_reside_cntry_code#4036], HiveTableRelation [`unified_group_review_cri_group`.`crs_recent_30d_sf_dim_cust_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [cust_id#4030, acct_cre_dt#4031, is_guest_y_n#4032, prmry_email_domain#4033, cust_first_name#4034..., Partition Cols: [dt#4048], Pruned Partitions: [(dt=2023-03-23)]], [isnotnull(dt#4048), (dt#4048 = 2023-03-23)]
                        +- *(3) Project [customer_id#3431, device_name#3453]
                           +- *(3) Filter (((NOT (device_name#3453 = #) AND isnotnull(device_name#3453)) AND NOT (device_name#3453 = ~)) AND isnotnull(customer_id#3431))
                              +- *(3) ColumnarToRow
                                 +- FileScan parquet pp_risk_ops_qh_tables.magnes_fraudnet_login_raw[customer_id#3431,device_name#3453,ts#3603,event_dt#3604] Batched: true, DataFilters: [NOT (device_name#3453 = #), isnotnull(device_name#3453), NOT (device_name#3453 = ~), isnotnull(c..., Format: Parquet, Location: InMemoryFileIndex[gs://pypl-bkt-prd-row-std-gds-non-edw-tables/apps/risk/ads/rda/magnes_fraudnet_..., PartitionFilters: [isnotnull(event_dt#3604), (cast(event_dt#3604 as date) >= 19409), (event_dt#3604 <= 2023-03-23)], PushedFilters: [Not(EqualTo(device_name,#)), IsNotNull(device_name), Not(EqualTo(device_name,~)), IsNotNull(cust..., ReadSchema: struct
```
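Two things stand out in this plan. First, the `where Device_Name_Country is not null` predicate from SQL #4 appears in the BroadcastHashJoin as `isnotnull(concat(concat(lower(regexp_replace(device_name, â, a, 1)), _), prmry_reside_cntry_code))`, i.e. the alias has been fully inlined and pushed through the join. Second, ShuffleQueryStage and CustomShuffleReader nodes only exist under adaptive query execution, so one isolation step is to re-run with AQE disabled (a diagnostic sketch; `spark.sql.adaptive.enabled` is a standard Spark SQL config):

```
// Diagnostic sketch: disable adaptive query execution and re-run the same
// pipeline to check whether the BindReferences failure is AQE-specific.
spark.conf.set("spark.sql.adaptive.enabled", "false")
```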




ERROR:

```
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ShuffleQueryStage 6
+- Exchange hashpartitioning(cust_id#13555, Device_Name_Country#13624, 3001), ENSURE_REQUIREMENTS, [id=#23666]
   +- *(5) HashAggregate(keys=[cust_id#13555, Device_Name_Country#13624], functions=[], output=[cust_id#13555, Device_Name_Country#13624])
      +- CustomShuffleReader coalesced
         +- ShuffleQueryStage 3
            +- Exchange hashpartitioning(cust_id#13555,
```
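If the failure does come from the optimizer inlining the derived expression through the join, one workaround worth trying (a sketch, not verified on this job) is to materialize the intermediate result so that `Device_Name_Country` becomes a plain attribute instead of an expression the optimizer can rewrite. `Dataset.localCheckpoint()` (available since Spark 2.3) truncates the logical lineage:

```
// Workaround sketch: localCheckpoint() materializes the data and cuts the
// logical plan lineage, so downstream queries see Device_Name_Country as a
// concrete column rather than concat(lower(regexp_replace(...)), ...).
val acctInfo = spark.table("device_name_SF_final_acct_info").localCheckpoint()
acctInfo.createOrReplaceTempView("device_name_SF_final_acct_info")
```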
