[jira] [Commented] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried

2023-04-18 Thread todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713855#comment-17713855
 ] 

todd commented on SPARK-43170:
--

[~yumwang]  The code only executes spark.sql("xxx"); it does not perform any 
cache-related operations. Why does the same code produce different results on 
Spark 3.0 and Spark 3.2? If it is convenient for you, please try to reproduce it.
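
For reference, a minimal reproduction along these lines might look like the sketch below (a hypothetical driver program; the SparkSession setup is assumed, while the table and query are taken from the issue description):

{code:java}
import org.apache.spark.sql.SparkSession;

public class Spark43170Repro {
public static void main(String[] args) {
// Assumed session setup; Hive support is needed to resolve the catalog table.
SparkSession spark = SparkSession.builder()
.appName("SPARK-43170-repro")
.enableHiveSupport()
.getOrCreate();

// The query from the issue description; no cache()/persist() is involved.
String sql = "select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all "
+ "where dt='20230412' and appid like '%shopee%'";

// Print the plan to see how the LIKE predicate is translated
// (expected: a Contains(appid, shopee) partition filter).
spark.sql(sql).explain(true);
spark.sql(sql).show();

spark.stop();
}
}
{code}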

> The spark sql like statement is pushed down to parquet for execution, but the 
> data cannot be queried
> 
>
> Key: SPARK-43170
> URL: https://issues.apache.org/jira/browse/SPARK-43170
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Major
> Attachments: image-2023-04-18-10-59-30-199.png
>
>
> --DDL
> CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
>   `gaid` STRING COMMENT '',
>   `beyla_id` STRING COMMENT '',
>   `dt` STRING,
>   `hour` STRING,
> `appid` STRING COMMENT 'package name')
> USING parquet
> PARTITIONED BY (dt, hour, appid)
> LOCATION 's3://x/dwm_user_app_action_sum_all'
> -- partitions info
> show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION 
> (dt='20230412');
>  
> dt=20230412/hour=23/appid=blibli.mobile.commerce
> dt=20230412/hour=23/appid=cn.shopee.app
> dt=20230412/hour=23/appid=cn.shopee.br
> dt=20230412/hour=23/appid=cn.shopee.id
> dt=20230412/hour=23/appid=cn.shopee.my
> dt=20230412/hour=23/appid=cn.shopee.ph
>  
> -- query
> select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
> where dt='20230412' and appid like '%shopee%'
>  
> -- result
> no data
>  
> -- other
> Using Spark 3.0.1 and the Trino query engine, the data can be queried.
>  
>  
> The physical scan node produced by Spark 3.2:
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all Output [3]: [dt#63, 
> hour#64, appid#65] Batched: true Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> Contains(appid#65, shopee)] ReadSchema: struct<>
>  
>  
> !image-2023-04-18-10-59-30-199.png!
>  
> -- sql plan detail
> {code:java}
> == Physical Plan ==
> CollectLimit (9)
> +- InMemoryTableScan (1)
>   +- InMemoryRelation (2)
> +- * HashAggregate (8)
>+- Exchange (7)
>   +- * HashAggregate (6)
>  +- * Project (5)
> +- * ColumnarToRow (4)
>+- Scan parquet 
> ecom_dwm.dwm_user_app_action_sum_all (3)
> (1) InMemoryTableScan
> Output [1]: [appid#65]
> Arguments: [appid#65]
> (2) InMemoryRelation
> Arguments: [appid#65], 
> CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@ab5af13,StorageLevel(disk,
>  memory, deserialized, 1 replicas),*(2) HashAggregate(keys=[appid#65], 
> functions=[], output=[appid#65])
> +- Exchange hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>+- *(1) HashAggregate(keys=[appid#65], functions=[], output=[appid#65])
>   +- *(1) Project [appid#65]
>  +- *(1) ColumnarToRow
> +- FileScan parquet 
> ecom_dwm.dwm_user_app_action_sum_all[dt#63,hour#64,appid#65] Batched: true, 
> DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], 
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> StartsWith(appid#65, com)], PushedFilters: [], ReadSchema: struct<>
> ,None)
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
> Output [3]: [dt#63, hour#64, appid#65]
> Batched: true
> Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> StartsWith(appid#65, com)]
> ReadSchema: struct<>
> (4) ColumnarToRow [codegen id : 1]
> Input [3]: [dt#63, hour#64, appid#65]
> (5) Project [codegen id : 1]
> Output [1]: [appid#65]
> Input [3]: [dt#63, hour#64, appid#65]
> (6) HashAggregate [codegen id : 1]
> Input [1]: [appid#65]
> Keys [1]: [appid#65]
> Functions: []
> Aggregate Attributes: []
> Results [1]: [appid#65]
> (7) Exchange
> Input [1]: [appid#65]
> Arguments: hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
> (8) HashAggregate [codegen id : 2]
> Input [1]: [appid#65]
> Keys [1]: [appid#65]
> Functions: []
> Aggregate Attributes: []
> Results [1]: [appid#65]
> (9) CollectLimit
> Input [1]: [appid#65]
> Arguments: 1 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried

2023-04-18 Thread todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713543#comment-17713543
 ] 

todd commented on SPARK-43170:
--

[~yumwang]  no cache

> The spark sql like statement is pushed down to parquet for execution, but the 
> data cannot be queried
> 
>
> Key: SPARK-43170
> URL: https://issues.apache.org/jira/browse/SPARK-43170
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Major
> Attachments: image-2023-04-18-10-59-30-199.png
>
>
> --DDL
> CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
>   `gaid` STRING COMMENT '',
>   `beyla_id` STRING COMMENT '',
>   `dt` STRING,
>   `hour` STRING,
> `appid` STRING COMMENT 'package name')
> USING parquet
> PARTITIONED BY (dt, hour, appid)
> LOCATION 's3://x/dwm_user_app_action_sum_all'
> -- partitions info
> show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION 
> (dt='20230412');
>  
> dt=20230412/hour=23/appid=blibli.mobile.commerce
> dt=20230412/hour=23/appid=cn.shopee.app
> dt=20230412/hour=23/appid=cn.shopee.br
> dt=20230412/hour=23/appid=cn.shopee.id
> dt=20230412/hour=23/appid=cn.shopee.my
> dt=20230412/hour=23/appid=cn.shopee.ph
>  
> -- query
> select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
> where dt='20230412' and appid like '%shopee%'
>  
> -- result
> no data
>  
> -- other
> Using Spark 3.0.1 and the Trino query engine, the data can be queried.
>  
>  
> The physical scan node produced by Spark 3.2:
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all Output [3]: [dt#63, 
> hour#64, appid#65] Batched: true Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> Contains(appid#65, shopee)] ReadSchema: struct<>
>  
>  
> !image-2023-04-18-10-59-30-199.png!
>  
> -- sql plan detail
> {code:java}
> == Physical Plan ==
> CollectLimit (9)
> +- InMemoryTableScan (1)
>   +- InMemoryRelation (2)
> +- * HashAggregate (8)
>+- Exchange (7)
>   +- * HashAggregate (6)
>  +- * Project (5)
> +- * ColumnarToRow (4)
>+- Scan parquet 
> ecom_dwm.dwm_user_app_action_sum_all (3)
> (1) InMemoryTableScan
> Output [1]: [appid#65]
> Arguments: [appid#65]
> (2) InMemoryRelation
> Arguments: [appid#65], 
> CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@ab5af13,StorageLevel(disk,
>  memory, deserialized, 1 replicas),*(2) HashAggregate(keys=[appid#65], 
> functions=[], output=[appid#65])
> +- Exchange hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>+- *(1) HashAggregate(keys=[appid#65], functions=[], output=[appid#65])
>   +- *(1) Project [appid#65]
>  +- *(1) ColumnarToRow
> +- FileScan parquet 
> ecom_dwm.dwm_user_app_action_sum_all[dt#63,hour#64,appid#65] Batched: true, 
> DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], 
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> StartsWith(appid#65, com)], PushedFilters: [], ReadSchema: struct<>
> ,None)
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
> Output [3]: [dt#63, hour#64, appid#65]
> Batched: true
> Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> StartsWith(appid#65, com)]
> ReadSchema: struct<>
> (4) ColumnarToRow [codegen id : 1]
> Input [3]: [dt#63, hour#64, appid#65]
> (5) Project [codegen id : 1]
> Output [1]: [appid#65]
> Input [3]: [dt#63, hour#64, appid#65]
> (6) HashAggregate [codegen id : 1]
> Input [1]: [appid#65]
> Keys [1]: [appid#65]
> Functions: []
> Aggregate Attributes: []
> Results [1]: [appid#65]
> (7) Exchange
> Input [1]: [appid#65]
> Arguments: hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
> (8) HashAggregate [codegen id : 2]
> Input [1]: [appid#65]
> Keys [1]: [appid#65]
> Functions: []
> Aggregate Attributes: []
> Results [1]: [appid#65]
> (9) CollectLimit
> Input [1]: [appid#65]
> Arguments: 1 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried

2023-04-17 Thread todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713403#comment-17713403
 ] 

todd commented on SPARK-43170:
--

Spark 3.2.x is currently used in production, and there is no plan to upgrade to 
a newer version for the time being. If this is a bug, will it be fixed in the 
Spark 3.2 line?

> The spark sql like statement is pushed down to parquet for execution, but the 
> data cannot be queried
> 
>
> Key: SPARK-43170
> URL: https://issues.apache.org/jira/browse/SPARK-43170
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Major
> Attachments: image-2023-04-18-10-59-30-199.png
>
>
> --DDL
> CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
>   `gaid` STRING COMMENT '',
>   `beyla_id` STRING COMMENT '',
>   `dt` STRING,
>   `hour` STRING,
> `appid` STRING COMMENT 'package name')
> USING parquet
> PARTITIONED BY (dt, hour, appid)
> LOCATION 's3://x/dwm_user_app_action_sum_all'
> -- partitions info
> show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION 
> (dt='20230412');
>  
> dt=20230412/hour=23/appid=blibli.mobile.commerce
> dt=20230412/hour=23/appid=cn.shopee.app
> dt=20230412/hour=23/appid=cn.shopee.br
> dt=20230412/hour=23/appid=cn.shopee.id
> dt=20230412/hour=23/appid=cn.shopee.my
> dt=20230412/hour=23/appid=cn.shopee.ph
>  
> -- query
> select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
> where dt='20230412' and appid like '%shopee%'
>  
> -- result
> no data
>  
> -- other
> Using Spark 3.0.1 and the Trino query engine, the data can be queried.
>  
>  
> The physical scan node produced by Spark 3.2:
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all Output [3]: [dt#63, 
> hour#64, appid#65] Batched: true Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> Contains(appid#65, shopee)] ReadSchema: struct<>
>  
>  
> !image-2023-04-18-10-59-30-199.png!
>  
> -- sql plan detail
> {code:java}
> == Physical Plan ==
> CollectLimit (9)
> +- InMemoryTableScan (1)
>   +- InMemoryRelation (2)
> +- * HashAggregate (8)
>+- Exchange (7)
>   +- * HashAggregate (6)
>  +- * Project (5)
> +- * ColumnarToRow (4)
>+- Scan parquet 
> ecom_dwm.dwm_user_app_action_sum_all (3)
> (1) InMemoryTableScan
> Output [1]: [appid#65]
> Arguments: [appid#65]
> (2) InMemoryRelation
> Arguments: [appid#65], 
> CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@ab5af13,StorageLevel(disk,
>  memory, deserialized, 1 replicas),*(2) HashAggregate(keys=[appid#65], 
> functions=[], output=[appid#65])
> +- Exchange hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>+- *(1) HashAggregate(keys=[appid#65], functions=[], output=[appid#65])
>   +- *(1) Project [appid#65]
>  +- *(1) ColumnarToRow
> +- FileScan parquet 
> ecom_dwm.dwm_user_app_action_sum_all[dt#63,hour#64,appid#65] Batched: true, 
> DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], 
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> StartsWith(appid#65, com)], PushedFilters: [], ReadSchema: struct<>
> ,None)
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
> Output [3]: [dt#63, hour#64, appid#65]
> Batched: true
> Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> StartsWith(appid#65, com)]
> ReadSchema: struct<>
> (4) ColumnarToRow [codegen id : 1]
> Input [3]: [dt#63, hour#64, appid#65]
> (5) Project [codegen id : 1]
> Output [1]: [appid#65]
> Input [3]: [dt#63, hour#64, appid#65]
> (6) HashAggregate [codegen id : 1]
> Input [1]: [appid#65]
> Keys [1]: [appid#65]
> Functions: []
> Aggregate Attributes: []
> Results [1]: [appid#65]
> (7) Exchange
> Input [1]: [appid#65]
> Arguments: hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
> (8) HashAggregate [codegen id : 2]
> Input [1]: [appid#65]
> Keys [1]: [appid#65]
> Functions: []
> Aggregate Attributes: []
> Results [1]: [appid#65]
> (9) CollectLimit
> Input [1]: [appid#65]
> Arguments: 1 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried

2023-04-17 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd updated SPARK-43170:
-
Description: 
--DDL

CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
  `gaid` STRING COMMENT '',
  `beyla_id` STRING COMMENT '',
  `dt` STRING,
  `hour` STRING,
  `appid` STRING COMMENT 'package name')
USING parquet
PARTITIONED BY (dt, hour, appid)
LOCATION 's3://x/dwm_user_app_action_sum_all'

-- partitions info
show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION (dt='20230412');
 
dt=20230412/hour=23/appid=blibli.mobile.commerce
dt=20230412/hour=23/appid=cn.shopee.app
dt=20230412/hour=23/appid=cn.shopee.br
dt=20230412/hour=23/appid=cn.shopee.id
dt=20230412/hour=23/appid=cn.shopee.my
dt=20230412/hour=23/appid=cn.shopee.ph
 
-- query
select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
where dt='20230412' and appid like '%shopee%'
 
-- result
no data
 
-- other
Using Spark 3.0.1 and the Trino query engine, the data can be queried.
 
 
The physical scan node produced by Spark 3.2:
(3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all Output [3]: [dt#63, 
hour#64, appid#65] Batched: true Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
Contains(appid#65, shopee)] ReadSchema: struct<>
 
 
!image-2023-04-18-10-59-30-199.png!
 
-- sql plan detail
{code:java}
== Physical Plan ==
CollectLimit (9)
+- InMemoryTableScan (1)
  +- InMemoryRelation (2)
+- * HashAggregate (8)
   +- Exchange (7)
  +- * HashAggregate (6)
 +- * Project (5)
+- * ColumnarToRow (4)
   +- Scan parquet ecom_dwm.dwm_user_app_action_sum_all 
(3)


(1) InMemoryTableScan
Output [1]: [appid#65]
Arguments: [appid#65]

(2) InMemoryRelation
Arguments: [appid#65], 
CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@ab5af13,StorageLevel(disk,
 memory, deserialized, 1 replicas),*(2) HashAggregate(keys=[appid#65], 
functions=[], output=[appid#65])
+- Exchange hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
   +- *(1) HashAggregate(keys=[appid#65], functions=[], output=[appid#65])
  +- *(1) Project [appid#65]
 +- *(1) ColumnarToRow
+- FileScan parquet 
ecom_dwm.dwm_user_app_action_sum_all[dt#63,hour#64,appid#65] Batched: true, 
DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], 
PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
StartsWith(appid#65, com)], PushedFilters: [], ReadSchema: struct<>
,None)

(3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
Output [3]: [dt#63, hour#64, appid#65]
Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
StartsWith(appid#65, com)]
ReadSchema: struct<>

(4) ColumnarToRow [codegen id : 1]
Input [3]: [dt#63, hour#64, appid#65]

(5) Project [codegen id : 1]
Output [1]: [appid#65]
Input [3]: [dt#63, hour#64, appid#65]

(6) HashAggregate [codegen id : 1]
Input [1]: [appid#65]
Keys [1]: [appid#65]
Functions: []
Aggregate Attributes: []
Results [1]: [appid#65]

(7) Exchange
Input [1]: [appid#65]
Arguments: hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]

(8) HashAggregate [codegen id : 2]
Input [1]: [appid#65]
Keys [1]: [appid#65]
Functions: []
Aggregate Attributes: []
Results [1]: [appid#65]

(9) CollectLimit
Input [1]: [appid#65]
Arguments: 1 {code}

  was:
--DDL

CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
  `gaid` STRING COMMENT '',
  `beyla_id` STRING COMMENT '',
  `dt` STRING,
  `hour` STRING,
  `appid` STRING COMMENT 'package name')
USING parquet
PARTITIONED BY (dt, hour, appid)
LOCATION 's3://x/dwm_user_app_action_sum_all'

-- partitions  info
show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION (dt='20230412');
 
dt=20230412/hour=23/appid=blibli.mobile.commerce
dt=20230412/hour=23/appid=cn.shopee.app
dt=20230412/hour=23/appid=cn.shopee.br
dt=20230412/hour=23/appid=cn.shopee.id
dt=20230412/hour=23/appid=cn.shopee.my
dt=20230412/hour=23/appid=cn.shopee.ph
 
-- query
select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
where dt='20230412' and appid like '%shopee%'
 
-- result
no data
 
-- other
Using Spark 3.0.1 and the Trino query engine, the data can be queried.
 
 
The physical scan node produced by Spark 3.2:
(3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all Output [3]: [dt#63, 
hour#64, appid#65] Batched: true Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
Contains(appid#65, shopee)] ReadSchema: struct<>
 
 
!image-2023-04-18-10-59-30-199.png!
 
 


> The spark sql like statement is pushed down to parquet for execution, but the 
> data cannot be queried

[jira] [Updated] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried

2023-04-17 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd updated SPARK-43170:
-
Description: 
--DDL

CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
  `gaid` STRING COMMENT '',
  `beyla_id` STRING COMMENT '',
  `dt` STRING,
  `hour` STRING,
  `appid` STRING COMMENT 'package name')
USING parquet
PARTITIONED BY (dt, hour, appid)
LOCATION 's3://x/dwm_user_app_action_sum_all'

-- partitions  info
show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION (dt='20230412');
 
dt=20230412/hour=23/appid=blibli.mobile.commerce
dt=20230412/hour=23/appid=cn.shopee.app
dt=20230412/hour=23/appid=cn.shopee.br
dt=20230412/hour=23/appid=cn.shopee.id
dt=20230412/hour=23/appid=cn.shopee.my
dt=20230412/hour=23/appid=cn.shopee.ph
 
-- query
select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
where dt='20230412' and appid like '%shopee%'
 
-- result
no data
 
-- other
Using Spark 3.0.1 and the Trino query engine, the data can be queried.
 
 
The physical scan node produced by Spark 3.2:
(3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all Output [3]: [dt#63, 
hour#64, appid#65] Batched: true Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
Contains(appid#65, shopee)] ReadSchema: struct<>
 
 
!image-2023-04-18-10-59-30-199.png!
 
 

> The spark sql like statement is pushed down to parquet for execution, but the 
> data cannot be queried
> 
>
> Key: SPARK-43170
> URL: https://issues.apache.org/jira/browse/SPARK-43170
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Blocker
> Attachments: image-2023-04-18-10-59-30-199.png
>
>
> --DDL
> CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
>   `gaid` STRING COMMENT '',
>   `beyla_id` STRING COMMENT '',
>   `dt` STRING,
>   `hour` STRING,
> `appid` STRING COMMENT 'package name')
> USING parquet
> PARTITIONED BY (dt, hour, appid)
> LOCATION 's3://x/dwm_user_app_action_sum_all'
> -- partitions  info
> show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION 
> (dt='20230412');
>  
> dt=20230412/hour=23/appid=blibli.mobile.commerce
> dt=20230412/hour=23/appid=cn.shopee.app
> dt=20230412/hour=23/appid=cn.shopee.br
> dt=20230412/hour=23/appid=cn.shopee.id
> dt=20230412/hour=23/appid=cn.shopee.my
> dt=20230412/hour=23/appid=cn.shopee.ph
>  
> -- query
> select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
> where dt='20230412' and appid like '%shopee%'
>  
> -- result
> no data
>  
> -- other
> Using Spark 3.0.1 and the Trino query engine, the data can be queried.
>  
>  
> The physical scan node produced by Spark 3.2:
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all Output [3]: [dt#63, 
> hour#64, appid#65] Batched: true Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), 
> Contains(appid#65, shopee)] ReadSchema: struct<>
>  
>  
> !image-2023-04-18-10-59-30-199.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried

2023-04-17 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd updated SPARK-43170:
-
Attachment: image-2023-04-18-10-59-30-199.png

> The spark sql like statement is pushed down to parquet for execution, but the 
> data cannot be queried
> 
>
> Key: SPARK-43170
> URL: https://issues.apache.org/jira/browse/SPARK-43170
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Blocker
> Attachments: image-2023-04-18-10-59-30-199.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried

2023-04-17 Thread todd (Jira)
todd created SPARK-43170:


 Summary: The spark sql like statement is pushed down to parquet 
for execution, but the data cannot be queried
 Key: SPARK-43170
 URL: https://issues.apache.org/jira/browse/SPARK-43170
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.2
Reporter: todd






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40298) shuffle data recovery on the reused PVCs no effect

2022-09-07 Thread todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601225#comment-17601225
 ] 

todd commented on SPARK-40298:
--

[~dongjoon] 

I use an AWS spot cluster, and the instance hosting the pod is terminated during 
the shuffle read phase of a Spark stage. The running tasks then throw FetchFailed 
and ExecutorLostFailure exceptions.

My expectation is that, by reusing the PVC, only the currently failed tasks are 
recomputed, not the whole previous stage.

We rely on this feature to avoid recomputing Spark tasks when the AWS spot cluster 
reclaims machines, thereby saving compute costs.
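
As a rough sketch, the settings that matter for this scenario (taken from the configuration listed in the issue description below) could equally be set programmatically; the SparkConf-based form here is only an illustration:

{code:java}
import org.apache.spark.SparkConf;

// Sketch: the PVC-reuse and shuffle-recovery settings from the issue,
// expressed via SparkConf instead of --conf flags.
SparkConf conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "false")
// Let the driver own and reuse executor PVCs across executor restarts.
.set("spark.kubernetes.driver.reusePersistentVolumeClaim", "true")
.set("spark.kubernetes.driver.ownPersistentVolumeClaim", "true")
.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", "OnDemand")
.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", "/tmp/data")
.set("spark.executorEnv.SPARK_EXECUTOR_DIRS", "/tmp/data")
// Shuffle IO plugin that tries to recover shuffle files found on a reused PVC.
.set("spark.shuffle.sort.io.plugin.class",
"org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO");
{code}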

> shuffle data recovery on the reused PVCs  no effect
> ---
>
> Key: SPARK-40298
> URL: https://issues.apache.org/jira/browse/SPARK-40298
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Major
> Attachments: 1662002808396.jpg, 1662002822097.jpg
>
>
> I use Spark 3.2.2 to test the [ Support shuffle data recovery on the reused 
> PVCs (SPARK-35593) ] feature. I found that when a shuffle read fails, the data 
> is still read from the source.
> It can be confirmed that the PVC has been reused by other pods, and the 
> shuffle index and data files are present on it.
> *This is my spark configuration information:*
> --conf spark.driver.memory=5G 
> --conf spark.executor.memory=15G 
> --conf spark.executor.cores=1
> --conf spark.executor.instances=50
> --conf spark.sql.shuffle.partitions=50
> --conf spark.dynamicAllocation.enabled=false
> --conf spark.kubernetes.driver.reusePersistentVolumeClaim=true
> --conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false
> --conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data
> --conf 
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> --conf spark.kubernetes.executor.missingPodDetectDelta=10s
> --conf spark.kubernetes.executor.apiPollingInterval=10s
> --conf spark.shuffle.io.retryWait=60s
> --conf spark.shuffle.io.maxRetries=5
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] (SPARK-35593) Support shuffle data recovery on the reused PVCs

2022-09-07 Thread todd (Jira)


[ https://issues.apache.org/jira/browse/SPARK-35593 ]


todd deleted comment on SPARK-35593:
--

was (Author: todd5167):
[~dongjoon] [~apachespark]     Can you take a look at this question:  
https://issues.apache.org/jira/browse/SPARK-40298

> Support shuffle data recovery on the reused PVCs
> 
>
> Key: SPARK-35593
> URL: https://issues.apache.org/jira/browse/SPARK-35593
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Spark Core
>Affects Versions: 3.2.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-40298) shuffle data recovery on the reused PVCs no effect

2022-09-04 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd reopened SPARK-40298:
--

> shuffle data recovery on the reused PVCs  no effect
> ---
>
> Key: SPARK-40298
> URL: https://issues.apache.org/jira/browse/SPARK-40298
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Major
> Attachments: 1662002808396.jpg, 1662002822097.jpg
>
>
> I use Spark 3.2.2 to test the [ Support shuffle data recovery on the reused 
> PVCs (SPARK-35593) ] feature. I found that when a shuffle read fails, the data 
> is still read from the source.
> It can be confirmed that the PVC has been reused by other pods, and the 
> shuffle index and data files are present on it.
> *This is my spark configuration information:*
> --conf spark.driver.memory=5G 
> --conf spark.executor.memory=15G 
> --conf spark.executor.cores=1
> --conf spark.executor.instances=50
> --conf spark.sql.shuffle.partitions=50
> --conf spark.dynamicAllocation.enabled=false
> --conf spark.kubernetes.driver.reusePersistentVolumeClaim=true
> --conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false
> --conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data
> --conf 
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> --conf spark.kubernetes.executor.missingPodDetectDelta=10s
> --conf spark.kubernetes.executor.apiPollingInterval=10s
> --conf spark.shuffle.io.retryWait=60s
> --conf spark.shuffle.io.maxRetries=5
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40298) shuffle data recovery on the reused PVCs no effect

2022-09-01 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd updated SPARK-40298:
-
Priority: Blocker  (was: Major)

> shuffle data recovery on the reused PVCs  no effect
> ---
>
> Key: SPARK-40298
> URL: https://issues.apache.org/jira/browse/SPARK-40298
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Blocker
> Attachments: 1662002808396.jpg, 1662002822097.jpg
>
>
> I use Spark 3.2.2 to test the [ Support shuffle data recovery on the reused 
> PVCs (SPARK-35593) ] feature. I found that when a shuffle read fails, the data 
> is still read from the source.
> It can be confirmed that the PVC has been reused by other pods, and the 
> shuffle index and data files are present on it.
> *This is my spark configuration information:*
> --conf spark.driver.memory=5G 
> --conf spark.executor.memory=15G 
> --conf spark.executor.cores=1
> --conf spark.executor.instances=50
> --conf spark.sql.shuffle.partitions=50
> --conf spark.dynamicAllocation.enabled=false
> --conf spark.kubernetes.driver.reusePersistentVolumeClaim=true
> --conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false
> --conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data
> --conf 
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> --conf spark.kubernetes.executor.missingPodDetectDelta=10s
> --conf spark.kubernetes.executor.apiPollingInterval=10s
> --conf spark.shuffle.io.retryWait=60s
> --conf spark.shuffle.io.maxRetries=5
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35593) Support shuffle data recovery on the reused PVCs

2022-08-31 Thread todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598758#comment-17598758
 ] 

todd commented on SPARK-35593:
--

[~dongjoon] [~apachespark]     Can you take a look at this question:  
https://issues.apache.org/jira/browse/SPARK-40298

> Support shuffle data recovery on the reused PVCs
> 
>
> Key: SPARK-35593
> URL: https://issues.apache.org/jira/browse/SPARK-35593
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes, Spark Core
>Affects Versions: 3.2.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
> Fix For: 3.2.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40298) shuffle data recovery on the reused PVCs no effect

2022-08-31 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd updated SPARK-40298:
-
Description: 
I use Spark 3.2.2 to test the [ Support shuffle data recovery on the reused PVCs 
(SPARK-35593) ] feature. I found that when a shuffle read fails, the data is still 
read from the source.

It can be confirmed that the PVC has been reused by other pods, and the 
shuffle index and data files are present on it.

*This is my spark configuration information:*

--conf spark.driver.memory=5G 
--conf spark.executor.memory=15G 
--conf spark.executor.cores=1
--conf spark.executor.instances=50
--conf spark.sql.shuffle.partitions=50
--conf spark.dynamicAllocation.enabled=false
--conf spark.kubernetes.driver.reusePersistentVolumeClaim=true
--conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
--conf 
spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand
--conf 
spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2
--conf 
spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi
--conf 
spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data
--conf 
spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false
--conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data
--conf 
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
--conf spark.kubernetes.executor.missingPodDetectDelta=10s
--conf spark.kubernetes.executor.apiPollingInterval=10s
--conf spark.shuffle.io.retryWait=60s
--conf spark.shuffle.io.maxRetries=5

 

 

> shuffle data recovery on the reused PVCs  no effect
> ---
>
> Key: SPARK-40298
> URL: https://issues.apache.org/jira/browse/SPARK-40298
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Major
> Attachments: 1662002808396.jpg, 1662002822097.jpg
>
>
> I use Spark 3.2.2 to test the [ Support shuffle data recovery on the reused 
> PVCs (SPARK-35593) ] feature. I found that when a shuffle read fails, the data 
> is still read from the source.
> It can be confirmed that the PVC has been reused by other pods, and the 
> shuffle index and data files are present on it.
> *This is my spark configuration information:*
> --conf spark.driver.memory=5G 
> --conf spark.executor.memory=15G 
> --conf spark.executor.cores=1
> --conf spark.executor.instances=50
> --conf spark.sql.shuffle.partitions=50
> --conf spark.dynamicAllocation.enabled=false
> --conf spark.kubernetes.driver.reusePersistentVolumeClaim=true
> --conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false
> --conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data
> --conf 
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> --conf spark.kubernetes.executor.missingPodDetectDelta=10s
> --conf spark.kubernetes.executor.apiPollingInterval=10s
> --conf spark.shuffle.io.retryWait=60s
> --conf spark.shuffle.io.maxRetries=5
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40298) shuffle data recovery on the reused PVCs no effect

2022-08-31 Thread todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

todd updated SPARK-40298:
-
Attachment: 1662002808396.jpg
1662002822097.jpg

> shuffle data recovery on the reused PVCs  no effect
> ---
>
> Key: SPARK-40298
> URL: https://issues.apache.org/jira/browse/SPARK-40298
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 3.2.2
>Reporter: todd
>Priority: Major
> Attachments: 1662002808396.jpg, 1662002822097.jpg
>
>
> I use Spark 3.2.2 to test the [ Support shuffle data recovery on the reused 
> PVCs (SPARK-35593) ] feature. I found that when a shuffle read fails, the data 
> is still read from the source.
> It can be confirmed that the PVC has been reused by other pods, and the 
> shuffle index and data files are present on it.
> *This is my spark configuration information:*
> --conf spark.driver.memory=5G 
> --conf spark.executor.memory=15G 
> --conf spark.executor.cores=1
> --conf spark.executor.instances=50
> --conf spark.sql.shuffle.partitions=50
> --conf spark.dynamicAllocation.enabled=false
> --conf spark.kubernetes.driver.reusePersistentVolumeClaim=true
> --conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data
> --conf 
> spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false
> --conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data
> --conf 
> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> --conf spark.kubernetes.executor.missingPodDetectDelta=10s
> --conf spark.kubernetes.executor.apiPollingInterval=10s
> --conf spark.shuffle.io.retryWait=60s
> --conf spark.shuffle.io.maxRetries=5
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40298) shuffle data recovery on the reused PVCs no effect

2022-08-31 Thread todd (Jira)
todd created SPARK-40298:


 Summary: shuffle data recovery on the reused PVCs  no effect
 Key: SPARK-40298
 URL: https://issues.apache.org/jira/browse/SPARK-40298
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 3.2.2
Reporter: todd






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-12 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397898#comment-17397898
 ] 

Cameron Todd commented on SPARK-18105:
--

Oh sorry, I meant that only a portion of the code needs to be replaced, just the 
part that reads the parquet file, because I hashed the data file and the 
'pivot_hash' column already exists. The real test begins with the joins and 
aggregations that follow, per the .java file I uploaded.
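
In other words, a sketch of the simplified read (mirroring the replacement shown elsewhere in this thread; `pathToDataDedup` points at the hashed file, and an existing SparkSession `spark` is assumed):

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch: the hashed file already contains the pivot_hash column,
// so the select/hash/drop steps can be skipped and only the read kept.
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
.repartition(5000)
.cache();
{code}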

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Davies Liu
>Priority: Major
> Attachments: TestWeightedGraph.java
>
>
> When lz4 is used to compress the shuffle files, it may fail to decompress it 
> as "stream is corrupt"
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-11 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397603#comment-17397603
 ] 

Cameron Todd commented on SPARK-18105:
--

Good to hear. Also, the count of 136,935,074 is right.

When you say "I downloaded and ran the test code in my laptop. It works well to 
me", do you mean the Java code I attached, or the read-and-count() snippet you 
just posted above? The error is only raised after the first few aggregations and 
joins in my code, and I also doubt your laptop is big enough to process that ;) .

I can attach the key config variables on request, but I'm using a cloud 
Kubernetes Spark cluster managed by OVH (Spark 3.0.1), so it's an out-of-the-box 
solution managed by OVH rather than by me.
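
For context, the read-and-count check referred to above would amount to roughly this sketch (the path is a placeholder and an existing SparkSession `spark` is assumed):

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch: read the uploaded hashed parquet file and verify the row count.
Dataset<Row> hashed = spark.read().parquet("hashed_data");
System.out.println(hashed.count());  // expected: 136935074
{code}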

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Davies Liu
>Priority: Major
> Attachments: TestWeightedGraph.java
>
>
> When lz4 is used to compress the shuffle files, it may fail to decompress it 
> as "stream is corrupt"
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-11 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397301#comment-17397301
 ] 

Cameron Todd commented on SPARK-18105:
--

OK, I added the zip file to this public S3 bucket; it holds a parquet file with 
snappy compression and is 1.75 GB. You can retrieve the data as follows:
{code:java}
wget 
https://storage.gra.cloud.ovh.net/v1/AUTH_147b880980f148f5ad1af09542e6f37a/public_data/SPARK18105/hashed_data.zip
{code}

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Davies Liu
>Priority: Major
> Attachments: TestWeightedGraph.java
>
>
> When lz4 is used to compress the shuffle files, it may fail to decompress it 
> as "stream is corrupt"
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> https://github.com/jpountz/lz4-java/issues/89



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-09 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953
 ] 

Cameron Todd edited comment on SPARK-18105 at 8/9/21, 10:31 AM:


Yep, I understand. I have hashed my data while keeping the same distribution; the 
hashed full_name column is weak, but string distance functions still work on it. 
Do you have any recommendations on where I can upload this data? It's only 2 GB.
{code:java}
//So this line of code:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
.select("id", "full_name", 
"last_name","birthdate")
.na().drop()
.withColumn("pivot_hash", 
hash(col("last_name"),col("birthdate")))
.drop("last_name","birthdate")
.repartition(5000)
.cache();
// can be replaced with this
Dataset<Row> relevantPivots = 
spark.read().parquet(pathToDataDedup).repartition(5000).cache();{code}
I have also run the same code on the hashed data and get the same corrupted 
stream error. Also, in case it wasn't clear, my data normally sits in an S3 
bucket.


was (Author: cameron.todd):
Yep, I understand. I have hashed my data while keeping the same distribution; the 
hashed full_name column is weak, but string distance functions still work on it. 
Do you have any recommendations on where I can upload this data?
{code:java}
//So this line of code:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
.select("id", "full_name", 
"last_name","birthdate")
.na().drop()
.withColumn("pivot_hash", 
hash(col("last_name"),col("birthdate")))
.drop("last_name","birthdate")
.repartition(5000)
.cache();
// can be replaced with this
Dataset<Row> relevantPivots = 
spark.read().parquet(pathToDataDedup).repartition(5000).cache();{code}
I have also run the same code on the hashed data and get the same corrupted 
stream error. Also, in case it wasn't clear, my data normally sits in an S3 
bucket.

> LZ4 failed to decompress a stream of shuffled data
> --
>
> Key: SPARK-18105
> URL: https://issues.apache.org/jira/browse/SPARK-18105
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Davies Liu
>Priority: Major
> Attachments: TestWeightedGraph.java
>
>
> When lz4 is used to compress the shuffle files, it may fail to decompress it 
> as "stream is corrupt"
> {code}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in 
> stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted
>   at 
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220)
>   at 
> org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at com.google.common.io.ByteStreams.read(ByteStreams.java:828)
>   at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
>   at 
> org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
>   at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
>   at 
> org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply

[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-09 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953
 ] 

Cameron Todd edited comment on SPARK-18105 at 8/9/21, 10:29 AM:


Yep, I understand. I have hashed my data while keeping the same distribution; the hashed full_name column is weak, but string distance functions still work on it. Do you have any recommendations for where I can upload this data?
{code:java}
// So this line of code:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
        .select("id", "full_name", "last_name", "birthdate")
        .na().drop()
        .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
        .drop("last_name", "birthdate")
        .repartition(5000)
        .cache();

// can be replaced with this:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
        .repartition(5000)
        .cache();
{code}
I have also run the same code on the same hashed data and I am getting the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits in an S3 bucket.


was (Author: cameron.todd):
Yep, I understand. I have attached my hashed data, keeping the same distribution; the hashed full_name column is weak, but string distance functions still work on it. So this line of code:
{code:java}
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
        .select("id", "full_name", "last_name", "birthdate")
        .na().drop()
        .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
        .drop("last_name", "birthdate")
        .repartition(5000)
        .cache();

// can be replaced with this:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
        .repartition(5000)
        .cache();
{code}
I have also run the same code on the same hashed data and I am getting the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits in an S3 bucket.


[jira] [Issue Comment Deleted] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-09 Thread Cameron Todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cameron Todd updated SPARK-18105:
-
Comment: was deleted

(was: [^hashed_data.zip])







[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-09 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395954#comment-17395954
 ] 

Cameron Todd commented on SPARK-18105:
--

[^hashed_data.zip]







[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-09 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953
 ] 

Cameron Todd commented on SPARK-18105:
--

Yep, I understand. I have attached my hashed data, keeping the same distribution; the hashed full_name column is weak, but string distance functions still work on it. So this line of code:
{code:java}
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
        .select("id", "full_name", "last_name", "birthdate")
        .na().drop()
        .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
        .drop("last_name", "birthdate")
        .repartition(5000)
        .cache();

// can be replaced with this:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
        .repartition(5000)
        .cache();
{code}
I have also run the same code on the same hashed data and I am getting the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits in an S3 bucket.







[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-04 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392971#comment-17392971
 ] 

Cameron Todd commented on SPARK-18105:
--

Let me know if that's enough info. From my tests, if I remove the .repartition() after importing the file, I get no corrupted-stream error, but I get skewed joins and memory problems later. If I add the repartition back, the job does not complete: multiple tasks crash with the stream-corrupted error, are retried, and crash again until the whole job fails.
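
Since the skew is what makes the blind repartition() attractive in the first place, one generic mitigation (not taken from the attached TestWeightedGraph.java) is to salt the hot key so the later join spreads it across partitions. A rough sketch, assuming a skewed join key named pivot_hash; column and helper names are illustrative:
{code:java}
// Sketch only: salting a skewed join key. Names below are illustrative.
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class SaltedJoinSketch {
    // Joins `large` (skewed on pivot_hash) with `small` by spreading each hot
    // key over `salts` artificial sub-keys.
    public static Dataset<Row> saltedJoin(Dataset<Row> large, Dataset<Row> small, int salts) {
        // Large side: assign each row a random salt in [0, salts).
        Dataset<Row> saltedLarge = large.withColumn(
                "salt", rand().multiply(salts).cast("int"));

        // Small side: replicate each row once per salt value.
        Dataset<Row> saltedSmall = small.withColumn(
                "salt", explode(sequence(lit(0), lit(salts - 1))));

        Column cond = saltedLarge.col("pivot_hash").equalTo(saltedSmall.col("pivot_hash"))
                .and(saltedLarge.col("salt").equalTo(saltedSmall.col("salt")));

        // Drop the helper salt columns from both sides after the join.
        return saltedLarge.join(saltedSmall, cond).drop("salt");
    }
}
{code}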







[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-04 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392968#comment-17392968
 ] 

Cameron Todd commented on SPARK-18105:
--

I'll attach the portion of the code that is not proprietary, but it's mostly all there. The code is one small step in our entity-resolution process, which handles 1 billion person records, so there are lots of pairwise comparisons and skewed joins before we aggregate and write the staging results. [^TestWeightedGraph.java]







[jira] [Updated] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-04 Thread Cameron Todd (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cameron Todd updated SPARK-18105:
-
Attachment: TestWeightedGraph.java







[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-03 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392205#comment-17392205
 ] 

Cameron Todd edited comment on SPARK-18105 at 8/3/21, 9:44 AM:
---

I'm also facing this same error when scaling my project up to a larger dataset. It crashes consistently in the same spot (I can reproduce it), and I'm pretty sure it comes down to using a repartition() right after the import, as below:
{code:java}
Dataset<Row> relevantPivots = spark.read().parquet(jobConf.pathToDataDedup)
        .select(VadisSparkUtils.scalaSeqConverterColumn(allColsNeeded))
        .na().drop()
        .withColumn("pivot_hash", hash(VadisSparkUtils.scalaSeqConverterColumn(pivot)))
        .drop(VadisSparkUtils.scalaSeqConverterString(pivot))
        .repartition(5000)
        .cache();
{code}
I have already tried disabling "spark.unsafe.sorter.spill.read.ahead.enabled", but that did not help. My cluster environment is Spark 3.0.1 standalone on Kubernetes, using lz4-java-1.7.1.jar.

I'm not entirely sure how to debug this further or give you any more info, because I can't get the logs from each worker node.

 


was (Author: cameron.todd):
I'm also facing this same error when scaling my project up to a larger dataset. It crashes consistently in the same spot, and I'm pretty sure it comes down to using a repartition() right after the import, as below:
{code:java}
Dataset<Row> relevantPivots = spark.read().parquet(jobConf.pathToDataDedup)
        .select(VadisSparkUtils.scalaSeqConverterColumn(allColsNeeded))
        .na().drop()
        .withColumn("pivot_hash", hash(VadisSparkUtils.scalaSeqConverterColumn(pivot)))
        .drop(VadisSparkUtils.scalaSeqConverterString(pivot))
        .repartition(5000)
        .cache();
{code}
I have already tried disabling "spark.unsafe.sorter.spill.read.ahead.enabled", but that did not help. My cluster environment is Spark 3.0.1 standalone on Kubernetes, using lz4-java-1.7.1.jar.

I'm not entirely sure how to debug this further or give you any more info, because I can't get the logs from each worker node.

 


[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2021-08-03 Thread Cameron Todd (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392205#comment-17392205
 ] 

Cameron Todd commented on SPARK-18105:
--

I'm also facing this same error when scaling my project up to a larger dataset. It crashes consistently in the same spot, and I'm pretty sure it comes down to using a repartition() right after the import, as below:
{code:java}
Dataset<Row> relevantPivots = spark.read().parquet(jobConf.pathToDataDedup)
        .select(VadisSparkUtils.scalaSeqConverterColumn(allColsNeeded))
        .na().drop()
        .withColumn("pivot_hash", hash(VadisSparkUtils.scalaSeqConverterColumn(pivot)))
        .drop(VadisSparkUtils.scalaSeqConverterString(pivot))
        .repartition(5000)
        .cache();
{code}
I have already tried disabling "spark.unsafe.sorter.spill.read.ahead.enabled", but that did not help. My cluster environment is Spark 3.0.1 standalone on Kubernetes, using lz4-java-1.7.1.jar.

I'm not entirely sure how to debug this further or give you any more info, because I can't get the logs from each worker node.
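
For reference, both knobs discussed in this thread are plain Spark configuration properties and can be set when the session is built (or via --conf on spark-submit). A short sketch with illustrative values, which also switches the I/O compression codec away from LZ4 as a way to check whether the corruption follows the codec:
{code:java}
// Sketch only: the read-ahead flag already tried above, plus an alternative
// codec for shuffle/spill compression. Values are illustrative.
import org.apache.spark.sql.SparkSession;

public class SessionConfigSketch {
    public static SparkSession build() {
        return SparkSession.builder()
                .appName("lz4-debugging")
                // Flag mentioned in this thread (default is true):
                .config("spark.unsafe.sorter.spill.read.ahead.enabled", "false")
                // Default is lz4; snappy or zstd are common alternatives:
                .config("spark.io.compression.codec", "snappy")
                .getOrCreate();
    }
}
{code}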

 



