This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4a8669020af [fix](group commit) Fix some group commit case (#30132)
4a8669020af is described below
commit 4a8669020af171750e6c879eb5bac046be861c02
Author: meiyi <[email protected]>
AuthorDate: Sun Jan 21 11:42:25 2024 +0800
[fix](group commit) Fix some group commit case (#30132)
---
.../org/apache/doris/alter/SchemaChangeJobV2.java | 15 ++++++-------
.../java/org/apache/doris/alter/SystemHandler.java | 11 +++++-----
.../apache/doris/analysis/NativeInsertStmt.java | 2 +-
.../doris/httpv2/rest/CheckWalSizeAction.java | 3 +--
.../org/apache/doris/httpv2/rest/LoadAction.java | 23 +++++++++-----------
.../org/apache/doris/load/GroupCommitManager.java | 25 +++++++++-------------
.../apache/doris/planner/GroupCommitPlanner.java | 3 ---
gensrc/proto/internal_service.proto | 6 +++---
8 files changed, 38 insertions(+), 50 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index eef902dea82..5c74164ae33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -48,7 +48,6 @@ import org.apache.doris.common.SchemaVersionAndHash;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.GroupCommitManager.SchemaChangeStatus;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
@@ -602,8 +601,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
private void waitWalFinished() {
// wait wal done here
- Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK);
- LOG.info("block table {}", tableId);
+ Env.getCurrentEnv().getGroupCommitManager().blockTable(tableId);
+ LOG.info("block group commit for table={} when schema change", tableId);
List<Long> aliveBeIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
long expireTime = System.currentTimeMillis() +
Config.check_wal_queue_timeout_threshold;
while (true) {
@@ -611,21 +610,21 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
boolean walFinished = Env.getCurrentEnv().getGroupCommitManager()
.isPreviousWalFinished(tableId, aliveBeIds);
if (walFinished) {
- LOG.info("all wal is finished");
+ LOG.info("all wal is finished for table={}", tableId);
break;
} else if (System.currentTimeMillis() > expireTime) {
- LOG.warn("waitWalFinished time out");
+ LOG.warn("waitWalFinished time out for table={}", tableId);
break;
} else {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
- LOG.info("schema change job sleep wait for wal InterruptedException: ", ie);
+ LOG.warn("failed to wait for wal for table={} when schema change", tableId, ie);
}
}
}
- Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL);
- LOG.info("release table {}", tableId);
+ Env.getCurrentEnv().getGroupCommitManager().unblockTable(tableId);
+ LOG.info("unblock group commit for table={} when schema change", tableId);
}
private void onFinished(OlapTable tbl) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index 935dcf36293..86551ba0735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -83,7 +83,8 @@ public class SystemHandler extends AlterHandler {
}
List<Long> backendTabletIds =
invertedIndex.getTabletIdsByBackendId(beId);
- if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && checkWal(backend)) {
+ boolean hasWal = checkWal(backend);
+ if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && hasWal) {
try {
systemInfoService.dropBackend(beId);
LOG.info("no available tablet on decommission backend {},
drop it", beId);
@@ -94,8 +95,9 @@ public class SystemHandler extends AlterHandler {
continue;
}
- LOG.info("backend {} lefts {} replicas to decommission: {}", beId, backendTabletIds.size(),
- backendTabletIds.subList(0, Math.min(10, backendTabletIds.size())));
+ LOG.info("backend {} lefts {} replicas to decommission: {}{}", beId, backendTabletIds.size(),
+ backendTabletIds.subList(0, Math.min(10, backendTabletIds.size())),
+ hasWal ? "; and has unfinished WALs" : "");
}
}
@@ -197,8 +199,7 @@ public class SystemHandler extends AlterHandler {
}
private boolean checkWal(Backend backend) {
- return Env.getCurrentEnv().getGroupCommitManager()
- .getAllWalQueueSize(backend) == 0;
+ return Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend) == 0;
}
private List<Backend> checkDecommission(DecommissionBackendClause
decommissionBackendClause)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index d69ca40ceca..96dbb2e0edf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1140,7 +1140,7 @@ public class NativeInsertStmt extends InsertStmt {
return;
}
boolean partialUpdate =
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate();
- if (!partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
+ if (!isExplain() && !partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& ConnectContext.get().getSessionVariable().getSqlMode() !=
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
&& targetTable instanceof OlapTable
&& ((OlapTable)
targetTable).getTableProperty().getUseSchemaLightChange()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java
index f7822580fb7..fdc39e8badd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java
@@ -84,8 +84,7 @@ public class CheckWalSizeAction extends RestBaseController {
List<Backend> backends = getBackends(hostInfos);
List<String> backendsList = new ArrayList<>();
for (Backend backend : backends) {
- long size = Env.getCurrentEnv().getGroupCommitManager()
- .getAllWalQueueSize(backend);
+ long size = Env.getCurrentEnv().getGroupCommitManager().getAllWalQueueSize(backend);
backendsList.add(backend.getHost() + ":" +
backend.getHeartbeatPort() + ":" + size);
}
return ResponseEntityBuilder.ok(backendsList);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index a78a7e9fa58..6952bd37b5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -88,12 +88,11 @@ public class LoadAction extends RestBaseController {
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
- if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
+ if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
- String[] pair = new String[] {db, table};
- if (isGroupCommitBlock(pair)) {
- String msg = "insert table " + pair[1] + " is blocked on schema change";
+ if (isGroupCommitBlock(db, table)) {
+ String msg = "insert table " + table + " is blocked on schema change";
return new RestBaseResult(msg);
}
} catch (Exception e) {
@@ -122,19 +121,17 @@ public class LoadAction extends RestBaseController {
}
}
- @RequestMapping(path = "/api/_http_stream",
- method = RequestMethod.PUT)
- public Object streamLoadWithSql(HttpServletRequest request,
- HttpServletResponse response) {
+ @RequestMapping(path = "/api/_http_stream", method = RequestMethod.PUT)
+ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse response) {
String sql = request.getHeader("sql");
LOG.info("streaming load sql={}", sql);
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
- if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
+ if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
- if (isGroupCommitBlock(pair)) {
+ if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on
schema change";
return new RestBaseResult(msg);
}
@@ -164,11 +161,11 @@ public class LoadAction extends RestBaseController {
}
}
- private boolean isGroupCommitBlock(String[] pair) throws TException {
- String fullDbName = getFullDbName(pair[0]);
+ private boolean isGroupCommitBlock(String db, String table) throws TException {
+ String fullDbName = getFullDbName(db);
Database dbObj = Env.getCurrentInternalCatalog()
.getDbOrException(fullDbName, s -> new TException("database is
invalid for dbName: " + s));
- Table tblObj = dbObj.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
+ Table tblObj = dbObj.getTableOrException(table, s -> new TException("table is invalid: " + s));
return
Env.getCurrentEnv().getGroupCommitManager().isBlock(tblObj.getId());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 3b9719b2594..12410945e9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -17,7 +17,6 @@
package org.apache.doris.load;
-
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
@@ -30,31 +29,27 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
import java.util.concurrent.Future;
public class GroupCommitManager {
- public enum SchemaChangeStatus {
- BLOCK, NORMAL
- }
-
private static final Logger LOG =
LogManager.getLogger(GroupCommitManager.class);
- private final Map<Long, SchemaChangeStatus> statusMap = new ConcurrentHashMap<>();
+ private Set<Long> blockedTableIds = new HashSet<>();
public boolean isBlock(long tableId) {
- if (statusMap.containsKey(tableId)) {
- return statusMap.get(tableId) == SchemaChangeStatus.BLOCK;
- }
- return false;
+ return blockedTableIds.contains(tableId);
+ }
+
+ public void blockTable(long tableId) {
+ blockedTableIds.add(tableId);
}
- public void setStatus(long tableId, SchemaChangeStatus status) {
- LOG.debug("Setting status for tableId {}: {}", tableId, status);
- statusMap.put(tableId, status);
+ public void unblockTable(long tableId) {
+ blockedTableIds.remove(tableId);
}
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 8b9f6b18331..b69ece3b9ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -153,9 +153,6 @@ public class GroupCommitPlanner {
}
}
PGroupCommitInsertRequest request =
PGroupCommitInsertRequest.newBuilder()
- .setDbId(db.getId())
- .setTableId(table.getId())
- .setBaseSchemaVersion(table.getBaseSchemaVersion())
.setExecPlanFragmentRequest(InternalService.PExecPlanFragmentRequest.newBuilder()
.setRequest(execPlanFragmentParamsBytes)
.setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build())
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 433144b304b..cf45d039522 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -766,11 +766,11 @@ message PGlobResponse {
}
message PGroupCommitInsertRequest {
- optional int64 db_id = 1;
- optional int64 table_id = 2;
+ optional int64 db_id = 1; // deprecated
+ optional int64 table_id = 2; // deprecated
// Descriptors.TDescriptorTable
// optional bytes desc_tbl = 3;
- optional int64 base_schema_version = 4;
+ optional int64 base_schema_version = 4; // deprecated
// TExecPlanFragmentParams -> TPlanFragment -> PlanNodes.TPlan
// optional bytes plan_node = 5;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]