[ 
https://issues.apache.org/jira/browse/FLINK-34857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828849#comment-17828849
 ] 

Flink CDC Issue Import commented on FLINK-34857:
------------------------------------------------

Date: Wed Jan 03 19:42:48 CST 2024, Author: [klion26|https://github.com/klion26]

Please see more information in the [analysis doc|https://docs.qq.com/doc/DQU5SemJCUXpmT3FN]

PS: The checkpoint also can't be restored with Flink CDC 2.4.2.
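
For reference, the NullPointerException in `createBinlogSplit` is consistent with the assigner unboxing a missing (null) entry from `splitFinishedOffsets` for an assigned split. The following is a minimal standalone sketch of that failure mode only; `BinlogSplitNpeSketch`, `minFinishedOffset`, and the split IDs are hypothetical names and not the actual connector code:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Sketch (hypothetical types/names) of the suspected failure mode:
// the assigner assumes every assigned snapshot split has a matching
// entry in splitFinishedOffsets; a missing entry yields a null value
// and a NullPointerException when it is unboxed/dereferenced.
public class BinlogSplitNpeSketch {
    static final Map<String, Long> splitFinishedOffsets = new HashMap<>();

    static long minFinishedOffset(Iterable<String> assignedSplitIds) {
        long min = Long.MAX_VALUE;
        for (String splitId : assignedSplitIds) {
            // Unboxing a missing (null) map entry throws NullPointerException,
            // mirroring the stack trace at createBinlogSplit.
            long offset = splitFinishedOffsets.get(splitId);
            min = Math.min(min, offset);
        }
        return min;
    }

    public static void main(String[] args) {
        splitFinishedOffsets.put("table:0", 42L);
        try {
            // "table:1" was assigned but never recorded as finished.
            minFinishedOffset(Arrays.asList("table:0", "table:1"));
            throw new AssertionError("expected NullPointerException");
        } catch (NullPointerException expected) {
            System.out.println("NPE reproduced for missing splitFinishedOffsets entry");
        }
    }
}
```

This matches the symptom shown by the debug code in the quoted issue below: some splits in `getAssignedSplits()` have no key in `getSplitFinishedOffsets()`.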

> [Bug] Can't restore from checkpoint because assignedSplits does not equal to 
> splitFinishedOffsets
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34857
>                 URL: https://issues.apache.org/jira/browse/FLINK-34857
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Flink CDC Issue Import
>            Priority: Major
>              Labels: github-import
>
> ### Search before asking
> - [X] I searched in the 
> [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
> nothing similar.
> ### Flink version
> 1.14.5
> ### Flink CDC version
> 2.4.0
> ### Database and its version
> MySQL
> ### Minimal reproduce step
> Can't restore from the checkpoint (please unzip 
> [ck.tar.gz|https://github.com/ververica/flink-cdc-connectors/files/13818472/ck.tar.gz] 
> and restore from the _metadata file).
> The following unit-test code can be used to debug:
> ```
> @Test
>     public void testRestoreFaild() throws IOException {
>         String checkpoint = "/Path/to/_metadata";
>         Path chkPath = new Path(checkpoint);
>         InputStream in = chkPath.getFileSystem().open(chkPath);
>         DataInputStream inputStream = new DataInputStream(in);
>         CheckpointMetadata metadata =
>                 Checkpoints.loadCheckpointMetadata(inputStream, 
> Thread.currentThread().getContextClassLoader(), checkpoint);
>         HybridSourceEnumeratorStateSerializer serializer = new 
> HybridSourceEnumeratorStateSerializer();
>         PendingSplitsStateSerializer splitsStateSerializer = new 
> PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
>         for (OperatorState operatorState : metadata.getOperatorStates()) {
>             ByteStreamStateHandle coordinatorState = 
> operatorState.getCoordinatorState();
>             if (coordinatorState != null && 
> "6dc0226b15c44c9c2e1f9ea1a65fd400".equals(operatorState.getOperatorID().toHexString()))
>  {
>                 HybridSourceEnumeratorState hybridSourceEnumeratorState =
>                         serializer.deserialize(0, coordinatorState.getData());
>                 System.out.println(hybridSourceEnumeratorState);
>                 HybridPendingSplitsState pendingSplitsState =
>                         (HybridPendingSplitsState) 
> splitsStateSerializer.deserialize(
>                                 
> hybridSourceEnumeratorState.getWrappedStateSerializerVersion(),
>                                 
> hybridSourceEnumeratorState.getWrappedState());
>                 
> System.out.println("================================================================================");
>                 Set<String> splits = new HashSet<>();
>                 for (MySqlSchemalessSnapshotSplit split : 
> pendingSplitsState.getSnapshotPendingSplits().getAssignedSplits().values()) {
>                     if 
> (!pendingSplitsState.getSnapshotPendingSplits().getSplitFinishedOffsets().containsKey(split.splitId()))
>  {
>                         System.out.println(split);
>                     }
>                     splits.add(split.getTableId().identifier());
>                 }
>                 
> System.out.println("================================================================================");
>                 Set<String> tables = new HashSet<>();
>                 for (TableId id : 
> pendingSplitsState.getSnapshotPendingSplits().getAlreadyProcessedTables()) {
>                     tables.add(id.identifier());
>                 }
>                 System.out.println("================================");
>                 System.out.println(splits);
>                 System.out.println("================================");
>                 System.out.println(tables);
>                 System.out.println("================================");
> //                System.out.println(pendingSplitsState);
>             }
>         }
>     }
> ```
> ### What did you expect to see?
> The job restores from the checkpoint successfully.
> ### What did you see instead?
> The following exception is thrown:
> ```
> java.lang.NullPointerException: null
>         at 
> com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.createBinlogSplit(MySqlHybridSplitAssigner.java:208]
>  ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
>         at 
> com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:112)
>  ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
>         at 
> com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:199)
>  ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
>         at 
> com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:104)
>  ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
>         at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:157)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
> ```
> ### Anything else?
> _No response_
> ### Are you willing to submit a PR?
> - [x] I'm willing to submit a PR!
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/2958
> Created by: [klion26|https://github.com/klion26]
> Labels: bug
> Created at: Wed Jan 03 19:40:17 CST 2024
> State: open



--
This message was sent by Atlassian Jira
(v8.20.10#820010)