[ https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16324745#comment-16324745 ]
Steve Loughran commented on SPARK-23050:
----------------------------------------

This s3n is the Amazon EMR closed-source implementation; nothing the ASF can handle.

AWS S3 has create consistency: if you create a file which has never existed and do a HEAD or GET against it, you will see it. So it shouldn't be failing in getFileStatus, except in the special case of the S3 load balancers briefly caching 404 responses. A previous HEAD before the write may have caused the 404 to be cached.

Amazon offer a consistent view as a [premium option|https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html]. Not only should this offer list consistency, you are likely to get a speedup on things like job partitioning, streaming scans of an S3 source bucket and similar. Disclaimer: I've not used it and have no evidence.

The ASF s3a connector has something similar in Hadoop 3.1: S3Guard. Again, it uses DDB (DynamoDB) for consistency and performance. And HADOOP-13884 is the proposal to fix that negative cache problem.

I've not looked at the Spark streaming commit protocol to see what it expects from its store. Clearly it expects the store to be a "real" FS. If it's just trying to enumerate stuff, and does not expect rename() to be atomic or similar, then some retry while waiting for an FNFE (FileNotFoundException) to go away could work. I'd need to look at the code to see.

Meanwhile, for your entertainment, what it takes [to commit to S3|https://github.com/steveloughran/zero-rename-committer/releases/tag/tag_draft_002].

> Structured Streaming with S3 file source duplicates data because of eventual consistency.
> -----------------------------------------------------------------------------------------
>
> Key: SPARK-23050
> URL: https://issues.apache.org/jira/browse/SPARK-23050
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Yash Sharma
>
> Spark Structured streaming with S3 file source duplicates data because of eventual consistency.
> Reproducing the scenario -
> - Structured streaming reading from S3 source. Writing back to S3.
> - Spark tries to commitTask on completion of a task, by verifying if all the files have been written to Filesystem. {{ManifestFileCommitProtocol.commitTask}}.
> - [Eventual consistency issue] Spark finds that the file is not present and fails the task. {{org.apache.spark.SparkException: Task failed while writing rows. No such file or directory 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time S3 eventually gets the file.
> - Spark reruns the task and completes the task, but gets a new file name this time. {{ManifestFileCommitProtocol.newTaskTempFile. part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.}}
> - Data duplicates in results and the same data is processed twice and written to S3.
> - There is no data duplication if spark is able to list presence of all committed files and all tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
>     .format("parquet") \
>     .option("compression", "snappy") \
>     .option("path", "s3://path/data/") \
>     .option("checkpointLocation", "s3://path/checkpoint/") \
>     .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00 17070 part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10 17070 part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:========================> (277 + 100) / 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID
> org.apache.spark.SparkException: Task failed while writing rows
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.FileNotFoundException: No such file or directory 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>     ... 8 more
> {code}
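
The comment's suggestion of retrying until the FileNotFoundException goes away could be sketched roughly as follows. This is an illustration only, not what Spark's ManifestFileCommitProtocol or the EMR filesystem actually does: `wait_for_status`, `FlakyStore`, and the retry parameters are hypothetical names made up for this example, and the approach only helps if the "not found" answer is transient (for instance a briefly cached 404).

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_for_status(
    get_status: Callable[[], T],
    attempts: int = 5,
    initial_delay: float = 0.1,
    backoff: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call get_status until it stops raising FileNotFoundError.

    Hypothetical helper: get_status stands in for a getFileStatus-style
    probe against the object store. If the file is still missing after
    `attempts` tries, the last FileNotFoundError is re-raised.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    delay = initial_delay
    for attempt in range(1, attempts + 1):
        try:
            return get_status()
        except FileNotFoundError:
            if attempt == attempts:
                raise  # still missing: give up and let the caller fail the task
            sleep(delay)      # wait before probing again
            delay *= backoff  # exponential backoff between probes


class FlakyStore:
    """Simulated store that reports 'not found' for the first few probes."""

    def __init__(self, misses: int) -> None:
        self.misses = misses
        self.calls = 0

    def status(self) -> dict:
        self.calls += 1
        if self.calls <= self.misses:
            raise FileNotFoundError("No such file or directory")
        return {"length": 17070}  # sample size, borrowed from the listing above


if __name__ == "__main__":
    store = FlakyStore(misses=2)
    print(wait_for_status(store.status), "after", store.calls, "probes")
```

Retrying the existence check rather than the whole task matters here: a task rerun writes the data again under a new file name, which is how the duplicates in the report arise. The sketch does nothing for stores where rename() is not atomic.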