[
https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012728#comment-17012728
]
Sachin Pasalkar commented on SPARK-30460:
-----------------------------------------
[~gsomogyi] Yes, I am using S3 for the checkpoint, and as we know, S3 does not
support appending to objects. However, the exception stack trace shows that the
code is trying to append to the object, which causes the failure. Following the
stack trace, `FileBasedWriteAheadLogWriter` gets its output stream from
`HdfsUtils`. However, `HdfsUtils` only handles the HDFS case, not other file
systems that do not support append.
I don't see this as a consistency-model issue but as a bug in the code.
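
The branch selection described above can be modelled roughly as follows. This is a dependency-free sketch and not Spark code: the object `WalStreamChoice`, the `StreamAction` cases, `chooseAction`, and the `s3Guard` flag are hypothetical names introduced only for illustration. The config defaults mirror the `getOutputStream` snippet quoted in the issue description.

```scala
// Simplified model of how getOutputStream picks between append and create.
// Every name in this object is hypothetical; nothing here is Spark or Hadoop API.
object WalStreamChoice {
  sealed trait StreamAction
  case object Append extends StreamAction // stands for dfs.append(dfsPath)
  case object Create extends StreamAction // stands for dfs.create(dfsPath)
  case object Fail extends StreamAction   // stands for the IllegalStateException

  def chooseAction(
      fileExists: Boolean,
      scheme: String,
      s3Guard: Boolean,                   // true = apply the proposed S3 check
      dfsSupportAppend: Boolean = true,   // default read for "dfs.support.append"
      hdfsAppendSupport: Boolean = false, // default read for "hdfs.append.support"
      isRawLocal: Boolean = false): StreamAction = {
    if (!fileExists) Create
    else if (s3Guard && scheme.toLowerCase.contains("s3")) Create
    else if (dfsSupportAppend || hdfsAppendSupport || isRawLocal) Append
    else Fail
  }
}
```

With the defaults, `chooseAction(fileExists = true, scheme = "s3", s3Guard = false)` selects `Append`, which is the call that S3 rejects in the stack trace. With `s3Guard = true` the same input selects `Create`. The sketch only models which branch is taken; it does not model the fact that creating a file at an existing path replaces its contents.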
> Spark checkpoint failing after some run with S3 path
> -----------------------------------------------------
>
> Key: SPARK-30460
> URL: https://issues.apache.org/jira/browse/SPARK-30460
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.4.4
> Reporter: Sachin Pasalkar
> Priority: Major
>
> We are using EMR with SQS as the stream source. However, it fails after
> 4-6 hours of running with the exception below. The application shows as
> running but stops processing messages.
> {code:java}
> 2020-01-06 13:04:10,548 WARN [BatchedWriteAheadLog Writer]
> org.apache.spark.streaming.util.BatchedWriteAheadLog:BatchedWriteAheadLog
> Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=0
> lim=1226 cap=1226],1578315850302,Future(<not completed>)))
> java.lang.UnsupportedOperationException
> at
> com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.append(S3NativeFileSystem2.java:150)
> at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
> at
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
> at
> org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
> at
> org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
> at
> org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
> at java.lang.Thread.run(Thread.java:748)
> 2020-01-06 13:04:10,554 WARN [wal-batching-thread-pool-0]
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker:Exception thrown
> while writing record:
> BlockAdditionEvent(ReceivedBlockInfo(0,Some(3),None,WriteAheadLogBasedStoreResult(input-0-1578315849800,Some(3),FileBasedWriteAheadLogSegment(s3://mss-prod-us-east-1-ueba-bucket/streaming/checkpoint/receivedData/0/log-1578315850001-1578315910001,0,5175))))
> to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
> at
> org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:84)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242)
> at
> org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:89)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:347)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:522)
> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:520)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
> at
> com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.append(S3NativeFileSystem2.java:150)
> at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
> at
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
> at
> org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
> at
> org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
> at
> org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
> ... 1 more
> 2020-01-06 13:04:10,568 WARN [dispatcher-event-loop-1]
> org.apache.spark.streaming.scheduler.ReceiverTracker:Error reported by
> receiver for stream 0: Error in block pushing thread -
> org.apache.spark.SparkException: Failed to add block to receiver tracker.
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:163)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:110)
> at
> org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
> at
> org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
> at
> org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
> {code}
> We have enabled Spark Streaming with checkpointing using the code below; we
> have removed the business logic from it.
> [https://stackoverflow.com/questions/59222815/spark-streaming-sqs-with-checkpoint-enable]
> When I looked at the code of HdfsUtils.scala, I did not see any handling for
> the S3 file system, which does not support append.
> I suggest one of the changes below. Either add
> {code:java}
> if (dfs.getScheme.toLowerCase.contains("s3")){ dfs.create(dfsPath) }
> {code}
> to getOutputStream, as below:
> {code:java}
> def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>   val dfsPath = new Path(path)
>   val dfs = getFileSystemForPath(dfsPath, conf)
>   // If the file exists and we have append support, append instead of
>   // creating a new file
>   val stream: FSDataOutputStream = {
>     if (dfs.isFile(dfsPath)) {
>       if (dfs.getScheme.toLowerCase.contains("s3")) {
>         dfs.create(dfsPath)
>       } else if (conf.getBoolean("dfs.support.append", true) ||
>           conf.getBoolean("hdfs.append.support", false) ||
>           dfs.isInstanceOf[RawLocalFileSystem]) {
>         dfs.append(dfsPath)
>       } else {
>         throw new IllegalStateException("File exists and there is no append support!")
>       }
>     } else {
>       dfs.create(dfsPath)
>     }
>   }
>   stream
> }
> {code}
> OR
> add a check on the receiver WAL setting, on the assumption that it is
> enabled when S3 is used:
> {noformat}
> WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY{noformat}
> {code:java}
> if (conf.getBoolean(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY, false)) {
>   dfs.create(dfsPath)
> }
> {code}
>