[ https://issues.apache.org/jira/browse/FLINK-34857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17828849#comment-17828849 ]
Flink CDC Issue Import commented on FLINK-34857:
------------------------------------------------

Date: Wed Jan 03 19:42:48 CST 2024, Author: [klion26|https://github.com/klion26]

Please see more information in the [analysis doc|https://docs.qq.com/doc/DQU5SemJCUXpmT3FN]

PS: The checkpoint can't be restored with FlinkCDC-2.4.2 either.

> [Bug] Can't restore from checkpoint because assignedSplits does not equal to splitFinishedOffsets
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34857
>                 URL: https://issues.apache.org/jira/browse/FLINK-34857
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Flink CDC Issue Import
>            Priority: Major
>              Labels: github-import
>
> ### Search before asking
> - [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.
> ### Flink version
> 1.14.5
> ### Flink CDC version
> 2.4.0
> ### Database and its version
> MySQL
> ### Minimal reproduce step
> The job can't be restored from the checkpoint (please unzip [ck.tar.gz|https://github.com/ververica/flink-cdc-connectors/files/13818472/ck.tar.gz] and restore from the _metadata file).
> The following unit test code can be used to debug:
> ```
> @Test
> public void testRestoreFailed() throws IOException {
>     String checkpoint = "/Path/to/_metadata";
>     Path chkPath = new Path(checkpoint);
>     InputStream in = chkPath.getFileSystem().open(chkPath);
>     DataInputStream inputStream = new DataInputStream(in);
>     CheckpointMetadata metadata =
>             Checkpoints.loadCheckpointMetadata(
>                     inputStream, Thread.currentThread().getContextClassLoader(), checkpoint);
>     HybridSourceEnumeratorStateSerializer serializer = new HybridSourceEnumeratorStateSerializer();
>     PendingSplitsStateSerializer splitsStateSerializer =
>             new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
>     for (OperatorState operatorState : metadata.getOperatorStates()) {
>         ByteStreamStateHandle coordinatorState = operatorState.getCoordinatorState();
>         // Only inspect the coordinator state of the affected source operator.
>         if (coordinatorState != null
>                 && "6dc0226b15c44c9c2e1f9ea1a65fd400".equals(operatorState.getOperatorID().toHexString())) {
>             HybridSourceEnumeratorState hybridSourceEnumeratorState =
>                     serializer.deserialize(0, coordinatorState.getData());
>             System.out.println(hybridSourceEnumeratorState);
>             HybridPendingSplitsState pendingSplitsState =
>                     (HybridPendingSplitsState)
>                             splitsStateSerializer.deserialize(
>                                     hybridSourceEnumeratorState.getWrappedStateSerializerVersion(),
>                                     hybridSourceEnumeratorState.getWrappedState());
>
>             System.out.println("================================================================================");
>             // Print every assigned split that has no entry in splitFinishedOffsets.
>             Set<String> splits = new HashSet<>();
>             for (MySqlSchemalessSnapshotSplit split :
>                     pendingSplitsState.getSnapshotPendingSplits().getAssignedSplits().values()) {
>                 if (!pendingSplitsState.getSnapshotPendingSplits().getSplitFinishedOffsets().containsKey(split.splitId())) {
>                     System.out.println(split);
>                 }
>                 splits.add(split.getTableId().identifier());
>             }
>
>             System.out.println("================================================================================");
>             // Collect the tables recorded as already processed, for comparison.
>             Set<String> tables = new HashSet<>();
>             for (TableId id : pendingSplitsState.getSnapshotPendingSplits().getAlreadyProcessedTables()) {
>                 tables.add(id.identifier());
>             }
>             System.out.println("================================");
>             System.out.println(splits);
>             System.out.println("================================");
>             System.out.println(tables);
>             System.out.println("================================");
>             // System.out.println(pendingSplitsState);
>         }
>     }
> }
> ```
> ### What did you expect to see?
> The job can be restored from the checkpoint.
> ### What did you see instead?
> The following exception is thrown:
> ```
> java.lang.NullPointerException: null
>     at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.createBinlogSplit(MySqlHybridSplitAssigner.java:208) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
>     at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:112) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
>     at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:199) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
>     at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:104) ~[flink-sql-connector-mysql-cdc-2.4.0.jar:2.4.0]
>     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:157) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
> ```
> ### Anything else?
> _No response_
> ### Are you willing to submit a PR?
> - [x] I'm willing to submit a PR!
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/apache/flink-cdc/issues/2958
> Created by: [klion26|https://github.com/klion26]
> Labels: bug
> Created at: Wed Jan 03 19:40:17 CST 2024
> State: open



--
This message was sent by Atlassian Jira
(v8.20.10#820010)