[jira] [Commented] (SPARK-44884) Spark doesn't create SUCCESS file in Spark 3.3.0+ when partitionOverwriteMode is dynamic

2024-07-22 Thread Anika Kelhanka (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-44884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17867688#comment-17867688
 ] 

Anika Kelhanka commented on SPARK-44884:


*Issue:*
* This issue happens specifically when {{partitionOverwriteMode = dynamic}} (Insert Overwrite - [SPARK-20236|https://issues.apache.org/jira/browse/SPARK-20236]). A minimal configuration sketch is shown after this list.
* The "_SUCCESS" file is created in Spark versions <= 3.0.2, given {{"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="true"}}.
* The "_SUCCESS" file is not created in Spark versions > 3.0.2, even when {{"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="true"}}.



*Analysis (RCA):*

* In Spark versions prior to 3.0.2, the _SUCCESS marker file is created at the root output path when the Spark job succeeds. This is the expected behavior.
* What changed: after the change for [SPARK-29302|https://issues.apache.org/jira/browse/SPARK-29302] (dynamic partition overwrite with speculation enabled) was merged, the _SUCCESS marker file is no longer created at the root location when the Spark job writes in dynamic partition overwrite mode.
* The change for [SPARK-29302|https://issues.apache.org/jira/browse/SPARK-29302] sets {{committerOutputPath = ${stagingDir}}} (it previously held the root output path) in [this code block|https://github.com/apache/spark/pull/29000/files#diff-15b529afe19e971b138fc604909bcab2e42484babdcea937f41d18cb22d9401dR167-R175].
* The {{committerOutputPath}} is passed on to the Hadoop committer, which creates the _SUCCESS marker file at that path. The _SUCCESS marker is therefore now created inside the stagingDir.
* Once the Hadoop committer has finished writing, the Spark commit protocol copies all the data files, but NOT the _SUCCESS marker, from the stagingDir to the root path.
* The stagingDir is then deleted, taking the _SUCCESS marker file with it. (A quick check of the symptom is sketched below.)
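
A hedged way to confirm the symptom after a write in dynamic partition overwrite mode; the bucket path is the example path from the issue description, not a required value:

{code:java}
import org.apache.hadoop.fs.{FileSystem, Path}

val outputPath = new Path("gs://test_bucket/table")
val fs = outputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)

// On <= 3.0.2 this prints true; after SPARK-29302 it prints false, because the
// marker is written into (and then deleted together with) the staging directory.
println(fs.exists(new Path(outputPath, "_SUCCESS")))
{code}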


*Proposed Fix:*

The gap in this logic can be closed by adding a step that copies the _SUCCESS file to the final location as well before the stagingDir is deleted. Also, ensure that when {{"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="false"}}, the Hadoop output committer does not create the _SUCCESS marker file in the stagingDir in the first place. A rough sketch of the copy step follows.
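
A minimal sketch of the proposed copy step, assuming it is invoked from the dynamic-partition-overwrite commit path; the helper name and its call site are hypothetical, not the actual patch:

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

// Copy (not move) the marker so the existing staging-dir cleanup stays unchanged.
def copySuccessMarker(stagingDir: Path, destDir: Path, conf: Configuration): Unit = {
  val markerEnabled =
    conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)
  val fs = stagingDir.getFileSystem(conf)
  val marker = new Path(stagingDir, "_SUCCESS")
  if (markerEnabled && fs.exists(marker)) {
    FileUtil.copy(fs, marker, fs, new Path(destDir, "_SUCCESS"), false, conf)
  }
}
{code}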

I am working on a fix for the same.

> Spark doesn't create SUCCESS file in Spark 3.3.0+ when partitionOverwriteMode 
> is dynamic
> 
>
> Key: SPARK-44884
> URL: https://issues.apache.org/jira/browse/SPARK-44884
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Dipayan Dev
>Priority: Major
> Attachments: image-2023-08-20-18-46-53-342.png, 
> image-2023-08-25-13-01-42-137.png
>
>
> The issue does not happen in Spark 2.x (I am using 2.4.0), but only in 3.3.0 
> (tested with 3.4.1 as well).
> Code to reproduce the issue
>  
> {code:java}
> scala> spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic") 
> scala> val DF = Seq(("test1", 123)).toDF("name", "num")
> scala> DF.write.option("path", 
> "gs://test_bucket/table").mode("overwrite").partitionBy("num").format("orc").saveAsTable("test_schema.test_tb1")
>  {code}
>  
> The above code succeeds and creates an external Hive table, but {*}there is no 
> SUCCESS file generated{*}.
> Adding the content of the bucket after table creation
> !image-2023-08-25-13-01-42-137.png|width=500,height=130!
> The same code, when run with Spark 2.4.0 (with or without an external path), 
> generates the SUCCESS file.
> {code:java}
> scala> 
> DF.write.mode(SaveMode.Overwrite).partitionBy("num").format("orc").saveAsTable("test_schema.test_tb1"){code}
> !image-2023-08-20-18-46-53-342.png|width=465,height=166!
>  




[jira] [Comment Edited] (SPARK-25080) NPE in HiveShim$.toCatalystDecimal(HiveShim.scala:110)

2020-10-09 Thread Anika Kelhanka (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-25080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211011#comment-17211011
 ] 

Anika Kelhanka edited comment on SPARK-25080 at 10/9/20, 2:24 PM:
--

I am able to reproduce this issue from the spark-shell in Spark 2.4 while querying an external Hive-on-Parquet table. The scenario is: certain decimal fields in the Parquet files hold values larger than the precision defined in the Hive table, i.e. Parquet has a value that needs to be converted to a target type that does not have enough precision. (A small diagnostic sketch follows the stack trace below.)

scala> val df = spark.sql("select 'dummy' as name, 
100010.7010 as value")

scala> df.write.mode("Overwrite").parquet("/my/hdfs/location/test")

 

hive> create external table db1.test_precision(name string, value 
Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';

 

scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet","false")

scala> val df_hive = spark.sql("select * from db1.test_precision")

scala> df_hive.show
 20/10/09 09:33:12 WARN hadoop.ParquetRecordReader: Can not initialize counter 
due to context is not a instance of TaskInputOutputContext, but is 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 20/10/09 09:33:12 ERROR executor.Executor: Exception in task 0.0 in stage 5.0 
(TID 5)
 java.lang.NullPointerException
 at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107)
 at 
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415)
 at 
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
 at 
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443)
 at 
org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)
 at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
 at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
 at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:121)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
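
A hedged diagnostic sketch for the scenario above: read the files with Spark's native Parquet reader (which does not go through {{HiveShim}}) and flag values that cannot fit DECIMAL(18,6). Values with more than 18 - 6 = 12 integer digits cannot be represented at that precision, which is presumably what trips the NPE in {{toCatalystDecimal}}. The location is the one used in the reproduction; the column name matches the table definition above.

{code:java}
import org.apache.spark.sql.functions.{abs, col}

// Bypass the Hive SerDe path and inspect the raw Parquet data directly.
val raw = spark.read.parquet("/my/hdfs/location/test")

// Rows whose absolute value is >= 10^12 overflow DECIMAL(18,6).
raw.filter(abs(col("value")) >= 1e12).show()
{code}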


> NPE in HiveShim$.toCatalystDecimal(HiveShim.scala:110)
> --
>
> Key: SPARK-25080
> URL: https://issues.apache.org/jira/browse/SPARK-25080
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.3.1
> Environment: AWS EMR
>Reporter: Andrew K Long
>Priority: Minor
>
> NPE while reading hive table.
>  
> ```
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 1190 in stage 392.0 failed 4 times, most recent failure: Lost task 
> 1190.3 in stage 392.0 (TID 122055, ip-172-31-32-196.ec2.internal, executor 
> 487): java.lang.NullPointerException
> at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:110)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:413)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:442)
> at 
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:433)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at 
>