[ 
https://issues.apache.org/jira/browse/SPARK-19013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15801059#comment-15801059
 ] 

Steve Loughran commented on SPARK-19013:
----------------------------------------

ok, that is potentially the problem. One thing here: this is using s3://, which 
implies to me that this is EMR. There are a couple of options here:

# I think someone could rework S3A create so that with overwrite=true, the 
check for the file already existing is skipped: we only need to make sure the 
destination isn't a directory. HADOOP-13950 covers that. As usual, code with 
tests welcome.
# For object stores where close() does a PUT, and that PUT is atomic, Spark 
could bypass the write + rename strategy altogether. It could just do a direct 
write, knowing that the output will atomically overwrite the destination when 
it has finished. This actually delivers better atomicity/consistency than 
rename, as well as being faster.
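To illustrate point #2, here's a minimal sketch of the two commit paths, using the local filesystem to stand in for the object store. The object and method names are purely illustrative, not the actual Spark/S3A code:
{code}
import java.nio.file.{Files, Paths, StandardCopyOption}

object CommitStrategies {
  // write + rename: write to a temporary path, then rename into place.
  // On a real object store the "rename" is a server-side COPY + DELETE.
  def writeWithRename(dest: String, data: Array[Byte]): Unit = {
    val tmp = Paths.get(dest + ".tmp")
    Files.write(tmp, data)
    Files.move(tmp, Paths.get(dest),
      StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
  }

  // direct write: rely on close()/PUT being atomic; no rename, no copy.
  def writeDirect(dest: String, data: Array[Byte]): Unit = {
    Files.write(Paths.get(dest), data)
  }
}
{code}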

For point #2, the hard part is having Spark know that the operation is going to 
have that characteristic. I could put it in the FS API; HADOOP-9565 has long 
discussed this, but it's been hard to come up with a good strategy which 
supports very different behaviours, not just across store schemas, but 
potentially against different endpoints. Example: enterprise S3 endpoints may 
have different semantics from AWS S3.

How about Hadoop adding a configuration option listing all object store 
schemas with this behaviour, such as 
{{fs.schemas.object-stores.atomic-put-on-close}}, which would default to 
"s3n, s3a, wasb, gcs"; on EMR, Amazon could add s3 there. (The ASF's 
deprecated s3 filesystem is different, note.) Custom deployments could declare 
different things, just through reconfiguration. While this doesn't address 
different semantics on different buckets, it would let Spark, Hive, etc. see 
that they are working with an object store *without needing any changes in the 
FS API*.
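Assuming the property were added, a deployment might declare it in core-site.xml like this (the property name is the proposal above, not an existing Hadoop option):
{code}
<property>
  <name>fs.schemas.object-stores.atomic-put-on-close</name>
  <!-- EMR could append "s3" to the proposed default list -->
  <value>s3n,s3a,wasb,gcs,s3</value>
</property>
{code}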

This could be probed with a different policy:
{code}
val putDirect = conf.getStringCollection("fs.schemas.object-stores.atomic-put-on-close")
  .contains(dest.toUri.getScheme)
if (putDirect) { /* do a direct create + write */ } else { /* do write + rename */ }
{code}


This would make a big difference on large file checkpoints. Currently, the time 
to close() is data/upload-bandwidth; rename is data/store-copy-bandwidth 
(~6 MB/s on S3).
With the latest S3A block upload, you get uploads during the write, but the PUT 
is still atomic in the close(). With that and no rename: way faster.
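A back-of-envelope estimate of what the rename alone costs, using the ~6 MB/s copy-bandwidth figure from above (the object name and helper are illustrative):
{code}
object CommitCost {
  // Server-side COPY bandwidth during an S3 rename, per the figure above.
  val copyMBps = 6.0

  // Extra seconds the write + rename path spends on the rename,
  // which a direct atomic PUT on close() avoids entirely.
  def renameSeconds(sizeMB: Double): Double = sizeMB / copyMBps
}
{code}
So a 1 GB checkpoint file spends roughly 1024 / 6 ≈ 170 seconds in the rename alone.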


> java.util.ConcurrentModificationException when using s3 path as 
> checkpointLocation 
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-19013
>                 URL: https://issues.apache.org/jira/browse/SPARK-19013
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.0.2
>            Reporter: Tim Chan
>
> I have a structured stream job running on EMR. The job will fail due to this
> {code}
> Multiple HDFSMetadataLog are using s3://mybucket/myapp 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatch(HDFSMetadataLog.scala:162)
> {code}
> There is only one instance of this stream job running.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
