[jira] [Commented] (SPARK-44884) Spark doesn't create SUCCESS file in Spark 3.3.0+ when partitionOverwriteMode is dynamic
[ https://issues.apache.org/jira/browse/SPARK-44884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17867688#comment-17867688 ]

Anika Kelhanka commented on SPARK-44884:

*Issue:*
* This issue occurs specifically when {{partitionOverwriteMode = dynamic}} (Insert Overwrite - [SPARK-20236|https://issues.apache.org/jira/browse/SPARK-20236]).
* The "_SUCCESS" file is created for Spark versions <= 3.0.2 when {{"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="true"}}.
* The "_SUCCESS" file is not created for Spark versions > 3.0.2, even when {{"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="true"}}.

*Analysis (RCA):*
* In Spark versions up to and including 3.0.2, the _SUCCESS marker file is created at the root path when the Spark job succeeds. This is the expected behavior.
* What changed: after the change for [SPARK-29302|https://issues.apache.org/jira/browse/SPARK-29302] (dynamic partition overwrite with speculation enabled) was merged, the _SUCCESS marker file stopped being created at the root location when the Spark job writes in dynamic partition overwrite mode.
* The change for [SPARK-29302|https://issues.apache.org/jira/browse/SPARK-29302] sets {{committerOutputPath=${stagingDir}}}, which previously held the root directory path, in [this codeblock|https://github.com/apache/spark/pull/29000/files#diff-15b529afe19e971b138fc604909bcab2e42484babdcea937f41d18cb22d9401dR167-R175].
* The {{committerOutputPath}} parameter is passed to the Hadoop committer, which creates the _SUCCESS marker file at the path given in {{committerOutputPath}}. As a result, the _SUCCESS marker is now created inside the stagingDir.
* Once the Hadoop committer has finished writing, the Spark commit protocol logic copies all the data files to the root path, but not the _SUCCESS marker, before deleting the ${stagingDir}.
* The stagingDir is then deleted along with the _SUCCESS marker file.

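The sequence described in the analysis can be imitated outside Spark with plain file operations. The following is only a rough sketch under stated assumptions: {{StagingCommitSketch}}, {{commit}}, the {{.staging-demo}} directory name and the {{markSuccessfulJobs}} / {{copyMarker}} flags are hypothetical names made up for illustration, and the code does not reproduce Spark's or Hadoop's actual commit logic. It only shows why a marker written into the staging directory disappears unless something carries it over before the directory is deleted.

```scala
import java.io.File
import java.nio.file.{Files, Path}

// Illustrative simulation only; none of these names come from Spark or Hadoop.
object StagingCommitSketch {
  val Marker = "_SUCCESS"

  private def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for plain files, so Option(...) skips them.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }

  /** Runs one simulated job commit and reports whether the marker ends up at the root. */
  def commit(root: Path, markSuccessfulJobs: Boolean, copyMarker: Boolean): Boolean = {
    // Stand-in for committerOutputPath = ${stagingDir}.
    val staging = Files.createDirectories(root.resolve(".staging-demo"))

    // The committer writes partition data, and optionally the marker, under the staging dir.
    val partition = Files.createDirectories(staging.resolve("num=123"))
    Files.write(partition.resolve("part-00000.orc"), Array[Byte](1, 2, 3))
    if (markSuccessfulJobs) Files.createFile(staging.resolve(Marker))

    // The commit step moves staged entries to the root; the marker moves only when copyMarker is set.
    staging.toFile.listFiles().foreach { entry =>
      if (entry.getName != Marker || copyMarker) {
        Files.move(entry.toPath, root.resolve(entry.getName))
      }
    }

    // The staging dir is deleted together with anything still left inside it.
    deleteRecursively(staging.toFile)
    Files.exists(root.resolve(Marker))
  }

  def main(args: Array[String]): Unit = {
    def freshRoot(): Path = Files.createTempDirectory("table-root")
    val withoutCopy = commit(freshRoot(), markSuccessfulJobs = true, copyMarker = false)
    val withCopy = commit(freshRoot(), markSuccessfulJobs = true, copyMarker = true)
    println(s"marker at root without the copy step: $withoutCopy")
    println(s"marker at root with the copy step: $withCopy")
  }
}
```

With {{markSuccessfulJobs = false}} the sketch never writes a marker, so the extra copy step has nothing to move and the root stays without a _SUCCESS file.
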
*Proposed Fix:*
This gap can be closed by adding a step that also copies the _SUCCESS file to the final location before the stagingDir is deleted. In addition, when {{"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs"="false"}}, the Hadoop output committer must not create the _SUCCESS marker file in the stagingDir in the first place.

I am working on a fix for this.

> Spark doesn't create SUCCESS file in Spark 3.3.0+ when partitionOverwriteMode is dynamic
>
> Key: SPARK-44884
> URL: https://issues.apache.org/jira/browse/SPARK-44884
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: Dipayan Dev
> Priority: Major
> Attachments: image-2023-08-20-18-46-53-342.png, image-2023-08-25-13-01-42-137.png
>
> The issue does not occur in Spark 2.x (I am using 2.4.0), only in 3.3.0 (also tested with 3.4.1).
> Code to reproduce the issue:
>
> {code:java}
> scala> spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
> scala> val DF = Seq(("test1", 123)).toDF("name", "num")
> scala> DF.write.option("path", "gs://test_bucket/table").mode("overwrite").partitionBy("num").format("orc").saveAsTable("test_schema.test_tb1")
> {code}
>
> The above code succeeds and creates an external Hive table, but {*}no SUCCESS file is generated{*}.
> The contents of the bucket after table creation:
> !image-2023-08-25-13-01-42-137.png|width=500,height=130!
> The same code, when run with Spark 2.4.0 (with or without an external path), generates the SUCCESS file.
> {code:java}
> scala> DF.write.mode(SaveMode.Overwrite).partitionBy("num").format("orc").saveAsTable("test_schema.test_tb1")
> {code}
> !image-2023-08-20-18-46-53-342.png|width=465,height=166!

--
This message was sent by Atlassian Jira (v8.20.10#820010)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Comment Edited] (SPARK-44884) Spark doesn't create SUCCESS file in Spark 3.3.0+ when partitionOverwriteMode is dynamic
[ https://issues.apache.org/jira/browse/SPARK-44884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17867688#comment-17867688 ]

Anika Kelhanka edited comment on SPARK-44884 at 7/22/24 8:57 AM.

[jira] [Comment Edited] (SPARK-25080) NPE in HiveShim$.toCatalystDecimal(HiveShim.scala:110)
[ https://issues.apache.org/jira/browse/SPARK-25080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211011#comment-17211011 ]

Anika Kelhanka edited comment on SPARK-25080 at 10/9/20, 2:24 PM:

I am able to reproduce this issue when querying an external Hive table stored as Parquet from the Spark shell in Spark 2.4.

The scenario is: certain decimal fields in the Parquet files hold values that exceed the precision defined in the Hive table. In other words, Parquet holds a value that must be converted to a target type with insufficient precision.

scala> val df = spark.sql("select 'dummy' as name, 100010.7010 as value")
scala> df.write.mode("Overwrite").parquet("/my/hdfs/location/test")

hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';

scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet","false")
scala> val df_hive = spark.sql("select * from db1.test_precision")
scala> df_hive.show
20/10/09 09:33:12 WARN hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
20/10/09 09:33:12 ERROR executor.Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.NullPointerException
	at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

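The failure pattern that the trace points to can be sketched without Spark or Hive. This is a minimal illustration and makes one assumption explicit: that a decimal reader hands back null for a value it cannot represent in the declared type, and the caller dereferences that result without checking. {{DecimalNullSafety}}, {{enforce}}, {{convertUnsafe}} and {{convertSafe}} are hypothetical names, not Hive or Spark APIs, and the sample values and the DECIMAL(6,2) type are invented for the example.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

// Illustrative only; these helpers are hypothetical and are not Hive or Spark APIs.
object DecimalNullSafety {
  /** Returns the value rescaled to DECIMAL(precision, scale), or null when it needs more digits than the type allows. */
  def enforce(value: JBigDecimal, precision: Int, scale: Int): JBigDecimal = {
    val rescaled = value.setScale(scale, RoundingMode.HALF_UP)
    if (rescaled.precision() > precision) null else rescaled
  }

  /** Dereferences the result directly, so a null from enforce surfaces as a NullPointerException. */
  def convertUnsafe(value: JBigDecimal, precision: Int, scale: Int): String =
    enforce(value, precision, scale).toPlainString()

  /** Wraps the possibly-null result in Option so the caller can treat it as a missing value instead. */
  def convertSafe(value: JBigDecimal, precision: Int, scale: Int): Option[String] =
    Option(enforce(value, precision, scale)).map(_.toPlainString())

  def main(args: Array[String]): Unit = {
    val fits = new JBigDecimal("1234.5")         // 6 digits at scale 2: fits DECIMAL(6,2)
    val tooWide = new JBigDecimal("1234567.89")  // 9 digits at scale 2: does not fit DECIMAL(6,2)
    println(convertSafe(fits, 6, 2))
    println(convertSafe(tooWide, 6, 2))
    println(scala.util.Try(convertUnsafe(tooWide, 6, 2)).isFailure)
  }
}
```

Whether a reader should surface such a value as NULL or fail with a descriptive error is a design decision; the sketch only shows that an unchecked dereference turns the condition into a bare NullPointerException.
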
[jira] [Commented] (SPARK-25080) NPE in HiveShim$.toCatalystDecimal(HiveShim.scala:110)
[ https://issues.apache.org/jira/browse/SPARK-25080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17211011#comment-17211011 ]

Anika Kelhanka commented on SPARK-25080:

I am able to reproduce this issue from the Spark shell in Spark 2.4 when querying an external Hive table stored as Parquet in which a value exceeds the precision the Hive table is defined with (on the significant-digit side).

scala> val df = spark.sql("select 'dummy' as name, 100010.7010 as value")
scala> df.write.mode("Overwrite").parquet("/my/hdfs/location/test")

hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';

scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet","false")
scala> val df_hive = spark.sql("select * from db1.test_precision")
scala> df_hive.show
20/10/09 09:33:12 WARN hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
20/10/09 09:33:12 ERROR executor.Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.NullPointerException
	at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:107)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:415)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:443)
	at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:434)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

> NPE in HiveShim$.toCatalystDecimal(HiveShim.scala:110)
>
> Key: SPARK-25080
> URL: https://issues.apache.org/jira/browse/SPARK-25080
> Project: Spark
> Issue Type: Improvement
> Components: Input/Output
> Affects Versions: 2.3.1
> Environment: AWS EMR
> Reporter: Andrew K Long
> Priority: Minor
>
> NPE while reading hive table.
>
> ```
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1190 in stage 392.0 failed 4 times, most recent failure: Lost task 1190.3 in stage 392.0 (TID 122055, ip-172-31-32-196.ec2.internal, executor 487): java.lang.NullPointerException
> at org.apache.spark.sql.hive.HiveShim$.toCatalystDecimal(HiveShim.scala:110)
> at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:414)
> at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$14$$anonfun$apply$11.apply(TableReader.scala:413)
> at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:442)
> at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:433)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at