This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f2ea39a7fba branch-3.0: [improvement](info) Add F/L commitSeq and
commitTs for get_lag #46769 (#47138)
f2ea39a7fba is described below
commit f2ea39a7fbadf9d355e57c87d2a8e3c85fa886ae
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 19 19:28:25 2025 +0800
branch-3.0: [improvement](info) Add F/L commitSeq and commitTs for get_lag
#46769 (#47138)
Cherry-picked from #46769
Co-authored-by: Uniqueyou <[email protected]>
---
.../org/apache/doris/binlog/BinlogLagInfo.java | 63 ++++++++++++++++++++++
.../org/apache/doris/binlog/BinlogManager.java | 3 +-
.../java/org/apache/doris/binlog/BinlogUtils.java | 24 ++++++---
.../java/org/apache/doris/binlog/DBBinlog.java | 3 +-
.../java/org/apache/doris/binlog/TableBinlog.java | 3 +-
.../apache/doris/service/FrontendServiceImpl.java | 16 +++---
gensrc/thrift/FrontendService.thrift | 4 ++
7 files changed, 100 insertions(+), 16 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java
new file mode 100644
index 00000000000..83b4181fa2f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogLagInfo.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.binlog;
+
+/**
+ * Immutable result of a binlog lag query.
+ *
+ * <p>Carries the lag value together with the commit sequence and timestamp of the
+ * first and last binlog the lag was computed over. An instance created with the
+ * no-arg constructor has every value set to {@code 0}.
+ */
+public class BinlogLagInfo {
+    private final long lag;
+    private final long firstCommitSeq;
+    private final long lastCommitSeq;
+    private final long firstCommitTs;
+    private final long lastCommitTs;
+
+    /**
+     * Creates a fully populated lag info.
+     *
+     * <p>All five parameters are {@code long}, so the compiler cannot catch swapped
+     * arguments: callers must pass both commit sequences first, then both timestamps.
+     *
+     * @param lag            the lag value
+     * @param firstCommitSeq commit sequence of the first binlog
+     * @param lastCommitSeq  commit sequence of the last binlog
+     * @param firstCommitTs  timestamp of the first binlog
+     * @param lastCommitTs   timestamp of the last binlog
+     */
+    public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq,
+            long firstCommitTs, long lastCommitTs) {
+        this.lag = lag;
+        this.firstCommitSeq = firstCommitSeq;
+        this.lastCommitSeq = lastCommitSeq;
+        this.firstCommitTs = firstCommitTs;
+        this.lastCommitTs = lastCommitTs;
+    }
+
+    /** Creates an empty lag info with every value set to {@code 0}. */
+    public BinlogLagInfo() {
+        this(0L, 0L, 0L, 0L, 0L);
+    }
+
+    /** @return the lag value */
+    public long getLag() {
+        return lag;
+    }
+
+    /** @return commit sequence of the first binlog, or {@code 0} if unset */
+    public long getFirstCommitSeq() {
+        return firstCommitSeq;
+    }
+
+    /** @return commit sequence of the last binlog, or {@code 0} if unset */
+    public long getLastCommitSeq() {
+        return lastCommitSeq;
+    }
+
+    /** @return timestamp of the first binlog, or {@code 0} if unset */
+    public long getFirstCommitTs() {
+        return firstCommitTs;
+    }
+
+    /** @return timestamp of the last binlog, or {@code 0} if unset */
+    public long getLastCommitTs() {
+        return lastCommitTs;
+    }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index b15e511e75c..af4d8d408e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -500,7 +500,7 @@ public class BinlogManager {
}
// get binlog by dbId, return first binlog.version > version
- public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long
prevCommitSeq) {
+ public Pair<TStatus, BinlogLagInfo> getBinlogLag(long dbId, long tableId,
long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
@@ -510,7 +510,6 @@ public class BinlogManager {
LOG.warn("dbBinlog not found. dbId: {}", dbId);
return Pair.of(status, null);
}
-
return dbBinlog.getBinlogLag(tableId, prevCommitSeq);
} finally {
lock.readLock().unlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index e51bc931759..1fa930cd4d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -61,12 +61,15 @@ public class BinlogUtils {
}
}
- public static Pair<TStatus, Long> getBinlogLag(TreeSet<TBinlog> binlogs,
long prevCommitSeq) {
+ public static Pair<TStatus, BinlogLagInfo> getBinlogLag(TreeSet<TBinlog>
binlogs, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
TBinlog firstBinlog = binlogs.first();
+ TBinlog lastBinlog = binlogs.last();
if (firstBinlog.getCommitSeq() > prevCommitSeq) {
- return Pair.of(status, Long.valueOf(binlogs.size()));
+ // Argument order must match the BinlogLagInfo constructor:
+ // (lag, firstCommitSeq, lastCommitSeq, firstCommitTs, lastCommitTs).
+ BinlogLagInfo lagInfo = new BinlogLagInfo(binlogs.size(),
+ firstBinlog.getCommitSeq(), lastBinlog.getCommitSeq(),
+ firstBinlog.getTimestamp(), lastBinlog.getTimestamp());
+ return Pair.of(status, lagInfo);
}
// find first binlog whose commitSeq > commitSeq
@@ -75,11 +78,20 @@ public class BinlogUtils {
TBinlog binlog = binlogs.higher(guard);
// all prevCommitSeq <= commitSeq
- if (binlog == null) {
- return Pair.of(status, 0L);
- } else {
- return Pair.of(status,
Long.valueOf(binlogs.tailSet(binlog).size()));
+ long lag = 0;
+ long lastCommitSeq = 0;
+ long lastCommitTs = 0;
+ long firstCommitSeq = 0;
+ long firstCommitTs = 0;
+ if (binlog != null) {
+ lag = binlogs.tailSet(binlog).size();
+ firstCommitSeq = binlog.getCommitSeq();
+ firstCommitTs = binlog.getTimestamp();
+ lastCommitSeq = lastBinlog.getCommitSeq();
+ lastCommitTs = lastBinlog.getTimestamp();
}
+ return Pair.of(status, new BinlogLagInfo(lag, firstCommitSeq,
lastCommitSeq,
+ firstCommitTs, lastCommitTs));
}
public static TBinlog newDummyBinlog(long dbId, long tableId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 2fbee550c91..09a06660b13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -266,7 +266,7 @@ public class DBBinlog {
}
}
- public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
+ public Pair<TStatus, BinlogLagInfo> getBinlogLag(long tableId, long
prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
@@ -777,4 +777,5 @@ public class DBBinlog {
}
}
}
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index 718fce49ee1..c4b291bca52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -123,7 +123,7 @@ public class TableBinlog {
}
}
- public Pair<TStatus, Long> getBinlogLag(long prevCommitSeq) {
+ public Pair<TStatus, BinlogLagInfo> getBinlogLag(long prevCommitSeq) {
lock.readLock().lock();
try {
return BinlogUtils.getBinlogLag(binlogs, prevCommitSeq);
@@ -400,4 +400,5 @@ public class TableBinlog {
lock.readLock().unlock();
}
}
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 5f0b250f593..5ab79333385 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.Snapshot;
+import org.apache.doris.binlog.BinlogLagInfo;
import org.apache.doris.catalog.AutoIncrementGenerator;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -3373,16 +3374,19 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
result.setStatus(new TStatus(TStatusCode.OK));
long prevCommitSeq = request.getPrevCommitSeq();
- Pair<TStatus, Long> statusLagPair =
env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq);
- TStatus status = statusLagPair.first;
+ Pair<TStatus, BinlogLagInfo> binlogLagInfo =
env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq);
+ TStatus status = binlogLagInfo.first;
if (status != null && status.getStatusCode() != TStatusCode.OK) {
result.setStatus(status);
}
- Long binlogLag = statusLagPair.second;
- if (binlogLag != null) {
- result.setLag(binlogLag);
+ BinlogLagInfo lagInfo = binlogLagInfo.second;
+ if (lagInfo != null) {
+ result.setLag(lagInfo.getLag());
+ result.setFirstCommitSeq(lagInfo.getFirstCommitSeq());
+ result.setLastCommitSeq(lagInfo.getLastCommitSeq());
+ result.setFirstBinlogTimestamp(lagInfo.getFirstCommitTs());
+ result.setLastBinlogTimestamp(lagInfo.getLastCommitTs());
}
-
return result;
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index f27b8bff9a4..1f91a7db1d0 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1493,6 +1493,10 @@ struct TGetBinlogLagResult {
1: optional Status.TStatus status
2: optional i64 lag
3: optional Types.TNetworkAddress master_address
+ 4: optional i64 first_commit_seq
+ 5: optional i64 last_commit_seq
+ 6: optional i64 first_binlog_timestamp
+ 7: optional i64 last_binlog_timestamp
}
struct TUpdateFollowerStatsCacheRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]