[ https://issues.apache.org/jira/browse/SPARK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281815#comment-17281815 ]

Attila Zsolt Piros commented on SPARK-34372:
--------------------------------------------

A direct output committer combined with speculation can lead to this kind of 
problem, and even to data loss.

Please check this out
 https://issues.apache.org/jira/browse/SPARK-10063

Although DirectParquetOutputCommitter has been removed, you are still using 
DirectFileOutputCommitter.

There should be a warning about this in the logs:
[https://github.com/apache/spark/blob/18b30107adb37d3c7a767a20cc02813f0fdb86da/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1050-L1057]
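As a workaround until the committer setup is changed, speculation can be turned off for jobs that write straight to S3. A minimal sketch is below; spark.speculation and the FileOutputCommitter algorithm version are standard settings, while the mapred.output.direct.* keys for disabling EMR's direct write are assumptions about the EMR configuration and should be verified against your EMR release.

{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch: avoid the speculation + direct-committer combination.
val spark = SparkSession.builder()
  .appName("csv-to-s3")
  // Option 1: do not speculate tasks for jobs that write directly to S3.
  .config("spark.speculation", "false")
  // Option 2 (assumed EMR-specific properties): fall back from the direct
  // committer to the regular FileOutputCommitter, which stages output in
  // a _temporary directory before committing it.
  .config("spark.hadoop.mapred.output.direct.EmrFileSystem", "false")
  .config("spark.hadoop.mapred.output.direct.NativeS3FileSystem", "false")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
  .getOrCreate()

// The CSV write itself is unchanged; only the job configuration differs.
spark.range(10).toDF("id")
  .write
  .option("header", "true")
  .csv("s3://my-bucket/output/")   // hypothetical bucket/path
{code}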

> Speculation results in broken CSV files in Amazon S3
> ----------------------------------------------------
>
>                 Key: SPARK-34372
>                 URL: https://issues.apache.org/jira/browse/SPARK-34372
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.4.7
>         Environment: Amazon EMR with AMI version 5.32.0
>            Reporter: Daehee Han
>            Priority: Minor
>              Labels: csv, s3, spark, speculation
>
> Hi, we've been experiencing rows getting corrupted while partitioned CSV 
> files were written to Amazon S3. Some records were found broken without any 
> error being reported by Spark. Digging into the root cause, we found that 
> Spark speculation retried a partition that was being uploaded slowly and 
> ended up uploading only part of the partition, leaving broken data in S3.
> Here are the stack traces we've found. There are two executors involved. 
> Executor A is the first executor that tried to upload the file; it took 
> much longer than the other executors (but still succeeded), which made 
> Spark speculation cut in and kick off another executor, B. Executor B 
> started to upload the file too, but was interrupted during the upload 
> (killed: another attempt succeeded), and ended up uploading only a part of 
> the whole file. You can see in the log that the file executor A uploaded 
> (8461990 bytes originally) was overwritten by executor B's partial upload 
> (only 3145728 bytes).
>  
> Executor A:
> {quote}21/01/28 17:22:21 INFO Executor: Running task 426.0 in stage 45.0 (TID 
> 13201) 
>  21/01/28 17:22:21 INFO ShuffleBlockFetcherIterator: Getting 470 non-empty 
> blocks including 10 local blocks and 460 remote blocks 
>  21/01/28 17:22:21 INFO ShuffleBlockFetcherIterator: Started 46 remote 
> fetches in 18 ms 
>  21/01/28 17:22:21 INFO FileOutputCommitter: File Output Committer Algorithm 
> version is 2 
>  21/01/28 17:22:21 INFO FileOutputCommitter: FileOutputCommitter skip cleanup 
> _temporary folders under output directory:false, ignore cleanup failures: 
> true 
>  21/01/28 17:22:21 INFO DirectFileOutputCommitter: Direct Write: ENABLED 
>  21/01/28 17:22:21 INFO SQLConfCommitterProvider: Using output committer class
>  21/01/28 17:22:21 INFO CSEMultipartUploadOutputStream: close 
> closed:false 
> s3://\{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv
>  21/01/28 17:22:31 INFO DefaultMultipartUploadDispatcher: Completed multipart 
> upload of 1 parts 8461990 bytes 
>  21/01/28 17:22:31 INFO CSEMultipartUploadOutputStream: Finished uploading 
> \{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv. 
> Elapsed seconds: 10. 
>  21/01/28 17:22:31 INFO SparkHadoopMapRedUtil: No need to commit output of 
> task because needsTaskCommit=false: 
> attempt_20210128172219_0045_m_000426_13201 
>  21/01/28 17:22:31 INFO Executor: Finished task 426.0 in stage 45.0 (TID 
> 13201). 8782 bytes result sent to driver
> {quote}
> Executor B:
> {quote}21/01/28 17:22:31 INFO CoarseGrainedExecutorBackend: Got assigned task 
> 13245 
>  21/01/28 17:22:31 INFO Executor: Running task 426.1 in stage 45.0 (TID 
> 13245) 
>  21/01/28 17:22:31 INFO ShuffleBlockFetcherIterator: Getting 470 non-empty 
> blocks including 11 local blocks and 459 remote blocks 
>  21/01/28 17:22:31 INFO ShuffleBlockFetcherIterator: Started 46 remote 
> fetches in 2 ms 
>  21/01/28 17:22:31 INFO FileOutputCommitter: File Output Committer Algorithm 
> version is 2 
>  21/01/28 17:22:31 INFO FileOutputCommitter: FileOutputCommitter skip cleanup 
> _temporary folders under output directory:false, ignore cleanup failures: 
> true 
>  21/01/28 17:22:31 INFO DirectFileOutputCommitter: Direct Write: ENABLED 
>  21/01/28 17:22:31 INFO SQLConfCommitterProvider: Using output committer 
> class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter 
>  21/01/28 17:22:31 INFO Executor: Executor is trying to kill task 426.1 in 
> stage 45.0 (TID 13245), reason: another attempt succeeded 
>  21/01/28 17:22:31 INFO CSEMultipartUploadOutputStream: close closed:false 
> s3://\{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv 
>  21/01/28 17:22:32 INFO DefaultMultipartUploadDispatcher: Completed multipart 
> upload of 1 parts 3145728 bytes 
>  21/01/28 17:22:32 INFO CSEMultipartUploadOutputStream: Finished uploading 
> \{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv. 
> Elapsed seconds: 0. 
>  21/01/28 17:22:32 ERROR Utils: Aborting task 
> com.univocity.parsers.common.TextWritingException: Error writing row. 
> Internal state when error was thrown: recordCount=18449, 
> recordData=[\{obfuscated}] at 
> com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:935)
>  at 
> com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:714) 
> at 
> org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.write(UnivocityGenerator.scala:84)
>  at 
> org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.write(CSVFileFormat.scala:181)
>  at 
> org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.run(Task.scala:123) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) Caused by: 
> com.univocity.parsers.common.TextWritingException: Error writing row. 
> Internal state when error was thrown: recordCount=18449, 
> recordCharacters=\{obfuscated} at 
> com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:920)
>  at 
> com.univocity.parsers.common.AbstractWriter.internalWriteRow(AbstractWriter.java:829)
>  at 
> com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:712) 
> ... 17 more Caused by: java.io.InterruptedIOException at 
> java.io.PipedInputStream.awaitSpace(PipedInputStream.java:275) at 
> java.io.PipedInputStream.receive(PipedInputStream.java:231) at 
> java.io.PipedOutputStream.write(PipedOutputStream.java:149) at 
> com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream.write(CSEMultipartUploadOutputStream.java:242)
>  at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
>  at java.io.DataOutputStream.write(DataOutputStream.java:107) at 
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) at 
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) at 
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) at 
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:207) at 
> com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:153)
>  at 
> com.univocity.parsers.common.AbstractWriter.internalWriteRow(AbstractWriter.java:826)
>  ... 18 more 
>  21/01/28 17:22:32 INFO DirectFileOutputCommitter: Nothing to clean up on 
> abort since there are no temporary files written 
>  21/01/28 17:22:32 ERROR FileFormatWriter: Job job_20210128172219_0045 
> aborted. 
>  21/01/28 17:22:32 INFO Executor: Executor interrupted and killed task 426.1 
> in stage 45.0 (TID 13245), reason: another attempt succeeded
> {quote}
> We'll be bypassing this problem by turning speculation off; ideally, 
> though, it seems speculation should be disabled for stages like this one 
> (writing to S3).



