This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 46e4400a154 branch-3.0: [feat](binlog) Add Support recover binlog #44818 (#45293)
46e4400a154 is described below
commit 46e4400a15408d7df1e7e702968ee7857de2d438
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 12 19:45:30 2024 +0800
branch-3.0: [feat](binlog) Add Support recover binlog #44818 (#45293)
Cherry-picked from #44818
Co-authored-by: Vallish Pai <[email protected]>
---
.../org/apache/doris/binlog/BinlogManager.java | 27 ++++++++++++++++++
.../java/org/apache/doris/binlog/DBBinlog.java | 33 +++++++++++++++-------
.../apache/doris/catalog/CatalogRecycleBin.java | 6 ++--
.../apache/doris/datasource/InternalCatalog.java | 2 +-
.../java/org/apache/doris/persist/EditLog.java | 8 ++++--
.../java/org/apache/doris/persist/RecoverInfo.java | 22 +++++++++++++--
.../doris/persist/DropAndRecoverInfoTest.java | 2 +-
gensrc/thrift/FrontendService.thrift | 4 +--
8 files changed, 84 insertions(+), 20 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 67bb99a8bcd..b9eb91cc5f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -34,6 +34,7 @@ import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
+import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
@@ -446,6 +447,32 @@ public class BinlogManager {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
info);
}
+
+ private boolean supportedRecoverInfo(RecoverInfo info) {
+ //table name and partitionName added together.
+ // recover table case, tablename must exist in newer version
+ // recover partition case also table name must exist.
+ // so checking only table name here.
+ if (StringUtils.isEmpty(info.getTableName())) {
+ LOG.warn("skip recover info binlog, because tableName is empty.
info: {}", info);
+ return false;
+ }
+ return true;
+ }
+
+ public void addRecoverTableRecord(RecoverInfo info, long commitSeq) {
+ if (supportedRecoverInfo(info) == false) {
+ return;
+ }
+ long dbId = info.getDbId();
+ List<Long> tableIds = Lists.newArrayList();
+ tableIds.add(info.getTableId());
+ long timestamp = -1;
+ TBinlogType type = TBinlogType.RECOVER_INFO;
+ String data = info.toJson();
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false,
info);
+ }
+
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long
prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
index b78ed389a0f..0816564f150 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.DropPartitionInfo;
+import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplaceTableOperationLog;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
@@ -124,7 +125,7 @@ public class DBBinlog {
allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
- recordDroppedResources(binlog);
+ recordDroppedOrRecoveredResources(binlog);
if (tableIds == null) {
return;
@@ -178,7 +179,7 @@ public class DBBinlog {
return;
}
- recordDroppedResources(binlog, raw);
+ recordDroppedOrRecoveredResources(binlog, raw);
switch (binlog.getType()) {
case CREATE_TABLE:
@@ -623,16 +624,16 @@ public class DBBinlog {
}
}
- private void recordDroppedResources(TBinlog binlog) {
- recordDroppedResources(binlog, null);
+ private void recordDroppedOrRecoveredResources(TBinlog binlog) {
+ recordDroppedOrRecoveredResources(binlog, null);
}
// A method to record the dropped tables, indexes, and partitions.
- private void recordDroppedResources(TBinlog binlog, Object raw) {
- recordDroppedResources(binlog.getType(), binlog.getCommitSeq(),
binlog.getData(), raw);
+ private void recordDroppedOrRecoveredResources(TBinlog binlog, Object raw)
{
+ recordDroppedOrRecoveredResources(binlog.getType(),
binlog.getCommitSeq(), binlog.getData(), raw);
}
- private void recordDroppedResources(TBinlogType binlogType, long
commitSeq, String data, Object raw) {
+ private void recordDroppedOrRecoveredResources(TBinlogType binlogType,
long commitSeq, String data, Object raw) {
if (raw == null) {
switch (binlogType) {
case DROP_PARTITION:
@@ -656,6 +657,9 @@ public class DBBinlog {
case BARRIER:
raw = BarrierLog.fromJson(data);
break;
+ case RECOVER_INFO:
+ raw = RecoverInfo.fromJson(data);
+ break;
default:
break;
}
@@ -664,10 +668,10 @@ public class DBBinlog {
}
}
- recordDroppedResources(binlogType, commitSeq, raw);
+ recordDroppedOrRecoveredResources(binlogType, commitSeq, raw);
}
- private void recordDroppedResources(TBinlogType binlogType, long
commitSeq, Object raw) {
+ private void recordDroppedOrRecoveredResources(TBinlogType binlogType,
long commitSeq, Object raw) {
if (binlogType == TBinlogType.DROP_PARTITION && raw instanceof
DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
@@ -706,7 +710,16 @@ public class DBBinlog {
BarrierLog log = (BarrierLog) raw;
// keep compatible with doris 2.0/2.1
if (log.hasBinlog()) {
- recordDroppedResources(log.getBinlogType(), commitSeq,
log.getBinlog(), null);
+ recordDroppedOrRecoveredResources(log.getBinlogType(),
commitSeq, log.getBinlog(), null);
+ }
+ } else if ((binlogType == TBinlogType.RECOVER_INFO) && (raw instanceof
RecoverInfo)) {
+ RecoverInfo recoverInfo = (RecoverInfo) raw;
+ long partitionId = recoverInfo.getPartitionId();
+ long tableId = recoverInfo.getTableId();
+ if (partitionId > 0) {
+ droppedPartitions.removeIf(entry -> (entry.first ==
partitionId));
+ } else if (tableId > 0) {
+ droppedTables.removeIf(entry -> (entry.first == tableId));
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index 745c1c8a351..b5899435343 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -774,7 +774,8 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable, GsonPos
LOG.info("replay recover table[{}]", table.getId());
} else {
// log
- RecoverInfo recoverInfo = new RecoverInfo(db.getId(),
table.getId(), -1L, "", newTableName, "");
+ RecoverInfo recoverInfo = new RecoverInfo(db.getId(),
table.getId(),
+ -1L, "", table.getName(),
newTableName, "", "");
Env.getCurrentEnv().getEditLog().logRecoverTable(recoverInfo);
}
// Only olap table need recover dynamic partition, other table
like jdbc odbc view.. do not need it
@@ -873,7 +874,8 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable, GsonPos
idToRecycleTime.remove(partitionId);
// log
- RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(),
partitionId, "", "", newPartitionName);
+ RecoverInfo recoverInfo = new RecoverInfo(dbId, table.getId(),
partitionId, "",
+ table.getName(), "",
partitionName, newPartitionName);
Env.getCurrentEnv().getEditLog().logRecoverPartition(recoverInfo);
LOG.info("recover partition[{}]", partitionId);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 53641d70c3e..b3ef531660b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -657,7 +657,7 @@ public class InternalCatalog implements CatalogIf<Database>
{
fullNameToDb.put(db.getFullName(), db);
idToDb.put(db.getId(), db);
// log
- RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L,
newDbName, "", "");
+ RecoverInfo recoverInfo = new RecoverInfo(db.getId(), -1L, -1L,
newDbName, "", "", "", "");
Env.getCurrentEnv().getEditLog().logRecoverDb(recoverInfo);
db.unmarkDropped();
} finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index f1377e9daeb..73a2c6338da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -297,11 +297,13 @@ public class EditLog {
case OperationType.OP_RECOVER_TABLE: {
RecoverInfo info = (RecoverInfo) journal.getData();
env.replayRecoverTable(info);
+ env.getBinlogManager().addRecoverTableRecord(info, logId);
break;
}
case OperationType.OP_RECOVER_PARTITION: {
RecoverInfo info = (RecoverInfo) journal.getData();
env.replayRecoverPartition(info);
+ env.getBinlogManager().addRecoverTableRecord(info, logId);
break;
}
case OperationType.OP_RENAME_TABLE: {
@@ -1444,7 +1446,8 @@ public class EditLog {
}
public void logRecoverPartition(RecoverInfo info) {
- logEdit(OperationType.OP_RECOVER_PARTITION, info);
+ long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
+ Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info,
logId);
}
public void logModifyPartition(ModifyPartitionInfo info) {
@@ -1471,7 +1474,8 @@ public class EditLog {
}
public void logRecoverTable(RecoverInfo info) {
- logEdit(OperationType.OP_RECOVER_TABLE, info);
+ long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
+ Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info,
logId);
}
public void logDropRollup(DropInfo info) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java
index 15764a99b43..eb4af6494e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/RecoverInfo.java
@@ -38,10 +38,14 @@ public class RecoverInfo implements Writable,
GsonPostProcessable {
private String newDbName;
@SerializedName(value = "tableId")
private long tableId;
+ @SerializedName(value = "tableName")
+ private String tableName; /// added for table name.
@SerializedName(value = "newTableName")
private String newTableName;
@SerializedName(value = "partitionId")
private long partitionId;
+ @SerializedName(value = "partitionName")
+ private String partitionName;
@SerializedName(value = "newPartitionName")
private String newPartitionName;
@@ -49,13 +53,15 @@ public class RecoverInfo implements Writable,
GsonPostProcessable {
// for persist
}
- public RecoverInfo(long dbId, long tableId, long partitionId, String
newDbName, String newTableName,
- String newPartitionName) {
+ public RecoverInfo(long dbId, long tableId, long partitionId, String
newDbName, String tableName,
+ String newTableName, String partitionName, String
newPartitionName) {
this.dbId = dbId;
this.tableId = tableId;
+ this.tableName = tableName;
this.partitionId = partitionId;
this.newDbName = newDbName;
this.newTableName = newTableName;
+ this.partitionName = partitionName;
this.newPartitionName = newPartitionName;
}
@@ -67,6 +73,10 @@ public class RecoverInfo implements Writable,
GsonPostProcessable {
return tableId;
}
+ public String getTableName() {
+ return tableName;
+ }
+
public long getPartitionId() {
return partitionId;
}
@@ -109,4 +119,12 @@ public class RecoverInfo implements Writable,
GsonPostProcessable {
public void gsonPostProcess() throws IOException {
newDbName = ClusterNamespace.getNameFromFullName(newDbName);
}
+
+ public String toJson() {
+ return GsonUtils.GSON.toJson(this);
+ }
+
+ public static RecoverInfo fromJson(String json) {
+ return GsonUtils.GSON.fromJson(json, RecoverInfo.class);
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
index 8c74fba2753..63afe375548 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/DropAndRecoverInfoTest.java
@@ -86,7 +86,7 @@ public class DropAndRecoverInfoTest {
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new
FileOutputStream(file));
- RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "b", "c");
+ RecoverInfo info1 = new RecoverInfo(1, 2, 3, "a", "", "b", "", "c");
info1.write(dos);
dos.flush();
dos.close();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index a18f7a26297..cb2f3fe9b9a 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1198,6 +1198,7 @@ enum TBinlogType {
RENAME_ROLLUP = 21,
RENAME_PARTITION = 22,
DROP_ROLLUP = 23,
+ RECOVER_INFO = 24,
// Keep some IDs for allocation so that when new binlog types are added in
the
// future, the changes can be picked back to the old versions without
breaking
@@ -1214,8 +1215,7 @@ enum TBinlogType {
// MODIFY_XXX = 17,
// MIN_UNKNOWN = 18,
// UNKNOWN_3 = 19,
- MIN_UNKNOWN = 24,
- UNKNOWN_9 = 25,
+ MIN_UNKNOWN = 25,
UNKNOWN_10 = 26,
UNKNOWN_11 = 27,
UNKNOWN_12 = 28,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]