This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ae0779641cc branch-2.1: [feat](binlog) Add lock binlog method #46887
(#48046)
ae0779641cc is described below
commit ae0779641ccab7e125fa0d1dc691924a04ba8ae3
Author: walter <[email protected]>
AuthorDate: Wed Feb 19 19:24:37 2025 +0800
branch-2.1: [feat](binlog) Add lock binlog method #46887 (#48046)
cherry pick from #46887
---
.../org/apache/doris/binlog/BinlogManager.java | 25 ++++-
.../java/org/apache/doris/binlog/DBBinlog.java | 54 +++++++++++
.../java/org/apache/doris/binlog/TableBinlog.java | 47 ++++++++++
.../apache/doris/service/FrontendServiceImpl.java | 104 ++++++++++++++++++++-
gensrc/thrift/FrontendService.thrift | 24 +++++
5 files changed, 247 insertions(+), 7 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b21b7e751d5..2403ede6fb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -130,7 +130,7 @@ public class BinlogManager {
}
private void addBinlog(long dbId, List<Long> tableIds, long commitSeq,
long timestamp, TBinlogType type,
- String data, boolean removeEnableCache, Object raw)
{
+ String data, boolean removeEnableCache, Object raw) {
if (!Config.enable_feature_binlog) {
return;
}
@@ -431,7 +431,6 @@ public class BinlogManager {
addBarrierLog(log, commitSeq);
}
-
private boolean supportedRecoverInfo(RecoverInfo info) {
//table name and partitionName added together.
// recover table case, tablename must exist in newer version
@@ -501,6 +500,26 @@ public class BinlogManager {
}
}
+    /**
+     * Locks binlogs of a database, or of one of its tables, on behalf of a sync job.
+     *
+     * @param dbId          id of the database whose binlogs are locked
+     * @param tableId       id of the table; a negative value is forwarded to
+     *                      {@link DBBinlog#lockBinlog}, which treats it as a database-level lock
+     * @param jobUniqueId   identifier of the job that holds the lock
+     * @param lockCommitSeq commit seq to lock, forwarded unchanged (negative means "the latest")
+     * @return the status paired with the commit seq actually locked, or
+     *         {@code BINLOG_NOT_FOUND_DB} paired with -1 when the database has no binlog
+     */
+    public Pair<TStatus, Long> lockBinlog(long dbId, long tableId,
+            String jobUniqueId, long lockCommitSeq) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("lock binlog. dbId: {}, tableId: {}, jobUniqueId: {}, lockCommitSeq: {}",
+                    dbId, tableId, jobUniqueId, lockCommitSeq);
+        }
+
+        DBBinlog dbBinlog;
+        lock.readLock().lock();
+        try {
+            dbBinlog = dbBinlogMap.get(dbId);
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        if (dbBinlog == null) {
+            LOG.warn("db binlog not found. dbId: {}", dbId);
+            return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_DB), -1L);
+        }
+        // Delegate after releasing the manager lock; DBBinlog serializes on its own lock.
+        return dbBinlog.lockBinlog(tableId, jobUniqueId, lockCommitSeq);
+    }
+
// get the dropped partitions of the db.
public List<Long> getDroppedPartitions(long dbId) {
lock.readLock().lock();
@@ -617,7 +636,6 @@ public class BinlogManager {
}
}
-
private static void writeTBinlogToStream(DataOutputStream dos, TBinlog
binlog) throws TException, IOException {
TMemoryBuffer buffer = new TMemoryBuffer(BUFFER_SIZE);
TBinaryProtocol protocol = new TBinaryProtocol(buffer);
@@ -627,7 +645,6 @@ public class BinlogManager {
dos.write(data);
}
-
// not thread safety, do this without lock
public long write(DataOutputStream dos, long checksum) throws IOException {
if (!Config.enable_feature_binlog) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 77a086e9872..675c9dc78a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -75,6 +75,10 @@ public class DBBinlog {
private BinlogConfigCache binlogConfigCache;
+ // Binlogs locked by sync jobs through lockBinlog().
+ // job unique id => highest commit seq locked by that job (lockDbBinlog() mutates it under the write lock).
+ private Map<String, Long> lockedBinlogs;
+
public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
lock = new ReentrantReadWriteLock();
this.dbId = binlog.getDbId();
@@ -89,6 +93,7 @@ public class DBBinlog {
droppedPartitions = Lists.newArrayList();
droppedTables = Lists.newArrayList();
droppedIndexes = Lists.newArrayList();
+ lockedBinlogs = Maps.newHashMap();
TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
@@ -281,6 +286,55 @@ public class DBBinlog {
}
}
+    /**
+     * Locks binlogs for a sync job, either for the whole database or for a single table.
+     *
+     * @param tableId       table to lock; any negative value requests a database-level lock
+     * @param jobUniqueId   identifier of the job that holds the lock
+     * @param lockCommitSeq commit seq to lock (negative means the latest binlog)
+     * @return the status paired with the locked commit seq, or {@code BINLOG_NOT_FOUND_TABLE}
+     *         paired with -1 when this database has no binlog for the table
+     */
+    public Pair<TStatus, Long> lockBinlog(long tableId, String jobUniqueId, long lockCommitSeq) {
+        TableBinlog target;
+        lock.writeLock().lock();
+        try {
+            // A database-level lock is resolved entirely under this database's write lock.
+            if (tableId < 0) {
+                return lockDbBinlog(jobUniqueId, lockCommitSeq);
+            }
+            target = tableBinlogMap.get(tableId);
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (target != null) {
+            // The table binlog serializes on its own lock, so ours is released first.
+            return target.lockBinlog(jobUniqueId, lockCommitSeq);
+        }
+        LOG.warn("table binlog not found. dbId: {}, tableId: {}", dbId, tableId);
+        return Pair.of(new TStatus(TStatusCode.BINLOG_NOT_FOUND_TABLE), -1L);
+    }
+
+    /**
+     * Records a database-level binlog lock for {@code jobUniqueId}.
+     *
+     * <p>Requires: the caller holds this database's write lock, because {@code lockedBinlogs}
+     * is mutated here.
+     *
+     * <p>How {@code lockCommitSeq} is handled:
+     * <ul>
+     *   <li>A negative value locks the latest binlog.</li>
+     *   <li>A value below the first binlog's commit seq is raised to it.</li>
+     *   <li>A value above the latest binlog's commit seq is rejected with
+     *       {@code BINLOG_TOO_NEW_COMMIT_SEQ}.</li>
+     * </ul>
+     * If the job already holds a lock at or beyond the requested seq, that existing seq is
+     * returned unchanged. Repeated calls are therefore idempotent, and a lock never moves backwards.
+     */
+    private Pair<TStatus, Long> lockDbBinlog(String jobUniqueId, long lockCommitSeq) {
+        // NOTE(review): assumes allBinlogs is never empty (presumably a TreeSet, whose
+        // first()/last() throw NoSuchElementException when empty) -- confirm.
+        TBinlog firstBinlog = allBinlogs.first();
+        TBinlog lastBinlog = allBinlogs.last();
+
+        if (lockCommitSeq < 0) {
+            // lock the latest binlog
+            lockCommitSeq = lastBinlog.getCommitSeq();
+        } else if (lockCommitSeq < firstBinlog.getCommitSeq()) {
+            // nothing older than the first binlog is held; lock the first binlog instead
+            lockCommitSeq = firstBinlog.getCommitSeq();
+        } else if (lastBinlog.getCommitSeq() < lockCommitSeq) {
+            LOG.warn("try lock future binlogs, dbId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}",
+                    dbId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId);
+            return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L);
+        }
+
+        // keep idempotent: an existing lock at or beyond the requested seq is left as is
+        Long commitSeq = lockedBinlogs.get(jobUniqueId);
+        if (commitSeq != null && lockCommitSeq <= commitSeq) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}",
+                        commitSeq, jobUniqueId, dbId);
+            }
+            return Pair.of(new TStatus(TStatusCode.OK), commitSeq);
+        }
+
+        lockedBinlogs.put(jobUniqueId, lockCommitSeq);
+        return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
+    }
+
public BinlogTombstone gc() {
// check db
BinlogConfig dbBinlogConfig =
binlogConfigCache.getDBBinlogConfig(dbId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 4a98768c304..b6cf328eccc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -26,8 +26,10 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,6 +38,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -54,6 +57,10 @@ public class TableBinlog {
private BinlogConfigCache binlogConfigCache;
+ // Binlogs of this table locked by sync jobs through lockBinlog().
+ // job unique id => highest commit seq locked by that job (lockTableBinlog() mutates it under the write lock).
+ private Map<String, Long> lockedBinlogs;
+
public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog,
long dbId, long tableId) {
this.dbId = dbId;
this.tableId = tableId;
@@ -61,6 +68,7 @@ public class TableBinlog {
lock = new ReentrantReadWriteLock();
binlogs =
Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
timestamps = Lists.newArrayList();
+ lockedBinlogs = Maps.newHashMap();
TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
@@ -124,6 +132,45 @@ public class TableBinlog {
}
}
+ public Pair<TStatus, Long> lockBinlog(String jobUniqueId, long
lockCommitSeq) {
+ lock.writeLock().lock();
+ try {
+ return lockTableBinlog(jobUniqueId, lockCommitSeq);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+    /**
+     * Records a table-level binlog lock for {@code jobUniqueId}.
+     *
+     * <p>Requires: the caller holds this table's WRITE lock, because {@code lockedBinlogs} is
+     * mutated here.
+     *
+     * <p>How {@code lockCommitSeq} is handled:
+     * <ul>
+     *   <li>A negative value locks the latest binlog.</li>
+     *   <li>A value below the first binlog's commit seq is raised to it.</li>
+     *   <li>A value above the latest binlog's commit seq is rejected with
+     *       {@code BINLOG_TOO_NEW_COMMIT_SEQ}.</li>
+     * </ul>
+     * If the job already holds a lock at or beyond the requested seq, that existing seq is
+     * returned unchanged. Repeated calls are therefore idempotent, and a lock never moves backwards.
+     */
+    private Pair<TStatus, Long> lockTableBinlog(String jobUniqueId, long lockCommitSeq) {
+        // NOTE(review): assumes binlogs is never empty; TreeSet.first()/last() throw otherwise.
+        TBinlog firstBinlog = binlogs.first();
+        TBinlog lastBinlog = binlogs.last();
+
+        if (lockCommitSeq < 0) {
+            // lock the latest binlog
+            lockCommitSeq = lastBinlog.getCommitSeq();
+        } else if (lockCommitSeq < firstBinlog.getCommitSeq()) {
+            // nothing older than the first binlog is held; lock the first binlog instead
+            lockCommitSeq = firstBinlog.getCommitSeq();
+        } else if (lastBinlog.getCommitSeq() < lockCommitSeq) {
+            LOG.warn(
+                    "try lock future binlogs, dbId: {}, tableId: {}, lockCommitSeq: {}, lastCommitSeq: {}, jobId: {}",
+                    dbId, tableId, lockCommitSeq, lastBinlog.getCommitSeq(), jobUniqueId);
+            return Pair.of(new TStatus(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ), -1L);
+        }
+
+        // keep idempotent: an existing lock at or beyond the requested seq is left as is
+        Long commitSeq = lockedBinlogs.get(jobUniqueId);
+        if (commitSeq != null && lockCommitSeq <= commitSeq) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("binlog is locked, commitSeq: {}, jobId: {}, dbId: {}, tableId: {}",
+                        commitSeq, jobUniqueId, dbId, tableId);
+            }
+            return Pair.of(new TStatus(TStatusCode.OK), commitSeq);
+        }
+
+        lockedBinlogs.put(jobUniqueId, lockCommitSeq);
+        return Pair.of(new TStatus(TStatusCode.OK), lockCommitSeq);
+    }
+
private Pair<TBinlog, Long>
getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
if (binlogs.size() <= 1) {
return null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 45a9c75c7a5..31d7771f717 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -191,6 +191,8 @@ import org.apache.doris.thrift.TLoadTxnCommitRequest;
import org.apache.doris.thrift.TLoadTxnCommitResult;
import org.apache.doris.thrift.TLoadTxnRollbackRequest;
import org.apache.doris.thrift.TLoadTxnRollbackResult;
+import org.apache.doris.thrift.TLockBinlogRequest;
+import org.apache.doris.thrift.TLockBinlogResult;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TMasterResult;
@@ -3364,7 +3366,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws
TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
- LOG.debug("receive get binlog request: {}", request);
+ LOG.debug("receive get binlog lag request: {}", request);
}
TGetBinlogLagResult result = new TGetBinlogLagResult();
@@ -3375,14 +3377,14 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
- LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG);
+ LOG.error("failed to get binlog lag: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
result = getBinlogLagImpl(request, clientAddr);
} catch (UserException e) {
- LOG.warn("failed to get binlog: {}", e.getMessage());
+ LOG.warn("failed to get binlog lag: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
@@ -3465,6 +3467,102 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
+ public TLockBinlogResult lockBinlog(TLockBinlogRequest request) throws
TException {
+ String clientAddr = getClientAddrAsString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("receive lock binlog request: {}", request);
+ }
+
+ TLockBinlogResult result = new TLockBinlogResult();
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
+
+ if (!Env.getCurrentEnv().isMaster()) {
+ status.setStatusCode(TStatusCode.NOT_MASTER);
+ status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
+ result.setMasterAddress(getMasterAddress());
+ LOG.error("failed to lock binlog: {}", NOT_MASTER_ERR_MSG);
+ return result;
+ }
+
+ try {
+ result = lockBinlogImpl(request, clientAddr);
+ } catch (UserException e) {
+ LOG.warn("failed to lock binlog: {}", e.getMessage());
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(e.getMessage());
+ } catch (Throwable e) {
+ LOG.warn("catch unknown result.", e);
+ status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+ status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+ return result;
+ }
+
+ return result;
+ }
+
+    /**
+     * Validates a lock-binlog request and checks the caller's privileges. It then resolves the
+     * target database and table and delegates to {@code BinlogManager.lockBinlog}.
+     *
+     * <p>Target selection:
+     * <ul>
+     *   <li>{@code table_id} takes precedence over {@code table}.</li>
+     *   <li>When neither is set, table id -1 is passed down, which DBBinlog treats as a
+     *       database-level lock.</li>
+     *   <li>When {@code lock_commit_seq} is unset, -1 is passed, meaning "lock the latest binlog".</li>
+     * </ul>
+     *
+     * @throws UserException when a required argument is missing, the privilege check rejects the
+     *         caller, or the database/table cannot be found
+     */
+    private TLockBinlogResult lockBinlogImpl(TLockBinlogRequest request, String clientIp) throws UserException {
+        // step 1: check all required args: user, passwd, db, job_unique_id
+        if (!request.isSetUser()) {
+            throw new UserException("user is not set");
+        }
+        if (!request.isSetPasswd()) {
+            throw new UserException("passwd is not set");
+        }
+        if (!request.isSetDb()) {
+            throw new UserException("db is not set");
+        }
+        if (!request.isSetJobUniqueId()) {
+            throw new UserException("job_unique_id is not set");
+        }
+
+        // step 2: check auth
+        // NOTE(review): a non-empty token skips the password/privilege check and is not validated
+        // here -- confirm the token is verified elsewhere before relying on it.
+        if (Strings.isNullOrEmpty(request.getToken())) {
+            checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
+                    request.getTable(), clientIp, PrivPredicate.SELECT);
+        }
+
+        // step 3: check database
+        Env env = Env.getCurrentEnv();
+        String fullDbName = request.getDb();
+        Database db = env.getInternalCatalog().getDbNullable(fullDbName);
+        if (db == null) {
+            throw new UserException("unknown database, database=" + fullDbName);
+        }
+
+        // step 4: resolve the table id; -1 means a database-level lock
+        long tableId = -1;
+        if (request.isSetTableId()) {
+            tableId = request.getTableId();
+        } else if (request.isSetTable()) {
+            String tableName = request.getTable();
+            Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
+            if (table == null) {
+                throw new UserException("unknown table, table=" + tableName);
+            }
+            tableId = table.getId();
+        }
+
+        // step 5: lock binlog; an unset lock_commit_seq (-1) locks the latest binlog
+        long lockCommitSeq = request.isSetLockCommitSeq() ? request.getLockCommitSeq() : -1L;
+        Pair<TStatus, Long> statusSeqPair = env.getBinlogManager().lockBinlog(
+                db.getId(), tableId, request.getJobUniqueId(), lockCommitSeq);
+        TLockBinlogResult result = new TLockBinlogResult();
+        result.setStatus(statusSeqPair.first);
+        result.setLockedCommitSeq(statusSeqPair.second);
+        return result;
+    }
+
@Override
public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request)
throws TException {
StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key,
StatisticsCacheKey.class);
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index ad993dcbf5d..bf4561ba835 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1115,6 +1115,29 @@ struct TQueryStatsResult {
5: optional map<i64, i64> tablet_stats
}
+// Lock the binlogs to prevent them from being GC'd during sync.
+//
+// The caller should lock the binlog before backup, and periodically bump the lock commit seq.
+//
+// The locked binlogs will be kept until the binlog properties (ttl_seconds, max_bytes, ...) are reached.
+//
+// Locks are keyed by job_unique_id and never move backwards: re-locking at or below the
+// currently locked commit seq returns the existing one.
+struct TLockBinlogRequest {
+ 1: optional string cluster
+ 2: optional string user
+ 3: optional string passwd
+ 4: optional string db
+ 5: optional string table // used only when table_id is not set
+ 6: optional i64 table_id // if neither table_id nor table is set, lock the whole db
+ 7: optional string token // if set, the user/passwd privilege check is skipped
+ 8: optional string job_unique_id // required; identifies the lock holder
+ 9: optional i64 lock_commit_seq // if not set, lock the latest binlog
+}
+
+// locked_commit_seq: the commit seq actually held by the job (-1 when the binlog manager rejects the request).
+// master_address: set when the request reached a non-master FE.
+struct TLockBinlogResult {
+ 1: optional Status.TStatus status
+ 2: optional i64 locked_commit_seq
+ 3: optional Types.TNetworkAddress master_address
+}
+
struct TGetBinlogRequest {
1: optional string cluster
2: optional string user
@@ -1680,6 +1703,7 @@ service FrontendService {
TGetBinlogResult getBinlog(1: TGetBinlogRequest request)
TGetSnapshotResult getSnapshot(1: TGetSnapshotRequest request)
TRestoreSnapshotResult restoreSnapshot(1: TRestoreSnapshotRequest request)
+ TLockBinlogResult lockBinlog(1: TLockBinlogRequest request)
TWaitingTxnStatusResult waitingTxnStatus(1: TWaitingTxnStatusRequest
request)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]