This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7e0c9f551efb38880d898b6431db12baf5f390fc
Author: DeadlineFen <[email protected]>
AuthorDate: Fri Jul 14 16:59:32 2023 +0800

    [Enhancement] (binlog) TBinlog and BinlogManager V2 (#21674)
---
 .../org/apache/doris/binlog/BinlogComparator.java  |  24 ++
 .../java/org/apache/doris/binlog/BinlogGcer.java   |   5 +-
 .../org/apache/doris/binlog/BinlogManager.java     | 120 +++++----
 .../org/apache/doris/binlog/BinlogTombstone.java   |   8 +-
 .../java/org/apache/doris/binlog/BinlogUtils.java  |  53 ++++
 .../java/org/apache/doris/binlog/DBBinlog.java     | 298 +++++++++++++++------
 .../java/org/apache/doris/binlog/TableBinlog.java  | 167 ++++++++----
 gensrc/thrift/FrontendService.thrift               |   5 +-
 8 files changed, 478 insertions(+), 202 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java
new file mode 100644
index 0000000000..9e35cc3bd6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.binlog;
+
+import org.apache.doris.thrift.TBinlog;
+
+public interface BinlogComparator {
+    boolean isExpired(TBinlog binlog, long expired);
+}
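
The new BinlogComparator is a single-method interface, so gc call sites can hand in the expiry test as a lambda. The two checks this patch installs later in TableBinlog look roughly like this (a sketch; the variable names are illustrative):

    // db binlog enabled: a binlog is expired once its commit sequence falls behind the watermark
    BinlogComparator bySeq = (binlog, expire) -> binlog.getCommitSeq() <= expire;
    // per-table gc: a binlog is expired once its timestamp (ms) is older than the TTL cut-off
    BinlogComparator byTime = (binlog, expire) -> binlog.getTimestamp() <= expire;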
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
index a2e6afd1fc..96d41946ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java
@@ -135,9 +135,8 @@ public class BinlogGcer extends MasterDaemon {
         }
     }
 
-    private void sendTableGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, OlapTable table,
+    private void sendTableGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, OlapTable olapTable,
             UpsertRecord.TableRecord tableRecord) {
-        OlapTable olapTable = (OlapTable) table;
 
         olapTable.readLock();
         try {
@@ -173,7 +172,7 @@ public class BinlogGcer extends MasterDaemon {
                 }
             }
         } finally {
-            table.readUnlock();
+            olapTable.readUnlock();
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 63e773af4d..b07072955f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -38,15 +38,10 @@ import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.thrift.transport.TMemoryInputTransport;
-import org.apache.thrift.transport.TTransportException;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -83,7 +78,7 @@ public class BinlogManager {
         try {
             dbBinlog = dbBinlogMap.get(dbId);
             if (dbBinlog == null) {
-                dbBinlog = new DBBinlog(dbId);
+                dbBinlog = new DBBinlog(binlog);
                 dbBinlogMap.put(dbId, dbBinlog);
             }
         } finally {
@@ -105,6 +100,7 @@ public class BinlogManager {
         if (tableIds != null && !tableIds.isEmpty()) {
             binlog.setTableIds(tableIds);
         }
+        binlog.setTableRef(0);
         addBinlog(binlog);
     }
 
@@ -121,7 +117,7 @@ public class BinlogManager {
 
     public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
         long dbId = addPartitionRecord.getDbId();
-        List<Long> tableIds = new ArrayList<Long>();
+        List<Long> tableIds = Lists.newArrayList();
         tableIds.add(addPartitionRecord.getTableId());
         long commitSeq = addPartitionRecord.getCommitSeq();
         long timestamp = -1;
@@ -133,7 +129,7 @@ public class BinlogManager {
 
     public void addCreateTableRecord(CreateTableRecord createTableRecord) {
         long dbId = createTableRecord.getDbId();
-        List<Long> tableIds = new ArrayList<Long>();
+        List<Long> tableIds = Lists.newArrayList();
         tableIds.add(createTableRecord.getTableId());
         long commitSeq = createTableRecord.getCommitSeq();
         long timestamp = -1;
@@ -145,7 +141,7 @@ public class BinlogManager {
 
     public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
         long dbId = dropPartitionInfo.getDbId();
-        List<Long> tableIds = new ArrayList<Long>();
+        List<Long> tableIds = Lists.newArrayList();
         tableIds.add(dropPartitionInfo.getTableId());
         long timestamp = -1;
         TBinlogType type = TBinlogType.DROP_PARTITION;
@@ -156,7 +152,7 @@ public class BinlogManager {
 
     public void addDropTableRecord(DropTableRecord record) {
         long dbId = record.getDbId();
-        List<Long> tableIds = new ArrayList<Long>();
+        List<Long> tableIds = Lists.newArrayList();
         tableIds.add(record.getTableId());
         long commitSeq = record.getCommitSeq();
         long timestamp = -1;
@@ -168,7 +164,7 @@ public class BinlogManager {
 
     public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
         long dbId = alterJob.getDbId();
-        List<Long> tableIds = new ArrayList<Long>();
+        List<Long> tableIds = Lists.newArrayList();
         tableIds.add(alterJob.getTableId());
         long timestamp = -1;
         TBinlogType type = TBinlogType.ALTER_JOB;
@@ -179,7 +175,7 @@ public class BinlogManager {
 
     public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
         long dbId = info.getDbId();
-        List<Long> tableIds = new ArrayList<Long>();
+        List<Long> tableIds = Lists.newArrayList();
         tableIds.add(info.getTableId());
         long timestamp = -1;
         TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS;
@@ -228,9 +224,9 @@ public class BinlogManager {
         LOG.info("begin gc binlog");
 
         lock.writeLock().lock();
-        Map<Long, DBBinlog> gcDbBinlogMap = null;
+        Map<Long, DBBinlog> gcDbBinlogMap;
         try {
-            gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap);
+            gcDbBinlogMap = Maps.newHashMap(dbBinlogMap);
         } finally {
             lock.writeLock().unlock();
         }
@@ -242,9 +238,9 @@ public class BinlogManager {
 
         List<BinlogTombstone> tombstones = Lists.newArrayList();
         for (DBBinlog dbBinlog : gcDbBinlogMap.values()) {
-            List<BinlogTombstone> dbTombstones = dbBinlog.gc();
+            BinlogTombstone dbTombstones = dbBinlog.gc();
             if (dbTombstones != null) {
-                tombstones.addAll(dbTombstones);
+                tombstones.add(dbTombstones);
             }
         }
         return tombstones;
@@ -252,9 +248,9 @@ public class BinlogManager {
 
     public void replayGc(BinlogGcInfo binlogGcInfo) {
         lock.writeLock().lock();
-        Map<Long, DBBinlog> gcDbBinlogMap = null;
+        Map<Long, DBBinlog> gcDbBinlogMap;
         try {
-            gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap);
+            gcDbBinlogMap = Maps.newHashMap(dbBinlogMap);
         } finally {
             lock.writeLock().unlock();
         }
@@ -309,18 +305,11 @@ public class BinlogManager {
             return checksum;
         }
 
-        List<TBinlog> binlogs = new ArrayList<TBinlog>();
+        List<TBinlog> binlogs = Lists.newArrayList();
         // Step 1: get all binlogs
         for (DBBinlog dbBinlog : dbBinlogMap.values()) {
             dbBinlog.getAllBinlogs(binlogs);
         }
-        // sort binlogs by commitSeq
-        Collections.sort(binlogs, new Comparator<TBinlog>() {
-            @Override
-            public int compare(TBinlog o1, TBinlog o2) {
-                return Long.compare(o1.getCommitSeq(), o2.getCommitSeq());
-            }
-        });
 
         // Step 2: write binlogs length
         dos.writeInt(binlogs.size());
@@ -339,31 +328,6 @@ public class BinlogManager {
         return checksum;
     }
 
-    public void read(DataInputStream dis) throws IOException {
-        // Step 1: read binlogs length
-        int length = dis.readInt();
-
-        // Step 2: read all binlogs from dis && add binlog
-        TMemoryBuffer buffer;
-        TBinaryProtocol protocol;
-        try {
-            buffer = new TMemoryBuffer(BUFFER_SIZE);
-            protocol = new TBinaryProtocol(buffer);
-        } catch (TTransportException e) {
-            throw new IOException("failed to create TMemoryBuffer");
-        }
-
-        for (int i = 0; i < length; i++) {
-            TBinlog binlog = new TBinlog();
-            try {
-                binlog.read(protocol);
-            } catch (TException e) {
-                throw new IOException("failed to read binlog from TMemoryBuffer");
-            }
-            addBinlog(binlog);
-        }
-    }
-
     public TBinlog readTBinlogFromStream(DataInputStream dis) throws TException, IOException {
         // We assume that the first int is the length of the serialized data.
         int length = dis.readInt();
@@ -376,19 +340,63 @@ public class BinlogManager {
         return binlog;
     }
 
+    // db TBinlogs in file struct:
+    // (tableDummy)TBinlog.belong == tableId, (dbDummy)TBinlog.belong == -1
+    // +---------------------------+------------------+-----------------------------------+
+    // | (tableDummy)TBinlog | ... | (dbDummy)TBinlog | TBinlog | TBinlog | TBinlog | ... |
+    // +---------------------------+------------------+-----------------------------------+
+    // |        Unnecessary        |     Necessary    |             Unnecessary           |
+    // +---------------------------+------------------+-----------------------------------+
     public long read(DataInputStream dis, long checksum) throws IOException {
         // Step 1: read binlogs length
         int size = dis.readInt();
         LOG.info("read binlogs length: {}", size);
 
         // Step 2: read all binlogs from dis
-        for (int i = 0; i < size; i++) {
-            try {
+        long currentDbId = -1;
+        boolean currentDbBinlogEnable = false;
+        List<TBinlog> tableDummies = Lists.newArrayList();
+        try {
+            for (int i = 0; i < size; i++) {
+                // Step 2.1: read a binlog
                 TBinlog binlog = readTBinlogFromStream(dis);
-                addBinlog(binlog);
-            } catch (TException e) {
-                throw new IOException("failed to read binlog from TMemoryBuffer", e);
+
+                // Step 2.2: check whether we have entered the next db's binlog region
+                long dbId = binlog.getDbId();
+                if (dbId != currentDbId) {
+                    // entering the next db's binlog region, check and update the db metadata
+                    Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+                    if (db == null) {
+                        LOG.warn("db not found. dbId: {}", dbId);
+                        continue;
+                    }
+                    currentDbId = dbId;
+                    currentDbBinlogEnable = db.getBinlogConfig().isEnable();
+                    tableDummies = Lists.newArrayList();
+                }
+
+                // step 2.3: recover binlog
+                if (binlog.getType() == TBinlogType.DUMMY) {
+                    // collect tableDummyBinlogs and dbDummyBinlog to recover DBBinlog and TableBinlog
+                    if (binlog.getBelong() == -1) {
+                        DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlog, tableDummies, currentDbBinlogEnable);
+                        dbBinlogMap.put(dbId, dbBinlog);
+                    } else {
+                        tableDummies.add(binlog);
+                    }
+                } else {
+                    // recover common binlogs
+                    DBBinlog dbBinlog = dbBinlogMap.get(dbId);
+                    if (dbBinlog == null) {
+                        LOG.warn("dbBinlog recover fail! binlog {} is before dummy. dbId: {}", binlog, dbId);
+                        continue;
+                    }
+                    binlog.setTableRef(0);
+                    dbBinlog.recoverBinlog(binlog, currentDbBinlogEnable);
+                }
             }
+        } catch (TException e) {
+            throw new IOException("failed to read binlog from TMemoryBuffer", e);
         }
 
         return checksum;
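
The rewritten read() above depends on the dummy-binlog layout sketched in the comment: each db's region starts with that db's table dummies (belong == tableId), then the db dummy (belong == -1), then the ordinary binlogs ordered by commitSeq. Roughly how those markers are produced with the helper added in BinlogUtils (the ids below are placeholders, not values from this patch):

    long dbId = 1L;      // placeholder
    long tableId = 2L;   // placeholder

    // table dummy: belong carries the table id it stands for
    TBinlog tableDummy = BinlogUtils.newDummyBinlog(dbId, tableId);
    // db dummy: belong == -1 marks where the db's ordinary binlogs begin
    TBinlog dbDummy = BinlogUtils.newDummyBinlog(dbId, -1);

On the way back in, read() buffers the table dummies until it reaches the db dummy, rebuilds the DBBinlog through DBBinlog.recoverDbBinlog, and then replays the remaining binlogs of that db through recoverBinlog.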
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
index 22e0fd9eba..50d9f90a01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java
@@ -71,8 +71,8 @@ public class BinlogTombstone {
         tableVersionMap.put(tableId, tableRecord);
     }
 
-    public void addTableRecord(long tableId, UpsertRecord.TableRecord record) {
-        tableVersionMap.put(tableId, record);
+    public void addTableRecord(Map<Long, UpsertRecord.TableRecord> records) {
+        tableVersionMap.putAll(records);
     }
 
     public boolean isDbBinlogTomstone() {
@@ -91,6 +91,10 @@ public class BinlogTombstone {
         return commitSeq;
     }
 
+    public void setCommitSeq(long seq) {
+        commitSeq = seq;
+    }
+
     public Map<Long, UpsertRecord.TableRecord> getTableVersionMap() {
         return tableVersionMap;
     }
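
addTableRecord now takes a whole map, so a db-level tombstone can absorb every table tombstone's version records in one call. A sketch of the intended use, mirroring collectTableTombstone in DBBinlog (the surrounding variables are assumed to exist as in that method):

    BinlogTombstone dbTombstone = new BinlogTombstone(dbId, tableIds, -1);
    for (BinlogTombstone tableTombstone : tableTombstones) {
        // merge the per-table version records into the db-level tombstone
        dbTombstone.addTableRecord(tableTombstone.getTableVersionMap());
    }
    // filled in afterwards with the largest expired commit sequence
    dbTombstone.setCommitSeq(largestExpiredCommitSeq);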
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 9742bed23d..3af4053ef1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -17,14 +17,24 @@
 
 package org.apache.doris.binlog;
 
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TBinlog;
+import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.TreeSet;
 
 public class BinlogUtils {
+    private static final Logger LOG = LogManager.getLogger(BinlogUtils.class);
+
     public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long prevCommitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
         TBinlog firstBinlog = binlogs.first();
@@ -69,4 +79,47 @@ public class BinlogUtils {
             return Pair.of(status, Long.valueOf(binlogs.tailSet(binlog).size()));
         }
     }
+
+    public static TBinlog newDummyBinlog(long dbId, long tableId) {
+        TBinlog dummy = new TBinlog();
+        dummy.setCommitSeq(-1);
+        dummy.setTimestamp(-1);
+        dummy.setType(TBinlogType.DUMMY);
+        dummy.setDbId(dbId);
+        dummy.setBelong(tableId);
+        return dummy;
+    }
+
+    public static boolean tableEnabledBinlog(long dbId, long tableId) {
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            LOG.error("db not found. dbId: {}", dbId);
+            return false;
+        }
+
+        OlapTable table;
+        try {
+            Table tbl = db.getTableOrMetaException(tableId);
+            if (tbl == null) {
+                LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+                return false;
+            }
+            if (!(tbl instanceof OlapTable)) {
+                LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
+                return false;
+            }
+            table = (OlapTable) tbl;
+        } catch (Exception e) {
+            LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
+            return false;
+        }
+
+        return table.getBinlogConfig().isEnable();
+    }
+
+    public static long getExpiredMs(long ttlSeconds) {
+        long currentSeconds = System.currentTimeMillis() / 1000;
+        long expireSeconds = currentSeconds - ttlSeconds;
+        return expireSeconds * 1000;
+    }
 }
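
getExpiredMs converts a TTL in seconds into an absolute expiration point in milliseconds, truncated to whole seconds. A worked example with made-up numbers:

    // assume ttlSeconds = 3600 and System.currentTimeMillis() = 1_689_321_572_345
    // currentSeconds = 1_689_321_572, expireSeconds = 1_689_317_972,
    // so the call returns 1_689_317_972_000; binlogs whose timestamp is at or
    // below that value are treated as expired
    long expiredMs = BinlogUtils.getExpiredMs(3600);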
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 0f113ff491..58708c8fe6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -21,15 +21,17 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
 import org.apache.doris.thrift.TBinlog;
+import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -52,21 +54,74 @@ public class DBBinlog {
     // need UpsertRecord to add timestamps for gc
     private List<Pair<Long, Long>> timestamps;
 
-    public DBBinlog(long dbId) {
+    private List<TBinlog> tableDummyBinlogs;
+
+    public DBBinlog(TBinlog binlog) {
         lock = new ReentrantReadWriteLock();
-        this.dbId = dbId;
+        this.dbId = binlog.getDbId();
+
         // allBinlogs treeset order by commitSeq
-        allBinlogs = new TreeSet<TBinlog>((o1, o2) -> {
-            if (o1.getCommitSeq() < o2.getCommitSeq()) {
-                return -1;
-            } else if (o1.getCommitSeq() > o2.getCommitSeq()) {
-                return 1;
-            } else {
-                return 0;
+        allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
+        tableDummyBinlogs = Lists.newArrayList();
+        tableBinlogMap = Maps.newHashMap();
+        timestamps = Lists.newArrayList();
+
+        TBinlog dummy;
+        if (binlog.getType() == TBinlogType.DUMMY) {
+            dummy = binlog;
+        } else {
+            dummy = BinlogUtils.newDummyBinlog(dbId, -1);
+        }
+        allBinlogs.add(dummy);
+    }
+
+    public static DBBinlog recoverDbBinlog(TBinlog dbDummy, List<TBinlog> tableDummies, boolean dbBinlogEnable) {
+        DBBinlog dbBinlog = new DBBinlog(dbDummy);
+        for (TBinlog tableDummy : tableDummies) {
+            long tableId = tableDummy.getBelong();
+            if (!dbBinlogEnable && !BinlogUtils.tableEnabledBinlog(dbBinlog.getDbId(), tableId)) {
+                continue;
+            }
+            dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(tableDummy, tableId));
+            dbBinlog.tableDummyBinlogs.add(tableDummy);
+        }
+
+        return dbBinlog;
+    }
+
+    // not thread-safe, called during recovery without taking the lock
+    public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
+        List<Long> tableIds = binlog.getTableIds();
+
+        if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
+            timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
+        }
+
+        allBinlogs.add(binlog);
+
+        if (tableIds == null) {
+            return;
+        }
+
+        for (long tableId : tableIds) {
+            TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable);
+            if (tableBinlog == null) {
+                continue;
+            }
+            tableBinlog.recoverBinlog(binlog);
+        }
+    }
+
+    private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) {
+        TableBinlog tableBinlog = tableBinlogMap.get(tableId);
+        if (tableBinlog == null) {
+            if (dbBinlogEnable || BinlogUtils.tableEnabledBinlog(dbId, tableId)) {
+                tableBinlog = new TableBinlog(binlog, tableId);
+                tableBinlogMap.put(tableId, tableBinlog);
+                tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
             }
-        });
-        tableBinlogMap = new HashMap<Long, TableBinlog>();
-        timestamps = new ArrayList<Pair<Long, Long>>();
+        }
+        return tableBinlog;
     }
 
     public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) {
@@ -97,16 +152,21 @@ public class DBBinlog {
             }
 
             for (long tableId : tableIds) {
-                TableBinlog tableBinlog = tableBinlogMap.get(tableId);
-                if (tableBinlog == null) {
-                    tableBinlog = new TableBinlog(tableId);
-                    tableBinlogMap.put(tableId, tableBinlog);
+                TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable);
+                if (tableBinlog != null) {
+                    tableBinlog.addBinlog(binlog);
                 }
-                tableBinlog.addBinlog(binlog);
             }
         } finally {
             lock.writeLock().unlock();
         }
+
+        lock.readLock().lock();
+        try {
+            LOG.info("[deadlinefen] after add, db {} binlogs: {}, dummys: {}", dbId, allBinlogs, tableDummyBinlogs);
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     public long getDbId() {
@@ -151,7 +211,7 @@ public class DBBinlog {
         }
     }
 
-    public List<BinlogTombstone> gc() {
+    public BinlogTombstone gc() {
         // check db
         Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
         if (db == null) {
@@ -160,33 +220,55 @@ public class DBBinlog {
         }
 
         boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
+        BinlogTombstone tombstone;
         if (dbBinlogEnable) {
-            // db binlog is enable, only one binlogTombstones
+            // db binlog is enabled, so only one tombstone is produced
             long ttlSeconds = db.getBinlogConfig().getTtlSeconds();
-            long currentSeconds = System.currentTimeMillis() / 1000;
-            long expireSeconds = currentSeconds - ttlSeconds;
-            long expireMs = expireSeconds * 1000;
+            long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
 
-            BinlogTombstone tombstone = dbBinlogEnableGc(expireMs);
-            List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>();
-            if (tombstone != null) {
-                tombstones.add(tombstone);
-            }
-            return tombstones;
+            tombstone = dbBinlogEnableGc(expiredMs);
         } else {
-            return dbBinlogDisableGc(db);
+            tombstone = dbBinlogDisableGc(db);
         }
+
+        return tombstone;
     }
 
-    private List<BinlogTombstone> dbBinlogDisableGc(Database db) {
-        List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>();
-        List<TableBinlog> tableBinlogs = null;
+    private BinlogTombstone collectTableTombstone(List<BinlogTombstone> tableTombstones) {
+        if (tableTombstones.isEmpty()) {
+            return null;
+        }
 
-        lock.writeLock().lock();
+        List<Long> tableIds = Lists.newArrayList();
+        long largestExpiredCommitSeq = -1;
+        BinlogTombstone dbTombstone = new BinlogTombstone(dbId, tableIds, -1);
+        for (BinlogTombstone tableTombstone : tableTombstones) {
+            long commitSeq = tableTombstone.getCommitSeq();
+            if (largestExpiredCommitSeq < commitSeq) {
+                largestExpiredCommitSeq = commitSeq;
+            }
+            Map<Long, UpsertRecord.TableRecord> tableVersionMap = tableTombstone.getTableVersionMap();
+            if (tableVersionMap.size() > 1) {
+                LOG.warn("tableVersionMap size is greater than 1. tableVersionMap: {}", tableVersionMap);
+            }
+            tableIds.addAll(tableTombstone.getTableIds());
+            dbTombstone.addTableRecord(tableVersionMap);
+        }
+
+        dbTombstone.setCommitSeq(largestExpiredCommitSeq);
+
+        return dbTombstone;
+    }
+
+    private BinlogTombstone dbBinlogDisableGc(Database db) {
+        List<BinlogTombstone> tombstones = Lists.newArrayList();
+        List<TableBinlog> tableBinlogs;
+
+        lock.readLock().lock();
         try {
-            tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+            tableBinlogs = Lists.newArrayList(tableBinlogMap.values());
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
 
         for (TableBinlog tableBinlog : tableBinlogs) {
@@ -195,70 +277,110 @@ public class DBBinlog {
                 tombstones.add(tombstone);
             }
         }
-        return tombstones;
-    }
+        BinlogTombstone tombstone = collectTableTombstone(tombstones);
+        if (tombstone != null) {
+            removeExpiredMetaData(tombstone.getCommitSeq());
+
+            lock.readLock().lock();
+            try {
+                LOG.info("[deadlinefen] after gc, db {} binlogs: {}, tombstone.seq: {}, dummys: {}",
+                        dbId, allBinlogs, tombstone.getCommitSeq(), tableDummyBinlogs);
+            } finally {
+                lock.readLock().unlock();
+            }
+        }
 
-    private BinlogTombstone dbBinlogEnableGc(long expireMs) {
-        // find commitSeq from timestamps, if commitSeq's timestamp is less than expireSeconds, then remove it
-        long largestExpiredCommitSeq = -1;
-        TBinlog tombstoneBinlog = null;
-        List<Long> tableIds = null;
-        List<TableBinlog> tableBinlogs = null;
+        return tombstone;
+    }
 
+    private void removeExpiredMetaData(long largestExpiredCommitSeq) {
         lock.writeLock().lock();
         try {
-            Iterator<Pair<Long, Long>> iterator = timestamps.iterator();
-            while (iterator.hasNext()) {
-                Pair<Long, Long> pair = iterator.next();
-                if (pair.second < expireMs) {
-                    largestExpiredCommitSeq = pair.first;
-                    iterator.remove();
+            Iterator<TBinlog> binlogIter = allBinlogs.iterator();
+            TBinlog dummy = binlogIter.next();
+            boolean foundFirstUsingBinlog = false;
+            long lastCommitSeq = -1;
+
+            while (binlogIter.hasNext()) {
+                TBinlog binlog = binlogIter.next();
+                long commitSeq = binlog.getCommitSeq();
+                if (commitSeq <= largestExpiredCommitSeq) {
+                    if (binlog.table_ref <= 0) {
+                        binlogIter.remove();
+                        if (!foundFirstUsingBinlog) {
+                            lastCommitSeq = commitSeq;
+                        }
+                    } else {
+                        foundFirstUsingBinlog = true;
+                    }
                 } else {
                     break;
                 }
             }
 
-            Iterator<TBinlog> binlogIterator = allBinlogs.iterator();
-            while (binlogIterator.hasNext()) {
-                TBinlog binlog = binlogIterator.next();
-                if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
-                    tombstoneBinlog = binlog;
-                    binlogIterator.remove();
+            if (lastCommitSeq != -1) {
+                dummy.setCommitSeq(lastCommitSeq);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private BinlogTombstone dbBinlogEnableGc(long expiredMs) {
+        // step 1: get current tableBinlog info and expiredCommitSeq
+        long expiredCommitSeq = -1;
+        lock.readLock().lock();
+        try {
+            Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
+            while (timeIter.hasNext()) {
+                Pair<Long, Long> pair = timeIter.next();
+                if (pair.second <= expiredMs) {
+                    expiredCommitSeq = pair.first;
+                    timeIter.remove();
                 } else {
                     break;
                 }
             }
 
-            tableIds = new ArrayList<Long>(tableBinlogMap.keySet());
-            tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+            Iterator<TBinlog> binlogIter = allBinlogs.iterator();
+            TBinlog dummy = binlogIter.next();
+            dummy.setCommitSeq(expiredCommitSeq);
+
+            while (binlogIter.hasNext()) {
+                TBinlog binlog = binlogIter.next();
+                if (binlog.getCommitSeq() <= expiredCommitSeq) {
+                    binlogIter.remove();
+                } else {
+                    break;
+                }
+            }
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
-        LOG.info("gc binlog. dbId: {}, expireMs: {}, largestExpiredCommitSeq: {}",
-                dbId, expireMs, largestExpiredCommitSeq);
-        if (tombstoneBinlog == null) {
+
+        if (expiredCommitSeq == -1) {
             return null;
         }
 
-        BinlogTombstone tombstone = new BinlogTombstone(dbId, tableIds, tombstoneBinlog.getCommitSeq());
-        for (TableBinlog tableBinlog : tableBinlogs) {
-            BinlogTombstone binlogTombstone = tableBinlog.gc(largestExpiredCommitSeq);
-            if (binlogTombstone == null) {
-                continue;
+        // step 2: gc every tableBinlog in dbBinlog, collect table tombstones to complete the db tombstone
+        List<BinlogTombstone> tableTombstones = Lists.newArrayList();
+        for (TableBinlog tableBinlog : tableBinlogMap.values()) {
+            // step 2.1: gc tableBinlog, and get its table tombstone
+            BinlogTombstone tableTombstone = tableBinlog.gc(expiredCommitSeq);
+            if (tableTombstone != null) {
+                tableTombstones.add(tableTombstone);
             }
+        }
 
-            Map<Long, UpsertRecord.TableRecord> tableVersionMap = binlogTombstone.getTableVersionMap();
-            if (tableVersionMap.size() > 1) {
-                LOG.warn("tableVersionMap size is greater than 1. tableVersionMap: {}", tableVersionMap);
-            }
-            for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) {
-                long tableId = entry.getKey();
-                UpsertRecord.TableRecord record = entry.getValue();
-                tombstone.addTableRecord(tableId, record);
-            }
+        lock.readLock().lock();
+        try {
+            LOG.info("[deadlinefen] after gc, db {} binlogs: {}, tombstone.seq: {}, dummys: {}",
+                    dbId, allBinlogs, expiredCommitSeq, tableDummyBinlogs);
+        } finally {
+            lock.readLock().unlock();
         }
 
-        return tombstone;
+        return collectTableTombstone(tableTombstones);
     }
 
     public void replayGc(BinlogTombstone tombstone) {
@@ -266,6 +388,7 @@ public class DBBinlog {
             dbBinlogEnableReplayGc(tombstone);
         } else {
             dbBinlogDisableReplayGc(tombstone);
+            removeExpiredMetaData(tombstone.getCommitSeq());
         }
     }
 
@@ -274,11 +397,11 @@ public class DBBinlog {
 
         lock.writeLock().lock();
         try {
-            Iterator<Pair<Long, Long>> iterator = timestamps.iterator();
-            while (iterator.hasNext()) {
-                Pair<Long, Long> pair = iterator.next();
-                if (pair.first <= largestExpiredCommitSeq) {
-                    iterator.remove();
+            Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
+            while (timeIter.hasNext()) {
+                long commitSeq = timeIter.next().first;
+                if (commitSeq <= largestExpiredCommitSeq) {
+                    timeIter.remove();
                 } else {
                     break;
                 }
@@ -301,11 +424,11 @@ public class DBBinlog {
     }
 
     public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
-        List<TableBinlog> tableBinlogs = null;
+        List<TableBinlog> tableBinlogs;
 
         lock.writeLock().lock();
         try {
-            tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values());
+            tableBinlogs = Lists.newArrayList(tableBinlogMap.values());
         } finally {
             lock.writeLock().unlock();
         }
@@ -314,7 +437,7 @@ public class DBBinlog {
             return;
         }
 
-        Set<Long> tableIds = new HashSet<Long>(tombstone.getTableIds());
+        Set<Long> tableIds = Sets.newHashSet(tombstone.getTableIds());
         long largestExpiredCommitSeq = tombstone.getCommitSeq();
         for (TableBinlog tableBinlog : tableBinlogs) {
             if (tableIds.contains(tableBinlog.getTableId())) {
@@ -325,6 +448,7 @@ public class DBBinlog {
 
     // not thread safety, do this without lock
     public void getAllBinlogs(List<TBinlog> binlogs) {
+        binlogs.addAll(tableDummyBinlogs);
         binlogs.addAll(allBinlogs);
     }
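
With the db binlog switch off, the db level no longer decides expiry itself: removeExpiredMetaData above only drops entries whose table_ref has fallen to zero, i.e. binlogs that no TableBinlog still references. A condensed sketch of that reference-counting contract (the counting itself lives in TableBinlog below):

    // TableBinlog.addBinlog / recoverBinlog do ++binlog.table_ref;
    // TableBinlog gc does --binlog.table_ref once the binlog expires for that table.
    if (binlog.getCommitSeq() <= largestExpiredCommitSeq && binlog.table_ref <= 0) {
        binlogIter.remove();   // no table references this binlog any more
    }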
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 2b0d45b694..9b82272f63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -25,9 +25,11 @@ import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
 
+import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -39,29 +41,43 @@ public class TableBinlog {
     private ReentrantReadWriteLock lock;
     private TreeSet<TBinlog> binlogs;
 
-    public TableBinlog(long tableId) {
+    public TableBinlog(TBinlog binlog, long tableId) {
         this.tableId = tableId;
         lock = new ReentrantReadWriteLock();
-        // binlogs treeset order by commitSeq
-        binlogs = new TreeSet<TBinlog>((o1, o2) -> {
-            if (o1.getCommitSeq() < o2.getCommitSeq()) {
-                return -1;
-            } else if (o1.getCommitSeq() > o2.getCommitSeq()) {
-                return 1;
-            } else {
-                return 0;
-            }
-        });
+        binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
+
+        TBinlog dummy;
+        if (binlog.getType() == TBinlogType.DUMMY) {
+            dummy = binlog;
+        } else {
+            dummy = BinlogUtils.newDummyBinlog(binlog.getDbId(), tableId);
+        }
+        binlogs.add(dummy);
+    }
+
+    public TBinlog getDummyBinlog() {
+        return binlogs.first();
     }
 
     public long getTableId() {
         return tableId;
     }
 
+    // not thread-safe, called during recovery without taking the lock
+    public void recoverBinlog(TBinlog binlog) {
+        TBinlog dummy = getDummyBinlog();
+        if (binlog.getCommitSeq() > dummy.getCommitSeq()) {
+            binlogs.add(binlog);
+            ++binlog.table_ref;
+        }
+    }
+
     public void addBinlog(TBinlog binlog) {
         lock.writeLock().lock();
         try {
             binlogs.add(binlog);
+            ++binlog.table_ref;
+            LOG.info("[deadlinefen] after add, table {} binlogs: {}", tableId, binlogs);
         } finally {
             lock.writeLock().unlock();
         }
@@ -85,41 +101,80 @@ public class TableBinlog {
         }
     }
 
-    // this method call when db binlog enable
-    public BinlogTombstone gc(long largestExpiredCommitSeq) {
+    private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator check) {
+        if (binlogs.size() <= 1) {
+            return null;
+        }
+
+        Iterator<TBinlog> iter = binlogs.iterator();
+        TBinlog dummyBinlog = iter.next();
         TBinlog tombstoneUpsert = null;
+        TBinlog lastExpiredBinlog = null;
+        while (iter.hasNext()) {
+            TBinlog binlog = iter.next();
+            if (check.isExpired(binlog, expired)) {
+                lastExpiredBinlog = binlog;
+                --binlog.table_ref;
+                if (binlog.getType() == TBinlogType.UPSERT) {
+                    tombstoneUpsert = binlog;
+                }
+                iter.remove();
+            } else {
+                break;
+            }
+        }
+
+        if (lastExpiredBinlog == null) {
+            return null;
+        }
+
+        dummyBinlog.setCommitSeq(lastExpiredBinlog.getCommitSeq());
+
+        return Pair.of(tombstoneUpsert, lastExpiredBinlog.getCommitSeq());
+    }
+
+    // this method is called when the db binlog is enabled
+    public BinlogTombstone gc(long expiredCommitSeq) {
+        Pair<TBinlog, Long> tombstoneInfo;
 
+        // step 1: get tombstoneUpsertBinlog and dummyBinlog
         lock.writeLock().lock();
         try {
-            Iterator<TBinlog> iter = binlogs.iterator();
-            while (iter.hasNext()) {
-                TBinlog binlog = iter.next();
-                if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
-                    if (binlog.getType() == TBinlogType.UPSERT) {
-                        tombstoneUpsert = binlog;
-                    }
-                    iter.remove();
-                } else {
-                    break;
-                }
-            }
+            BinlogComparator check = (binlog, expire) -> binlog.getCommitSeq() <= expire;
+            tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredCommitSeq, check);
         } finally {
             lock.writeLock().unlock();
         }
 
-        if (tombstoneUpsert == null) {
+        // step 2: set tombstone by tombstoneInfo
+        // if there are expired binlogs, tombstoneInfo != null
+        if (tombstoneInfo == null) {
             return null;
         }
 
-        BinlogTombstone tombstone = new BinlogTombstone(-1, largestExpiredCommitSeq);
-        UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData());
-        tombstone.addTableRecord(tableId, upsertRecord);
+        TBinlog lastUpsertBinlog = tombstoneInfo.first;
+        long largestCommitSeq = tombstoneInfo.second;
+        BinlogTombstone tombstone = new BinlogTombstone(-1, largestCommitSeq);
+        if (lastUpsertBinlog != null) {
+            UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData());
+            tombstone.addTableRecord(tableId, upsertRecord);
+        }
+
+        lock.readLock().lock();
+        try {
+            LOG.info("[deadlinefen] after gc, table {} binlogs: {}, tombstone.seq: {}",
+                    tableId, binlogs, tombstone.getCommitSeq());
+        } finally {
+            lock.readLock().unlock();
+        }
+
         return tombstone;
     }
 
     // this method call when db binlog disable
     public BinlogTombstone gc(Database db) {
-        OlapTable table = null;
+        // step 1: get expire time
+        OlapTable table;
         try {
             Table tbl = db.getTableOrMetaException(tableId);
             if (tbl == null) {
@@ -138,39 +193,45 @@ public class TableBinlog {
 
         long dbId = db.getId();
         long ttlSeconds = table.getBinlogConfig().getTtlSeconds();
-        long currentSeconds = System.currentTimeMillis() / 1000;
-        long expireSeconds = currentSeconds - ttlSeconds;
-        long expireMs = expireSeconds * 1000;
+        long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
 
-        TBinlog tombstoneUpsert = null;
-        long largestExpiredCommitSeq = 0;
+        if (expiredMs < 0) {
+            return null;
+        }
+
+        // step 2: get tombstoneUpsertBinlog and dummyBinlog
+        Pair<TBinlog, Long> tombstoneInfo;
         lock.writeLock().lock();
         try {
-            Iterator<TBinlog> iter = binlogs.iterator();
-            while (iter.hasNext()) {
-                TBinlog binlog = iter.next();
-                long timestamp = binlog.getTimestamp();
-
-                if (timestamp > expireMs) {
-                    break;
-                }
-
-                if (binlog.getType() == TBinlogType.UPSERT) {
-                    tombstoneUpsert = binlog;
-                }
-                largestExpiredCommitSeq = binlog.getCommitSeq();
-                iter.remove();
-            }
+            BinlogComparator check = (binlog, expire) -> binlog.getTimestamp() <= expire;
+            tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredMs, check);
         } finally {
             lock.writeLock().unlock();
         }
 
-        BinlogTombstone tombstone = new BinlogTombstone(dbId, tableId, largestExpiredCommitSeq);
-        if (tombstoneUpsert != null) {
-            UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData());
+        // step 3: set tombstone by tombstoneInfo
+        // if there are expired binlogs, tombstoneInfo != null
+        if (tombstoneInfo == null) {
+            return null;
+        }
+
+        TBinlog lastUpsertBinlog = tombstoneInfo.first;
+        long largestCommitSeq = tombstoneInfo.second;
+        BinlogTombstone tombstone = new BinlogTombstone(dbId, tableId, largestCommitSeq);
+        if (lastUpsertBinlog != null) {
+            UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData());
             tombstone.addTableRecord(tableId, upsertRecord);
         }
 
+        lock.readLock().lock();
+        try {
+            LOG.info("[deadlinefen] after gc, table {} binlogs: {}, tombstone.seq: {}",
+                    tableId, binlogs, tombstone.getCommitSeq());
+        } finally {
+            lock.readLock().unlock();
+        }
+
+
         return tombstone;
     }
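
Each TableBinlog is now seeded with a dummy binlog whose commitSeq serves as the per-table gc watermark: getLastUpsertAndLargestCommitSeq advances it to the last expired commit sequence, and recovery only re-admits binlogs newer than it. A restatement of the recoverBinlog check above, as a sketch:

    TBinlog dummy = getDummyBinlog();               // always the first element of the TreeSet
    if (binlog.getCommitSeq() > dummy.getCommitSeq()) {
        binlogs.add(binlog);                        // newer than the watermark, keep it
        ++binlog.table_ref;                         // and take a reference for this table
    }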
 
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 4630f8bdd0..619337321f 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -961,8 +961,9 @@ enum TBinlogType {
   CREATE_TABLE = 2,
   DROP_PARTITION = 3,
   DROP_TABLE = 4,
-  ALTER_JOB = 5, 
+  ALTER_JOB = 5,
   MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 6,
+  DUMMY = 7,
 }
 
 struct TBinlog {
@@ -972,6 +973,8 @@ struct TBinlog {
     4: optional i64 db_id
     5: optional list<i64> table_ids
     6: optional string data
+    7: optional i64 belong  // belong == -1 if type is not DUMMY
+    8: optional i64 table_ref // only use for gc
 }
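
The two new fields carry the bookkeeping used by the FE changes above: belong distinguishes the dummy binlogs (the table id for a table dummy, -1 for the db dummy), and table_ref counts how many TableBinlogs still hold the entry so db-level gc knows when it may be dropped. On the Java side the defaults look roughly like this (dbId is a placeholder):

    TBinlog dbDummy = BinlogUtils.newDummyBinlog(dbId, -1);   // db dummy: belong == -1
    TBinlog binlog = new TBinlog();
    binlog.setTableRef(0);   // BinlogManager resets the counter before the tables re-count it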
 
 struct TGetBinlogResult {

