This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 21e7428  [Feature] Support cancel load jobs in batch (#4515)
21e7428 is described below

commit 21e7428999c1ec5cd201eebba8a38ebb50a3b662
Author: xy720 <[email protected]>
AuthorDate: Thu Oct 15 22:49:39 2020 +0800

    [Feature] Support cancel load jobs in batch (#4515)
    
    Support statement like:
    `cancel load where label like 'xxx';`
---
 .../org/apache/doris/analysis/CancelLoadStmt.java  |  25 ++++-
 .../src/main/java/org/apache/doris/load/Load.java  | 114 +++++++++++++++++++--
 .../org/apache/doris/load/loadv2/LoadManager.java  |  56 ++++++++++
 .../main/java/org/apache/doris/qe/DdlExecutor.java |  16 ++-
 .../apache/doris/analysis/CancelLoadStmtTest.java  |  88 ++++++++++++++++
 5 files changed, 285 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
index 0576527..4fda341 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java
@@ -24,12 +24,17 @@ import org.apache.doris.common.UserException;
 
 import com.google.common.base.Strings;
 
+// CANCEL LOAD statement, used to cancel one or more load jobs by label.
+//
+// syntax:
+//      CANCEL LOAD [FROM db] WHERE LABEL (= "xxx" | LIKE "xxx")
 public class CancelLoadStmt extends DdlStmt {
 
     private String dbName;
     private String label;
 
     private Expr whereClause;
+    private boolean isAccurateMatch;
 
     public String getDbName() {
         return dbName;
@@ -42,6 +47,11 @@ public class CancelLoadStmt extends DdlStmt {
     public CancelLoadStmt(String dbName, Expr whereClause) {
         this.dbName = dbName;
         this.whereClause = whereClause;
+        this.isAccurateMatch = false; // switched to true later when the where clause is a BinaryPredicate
+    }
+
+    public boolean isAccurateMatch() { // after analysis: true for LABEL = "x", false for LABEL LIKE "x"
+        return isAccurateMatch;
     }
 
     @Override
@@ -68,10 +78,17 @@ public class CancelLoadStmt extends DdlStmt {
 
             if (whereClause instanceof BinaryPredicate) {
                 BinaryPredicate binaryPredicate = (BinaryPredicate) 
whereClause;
+                isAccurateMatch = true;
                 if (binaryPredicate.getOp() != Operator.EQ) {
                     valid = false;
                     break;
                 }
+            } else if (whereClause instanceof LikePredicate) {
+                LikePredicate likePredicate = (LikePredicate) whereClause;
+                if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
+                    valid = false;
+                    break;
+                }
             } else {
                 valid = false;
                 break;
@@ -101,7 +118,8 @@ public class CancelLoadStmt extends DdlStmt {
         } while (false);
 
         if (!valid) {
-            throw new AnalysisException("Where clause should looks like: LABEL 
= \"your_load_label\"");
+            throw new AnalysisException("Where clause should looks like: LABEL 
= \"your_load_label\"," +
+                    " or LABEL LIKE \"matcher\"");
         }
     }
 
@@ -119,4 +137,9 @@ public class CancelLoadStmt extends DdlStmt {
         return stringBuilder.toString();
     }
 
+    @Override
+    public String toString() {
+        return toSql(); // the string form of the statement is its SQL text
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 5460616..79e4787 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -107,7 +107,6 @@ import 
org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -133,6 +132,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 public class Load {
     private static final Logger LOG = LogManager.getLogger(Load.class);
@@ -1554,7 +1554,7 @@ public class Load {
         return false;
     }
 
-    public boolean isLabelExist(String dbName, String label) throws 
DdlException {
+    public boolean isLabelExist(String dbName, String labelValue, boolean 
isAccurateMatch) throws DdlException {
         // get load job and check state
         Database db = Catalog.getCurrentCatalog().getDb(dbName);
         if (db == null) {
@@ -1566,8 +1566,19 @@ public class Load {
             if (labelToLoadJobs == null) {
                 return false;
             }
-            List<LoadJob> loadJobs = labelToLoadJobs.get(label);
-            if (loadJobs == null) {
+            List<LoadJob> loadJobs = Lists.newArrayList();
+            if (isAccurateMatch) {
+                if (labelToLoadJobs.containsKey(labelValue)) {
+                    loadJobs.addAll(labelToLoadJobs.get(labelValue));
+                }
+            } else {
+                for (Map.Entry<String, List<LoadJob>> entry : 
labelToLoadJobs.entrySet()) {
+                    if (entry.getKey().contains(labelValue)) {
+                        loadJobs.addAll(entry.getValue());
+                    }
+                }
+            }
+            if (loadJobs.isEmpty()) {
                 return false;
             }
             if (loadJobs.stream().filter(entity -> entity.getState() != 
JobState.CANCELLED).count() == 0) {
@@ -1579,6 +1590,93 @@ public class Load {
         }
     }
 
+    public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) 
throws DdlException {
+        // Batch cancel: stmt's label selects jobs by equality (isAccurateMatch) or by substring.
+        String dbName = stmt.getDbName();
+        String label = stmt.getLabel();
+
+        // resolve the database; load jobs are indexed by database id
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new DdlException("Db does not exist. name: " + dbName);
+        }
+        // List of load jobs waiting to be cancelled
+        List<LoadJob> loadJobs = Lists.newArrayList();
+        readLock();
+        try {
+            Map<String, List<LoadJob>> labelToLoadJobs = 
dbLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            // get jobs by label: exact key lookup, or every label that contains the given text
+            List<LoadJob> matchLoadJobs = Lists.newArrayList();
+            if (isAccurateMatch) {
+                if (labelToLoadJobs.containsKey(label)) {
+                    matchLoadJobs.addAll(labelToLoadJobs.get(label));
+                }
+            } else {
+                for (Map.Entry<String, List<LoadJob>> entry : 
labelToLoadJobs.entrySet()) {
+                    if (entry.getKey().contains(label)) {
+                        matchLoadJobs.addAll(entry.getValue());
+                    }
+                }
+            }
+
+            if (matchLoadJobs.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            // keep only jobs that are not CANCELLED, QUORUM_FINISHED or FINISHED
+            List<LoadJob> uncompletedLoadJob = 
matchLoadJobs.stream().filter(job -> {
+                JobState state = job.getState();
+                return state != JobState.CANCELLED && state != 
JobState.QUORUM_FINISHED && state != JobState.FINISHED;
+            }).collect(Collectors.toList());
+            if (uncompletedLoadJob.isEmpty()) {
+                throw new DdlException("There is no uncompleted job which 
label " +
+                        (isAccurateMatch ? "is " : "like ") + stmt.getLabel());
+            }
+            loadJobs.addAll(uncompletedLoadJob);
+        } finally {
+            readUnlock();
+        }
+
+        // check auth here (not earlier) because the table names come from the matched jobs
+        Set<String> tableNames = Sets.newHashSet();
+        for (LoadJob loadJob : loadJobs) {
+            tableNames.addAll(loadJob.getTableNames());
+        }
+
+        if (tableNames.isEmpty()) { // jobs carry no table names: fall back to a db-level check
+            if 
(!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
+                    PrivPredicate.LOAD)) { // deny only when the LOAD privilege is missing
+                
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"CANCEL LOAD");
+            }
+        } else {
+            for (String tblName : tableNames) {
+                if 
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), 
dbName, tblName,
+                        PrivPredicate.LOAD)) {
+                    
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL 
LOAD",
+                            ConnectContext.get().getQualifiedUser(),
+                            ConnectContext.get().getRemoteIP(), tblName);
+                }
+            }
+        }
+
+        // cancel the jobs one by one; the first failure aborts the rest with a DdlException
+        for (LoadJob loadJob : loadJobs) {
+            List<String> failedMsg = Lists.newArrayList();
+            boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user 
cancel", failedMsg);
+            if (!ok) {
+                throw new DdlException("Cancel load job [" + loadJob.getId() + 
"] fail, " +
+                        "label=[" + loadJob.getLabel() + "] failed msg=" +
+                        (failedMsg.isEmpty() ? "Unknown reason" : 
failedMsg.get(0)));
+            }
+        }
+
+        return true; // failures are reported by exception, never by returning false
+    }
+
     public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
         // get params
         String dbName = stmt.getDbName();
@@ -1618,16 +1716,16 @@ public class Load {
         if (tableNames.isEmpty()) {
             // forward compatibility
             if 
(!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), 
dbName,
-                                                                   
PrivPredicate.LOAD)) {
+                    PrivPredicate.LOAD)) {
                 
ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"CANCEL LOAD");
             }
         } else {
             for (String tblName : tableNames) {
                 if 
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), 
dbName, tblName,
-                                                                        
PrivPredicate.LOAD)) {
+                        PrivPredicate.LOAD)) {
                     
ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL 
LOAD",
-                                                   
ConnectContext.get().getQualifiedUser(),
-                                                   
ConnectContext.get().getRemoteIP(), tblName);
+                            ConnectContext.get().getQualifiedUser(),
+                            ConnectContext.get().getRemoteIP(), tblName);
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 2321a9d..8b42ab7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -284,6 +284,62 @@ public class LoadManager implements Writable{
         Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
     }
 
