[ https://issues.apache.org/jira/browse/SPARK-18512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15702368#comment-15702368 ]

Steve Loughran commented on SPARK-18512:
----------------------------------------

This looks like a consistency problem; S3 listings always lag the 
creation/deletion/update of contents.

The committer has listed the paths to merge in, then gone through each one to 
check its type: if an entry is not a file, it lists the subdirectory, and, 
interestingly, gets an exception at that point. Maybe the first listing found 
an object which was no longer there by the time the second listing went 
through; that is, the exception isn't a delay-on-create, it's a delay-on-delete.
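
Roughly, the shape of that traversal (a simplified sketch in Scala against the 
FileSystem API, with invented names; not the actual FileOutputCommitter code):

{code:scala}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Simplified sketch of the merge traversal. The first listing returns a set of
// entries; every entry that is a directory triggers a second listing. An FNFE
// on that second listing means the entry seen by the first listing was gone
// (or not yet visible) by the time the second one ran.
def mergeSketch(fs: FileSystem, from: Path, to: Path): Unit = {
  fs.listStatus(from).foreach { entry =>
    val dest = new Path(to, entry.getPath.getName)
    if (entry.isDirectory) {
      mergeSketch(fs, entry.getPath, dest)   // <- where the FNFE surfaces
    } else {
      fs.rename(entry.getPath, dest)
    }
  }
}
{code}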

Create-listing delays could be handled in the committer by retrying on an 
FNFE; it'd slightly increase the time before a failure, but as that's a failure 
path, it's not too serious. Delete delays could be addressed the opposite way: 
ignore the problem, on the basis that if the listing failed, there's no file to 
rename. That's more worrying, as it's a sign of a problem which could have 
implications further up the commit process: things are changing in the listing 
of files being renamed.
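
Something along these lines, purely as an illustration of that retry-on-FNFE 
idea (the attempt count and sleep are made-up numbers, and the empty-result 
branch is the "ignore the delete delay" option, which is exactly the part that 
worries me):

{code:scala}
import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Hypothetical wrapper around a listing: retry on FNFE on the assumption that
// the miss is a create-listing delay which will clear shortly.
def listWithRetry(fs: FileSystem, dir: Path,
                  attempts: Int = 3, sleepMs: Long = 1000): Array[FileStatus] = {
  try {
    fs.listStatus(dir)
  } catch {
    case _: FileNotFoundException if attempts > 1 =>
      Thread.sleep(sleepMs)
      listWithRetry(fs, dir, attempts - 1, sleepMs * 2)  // back off and retry
    case _: FileNotFoundException =>
      // the "ignore" option for delete delays: nothing listed, nothing to rename
      Array.empty[FileStatus]
  }
}
{code}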

HADOOP-13345 is going to address list inconsistency; I'm doing a committer 
there which I could also try to make more robust even when not using a 
DynamoDB-backed bucket. The question is: what is a good retry policy here, 
especially given that once an inconsistency has surfaced, a large amount of the 
merge may already have taken place. Backing up and retrying may be differently 
dangerous.

One thing I would recommend trying is: commit to HDFS, then copy. Do that and 
you can turn speculation on in your executors, get local virtual-HDD 
performance and networking, as well as a consistent view. Copy to s3a once 
everything you want done is complete.
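
A minimal sketch of that pattern, with placeholder paths and a placeholder 
DataFrame standing in for the job's real output; FileUtil.copy is one way to do 
the final upload, a distcp job is another for large outputs:

{code:scala}
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hdfs-then-s3a").getOrCreate()
val df = spark.range(10).toDF()              // stands in for the job's real output

// 1. Commit to HDFS: fast renames, consistent listings, speculation is safe.
val hdfsOut = "hdfs:///staging/output/parquet"
df.write.mode("append").parquet(hdfsOut)

// 2. Only after everything you want is written and committed, copy the
//    finished directory up to S3A in one pass.
val conf    = spark.sparkContext.hadoopConfiguration
val srcFs   = FileSystem.get(new URI(hdfsOut), conf)
val dstPath = new Path("s3a://xxx/output/parquet")
FileUtil.copy(srcFs, new Path(hdfsOut), dstPath.getFileSystem(conf), dstPath,
  /* deleteSource = */ false, conf)
{code}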





> FileNotFoundException on _temporary directory with Spark Streaming 2.0.1 and S3A
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-18512
>                 URL: https://issues.apache.org/jira/browse/SPARK-18512
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.0.1
>         Environment: AWS EMR 5.0.1
> Spark 2.0.1
> S3 EU-West-1 (S3A with read-after-write consistency)
>            Reporter: Giuseppe Bonaccorso
>
> After a few hours of streaming processing and saving data in Parquet format, 
> I always get this exception:
> {code:java}
> java.io.FileNotFoundException: No such file or directory: s3a://xxx/_temporary/0/task_xxxx
>       at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1004)
>       at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:745)
>       at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:426)
>       at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:362)
>       at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
>       at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
>       at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
>       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
>       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
>       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
>       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
>       at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
>       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
>       at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:510)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
>       at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:488)
> {code}
> I've also tried s3:// and s3n://, but it always happens after 3-5 hours. 



