Hey Chen Qin,

this seems to be an issue with the S3 file system. The root cause is:

Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
    ... 25 more

From [1], it looks like you have to specify

fs.s3a.buffer.dir

in the Hadoop configuration (where you set the S3 file system).

The expected value is a comma-separated list of local directories used
to buffer results prior to transmitting them to S3 (for large files).
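
As a rough sketch, assuming your S3A settings live in the core-site.xml
that Flink picks up as its Hadoop configuration, the entry could look
like the fragment below. The directory /tmp/flink-s3a is only a
placeholder I made up, not something from your setup; any local
directory that exists and is writable on every TaskManager host should
work.

```xml
<configuration>
  <!-- Local directories (comma-separated) where S3A buffers data
       before uploading it to S3. The path below is a hypothetical
       example; replace it with a directory that exists on your hosts. -->
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp/flink-s3a</value>
  </property>
</configuration>
```

If you already have a configuration file with other fs.s3a.* entries,
only the property element needs to be added to it.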

Does this fix the issue? Please report back so that we can include it in
the "common issues" section of the AWS docs.

– Ufuk

[1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/


On Wed, May 4, 2016 at 2:41 AM, Chen Qin <qinnc...@gmail.com> wrote:
> Hi there,
>
> I ran a test job with filestatebackend, saving checkpoints on S3 (via s3a).
>
> The job crashes when a checkpoint is triggered. Listing the objects in the
> S3 directory, I found that the directory is created successfully but all
> checkpoint directories are empty.
>
> The host running the task manager shows the following error.
>
> Received error response: com.amazonaws.services.s3.model.AmazonS3Exception:
> Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549, AWS
> Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID:xxxxx
>
> Has anyone met this issue before?
>
> flink 1.0.0
> scala 2.10
> hadoop-aws 2.7.2
> aws-java-sdk 1.7.4
>
>
> Thanks,
> Chen
>
> Attached is the full log shown on the web dashboard when the job was canceled.
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Could not open output stream for state backend
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444)
>     at java.io.DataOutputStream.write(DataOutputStream.java:88)
>     at java.io.DataOutputStream.write(DataOutputStream.java:88)
>     at org.apache.flink.types.StringValue.writeString(StringValue.java:813)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78)
>     at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27)
>     at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
>     at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:175)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:509)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:481)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
>     ... 8 more
> Caused by: java.lang.NullPointerException
>     at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
>     at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
>     at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
>     at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
>     at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
>     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
>     ... 25 more
>
