[jira] [Commented] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried
[ https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713855#comment-17713855 ] todd commented on SPARK-43170: -- [~yumwang] The code only executes spark.sql("xxx"); it performs no cache-related operations. Why does the same code produce different results on Spark 3.0 and Spark 3.2? If it is convenient for you, please try to reproduce it.
> The spark sql like statement is pushed down to parquet for execution, but the
> data cannot be queried
>
> Key: SPARK-43170
> URL: https://issues.apache.org/jira/browse/SPARK-43170
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.2
> Reporter: todd
> Priority: Major
> Attachments: image-2023-04-18-10-59-30-199.png
>
> -- DDL
> CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
>   `gaid` STRING COMMENT '',
>   `beyla_id` STRING COMMENT '',
>   `dt` STRING,
>   `hour` STRING,
>   `appid` STRING COMMENT 'package name')
> USING parquet
> PARTITIONED BY (dt, hour, appid)
> LOCATION 's3://x/dwm_user_app_action_sum_all'
>
> -- partitions info
> show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION (dt='20230412');
>
> dt=20230412/hour=23/appid=blibli.mobile.commerce
> dt=20230412/hour=23/appid=cn.shopee.app
> dt=20230412/hour=23/appid=cn.shopee.br
> dt=20230412/hour=23/appid=cn.shopee.id
> dt=20230412/hour=23/appid=cn.shopee.my
> dt=20230412/hour=23/appid=cn.shopee.ph
>
> -- query
> select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
> where dt='20230412' and appid like '%shopee%'
>
> -- result
> no data
>
> -- other
> I used Spark 3.0.1 and the Trino query engine to query the data.
>
> The physical scan node produced by Spark 3.2:
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
> Output [3]: [dt#63, hour#64, appid#65]
> Batched: true
> Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), Contains(appid#65, shopee)]
> ReadSchema: struct<>
>
> !image-2023-04-18-10-59-30-199.png!
>
> -- sql plan detail
> {code:java}
> == Physical Plan ==
> CollectLimit (9)
> +- InMemoryTableScan (1)
>    +- InMemoryRelation (2)
>       +- * HashAggregate (8)
>          +- Exchange (7)
>             +- * HashAggregate (6)
>                +- * Project (5)
>                   +- * ColumnarToRow (4)
>                      +- Scan parquet ecom_dwm.dwm_user_app_action_sum_all (3)
>
> (1) InMemoryTableScan
> Output [1]: [appid#65]
> Arguments: [appid#65]
>
> (2) InMemoryRelation
> Arguments: [appid#65], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@ab5af13,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) HashAggregate(keys=[appid#65], functions=[], output=[appid#65])
> +- Exchange hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>    +- *(1) HashAggregate(keys=[appid#65], functions=[], output=[appid#65])
>       +- *(1) Project [appid#65]
>          +- *(1) ColumnarToRow
>             +- FileScan parquet ecom_dwm.dwm_user_app_action_sum_all[dt#63,hour#64,appid#65] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), StartsWith(appid#65, com)], PushedFilters: [], ReadSchema: struct<>
> ,None)
>
> (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
> Output [3]: [dt#63, hour#64, appid#65]
> Batched: true
> Location: InMemoryFileIndex []
> PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), StartsWith(appid#65, com)]
> ReadSchema: struct<>
>
> (4) ColumnarToRow [codegen id : 1]
> Input [3]: [dt#63, hour#64, appid#65]
>
> (5) Project [codegen id : 1]
> Output [1]: [appid#65]
> Input [3]: [dt#63, hour#64, appid#65]
>
> (6) HashAggregate [codegen id : 1]
> Input [1]: [appid#65]
> Keys [1]: [appid#65]
> Functions: []
> Aggregate Attributes: []
> Results [1]: [appid#65]
>
> (7) Exchange
> Input [1]: [appid#65]
> Arguments: hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
>
> (8) HashAggregate [codegen id : 2]
> Input [1]: [appid#65]
> Keys [1]: [appid#65]
> Functions: []
> Aggregate Attributes: []
> Results [1]: [appid#65]
>
> (9) CollectLimit
> Input [1]: [appid#65]
> Arguments: 1
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
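To make the symptom concrete: the cached plan filters the appid partition column with StartsWith(appid#65, com), whereas the query's `like '%shopee%'` should produce Contains(appid#65, shopee), so none of the listed partitions can match. A minimal, Spark-free Python sketch of the two predicate semantics applied to the partitions listed in the issue (the helper names are mine, not Spark's):

```python
# Model the two partition filters seen in the plans above and apply them
# to the partition values listed in the issue.
partitions = [
    "blibli.mobile.commerce", "cn.shopee.app", "cn.shopee.br",
    "cn.shopee.id", "cn.shopee.my", "cn.shopee.ph",
]

def contains(appid: str, s: str) -> bool:      # semantics of LIKE '%shopee%'
    return s in appid

def starts_with(appid: str, s: str) -> bool:   # semantics of the filter in the cached plan
    return appid.startswith(s)

# The expected Contains(appid, 'shopee') filter matches five partitions ...
assert [p for p in partitions if contains(p, "shopee")] == [
    "cn.shopee.app", "cn.shopee.br", "cn.shopee.id",
    "cn.shopee.my", "cn.shopee.ph",
]
# ... while StartsWith(appid, 'com') matches none, which is consistent
# with the empty result reported above.
assert [p for p in partitions if starts_with(p, "com")] == []
```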
[jira] [Commented] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried
[ https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713543#comment-17713543 ] todd commented on SPARK-43170: -- [~yumwang] no cache
[jira] [Commented] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried
[ https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713403#comment-17713403 ] todd commented on SPARK-43170: -- We currently run Spark 3.2.x in production and have no plan to upgrade to a newer version for the time being. If this is a bug, will it be fixed in the 3.2 line?
[jira] [Updated] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried
[ https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd updated SPARK-43170: - Description:
-- DDL
CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
  `gaid` STRING COMMENT '',
  `beyla_id` STRING COMMENT '',
  `dt` STRING,
  `hour` STRING,
  `appid` STRING COMMENT 'package name')
USING parquet
PARTITIONED BY (dt, hour, appid)
LOCATION 's3://x/dwm_user_app_action_sum_all'

-- partitions info
show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION (dt='20230412');

dt=20230412/hour=23/appid=blibli.mobile.commerce
dt=20230412/hour=23/appid=cn.shopee.app
dt=20230412/hour=23/appid=cn.shopee.br
dt=20230412/hour=23/appid=cn.shopee.id
dt=20230412/hour=23/appid=cn.shopee.my
dt=20230412/hour=23/appid=cn.shopee.ph

-- query
select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
where dt='20230412' and appid like '%shopee%'

-- result
no data

-- other
I used Spark 3.0.1 and the Trino query engine to query the data.

The physical scan node produced by Spark 3.2:
(3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
Output [3]: [dt#63, hour#64, appid#65]
Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), Contains(appid#65, shopee)]
ReadSchema: struct<>

!image-2023-04-18-10-59-30-199.png!
-- sql plan detail
{code:java}
== Physical Plan ==
CollectLimit (9)
+- InMemoryTableScan (1)
   +- InMemoryRelation (2)
      +- * HashAggregate (8)
         +- Exchange (7)
            +- * HashAggregate (6)
               +- * Project (5)
                  +- * ColumnarToRow (4)
                     +- Scan parquet ecom_dwm.dwm_user_app_action_sum_all (3)

(1) InMemoryTableScan
Output [1]: [appid#65]
Arguments: [appid#65]

(2) InMemoryRelation
Arguments: [appid#65], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@ab5af13,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) HashAggregate(keys=[appid#65], functions=[], output=[appid#65])
+- Exchange hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]
   +- *(1) HashAggregate(keys=[appid#65], functions=[], output=[appid#65])
      +- *(1) Project [appid#65]
         +- *(1) ColumnarToRow
            +- FileScan parquet ecom_dwm.dwm_user_app_action_sum_all[dt#63,hour#64,appid#65] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), StartsWith(appid#65, com)], PushedFilters: [], ReadSchema: struct<>
,None)

(3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
Output [3]: [dt#63, hour#64, appid#65]
Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), StartsWith(appid#65, com)]
ReadSchema: struct<>

(4) ColumnarToRow [codegen id : 1]
Input [3]: [dt#63, hour#64, appid#65]

(5) Project [codegen id : 1]
Output [1]: [appid#65]
Input [3]: [dt#63, hour#64, appid#65]

(6) HashAggregate [codegen id : 1]
Input [1]: [appid#65]
Keys [1]: [appid#65]
Functions: []
Aggregate Attributes: []
Results [1]: [appid#65]

(7) Exchange
Input [1]: [appid#65]
Arguments: hashpartitioning(appid#65, 200), ENSURE_REQUIREMENTS, [plan_id=24]

(8) HashAggregate [codegen id : 2]
Input [1]: [appid#65]
Keys [1]: [appid#65]
Functions: []
Aggregate Attributes: []
Results [1]: [appid#65]

(9) CollectLimit
Input [1]: [appid#65]
Arguments: 1
{code}
was:
-- DDL
CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` (
  `gaid` STRING COMMENT '',
  `beyla_id` STRING COMMENT '',
  `dt` STRING,
  `hour` STRING,
  `appid` STRING COMMENT 'package name')
USING parquet
PARTITIONED BY (dt, hour, appid)
LOCATION 's3://x/dwm_user_app_action_sum_all'

-- partitions info
show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION (dt='20230412');

dt=20230412/hour=23/appid=blibli.mobile.commerce
dt=20230412/hour=23/appid=cn.shopee.app
dt=20230412/hour=23/appid=cn.shopee.br
dt=20230412/hour=23/appid=cn.shopee.id
dt=20230412/hour=23/appid=cn.shopee.my
dt=20230412/hour=23/appid=cn.shopee.ph

--- query
select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all
where dt='20230412' and appid like '%shopee%'

-- result
no data

--- other
I used Spark 3.0.1 and the Trino query engine to query the data.

The physical scan node produced by Spark 3.2:
(3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all
Output [3]: [dt#63, hour#64, appid#65]
Batched: true
Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), Contains(appid#65, shopee)]
ReadSchema: struct<>

!image-2023-04-18-10-59-30-199.png!

> The spark sql like statement is pushed down to parquet for execution, but the
> data cannot be queried
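For background on why the plans show Contains and StartsWith at all: Spark's LikeSimplification optimizer rule rewrites simple LIKE patterns into plain string predicates before pushdown. A rough stdlib sketch of that mapping (my own simplification, not Spark source; the real rule also handles escape characters and the '_' wildcard):

```python
# Sketch of Spark's LikeSimplification mapping for patterns whose only
# wildcards are leading/trailing '%'.
def simplify_like(pattern: str) -> str:
    inner = pattern.strip("%")
    if "%" in inner:                                      # wildcard in the middle: keep LIKE
        return f"Like({pattern!r})"
    if pattern.startswith("%") and pattern.endswith("%"):
        return f"Contains({inner!r})"
    if pattern.endswith("%"):
        return f"StartsWith({inner!r})"
    if pattern.startswith("%"):
        return f"EndsWith({inner!r})"
    return f"EqualTo({pattern!r})"

assert simplify_like("%shopee%") == "Contains('shopee')"  # the query in this issue
assert simplify_like("com%") == "StartsWith('com')"       # the filter seen in the cached plan
assert simplify_like("%.app") == "EndsWith('.app')"
```

The puzzle in this report is that the cached plan carries the predicate for a different pattern ('com%') than the one the query actually used ('%shopee%').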
[jira] [Updated] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried
[ https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd updated SPARK-43170: - Description: --DDL CREATE TABLE `ecom_dwm`.`dwm_user_app_action_sum_all` ( `gaid` STRING COMMENT '', `beyla_id` STRING COMMENT '', `dt` STRING, `hour` STRING, `appid` STRING COMMENT '包名') USING parquet PARTITIONED BY (dt, hour, appid) LOCATION 's3://x/dwm_user_app_action_sum_all' -- partitions info show partitions ecom_dwm.dwm_user_app_action_sum_all PARTITION (dt='20230412'); dt=20230412/hour=23/appid=blibli.mobile.commerce dt=20230412/hour=23/appid=cn.shopee.app dt=20230412/hour=23/appid=cn.shopee.br dt=20230412/hour=23/appid=cn.shopee.id dt=20230412/hour=23/appid=cn.shopee.my dt=20230412/hour=23/appid=cn.shopee.ph --- query select DISTINCT(appid) from ecom_dwm.dwm_user_app_action_sum_all where dt='20230412' and appid like '%shopee%' --result nodata --- other I use spark3.0.1 version and trino query engine to query the data。 The physical execution node formed by spark 3.2 (3) Scan parquet ecom_dwm.dwm_user_app_action_sum_all Output [3]: [dt#63, hour#64, appid#65] Batched: true Location: InMemoryFileIndex [] PartitionFilters: [isnotnull(dt#63), isnotnull(appid#65), (dt#63 = 20230412), Contains(appid#65, shopee)] ReadSchema: struct<> !image-2023-04-18-10-59-30-199.png! 
> The spark sql like statement is pushed down to parquet for execution, but the
> data cannot be queried
>
> Key: SPARK-43170
> URL: https://issues.apache.org/jira/browse/SPARK-43170
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.2
> Reporter: todd
> Priority: Blocker
> Attachments: image-2023-04-18-10-59-30-199.png
[jira] [Updated] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried
[ https://issues.apache.org/jira/browse/SPARK-43170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd updated SPARK-43170: - Attachment: image-2023-04-18-10-59-30-199.png
[jira] [Created] (SPARK-43170) The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried
todd created SPARK-43170: Summary: The spark sql like statement is pushed down to parquet for execution, but the data cannot be queried Key: SPARK-43170 URL: https://issues.apache.org/jira/browse/SPARK-43170 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.2.2 Reporter: todd
[jira] [Commented] (SPARK-40298) shuffle data recovery on the reused PVCs no effect
[ https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601225#comment-17601225 ] todd commented on SPARK-40298: -- [~dongjoon] I use an AWS spot cluster; the instance hosting the pod is terminated during the shuffle-read phase of a Spark stage, and the affected tasks throw FetchFailed and ExecutorLostFailure exceptions. I had hoped that, by reusing the PVC, only the failed tasks would be recomputed rather than the previous stage. We rely on this feature to avoid recomputation when AWS reclaims spot machines, thereby saving compute costs.
> shuffle data recovery on the reused PVCs no effect
> ---
>
> Key: SPARK-40298
> URL: https://issues.apache.org/jira/browse/SPARK-40298
> Project: Spark
> Issue Type: Bug
> Components: Kubernetes
> Affects Versions: 3.2.2
> Reporter: todd
> Priority: Major
> Attachments: 1662002808396.jpg, 1662002822097.jpg
>
> I used Spark 3.2.2 to test the "Support shuffle data recovery on the reused
> PVCs" (SPARK-35593) feature. I found that when a shuffle read fails, the data
> is still recomputed from the source.
> It can be confirmed that the PVC has been reused by other pods and that the
> shuffle index and data files are present on it.
> *This is my Spark configuration:*
> --conf spark.driver.memory=5G
> --conf spark.executor.memory=15G
> --conf spark.executor.cores=1
> --conf spark.executor.instances=50
> --conf spark.sql.shuffle.partitions=50
> --conf spark.dynamicAllocation.enabled=false
> --conf spark.kubernetes.driver.reusePersistentVolumeClaim=true
> --conf spark.kubernetes.driver.ownPersistentVolumeClaim=true
> --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand
> --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2
> --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi
> --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data
> --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false
> --conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data
> --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
> --conf spark.kubernetes.executor.missingPodDetectDelta=10s
> --conf spark.kubernetes.executor.apiPollingInterval=10s
> --conf spark.shuffle.io.retryWait=60s
> --conf spark.shuffle.io.maxRetries=5
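For reference, the PVC-reuse settings listed above can be assembled into a single spark-submit invocation. This is only a sketch: the API server address, container image, and application jar are placeholders, not values from the report.

```shell
# Hypothetical spark-submit assembling the PVC-reuse configuration above.
# <api-server>, <spark-3.2.2-image>, and <app>.jar are placeholders.
spark-submit \
  --master k8s://https://<api-server>:443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<spark-3.2.2-image> \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.kubernetes.driver.reusePersistentVolumeClaim=true \
  --conf spark.kubernetes.driver.ownPersistentVolumeClaim=true \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2 \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false \
  --conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data \
  --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO \
  local:///opt/spark/examples/jars/<app>.jar
```

The key pieces for the feature under test are the two `PersistentVolumeClaim` driver options plus the `KubernetesLocalDiskShuffleDataIO` shuffle plugin; the rest is sizing.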
[jira] (SPARK-35593) Support shuffle data recovery on the reused PVCs
[ https://issues.apache.org/jira/browse/SPARK-35593 ] todd deleted comment on SPARK-35593: -- was (Author: todd5167): [~dongjoon] [~apachespark] Can you take a look at this question: https://issues.apache.org/jira/browse/SPARK-40298
> Support shuffle data recovery on the reused PVCs
>
> Key: SPARK-35593
> URL: https://issues.apache.org/jira/browse/SPARK-35593
> Project: Spark
> Issue Type: New Feature
> Components: Kubernetes, Spark Core
> Affects Versions: 3.2.0
> Reporter: Dongjoon Hyun
> Assignee: Dongjoon Hyun
> Priority: Major
> Fix For: 3.2.0
[jira] [Reopened] (SPARK-40298) shuffle data recovery on the reused PVCs no effect
[ https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd reopened SPARK-40298: --
[jira] [Updated] (SPARK-40298) shuffle data recovery on the reused PVCs no effect
[ https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd updated SPARK-40298: - Priority: Blocker (was: Major)
[jira] [Commented] (SPARK-35593) Support shuffle data recovery on the reused PVCs
[ https://issues.apache.org/jira/browse/SPARK-35593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598758#comment-17598758 ] todd commented on SPARK-35593: -- [~dongjoon] [~apachespark] Could you take a look at this issue: https://issues.apache.org/jira/browse/SPARK-40298
[jira] [Updated] (SPARK-40298) shuffle data recovery on the reused PVCs no effect
[ https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd updated SPARK-40298: - Description: I use spark3.2.2 to test the [ Support shuffle data recovery on the reused PVCs (SPARK-35593) ] feature. I found that when shuffle read fails, data is still read from the source. It can be confirmed that the PVC has been reused by other pods, and that the index and data files have been transferred. *This is my spark configuration information:* --conf spark.driver.memory=5G --conf spark.executor.memory=15G --conf spark.executor.cores=1 --conf spark.executor.instances=50 --conf spark.sql.shuffle.partitions=50 --conf spark.dynamicAllocation.enabled=false --conf spark.kubernetes.driver.reusePersistentVolumeClaim=true --conf spark.kubernetes.driver.ownPersistentVolumeClaim=true --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName=OnDemand --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass=gp2 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit=100Gi --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path=/tmp/data --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.readOnly=false --conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp/data --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO --conf spark.kubernetes.executor.missingPodDetectDelta=10s --conf spark.kubernetes.executor.apiPollingInterval=10s --conf spark.shuffle.io.retryWait=60s --conf spark.shuffle.io.maxRetries=5
[jira] [Updated] (SPARK-40298) shuffle data recovery on the reused PVCs no effect
[ https://issues.apache.org/jira/browse/SPARK-40298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] todd updated SPARK-40298: - Attachment: 1662002808396.jpg 1662002822097.jpg
[jira] [Created] (SPARK-40298) shuffle data recovery on the reused PVCs no effect
todd created SPARK-40298: Summary: shuffle data recovery on the reused PVCs no effect Key: SPARK-40298 URL: https://issues.apache.org/jira/browse/SPARK-40298 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 3.2.2 Reporter: todd
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397898#comment-17397898 ] Cameron Todd commented on SPARK-18105: -- Oh sorry, I meant that only a portion of the code can be replaced, namely the parquet-import part, because I hashed the data file, so the 'pivot_hash' column already exists. The real test begins with the subsequent joins and aggregations, per the .java file I uploaded. > LZ4 failed to decompress a stream of shuffled data > -- > > Key: SPARK-18105 > URL: https://issues.apache.org/jira/browse/SPARK-18105 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.0.1, 3.1.1 >Reporter: Davies Liu >Priority: Major > Attachments: TestWeightedGraph.java > > > When lz4 is used to compress the shuffle files, it may fail to decompress it > as "stream is corrupt" > {code} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 92 in stage 5.0 failed 4 times, most recent failure: Lost task 92.3 in > stage 5.0 (TID 16616, 10.0.27.18): java.io.IOException: Stream is corrupted > at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:220) > at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109) > at java.io.BufferedInputStream.read(BufferedInputStream.java:353) > at java.io.DataInputStream.read(DataInputStream.java:149) > at com.google.common.io.ByteStreams.read(ByteStreams.java:828) > at com.google.common.io.ByteStreams.readFully(ByteStreams.java:695) > at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127) > at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110) > at scala.collection.Iterator$$anon$13.next(Iterator.scala:372) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30) > at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source) > at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) > at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > https://github.com/jpountz/lz4-java/issues/89
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397603#comment-17397603 ] Cameron Todd commented on SPARK-18105: -- Good to hear. Also, the count of 136,935,074 is right. When you say "I downloaded and ran the test code in my laptop. It works well to me", do you mean the Java code I attached, or the read-file-and-count() snippet you just posted above? The error is only raised after the first few aggregations and joins in my code, and I also doubt your laptop is big enough to process that ;) . I can attach key config variables on request, but I'm using a cloud Kubernetes Spark cluster managed by OVH (Spark 3.0.1), so it's an out-of-the-box solution managed by OVH, not really by me.
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397301#comment-17397301 ] Cameron Todd commented on SPARK-18105: -- Ok, I added the zip file to this public S3 bucket; it holds a parquet file with snappy compression and is 1.75 GB. You can retrieve the data like this:
{code:java}
wget https://storage.gra.cloud.ovh.net/v1/AUTH_147b880980f148f5ad1af09542e6f37a/public_data/SPARK18105/hashed_data.zip
{code}
[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953 ] Cameron Todd edited comment on SPARK-18105 at 8/9/21, 10:31 AM: Yep, I understand. I have hashed my data keeping the same distribution; the hashed full_name column is weak, but string distance functions still work on it. Do you have any recommendations for where I can upload this data? It's only 2 GB.
{code:java}
// So this block of code:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
    .select("id", "full_name", "last_name", "birthdate")
    .na().drop()
    .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
    .drop("last_name", "birthdate")
    .repartition(5000)
    .cache();

// can be replaced with this:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();
{code}
I have also run the same code on the same hashed data and I am getting the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits on an S3 bucket. was (Author: cameron.todd): Yep, I understand. I have hashed my data keeping the same distribution; the hashed full_name column is weak, but string distance functions still work on it. Do you have any recommendations for where I can upload this data?
{code:java}
// So this block of code:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
    .select("id", "full_name", "last_name", "birthdate")
    .na().drop()
    .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
    .drop("last_name", "birthdate")
    .repartition(5000)
    .cache();

// can be replaced with this:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();
{code}
I have also run the same code on the same hashed data and I am getting the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits on an S3 bucket.
[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953 ] Cameron Todd edited comment on SPARK-18105 at 8/9/21, 10:29 AM: Yep, I understand. I have hashed my data keeping the same distribution; the hashed full_name column is weak, but string distance functions still work on it. Do you have any recommendations for where I can upload this data?
{code:java}
// So this block of code:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
    .select("id", "full_name", "last_name", "birthdate")
    .na().drop()
    .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
    .drop("last_name", "birthdate")
    .repartition(5000)
    .cache();

// can be replaced with this:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();
{code}
I have also run the same code on the same hashed data and I am getting the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits on an S3 bucket. was (Author: cameron.todd): Yep, I understand. I have attached my hashed data keeping the same distribution; the hashed full_name column is weak, but string distance functions still work on it. So this block of code:
{code:java}
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
    .select("id", "full_name", "last_name", "birthdate")
    .na().drop()
    .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
    .drop("last_name", "birthdate")
    .repartition(5000)
    .cache();

// can be replaced with this:
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup).repartition(5000).cache();
{code}
I have also run the same code on the same hashed data and I am getting the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits on an S3 bucket.
[jira] [Issue Comment Deleted] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cameron Todd updated SPARK-18105: - Comment: was deleted (was: [^hashed_data.zip])
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395954#comment-17395954 ] Cameron Todd commented on SPARK-18105: -- [^hashed_data.zip]
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395953#comment-17395953 ]

Cameron Todd commented on SPARK-18105:
--------------------------------------

Yep, I understand. I have attached my hashed data, keeping the same distribution; the full_name hash is weak, but string-distance functions still work on it. So this block of code:
{code:java}
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
    .select("id", "full_name", "last_name", "birthdate")
    .na().drop()
    .withColumn("pivot_hash", hash(col("last_name"), col("birthdate")))
    .drop("last_name", "birthdate")
    .repartition(5000)
    .cache();

// can be replaced with this
Dataset<Row> relevantPivots = spark.read().parquet(pathToDataDedup)
    .repartition(5000)
    .cache();
{code}
I have also run the same code on the same hashed data and I get the same corrupted-stream error. Also, in case it wasn't clear, my data normally sits on an S3 bucket.
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392971#comment-17392971 ]

Cameron Todd commented on SPARK-18105:
--------------------------------------

Let me know if that's enough info. From my tests, if I remove the .repartition() after importing the file, I get no corrupted-stream error, but I get skewed joins and memory problems later. If I add the repartition back, the job does not complete: multiple tasks fail with the stream-corrupted error, are retried, and fail again until the whole job crashes.
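The failing pattern described in the comments (a repartition() immediately after reading Parquet, followed by cache() and an action) can be sketched as a minimal standalone job. This is a hedged sketch, not a confirmed reproducer: the bucket path, app name, and partition count are placeholders, and whether it fails depends on data volume and cluster setup as discussed above.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RepartitionRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("SPARK-18105-repro")  // placeholder name
            .getOrCreate();

        // Read, repartition, and cache -- the minimal shape that triggered
        // "Stream is corrupted" for the commenter. The path is a placeholder.
        Dataset<Row> df = spark.read().parquet("s3://bucket/path/to/data")
            .repartition(5000)  // forces a full shuffle of the input
            .cache();

        // Any action that materializes the shuffle (e.g. a count) exercises
        // the LZ4 decompression path on the reduce side.
        df.count();

        spark.stop();
    }
}
```

Under this shape, dropping the repartition() avoids the shuffle entirely, which matches the observation above that the error disappears at the cost of skew later in the pipeline.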
[jira] [Commented] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392968#comment-17392968 ]

Cameron Todd commented on SPARK-18105:
--------------------------------------

I'll attach the portion of the code that is not proprietary, but it's mostly all there. The code is one small step in our entity-resolution process, which handles 1 billion person records, so there are lots of pairwise comparisons and skewed joins before we aggregate and write the staged results. [^TestWeightedGraph.java]
[jira] [Updated] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cameron Todd updated SPARK-18105:
---------------------------------
    Attachment: TestWeightedGraph.java
[jira] [Comment Edited] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data
[ https://issues.apache.org/jira/browse/SPARK-18105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392205#comment-17392205 ]

Cameron Todd edited comment on SPARK-18105 at 8/3/21, 9:44 AM:
---------------------------------------------------------------

I'm also facing this same error when scaling my project up to a larger dataset. It crashes consistently in the same spot (I can reproduce it), and I'm pretty sure it comes down to using a repartition() right after the import, as below:
{code:java}
Dataset<Row> relevantPivots = spark.read().parquet(jobConf.pathToDataDedup)
    .select(VadisSparkUtils.scalaSeqConverterColumn(allColsNeeded))
    .na().drop()
    .withColumn("pivot_hash", hash(VadisSparkUtils.scalaSeqConverterColumn(pivot)))
    .drop(VadisSparkUtils.scalaSeqConverterString(pivot))
    .repartition(5000)
    .cache();
{code}
I have already tried disabling "spark.unsafe.sorter.spill.read.ahead.enabled", but it did not work. My cluster environment is Spark 3.0.1 standalone on Kubernetes, using lz4-java-1.7.1.jar. I'm not entirely sure how to debug this further or give you any more info, because I can't get the logs from each worker node.
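For anyone triaging a similar failure, the configuration knobs that came up in this thread can be set when building the session. This is a sketch under stated assumptions: `spark.unsafe.sorter.spill.read.ahead.enabled` is the setting the commenter already tried (it did not help in this case), and switching `spark.io.compression.codec` away from the default lz4 only helps isolate whether the corruption is codec-specific; neither is a confirmed fix.

```java
import org.apache.spark.sql.SparkSession;

public class ShuffleDebugConf {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("lz4-corruption-triage")  // placeholder name
            // Setting tried by the commenter above (default: true).
            .config("spark.unsafe.sorter.spill.read.ahead.enabled", "false")
            // Switch the shuffle/spill codec away from lz4 to see whether
            // the "Stream is corrupted" error is codec-specific.
            .config("spark.io.compression.codec", "snappy")
            .getOrCreate();

        // ... run the failing job here ...

        spark.stop();
    }
}
```

If the error persists with a different codec, the corruption is more likely in the shuffle/spill path itself than in lz4-java, which is consistent with the upstream discussion linked from the issue description.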