[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010742#comment-17010742 ]
Sachin Pasalkar commented on SPARK-30460:
-----------------------------------------

I had a long discussion with the AWS folks, but they asked me to report this to the open-source project so that it can be verified there.

> Spark checkpoint failing after some run with S3 path
> -----------------------------------------------------
>
>                 Key: SPARK-30460
>                 URL: https://issues.apache.org/jira/browse/SPARK-30460
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.4
>            Reporter: Sachin Pasalkar
>            Priority: Major
>
> We are using EMR with SQS as the stream source. However, the job fails after 4-6 hours of running with the exception below. The application still shows as running, but it stops processing messages.
> {code:java}
> 2020-01-06 13:04:10,548 WARN [BatchedWriteAheadLog Writer] org.apache.spark.streaming.util.BatchedWriteAheadLog:BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=0 lim=1226 cap=1226],1578315850302,Future(<not completed>)))
> java.lang.UnsupportedOperationException
>     at com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.append(S3NativeFileSystem2.java:150)
>     at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
>     at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
>     at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
>     at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
>     at java.lang.Thread.run(Thread.java:748)
> 2020-01-06 13:04:10,554 WARN [wal-batching-thread-pool-0] org.apache.spark.streaming.scheduler.ReceivedBlockTracker:Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3),None,WriteAheadLogBasedStoreResult(input-0-1578315849800,Some(3),FileBasedWriteAheadLogSegment(s3://mss-prod-us-east-1-ueba-bucket/streaming/checkpoint/receivedData/0/log-1578315850001-1578315910001,0,5175)))) to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult:
>     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
>     at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:84)
>     at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242)
>     at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:89)
>     at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:347)
>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:522)
>     at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
>     at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:520)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
>     at com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.append(S3NativeFileSystem2.java:150)
>     at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
>     at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
>     at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
>     at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
>     at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
>     ... 1 more
> 2020-01-06 13:04:10,568 WARN [dispatcher-event-loop-1] org.apache.spark.streaming.scheduler.ReceiverTracker:Error reported by receiver for stream 0: Error in block pushing thread - org.apache.spark.SparkException: Failed to add block to receiver tracker.
>     at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:163)
>     at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:129)
>     at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:110)
>     at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:297)
>     at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:269)
>     at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:110)
> {code}
> We enabled Spark Streaming with checkpointing using the code linked below, with the business logic removed.
> [https://stackoverflow.com/questions/59222815/spark-streaming-sqs-with-checkpoint-enable]
> When I looked at HdfsUtils.scala, I did not see any handling for the S3 file system, which does not support append.
> I suggest one of the two changes below. Either add
> {code:java}
> if (dfs.getScheme.toLowerCase.contains("s3")){ dfs.create(dfsPath) }
> {code}
> as shown here:
> {code:java}
> def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>   val dfsPath = new Path(path)
>   val dfs = getFileSystemForPath(dfsPath, conf)
>   // If the file exists and we have append support, append instead of creating a new file
>   val stream: FSDataOutputStream = {
>     if (dfs.isFile(dfsPath)) {
>       if (dfs.getScheme.toLowerCase.contains("s3")){
>         dfs.create(dfsPath)
>       } else if (conf.getBoolean("dfs.support.append", true) ||
>           conf.getBoolean("hdfs.append.support", false) ||
>           dfs.isInstanceOf[RawLocalFileSystem]) {
>         dfs.append(dfsPath)
>       } else {
>         throw new IllegalStateException("File exists and there is no append support!")
>       }
>     } else {
>       dfs.create(dfsPath)
>     }
>   }
>   stream
> }
> {code}
> OR
> add the check below, on the assumption that with S3 we have enabled
> {noformat}
> WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY{noformat}
> {code:java}
> if (conf.getBoolean(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY,false)){ dfs.create(dfsPath) }
> {code}
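
For readers who want to check the branch order of the first proposed change without a Hadoop classpath, here is a minimal, dependency-free sketch. It only models the decision made in the proposed getOutputStream. The names WalOpenSketch, OpenAction, Create, Append and decide are hypothetical and are not part of Spark or Hadoop. The appendSupported flag stands in for the dfs.support.append / hdfs.append.support / RawLocalFileSystem checks. One thing worth keeping in mind when reading the S3 branch: Hadoop's FileSystem.create(Path) overwrites an existing file by default, so "Create" on an existing path replaces the current log segment rather than extending it.

```scala
// Hypothetical model of the branch order in the proposed getOutputStream change.
// Nothing here touches a real file system; it only returns which call would be made.
object WalOpenSketch {
  sealed trait OpenAction
  case object Create extends OpenAction // stands for dfs.create(dfsPath)
  case object Append extends OpenAction // stands for dfs.append(dfsPath)

  // scheme:          file system URI scheme, e.g. "hdfs", "s3", "s3a"
  // fileExists:      stand-in for dfs.isFile(dfsPath)
  // appendSupported: stand-in for the append-related checks in the original code
  def decide(scheme: String, fileExists: Boolean, appendSupported: Boolean): OpenAction =
    if (!fileExists) Create
    else if (scheme.toLowerCase.contains("s3")) Create // proposed S3 special case
    else if (appendSupported) Append
    else throw new IllegalStateException("File exists and there is no append support!")

  def main(args: Array[String]): Unit = {
    // Existing file on S3: the proposal chooses create instead of append.
    println(decide("s3", fileExists = true, appendSupported = true))
    // Existing file on HDFS with append support: unchanged behaviour.
    println(decide("hdfs", fileExists = true, appendSupported = true))
  }
}
```

Because the check uses contains("s3"), schemes such as "s3a" and "s3n" take the same branch as "s3" in this sketch.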