[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326330#comment-16326330 ]
Steve Loughran commented on SPARK-23050:
----------------------------------------

Quick review of the code:

Yes, there's potentially a failure if a cached 404 is picked up during task commit. It'd be slightly less brittle to return the array of URIs in the task commit message and have {{commitJob}} call {{getFileStatus()}} for each. That'd eliminate the problem except for any task committed immediately before {{commitJob}} whose ref was still in the negative cache of the S3 load balancers. It would also help catch the potential issue "file is lost between task commit and job commit".

Even so, it'd be safe to add a small retry, a bit like ScalaTest's {{eventually()}}, to deal with that negative caching. It *should* only last a few seconds at worst (we don't have any real figures on it; it's so rarely seen, at least with the s3a client).

Following the {{commitJob}} code path, {{HDFSMetadataLog}} could be made object store aware and opt for a direct (atomic) overwrite of the log, rather than the write to temp & rename. Without that, time to commit becomes O(files) rather than O(1).

> Structured Streaming with S3 file source duplicates data because of eventual consistency.
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-23050
>                 URL: https://issues.apache.org/jira/browse/SPARK-23050
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yash Sharma
>            Priority: Major
>
> Spark Structured streaming with S3 file source duplicates data because of eventual consistency.
> Reproducing the scenario -
> - Structured streaming reading from S3 source. Writing back to S3.
> - Spark tries to commitTask on completion of a task, by verifying if all the files have been written to Filesystem. {{ManifestFileCommitProtocol.commitTask}}.
> - [Eventual consistency issue] Spark finds that the file is not present and fails the task. {{org.apache.spark.SparkException: Task failed while writing rows. No such file or directory 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time S3 eventually gets the file.
> - Spark reruns the task and completes the task, but gets a new file name this time. {{ManifestFileCommitProtocol.newTaskTempFile. part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.}}
> - Data duplicates in results and the same data is processed twice and written to S3.
> - There is no data duplication if spark is able to list presence of all committed files and all tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
>     .format("parquet") \
>     .option("compression", "snappy") \
>     .option("path", "s3://path/data/") \
>     .option("checkpointLocation", "s3://path/checkpoint/") \
>     .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00      17070 part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10      17070 part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:========================> (277 + 100) / 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID
> org.apache.spark.SparkException: Task failed while writing rows
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>     ... 8 more
> {code}
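
For illustration only, here is a minimal Python sketch of the kind of bounded retry the comment suggests around the file-status probe. Everything in it is an assumption made for the example: the {{eventually}} helper is hypothetical (loosely modelled on ScalaTest's {{eventually()}}, not Spark or Hadoop code), the timeout and interval values are guesses since there are no real figures for the negative-cache window, and {{FlakyStore}} is just a stand-in that serves a 404 a fixed number of times before the object becomes visible.

```python
import time


def eventually(probe, timeout_s=5.0, interval_s=0.5, retry_on=(FileNotFoundError,)):
    """Call probe() until it stops raising retry_on, or the timeout expires.

    Hypothetical helper; the defaults are guesses, not measured values.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            return probe()
        except retry_on:
            if time.monotonic() >= deadline:
                raise  # still missing after the timeout: treat it as a real failure
            time.sleep(interval_s)


class FlakyStore:
    """Stand-in for an object store that answers with a cached 404 a few times."""

    def __init__(self, misses):
        self.misses = misses
        self.calls = 0

    def get_file_status(self, path):
        self.calls += 1
        if self.calls <= self.misses:
            raise FileNotFoundError(path)
        return {"path": path, "length": 17070}


# Placeholder path; the first two probes fail, the third succeeds.
store = FlakyStore(misses=2)
status = eventually(
    lambda: store.get_file_status("s3://path/data/part-00256"),
    timeout_s=2.0,
    interval_s=0.01,
)
```

The retry only masks a short-lived negative cache entry. A file that is genuinely missing still surfaces as {{FileNotFoundError}} once the timeout runs out, so the "file is lost between task commit and job commit" case would still be caught.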