[ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333477#comment-16333477
 ] 

Steve Loughran commented on SPARK-23050:
----------------------------------------

there's one thing which worries me here: the implication that its repeatable. 
caching of -ve entries is generally rarely observed, at leat for me. Two 
possibilities spring to mind

# the VM rented is fast enough that it really is outrunning S3. That's the 
WONTFIX
# The PUT isn't completing before {{commitTask()}} is called. That could happen 
if the upload is being done asynchronously: either the executor's thread is 
still running when the task is committed (unlikely) or the upload to S3 is 
being done in a thread pool and the final POST to complete the write is still 
outstanding. That I can believe

[~yash...@gmail.com], some questions & them some homework for you

* What EC2 VM type is this?
* How much data is being written?

Homework
* does it still happen on different VMs?
* does it happen if you generate less data?
* if you switch to the s3a connector and log org.apache.hadoop.fs.s3a at debug 
level, what does it say about completing the upload? and, assuming you set up 
log4j to log threads, which thread

Finally, if you set up S3 on your target bucket to log HTTP requests, what is 
the ordering of the PUT/POST of the upload and the HEAD of the getFileStatus 
call? That can be examined even with EMR?



> Structured Streaming with S3 file source duplicates data because of eventual 
> consistency.
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-23050
>                 URL: https://issues.apache.org/jira/browse/SPARK-23050
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yash Sharma
>            Priority: Major
>
> Spark Structured streaming with S3 file source duplicates data because of 
> eventual consistency.
> Re producing the scenario -
> - Structured streaming reading from S3 source. Writing back to S3.
> - Spark tries to commitTask on completion of a task, by verifying if all the 
> files have been written to Filesystem. 
> {{ManifestFileCommitProtocol.commitTask}}.
> - [Eventual consistency issue] Spark finds that the file is not present and 
> fails the task. {{org.apache.spark.SparkException: Task failed while writing 
> rows. No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time S3 eventually gets the file.
> - Spark reruns the task and completes the task, but gets a new file name this 
> time. {{ManifestFileCommitProtocol.newTaskTempFile. 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.}}
> - Data duplicates in results and the same data is processed twice and written 
> to S3.
> - There is no data duplication if spark is able to list presence of all 
> committed files and all tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
>     .format("parquet") \
>     .option("compression", "snappy") \
>     .option("path", "s3://path/data/") \
>     .option("checkpointLocation", "s3://path/checkpoint/") \
>     .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00      17070 
> part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10      17070 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:========================>                            (277 + 100) / 
> 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID  
> org.apache.spark.SparkException: Task failed while writing rows
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:108)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.io.FileNotFoundException: No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>       at 
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>       at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>       at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>       ... 8 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to