[ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326330#comment-16326330
 ] 

Steve Loughran edited comment on SPARK-23050 at 1/15/18 3:24 PM:
-----------------------------------------------------------------

Quick review of the code

Yes, there's potentially a failure if a cached 404 is picked up in taskCommit.

It'd be slightly less brittle to return the array of URIs in the task commit 
message, have {{commitJob}} call getFileStatus() for each. That'd eliminate the 
problem except for any task committed immediately before commitJob & whose ref 
was still in the negative cache of the S3 load balances. It would also help 
catch the potential issue "file is lost between task commit and job commit". 
Even so, it'd be safe to do some little retry a bit ike ScalaTest;s 
{{eventually()}} to deal with that negative caching. it *should* only be for a 
few seconds, at worst (we don't have any real figures on it, it's usually so 
rarely seen, at least with the s3a client).

Following the commit Job code path, {{HDFSMetadataLog}} could be made object 
store aware, and opt for a direct (atomic) overwrite of the log, rather than 
the write to temp & rename. Without that, time to commit becomes O(files) 
rather than (1)

Oh, and the issue is the map in the commit job
{code:java}
  val statuses: Seq[SinkFileStatus] =
        addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f))))
{code}
Probing S3 for a single file can take a few hundred millis, as it's a single 
HEAD for a file, [more for a 
directory|https://steveloughran.blogspot.co.uk/2016/12/how-long-does-filesystemexists-take.html].
 And the more queries you make, the higher the risk of S3 throttling. Doing 
that in the jobCommit amplifies the risk of throttling.

More efficient would be to do a single list of the task working dir into a map, 
then use that to look up the files. approximately O(1) per directory, though as 
LIST consistency lags create consistency, it's actually less reliable.

Finally, ORC may not write a file if there's nothing there to write, I 
believe...the file not being there isn't going to be a failure case 
(implication: any busy-wait for a file coming into existence should not wait 
that long)

+[~fabbri], [~Thomas Demoor], [~ehiggs]


was (Author: ste...@apache.org):
Quick review of the code

Yes, there's potentially a failure if a cached 404 is picked up in taskCommit.

It'd be slightly less brittle to return the array of URIs in the task commit 
message, have  {{commitJob}} call getFileStatus() for each. That'd eliminate 
the problem except for any task committed immediately before commitJob & whose 
ref was still in the negative cache of the S3 load balances. It would also  
help catch the potential issue "file is lost between task commit and job 
commit". Even so, it'd be safe to do some little retry a bit ike ScalaTest;s 
{{eventually()}} to deal with that negative caching. it *should* only be for a 
few seconds, at worst (we don't have any real figures on it, it's usually so 
rarely seen, at least with  the s3a client).

Following the commit Job code path, {{HDFSMetadataLog}} could be made object 
store aware, and opt for a direct (atomic) overwrite of the log, rather than 
the write to temp & rename. Without that, time to commit becomes O(files) 
rather than (1)


> Structured Streaming with S3 file source duplicates data because of eventual 
> consistency.
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-23050
>                 URL: https://issues.apache.org/jira/browse/SPARK-23050
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Yash Sharma
>            Priority: Major
>
> Spark Structured streaming with S3 file source duplicates data because of 
> eventual consistency.
> Re producing the scenario -
> - Structured streaming reading from S3 source. Writing back to S3.
> - Spark tries to commitTask on completion of a task, by verifying if all the 
> files have been written to Filesystem. 
> {{ManifestFileCommitProtocol.commitTask}}.
> - [Eventual consistency issue] Spark finds that the file is not present and 
> fails the task. {{org.apache.spark.SparkException: Task failed while writing 
> rows. No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time S3 eventually gets the file.
> - Spark reruns the task and completes the task, but gets a new file name this 
> time. {{ManifestFileCommitProtocol.newTaskTempFile. 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.}}
> - Data duplicates in results and the same data is processed twice and written 
> to S3.
> - There is no data duplication if spark is able to list presence of all 
> committed files and all tasks succeed.
> Code:
> {code}
> query = selected_df.writeStream \
>     .format("parquet") \
>     .option("compression", "snappy") \
>     .option("path", "s3://path/data/") \
>     .option("checkpointLocation", "s3://path/checkpoint/") \
>     .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00      17070 
> part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10      17070 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:========================>                            (277 + 100) / 
> 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID  
> org.apache.spark.SparkException: Task failed while writing rows
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:108)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.io.FileNotFoundException: No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>       at 
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>       at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:260)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>       at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
>       at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
>       ... 8 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to