Hi,
Since you are getting an AbstractMethodError, there is likely a version
mismatch between Beam and Flink.
Could you please provide:
- Beam version used
- Flink Runner artifact used
- Flink version used
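In case it helps with collecting these, here is a minimal sketch (not part of Beam or Flink; `JarLocator` and `locate` are hypothetical names) that prints which jar a class is loaded from, using only the Java standard library. The two class names in `main` are the Flink `TypeSerializer` from your stack trace and Beam's `FlinkRunner`. Assuming you run it with the same classpath as the job, the jar file names usually reveal the versions actually in use.

```java
import java.security.CodeSource;

final class JarLocator {
    /** Returns the location a class was loaded from, or a short note if it cannot be determined. */
    static String locate(String className) {
        try {
            // initialize=false: only the Class object is needed, not static initialization.
            Class<?> cls = Class.forName(className, false, JarLocator.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                // JDK classes loaded by the bootstrap loader have no code source.
                return "(bootstrap or unknown location)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.flink.api.common.typeutils.TypeSerializer",
            "org.apache.beam.runners.flink.FlinkRunner"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If `TypeSerializer` resolves to a Flink jar whose version differs from the one the Flink Runner artifact was built for, that would be consistent with the AbstractMethodError.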
Thanks,
Max
On 20.05.19 01:16, [email protected] wrote:
Hello Beam Dev,
I am having a hard time getting checkpointing to work with the
FlinkRunner. I set up two simple pipelines: one reads from an unbounded
source (Kinesis), and the other reads from a text file (tested with and
without the `--streaming=true` option). Both fail to save a checkpoint.
The checkpoints are configured to be saved to the file system. Am I
missing something? Below are my pipelines and the stack trace for your
reference.
I would appreciate any pointers!
*Pipeline #1:*
pipeline.apply("ReadingFromKinesis123", KinesisIO.read()
    .withStreamName("test-stream")
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
    .withAWSClientsProvider("foo", "bar", Regions.US_WEST_2,
        "http://localhost:30002"));
*Pipeline #2:*
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
p.run().waitUntilFinish();
*One of my commands:*
bin/flink run -c org.apache.beam.examples.WordCount
../../word-count-beam/target/word-count-beam-bundled-0.1.jar
--runner=FlinkRunner --checkpointingInterval=5000
--externalizedCheckpointsEnabled=true --streamName=test-stream
--retainExternalizedCheckpointsOnCancellation=true --awsRegion=us-west-2
*flink-conf.yaml:*
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoint/flink_app/
state.savepoints.dir: file:///tmp/flink-checkpoint/flink_app/savepoints/
*Stack trace:*
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 7 for operator Source: ReadLines/Read -> DropInputs/ParMultiDo(NoOp) (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 7 for operator Source: ReadLines/Read -> DropInputs/ParMultiDo(NoOp) (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.AbstractMethodError: org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot;
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.lang.AbstractMethodError: org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot;
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:123)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)