Hi there,
I have been testing checkpointing with RocksDB backed by S3. Checkpoints succeed except for the snapshot state of a timeWindow operator on a keyedStream. Here is the env setting I used:
env.setStateBackend(new RocksDBStateBackend(new URI("s3://backupdir/")))
The checkpoint consistently fails when it reaches the window operator snapshot. The exception log is attached below.
If I instead use env.setStateBackend(new RocksDBStateBackend(new URI("file:///tmp/checkpoints"))) or the default MemoryStateBackend, checkpoints work without issue.
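In case it matters, the equivalent flink-conf.yaml configuration would be roughly the following (key names are my best guess for this version; I actually set the backend in code as shown above):

# Rough flink-conf.yaml equivalent of the setStateBackend call
# (in this version "rocksdb" may need to be the full factory class,
#  org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory)
state.backend: rocksdb
state.backend.fs.checkpointdir: s3://backupdir/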
Has anyone seen this issue before? Or did I mess up the configuration?
Thanks,
Chen
2016-05-16 17:20:32,132 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend - Initializing file state backend to URI s3://xxx/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:20:32,423 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Using user-defined state backend: org.apache.flink.contrib.streaming.state.RocksDBStateBackend@2fa68a53
2016-05-16 17:20:32,423 INFO  org.apache.flink.runtime.state.filesystem.FsStateBackend - Initializing file state backend to URI s3://uber-beats/sjc1/checkpoints/7e6abf126ce3d18f173733b34eda81a9
2016-05-16 17:21:31,423 INFO  org.apache.flink.contrib.streaming.state.AbstractRocksDBState - RocksDB (/directory/flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/dbc64864de-8373-4b41-bd74-a26a8007f066) backup (synchronous part) took 8 ms.
2016-05-16 17:21:36,125 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while materializing asynchronous checkpoints.
com.amazonaws.AmazonClientException: Unable to calculate MD5 hash:/directory//flink-io-723a5c14-2a8a-4abc-881a-9a60138816b0/7e6abf126ce3d18f173733b34eda81a9/WindowOperator_131_0/window-contents/local-chk-599 (Is a directory)
    at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
    at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
    at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
    at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
    at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
    at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    ...
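For what it's worth, the "(Is a directory)" message suggests the S3 client's putObject is being handed RocksDB's backup directory (e.g. .../window-contents/local-chk-599) rather than a single file, and a file upload of a directory cannot succeed. The pre-flight check below is a hypothetical plain-Java sketch of that distinction, not Flink or AWS SDK code:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class UploadCheck {
    // Hypothetical pre-flight check: a putObject-style upload expects a
    // single file, so handing it a directory (like RocksDB's backup dir)
    // has to fail before any bytes can be read or MD5-hashed.
    static void requireFile(File f) {
        if (f.isDirectory()) {
            throw new IllegalArgumentException(f + " (Is a directory)");
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate the RocksDB backup directory from the log above.
        File backupDir = Files.createTempDirectory("local-chk-599").toFile();
        try {
            requireFile(backupDir);
            System.out.println("would upload");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: directory"); // this branch is taken
        }
        backupDir.delete();
    }
}
```

If that reading is right, the local file:// backend works because it can simply copy the directory, while the S3 upload path assumes a single object per key.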
Tests look like this (source creation elided):

    .setParallelism(1)
    .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
        @Override
        public Watermark checkAndGetNextWatermark(String s, long l) {
            long ts = System.currentTimeMillis() - 60 * 1000L;
            return new Watermark(ts);
        }

        @Override
        public long extractTimestamp(String s, long l) {
            return System.currentTimeMillis();
        }
    })
    .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
            collector.collect(new Tuple2<>(s, 1L));
        }
    })
    .keyBy(0)
    .timeWindow(Time.seconds(60))
    .apply(new RichWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow,
                          Iterable<Tuple2<String, Long>> iterable,
                          Collector<Tuple2<String, Long>> collector) throws Exception {
            log.info("trigger fire at {}", System.currentTimeMillis());
            collector.collect(new Tuple2<>(timeWindow.toString(), 1L));
        }
    })
    .rebalance()
    .addSink(new FakeSink<>());

    JobExecutionResult result = env.execute();