+    public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException {
+        final String dbName = stmt.getDbName();
+        final String labelValue = stmt.getLabel();
+        Database db = Catalog.getCurrentCatalog().getDb(dbName);
+        if (db == null) {
+            throw new DdlException("Db does not exist. name: " + dbName);
+        }
+
+        // Jobs are selected under the read lock and cancelled after it is released.
+        List<LoadJob> jobsToCancel = Lists.newArrayList();
+        readLock();
+        try {
+            Map<String, List<LoadJob>> jobsByLabel = dbIdToLabelToLoadJobs.get(db.getId());
+            if (jobsByLabel == null) {
+                throw new DdlException("Load job does not exist");
+            }
+            // Exact mode looks the label up directly; otherwise any label containing it matches.
+            List<LoadJob> candidates = Lists.newArrayList();
+            if (!isAccurateMatch) {
+                for (Map.Entry<String, List<LoadJob>> labelAndJobs : jobsByLabel.entrySet()) {
+                    if (labelAndJobs.getKey().contains(labelValue)) {
+                        candidates.addAll(labelAndJobs.getValue());
+                    }
+                }
+            } else if (jobsByLabel.containsKey(labelValue)) {
+                candidates.addAll(jobsByLabel.get(labelValue));
+            }
+            if (candidates.isEmpty()) {
+                throw new DdlException("Load job does not exist");
+            }
+
+            // Keep only the jobs for which isTxnDone() is false.
+            for (LoadJob candidate : candidates) {
+                if (!candidate.isTxnDone()) {
+                    jobsToCancel.add(candidate);
+                }
+            }
+            if (jobsToCancel.isEmpty()) {
+                String relation = isAccurateMatch ? "is " : "like ";
+                throw new DdlException("There is no uncompleted job which label " + relation + labelValue);
+            }
+        } finally {
+            readUnlock();
+        }
+
+        // Stop at the first job that fails to cancel and report which one it was.
+        for (LoadJob job : jobsToCancel) {
+            try {
+                job.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
+            } catch (DdlException e) {
+                String prefix = "Cancel load job [" + job.getId() + "] fail, ";
+                throw new DdlException(prefix + "label=[" + job.getLabel() + "] failed msg=" + e.getMessage());
+            }
+        }
+    }
+
     public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
         Database db = Catalog.getCurrentCatalog().getDb(stmt.getDbName());
         if (db == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 60b9aae..1f1a2ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -135,11 +135,17 @@ public class DdlExecutor {
                 catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
             }
         } else if (ddlStmt instanceof CancelLoadStmt) {
-            if (catalog.getLoadInstance().isLabelExist(
-                    ((CancelLoadStmt) ddlStmt).getDbName(), ((CancelLoadStmt) 
ddlStmt).getLabel())) {
-                catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) 
ddlStmt);
-            } else {
-                catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) 
ddlStmt);
+            boolean isAccurateMatch = ((CancelLoadStmt) 
ddlStmt).isAccurateMatch();
+            boolean isLabelExist = catalog.getLoadInstance().isLabelExist(
+                    ((CancelLoadStmt) ddlStmt).getDbName(),
+                    ((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch);
+            if (isLabelExist) {
+                catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) 
ddlStmt,
+                        isAccurateMatch);
+            }
+            if (!isLabelExist || isAccurateMatch) {
+                catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) 
ddlStmt,
+                        isAccurateMatch);
             }
         } else if (ddlStmt instanceof CreateRoutineLoadStmt) {
             
catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) 
ddlStmt);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
new file mode 100644
index 0000000..5104b71
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
@@ -0,0 +1,88 @@
+package org.apache.doris.analysis;
+
+import mockit.Expectations;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FakeCatalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.UserException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CancelLoadStmtTest { // analysis tests for CANCEL LOAD with "=" and LIKE label predicates
+    private Analyzer analyzer;
+    private Catalog catalog;
+
+    FakeCatalog fakeCatalog;
+
+    @Before
+    public void setUp() {
+        fakeCatalog = new FakeCatalog();
+
+        catalog = AccessTestUtil.fetchAdminCatalog();
+        FakeCatalog.setCatalog(catalog);
+
+        analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+        new Expectations(analyzer) {
+            {
+                analyzer.getDefaultDb();
+                minTimes = 0; // the stubbed call is optional, so an unused stub does not fail the test
+                result = "testCluster:testDb";
+
+                analyzer.getQualifiedUser();
+                minTimes = 0;
+                result = "testCluster:testUser";
+
+                analyzer.getClusterName();
+                minTimes = 0;
+                result = "testCluster";
+
+                analyzer.getCatalog();
+                minTimes = 0;
+                result = catalog;
+            }
+        };
+    }
+
+    @Test
+    public void testNormal() throws UserException, AnalysisException {
+        SlotRef slotRef = new SlotRef(null, "label");
+        StringLiteral stringLiteral = new StringLiteral("doris_test_label");
+
+        BinaryPredicate binaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
+        CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate); // no db given; the expected SQL below shows the analyzer's default db
+        stmt.analyze(analyzer);
+        Assert.assertTrue(stmt.isAccurateMatch()); // "=" predicate => exact label match
+        Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` 
= 'doris_test_label'", stmt.toString());
+
+        LikePredicate likePredicate = new 
LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral);
+        stmt = new CancelLoadStmt(null, likePredicate);
+        stmt.analyze(analyzer);
+        Assert.assertFalse(stmt.isAccurateMatch()); // LIKE predicate => non-exact match
+        Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` 
LIKE 'doris_test_label'", stmt.toString());
+    }
+
+    @Test(expected = AnalysisException.class)
+    public void testNoDb() throws UserException, AnalysisException {
+        SlotRef slotRef = new SlotRef(null, "label");
+        StringLiteral stringLiteral = new StringLiteral("doris_test_label");
+        new Expectations(analyzer) {
+            {
+                analyzer.getDefaultDb();
+                minTimes = 0;
+                result = ""; // empty default db, and the statement below names none either
+
+                analyzer.getClusterName();
+                minTimes = 0;
+                result = "testCluster";
+            }
+        };
+
+        BinaryPredicate binaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
+        CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
+        stmt.analyze(analyzer);
+        Assert.fail("No exception throws."); // only reached if analyze() did not throw
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to