[ https://issues.apache.org/jira/browse/FLINK-36139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876231#comment-17876231 ]
Vladislav Keda commented on FLINK-36139:
----------------------------------------

It seems the problem occurs because BatchingFunction implements the ResultTypeQueryable interface. The accumulator type is then calculated incorrectly by TypeExtractor.getAggregateFunctionAccumulatorType(..), which causes exceptions during checkpoints.

> ClassCastException when checkpointing AggregateFunction
> -------------------------------------------------------
>
>                 Key: FLINK-36139
>                 URL: https://issues.apache.org/jira/browse/FLINK-36139
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.18.1
>         Environment: Kubernetes Native Session Cluster
>            Reporter: Vladislav Keda
>            Priority: Major
>
> Consider a simple example of an AggregateFunction with a custom Accumulator:
>
> {code:java}
> static class BatchingFunction implements AggregateFunction<Row, BatchingFunction.Accumulator, Row>, ResultTypeQueryable<Row> {
>     @Getter
>     private final RowTypeInfo producedType;
>
>     public BatchingFunction() {
>         this.producedType = ...
>     }
>
>     @Override
>     public Accumulator createAccumulator() {
>         return new Accumulator();
>     }
>
>     @Override
>     public Accumulator add(Row row, Accumulator acc) {
>         acc.add(row);
>         return acc;
>     }
>
>     @Override
>     public Accumulator merge(Accumulator acc1, Accumulator acc2) {
>         acc1.merge(acc2);
>         return acc1;
>     }
>
>     @Override
>     public Row getResult(Accumulator accumulator) {
>         ...
>     }
>
>     private static class Accumulator implements Serializable {
>
>         private final List<Row> rows = new ArrayList<>();
>
>         List<Row> getAll() {
>             return rows;
>         }
>
>         Accumulator merge(Accumulator acc2) {
>             this.rows.addAll(acc2.rows);
>             acc2.clear();
>             return this;
>         }
>
>         void add(Row row) {
>             rows.add(row);
>         }
>
>         void clear() {
>             rows.clear();
>         }
>     }
> }
> {code}
> When a job that includes this BatchingFunction is resubmitted on a Flink Kubernetes session cluster with aligned checkpoints, a ClassCastException is thrown in the JobManager logs:
>
> {code:java}
> org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>     at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 1 for operator transform -> (Sink: metrics_transform, sink: Writer -> sink: Committer) (1/1)#0.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
>     at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
>     at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:69) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:147) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.18.1.jar:1.18.1]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 3 more
> {code}
> In this case Flink uses an incorrect serializer to write Accumulator objects to state. Note also that this behavior is not deterministic: when I launch the job for the first time on a new cluster, no such errors are observed during checkpoints.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
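
The explanation in the comment can be illustrated with a small plain-Java sketch. It uses no Flink classes: FakeRow, FakeAccumulator, DeclaresProducedType, SimpleAggregate and TypeInferenceSketch are all hypothetical stand-ins. The precedence rule in accumulatorType (a self-declared produced type wins over inspecting the function) is an assumption that matches the behavior described in the comment, not a copy of Flink's TypeExtractor.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's Row. */
final class FakeRow {}

/** Hypothetical stand-in for the custom accumulator from the report. */
final class FakeAccumulator {
    final List<FakeRow> rows = new ArrayList<>();
}

/** Plays the role of ResultTypeQueryable: the function declares its own output type. */
interface DeclaresProducedType {
    Class<?> producedType();
}

/** Plays the role of AggregateFunction, reduced to what this sketch needs. */
interface SimpleAggregate<ACC> {
    ACC createAccumulator();
}

final class TypeInferenceSketch {
    /**
     * Assumed rule: if the function declares a produced type, that type is
     * returned for ANY requested type, including the accumulator type.
     */
    static Class<?> accumulatorType(SimpleAggregate<?> fn) {
        if (fn instanceof DeclaresProducedType) {
            return ((DeclaresProducedType) fn).producedType();
        }
        // Otherwise fall back to inspecting an actual accumulator instance.
        return fn.createAccumulator().getClass();
    }
}

/** Aggregate without a declared produced type. */
class PlainBatching implements SimpleAggregate<FakeAccumulator> {
    @Override
    public FakeAccumulator createAccumulator() {
        return new FakeAccumulator();
    }
}

/** Same aggregate, but it also declares that it produces FakeRow. */
class QueryableBatching extends PlainBatching implements DeclaresProducedType {
    @Override
    public Class<?> producedType() {
        return FakeRow.class;
    }
}

class AccumulatorTypeDemo {
    public static void main(String[] args) {
        // Prints "FakeAccumulator": inference looks at the accumulator itself.
        System.out.println(TypeInferenceSketch.accumulatorType(new PlainBatching()).getSimpleName());

        // Prints "FakeRow": the declared output type is mistaken for the accumulator type.
        Class<?> inferred = TypeInferenceSketch.accumulatorType(new QueryableBatching());
        System.out.println(inferred.getSimpleName());

        // A serializer chosen for FakeRow would then fail on the real accumulator.
        Object acc = new QueryableBatching().createAccumulator();
        try {
            inferred.cast(acc);
        } catch (ClassCastException e) {
            System.out.println("cast failed, as in the reported stack trace");
        }
    }
}
```

If the function is applied through WindowedStream.aggregate, the overload that also takes explicit accumulator and result TypeInformation may avoid the inference step altogether. That has not been verified against this issue.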