This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 51b0bbb667 [Feature] (binlog) Add getBinlogLag (#21637)
51b0bbb667 is described below
commit 51b0bbb667617c73c0fbd14a3de51631bc06c6af
Author: Jack Drogon <[email protected]>
AuthorDate: Sat Jul 8 07:41:45 2023 +0800
[Feature] (binlog) Add getBinlogLag (#21637)
Signed-off-by: Jack Drogon <[email protected]>
---
.../org/apache/doris/binlog/BinlogManager.java | 22 ++++-
.../java/org/apache/doris/binlog/BinlogUtils.java | 29 ++++++-
.../java/org/apache/doris/binlog/DBBinlog.java | 25 +++++-
.../java/org/apache/doris/binlog/TableBinlog.java | 13 ++-
.../apache/doris/service/FrontendServiceImpl.java | 96 ++++++++++++++++++++++
gensrc/thrift/FrontendService.thrift | 9 ++
6 files changed, 183 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index bccc5dfc48..822f045da8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -141,7 +141,7 @@ public class BinlogManager {
}
// get binlog by dbId, return first binlog.version > version
- public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long
commitSeq) {
+ public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long
prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
@@ -152,7 +152,25 @@ public class BinlogManager {
return Pair.of(status, null);
}
- return dbBinlog.getBinlog(tableId, commitSeq);
+ return dbBinlog.getBinlog(tableId, prevCommitSeq);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ // get binlog lag by dbId, return the number of binlogs whose commitSeq > prevCommitSeq
+ public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long
prevCommitSeq) {
+ TStatus status = new TStatus(TStatusCode.OK);
+ lock.readLock().lock();
+ try {
+ DBBinlog dbBinlog = dbBinlogMap.get(dbId);
+ if (dbBinlog == null) {
+ status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_DB);
+ LOG.warn("dbBinlog not found. dbId: {}", dbId);
+ return Pair.of(status, null);
+ }
+
+ return dbBinlog.getBinlogLag(tableId, prevCommitSeq);
} finally {
lock.readLock().unlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 226fac2666..9742bed23d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -25,22 +25,22 @@ import org.apache.doris.thrift.TStatusCode;
import java.util.TreeSet;
public class BinlogUtils {
- public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs,
long commitSeq) {
+ public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs,
long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
TBinlog firstBinlog = binlogs.first();
// all commitSeq > commitSeq
- if (firstBinlog.getCommitSeq() > commitSeq) {
+ if (firstBinlog.getCommitSeq() > prevCommitSeq) {
status.setStatusCode(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ);
return Pair.of(status, firstBinlog);
}
// find first binlog whose commitSeq > commitSeq
TBinlog guard = new TBinlog();
- guard.setCommitSeq(commitSeq);
+ guard.setCommitSeq(prevCommitSeq);
TBinlog binlog = binlogs.higher(guard);
- // all commitSeq <= commitSeq
+ // all commitSeq <= prevCommitSeq
if (binlog == null) {
status.setStatusCode(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ);
return Pair.of(status, null);
@@ -48,4 +48,25 @@ public class BinlogUtils {
return Pair.of(status, binlog);
}
}
+
+ public static Pair<TStatus, Long> getBinlogLag(TreeSet<TBinlog> binlogs,
long prevCommitSeq) {
+ TStatus status = new TStatus(TStatusCode.OK);
+ TBinlog firstBinlog = binlogs.first();
+
+ if (firstBinlog.getCommitSeq() > prevCommitSeq) {
+ return Pair.of(status, Long.valueOf(binlogs.size()));
+ }
+
+ // find first binlog whose commitSeq > prevCommitSeq
+ TBinlog guard = new TBinlog();
+ guard.setCommitSeq(prevCommitSeq);
+ TBinlog binlog = binlogs.higher(guard);
+
+ // all commitSeq <= prevCommitSeq
+ if (binlog == null) {
+ return Pair.of(status, 0L);
+ } else {
+ return Pair.of(status,
Long.valueOf(binlogs.tailSet(binlog).size()));
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index d6408b3076..48c20becaf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -100,7 +100,7 @@ public class DBBinlog {
return dbId;
}
- public Pair<TStatus, TBinlog> getBinlog(long tableId, long commitSeq) {
+ public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
@@ -110,10 +110,29 @@ public class DBBinlog {
status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
return Pair.of(status, null);
}
- return tableBinlog.getBinlog(commitSeq);
+ return tableBinlog.getBinlog(prevCommitSeq);
}
- return BinlogUtils.getBinlog(allBinlogs, commitSeq);
+ return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
+ TStatus status = new TStatus(TStatusCode.OK);
+ lock.readLock().lock();
+ try {
+ if (tableId >= 0) {
+ TableBinlog tableBinlog = tableBinlogMap.get(tableId);
+ if (tableBinlog == null) {
+ status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
+ return Pair.of(status, null);
+ }
+ return tableBinlog.getBinlogLag(prevCommitSeq);
+ }
+
+ return BinlogUtils.getBinlogLag(allBinlogs, prevCommitSeq);
} finally {
lock.readLock().unlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 47c91f2a76..44545b6fb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -67,10 +67,19 @@ public class TableBinlog {
}
}
- public Pair<TStatus, TBinlog> getBinlog(long commitSeq) {
+ public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) {
lock.readLock().lock();
try {
- return BinlogUtils.getBinlog(binlogs, commitSeq);
+ return BinlogUtils.getBinlog(binlogs, prevCommitSeq);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public Pair<TStatus, Long> getBinlogLag(long prevCommitSeq) {
+ lock.readLock().lock();
+ try {
+ return BinlogUtils.getBinlogLag(binlogs, prevCommitSeq);
} finally {
lock.readLock().unlock();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index a9f81c8ee8..0ea0f7d839 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -106,6 +106,7 @@ import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
+import org.apache.doris.thrift.TGetBinlogLagResult;
import org.apache.doris.thrift.TGetBinlogRequest;
import org.apache.doris.thrift.TGetBinlogResult;
import org.apache.doris.thrift.TGetDbsParams;
@@ -2572,4 +2573,99 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
+
+ // getBinlogLag
+ public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws
TException {
+ String clientAddr = getClientAddrAsString();
+ LOG.debug("receive get binlog request: {}", request);
+
+ TGetBinlogLagResult result = new TGetBinlogLagResult();
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
+ try {
+ result = getBinlogLagImpl(request, clientAddr);
+ } catch (UserException e) {
+ LOG.warn("failed to get binlog: {}", e.getMessage());
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(e.getMessage());
+ } catch (Throwable e) {
+ LOG.warn("catch unknown result.", e);
+ status.setStatusCode(TStatusCode.INTERNAL_ERROR);
+ status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+ return result;
+ }
+
+ return result;
+ }
+
+ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request,
String clientIp) throws UserException {
+ // Check all required args: user, passwd, db, prev_commit_seq
+ if (!request.isSetUser()) {
+ throw new UserException("user is not set");
+ }
+ if (!request.isSetPasswd()) {
+ throw new UserException("passwd is not set");
+ }
+ if (!request.isSetDb()) {
+ throw new UserException("db is not set");
+ }
+ if (!request.isSetPrevCommitSeq()) {
+ throw new UserException("prev_commit_seq is not set");
+ }
+
+
+ // step 1: check auth
+ String cluster = request.getCluster();
+ if (Strings.isNullOrEmpty(cluster)) {
+ cluster = SystemInfoService.DEFAULT_CLUSTER;
+ }
+ if (Strings.isNullOrEmpty(request.getToken())) {
+ checkPasswordAndPrivs(cluster, request.getUser(),
request.getPasswd(), request.getDb(), request.getTable(),
+ request.getUserIp(), PrivPredicate.SELECT);
+ }
+
+ // step 3: check database
+ Env env = Env.getCurrentEnv();
+ String fullDbName = ClusterNamespace.getFullName(cluster,
request.getDb());
+ Database db = env.getInternalCatalog().getDbNullable(fullDbName);
+ if (db == null) {
+ String dbName = fullDbName;
+ if (Strings.isNullOrEmpty(request.getCluster())) {
+ dbName = request.getDb();
+ }
+ throw new UserException("unknown database, database=" + dbName);
+ }
+
+ // step 4: resolve the optional table id
+ // use table_id if set, else look up the table by name; -1 means no table was specified
+ long tableId = -1;
+ if (request.isSetTableId()) {
+ tableId = request.getTableId();
+ } else if (request.isSetTable()) {
+ String tableName = request.getTable();
+ Table table = db.getTableOrMetaException(tableName,
TableType.OLAP);
+ if (table == null) {
+ throw new UserException("unknown table, table=" + tableName);
+ }
+ tableId = table.getId();
+ }
+
+ // step 6: get binlog lag
+ long dbId = db.getId();
+ TGetBinlogLagResult result = new TGetBinlogLagResult();
+ result.setStatus(new TStatus(TStatusCode.OK));
+ long prevCommitSeq = request.getPrevCommitSeq();
+
+ Pair<TStatus, Long> statusLagPair =
env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq);
+ TStatus status = statusLagPair.first;
+ if (status != null && status.getStatusCode() != TStatusCode.OK) {
+ result.setStatus(status);
+ }
+ Long binlogLag = statusLagPair.second;
+ if (binlogLag != null) {
+ result.setLag(binlogLag);
+ }
+
+ return result;
+ }
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 6cbf075739..9c84ba6906 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1046,6 +1046,13 @@ struct TGetMasterTokenResult {
2: optional string token
}
+typedef TGetBinlogRequest TGetBinlogLagRequest
+
+struct TGetBinlogLagResult {
+ 1: optional Status.TStatus status
+ 2: optional i64 lag
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1108,4 +1115,6 @@ service FrontendService {
TGetTabletReplicaInfosResult getTabletReplicaInfos(1:
TGetTabletReplicaInfosRequest request)
TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request)
+
+ TGetBinlogLagResult getBinlogLag(1: TGetBinlogLagRequest request)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]