[ https://issues.apache.org/jira/browse/SPARK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281815#comment-17281815 ]
Attila Zsolt Piros commented on SPARK-34372:
--------------------------------------------

A direct output committer combined with speculation can lead to this kind of problem, and even to data loss. Please check out https://issues.apache.org/jira/browse/SPARK-10063.

Although DirectParquetOutputCommitter has been removed, you are using DirectFileOutputCommitter. There must be a warning in the logs:
[https://github.com/apache/spark/blob/18b30107adb37d3c7a767a20cc02813f0fdb86da/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1050-L1057]

> Speculation results in broken CSV files in Amazon S3
> ----------------------------------------------------
>
>                 Key: SPARK-34372
>                 URL: https://issues.apache.org/jira/browse/SPARK-34372
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.4.7
>         Environment: Amazon EMR with AMI version 5.32.0
>            Reporter: Daehee Han
>            Priority: Minor
>              Labels: csv, s3, spark, speculation
>
> Hi, we've been seeing some rows get corrupted when partitioned CSV files are written to Amazon S3. Some records were broken without Spark reporting any error. Digging into the root cause, we found that Spark speculation launched a second attempt for a partition that was uploading slowly; that attempt uploaded only part of the partition, leaving broken data in S3.
> Here are the stack traces we found. Two executors are involved. Executor A was the first to upload the file; it took much longer than other executors (but still succeeded), so Spark speculation cut in and started another attempt on executor B. Executor B started to upload the file too, but was interrupted during the upload (killed: another attempt succeeded) and ended up uploading only part of the file. As the log shows, the file executor A uploaded (8461990 bytes) was overwritten by executor B's upload (only 3145728 bytes).
>
> Executor A:
> {quote}21/01/28 17:22:21 INFO Executor: Running task 426.0 in stage 45.0 (TID 13201)
> 21/01/28 17:22:21 INFO ShuffleBlockFetcherIterator: Getting 470 non-empty blocks including 10 local blocks and 460 remote blocks
> 21/01/28 17:22:21 INFO ShuffleBlockFetcherIterator: Started 46 remote fetches in 18 ms
> 21/01/28 17:22:21 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
> 21/01/28 17:22:21 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
> 21/01/28 17:22:21 INFO DirectFileOutputCommitter: Direct Write: ENABLED
> 21/01/28 17:22:21 INFO SQLConfCommitterProvider: Using output committer class
> 21/01/28 17:22:21 INFO INFO CSEMultipartUploadOutputStream: close closed:false s3://\{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv
> 21/01/28 17:22:31 INFO DefaultMultipartUploadDispatcher: Completed multipart upload of 1 parts 8461990 bytes
> 21/01/28 17:22:31 INFO CSEMultipartUploadOutputStream: Finished uploading \{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv. Elapsed seconds: 10.
> 21/01/28 17:22:31 INFO SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_20210128172219_0045_m_000426_13201
> 21/01/28 17:22:31 INFO Executor: Finished task 426.0 in stage 45.0 (TID 13201). 8782 bytes result sent to driver
> {quote}
> Executor B:
> {quote}21/01/28 17:22:31 INFO CoarseGrainedExecutorBackend: Got assigned task 13245
> 21/01/28 17:22:31 INFO Executor: Running task 426.1 in stage 45.0 (TID 13245)
> 21/01/28 17:22:31 INFO ShuffleBlockFetcherIterator: Getting 470 non-empty blocks including 11 local blocks and 459 remote blocks
> 21/01/28 17:22:31 INFO ShuffleBlockFetcherIterator: Started 46 remote fetches in 2 ms
> 21/01/28 17:22:31 INFO FileOutputCommitter: File Output Committer Algorithm version is 2
> 21/01/28 17:22:31 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: true
> 21/01/28 17:22:31 INFO DirectFileOutputCommitter: Direct Write: ENABLED
> 21/01/28 17:22:31 INFO SQLConfCommitterProvider: Using output committer class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter
> 21/01/28 17:22:31 INFO Executor: Executor is trying to kill task 426.1 in stage 45.0 (TID 13245), reason: another attempt succeeded
> 21/01/28 17:22:31 INFO CSEMultipartUploadOutputStream: close closed:false s3://\{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv
> 21/01/28 17:22:32 INFO DefaultMultipartUploadDispatcher: Completed multipart upload of 1 parts 3145728 bytes
> 21/01/28 17:22:32 INFO CSEMultipartUploadOutputStream: Finished uploading \{obfuscated}/part-00426-7d5677a9-f740-4db6-9d3c-dc589d75e965-c000.csv. Elapsed seconds: 0.
> 21/01/28 17:22:32 ERROR Utils: Aborting task
> com.univocity.parsers.common.TextWritingException: Error writing row.
> Internal state when error was thrown: recordCount=18449, recordData=[\{obfuscated}]
>     at com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:935)
>     at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:714)
>     at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.write(UnivocityGenerator.scala:84)
>     at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.write(CSVFileFormat.scala:181)
>     at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:245)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1439)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.univocity.parsers.common.TextWritingException: Error writing row.
> Internal state when error was thrown: recordCount=18449, recordCharacters=\{obfuscated}
>     at com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:920)
>     at com.univocity.parsers.common.AbstractWriter.internalWriteRow(AbstractWriter.java:829)
>     at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:712)
>     ... 17 more
> Caused by: java.io.InterruptedIOException
>     at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:275)
>     at java.io.PipedInputStream.receive(PipedInputStream.java:231)
>     at java.io.PipedOutputStream.write(PipedOutputStream.java:149)
>     at com.amazon.ws.emr.hadoop.fs.cse.CSEMultipartUploadOutputStream.write(CSEMultipartUploadOutputStream.java:242)
>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>     at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>     at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
>     at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
>     at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
>     at com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:153)
>     at com.univocity.parsers.common.AbstractWriter.internalWriteRow(AbstractWriter.java:826)
>     ... 18 more
> 21/01/28 17:22:32 INFO DirectFileOutputCommitter: Nothing to clean up on abort since there are no temporary files written
> 21/01/28 17:22:32 ERROR FileFormatWriter: Job job_20210128172219_0045 aborted.
> 21/01/28 17:22:32 INFO Executor: Executor interrupted and killed task 426.1 in stage 45.0 (TID 13245), reason: another attempt succeeded
> {quote}
> We'll work around this problem by turning speculation off; ideally, however, speculation should be disabled for the stage that writes to S3.
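A toy Java model of the overwrite in the report may help show why Attila points at the direct committer. It assumes, as executor B's log suggests, that closing an interrupted upload stream still completes the multipart upload, and that with a direct committer every attempt writes straight to the final key (the needsTaskCommit=false line in executor A's log suggests there is no per-attempt commit for Spark to arbitrate). FakeStore, DirectStream, and the _temporary/attempt_* paths are hypothetical stand-ins, not EMR, Hadoop, or Spark classes; only the byte counts come from the logs. The "staged" variant is a much simplified picture of what a committer such as FileOutputCommitter does (write per attempt, then promote only the committed attempt), not its actual algorithm.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (hypothetical; not EMR, Hadoop, or Spark code) of two attempts of the
 * same task writing part-00426 while speculation is on.
 */
final class DirectWriteRace {
    static final int FULL_SIZE = 8_461_990;    // bytes executor A uploaded (from the log)
    static final int PARTIAL_SIZE = 3_145_728; // bytes executor B uploaded before it was killed

    /** Hypothetical object store: key -> size of the last completed upload. */
    static final class FakeStore {
        final Map<String, Integer> objects = new HashMap<>();
    }

    /** Hypothetical direct-write stream: close() completes the upload with whatever was buffered. */
    static final class DirectStream implements AutoCloseable {
        private final FakeStore store;
        private final String key;
        private int buffered;

        DirectStream(FakeStore store, String key) {
            this.store = store;
            this.key = key;
        }

        void write(int bytes) {
            buffered += bytes;
        }

        @Override
        public void close() {
            store.objects.put(key, buffered); // publishes even after an interruption
        }
    }

    /** One task attempt; it is "killed" mid-write when bytesBeforeKill < totalBytes. */
    static void runAttempt(FakeStore store, String key, int totalBytes, int bytesBeforeKill) {
        try (DirectStream out = new DirectStream(store, key)) {
            out.write(Math.min(totalBytes, bytesBeforeKill));
            if (bytesBeforeKill < totalBytes) {
                throw new IllegalStateException("killed: another attempt succeeded");
            }
        } catch (IllegalStateException killed) {
            // The attempt aborts, but try-with-resources has already called close(),
            // so the partial upload is now visible under `key`.
        }
    }

    /** Direct write: both attempts target the final key, so the last close() wins. */
    static int directWrite() {
        FakeStore s3 = new FakeStore();
        String key = "part-00426.csv";
        runAttempt(s3, key, FULL_SIZE, FULL_SIZE);    // executor A completes first
        runAttempt(s3, key, FULL_SIZE, PARTIAL_SIZE); // speculative attempt on B is killed mid-write
        return s3.objects.get(key);
    }

    /** Staged write: each attempt has its own path; only the committed attempt is promoted. */
    static int stagedWrite() {
        FakeStore s3 = new FakeStore();
        String key = "part-00426.csv";
        String pathA = "_temporary/attempt_A/" + key; // hypothetical staging layout
        String pathB = "_temporary/attempt_B/" + key;
        runAttempt(s3, pathA, FULL_SIZE, FULL_SIZE);
        s3.objects.put(key, s3.objects.get(pathA));     // the driver commits attempt A only
        runAttempt(s3, pathB, FULL_SIZE, PARTIAL_SIZE); // B's partial output never reaches `key`
        return s3.objects.get(key);
    }

    public static void main(String[] args) {
        System.out.println("direct write, final size: " + directWrite()); // 3145728
        System.out.println("staged write, final size: " + stagedWrite()); // 8461990
    }
}
```

In this model the direct write ends with the 3145728-byte object, matching the report, while the staged write keeps the full 8461990 bytes because the killed attempt only ever touches its own path. Turning spark.speculation off, as the reporter plans to, removes the second attempt from the picture altogether, at the cost of losing speculation for the whole application.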