[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017928#comment-17017928 ]

Steve Loughran commented on SPARK-30460:
----------------------------------------

bq. I understand spark 3.0 has new committer but as you said it is not deeply 
tested. 

Well, it has been tested and shipping in the Hortonworks and Cloudera releases 
of Spark for a while.

but: 
# it doesn't do stream checkpoints. Nobody has looked at that, though it's 
something I'd like to see.
# you are seeing a stack trace on EMR; they have their own reimplementation of 
the committer, so it may behave differently.


bq. I had a long discussion with AWS folks but they are asking to report this 
to open source to verify it

That stack trace shows it's their S3 connector which is rejecting the request 
- but the S3A one is going to reject it in exactly the same way. 

You are going to need a way to checkpoint that does not use append. Sorry.
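
As a rough sketch of what "no append" means in practice: an append-free log 
writes each batch to a fresh object instead of reopening one file. The helper 
below is illustrative only ({{CreateOnlyLog}} is not a Spark or Hadoop class), 
but the pattern works on any store that supports create:

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

// Illustrative only: an append-free write-ahead pattern. Every batch goes
// into a new object, so it works on stores (such as S3) that cannot append.
object CreateOnlyLog {
  def newBatchStream(logDir: String, conf: Configuration): FSDataOutputStream = {
    val dir = new Path(logDir)
    val fs: FileSystem = dir.getFileSystem(conf)
    // One object per batch; the timestamp keeps names unique and ordered.
    val batchFile = new Path(dir, s"log-${System.currentTimeMillis()}")
    fs.create(batchFile, false) // create-only, never append
  }
}
{code}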


> Spark checkpoint failing after some run with S3 path 
> -----------------------------------------------------
>
>                 Key: SPARK-30460
>                 URL: https://issues.apache.org/jira/browse/SPARK-30460
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.4
>            Reporter: Sachin Pasalkar
>            Priority: Major
>
> We are using EMR with SQS as the source of the stream. However, it fails 
> after 4-6 hours of running, with the exception below. The application shows 
> as running but stops processing messages.
> {code:java}
> 2020-01-06 13:04:10,548 WARN [BatchedWriteAheadLog Writer] 
> org.apache.spark.streaming.util.BatchedWriteAheadLog:BatchedWriteAheadLog 
> Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=0 
> lim=1226 cap=1226],1578315850302,Future(<not completed>)))
> java.lang.UnsupportedOperationException
>       at 
> com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.append(S3NativeFileSystem2.java:150)
>       at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
>       at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
>       at 
> org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
>       at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
>       at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
>       at java.lang.Thread.run(Thread.java:748)
> 2020-01-06 13:04:10,554 WARN [wal-batching-thread-pool-0] 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker:Exception thrown 
> while writing record: 
> BlockAdditionEvent(ReceivedBlockInfo(0,Some(3),None,WriteAheadLogBasedStoreResult(input-0-1578315849800,Some(3),FileBasedWriteAheadLogSegment(s3://mss-prod-us-east-1-ueba-bucket/streaming/checkpoint/receivedData/0/log-1578315850001-1578315910001,0,5175))))
>  to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult: 
>       at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
>       at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:84)
>       at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242)
>       at 
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:89)
>       at 
> org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:347)
>       at 
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:522)
>       at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
>       at 
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:520)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
>       at 
> com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.append(S3NativeFileSystem2.java:150)
>       at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
>       at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
>       at 
> org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
>       at 
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
>       at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
>       at 
> org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
>       ... 1 more
> 2020-01-06 13:04:10,568 WARN [dispatcher-event-loop-1] 
> org.apache.spark.streaming.scheduler.ReceiverTracker:Error reported by 
> receiver for stream 0: Error in block pushing thread - 
> org.apache.spark.SparkException: Failed to add block to receiver tracker.
>       at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:163)
>       at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
>       at 
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:110)
>       at 
> org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
>       at 
> org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
>       at 
> org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
> {code}
> We have enabled Spark Streaming with checkpointing using the code below; we 
> have removed the business logic from it. 
> [https://stackoverflow.com/questions/59222815/spark-streaming-sqs-with-checkpoint-enable]
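> For reference, the setup is roughly the shape sketched below (the app name, 
> path and batch interval are placeholders, and the receiver logic is elided 
> as in the linked question):
> {code:java}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> val checkpointDir = "s3://my-bucket/streaming/checkpoint" // placeholder path
>
> def createContext(): StreamingContext = {
>   val conf = new SparkConf().setAppName("sqs-stream") // placeholder name
>   val ssc = new StreamingContext(conf, Seconds(30))
>   ssc.checkpoint(checkpointDir) // checkpoint metadata and WAL files go here
>   // ... SQS receiver and business logic elided ...
>   ssc
> }
>
> // Recover from the checkpoint if one exists, otherwise build a new context.
> val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
> ssc.start()
> ssc.awaitTermination()
> {code}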
> When I looked at the code of HdfsUtils.scala, I don't see where it handles 
> the case of an S3 filesystem, which does not support append.
> I am suggesting one of the two changes below. Either add
> {code:java}
> if (dfs.getScheme.toLowerCase.contains("s3")) { dfs.create(dfsPath) }
> {code}
> so the method becomes:
> {code:java}
> def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>   val dfsPath = new Path(path)
>   val dfs = getFileSystemForPath(dfsPath, conf)
>   // If the file exists and we have append support, append instead of
>   // creating a new file
>   val stream: FSDataOutputStream = {
>     if (dfs.isFile(dfsPath)) {
>       if (dfs.getScheme.toLowerCase.contains("s3")) {
>         dfs.create(dfsPath)
>       } else if (conf.getBoolean("dfs.support.append", true) ||
>           conf.getBoolean("hdfs.append.support", false) ||
>           dfs.isInstanceOf[RawLocalFileSystem]) {
>         dfs.append(dfsPath)
>       } else {
>         throw new IllegalStateException("File exists and there is no append support!")
>       }
>     } else {
>       dfs.create(dfsPath)
>     }
>   }
>   stream
> }
> {code}
> OR
> add the check below, assuming that when running against S3 we have enabled
> {noformat}
> WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY{noformat}
> {code:java}
> if (conf.getBoolean(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY, false)) {
>   dfs.create(dfsPath)
> }
> {code}
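> A more defensive variant (a sketch, untested) would probe append support at 
> runtime instead of matching on the scheme string, which would also cover 
> other stores that cannot append:
> {code:java}
> import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
>
> // Sketch: try append() on an existing file and fall back to create() when
> // the filesystem rejects it; both S3NativeFileSystem2 and S3AFileSystem
> // throw UnsupportedOperationException from append().
> def openForWrite(fs: FileSystem, path: Path): FSDataOutputStream = {
>   if (!fs.isFile(path)) {
>     fs.create(path)
>   } else {
>     try {
>       fs.append(path)
>     } catch {
>       case _: UnsupportedOperationException =>
>         fs.create(path, true) // overwrite: the file already exists
>     }
>   }
> }
> {code}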
>  


