This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 00be5d4a209 [fix](export) fix potential export concurrency issue
(#43109)
00be5d4a209 is described below
commit 00be5d4a2098eef19f9fdb2936f4269a34b56a41
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sat Nov 2 08:35:44 2024 +0800
[fix](export) fix potential export concurrency issue (#43109)
### What problem does this PR solve?
Problem Summary:
```
2024-11-01 19:42:52,521 WARN (mysql-nio-pool-117|9514)
[StmtExecutor.execute():616] Analyze failed. stmt[250257,
59c581a512e7468f-b1cfd7d4b63fed33]
org.apache.doris.common.NereidsException: errCode = 2, detailMessage =
java.util.ConcurrentModificationException
at
org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:780)
~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:601)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.qe.StmtExecutor.queryRetry(StmtExecutor.java:564)
~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:554)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:340)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:243)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:208)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.qe.MysqlConnectProcessor.dispatch(MysqlConnectProcessor.java:236)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.qe.MysqlConnectProcessor.processOnce(MysqlConnectProcessor.java:413)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.mysql.ReadListener.lambda$handleEvent$0(ReadListener.java:52)
~[doris-fe.jar:1.2-SNAPSHOT]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
~[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
~[?:?]
at java.lang.Thread.run(Thread.java:840) ~[?:?]
Caused by: org.apache.doris.common.AnalysisException: errCode = 2,
detailMessage = java.util.ConcurrentModificationException
... 13 more
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.forEach(ArrayList.java:1513) ~[?:?]
at
org.apache.doris.load.ExportMgr.addExportJobAndRegisterTask(ExportMgr.java:120)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.nereids.trees.plans.commands.ExportCommand.run(ExportCommand.java:149)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:749)
~[doris-fe.jar:1.2-SNAPSHOT]
... 12 more
```
### Check List (For Committer)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [x] Previous test can cover this change.
- [ ] No colde files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
- Release note
<!-- bugfix, feat, behavior changed need a release note -->
<!-- Add one line release note for this PR. -->
None
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java | 2 +-
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java | 4 ++--
fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java | 4 +---
.../org/apache/doris/nereids/trees/plans/commands/ExportCommand.java | 2 +-
4 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index a9ce85b2d3e..ba7aa50ec69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -208,7 +208,7 @@ public class ExportStmt extends StatementBase implements
NotFallbackInParser {
}
private void setJob() throws UserException {
- exportJob = new ExportJob();
+ exportJob = new ExportJob(Env.getCurrentEnv().getNextId());
Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
exportJob.setDbId(db.getId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 5fe9c482633..9841ae2b11d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -396,8 +396,8 @@ public class ExportJob implements Writable {
return statementBase;
}
- public List<? extends TransientTaskExecutor> getTaskExecutors() {
- return jobExecutorList;
+ public List<? extends TransientTaskExecutor> getCopiedTaskExecutors() {
+ return Lists.newArrayList(jobExecutorList);
}
private void generateExportJobExecutor() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 80f738a4cdf..94f43d531bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -95,8 +95,6 @@ public class ExportMgr {
}
public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
- long jobId = Env.getCurrentEnv().getNextId();
- job.setId(jobId);
writeLock();
try {
if (dbTolabelToExportJobId.containsKey(job.getDbId())
@@ -117,7 +115,7 @@ public class ExportMgr {
BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0,
fullPath.lastIndexOf('/') + 1),
job.getBrokerDesc());
}
- job.getTaskExecutors().forEach(executor -> {
+ job.getCopiedTaskExecutors().forEach(executor -> {
Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
});
Env.getCurrentEnv().getEditLog().logExportCreate(job);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
index dbf6cf7067e..38083e406b9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java
@@ -242,7 +242,7 @@ public class ExportCommand extends Command implements
ForwardWithSync {
private ExportJob generateExportJob(ConnectContext ctx, Map<String,
String> fileProperties, TableName tblName)
throws UserException {
- ExportJob exportJob = new ExportJob();
+ ExportJob exportJob = new ExportJob(Env.getCurrentEnv().getNextId());
// set export job and check catalog/db/table
CatalogIf catalog =
ctx.getEnv().getCatalogMgr().getCatalogOrAnalysisException(tblName.getCtl());
DatabaseIf db = catalog.getDbOrAnalysisException(tblName.getDb());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]