[ 
https://issues.apache.org/jira/browse/FLINK-36139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876231#comment-17876231
 ] 

Vladislav Keda commented on FLINK-36139:
----------------------------------------

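It seems that the problem happens because BatchingFunction implements the
ResultTypeQueryable interface. TypeExtractor.getAggregateFunctionAccumulatorType(..)
then computes the accumulator type incorrectly: it appears to return the
function's produced type (the RowTypeInfo) instead of the Accumulator type. As a
result, Accumulator state is written with a RowSerializer, which causes the
exceptions during checkpoints.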
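The sketch below models the suspected precedence using only the JDK. It is a minimal illustration and does not reproduce Flink's code. The comment above suggests that a ResultTypeQueryable check wins over reflective extraction of the ACC type argument, and the sketch assumes exactly that. `Aggregate`, `ProducedTypeQueryable`, `Batching` and both helper methods are hypothetical stand-ins. They are not Flink's AggregateFunction, ResultTypeQueryable or TypeExtractor APIs.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class AccumulatorTypeDemo {
    // Hypothetical stand-in for Flink's AggregateFunction<IN, ACC, OUT> (NOT Flink's API).
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
    }

    // Hypothetical stand-in for Flink's ResultTypeQueryable<T> (NOT Flink's API).
    interface ProducedTypeQueryable<T> {
        Class<T> producedType();
    }

    static final class Accumulator {
        final List<String> rows = new ArrayList<>();
    }

    // Mirrors the shape of BatchingFunction: ACC = Accumulator, OUT = String (standing in for Row).
    static final class Batching
            implements Aggregate<String, Accumulator, String>, ProducedTypeQueryable<String> {
        @Override
        public Accumulator createAccumulator() {
            return new Accumulator();
        }

        @Override
        public Class<String> producedType() {
            return String.class;
        }
    }

    // Models the suspected precedence: the explicitly produced type is returned
    // no matter which type argument (IN, ACC or OUT) the caller asked for.
    static Type extractWithProducedTypePrecedence(Object fn, int typeArgIndex) {
        if (fn instanceof ProducedTypeQueryable) {
            return ((ProducedTypeQueryable<?>) fn).producedType();
        }
        return declaredTypeArgument(fn, typeArgIndex);
    }

    // Reads a type argument of Aggregate<IN, ACC, OUT> from the class's directly declared interfaces.
    static Type declaredTypeArgument(Object fn, int typeArgIndex) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Aggregate.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[typeArgIndex];
            }
        }
        throw new IllegalArgumentException("fn does not implement Aggregate directly");
    }

    public static void main(String[] args) {
        Batching fn = new Batching();
        int accIndex = 1; // ACC is the second type argument of Aggregate<IN, ACC, OUT>
        System.out.println("with precedence: " + extractWithProducedTypePrecedence(fn, accIndex));
        System.out.println("declared ACC:    " + declaredTypeArgument(fn, accIndex));
    }
}
```

The two calls return different things. The first returns the produced type, `String` here (standing in for `Row`). The second returns the `Accumulator` class. That is the kind of mismatch that would lead to a Row serializer being chosen for accumulator state.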

> ClassCastException when checkpointing AggregateFunction
> -------------------------------------------------------
>
>                 Key: FLINK-36139
>                 URL: https://issues.apache.org/jira/browse/FLINK-36139
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.18.1
>         Environment: Kubernetes Native Session Cluster
>            Reporter: Vladislav Keda
>            Priority: Major
>
> Consider a simple example of an AggregateFunction with a custom Accumulator:
>  
> {code:java}
>     static class BatchingFunction implements AggregateFunction<Row, BatchingFunction.Accumulator, Row>, ResultTypeQueryable<Row> {
>         // Lombok's @Getter generates getProducedType(), which implements ResultTypeQueryable<Row>
>         @Getter
>         private final RowTypeInfo producedType;
>         
>         public BatchingFunction() {
>             this.producedType = ...
>         }
>         
>         @Override
>         public Accumulator createAccumulator() {
>             return new Accumulator();
>         }
>         @Override
>         public Accumulator add(Row row, Accumulator acc) {
>             acc.add(row);
>             return acc;
>         }
>    
>         @Override
>         public Accumulator merge(Accumulator acc1, Accumulator acc2) {
>             acc1.merge(acc2);
>             return acc1;
>         }
>         @Override
>         public Row getResult(Accumulator accumulator) {
>             ...
>         }
>         private static class Accumulator implements Serializable {
>             
>             private final List<Row> rows = new ArrayList<>();
>             List<Row> getAll() {
>                 return rows;
>             }
>             Accumulator merge(Accumulator acc2) {
>                 this.rows.addAll(acc2.rows);
>                 acc2.clear();
>                 return this;
>             }
>             void add(Row row) {
>                 rows.add(row);
>             }
>             void clear() {
>                 rows.clear();
>             }
>         }
>     }
>  {code}
> When a job that includes this BatchingFunction is resubmitted to a Flink
> Kubernetes session cluster with aligned checkpoints, the following
> ClassCastException appears in the JobManager logs:
>  
> {code:java}
> org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>     at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 1 for operator transform -> (Sink: metrics_transform, sink: Writer -> sink: Committer) (1/1)#0.
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
>     at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
>     at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:69) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:147) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.18.1.jar:1.18.1]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 3 more
> {code}
> In this case, Flink uses the wrong serializer (a RowSerializer) to write
> Accumulator objects to state. Note also that the behavior seems
> non-deterministic: when I launch the job for the first time on a new cluster,
> these errors do not occur during checkpoints.
>  
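The report says a mismatched serializer only fails when a checkpoint is taken. The trace shows heap-backed state (HeapSnapshotStrategy, CopyOnWriteStateMapSnapshot), where values stay on the heap as plain objects and are serialized only at snapshot time. The JDK-only sketch below models why such a mismatch can stay hidden until then. It is an illustration under that assumption, not Flink's implementation. `Serializer`, `Row`, `RowOnlySerializer`, `heapState` and `snapshot` are hypothetical names, not Flink's TypeSerializer or state-backend API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WrongSerializerDemo {
    // Hypothetical stand-in for a typed state serializer (NOT Flink's TypeSerializer API).
    interface Serializer<T> {
        String serialize(T value);
    }

    // Hypothetical stand-in for org.apache.flink.types.Row.
    static final class Row {
        final String field;

        Row(String field) {
            this.field = field;
        }
    }

    // Hypothetical stand-in for BatchingFunction.Accumulator.
    static final class Accumulator {
        final List<Row> rows = new ArrayList<>();
    }

    // A serializer that only understands Row values, playing the role of a RowSerializer.
    static final class RowOnlySerializer implements Serializer<Row> {
        @Override
        public String serialize(Row value) {
            return "Row(" + value.field + ")";
        }
    }

    // Heap-style keyed state: values are kept as plain objects and are only
    // serialized when a snapshot is taken.
    static final Map<String, Object> heapState = new HashMap<>();

    @SuppressWarnings("unchecked")
    static List<String> snapshot(Serializer<?> serializer) {
        // Unchecked cast: type erasure hides the mismatch at this point.
        Serializer<Object> raw = (Serializer<Object>) serializer;
        List<String> out = new ArrayList<>();
        for (Object value : heapState.values()) {
            // The compiler-generated bridge method of RowOnlySerializer casts
            // value to Row here, so an Accumulator triggers a ClassCastException.
            out.add(raw.serialize(value));
        }
        return out;
    }

    public static void main(String[] args) {
        Accumulator acc = new Accumulator();
        acc.rows.add(new Row("a"));
        heapState.put("key-1", acc); // storing succeeds: nothing is serialized yet

        try {
            snapshot(new RowOnlySerializer());
            System.out.println("snapshot succeeded");
        } catch (ClassCastException e) {
            System.out.println("snapshot failed: " + e.getClass().getSimpleName());
        }
    }
}
```

Running `main` prints `snapshot failed: ClassCastException`. Storing the Accumulator raises no error; the cast to `Row` only runs when the snapshot serializes the stored value.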



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
