[jira] [Comment Edited] (SPARK-43106) Data lost from the table if the INSERT OVERWRITE query fails

2023-04-29 Thread Vaibhav Beriwala (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717917#comment-17717917
 ] 

Vaibhav Beriwala edited comment on SPARK-43106 at 4/29/23 4:19 PM:
---

[~dongjoon] Thank you for taking a look at this issue.

1) This is not specific to Spark 3.3.2; the issue exists in the master branch 
as well.

2) As [~itskals] mentioned above, at Uber we mostly use HDFS as the storage 
backend, but the same issue would exist for cloud object storage as well.

3) Running any *_INSERT OVERWRITE TABLE_* query over an unpartitioned table 
will let you quickly reproduce this issue. You will notice that Spark first 
cleans up the table output path and then launches a job that computes the new 
data, leaving the table data unavailable while the job runs and lost if the 
job fails (a minimal reproduction sketch follows the code pointers below).

 

Some code pointers on this:

1) Refer to the *_run_* method of the *_InsertIntoHadoopFsRelation_* class.

2) Inside the *_run_* method, you will see that we first call 
*_deleteMatchingPartitions_* (this cleans up the table data) and only later 
call *_FileFormatWriter.write_* (this triggers the actual job).
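
For anyone who wants to see it end to end, here is a minimal PySpark sketch of 
the reproduction (the table name t is just a placeholder, and the failing UDF 
only exists to simulate a failing insert job):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .master("local[2]")
         .appName("insert-overwrite-repro")
         .getOrCreate())

# Unpartitioned target table with some existing data.
spark.range(100).write.mode("overwrite").saveAsTable("t")

@F.udf("long")
def boom(x):
    # Simulates a task failure inside the insert job.
    raise ValueError("simulated task failure")

spark.range(100).select(boom("id").alias("id")).createOrReplaceTempView("src")

try:
    # Spark cleans up the table path first, then runs the (failing) job.
    spark.sql("INSERT OVERWRITE TABLE t SELECT id FROM src")
except Exception as exc:
    print("insert failed:", type(exc).__name__)

# On affected versions the original 100 rows are gone even though the insert failed.
print(spark.table("t").count())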


was (Author: vaibhavb):
[~dongjoon] Thank you for taking a look at this issue.

1) This is not specific to Spark 3.3.2; the issue exists in the master branch 
as well.

2) As [~itskals] mentioned above, at Uber we mostly use HDFS as the storage 
backend, but the same issue would exist for cloud object storage as well.

3) Running any *_INSERT OVERWRITE TABLE_* query over an unpartitioned table 
will let you quickly reproduce this issue. You will notice that Spark first 
cleans up the table output path and then launches a job that computes the new 
data.

 

Some code pointers on this:

1) Refer to the *_run_* method of the *_InsertIntoHadoopFsRelation_* class.

2) Inside the *_run_* method, you will see that we first call 
*_deleteMatchingPartitions_* (this cleans up the table data) and only later 
call *_FileFormatWriter.write_* (this triggers the actual job).

> Data lost from the table if the INSERT OVERWRITE query fails
> 
>
> Key: SPARK-43106
> URL: https://issues.apache.org/jira/browse/SPARK-43106
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Vaibhav Beriwala
>Priority: Major
>
> When we run an INSERT OVERWRITE query for an unpartitioned table on Spark-3, 
> Spark has the following behavior:
> 1) It will first clean up all the data from the actual table path.
> 2) It will then launch a job that performs the actual insert.
>  
> There are 2 major issues with this approach:
> 1) If the insert job launched in step 2 above fails for any reason, the data 
> from the original table is lost. 
> 2) If the insert job in step 2 above takes a long time to complete, then the 
> table data is unavailable to other readers for the entire duration the job 
> takes.
> This behavior is the same even for the partitioned tables when using static 
> partitioning. For dynamic partitioning, we do not delete the table data 
> before the job launch.
>  
> Is there a reason why we perform this delete before the job launch and not as 
> part of the job commit operation? This issue does not exist with Hive, where 
> the data is presumably cleaned up as part of the job commit operation. As part 
> of SPARK-19183, we did add a new hook in the commit protocol for this exact 
> purpose, but it seems its default behavior is still to delete the table data 
> before the job launch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43320) Directly call Hive 2.3.9 API

2023-04-29 Thread Sean R. Owen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean R. Owen updated SPARK-43320:
-
Priority: Trivial  (was: Major)

> Directly call Hive 2.3.9 API
> 
>
> Key: SPARK-43320
> URL: https://issues.apache.org/jira/browse/SPARK-43320
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Cheng Pan
>Assignee: Cheng Pan
>Priority: Trivial
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43320) Directly call Hive 2.3.9 API

2023-04-29 Thread Sean R. Owen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean R. Owen resolved SPARK-43320.
--
Fix Version/s: 3.5.0
 Assignee: Cheng Pan
   Resolution: Fixed

Resolved by https://github.com/apache/spark/pull/40995

This essentially reverts https://issues.apache.org/jira/browse/SPARK-37446

> Directly call Hive 2.3.9 API
> 
>
> Key: SPARK-43320
> URL: https://issues.apache.org/jira/browse/SPARK-43320
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Cheng Pan
>Assignee: Cheng Pan
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-43106) Data lost from the table if the INSERT OVERWRITE query fails

2023-04-29 Thread Vaibhav Beriwala (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717917#comment-17717917
 ] 

Vaibhav Beriwala commented on SPARK-43106:
--

[~dongjoon] Thank you for taking a look at this issue.

1) This is not specific to Spark 3.3.2; the issue exists in the master branch 
as well.

2) As [~itskals] mentioned above, at Uber we mostly use HDFS as the storage 
backend, but the same issue would exist for cloud object storage as well.

3) Running any *_INSERT OVERWRITE TABLE_* query over an unpartitioned table 
will let you quickly reproduce this issue. You will notice that Spark first 
cleans up the table output path and then launches a job that computes the new 
data.

 

Some code pointers on this:

1) Refer to the *_run_* method of the *_InsertIntoHadoopFsRelation_* class.

2) Inside the *_run_* method, you will see that we first call 
*_deleteMatchingPartitions_* (this cleans up the table data) and only later 
call *_FileFormatWriter.write_* (this triggers the actual job).

> Data lost from the table if the INSERT OVERWRITE query fails
> 
>
> Key: SPARK-43106
> URL: https://issues.apache.org/jira/browse/SPARK-43106
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.2
>Reporter: Vaibhav Beriwala
>Priority: Major
>
> When we run an INSERT OVERWRITE query for an unpartitioned table on Spark-3, 
> Spark has the following behavior:
> 1) It will first clean up all the data from the actual table path.
> 2) It will then launch a job that performs the actual insert.
>  
> There are 2 major issues with this approach:
> 1) If the insert job launched in step 2 above fails for any reason, the data 
> from the original table is lost. 
> 2) If the insert job in step 2 above takes a long time to complete, then the 
> table data is unavailable to other readers for the entire duration the job 
> takes.
> This behavior is the same even for the partitioned tables when using static 
> partitioning. For dynamic partitioning, we do not delete the table data 
> before the job launch.
>  
> Is there a reason why we perform this delete before the job launch and not as 
> part of the job commit operation? This issue does not exist with Hive, where 
> the data is presumably cleaned up as part of the job commit operation. As part 
> of SPARK-19183, we did add a new hook in the commit protocol for this exact 
> purpose, but it seems its default behavior is still to delete the table data 
> before the job launch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43106) Data lost from the table if the INSERT OVERWRITE query fails

2023-04-29 Thread Vaibhav Beriwala (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vaibhav Beriwala updated SPARK-43106:
-
Affects Version/s: 3.4.0
   (was: 3.3.2)

> Data lost from the table if the INSERT OVERWRITE query fails
> 
>
> Key: SPARK-43106
> URL: https://issues.apache.org/jira/browse/SPARK-43106
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Vaibhav Beriwala
>Priority: Major
>
> When we run an INSERT OVERWRITE query for an unpartitioned table on Spark-3, 
> Spark has the following behavior:
> 1) It will first clean up all the data from the actual table path.
> 2) It will then launch a job that performs the actual insert.
>  
> There are 2 major issues with this approach:
> 1) If the insert job launched in step 2 above fails for any reason, the data 
> from the original table is lost. 
> 2) If the insert job in step 2 above takes a long time to complete, then the 
> table data is unavailable to other readers for the entire duration the job 
> takes.
> This behavior is the same even for the partitioned tables when using static 
> partitioning. For dynamic partitioning, we do not delete the table data 
> before the job launch.
>  
> Is there a reason why we perform this delete before the job launch and not as 
> part of the job commit operation? This issue does not exist with Hive, where 
> the data is presumably cleaned up as part of the job commit operation. As part 
> of SPARK-19183, we did add a new hook in the commit protocol for this exact 
> purpose, but it seems its default behavior is still to delete the table data 
> before the job launch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-43244) RocksDB State Store can accumulate unbounded native memory

2023-04-29 Thread Adam Binford (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Binford resolved SPARK-43244.
--
Resolution: Duplicate

Will be addressed by https://issues.apache.org/jira/browse/SPARK-43311

> RocksDB State Store can accumulate unbounded native memory
> --
>
> Key: SPARK-43244
> URL: https://issues.apache.org/jira/browse/SPARK-43244
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 3.3.2
>Reporter: Adam Binford
>Priority: Major
>
> We noticed in one of our production stateful streaming jobs using RocksDB 
> that an executor with 20g of heap was using around 40g of resident memory. I 
> noticed a single RocksDB instance was using around 150 MiB of memory, and 
> only 5 MiB or so of this was from the write batch (which is now cleared after 
> committing).
> After reading about RocksDB memory usage (this link was helpful: 
> [https://github.com/EighteenZi/rocksdb_wiki/blob/master/Memory-usage-in-RocksDB.md])
>  I realized a lot of this was likely the "Index and Filters" memory usage. 
> This job is doing streaming deduplication with a lot of unique keys, so it 
> makes sense that these block usages would be high. The problem is that, as it 
> stands now, the underlying RocksDB instance stays open on an executor for as 
> long as that executor is assigned that stateful partition (so it can be reused 
> across batches). A single executor can therefore accumulate a large number of 
> open RocksDB instances at once, each using a certain amount of native memory. 
> In the worst case, a single executor could need to keep every partition's 
> RocksDB instance open at once.
> There are a couple of ways to control the amount of memory used, such as 
> limiting the max open files or using the block cache for the indices and 
> filters, but neither of these solves the underlying problem of accumulating 
> native memory from multiple partitions on an executor.
> The real fix needs to be a mechanism and option to close the underlying 
> RocksDB instance at the end of each task, so you have the option to only ever 
> have one RocksDB instance open at a time, thus having predictable memory 
> usage no matter the size of your data or number of shuffle partitions. 
> We are running this on Spark 3.3, but just kicked off a test to see if things 
> are any different in Spark 3.4.
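
For context, a minimal PySpark sketch of the kind of job described above 
(streaming deduplication over high-cardinality keys with the RocksDB state 
store provider); the rate source, row rate, and checkpoint path are only 
placeholders:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .master("local[4]")
         .appName("rocksdb-dedupe-sketch")
         # Swap the default HDFS-backed state store for the RocksDB-backed one.
         .config("spark.sql.streaming.stateStore.providerClass",
                 "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
         .getOrCreate())

# Every key is unique, so dropDuplicates keeps adding state; each stateful
# shuffle partition assigned to an executor keeps its own RocksDB instance
# open across batches, holding index/filter blocks in native memory.
events = (spark.readStream
          .format("rate")
          .option("rowsPerSecond", 10000)
          .load()
          .withColumn("key", F.expr("uuid()")))

query = (events.dropDuplicates(["key"])
         .writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/rocksdb-dedupe-checkpoint")
         .start())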



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43326) CoalesceBucketsInJoin not working with SHUFFLE_HASH hint

2023-04-29 Thread Nikita Eshkeev (Jira)
Nikita Eshkeev created SPARK-43326:
--

 Summary: CoalesceBucketsInJoin not working with SHUFFLE_HASH hint
 Key: SPARK-43326
 URL: https://issues.apache.org/jira/browse/SPARK-43326
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.2
Reporter: Nikita Eshkeev


h1. NOTICE: related to SPARK-43021
h1. What I did

I define the following code:

{{from pyspark.sql import SparkSession}}

{{spark = (}}
{{  SparkSession}}
{{    .builder}}
{{    .appName("Bucketing")}}
{{    .master("local[4]")}}
{{    .config("spark.sql.bucketing.coalesceBucketsInJoin.enabled", True)}}
{{    .config("spark.sql.autoBroadcastJoinThreshold", "-1")}}
{{    .getOrCreate()}}
{{)}}

{{# AQE prevents CoalesceBucketsInJoin in 3.3.2}}
{{spark.conf.set("spark.sql.adaptive.enabled", False)}}

{{df1 = spark.range(0, 100)}}
{{df2 = spark.range(0, 100, 2)}}

{{df1.write.bucketBy(4, "id").mode("overwrite").saveAsTable("t1")}}
{{df2.write.bucketBy(2, "id").mode("overwrite").saveAsTable("t2")}}

{{t1 = spark.table("t1")}}
{{t2 = spark.table("t2")}}

{{t2.join(t1.hint("SHUFFLE_HASH"), "id").explain()}}

{{== Physical Plan ==}}
{{*(3) Project [id#23L]}}
{{+- *(3) ShuffledHashJoin [id#23L], [id#21L], Inner, BuildRight}}
{{   :- Exchange hashpartitioning(id#23L, 4), ENSURE_REQUIREMENTS, [plan_id=667]}}
{{   :  +- *(1) Filter isnotnull(id#23L)}}
{{   :     +- *(1) ColumnarToRow}}
{{   :        +- FileScan parquet spark_catalog.default.t2[id#23L] Batched: true, Bucketed: false (disabled by query planner), DataFilters: [isnotnull(id#23L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/notebooks/spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>}}
{{   +- *(2) Filter isnotnull(id#21L)}}
{{      +- *(2) ColumnarToRow}}
{{         +- FileScan parquet spark_catalog.default.t1[id#21L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#21L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/notebooks/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4}}
h1. What happened

There is an Exchange node in the join plan
h1. What is expected

According to the docs for 
[{{spark.sql.bucketing.coalesceBucketsInJoin.enabled}}|https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration],
 there should not be any Exchange/Shuffle nodes in the plan (the number of 
buckets of t1 is divisible by the number of buckets of t2), although the docs 
don't say anything about what should happen when join hints are applied.

h1. Observation

# I checked other hints (MERGE and SHUFFLE_REPLICATE_NL) and they don't yield 
any Exchange nodes in the plan.
# When I apply the hint to the t2 table ({{t2.hint("SHUFFLE_HASH").join(t1, 
"id")}}), no exchange happens for any hint (see the comparison snippet below).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43325) regexp_extract_all DataFrame API

2023-04-29 Thread Sayed Mohammad Hossein Torabi (Jira)
Sayed Mohammad Hossein Torabi created SPARK-43325:
-

 Summary: regexp_extract_all DataFrame API 
 Key: SPARK-43325
 URL: https://issues.apache.org/jira/browse/SPARK-43325
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, SQL
Affects Versions: 3.4.0
Reporter: Sayed Mohammad Hossein Torabi
 Fix For: 3.4.1


Implement the `regexp_extract_all` DataFrame API and make it available for 
both Scala/Java and Python Spark.
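
Until a dedicated DataFrame function lands, the existing SQL function 
(available since Spark 3.1) can already be reached from the DataFrame API 
through expr(); a minimal PySpark sketch with a made-up column and pattern:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").appName("regexp-extract-all").getOrCreate()

df = spark.createDataFrame([("order 12 of 34",), ("no digits here",)], ["text"])

# regexp_extract_all(str, regexp[, idx]) already exists as a SQL function;
# expr() bridges it into the DataFrame API until a native function is added.
result = df.withColumn("numbers", F.expr(r"regexp_extract_all(text, '(\\d+)', 1)"))
result.show(truncate=False)
# Expected: ["12", "34"] for the first row and an empty array for the second.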



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org