Sergei Morozov created FLINK-36891:
--------------------------------------
Summary: MySQL CDC connector produces corrupted state in case of
serialization failure
Key: FLINK-36891
URL: https://issues.apache.org/jira/browse/FLINK-36891
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.2.1
Reporter: Sergei Morozov
{{PendingSplitsStateSerializer}} maintains a {{DataOutputSerializer}} instance
stored in {{SERIALIZER_CACHE}}. If a call to {{serialize()}} fails with an
exception, the cached serializer is not reset, so the value returned by a
subsequent call will still contain the partial output of the failed
serialization.
As a result, the checkpointed state is corrupted and cannot be deserialized.
Example serialization failure (a {{ConcurrentModificationException}} thrown
while writing table schemas during a checkpoint):
{code:java}
org.apache.flink.runtime.checkpoint.CheckpointException: Trigger checkpoint
failure.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$getCheckpointException$19(CheckpointCoordinator.java:2162)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.getCheckpointException(CheckpointCoordinator.java:2161)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:930)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:908)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$startTriggeringCheckpoint$7(CheckpointCoordinator.java:636)
at
java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: java.util.ConcurrentModificationException: null
at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1617)
at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1615)
at
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.writeTableSchemas(MySqlSplitSerializer.java:194)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:161)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:178)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:84)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.serialize(PendingSplitsStateSerializer.java:45)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.writeCheckpointBytes(SourceCoordinator.java:462)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.toBytes(SourceCoordinator.java:447)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$checkpointCoordinator$6(SourceCoordinator.java:321)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
at
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
... 6 common frames omitted
{code}
Example deserialization failure when restoring from the corrupted checkpoint:
{code:java}
java.lang.IllegalArgumentException: Invalid identifier:
at
org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:75)
at
org.apache.flink.cdc.connectors.shaded.io.debezium.text.TokenStream.start(TokenStream.java:446)
at
org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableIdParser.parse(TableIdParser.java:31)
at
org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parseParts(TableId.java:51)
at
org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:40)
at
org.apache.flink.cdc.connectors.shaded.io.debezium.relational.TableId.parse(TableId.java:27)
at
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitSerializer.readTableSchemas(MySqlSplitSerializer.java:210)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeSnapshotPendingSplitsState(PendingSplitsStateSerializer.java:283)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializeHybridPendingSplitsState(PendingSplitsStateSerializer.java:318)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserializePendingSplitsState(PendingSplitsStateSerializer.java:139)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:108)
at
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsStateSerializer.deserialize(PendingSplitsStateSerializer.java:45)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.deserializeCheckpoint(SourceCoordinator.java:489)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:384)
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:390)
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:144)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2143)
at
org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
at java.base/java.lang.Thread.run(Thread.java:831)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)