This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 054c573cf4a8c23cdc81b387cddb5f762430bc7d
Author: DeadlineFen <[email protected]>
AuthorDate: Mon Jul 24 14:33:16 2023 +0800

    [Fix](binlog) Fix bugs in tombstone (#22031)
---
 .../org/apache/doris/binlog/BinlogTombstone.java   | 48 +++++++++----
 .../java/org/apache/doris/binlog/DBBinlog.java     | 84 ++++++++++------------
 .../java/org/apache/doris/binlog/TableBinlog.java  | 35 ++++-----
 3 files changed, 83 insertions(+), 84 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
index 50d9f90a01..2b6e3cb8e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
@@ -36,33 +36,34 @@ public class BinlogTombstone {
     @SerializedName(value = "commitSeq")
     private long commitSeq;
 
+    // TODO(deadlinefen): delete this field later
+    // This is a reserved field for the transition between new and old 
versions.
+    // It will be deleted later
     @SerializedName(value = "tableIds")
     private List<Long> tableIds;
 
+    @SerializedName(value = "tableCommitSeqMap")
+    private Map<Long, Long> tableCommitSeqMap;
+
     @SerializedName(value = "tableVersionMap")
     // this map keep last upsert record <tableId, UpsertRecord>
     // only for master fe to send be gc task, not need persist
     private Map<Long, UpsertRecord.TableRecord> tableVersionMap = 
Maps.newHashMap();
 
-    public BinlogTombstone(long dbId, List<Long> tableIds, long commitSeq) {
-        this.dbBinlogTombstone = true;
-        this.dbId = dbId;
-        this.tableIds = tableIds;
-        this.commitSeq = commitSeq;
-    }
-
-    public BinlogTombstone(long dbId, long commitSeq) {
-        this.dbBinlogTombstone = false;
+    public BinlogTombstone(long dbId, boolean isDbTombstone) {
+        this.dbBinlogTombstone = isDbTombstone;
         this.dbId = dbId;
-        this.tableIds = null;
-        this.commitSeq = commitSeq;
+        this.commitSeq = -1;
+        this.tableIds = Collections.emptyList();
+        this.tableCommitSeqMap = Maps.newHashMap();
     }
 
-    public BinlogTombstone(long dbId, long tableId, long commitSeq) {
+    public BinlogTombstone(long tableId, long commitSeq) {
         this.dbBinlogTombstone = false;
-        this.dbId = dbId;
-        this.tableIds = Collections.singletonList(tableId);
+        this.dbId = -1;
         this.commitSeq = commitSeq;
+        this.tableIds = Collections.emptyList();
+        this.tableCommitSeqMap = Collections.singletonMap(tableId, commitSeq);
     }
 
     public void addTableRecord(long tableId, UpsertRecord upsertRecord) {
@@ -75,6 +76,14 @@ public class BinlogTombstone {
         tableVersionMap.putAll(records);
     }
 
+    // Can only be used to merge tombstone of the same db
+    public void mergeTableTombstone(BinlogTombstone tombstone) {
+        if (commitSeq < tombstone.getCommitSeq()) {
+            commitSeq = tombstone.getCommitSeq();
+        }
+        tableCommitSeqMap.putAll(tombstone.getTableCommitSeqMap());
+    }
+
     public boolean isDbBinlogTomstone() {
         return dbBinlogTombstone;
     }
@@ -83,10 +92,21 @@ public class BinlogTombstone {
         return dbId;
     }
 
+    // TODO(deadlinefen): delete this code later
     public List<Long> getTableIds() {
+        if (tableIds == null) {
+            tableIds = Collections.emptyList();
+        }
         return tableIds;
     }
 
+    public Map<Long, Long> getTableCommitSeqMap() { // never returns null
+        if (tableCommitSeqMap == null) { // field may be unset on tombstones lacking it
+            tableCommitSeqMap = Maps.newHashMap(); // must be mutable: replay put()s into it
+        }
+        return tableCommitSeqMap;
+    }
+
     public long getCommitSeq() {
         return commitSeq;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 58708c8fe6..151c5e5be9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -35,7 +35,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -160,13 +159,6 @@ public class DBBinlog {
         } finally {
             lock.writeLock().unlock();
         }
-
-        lock.readLock().lock();
-        try {
-            LOG.info("[deadlinefen] after add, db {} binlogs: {}, dummys: {}", 
dbId, allBinlogs, tableDummyBinlogs);
-        } finally {
-            lock.readLock().unlock();
-        }
     }
 
     public long getDbId() {
@@ -234,28 +226,26 @@ public class DBBinlog {
         return tombstone;
     }
 
-    private BinlogTombstone collectTableTombstone(List<BinlogTombstone> 
tableTombstones) {
+    private BinlogTombstone collectTableTombstone(List<BinlogTombstone> 
tableTombstones, boolean isDbGc) {
         if (tableTombstones.isEmpty()) {
             return null;
         }
 
-        List<Long> tableIds = Lists.newArrayList();
-        long largestExpiredCommitSeq = -1;
-        BinlogTombstone dbTombstone = new BinlogTombstone(dbId, tableIds, -1);
+        BinlogTombstone dbTombstone = new BinlogTombstone(dbId, isDbGc);
         for (BinlogTombstone tableTombstone : tableTombstones) {
-            long commitSeq = tableTombstone.getCommitSeq();
-            if (largestExpiredCommitSeq < commitSeq) {
-                largestExpiredCommitSeq = commitSeq;
-            }
+            // collect tableCommitSeq
+            dbTombstone.mergeTableTombstone(tableTombstone);
+
+            // collect tableVersionMap
             Map<Long, UpsertRecord.TableRecord> tableVersionMap = 
tableTombstone.getTableVersionMap();
             if (tableVersionMap.size() > 1) {
                 LOG.warn("tableVersionMap size is greater than 1. 
tableVersionMap: {}", tableVersionMap);
             }
-            tableIds.addAll(tableTombstone.getTableIds());
             dbTombstone.addTableRecord(tableVersionMap);
         }
 
-        dbTombstone.setCommitSeq(largestExpiredCommitSeq);
+        LOG.info("After GC, dbId: {}, dbExpiredBinlog: {}, 
tableExpiredBinlogs: {}",
+                dbId, dbTombstone.getCommitSeq(), 
dbTombstone.getTableCommitSeqMap());
 
         return dbTombstone;
     }
@@ -277,17 +267,9 @@ public class DBBinlog {
                 tombstones.add(tombstone);
             }
         }
-        BinlogTombstone tombstone = collectTableTombstone(tombstones);
+        BinlogTombstone tombstone = collectTableTombstone(tombstones, false);
         if (tombstone != null) {
             removeExpiredMetaData(tombstone.getCommitSeq());
-
-            lock.readLock().lock();
-            try {
-                LOG.info("[deadlinefen] after gc, db {} binlogs: {}, 
tombstone.seq: {}, dummys: {}",
-                        dbId, allBinlogs, tombstone.getCommitSeq(), 
tableDummyBinlogs);
-            } finally {
-                lock.readLock().unlock();
-            }
         }
 
         return tombstone;
@@ -329,7 +311,7 @@ public class DBBinlog {
     private BinlogTombstone dbBinlogEnableGc(long expiredMs) {
         // step 1: get current tableBinlog info and expiredCommitSeq
         long expiredCommitSeq = -1;
-        lock.readLock().lock();
+        lock.writeLock().lock();
         try {
             Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
             while (timeIter.hasNext()) {
@@ -355,7 +337,7 @@ public class DBBinlog {
                 }
             }
         } finally {
-            lock.readLock().unlock();
+            lock.writeLock().unlock();
         }
 
         if (expiredCommitSeq == -1) {
@@ -372,15 +354,7 @@ public class DBBinlog {
             }
         }
 
-        lock.readLock().lock();
-        try {
-            LOG.info("[deadlinefen] after gc, db {} binlogs: {}, 
tombstone.seq: {}, dummys: {}",
-                    dbId, allBinlogs, expiredCommitSeq, tableDummyBinlogs);
-        } finally {
-            lock.readLock().unlock();
-        }
-
-        return collectTableTombstone(tableTombstones);
+        return collectTableTombstone(tableTombstones, true);
     }
 
     public void replayGc(BinlogTombstone tombstone) {
@@ -407,11 +381,14 @@ public class DBBinlog {
                 }
             }
 
-            Iterator<TBinlog> binlogIterator = allBinlogs.iterator();
-            while (binlogIterator.hasNext()) {
-                TBinlog binlog = binlogIterator.next();
+            Iterator<TBinlog> binlogIter = allBinlogs.iterator();
+            TBinlog dummy = binlogIter.next();
+            dummy.setCommitSeq(largestExpiredCommitSeq);
+
+            while (binlogIter.hasNext()) {
+                TBinlog binlog = binlogIter.next();
                 if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
-                    binlogIterator.remove();
+                    binlogIter.remove();
                 } else {
                     break;
                 }
@@ -426,22 +403,33 @@ public class DBBinlog {
     public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
         List<TableBinlog> tableBinlogs;
 
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             tableBinlogs = Lists.newArrayList(tableBinlogMap.values());
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
 
         if (tableBinlogs.isEmpty()) {
             return;
         }
 
-        Set<Long> tableIds = Sets.newHashSet(tombstone.getTableIds());
-        long largestExpiredCommitSeq = tombstone.getCommitSeq();
+        Map<Long, Long> tableCommitSeqMap = tombstone.getTableCommitSeqMap();
+        // TODO(deadlinefen): delete this code
+        // This is a reserved code for the transition between new and old 
versions.
+        // It will be deleted later
+        if (tableCommitSeqMap.isEmpty()) {
+            long commitSeq = tombstone.getCommitSeq();
+            List<Long> tableIds = tombstone.getTableIds();
+            for (long tableId : tableIds) {
+                tableCommitSeqMap.put(tableId, commitSeq);
+            }
+        }
+
         for (TableBinlog tableBinlog : tableBinlogs) {
-            if (tableIds.contains(tableBinlog.getTableId())) {
-                tableBinlog.replayGc(largestExpiredCommitSeq);
+            long tableId = tableBinlog.getTableId();
+            if (tableCommitSeqMap.containsKey(tableId)) {
+                tableBinlog.replayGc(tableCommitSeqMap.get(tableId));
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java 
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 9b82272f63..8934084e99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -77,7 +77,6 @@ public class TableBinlog {
         try {
             binlogs.add(binlog);
             ++binlog.table_ref;
-            LOG.info("[deadlinefen] after add, table {} binlogs: {}", tableId, 
binlogs);
         } finally {
             lock.writeLock().unlock();
         }
@@ -154,20 +153,12 @@ public class TableBinlog {
 
         TBinlog lastUpsertBinlog = tombstoneInfo.first;
         long largestCommitSeq = tombstoneInfo.second;
-        BinlogTombstone tombstone = new BinlogTombstone(-1, largestCommitSeq);
+        BinlogTombstone tombstone = new BinlogTombstone(tableId, 
largestCommitSeq);
         if (lastUpsertBinlog != null) {
             UpsertRecord upsertRecord = 
UpsertRecord.fromJson(lastUpsertBinlog.getData());
             tombstone.addTableRecord(tableId, upsertRecord);
         }
 
-        lock.readLock().lock();
-        try {
-            LOG.info("[deadlinefen] after gc, table {} binlogs: {}, 
tombstone.seq: {}",
-                    tableId, binlogs, tombstone.getCommitSeq());
-        } finally {
-            lock.readLock().unlock();
-        }
-
         return tombstone;
     }
 
@@ -191,7 +182,6 @@ public class TableBinlog {
             return null;
         }
 
-        long dbId = db.getId();
         long ttlSeconds = table.getBinlogConfig().getTtlSeconds();
         long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
 
@@ -217,36 +207,37 @@ public class TableBinlog {
 
         TBinlog lastUpsertBinlog = tombstoneInfo.first;
         long largestCommitSeq = tombstoneInfo.second;
-        BinlogTombstone tombstone = new BinlogTombstone(dbId, tableId, 
largestCommitSeq);
+        BinlogTombstone tombstone = new BinlogTombstone(tableId, 
largestCommitSeq);
         if (lastUpsertBinlog != null) {
             UpsertRecord upsertRecord = 
UpsertRecord.fromJson(lastUpsertBinlog.getData());
             tombstone.addTableRecord(tableId, upsertRecord);
         }
 
-        lock.readLock().lock();
-        try {
-            LOG.info("[deadlinefen] after gc, table {} binlogs: {}, 
tombstone.seq: {}",
-                    tableId, binlogs, tombstone.getCommitSeq());
-        } finally {
-            lock.readLock().unlock();
-        }
-
-
         return tombstone;
     }
 
     public void replayGc(long largestExpiredCommitSeq) {
         lock.writeLock().lock();
         try {
+            long lastSeq = -1;
             Iterator<TBinlog> iter = binlogs.iterator();
+            TBinlog dummyBinlog = iter.next();
+
             while (iter.hasNext()) {
                 TBinlog binlog = iter.next();
-                if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
+                long commitSeq = binlog.getCommitSeq();
+                if (commitSeq <= largestExpiredCommitSeq) {
+                    lastSeq = commitSeq;
+                    --binlog.table_ref;
                     iter.remove();
                 } else {
                     break;
                 }
             }
+
+            if (lastSeq != -1) {
+                dummyBinlog.setCommitSeq(lastSeq);
+            }
         } finally {
             lock.writeLock().unlock();
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to