I'm attempting to upgrade a Beam application (intended to run on Flink 1.10 on
EMR) from Beam 2.21 to 2.27. In the process I've run into an issue with
checkpointing/savepointing and stateful DoFns. The issue does not reproduce
locally on the Flink MiniCluster (or at least, all of my tests pass locally).
After backtesting through prior versions, 2.22 appears to be the earliest
version in which the problem occurs. Here is a minimal pipeline that reproduces it:
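import static org.apache.beam.sdk.values.TypeDescriptors.voids;

import java.util.Optional;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class StatefulTestPipeline {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setCheckpointingInterval(30000L); // checkpoint every 30 seconds
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // One element (0, 1, 2, ...) every 5 seconds.
        .apply(
            "UnboundedSource",
            GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
        .apply(
            "ApplyTriggering",
            Window.<Long>configure()
                .discardingFiredPanes()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(10)))))
        // Put every element under a single null key so they all share one state cell.
        .apply(
            "KeyByNull",
            WithKeys.<Void, Long>of((Void) null).withKeyType(voids()))
        .apply(
            "StatefulFn",
            ParDo.of(
                new DoFn<KV<Void, Long>, KV<Long, Long>>() {
                  private static final long serialVersionUID = 1L;

                  @StateId("SUM")
                  private final StateSpec<ValueState<Long>> sumSpec = StateSpecs.value();

                  @StateId("COUNT")
                  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

                  // Keeps a running sum and count and emits (count, sum) per element.
                  @ProcessElement
                  public void processElement(
                      @Element KV<Void, Long> element,
                      @StateId("SUM") ValueState<Long> sumState,
                      @StateId("COUNT") ValueState<Long> countState,
                      OutputReceiver<KV<Long, Long>> out) {
                    long currentSum =
                        Optional.ofNullable(sumState.read()).orElse(0L) + element.getValue();
                    sumState.write(currentSum);
                    long currentCount = Optional.ofNullable(countState.read()).orElse(0L) + 1;
                    countState.write(currentCount);
                    out.output(KV.of(currentCount, currentSum));
                  }
                }))
        // Print each (count, sum) pair to the task manager's stdout.
        .apply(
            MapElements.into(voids())
                .via(
                    (KV<Long, Long> kv) -> {
                      System.out.println(
                          String.format(
                              "Time: %s, Count: %d, Sum: %d",
                              Instant.now(), kv.getKey(), kv.getValue()));
                      return null;
                    }));
    pipeline.run();
  }
}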
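To make the expected output concrete, here is a plain-Java model (no Beam or Flink involved) of what StatefulFn computes for the single null key. StatefulSumModel is a hypothetical helper in which two HashMaps stand in for the SUM and COUNT ValueState cells; it is a sketch of the intended semantics only, not of how Beam or the Flink runner actually stores state.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical plain-Java model of the "StatefulFn" DoFn: per key, keep a running
 * SUM and COUNT and emit (count, sum) for every element.
 */
public class StatefulSumModel {
  // Stand-ins for the two ValueState cells. HashMap permits the null key
  // that the pipeline uses for every element.
  private final Map<Object, Long> sumState = new HashMap<>();
  private final Map<Object, Long> countState = new HashMap<>();

  /** Mirrors processElement: read (default 0), update, write back, emit {count, sum}. */
  public long[] process(Object key, long value) {
    long currentSum = sumState.getOrDefault(key, 0L) + value;
    sumState.put(key, currentSum);
    long currentCount = countState.getOrDefault(key, 0L) + 1;
    countState.put(key, currentCount);
    return new long[] {currentCount, currentSum};
  }

  public static void main(String[] args) {
    StatefulSumModel model = new StatefulSumModel();
    // GenerateSequence.from(0) produces 0, 1, 2, ...
    for (long i = 0; i < 5; i++) {
      long[] out = model.process(null, i);
      System.out.println(String.format("Count: %d, Sum: %d", out[0], out[1]));
    }
  }
}
```

Running main prints Count: 1, Sum: 0 through Count: 5, Sum: 10; in general the n-th element yields Count: n, Sum: n(n-1)/2, and a healthy run should keep printing that sequence across checkpoints.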
I run this pipeline from the EMR master node (Flink 1.10.0, with RocksDB as the
state backend) using this command:
flink run -m yarn-cluster -yjm 1g -ytm 1g -ys 1 -c my.test.package.StatefulTestPipeline -p 1 -d pipeline.jar
At first the pipeline runs as expected (output is printed to the task
manager's stdout):
Time: 2021-02-13T17:25:08.820Z, Count: 1, Sum: 0
But as soon as the first checkpoint (or savepoint) starts, the following error
appears in the logs and no further output is produced:
17:25:11.231 [StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1)] ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in resource cleanup of task StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1) (d60f456dd213162dbc957a705609f561).
java.lang.IllegalStateException: null
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) ~[flink-dist_2.11-1.10.0.jar:1.10.0]
    at org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:926) ~[falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:832) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
17:25:11.245 [StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1)] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - FATAL - exception in resource cleanup of task StatefulFn/ParMultiDo(Anonymous) -> MapElements/Map/ParMultiDo(Anonymous) (1/1) (d60f456dd213162dbc957a705609f561).
java.lang.IllegalStateException: null
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) ~[flink-dist_2.11-1.10.0.jar:1.10.0]
    at org.apache.flink.runtime.taskmanager.Task.notifyFinalState(Task.java:926) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:832) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) [falls-risk-pipeline-0.5.0-MLECOSYSTM-1579-SNAPSHOT.jar:0.5.0-MLECOSYSTM-1579-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
This exception seems to indicate that the task has finished; however, in the
Flink dashboard none of the tasks show as finished, failed, or canceled. They
all still show as running.
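A simplified, hypothetical model of the failing check may help frame this. The sketch assumes (not verified against the Flink source) that Task.notifyFinalState uses a message-less Preconditions.checkState(boolean) to assert that the task has reached a terminal execution state. Under that assumption, the exception would be thrown when the task thread exits while the task is still in a non-terminal state such as RUNNING, which would be consistent with the dashboard. The `null` in the log would then just be the missing exception message. FinalStateCheckModel, its State enum, and its methods are illustrative stand-ins, not Flink's classes.

```java
/**
 * Hypothetical model of the check in the stack trace. Assumption: notifyFinalState
 * asserts a terminal execution state with a message-less checkState(boolean).
 */
public class FinalStateCheckModel {
  // A subset of task states; which ones count as terminal is an assumption of this sketch.
  public enum State {
    CREATED, RUNNING, FINISHED, CANCELED, FAILED;

    public boolean isTerminal() {
      return this == FINISHED || this == CANCELED || this == FAILED;
    }
  }

  // Message-less precondition: on failure the exception's message is null.
  public static void checkState(boolean condition) {
    if (!condition) {
      throw new IllegalStateException();
    }
  }

  // Called when the task thread is done; reports the final state if it is terminal.
  public static void notifyFinalState(State state) {
    checkState(state.isTerminal());
    System.out.println("reported final state " + state);
  }

  public static void main(String[] args) {
    notifyFinalState(State.FINISHED); // passes the check
    try {
      notifyFinalState(State.RUNNING); // thread exits while the task is still RUNNING
    } catch (IllegalStateException e) {
      // Prints the class name followed by ": null", since no message was given.
      System.out.println(e.getClass().getName() + ": " + e.getMessage());
    }
  }
}
```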
I also ran the same pipeline on Flink 1.10.0 with Beam 2.27 and got the same
result.
I also tried Flink 1.11.2 with Beam 2.27, but none of the subtasks started
(they got stuck in the Created state), and I haven't yet had a chance to dig
further to see whether a native Flink streaming job works there.
I skimmed through the JIRA queue, and this may be related to
https://issues.apache.org/jira/browse/BEAM-10927
CONFIDENTIALITY NOTICE This message and any included attachments are from
Cerner Corporation and are intended only for the addressee. The information
contained in this message is confidential and may constitute inside or
non-public information under international, federal, or state securities laws.
Unauthorized forwarding, printing, copying, distribution, or use of such
information is strictly prohibited and may be unlawful. If you are not the
addressee, please promptly delete this message and notify the sender of the
delivery error by e-mail or you may call Cerner's corporate offices in Kansas
City, Missouri, U.S.A at (+1) (816)221-1024.