This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 39afc4fc891abcb937fa0517e651caad34df196d
Author: Kevin Wang <data...@163.com>
AuthorDate: Wed Aug 21 10:40:17 2024 +0800

    [FLINK-36081][cdc-connector][mysql] Remove the schemas of outdated tables 
in the BinlogSplit when restarting (#3548)
    
    Co-authored-by: 云时 <mingya....@alibaba-inc.com>
    (cherry picked from commit 77c63385d947f3bb8e726561a7f01cd383941a96)
---
 .../mysql/source/split/MySqlBinlogSplit.java       |  31 +++--
 .../mysql/source/split/MySqlBinlogSplitTest.java   | 140 +++++++++++++++++++++
 2 files changed, 164 insertions(+), 7 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
index 033844ab3..7a2a8ec20 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java
@@ -183,30 +183,47 @@ public class MySqlBinlogSplit extends MySqlSplit {
      *
      * <p>When restore from a checkpoint, the finished split infos may contain 
some splits from the
      * deleted tables. We need to remove these splits from the total finished 
split infos and update
-     * the size.
+     * the size, while also removing the outdated tables from the table 
schemas of binlog split.
      */
     public static MySqlBinlogSplit filterOutdatedSplitInfos(
             MySqlBinlogSplit binlogSplit, Tables.TableFilter 
currentTableFilter) {
-        Set<TableId> tablesToRemove =
+        Map<TableId, TableChange> filteredTableSchemas =
+                binlogSplit.getTableSchemas().entrySet().stream()
+                        .filter(entry -> 
currentTableFilter.isIncluded(entry.getKey()))
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        Set<TableId> tablesToRemoveInFinishedSnapshotSplitInfos =
                 binlogSplit.getFinishedSnapshotSplitInfos().stream()
                         .filter(i -> 
!currentTableFilter.isIncluded(i.getTableId()))
                         .map(split -> split.getTableId())
                         .collect(Collectors.toSet());
-        if (tablesToRemove.isEmpty()) {
-            return binlogSplit;
+        if (tablesToRemoveInFinishedSnapshotSplitInfos.isEmpty()) {
+            return new MySqlBinlogSplit(
+                    binlogSplit.splitId,
+                    binlogSplit.getStartingOffset(),
+                    binlogSplit.getEndingOffset(),
+                    binlogSplit.getFinishedSnapshotSplitInfos(),
+                    filteredTableSchemas,
+                    binlogSplit.totalFinishedSplitSize,
+                    binlogSplit.isSuspended());
         }
 
-        LOG.info("Reader remove tables after restart: {}", tablesToRemove);
+        LOG.info(
+                "Reader remove tables after restart: {}",
+                tablesToRemoveInFinishedSnapshotSplitInfos);
         List<FinishedSnapshotSplitInfo> allFinishedSnapshotSplitInfos =
                 binlogSplit.getFinishedSnapshotSplitInfos().stream()
-                        .filter(i -> !tablesToRemove.contains(i.getTableId()))
+                        .filter(
+                                i ->
+                                        
!tablesToRemoveInFinishedSnapshotSplitInfos.contains(
+                                                i.getTableId()))
                         .collect(Collectors.toList());
         return new MySqlBinlogSplit(
                 binlogSplit.splitId,
                 binlogSplit.getStartingOffset(),
                 binlogSplit.getEndingOffset(),
                 allFinishedSnapshotSplitInfos,
-                binlogSplit.getTableSchemas(),
+                filteredTableSchemas,
                 binlogSplit.getTotalFinishedSplitSize()
                         - (binlogSplit.getFinishedSnapshotSplitInfos().size()
                                 - allFinishedSnapshotSplitInfos.size()),
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
new file mode 100644
index 000000000..6066c19f8
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplitTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.split;
+
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.history.TableChanges;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Unit tests for {@link MySqlBinlogSplit}. */
+public class MySqlBinlogSplitTest {
+
+    @Test
+    public void filterOutdatedSplitInfos() {
+        Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
+
+        // mock table1
+        TableId tableId1 = new TableId("catalog1", null, "table1");
+
+        TableChanges.TableChange tableChange1 =
+                new TableChanges.TableChange(
+                        TableChanges.TableChangeType.CREATE,
+                        new MockTable(TableId.parse("catalog1.table1")));
+
+        // mock table2
+        TableId tableId2 = new TableId("catalog2", null, "table2");
+
+        TableChanges.TableChange tableChange2 =
+                new TableChanges.TableChange(
+                        TableChanges.TableChangeType.CREATE,
+                        new MockTable(TableId.parse("catalog2.table2")));
+        tableSchemas.put(tableId1, tableChange1);
+        tableSchemas.put(tableId2, tableChange2);
+        MySqlBinlogSplit binlogSplit =
+                new MySqlBinlogSplit(
+                        "binlog-split",
+                        BinlogOffset.ofLatest(),
+                        null,
+                        new ArrayList<>(),
+                        tableSchemas,
+                        0,
+                        false);
+
+        // case 1: only include table1
+        Tables.TableFilter currentTableFilter = tableId -> 
tableId.table().equals("table1");
+
+        MySqlBinlogSplit mySqlBinlogSplit =
+                MySqlBinlogSplit.filterOutdatedSplitInfos(binlogSplit, 
currentTableFilter);
+        Map<TableId, TableChanges.TableChange> filterTableSchemas =
+                mySqlBinlogSplit.getTableSchemas();
+        Assert.assertEquals(1, filterTableSchemas.size());
+        Assert.assertEquals(tableChange1, filterTableSchemas.get(tableId1));
+
+        // case 2: include all tables
+        currentTableFilter = tableId -> tableId.table().startsWith("table");
+
+        mySqlBinlogSplit =
+                MySqlBinlogSplit.filterOutdatedSplitInfos(binlogSplit, 
currentTableFilter);
+        filterTableSchemas = mySqlBinlogSplit.getTableSchemas();
+        Assert.assertEquals(2, filterTableSchemas.size());
+        Assert.assertEquals(tableChange1, filterTableSchemas.get(tableId1));
+        Assert.assertEquals(tableChange2, filterTableSchemas.get(tableId2));
+    }
+
+    /** A mock implementation for {@link Table} which is used for unit tests. 
*/
+    private static class MockTable implements Table {
+        private final TableId tableId;
+
+        public MockTable(TableId tableId) {
+            this.tableId = tableId;
+        }
+
+        @Override
+        public TableId id() {
+            return tableId;
+        }
+
+        @Override
+        public List<String> primaryKeyColumnNames() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<String> retrieveColumnNames() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<Column> columns() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public Column columnWithName(String name) {
+            throw new UnsupportedOperationException("Not implemented.");
+        }
+
+        @Override
+        public String defaultCharsetName() {
+            return "UTF-8";
+        }
+
+        @Override
+        public String comment() {
+            return null;
+        }
+
+        @Override
+        public TableEditor edit() {
+            throw new UnsupportedOperationException("Not implemented.");
+        }
+    }
+}

Reply via email to