This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 323f8c91a99 branch-3.0: [feat](binlog) Support getting binlogs in batch #47557 (#47638)
323f8c91a99 is described below
commit 323f8c91a99332cef2bac3035503cad29c41aef4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 19 10:30:58 2025 +0800
branch-3.0: [feat](binlog) Support getting binlogs in batch #47557 (#47638)
Cherry-picked from #47557
Co-authored-by: walter <[email protected]>
Co-authored-by: Dongyang Li <[email protected]>
---
.../java/org/apache/doris/binlog/BinlogManager.java | 11 ++++++++++-
.../main/java/org/apache/doris/binlog/BinlogUtils.java | 17 ++++++++++++++---
.../src/main/java/org/apache/doris/binlog/DBBinlog.java | 6 +++---
.../main/java/org/apache/doris/binlog/TableBinlog.java | 4 ++--
.../org/apache/doris/service/FrontendServiceImpl.java | 10 +++++-----
gensrc/thrift/FrontendService.thrift | 1 +
6 files changed, 35 insertions(+), 14 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 57d356d8763..723262ff31b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -475,6 +475,15 @@ public class BinlogManager {
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long
prevCommitSeq) {
+ Pair<TStatus, List<TBinlog>> result = getBinlog(dbId, tableId,
prevCommitSeq, 1);
+ if (result.second != null && result.second.size() > 0) {
+ return Pair.of(result.first, result.second.get(0));
+ }
+ return Pair.of(result.first, null);
+ }
+
+ // get binlogs by dbId, return the first N binlogs, which first
binlog.version > prevCommitSeq
+ public Pair<TStatus, List<TBinlog>> getBinlog(long dbId, long tableId,
long prevCommitSeq, long numAcquired) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
@@ -485,7 +494,7 @@ public class BinlogManager {
return Pair.of(status, null);
}
- return dbBinlog.getBinlog(tableId, prevCommitSeq);
+ return dbBinlog.getBinlog(tableId, prevCommitSeq, numAcquired);
} finally {
lock.readLock().unlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
index 6b79fab143b..e51bc931759 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java
@@ -23,17 +23,23 @@ import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
+import java.util.ArrayList;
+import java.util.List;
import java.util.TreeSet;
+import java.util.stream.Collectors;
public class BinlogUtils {
- public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs,
long prevCommitSeq) {
+ public static Pair<TStatus, List<TBinlog>> getBinlog(
+ TreeSet<TBinlog> binlogs, long prevCommitSeq, long numAcquired) {
TStatus status = new TStatus(TStatusCode.OK);
TBinlog firstBinlog = binlogs.first();
// all commitSeq > commitSeq
if (firstBinlog.getCommitSeq() > prevCommitSeq) {
status.setStatusCode(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ);
- return Pair.of(status, firstBinlog);
+ List<TBinlog> array = new ArrayList<>();
+ array.add(firstBinlog);
+ return Pair.of(status, array);
}
// find first binlog whose commitSeq > commitSeq
@@ -46,7 +52,12 @@ public class BinlogUtils {
status.setStatusCode(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ);
return Pair.of(status, null);
} else {
- return Pair.of(status, binlog);
+ numAcquired = Math.min(Math.max(numAcquired, 1), 255);
+ List<TBinlog> obtain = binlogs.tailSet(binlog)
+ .stream()
+ .limit(numAcquired)
+ .collect(Collectors.toList());
+ return Pair.of(status, obtain);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index 0816564f150..b5e8a48df84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -205,7 +205,7 @@ public class DBBinlog {
return dbId;
}
- public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) {
+ public Pair<TStatus, List<TBinlog>> getBinlog(long tableId, long
prevCommitSeq, long numAcquired) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
@@ -216,10 +216,10 @@ public class DBBinlog {
status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
return Pair.of(status, null);
}
- return tableBinlog.getBinlog(prevCommitSeq);
+ return tableBinlog.getBinlog(prevCommitSeq, numAcquired);
}
- return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq);
+ return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq,
numAcquired);
} finally {
lock.readLock().unlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
index f3279b328c9..755a0bfd171 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
@@ -106,10 +106,10 @@ public class TableBinlog {
}
}
- public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) {
+ public Pair<TStatus, List<TBinlog>> getBinlog(long prevCommitSeq, long
numAcquired) {
lock.readLock().lock();
try {
- return BinlogUtils.getBinlog(binlogs, prevCommitSeq);
+ return BinlogUtils.getBinlog(binlogs, prevCommitSeq, numAcquired);
} finally {
lock.readLock().unlock();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4a16a358dee..6e744ca5621 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2869,7 +2869,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
TGetBinlogResult result = new TGetBinlogResult();
result.setStatus(new TStatus(TStatusCode.OK));
long prevCommitSeq = request.getPrevCommitSeq();
- Pair<TStatus, TBinlog> statusBinlogPair =
env.getBinlogManager().getBinlog(dbId, tableId, prevCommitSeq);
+ long numAcquired = request.getNumAcquired();
+ Pair<TStatus, List<TBinlog>> statusBinlogPair = env.getBinlogManager()
+ .getBinlog(dbId, tableId, prevCommitSeq, numAcquired);
TStatus status = statusBinlogPair.first;
if (status != null && status.getStatusCode() != TStatusCode.OK) {
result.setStatus(status);
@@ -2878,10 +2880,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
}
- TBinlog binlog = statusBinlogPair.second;
- if (binlog != null) {
- List<TBinlog> binlogs = Lists.newArrayList();
- binlogs.add(binlog);
+ List<TBinlog> binlogs = statusBinlogPair.second;
+ if (binlogs != null) {
result.setBinlogs(binlogs);
}
return result;
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 5f1698bf36e..878eb4104e0 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1172,6 +1172,7 @@ struct TGetBinlogRequest {
7: optional string user_ip
8: optional string token
9: optional i64 prev_commit_seq
+ 10: optional i64 num_acquired // the max num of binlogs in a batch
}
enum TBinlogType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]