[jira] [Commented] (SPARK-30460) Spark checkpoint failing after some run with S3 path

2020-01-17 Thread Steve Loughran (Jira)


[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017928#comment-17017928 ]

Steve Loughran commented on SPARK-30460:
-----------------------------------------

bq. I understand Spark 3.0 has a new committer, but as you said it is not 
deeply tested.

Well, it has been tested and shipping in Hortonworks and Cloudera releases of 
Spark for a while.

but: 
# it doesn't do stream checkpoints. Nobody has looked at that, though it's 
something I'd like to see.
# you are seeing a stack trace on EMR; they have their own reimplementation 
of the committer, and it may be different.


bq. I had a long discussion with AWS folks, but they are asking me to report 
this to open source to verify it

That stack trace shows it's their S3 connector which is rejecting the 
request, but the S3A one is going to reject it in exactly the same way. 

You're going to need a way to checkpoint that does not use append. Sorry.
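
A minimal sketch of that workaround, assuming a DStream application: point 
the checkpoint directory (which also holds the receiver write-ahead log) at 
an append-capable filesystem such as HDFS instead of S3. The app name and 
path below are hypothetical placeholders.

{code:scala}
// Minimal sketch, assuming a DStream app: keep the checkpoint directory
// (and thus the receiver write-ahead log) on a filesystem that supports
// append(), e.g. HDFS. App name and path are hypothetical placeholders.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("sqs-stream")
val ssc = new StreamingContext(conf, Seconds(60))

// Everything written under this directory must survive append() calls,
// so it has to live on HDFS (or another appendable filesystem), not S3.
ssc.checkpoint("hdfs:///checkpoints/sqs-stream")
{code}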


> Spark checkpoint failing after some run with S3 path 
> -----------------------------------------------------
>
> Key: SPARK-30460
> URL: https://issues.apache.org/jira/browse/SPARK-30460
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.4
>Reporter: Sachin Pasalkar
>Priority: Major
>
> We are using EMR with SQS as the source of the stream. However, it fails 
> after 4-6 hours of running with the exception below. The application shows 
> as running but stops processing messages.
> {code:java}
> 2020-01-06 13:04:10,548 WARN [BatchedWriteAheadLog Writer] org.apache.spark.streaming.util.BatchedWriteAheadLog:BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=0 lim=1226 cap=1226],1578315850302,Future()))
> java.lang.UnsupportedOperationException
>   at com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.append(S3NativeFileSystem2.java:150)
>   at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
>   at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
>   at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
>   at java.lang.Thread.run(Thread.java:748)
> 2020-01-06 13:04:10,554 WARN [wal-batching-thread-pool-0] org.apache.spark.streaming.scheduler.ReceivedBlockTracker:Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3),None,WriteAheadLogBasedStoreResult(input-0-1578315849800,Some(3),FileBasedWriteAheadLogSegment(s3://mss-prod-us-east-1-ueba-bucket/streaming/checkpoint/receivedData/0/log-1578315850001-1578315910001,0,5175 to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult: 
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
>   at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:84)
>   at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242)
>   at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:89)
>   at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:347)
>   at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:522)
>   at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
>   at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:520)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
>   at com.amazon.ws.emr.hadoop.fs.s3n2.S3NativeFileSystem2.append(S3NativeFileSystem2.java:150)
>   at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
>   at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
>   at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
>   at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
> {code}

[jira] [Commented] (SPARK-30460) Spark checkpoint failing after some run with S3 path

2020-01-10 Thread Gabor Somogyi (Jira)


[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012928#comment-17012928 ]

Gabor Somogyi commented on SPARK-30460:
----------------------------------------

[~Sachin] OK, good luck then :)


[jira] [Commented] (SPARK-30460) Spark checkpoint failing after some run with S3 path

2020-01-10 Thread Sachin Pasalkar (Jira)


[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012784#comment-17012784 ]

Sachin Pasalkar commented on SPARK-30460:
------------------------------------------

Yes, maybe, or maybe not. 

I was able to run this in production for 4-6 hours at a time, 4-5 times, 
without any other issues; it always failed with this issue. If this fixes 
some part of the problem, we should fix it. 

I understand Spark 3.0 has a new committer, but as you said it is not 
deeply tested. Soon I am going to run my production with this fix in place; 
I will update the ticket around the end of next week with whether I was able 
to run the system smoothly or not.


[jira] [Commented] (SPARK-30460) Spark checkpoint failing after some run with S3 path

2020-01-10 Thread Gabor Somogyi (Jira)


[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012756#comment-17012756 ]

Gabor Somogyi commented on SPARK-30460:
----------------------------------------

[~Sachin] even if somebody hunts down this specific issue, S3 checkpointing 
kills streaming jobs in many other ways.



[jira] [Commented] (SPARK-30460) Spark checkpoint failing after some run with S3 path

2020-01-10 Thread Sachin Pasalkar (Jira)


[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012728#comment-17012728 ]

Sachin Pasalkar commented on SPARK-30460:
------------------------------------------

[~gsomogyi] Yes, I am using S3 for the checkpoint, and as we know S3 does 
not support appending to objects. However, if you look at the exception 
stack trace, it seems it is trying to append to the object, which is causing 
the failure. If you follow the stack trace, `FileBasedWriteAheadLogWriter` 
gets its output stream using `HdfsUtils`. However, `HdfsUtils` only handles 
the HDFS case, not other filesystems that cannot append.

I don't see this as an issue with the consistency model but as a bug in the 
code.
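
To make that concrete, here is a rough paraphrase of the logic the stack 
trace points at (a sketch reconstructed from the trace, not the exact Spark 
source): when the log file already exists, the helper calls 
`FileSystem.append()`, which the EMR S3 connector rejects.

{code:scala}
// Sketch reconstructed from the stack trace above, not the exact Spark source.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

def getOutputStream(pathStr: String, conf: Configuration): FSDataOutputStream = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(conf)
  if (fs.isFile(path)) {
    // Fine on HDFS; the EMR S3 connector (and S3A) throws
    // UnsupportedOperationException here, since S3 objects cannot be appended.
    fs.append(path)
  } else {
    fs.create(path)
  }
}
{code}

Any fix along these lines would need the WAL writer to create a new object 
per segment rather than reopening an existing one.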


[jira] [Commented] (SPARK-30460) Spark checkpoint failing after some run with S3 path

2020-01-10 Thread Gabor Somogyi (Jira)


[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17012717#comment-17012717 ]

Gabor Somogyi commented on SPARK-30460:
----------------------------------------

[~Sachin] Do I understand it correctly that you're using S3 as the 
checkpoint location? If so, then all I can say is that it's not working, 
because of S3's read-after-write consistency model.
In Spark 3.0 there is a new output committer which is expected to work, but 
it is not yet deeply tested...
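
For reference, a hedged sketch of how that Spark 3.0 committer support is 
typically wired up, assuming the spark-hadoop-cloud module and the Hadoop 
3.1+ S3A committers are on the classpath (property names are from the Spark 
cloud-integration docs; the app name and values are illustrative):

{code:scala}
// Hedged sketch: enabling the new cloud committer plumbing in Spark 3.0.
// Assumes spark-hadoop-cloud and the Hadoop 3.1+ S3A committers are present.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-committer-demo")  // hypothetical app name
  // Pick one of the S3A committers: "directory", "partitioned" or "magic".
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  // Route Spark SQL output commits through the cloud committer bindings.
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
{code}

Note Steve Loughran's caveat above: the committer covers job output commits, 
not stream checkpoints, whose write-ahead log still calls append().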


[jira] [Commented] (SPARK-30460) Spark checkpoint failing after some run with S3 path

2020-01-08 Thread Sachin Pasalkar (Jira)


[ https://issues.apache.org/jira/browse/SPARK-30460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010742#comment-17010742 ]

Sachin Pasalkar commented on SPARK-30460:
------------------------------------------

I had a long discussion with AWS folks, but they are asking me to report 
this to open source to verify it
