Hi all, I am using the filesystem state backend with checkpointing to S3. From the JobManager logs, I can see that it works most of the time, e.g.,
2016-07-26 17:49:07,311 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 @ 1469555347310
2016-07-26 17:49:11,128 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 (in 3335 ms)
From time to time, however, taking the checkpoint fails with the following exception:
2016-07-26 17:50:07,310 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 @ 1469555407310
2016-07-26 17:50:12,225 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(SlidingEventTimeWindows(3600000, 1000), ListStateDescriptor{name=window-contents, defaultValue=null, serializer=org.apache.flink.api.java.typeutils.runtime.PojoSerializer@103b8046}, EventTimeTrigger(), WindowedStream.apply(WindowedStream.java:226)) -> Sink: Unnamed (1/1) (0ec242b46c49039f673dc902fd983f49) switched from RUNNING to FAILED
2016-07-26 17:50:12,227 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job bd2930a4d6e7cf8d04d3bbafe22e386b ([...]) changed to FAILING.
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Failed to fetch state handle size
    at org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:234)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:528)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
    ... 8 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: [...]
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
    at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at org.apache.flink.runtime.state.filesystem.AbstractFileStateHandle.getFileSize(AbstractFileStateHandle.java:93)
    at org.apache.flink.runtime.state.filesystem.FileStreamStateHandle.getStateSize(FileStreamStateHandle.java:58)
    at org.apache.flink.runtime.state.AbstractStateBackend$DataInputViewHandle.getStateSize(AbstractStateBackend.java:428)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskStateList.getStateSize(StreamTaskStateList.java:77)
    at org.apache.flink.runtime.taskmanager.RuntimeEnvironment.acknowledgeCheckpoint(RuntimeEnvironment.java:231)
    ... 10 more
All logs are from the machine running the JobManager.
The 403 status code suggests a permissions problem. However, I can see
checkpoints being stored correctly in the configured S3 bucket, so access
generally works. Also, old checkpoints are sometimes not removed. Does
anybody here experience the same problem? Could it be that S3 is flaky?
My configuration is below:
Flink 1.0.3
libs/
aws-java-sdk-1.7.4.jar
hadoop-aws-2.7.2.jar
httpclient-4.2.5.jar
httpcore-4.2.5.jar
Contents of core-site.xml:
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/tmp/hadoop</value>
  </property>
  <property>
    <name>fs.s3a.attempts.maximum</name>
    <value>10</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3-eu-west-1.amazonaws.com</value>
  </property>
</configuration>
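For completeness, the state backend itself is pointed at S3 via flink-conf.yaml, roughly as follows (the bucket and path are placeholders, not my real values):

```yaml
# Use the filesystem state backend and write checkpoints to S3 via s3a.
state.backend: filesystem
state.backend.fs.checkpointdir: s3a://my-bucket/flink/checkpoints
```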
Best,
Gary
