This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 21e7428 [Feature] Support cancel load jobs in batch (#4515)
21e7428 is described below
commit 21e7428999c1ec5cd201eebba8a38ebb50a3b662
Author: xy720 <[email protected]>
AuthorDate: Thu Oct 15 22:49:39 2020 +0800
[Feature] Support cancel load jobs in batch (#4515)
Support statement like:
`cancel load where label like 'xxx';`
---
.../org/apache/doris/analysis/CancelLoadStmt.java | 25 ++++-
.../src/main/java/org/apache/doris/load/Load.java | 114 +++++++++++++++++++--
.../org/apache/doris/load/loadv2/LoadManager.java | 56 ++++++++++
.../main/java/org/apache/doris/qe/DdlExecutor.java | 16 ++-
.../apache/doris/analysis/CancelLoadStmtTest.java | 88 ++++++++++++++++
5 files changed, 285 insertions(+), 14 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
index 0576527..4fda341 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
@@ -24,12 +24,17 @@ import org.apache.doris.common.UserException;
import com.google.common.base.Strings;
+// CANCEL LOAD statement used to cancel load job.
+//
+// syntax:
+// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
public class CancelLoadStmt extends DdlStmt {
private String dbName;
private String label;
private Expr whereClause;
+ private boolean isAccurateMatch;
public String getDbName() {
return dbName;
@@ -42,6 +47,11 @@ public class CancelLoadStmt extends DdlStmt {
public CancelLoadStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
this.whereClause = whereClause;
+ this.isAccurateMatch = false;
+ }
+
+ public boolean isAccurateMatch() {
+ return isAccurateMatch;
}
@Override
@@ -68,10 +78,17 @@ public class CancelLoadStmt extends DdlStmt {
if (whereClause instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate)
whereClause;
+ isAccurateMatch = true;
if (binaryPredicate.getOp() != Operator.EQ) {
valid = false;
break;
}
+ } else if (whereClause instanceof LikePredicate) {
+ LikePredicate likePredicate = (LikePredicate) whereClause;
+ if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
+ valid = false;
+ break;
+ }
} else {
valid = false;
break;
@@ -101,7 +118,8 @@ public class CancelLoadStmt extends DdlStmt {
} while (false);
if (!valid) {
- throw new AnalysisException("Where clause should looks like: LABEL
= \"your_load_label\"");
+ throw new AnalysisException("Where clause should looks like: LABEL
= \"your_load_label\"," +
+ " or LABEL LIKE \"matcher\"");
}
}
@@ -119,4 +137,9 @@ public class CancelLoadStmt extends DdlStmt {
return stringBuilder.toString();
}
+ @Override
+ public String toString() {
+ return toSql();
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 5460616..79e4787 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -107,7 +107,6 @@ import
org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
-
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -133,6 +132,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
public class Load {
private static final Logger LOG = LogManager.getLogger(Load.class);
@@ -1554,7 +1554,7 @@ public class Load {
return false;
}
- public boolean isLabelExist(String dbName, String label) throws
DdlException {
+ public boolean isLabelExist(String dbName, String labelValue, boolean
isAccurateMatch) throws DdlException {
// get load job and check state
Database db = Catalog.getCurrentCatalog().getDb(dbName);
if (db == null) {
@@ -1566,8 +1566,19 @@ public class Load {
if (labelToLoadJobs == null) {
return false;
}
- List<LoadJob> loadJobs = labelToLoadJobs.get(label);
- if (loadJobs == null) {
+ List<LoadJob> loadJobs = Lists.newArrayList();
+ if (isAccurateMatch) {
+ if (labelToLoadJobs.containsKey(labelValue)) {
+ loadJobs.addAll(labelToLoadJobs.get(labelValue));
+ }
+ } else {
+ for (Map.Entry<String, List<LoadJob>> entry :
labelToLoadJobs.entrySet()) {
+ if (entry.getKey().contains(labelValue)) {
+ loadJobs.addAll(entry.getValue());
+ }
+ }
+ }
+ if (loadJobs.isEmpty()) {
return false;
}
if (loadJobs.stream().filter(entity -> entity.getState() !=
JobState.CANCELLED).count() == 0) {
@@ -1579,6 +1590,93 @@ public class Load {
}
}
+ public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch)
throws DdlException {
+ // get params
+ String dbName = stmt.getDbName();
+ String label = stmt.getLabel();
+
+ // get load job and check state
+ Database db = Catalog.getCurrentCatalog().getDb(dbName);
+ if (db == null) {
+ throw new DdlException("Db does not exist. name: " + dbName);
+ }
+ // List of load jobs waiting to be cancelled
+ List<LoadJob> loadJobs = Lists.newArrayList();
+ readLock();
+ try {
+ Map<String, List<LoadJob>> labelToLoadJobs =
dbLabelToLoadJobs.get(db.getId());
+ if (labelToLoadJobs == null) {
+ throw new DdlException("Load job does not exist");
+ }
+
+ // get jobs by label
+ List<LoadJob> matchLoadJobs = Lists.newArrayList();
+ if (isAccurateMatch) {
+ if (labelToLoadJobs.containsKey(label)) {
+ matchLoadJobs.addAll(labelToLoadJobs.get(label));
+ }
+ } else {
+ for (Map.Entry<String, List<LoadJob>> entry :
labelToLoadJobs.entrySet()) {
+ if (entry.getKey().contains(label)) {
+ matchLoadJobs.addAll(entry.getValue());
+ }
+ }
+ }
+
+ if (matchLoadJobs.isEmpty()) {
+ throw new DdlException("Load job does not exist");
+ }
+
+ // check state here
+ List<LoadJob> uncompletedLoadJob =
matchLoadJobs.stream().filter(job -> {
+ JobState state = job.getState();
+ return state != JobState.CANCELLED && state !=
JobState.QUORUM_FINISHED && state != JobState.FINISHED;
+ }).collect(Collectors.toList());
+ if (uncompletedLoadJob.isEmpty()) {
+ throw new DdlException("There is no uncompleted job which
label " +
+ (isAccurateMatch ? "is " : "like ") + stmt.getLabel());
+ }
+ loadJobs.addAll(uncompletedLoadJob);
+ } finally {
+ readUnlock();
+ }
+
+ // check auth here, cause we need table info
+ Set<String> tableNames = Sets.newHashSet();
+ for (LoadJob loadJob : loadJobs) {
+ tableNames.addAll(loadJob.getTableNames());
+ }
+
+ if (tableNames.isEmpty()) {
+ if
(Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
+ PrivPredicate.LOAD)) {
+
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"CANCEL LOAD");
+ }
+ } else {
+ for (String tblName : tableNames) {
+ if
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
dbName, tblName,
+ PrivPredicate.LOAD)) {
+
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL
LOAD",
+ ConnectContext.get().getQualifiedUser(),
+ ConnectContext.get().getRemoteIP(), tblName);
+ }
+ }
+ }
+
+ // cancel job
+ for (LoadJob loadJob : loadJobs) {
+ List<String> failedMsg = Lists.newArrayList();
+ boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user
cancel", failedMsg);
+ if (!ok) {
+ throw new DdlException("Cancel load job [" + loadJob.getId() +
"] fail, " +
+ "label=[" + loadJob.getLabel() + "] failed msg=" +
+ (failedMsg.isEmpty() ? "Unknown reason" :
failedMsg.get(0)));
+ }
+ }
+
+ return true;
+ }
+
public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
// get params
String dbName = stmt.getDbName();
@@ -1618,16 +1716,16 @@ public class Load {
if (tableNames.isEmpty()) {
// forward compatibility
if
(!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(),
dbName,
-
PrivPredicate.LOAD)) {
+ PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"CANCEL LOAD");
}
} else {
for (String tblName : tableNames) {
if
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
dbName, tblName,
-
PrivPredicate.LOAD)) {
+ PrivPredicate.LOAD)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL
LOAD",
-
ConnectContext.get().getQualifiedUser(),
-
ConnectContext.get().getRemoteIP(), tblName);
+ ConnectContext.get().getQualifiedUser(),
+ ConnectContext.get().getRemoteIP(), tblName);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 2321a9d..8b42ab7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -284,6 +284,62 @@ public class LoadManager implements Writable{
Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
}
+ public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch)
throws DdlException {
+ Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName());
+ if (db == null) {
+ throw new DdlException("Db does not exist. name: " +
stmt.getDbName());
+ }
+
+ // List of load jobs waiting to be cancelled
+ List<LoadJob> loadJobs = Lists.newArrayList();
+ readLock();
+ try {
+ Map<String, List<LoadJob>> labelToLoadJobs =
dbIdToLabelToLoadJobs.get(db.getId());
+ if (labelToLoadJobs == null) {
+ throw new DdlException("Load job does not exist");
+ }
+
+ // get jobs by label
+ List<LoadJob> matchLoadJobs = Lists.newArrayList();
+ if (isAccurateMatch) {
+ if (labelToLoadJobs.containsKey(stmt.getLabel())) {
+ matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel()));
+ }
+ } else {
+ for (Map.Entry<String, List<LoadJob>> entry :
labelToLoadJobs.entrySet()) {
+ if (entry.getKey().contains(stmt.getLabel())) {
+ matchLoadJobs.addAll(entry.getValue());
+ }
+ }
+ }
+
+ if (matchLoadJobs.isEmpty()) {
+ throw new DdlException("Load job does not exist");
+ }
+
+ // check state here
+ List<LoadJob> uncompletedLoadJob =
matchLoadJobs.stream().filter(entity -> !entity.isTxnDone())
+ .collect(Collectors.toList());
+ if (uncompletedLoadJob.isEmpty()) {
+ throw new DdlException("There is no uncompleted job which
label " +
+ (isAccurateMatch ? "is " : "like ") + stmt.getLabel());
+ }
+
+ loadJobs.addAll(uncompletedLoadJob);
+ } finally {
+ readUnlock();
+ }
+
+ for (LoadJob loadJob : loadJobs) {
+ try {
+ loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL,
"user cancel"));
+ } catch (DdlException e) {
+ throw new DdlException("Cancel load job [" + loadJob.getId() +
"] fail, " +
+ "label=[" + loadJob.getLabel() + "] failed msg=" +
e.getMessage());
+ }
+ }
+ }
+
public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName());
if (db == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 60b9aae..1f1a2ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -135,11 +135,17 @@ public class DdlExecutor {
catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
}
} else if (ddlStmt instanceof CancelLoadStmt) {
- if (catalog.getLoadInstance().isLabelExist(
- ((CancelLoadStmt) ddlStmt).getDbName(), ((CancelLoadStmt)
ddlStmt).getLabel())) {
- catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt)
ddlStmt);
- } else {
- catalog.getLoadManager().cancelLoadJob((CancelLoadStmt)
ddlStmt);
+ boolean isAccurateMatch = ((CancelLoadStmt)
ddlStmt).isAccurateMatch();
+ boolean isLabelExist = catalog.getLoadInstance().isLabelExist(
+ ((CancelLoadStmt) ddlStmt).getDbName(),
+ ((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch);
+ if (isLabelExist) {
+ catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt)
ddlStmt,
+ isAccurateMatch);
+ }
+ if (!isLabelExist || isAccurateMatch) {
+ catalog.getLoadManager().cancelLoadJob((CancelLoadStmt)
ddlStmt,
+ isAccurateMatch);
}
} else if (ddlStmt instanceof CreateRoutineLoadStmt) {
catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt)
ddlStmt);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
new file mode 100644
index 0000000..5104b71
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
@@ -0,0 +1,88 @@
+package org.apache.doris.analysis;
+
+import mockit.Expectations;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FakeCatalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CancelLoadStmtTest {
+ private Analyzer analyzer;
+ private Catalog catalog;
+
+ FakeCatalog fakeCatalog;
+
+ @Before
+ public void setUp() {
+ fakeCatalog = new FakeCatalog();
+
+ catalog = AccessTestUtil.fetchAdminCatalog();
+ FakeCatalog.setCatalog(catalog);
+
+ analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+ new Expectations(analyzer) {
+ {
+ analyzer.getDefaultDb();
+ minTimes = 0;
+ result = "testCluster:testDb";
+
+ analyzer.getQualifiedUser();
+ minTimes = 0;
+ result = "testCluster:testUser";
+
+ analyzer.getClusterName();
+ minTimes = 0;
+ result = "testCluster";
+
+ analyzer.getCatalog();
+ minTimes = 0;
+ result = catalog;
+ }
+ };
+ }
+
+ @Test
+ public void testNormal() throws UserException, AnalysisException {
+ SlotRef slotRef = new SlotRef(null, "label");
+ StringLiteral stringLiteral = new StringLiteral("doris_test_label");
+
+ BinaryPredicate binaryPredicate = new
BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
+ CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
+ stmt.analyze(analyzer);
+ Assert.assertTrue(stmt.isAccurateMatch());
+ Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label`
= 'doris_test_label'", stmt.toString());
+
+ LikePredicate likePredicate = new
LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral);
+ stmt = new CancelLoadStmt(null, likePredicate);
+ stmt.analyze(analyzer);
+ Assert.assertFalse(stmt.isAccurateMatch());
+ Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label`
LIKE 'doris_test_label'", stmt.toString());
+ }
+
+ @Test(expected = AnalysisException.class)
+ public void testNoDb() throws UserException, AnalysisException {
+ SlotRef slotRef = new SlotRef(null, "label");
+ StringLiteral stringLiteral = new StringLiteral("doris_test_label");
+ new Expectations(analyzer) {
+ {
+ analyzer.getDefaultDb();
+ minTimes = 0;
+ result = "";
+
+ analyzer.getClusterName();
+ minTimes = 0;
+ result = "testCluster";
+ }
+ };
+
+ BinaryPredicate binaryPredicate = new
BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
+ CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
+ stmt.analyze(analyzer);
+ Assert.fail("No exception throws.");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]