This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7e0c9f551efb38880d898b6431db12baf5f390fc Author: DeadlineFen <[email protected]> AuthorDate: Fri Jul 14 16:59:32 2023 +0800 [Enhancement] (binlog) TBinlog and BinlogManager V2 (#21674) --- .../org/apache/doris/binlog/BinlogComparator.java | 24 ++ .../java/org/apache/doris/binlog/BinlogGcer.java | 5 +- .../org/apache/doris/binlog/BinlogManager.java | 120 +++++---- .../org/apache/doris/binlog/BinlogTombstone.java | 8 +- .../java/org/apache/doris/binlog/BinlogUtils.java | 53 ++++ .../java/org/apache/doris/binlog/DBBinlog.java | 298 +++++++++++++++------ .../java/org/apache/doris/binlog/TableBinlog.java | 167 ++++++++---- gensrc/thrift/FrontendService.thrift | 5 +- 8 files changed, 478 insertions(+), 202 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java new file mode 100644 index 0000000000..9e35cc3bd6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogComparator.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.binlog; + +import org.apache.doris.thrift.TBinlog; + +public interface BinlogComparator { + boolean isExpired(TBinlog binlog, long expired); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java index a2e6afd1fc..96d41946ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogGcer.java @@ -135,9 +135,8 @@ public class BinlogGcer extends MasterDaemon { } } - private void sendTableGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, OlapTable table, + private void sendTableGcInfoToBe(Map<Long, BinlogGcTask> beBinlogGcTaskMap, OlapTable olapTable, UpsertRecord.TableRecord tableRecord) { - OlapTable olapTable = (OlapTable) table; olapTable.readLock(); try { @@ -173,7 +172,7 @@ public class BinlogGcer extends MasterDaemon { } } } finally { - table.readUnlock(); + olapTable.readUnlock(); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 63e773af4d..b07072955f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -38,15 +38,10 @@ import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TMemoryBuffer; import org.apache.thrift.transport.TMemoryInputTransport; -import org.apache.thrift.transport.TTransportException; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -83,7 +78,7 @@ public class BinlogManager { try { dbBinlog = 
dbBinlogMap.get(dbId); if (dbBinlog == null) { - dbBinlog = new DBBinlog(dbId); + dbBinlog = new DBBinlog(binlog); dbBinlogMap.put(dbId, dbBinlog); } } finally { @@ -105,6 +100,7 @@ public class BinlogManager { if (tableIds != null && !tableIds.isEmpty()) { binlog.setTableIds(tableIds); } + binlog.setTableRef(0); addBinlog(binlog); } @@ -121,7 +117,7 @@ public class BinlogManager { public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) { long dbId = addPartitionRecord.getDbId(); - List<Long> tableIds = new ArrayList<Long>(); + List<Long> tableIds = Lists.newArrayList(); tableIds.add(addPartitionRecord.getTableId()); long commitSeq = addPartitionRecord.getCommitSeq(); long timestamp = -1; @@ -133,7 +129,7 @@ public class BinlogManager { public void addCreateTableRecord(CreateTableRecord createTableRecord) { long dbId = createTableRecord.getDbId(); - List<Long> tableIds = new ArrayList<Long>(); + List<Long> tableIds = Lists.newArrayList(); tableIds.add(createTableRecord.getTableId()); long commitSeq = createTableRecord.getCommitSeq(); long timestamp = -1; @@ -145,7 +141,7 @@ public class BinlogManager { public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) { long dbId = dropPartitionInfo.getDbId(); - List<Long> tableIds = new ArrayList<Long>(); + List<Long> tableIds = Lists.newArrayList(); tableIds.add(dropPartitionInfo.getTableId()); long timestamp = -1; TBinlogType type = TBinlogType.DROP_PARTITION; @@ -156,7 +152,7 @@ public class BinlogManager { public void addDropTableRecord(DropTableRecord record) { long dbId = record.getDbId(); - List<Long> tableIds = new ArrayList<Long>(); + List<Long> tableIds = Lists.newArrayList(); tableIds.add(record.getTableId()); long commitSeq = record.getCommitSeq(); long timestamp = -1; @@ -168,7 +164,7 @@ public class BinlogManager { public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) { long dbId = alterJob.getDbId(); - List<Long> tableIds = new ArrayList<Long>(); + 
List<Long> tableIds = Lists.newArrayList(); tableIds.add(alterJob.getTableId()); long timestamp = -1; TBinlogType type = TBinlogType.ALTER_JOB; @@ -179,7 +175,7 @@ public class BinlogManager { public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) { long dbId = info.getDbId(); - List<Long> tableIds = new ArrayList<Long>(); + List<Long> tableIds = Lists.newArrayList(); tableIds.add(info.getTableId()); long timestamp = -1; TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS; @@ -228,9 +224,9 @@ public class BinlogManager { LOG.info("begin gc binlog"); lock.writeLock().lock(); - Map<Long, DBBinlog> gcDbBinlogMap = null; + Map<Long, DBBinlog> gcDbBinlogMap; try { - gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap); + gcDbBinlogMap = Maps.newHashMap(dbBinlogMap); } finally { lock.writeLock().unlock(); } @@ -242,9 +238,9 @@ public class BinlogManager { List<BinlogTombstone> tombstones = Lists.newArrayList(); for (DBBinlog dbBinlog : gcDbBinlogMap.values()) { - List<BinlogTombstone> dbTombstones = dbBinlog.gc(); + BinlogTombstone dbTombstones = dbBinlog.gc(); if (dbTombstones != null) { - tombstones.addAll(dbTombstones); + tombstones.add(dbTombstones); } } return tombstones; @@ -252,9 +248,9 @@ public class BinlogManager { public void replayGc(BinlogGcInfo binlogGcInfo) { lock.writeLock().lock(); - Map<Long, DBBinlog> gcDbBinlogMap = null; + Map<Long, DBBinlog> gcDbBinlogMap; try { - gcDbBinlogMap = new HashMap<Long, DBBinlog>(dbBinlogMap); + gcDbBinlogMap = Maps.newHashMap(dbBinlogMap); } finally { lock.writeLock().unlock(); } @@ -309,18 +305,11 @@ public class BinlogManager { return checksum; } - List<TBinlog> binlogs = new ArrayList<TBinlog>(); + List<TBinlog> binlogs = Lists.newArrayList(); // Step 1: get all binlogs for (DBBinlog dbBinlog : dbBinlogMap.values()) { dbBinlog.getAllBinlogs(binlogs); } - // sort binlogs by commitSeq - Collections.sort(binlogs, new Comparator<TBinlog>() { - @Override - public int 
compare(TBinlog o1, TBinlog o2) { - return Long.compare(o1.getCommitSeq(), o2.getCommitSeq()); - } - }); // Step 2: write binlogs length dos.writeInt(binlogs.size()); @@ -339,31 +328,6 @@ public class BinlogManager { return checksum; } - public void read(DataInputStream dis) throws IOException { - // Step 1: read binlogs length - int length = dis.readInt(); - - // Step 2: read all binlogs from dis && add binlog - TMemoryBuffer buffer; - TBinaryProtocol protocol; - try { - buffer = new TMemoryBuffer(BUFFER_SIZE); - protocol = new TBinaryProtocol(buffer); - } catch (TTransportException e) { - throw new IOException("failed to create TMemoryBuffer"); - } - - for (int i = 0; i < length; i++) { - TBinlog binlog = new TBinlog(); - try { - binlog.read(protocol); - } catch (TException e) { - throw new IOException("failed to read binlog from TMemoryBuffer"); - } - addBinlog(binlog); - } - } - public TBinlog readTBinlogFromStream(DataInputStream dis) throws TException, IOException { // We assume that the first int is the length of the serialized data. int length = dis.readInt(); @@ -376,19 +340,63 @@ public class BinlogManager { return binlog; } + // db TBinlogs in file struct: + // (tableDummy)TBinlog.belong == tableId, (dbDummy)TBinlog.belong == -1 + // +---------------------------+------------------+-----------------------------------+ + // | (tableDummy)TBinlog | ... | (dbDummy)TBinlog | TBinlog | TBinlog | TBinlog | ... 
| + // +---------------------------+------------------+-----------------------------------+ + // | Unnecessary | Necessary | Unnecessary | + // +---------------------------+------------------+-----------------------------------+ public long read(DataInputStream dis, long checksum) throws IOException { // Step 1: read binlogs length int size = dis.readInt(); LOG.info("read binlogs length: {}", size); // Step 2: read all binlogs from dis - for (int i = 0; i < size; i++) { - try { + long currentDbId = -1; + boolean currentDbBinlogEnable = false; + List<TBinlog> tableDummies = Lists.newArrayList(); + try { + for (int i = 0; i < size; i++) { + // Step 2.1: read a binlog TBinlog binlog = readTBinlogFromStream(dis); - addBinlog(binlog); - } catch (TException e) { - throw new IOException("failed to read binlog from TMemoryBuffer", e); + + // Step 2.2: check if there is in next db Binlogs region + long dbId = binlog.getDbId(); + if (dbId != currentDbId) { + // if there is in next db Binlogs region, check and update metadata + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + LOG.warn("db not found. dbId: {}", dbId); + continue; + } + currentDbId = dbId; + currentDbBinlogEnable = db.getBinlogConfig().isEnable(); + tableDummies = Lists.newArrayList(); + } + + // step 2.3: recover binlog + if (binlog.getType() == TBinlogType.DUMMY) { + // collect tableDummyBinlogs and dbDummyBinlog to recover DBBinlog and TableBinlog + if (binlog.getBelong() == -1) { + DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlog, tableDummies, currentDbBinlogEnable); + dbBinlogMap.put(dbId, dbBinlog); + } else { + tableDummies.add(binlog); + } + } else { + // recover common binlogs + DBBinlog dbBinlog = dbBinlogMap.get(dbId); + if (dbBinlog == null) { + LOG.warn("dbBinlog recover fail! binlog {} is before dummy. 
dbId: {}", binlog, dbId); + continue; + } + binlog.setTableRef(0); + dbBinlog.recoverBinlog(binlog, currentDbBinlogEnable); + } } + } catch (TException e) { + throw new IOException("failed to read binlog from TMemoryBuffer", e); } return checksum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java index 22e0fd9eba..50d9f90a01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java @@ -71,8 +71,8 @@ public class BinlogTombstone { tableVersionMap.put(tableId, tableRecord); } - public void addTableRecord(long tableId, UpsertRecord.TableRecord record) { - tableVersionMap.put(tableId, record); + public void addTableRecord(Map<Long, UpsertRecord.TableRecord> records) { + tableVersionMap.putAll(records); } public boolean isDbBinlogTomstone() { @@ -91,6 +91,10 @@ public class BinlogTombstone { return commitSeq; } + public void setCommitSeq(long seq) { + commitSeq = seq; + } + public Map<Long, UpsertRecord.TableRecord> getTableVersionMap() { return tableVersionMap; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 9742bed23d..3af4053ef1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -17,14 +17,24 @@ package org.apache.doris.binlog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TBinlog; +import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import org.apache.logging.log4j.LogManager; +import 
org.apache.logging.log4j.Logger; + import java.util.TreeSet; public class BinlogUtils { + private static final Logger LOG = LogManager.getLogger(BinlogUtils.class); + public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long prevCommitSeq) { TStatus status = new TStatus(TStatusCode.OK); TBinlog firstBinlog = binlogs.first(); @@ -69,4 +79,47 @@ public class BinlogUtils { return Pair.of(status, Long.valueOf(binlogs.tailSet(binlog).size())); } } + + public static TBinlog newDummyBinlog(long dbId, long tableId) { + TBinlog dummy = new TBinlog(); + dummy.setCommitSeq(-1); + dummy.setTimestamp(-1); + dummy.setType(TBinlogType.DUMMY); + dummy.setDbId(dbId); + dummy.setBelong(tableId); + return dummy; + } + + public static boolean tableEnabledBinlog(long dbId, long tableId) { + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + LOG.error("db not found. dbId: {}", dbId); + return false; + } + + OlapTable table; + try { + Table tbl = db.getTableOrMetaException(tableId); + if (tbl == null) { + LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); + return false; + } + if (!(tbl instanceof OlapTable)) { + LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId); + return false; + } + table = (OlapTable) tbl; + } catch (Exception e) { + LOG.warn("fail to get table. 
db: {}, table id: {}", db.getFullName(), tableId); + return false; + } + + return table.getBinlogConfig().isEnable(); + } + + public static long getExpiredMs(long ttlSeconds) { + long currentSeconds = System.currentTimeMillis() / 1000; + long expireSeconds = currentSeconds - ttlSeconds; + return expireSeconds * 1000; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 0f113ff491..58708c8fe6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -21,15 +21,17 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; import org.apache.doris.thrift.TBinlog; +import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -52,21 +54,74 @@ public class DBBinlog { // need UpsertRecord to add timestamps for gc private List<Pair<Long, Long>> timestamps; - public DBBinlog(long dbId) { + private List<TBinlog> tableDummyBinlogs; + + public DBBinlog(TBinlog binlog) { lock = new ReentrantReadWriteLock(); - this.dbId = dbId; + this.dbId = binlog.getDbId(); + // allBinlogs treeset order by commitSeq - allBinlogs = new TreeSet<TBinlog>((o1, o2) -> { - if (o1.getCommitSeq() < o2.getCommitSeq()) { - return -1; - } else if (o1.getCommitSeq() > o2.getCommitSeq()) { - return 1; - } else { - return 0; + allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); + 
tableDummyBinlogs = Lists.newArrayList(); + tableBinlogMap = Maps.newHashMap(); + timestamps = Lists.newArrayList(); + + TBinlog dummy; + if (binlog.getType() == TBinlogType.DUMMY) { + dummy = binlog; + } else { + dummy = BinlogUtils.newDummyBinlog(dbId, -1); + } + allBinlogs.add(dummy); + } + + public static DBBinlog recoverDbBinlog(TBinlog dbDummy, List<TBinlog> tableDummies, boolean dbBinlogEnable) { + DBBinlog dbBinlog = new DBBinlog(dbDummy); + for (TBinlog tableDummy : tableDummies) { + long tableId = tableDummy.getBelong(); + if (!dbBinlogEnable && !BinlogUtils.tableEnabledBinlog(dbBinlog.getDbId(), tableId)) { + continue; + } + dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(tableDummy, tableId)); + dbBinlog.tableDummyBinlogs.add(tableDummy); + } + + return dbBinlog; + } + + // not thread safety, do this without lock + public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) { + List<Long> tableIds = binlog.getTableIds(); + + if (binlog.getTimestamp() > 0 && dbBinlogEnable) { + timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp())); + } + + allBinlogs.add(binlog); + + if (tableIds == null) { + return; + } + + for (long tableId : tableIds) { + TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable); + if (tableBinlog == null) { + continue; + } + tableBinlog.recoverBinlog(binlog); + } + } + + private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) { + TableBinlog tableBinlog = tableBinlogMap.get(tableId); + if (tableBinlog == null) { + if (dbBinlogEnable || BinlogUtils.tableEnabledBinlog(dbId, tableId)) { + tableBinlog = new TableBinlog(binlog, tableId); + tableBinlogMap.put(tableId, tableBinlog); + tableDummyBinlogs.add(tableBinlog.getDummyBinlog()); } - }); - tableBinlogMap = new HashMap<Long, TableBinlog>(); - timestamps = new ArrayList<Pair<Long, Long>>(); + } + return tableBinlog; } public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) { @@ -97,16 +152,21 @@ public 
class DBBinlog { } for (long tableId : tableIds) { - TableBinlog tableBinlog = tableBinlogMap.get(tableId); - if (tableBinlog == null) { - tableBinlog = new TableBinlog(tableId); - tableBinlogMap.put(tableId, tableBinlog); + TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable); + if (tableBinlog != null) { + tableBinlog.addBinlog(binlog); } - tableBinlog.addBinlog(binlog); } } finally { lock.writeLock().unlock(); } + + lock.readLock().lock(); + try { + LOG.info("[deadlinefen] after add, db {} binlogs: {}, dummys: {}", dbId, allBinlogs, tableDummyBinlogs); + } finally { + lock.readLock().unlock(); + } } public long getDbId() { @@ -151,7 +211,7 @@ public class DBBinlog { } } - public List<BinlogTombstone> gc() { + public BinlogTombstone gc() { // check db Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { @@ -160,33 +220,55 @@ public class DBBinlog { } boolean dbBinlogEnable = db.getBinlogConfig().isEnable(); + BinlogTombstone tombstone; if (dbBinlogEnable) { - // db binlog is enable, only one binlogTombstones + // db binlog is enabled, only one binlogTombstones long ttlSeconds = db.getBinlogConfig().getTtlSeconds(); - long currentSeconds = System.currentTimeMillis() / 1000; - long expireSeconds = currentSeconds - ttlSeconds; - long expireMs = expireSeconds * 1000; + long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); - BinlogTombstone tombstone = dbBinlogEnableGc(expireMs); - List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>(); - if (tombstone != null) { - tombstones.add(tombstone); - } - return tombstones; + tombstone = dbBinlogEnableGc(expiredMs); } else { - return dbBinlogDisableGc(db); + tombstone = dbBinlogDisableGc(db); } + + return tombstone; } - private List<BinlogTombstone> dbBinlogDisableGc(Database db) { - List<BinlogTombstone> tombstones = new ArrayList<BinlogTombstone>(); - List<TableBinlog> tableBinlogs = null; + private BinlogTombstone collectTableTombstone(List<BinlogTombstone> 
tableTombstones) { + if (tableTombstones.isEmpty()) { + return null; + } - lock.writeLock().lock(); + List<Long> tableIds = Lists.newArrayList(); + long largestExpiredCommitSeq = -1; + BinlogTombstone dbTombstone = new BinlogTombstone(dbId, tableIds, -1); + for (BinlogTombstone tableTombstone : tableTombstones) { + long commitSeq = tableTombstone.getCommitSeq(); + if (largestExpiredCommitSeq < commitSeq) { + largestExpiredCommitSeq = commitSeq; + } + Map<Long, UpsertRecord.TableRecord> tableVersionMap = tableTombstone.getTableVersionMap(); + if (tableVersionMap.size() > 1) { + LOG.warn("tableVersionMap size is greater than 1. tableVersionMap: {}", tableVersionMap); + } + tableIds.addAll(tableTombstone.getTableIds()); + dbTombstone.addTableRecord(tableVersionMap); + } + + dbTombstone.setCommitSeq(largestExpiredCommitSeq); + + return dbTombstone; + } + + private BinlogTombstone dbBinlogDisableGc(Database db) { + List<BinlogTombstone> tombstones = Lists.newArrayList(); + List<TableBinlog> tableBinlogs; + + lock.readLock().lock(); try { - tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values()); + tableBinlogs = Lists.newArrayList(tableBinlogMap.values()); } finally { - lock.writeLock().unlock(); + lock.readLock().unlock(); } for (TableBinlog tableBinlog : tableBinlogs) { @@ -195,70 +277,110 @@ public class DBBinlog { tombstones.add(tombstone); } } - return tombstones; - } + BinlogTombstone tombstone = collectTableTombstone(tombstones); + if (tombstone != null) { + removeExpiredMetaData(tombstone.getCommitSeq()); + + lock.readLock().lock(); + try { + LOG.info("[deadlinefen] after gc, db {} binlogs: {}, tombstone.seq: {}, dummys: {}", + dbId, allBinlogs, tombstone.getCommitSeq(), tableDummyBinlogs); + } finally { + lock.readLock().unlock(); + } + } - private BinlogTombstone dbBinlogEnableGc(long expireMs) { - // find commitSeq from timestamps, if commitSeq's timestamp is less than expireSeconds, then remove it - long largestExpiredCommitSeq = -1; - TBinlog 
tombstoneBinlog = null; - List<Long> tableIds = null; - List<TableBinlog> tableBinlogs = null; + return tombstone; + } + private void removeExpiredMetaData(long largestExpiredCommitSeq) { lock.writeLock().lock(); try { - Iterator<Pair<Long, Long>> iterator = timestamps.iterator(); - while (iterator.hasNext()) { - Pair<Long, Long> pair = iterator.next(); - if (pair.second < expireMs) { - largestExpiredCommitSeq = pair.first; - iterator.remove(); + Iterator<TBinlog> binlogIter = allBinlogs.iterator(); + TBinlog dummy = binlogIter.next(); + boolean foundFirstUsingBinlog = false; + long lastCommitSeq = -1; + + while (binlogIter.hasNext()) { + TBinlog binlog = binlogIter.next(); + long commitSeq = binlog.getCommitSeq(); + if (commitSeq <= largestExpiredCommitSeq) { + if (binlog.table_ref <= 0) { + binlogIter.remove(); + if (!foundFirstUsingBinlog) { + lastCommitSeq = commitSeq; + } + } else { + foundFirstUsingBinlog = true; + } } else { break; } } - Iterator<TBinlog> binlogIterator = allBinlogs.iterator(); - while (binlogIterator.hasNext()) { - TBinlog binlog = binlogIterator.next(); - if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { - tombstoneBinlog = binlog; - binlogIterator.remove(); + if (lastCommitSeq != -1) { + dummy.setCommitSeq(lastCommitSeq); + } + } finally { + lock.writeLock().unlock(); + } + } + + private BinlogTombstone dbBinlogEnableGc(long expiredMs) { + // step 1: get current tableBinlog info and expiredCommitSeq + long expiredCommitSeq = -1; + lock.readLock().lock(); + try { + Iterator<Pair<Long, Long>> timeIter = timestamps.iterator(); + while (timeIter.hasNext()) { + Pair<Long, Long> pair = timeIter.next(); + if (pair.second <= expiredMs) { + expiredCommitSeq = pair.first; + timeIter.remove(); } else { break; } } - tableIds = new ArrayList<Long>(tableBinlogMap.keySet()); - tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values()); + Iterator<TBinlog> binlogIter = allBinlogs.iterator(); + TBinlog dummy = binlogIter.next(); + 
dummy.setCommitSeq(expiredCommitSeq); + + while (binlogIter.hasNext()) { + TBinlog binlog = binlogIter.next(); + if (binlog.getCommitSeq() <= expiredCommitSeq) { + binlogIter.remove(); + } else { + break; + } + } } finally { - lock.writeLock().unlock(); + lock.readLock().unlock(); } - LOG.info("gc binlog. dbId: {}, expireMs: {}, largestExpiredCommitSeq: {}", - dbId, expireMs, largestExpiredCommitSeq); - if (tombstoneBinlog == null) { + + if (expiredCommitSeq == -1) { return null; } - BinlogTombstone tombstone = new BinlogTombstone(dbId, tableIds, tombstoneBinlog.getCommitSeq()); - for (TableBinlog tableBinlog : tableBinlogs) { - BinlogTombstone binlogTombstone = tableBinlog.gc(largestExpiredCommitSeq); - if (binlogTombstone == null) { - continue; + // step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db tombstone + List<BinlogTombstone> tableTombstones = Lists.newArrayList(); + for (TableBinlog tableBinlog : tableBinlogMap.values()) { + // step 2.1: gc tableBinlog, and get table tombstone + BinlogTombstone tableTombstone = tableBinlog.gc(expiredCommitSeq); + if (tableTombstone != null) { + tableTombstones.add(tableTombstone); } + } - Map<Long, UpsertRecord.TableRecord> tableVersionMap = binlogTombstone.getTableVersionMap(); - if (tableVersionMap.size() > 1) { - LOG.warn("tableVersionMap size is greater than 1.
tableVersionMap: {}", tableVersionMap); - } - for (Map.Entry<Long, UpsertRecord.TableRecord> entry : tableVersionMap.entrySet()) { - long tableId = entry.getKey(); - UpsertRecord.TableRecord record = entry.getValue(); - tombstone.addTableRecord(tableId, record); - } + lock.readLock().lock(); + try { + LOG.info("[deadlinefen] after gc, db {} binlogs: {}, tombstone.seq: {}, dummys: {}", + dbId, allBinlogs, expiredCommitSeq, tableDummyBinlogs); + } finally { + lock.readLock().unlock(); } - return tombstone; + return collectTableTombstone(tableTombstones); } public void replayGc(BinlogTombstone tombstone) { @@ -266,6 +388,7 @@ public class DBBinlog { dbBinlogEnableReplayGc(tombstone); } else { dbBinlogDisableReplayGc(tombstone); + removeExpiredMetaData(tombstone.getCommitSeq()); } } @@ -274,11 +397,11 @@ public class DBBinlog { lock.writeLock().lock(); try { - Iterator<Pair<Long, Long>> iterator = timestamps.iterator(); - while (iterator.hasNext()) { - Pair<Long, Long> pair = iterator.next(); - if (pair.first <= largestExpiredCommitSeq) { - iterator.remove(); + Iterator<Pair<Long, Long>> timeIter = timestamps.iterator(); + while (timeIter.hasNext()) { + long commitSeq = timeIter.next().first; + if (commitSeq <= largestExpiredCommitSeq) { + timeIter.remove(); } else { break; } @@ -301,11 +424,11 @@ public class DBBinlog { } public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) { - List<TableBinlog> tableBinlogs = null; + List<TableBinlog> tableBinlogs; lock.writeLock().lock(); try { - tableBinlogs = new ArrayList<TableBinlog>(tableBinlogMap.values()); + tableBinlogs = Lists.newArrayList(tableBinlogMap.values()); } finally { lock.writeLock().unlock(); } @@ -314,7 +437,7 @@ public class DBBinlog { return; } - Set<Long> tableIds = new HashSet<Long>(tombstone.getTableIds()); + Set<Long> tableIds = Sets.newHashSet(tombstone.getTableIds()); long largestExpiredCommitSeq = tombstone.getCommitSeq(); for (TableBinlog tableBinlog : tableBinlogs) { if 
(tableIds.contains(tableBinlog.getTableId())) { @@ -325,6 +448,7 @@ public class DBBinlog { // not thread safety, do this without lock public void getAllBinlogs(List<TBinlog> binlogs) { + binlogs.addAll(tableDummyBinlogs); binlogs.addAll(allBinlogs); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 2b0d45b694..9b82272f63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -25,9 +25,11 @@ import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Comparator; import java.util.Iterator; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -39,29 +41,43 @@ public class TableBinlog { private ReentrantReadWriteLock lock; private TreeSet<TBinlog> binlogs; - public TableBinlog(long tableId) { + public TableBinlog(TBinlog binlog, long tableId) { this.tableId = tableId; lock = new ReentrantReadWriteLock(); - // binlogs treeset order by commitSeq - binlogs = new TreeSet<TBinlog>((o1, o2) -> { - if (o1.getCommitSeq() < o2.getCommitSeq()) { - return -1; - } else if (o1.getCommitSeq() > o2.getCommitSeq()) { - return 1; - } else { - return 0; - } - }); + binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq)); + + TBinlog dummy; + if (binlog.getType() == TBinlogType.DUMMY) { + dummy = binlog; + } else { + dummy = BinlogUtils.newDummyBinlog(binlog.getDbId(), tableId); + } + binlogs.add(dummy); + } + + public TBinlog getDummyBinlog() { + return binlogs.first(); } public long getTableId() { return tableId; } + // not thread safety, do this without lock + public void recoverBinlog(TBinlog binlog) { + TBinlog dummy = 
getDummyBinlog(); + if (binlog.getCommitSeq() > dummy.getCommitSeq()) { + binlogs.add(binlog); + ++binlog.table_ref; + } + } + public void addBinlog(TBinlog binlog) { lock.writeLock().lock(); try { binlogs.add(binlog); + ++binlog.table_ref; + LOG.info("[deadlinefen] after add, table {} binlogs: {}", tableId, binlogs); } finally { lock.writeLock().unlock(); } @@ -85,41 +101,80 @@ public class TableBinlog { } } - // this method call when db binlog enable - public BinlogTombstone gc(long largestExpiredCommitSeq) { + private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator check) { + if (binlogs.size() <= 1) { + return null; + } + + Iterator<TBinlog> iter = binlogs.iterator(); + TBinlog dummyBinlog = iter.next(); TBinlog tombstoneUpsert = null; + TBinlog lastExpiredBinlog = null; + while (iter.hasNext()) { + TBinlog binlog = iter.next(); + if (check.isExpired(binlog, expired)) { + lastExpiredBinlog = binlog; + --binlog.table_ref; + if (binlog.getType() == TBinlogType.UPSERT) { + tombstoneUpsert = binlog; + } + iter.remove(); + } else { + break; + } + } + + if (lastExpiredBinlog == null) { + return null; + } + + dummyBinlog.setCommitSeq(lastExpiredBinlog.getCommitSeq()); + + return Pair.of(tombstoneUpsert, lastExpiredBinlog.getCommitSeq()); + } + + // this method call when db binlog enable + public BinlogTombstone gc(long expiredCommitSeq) { + Pair<TBinlog, Long> tombstoneInfo; + // step 1: get tombstoneUpsertBinlog and dummyBinlog lock.writeLock().lock(); try { - Iterator<TBinlog> iter = binlogs.iterator(); - while (iter.hasNext()) { - TBinlog binlog = iter.next(); - if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { - if (binlog.getType() == TBinlogType.UPSERT) { - tombstoneUpsert = binlog; - } - iter.remove(); - } else { - break; - } - } + BinlogComparator check = (binlog, expire) -> binlog.getCommitSeq() <= expire; + tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredCommitSeq, check); } finally { 
lock.writeLock().unlock(); } - if (tombstoneUpsert == null) { + // step 2: set tombstone by tombstoneInfo + // if there have expired Binlogs, tombstoneInfo != null + if (tombstoneInfo == null) { return null; } - BinlogTombstone tombstone = new BinlogTombstone(-1, largestExpiredCommitSeq); - UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData()); - tombstone.addTableRecord(tableId, upsertRecord); + TBinlog lastUpsertBinlog = tombstoneInfo.first; + long largestCommitSeq = tombstoneInfo.second; + BinlogTombstone tombstone = new BinlogTombstone(-1, largestCommitSeq); + if (lastUpsertBinlog != null) { + UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData()); + tombstone.addTableRecord(tableId, upsertRecord); + } + + lock.readLock().lock(); + try { + LOG.info("[deadlinefen] after gc, table {} binlogs: {}, tombstone.seq: {}", + tableId, binlogs, tombstone.getCommitSeq()); + } finally { + lock.readLock().unlock(); + } + return tombstone; } // this method call when db binlog disable public BinlogTombstone gc(Database db) { - OlapTable table = null; + // step 1: get expire time + OlapTable table; try { Table tbl = db.getTableOrMetaException(tableId); if (tbl == null) { @@ -138,39 +193,45 @@ public class TableBinlog { long dbId = db.getId(); long ttlSeconds = table.getBinlogConfig().getTtlSeconds(); - long currentSeconds = System.currentTimeMillis() / 1000; - long expireSeconds = currentSeconds - ttlSeconds; - long expireMs = expireSeconds * 1000; + long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); - TBinlog tombstoneUpsert = null; - long largestExpiredCommitSeq = 0; + if (expiredMs < 0) { + return null; + } + + // step 2: get tombstoneUpsertBinlog and dummyBinlog + Pair<TBinlog, Long> tombstoneInfo; lock.writeLock().lock(); try { - Iterator<TBinlog> iter = binlogs.iterator(); - while (iter.hasNext()) { - TBinlog binlog = iter.next(); - long timestamp = binlog.getTimestamp(); - - if (timestamp > expireMs) { - break; - } - - if 
(binlog.getType() == TBinlogType.UPSERT) { - tombstoneUpsert = binlog; - } - largestExpiredCommitSeq = binlog.getCommitSeq(); - iter.remove(); - } + BinlogComparator check = (binlog, expire) -> binlog.getTimestamp() <= expire; + tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredMs, check); } finally { lock.writeLock().unlock(); } - BinlogTombstone tombstone = new BinlogTombstone(dbId, tableId, largestExpiredCommitSeq); - if (tombstoneUpsert != null) { - UpsertRecord upsertRecord = UpsertRecord.fromJson(tombstoneUpsert.getData()); + // step 3: set tombstone by tombstoneInfo + // if have expired Binlogs, tombstoneInfo != null + if (tombstoneInfo == null) { + return null; + } + + TBinlog lastUpsertBinlog = tombstoneInfo.first; + long largestCommitSeq = tombstoneInfo.second; + BinlogTombstone tombstone = new BinlogTombstone(dbId, tableId, largestCommitSeq); + if (lastUpsertBinlog != null) { + UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData()); tombstone.addTableRecord(tableId, upsertRecord); } + lock.readLock().lock(); + try { + LOG.info("[deadlinefen] after gc, table {} binlogs: {}, tombstone.seq: {}", + tableId, binlogs, tombstone.getCommitSeq()); + } finally { + lock.readLock().unlock(); + } + + return tombstone; } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 4630f8bdd0..619337321f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -961,8 +961,9 @@ enum TBinlogType { CREATE_TABLE = 2, DROP_PARTITION = 3, DROP_TABLE = 4, - ALTER_JOB = 5, + ALTER_JOB = 5, MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 6, + DUMMY = 7, } struct TBinlog { @@ -972,6 +973,8 @@ struct TBinlog { 4: optional i64 db_id 5: optional list<i64> table_ids 6: optional string data + 7: optional i64 belong // belong == -1 if type is not DUMMY + 8: optional i64 table_ref // only use for gc } struct TGetBinlogResult { --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
