This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 10abbd2b62 [Feature](Export) support parallel export job using Job
Schedule (#22854)
10abbd2b62 is described below
commit 10abbd2b620cf4a39df734d8471f87ea1d58f906
Author: Tiewei Fang <[email protected]>
AuthorDate: Fri Aug 18 22:24:42 2023 +0800
[Feature](Export) support parallel export job using Job Schedule (#22854)
---
.../main/java/org/apache/doris/common/Config.java | 5 +
.../apache/doris/analysis/CancelExportStmt.java | 12 +-
.../java/org/apache/doris/analysis/ExportStmt.java | 230 ++++---
.../org/apache/doris/analysis/OutFileClause.java | 2 +-
.../org/apache/doris/analysis/ShowExportStmt.java | 8 +-
.../main/java/org/apache/doris/catalog/Env.java | 2 +-
.../org/apache/doris/common/proc/JobsProcDir.java | 18 +-
.../org/apache/doris/journal/JournalEntity.java | 3 +-
.../main/java/org/apache/doris/load/ExportJob.java | 702 +++++++++------------
.../java/org/apache/doris/load/ExportJobState.java | 46 ++
.../apache/doris/load/ExportJobStateTransfer.java | 88 +++
.../main/java/org/apache/doris/load/ExportMgr.java | 110 ++--
.../org/apache/doris/load/ExportTaskExecutor.java | 171 +++++
.../java/org/apache/doris/load/OutfileInfo.java | 37 ++
.../java/org/apache/doris/persist/EditLog.java | 8 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 6 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 3 +-
.../org/apache/doris/task/ExportExportingTask.java | 24 +-
.../doris/analysis/CancelExportStmtTest.java | 16 +-
.../org/apache/doris/qe/SessionVariablesTest.java | 13 +-
.../suites/export_p0/test_export_basic.groovy | 3 +-
.../suites/export_p2/test_export_with_hdfs.groovy | 8 +-
.../suites/export_p2/test_export_with_s3.groovy | 9 +-
23 files changed, 906 insertions(+), 618 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2819439c15..c2dc625829 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2102,6 +2102,11 @@ public class Config extends ConfigBase {
"The maximum parallelism allowed by Export job"})
public static int maximum_parallelism_of_export_job = 50;
+ @ConfField(mutable = true, description = {
+ "ExportExecutorTask任务中一个OutFile语句允许的最大tablets数量",
+ "The maximum number of tablets allowed by an OutfileStatement in
an ExportExecutorTask"})
+ public static int maximum_tablets_of_outfile_in_export = 10;
+
@ConfField(mutable = true, description = {
"是否用 mysql 的 bigint 类型来返回 Doris 的 largeint 类型",
"Whether to use mysql's bigint type to return Doris's largeint
type"})
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
index ca55761420..cdd2ecf73c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java
@@ -21,8 +21,7 @@ import org.apache.doris.analysis.BinaryPredicate.Operator;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
-import org.apache.doris.load.ExportJob;
-import org.apache.doris.load.ExportJob.JobState;
+import org.apache.doris.load.ExportJobState;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@@ -83,11 +82,10 @@ public class CancelExportStmt extends DdlStmt {
throw new AnalysisException("Only label can use like");
}
state = inputValue;
- ExportJob.JobState jobState = ExportJob.JobState.valueOf(state);
- if (jobState != ExportJob.JobState.PENDING
- && jobState != JobState.IN_QUEUE
- && jobState != ExportJob.JobState.EXPORTING) {
- throw new AnalysisException("Only support
PENDING/IN_QUEUE/EXPORTING, invalid state: " + state);
+ ExportJobState jobState = ExportJobState.valueOf(state);
+ if (jobState != ExportJobState.PENDING
+ && jobState != ExportJobState.EXPORTING) {
+ throw new AnalysisException("Only support PENDING/EXPORTING,
invalid state: " + state);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 930c3121b9..69f4d7174f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -17,13 +17,13 @@
package org.apache.doris.analysis;
+import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
@@ -32,6 +32,7 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.URI;
import org.apache.doris.common.util.Util;
+import org.apache.doris.load.ExportJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -39,25 +40,26 @@ import org.apache.doris.qe.VariableMgr;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import lombok.Getter;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
// EXPORT statement, export data to dirs by broker.
//
// syntax:
-// EXPORT TABLE tablename [PARTITION (name1[, ...])]
+// EXPORT TABLE table_name [PARTITION (name1[, ...])]
// TO 'export_target_path'
// [PROPERTIES("key"="value")]
// BY BROKER 'broker_name' [( $broker_attrs)]
+@Getter
public class ExportStmt extends StatementBase {
- private static final Logger LOG = LogManager.getLogger(ExportStmt.class);
public static final String PARALLELISM = "parallelism";
public static final String LABEL = "label";
@@ -106,6 +108,8 @@ public class ExportStmt extends StatementBase {
private UserIdentity userIdentity;
+ private ExportJob exportJob;
+
public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
Map<String, String> properties, BrokerDesc brokerDesc) {
this.tableRef = tableRef;
@@ -118,75 +122,15 @@ public class ExportStmt extends StatementBase {
this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
this.lineDelimiter = DEFAULT_LINE_DELIMITER;
this.columns = DEFAULT_COLUMNS;
- if (ConnectContext.get() != null) {
- this.sessionVariables = ConnectContext.get().getSessionVariable();
- } else {
- this.sessionVariables = VariableMgr.getDefaultSessionVariable();
- }
- }
-
- public String getColumns() {
- return columns;
- }
-
- public TableName getTblName() {
- return tblName;
- }
-
- public List<String> getPartitions() {
- return partitionStringNames;
- }
-
- public Expr getWhereExpr() {
- return whereExpr;
- }
-
- public String getPath() {
- return path;
- }
-
- public BrokerDesc getBrokerDesc() {
- return brokerDesc;
- }
-
- public String getColumnSeparator() {
- return this.columnSeparator;
- }
-
- public String getLineDelimiter() {
- return this.lineDelimiter;
- }
-
- public TableRef getTableRef() {
- return this.tableRef;
- }
-
- public String getFormat() {
- return format;
- }
-
- public String getLabel() {
- return label;
- }
-
- public SessionVariable getSessionVariables() {
- return sessionVariables;
- }
- public String getQualifiedUser() {
- return qualifiedUser;
- }
-
- public UserIdentity getUserIdentity() {
- return this.userIdentity;
+ Optional<SessionVariable> optionalSessionVariable =
Optional.ofNullable(
+ ConnectContext.get().getSessionVariable());
+ this.sessionVariables =
optionalSessionVariable.orElse(VariableMgr.getDefaultSessionVariable());
}
@Override
public boolean needAuditEncryption() {
- if (brokerDesc != null) {
- return true;
- }
- return false;
+ return brokerDesc != null;
}
@Override
@@ -197,16 +141,17 @@ public class ExportStmt extends StatementBase {
Preconditions.checkNotNull(tableRef);
tableRef.analyze(analyzer);
- this.tblName = tableRef.getName();
// disallow external catalog
+ tblName = tableRef.getName();
Util.prohibitExternalCatalog(tblName.getCtl(),
this.getClass().getSimpleName());
- PartitionNames partitionNames = tableRef.getPartitionNames();
- if (partitionNames != null) {
- if (partitionNames.isTemp()) {
+ // get partitions name
+ Optional<PartitionNames> optionalPartitionNames =
Optional.ofNullable(tableRef.getPartitionNames());
+ if (optionalPartitionNames.isPresent()) {
+ if (optionalPartitionNames.get().isTemp()) {
throw new AnalysisException("Do not support exporting
temporary partitions");
}
- partitionStringNames = partitionNames.getPartitionNames();
+ partitionStringNames =
optionalPartitionNames.get().getPartitionNames();
}
// check auth
@@ -222,7 +167,7 @@ public class ExportStmt extends StatementBase {
userIdentity = ConnectContext.get().getCurrentUserIdentity();
// check table && partitions whether exist
- checkTable(analyzer.getEnv());
+ checkPartitions(analyzer.getEnv());
// check broker whether exist
if (brokerDesc == null) {
@@ -232,28 +177,86 @@ public class ExportStmt extends StatementBase {
// check path is valid
path = checkPath(path, brokerDesc.getStorageType());
if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
- if
(!analyzer.getEnv().getBrokerMgr().containsBroker(brokerDesc.getName())) {
+ BrokerMgr brokerMgr = analyzer.getEnv().getBrokerMgr();
+ if (!brokerMgr.containsBroker(brokerDesc.getName())) {
throw new AnalysisException("broker " + brokerDesc.getName() +
" does not exist");
}
-
- FsBroker broker =
analyzer.getEnv().getBrokerMgr().getAnyBroker(brokerDesc.getName());
- if (broker == null) {
+ if (null == brokerMgr.getAnyBroker(brokerDesc.getName())) {
throw new AnalysisException("failed to get alive broker");
}
}
// check properties
checkProperties(properties);
+
+ // create job and analyze job
+ setJob();
+ exportJob.analyze();
}
- private void checkTable(Env env) throws AnalysisException {
+ private void setJob() throws UserException {
+ exportJob = new ExportJob();
+
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
+ exportJob.setDbId(db.getId());
+ exportJob.setTableName(this.tblName);
+
exportJob.setExportTable(db.getTableOrDdlException(this.tblName.getTbl()));
+
exportJob.setTableId(db.getTableOrDdlException(this.tblName.getTbl()).getId());
+ exportJob.setTableRef(this.tableRef);
+
+ // set partitions
+ exportJob.setPartitionNames(this.partitionStringNames);
+
+ // set where expr
+ exportJob.setWhereExpr(this.whereExpr);
+
+ // set path
+ exportJob.setExportPath(this.path);
+
+ // set properties
+ exportJob.setLabel(this.label);
+ exportJob.setColumnSeparator(this.columnSeparator);
+ exportJob.setLineDelimiter(this.lineDelimiter);
+ exportJob.setFormat(this.format);
+ exportJob.setColumns(this.columns);
+ exportJob.setParallelism(this.parallelism);
+ exportJob.setMaxFileSize(this.maxFileSize);
+ exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
+
+ if (!Strings.isNullOrEmpty(this.columns)) {
+ Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
+
exportJob.setExportColumns(split.splitToList(this.columns.toLowerCase()));
+ }
+
+ // set broker desc
+ exportJob.setBrokerDesc(this.brokerDesc);
+
+ // set sessions
+ exportJob.setQualifiedUser(this.qualifiedUser);
+ exportJob.setUserIdentity(this.userIdentity);
+ exportJob.setSessionVariables(this.sessionVariables);
+ exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS());
+
+ exportJob.setSql(this.toSql());
+ exportJob.setOrigStmt(this.getOrigStmt());
+ }
+
+ // check partitions specified by user are belonged to the table.
+ private void checkPartitions(Env env) throws AnalysisException {
+ if (partitionStringNames == null) {
+ return;
+ }
+
+ if (partitionStringNames.size() >
Config.maximum_number_of_export_partitions) {
+ throw new AnalysisException("The partitions number of this export
job is larger than the maximum number"
+ + " of partitions allowed by an export job");
+ }
+
Database db =
env.getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
Table table = db.getTableOrAnalysisException(tblName.getTbl());
table.readLock();
try {
- if (partitionStringNames == null) {
- return;
- }
+ // check table
if (!table.isPartitioned()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "]
is not partitioned.");
}
@@ -270,13 +273,14 @@ public class ExportStmt extends StatementBase {
case VIEW:
default:
throw new AnalysisException("Table[" + tblName.getTbl() +
"] is "
- + tblType.toString() + " type, do not support
EXPORT.");
+ + tblType + " type, do not support EXPORT.");
}
for (String partitionName : partitionStringNames) {
Partition partition = table.getPartition(partitionName);
if (partition == null) {
- throw new AnalysisException("Partition [" + partitionName
+ "] does not exist");
+ throw new AnalysisException("Partition [" + partitionName
+ "] does not exist "
+ + "in Table[" + tblName.getTbl() + "]");
}
}
} finally {
@@ -286,13 +290,17 @@ public class ExportStmt extends StatementBase {
public static String checkPath(String path, StorageBackend.StorageType
type) throws AnalysisException {
if (Strings.isNullOrEmpty(path)) {
- throw new AnalysisException("No dest path specified.");
+ throw new AnalysisException("No destination path specified.");
}
URI uri = URI.create(path);
String schema = uri.getScheme();
+ if (schema == null) {
+ throw new AnalysisException(
+ "Invalid export path, there is no schema of URI found.
please check your path.");
+ }
if (type == StorageBackend.StorageType.BROKER) {
- if (schema == null || (!schema.equalsIgnoreCase("bos")
+ if (!schema.equalsIgnoreCase("bos")
&& !schema.equalsIgnoreCase("afs")
&& !schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("ofs")
@@ -302,23 +310,17 @@ public class ExportStmt extends StatementBase {
&& !schema.equalsIgnoreCase("cosn")
&& !schema.equalsIgnoreCase("gfs")
&& !schema.equalsIgnoreCase("jfs")
- && !schema.equalsIgnoreCase("gs"))) {
+ && !schema.equalsIgnoreCase("gs")) {
throw new AnalysisException("Invalid broker path. please use
valid 'hdfs://', 'afs://' , 'bos://',"
+ " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://',
'gfs://', 'gs://' or 'jfs://' path.");
}
- } else if (type == StorageBackend.StorageType.S3) {
- if (schema == null || !schema.equalsIgnoreCase("s3")) {
- throw new AnalysisException("Invalid export path. please use
valid 's3://' path.");
- }
- } else if (type == StorageBackend.StorageType.HDFS) {
- if (schema == null || !schema.equalsIgnoreCase("hdfs")) {
- throw new AnalysisException("Invalid export path. please use
valid 'HDFS://' path.");
- }
- } else if (type == StorageBackend.StorageType.LOCAL) {
- if (schema != null && !schema.equalsIgnoreCase("file")) {
- throw new AnalysisException(
+ } else if (type == StorageBackend.StorageType.S3 &&
!schema.equalsIgnoreCase("s3")) {
+ throw new AnalysisException("Invalid export path. please use valid
's3://' path.");
+ } else if (type == StorageBackend.StorageType.HDFS &&
!schema.equalsIgnoreCase("hdfs")) {
+ throw new AnalysisException("Invalid export path. please use valid
'HDFS://' path.");
+ } else if (type == StorageBackend.StorageType.LOCAL &&
!schema.equalsIgnoreCase("file")) {
+ throw new AnalysisException(
"Invalid export path. please use valid '" +
OutFileClause.LOCAL_FILE_PREFIX + "' path.");
- }
}
return path;
}
@@ -326,7 +328,7 @@ public class ExportStmt extends StatementBase {
private void checkProperties(Map<String, String> properties) throws
UserException {
for (String key : properties.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
- throw new DdlException("Invalid property key: '" + key + "'");
+ throw new UserException("Invalid property key: [" + key + "]");
}
}
@@ -348,20 +350,24 @@ public class ExportStmt extends StatementBase {
// parallelism
String parallelismString = properties.getOrDefault(PARALLELISM,
DEFAULT_PARALLELISM);
- parallelism = Integer.parseInt(parallelismString);
+ try {
+ this.parallelism = Integer.parseInt(parallelismString);
+ } catch (NumberFormatException e) {
+ throw new UserException("The value of parallelism is invalid!");
+ }
// max_file_size
this.maxFileSize =
properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
this.deleteExistingFiles =
properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, "");
+ // label
if (properties.containsKey(LABEL)) {
FeNameFormat.checkLabel(properties.get(LABEL));
+ this.label = properties.get(LABEL);
} else {
// generate a random label
- String label = "export_" + UUID.randomUUID();
- properties.put(LABEL, label);
+ this.label = "export_" + UUID.randomUUID();
}
- label = properties.get(LABEL);
}
@Override
@@ -408,16 +414,4 @@ public class ExportStmt extends StatementBase {
public String toString() {
return toSql();
}
-
- public String getMaxFileSize() {
- return maxFileSize;
- }
-
- public String getDeleteExistingFiles() {
- return deleteExistingFiles;
- }
-
- public Integer getParallelNum() {
- return parallelism;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 24ed977a7d..d2f760d5b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -255,7 +255,7 @@ public class OutFileClause {
if (brokerDesc != null && isLocalOutput) {
throw new AnalysisException("No need to specify BROKER properties
in OUTFILE clause for local file output");
} else if (brokerDesc == null && !isLocalOutput) {
- throw new AnalysisException("Must specify BROKER properties in
OUTFILE clause");
+ throw new AnalysisException("Must specify BROKER properties or
current local file path in OUTFILE clause");
}
isAnalyzed = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java
index d826da8340..eb2b548b92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowExportStmt.java
@@ -27,7 +27,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.ExportProcNode;
import org.apache.doris.common.util.OrderByPair;
-import org.apache.doris.load.ExportJob.JobState;
+import org.apache.doris.load.ExportJobState;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
@@ -54,7 +54,7 @@ public class ShowExportStmt extends ShowStmt {
private boolean isLabelUseLike = false;
private String stateValue = null;
- private JobState jobState;
+ private ExportJobState jobState;
private ArrayList<OrderByPair> orderByPairs;
@@ -84,7 +84,7 @@ public class ShowExportStmt extends ShowStmt {
return this.jobId;
}
- public JobState getJobState() {
+ public ExportJobState getJobState() {
if (Strings.isNullOrEmpty(stateValue)) {
return null;
}
@@ -152,7 +152,7 @@ public class ShowExportStmt extends ShowStmt {
if (!Strings.isNullOrEmpty(value)) {
stateValue = value.toUpperCase();
try {
- jobState = JobState.valueOf(stateValue);
+ jobState = ExportJobState.valueOf(stateValue);
valid = true;
} catch (IllegalArgumentException e) {
LOG.warn("illegal state argument in export stmt.
stateValue={}, error={}", stateValue, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 0611016964..041db9b430 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3755,7 +3755,7 @@ public class Env {
return timerJobManager;
}
- public TransientTaskManager getMemoryTaskManager() {
+ public TransientTaskManager getTransientTaskManager() {
return transientTaskManager;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
index b81561399d..781ee38d52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/JobsProcDir.java
@@ -22,7 +22,7 @@ import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.loadv2.LoadManager;
@@ -147,10 +147,10 @@ public class JobsProcDir implements ProcDirInterface {
// export
ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
- pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING, dbId);
- runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING, dbId);
- finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED, dbId);
- cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED, dbId);
+ pendingNum = exportMgr.getJobNum(ExportJobState.PENDING, dbId);
+ runningNum = exportMgr.getJobNum(ExportJobState.EXPORTING, dbId);
+ finishedNum = exportMgr.getJobNum(ExportJobState.FINISHED, dbId);
+ cancelledNum = exportMgr.getJobNum(ExportJobState.CANCELLED, dbId);
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(),
runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
@@ -209,10 +209,10 @@ public class JobsProcDir implements ProcDirInterface {
// export
ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
- pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING);
- runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING);
- finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED);
- cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED);
+ pendingNum = exportMgr.getJobNum(ExportJobState.PENDING);
+ runningNum = exportMgr.getJobNum(ExportJobState.EXPORTING);
+ finishedNum = exportMgr.getJobNum(ExportJobState.FINISHED);
+ cancelledNum = exportMgr.getJobNum(ExportJobState.CANCELLED);
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(),
runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index e155751d5b..d8b8c62bd5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -47,6 +47,7 @@ import org.apache.doris.ha.MasterInfo;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobStateTransfer;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
@@ -319,7 +320,7 @@ public class JournalEntity implements Writable {
isRead = true;
break;
case OperationType.OP_EXPORT_UPDATE_STATE:
- data = ExportJob.StateTransfer.read(in);
+ data = ExportJobStateTransfer.read(in);
isRead = true;
break;
case OperationType.OP_FINISH_DELETE: {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index 1527ba1ce4..bd975bb6fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -48,17 +48,17 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
-import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.scheduler.exception.JobException;
+import org.apache.doris.scheduler.registry.ExportTaskRegister;
+import org.apache.doris.scheduler.registry.TransientTaskRegister;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@@ -66,7 +66,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
-import lombok.Getter;
+import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -81,27 +81,21 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
-// NOTE: we must be carefully if we send next request
-// as soon as receiving one instance's report from one BE,
-// because we may change job's member concurrently.
+@Data
public class ExportJob implements Writable {
private static final Logger LOG = LogManager.getLogger(ExportJob.class);
private static final String BROKER_PROPERTY_PREFIXES = "broker.";
- public enum JobState {
- PENDING,
- IN_QUEUE,
- EXPORTING,
- FINISHED,
- CANCELLED,
- }
+ private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT =
Config.maximum_tablets_of_outfile_in_export;
+
+ public static final TransientTaskRegister register = new
ExportTaskRegister(
+ Env.getCurrentEnv().getTransientTaskManager());
@SerializedName("id")
private long id;
- @SerializedName("queryId")
- private String queryId;
@SerializedName("label")
private String label;
@SerializedName("dbId")
@@ -121,7 +115,7 @@ public class ExportJob implements Writable {
@SerializedName("tableName")
private TableName tableName;
@SerializedName("state")
- private JobState state;
+ private ExportJobState state;
@SerializedName("createTimeMs")
private long createTimeMs;
// this is the origin stmt of ExportStmt, we use it to persist where expr
of Export job,
@@ -153,15 +147,19 @@ public class ExportJob implements Writable {
// progress has two functions at EXPORTING stage:
// 1. when progress < 100, it indicates exporting
// 2. set progress = 100 ONLY when exporting progress is completely done
+ @SerializedName("progress")
private int progress;
+ @SerializedName("tabletsNum")
+ private Integer tabletsNum;
+
private TableRef tableRef;
private Expr whereExpr;
private String sql = "";
- private Integer parallelNum;
+ private Integer parallelism;
public Map<String, Long> getPartitionToVersion() {
return partitionToVersion;
@@ -170,9 +168,11 @@ public class ExportJob implements Writable {
private Map<String, Long> partitionToVersion = Maps.newHashMap();
// The selectStmt is sql 'select ... into outfile ...'
- @Getter
+ // TODO(ftw): delete
private List<SelectStmt> selectStmtList = Lists.newArrayList();
+ private List<List<SelectStmt>> selectStmtListPerParallel =
Lists.newArrayList();
+
private List<StmtExecutor> stmtExecutorList;
private List<String> exportColumns = Lists.newArrayList();
@@ -188,16 +188,21 @@ public class ExportJob implements Writable {
private ExportExportingTask task;
- private List<TScanRangeLocations> tabletLocations = Lists.newArrayList();
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths =
Lists.newArrayList();
+ private List<ExportTaskExecutor> jobExecutorList;
+
+ private ConcurrentHashMap<Long, ExportTaskExecutor> taskIdToExecutor = new
ConcurrentHashMap<>();
+
+ private Integer finishedTaskCount = 0;
+ private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
+
public ExportJob() {
this.id = -1;
- this.queryId = "";
this.dbId = -1;
this.tableId = -1;
- this.state = JobState.PENDING;
+ this.state = ExportJobState.PENDING;
this.progress = 0;
this.createTimeMs = System.currentTimeMillis();
this.startTimeMs = -1;
@@ -215,55 +220,37 @@ public class ExportJob implements Writable {
this.id = jobId;
}
- public void setJob(ExportStmt stmt) throws UserException {
- String dbName = stmt.getTblName().getDb();
- Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
- Preconditions.checkNotNull(stmt.getBrokerDesc());
- this.brokerDesc = stmt.getBrokerDesc();
- this.columnSeparator = stmt.getColumnSeparator();
- this.lineDelimiter = stmt.getLineDelimiter();
- this.label = stmt.getLabel();
- this.queryId = ConnectContext.get() != null ?
DebugUtil.printId(ConnectContext.get().queryId()) : "N/A";
- String path = stmt.getPath();
- Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
- this.whereExpr = stmt.getWhereExpr();
- this.parallelNum = stmt.getParallelNum();
- this.exportPath = path;
- this.sessionVariables = stmt.getSessionVariables();
- this.timeoutSecond = sessionVariables.getQueryTimeoutS();
-
- this.qualifiedUser = stmt.getQualifiedUser();
- this.userIdentity = stmt.getUserIdentity();
- this.format = stmt.getFormat();
- this.maxFileSize = stmt.getMaxFileSize();
- this.deleteExistingFiles = stmt.getDeleteExistingFiles();
- this.partitionNames = stmt.getPartitions();
-
- this.exportTable =
db.getTableOrDdlException(stmt.getTblName().getTbl());
- this.columns = stmt.getColumns();
- this.tableRef = stmt.getTableRef();
- if (!Strings.isNullOrEmpty(this.columns)) {
- Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
- this.exportColumns =
split.splitToList(stmt.getColumns().toLowerCase());
- }
+ /**
+ * For an ExportJob:
+ * The ExportJob is divided into multiple 'ExportTaskExecutor'
+ * according to the 'parallelism' set by the user.
+ * The tablets which will be exported by this ExportJob are divided into
'parallelism' copies,
+ * and each ExportTaskExecutor is responsible for a list of tablets.
+ * The tablets responsible for an ExportTaskExecutor will be assigned to
multiple OutfileStmt
+ * according to the 'TABLETS_NUM_PER_OUTFILE_IN_EXPORT'.
+ *
+ * @throws UserException
+ */
+ public void analyze() throws UserException {
exportTable.readLock();
try {
- this.dbId = db.getId();
- this.tableId = exportTable.getId();
- this.tableName = stmt.getTblName();
- if (selectStmtList.isEmpty()) {
- // This scenario is used for 'EXPORT TABLE tbl INTO PATH'
- // we need generate Select Statement
- generateQueryStmt(stmt);
- }
+ // generateQueryStmtOld
+ generateQueryStmt();
} finally {
exportTable.readUnlock();
}
- this.sql = stmt.toSql();
- this.origStmt = stmt.getOrigStmt();
+ generateExportJobExecutor();
+ }
+
+ public void generateExportJobExecutor() {
+ jobExecutorList = Lists.newArrayList();
+ for (List<SelectStmt> selectStmts : selectStmtListPerParallel) {
+ ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts,
this);
+ jobExecutorList.add(executor);
+ }
}
- private void generateQueryStmt(ExportStmt stmt) throws UserException {
+ private void generateQueryStmtOld() throws UserException {
SelectList list = new SelectList();
if (exportColumns.isEmpty()) {
list.addItem(SelectListItem.createStarItem(this.tableName));
@@ -278,7 +265,16 @@ public class ExportJob implements Writable {
}
}
- ArrayList<ArrayList<TableRef>> tableRefListPerQuery =
splitTablets(stmt);
+ ArrayList<ArrayList<Long>> tabletsListPerQuery = splitTablets();
+
+ ArrayList<ArrayList<TableRef>> tableRefListPerQuery =
Lists.newArrayList();
+ for (ArrayList<Long> tabletsList : tabletsListPerQuery) {
+ TableRef tblRef = new TableRef(this.tableRef.getName(),
this.tableRef.getAlias(), null, tabletsList,
+ this.tableRef.getTableSample(),
this.tableRef.getCommonHints());
+ ArrayList<TableRef> tableRefList = Lists.newArrayList();
+ tableRefList.add(tblRef);
+ tableRefListPerQuery.add(tableRefList);
+ }
LOG.info("Export task is split into {} outfile statements.",
tableRefListPerQuery.size());
if (LOG.isDebugEnabled()) {
@@ -306,30 +302,104 @@ public class ExportJob implements Writable {
}
}
- private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt)
throws UserException {
+ /**
+ * Generate outfile select stmt
+ * @throws UserException
+ */
+ private void generateQueryStmt() throws UserException {
+ SelectList list = new SelectList();
+ if (exportColumns.isEmpty()) {
+ list.addItem(SelectListItem.createStarItem(this.tableName));
+ } else {
+ for (Column column : exportTable.getBaseSchema()) {
+ String colName = column.getName().toLowerCase();
+ if (exportColumns.contains(colName)) {
+ SlotRef slotRef = new SlotRef(this.tableName, colName);
+ SelectListItem selectListItem = new
SelectListItem(slotRef, null);
+ list.addItem(selectListItem);
+ }
+ }
+ }
+
+ ArrayList<ArrayList<TableRef>> tableRefListPerParallel =
getTableRefListPerParallel();
+ LOG.info("Export Job [{}] is split into {} Export Task Executor.", id,
tableRefListPerParallel.size());
+
+ // debug LOG output
+ if (LOG.isDebugEnabled()) {
+ for (int i = 0; i < tableRefListPerParallel.size(); i++) {
+ LOG.debug("ExportTaskExecutor {} is responsible for tablets:",
i);
+ for (TableRef tableRef : tableRefListPerParallel.get(i)) {
+ LOG.debug("Tablet id: [{}]",
tableRef.getSampleTabletIds());
+ }
+ }
+ }
+
+ // generate 'select..outfile..' statement
+ for (ArrayList<TableRef> tableRefList : tableRefListPerParallel) {
+ List<SelectStmt> selectStmtLists = Lists.newArrayList();
+ for (TableRef tableRef : tableRefList) {
+ ArrayList<TableRef> tmpTableRefList =
Lists.newArrayList(tableRef);
+ FromClause fromClause = new FromClause(tmpTableRefList);
+ // generate outfile clause
+ OutFileClause outfile = new OutFileClause(this.exportPath,
this.format, convertOutfileProperties());
+ SelectStmt selectStmt = new SelectStmt(list, fromClause,
this.whereExpr, null,
+ null, null, LimitElement.NO_LIMIT);
+ selectStmt.setOutFileClause(outfile);
+ selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(),
0));
+ selectStmtLists.add(selectStmt);
+ }
+ selectStmtListPerParallel.add(selectStmtLists);
+ }
+
+ // debug LOG output
+ if (LOG.isDebugEnabled()) {
+ for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
+ LOG.debug("ExportTaskExecutor {} is responsible for outfile:",
i);
+ for (SelectStmt outfile : selectStmtListPerParallel.get(i)) {
+ LOG.debug("outfile sql: [{}]", outfile.toSql());
+ }
+ }
+ }
+ }
+
+ private ArrayList<ArrayList<TableRef>> getTableRefListPerParallel() throws
UserException {
+ ArrayList<ArrayList<Long>> tabletsListPerParallel = splitTablets();
+
+ ArrayList<ArrayList<TableRef>> tableRefListPerParallel =
Lists.newArrayList();
+ for (ArrayList<Long> tabletsList : tabletsListPerParallel) {
+ ArrayList<TableRef> tableRefList = Lists.newArrayList();
+ for (int i = 0; i < tabletsList.size(); i +=
MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
+ int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT <
tabletsList.size()
+ ? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT :
tabletsList.size();
+ ArrayList<Long> tablets = new
ArrayList<>(tabletsList.subList(i, end));
+ TableRef tblRef = new TableRef(this.tableRef.getName(),
this.tableRef.getAlias(),
+ this.tableRef.getPartitionNames(), tablets,
+ this.tableRef.getTableSample(),
this.tableRef.getCommonHints());
+ tableRefList.add(tblRef);
+ }
+ tableRefListPerParallel.add(tableRefList);
+ }
+ return tableRefListPerParallel;
+ }
+
+ private ArrayList<ArrayList<Long>> splitTablets() throws UserException {
// get tablets
- Database db =
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb());
- OlapTable table =
db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl());
+ Database db =
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
+ OlapTable table =
db.getOlapTableOrAnalysisException(this.tableName.getTbl());
List<Long> tabletIdList = Lists.newArrayList();
table.readLock();
try {
- Collection<Partition> partitions = new ArrayList<Partition>();
+ final Collection<Partition> partitions = new
ArrayList<Partition>();
// get partitions
// user specifies partitions, already checked in ExportStmt
if (this.partitionNames != null) {
- if (partitionNames.size() >
Config.maximum_number_of_export_partitions) {
- throw new UserException("The partitions number of this
export job is larger than the maximum number"
- + " of partitions allowed by a export job");
- }
- for (String partName : this.partitionNames) {
- partitions.add(table.getPartition(partName));
- }
+ this.partitionNames.forEach(partitionName ->
partitions.add(table.getPartition(partitionName)));
} else {
if (table.getPartitions().size() >
Config.maximum_number_of_export_partitions) {
throw new UserException("The partitions number of this
export job is larger than the maximum number"
+ " of partitions allowed by a export job");
}
- partitions = table.getPartitions();
+ partitions.addAll(table.getPartitions());
}
// get tablets
@@ -344,38 +414,34 @@ public class ExportJob implements Writable {
}
Integer tabletsAllNum = tabletIdList.size();
- Integer tabletsNumPerQuery = tabletsAllNum / this.parallelNum;
- Integer tabletsNumPerQueryRemainder = tabletsAllNum -
tabletsNumPerQuery * this.parallelNum;
-
- Integer start = 0;
-
- ArrayList<ArrayList<TableRef>> tableRefListPerQuery =
Lists.newArrayList();
-
- int outfileNum = this.parallelNum;
- if (tabletsAllNum < this.parallelNum) {
- outfileNum = tabletsAllNum;
+ tabletsNum = tabletsAllNum;
+ Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
+ Integer tabletsNumPerQueryRemainder = tabletsAllNum -
tabletsNumPerParallel * this.parallelism;
+
+ ArrayList<ArrayList<Long>> tabletsListPerParallel =
Lists.newArrayList();
+ Integer realParallelism = this.parallelism;
+ if (tabletsAllNum < this.parallelism) {
+ realParallelism = tabletsAllNum;
LOG.warn("Export Job [{}]: The number of tablets ({}) is smaller
than parallelism ({}), "
- + "set parallelism to tablets num.", id,
tabletsAllNum, this.parallelNum);
+ + "set parallelism to tablets num.", id,
tabletsAllNum, this.parallelism);
}
- for (int i = 0; i < outfileNum; ++i) {
- Integer tabletsNum = tabletsNumPerQuery;
+ Integer start = 0;
+ for (int i = 0; i < realParallelism; ++i) {
+ Integer tabletsNum = tabletsNumPerParallel;
if (tabletsNumPerQueryRemainder > 0) {
tabletsNum = tabletsNum + 1;
--tabletsNumPerQueryRemainder;
}
ArrayList<Long> tablets = new
ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
start += tabletsNum;
- TableRef tblRef = new TableRef(this.tableRef.getName(),
this.tableRef.getAlias(), null, tablets,
- this.tableRef.getTableSample(),
this.tableRef.getCommonHints());
- ArrayList<TableRef> tableRefList = Lists.newArrayList();
- tableRefList.add(tblRef);
- tableRefListPerQuery.add(tableRefList);
+
+ tabletsListPerParallel.add(tablets);
}
- return tableRefListPerQuery;
+ return tabletsListPerParallel;
}
private Map<String, String> convertOutfileProperties() {
- Map<String, String> outfileProperties = Maps.newHashMap();
+ final Map<String, String> outfileProperties = Maps.newHashMap();
// file properties
if (format.equals("csv") || format.equals("csv_with_names") ||
format.equals("csv_with_names_and_types")) {
@@ -393,9 +459,7 @@ public class ExportJob implements Writable {
// outfile clause's broker properties need 'broker.' prefix
if (brokerDesc.getStorageType() == StorageType.BROKER) {
outfileProperties.put(BROKER_PROPERTY_PREFIXES + "name",
brokerDesc.getName());
- for (Entry<String, String> kv :
brokerDesc.getProperties().entrySet()) {
- outfileProperties.put(BROKER_PROPERTY_PREFIXES + kv.getKey(),
kv.getValue());
- }
+ brokerDesc.getProperties().forEach((k, v) ->
outfileProperties.put(BROKER_PROPERTY_PREFIXES + k, v));
} else {
for (Entry<String, String> kv :
brokerDesc.getProperties().entrySet()) {
outfileProperties.put(kv.getKey(), kv.getValue());
@@ -404,131 +468,25 @@ public class ExportJob implements Writable {
return outfileProperties;
}
- public String getColumns() {
- return columns;
- }
-
- public long getId() {
- return id;
- }
-
- public long getDbId() {
- return dbId;
- }
-
- public long getTableId() {
- return this.tableId;
- }
-
- public Expr getWhereExpr() {
- return whereExpr;
- }
-
- public synchronized JobState getState() {
+ public synchronized ExportJobState getState() {
return state;
}
- public BrokerDesc getBrokerDesc() {
- return brokerDesc;
- }
-
- public void setBrokerDesc(BrokerDesc brokerDesc) {
- this.brokerDesc = brokerDesc;
- }
-
- public String getExportPath() {
- return exportPath;
- }
-
- public String getColumnSeparator() {
- return this.columnSeparator;
- }
-
- public String getLineDelimiter() {
- return this.lineDelimiter;
- }
-
- public int getTimeoutSecond() {
- return timeoutSecond;
- }
-
- public String getFormat() {
- return format;
- }
-
- public String getMaxFileSize() {
- return maxFileSize;
- }
-
- public String getDeleteExistingFiles() {
- return deleteExistingFiles;
- }
-
- public String getQualifiedUser() {
- return qualifiedUser;
- }
-
- public UserIdentity getUserIdentity() {
- return userIdentity;
- }
-
- public List<String> getPartitions() {
- return partitionNames;
- }
-
- public int getProgress() {
- return progress;
- }
-
- public void setProgress(int progress) {
- this.progress = progress;
- }
-
- public long getCreateTimeMs() {
- return createTimeMs;
- }
-
- public long getStartTimeMs() {
- return startTimeMs;
- }
-
- public void setStartTimeMs(long startTimeMs) {
- this.startTimeMs = startTimeMs;
- }
-
- public long getFinishTimeMs() {
- return finishTimeMs;
- }
-
- public void setFinishTimeMs(long finishTimeMs) {
- this.finishTimeMs = finishTimeMs;
- }
-
- public ExportFailMsg getFailMsg() {
- return failMsg;
- }
-
- public void setFailMsg(ExportFailMsg failMsg) {
- this.failMsg = failMsg;
- }
-
- public String getOutfileInfo() {
- return outfileInfo;
- }
-
- public void setOutfileInfo(String outfileInfo) {
- this.outfileInfo = outfileInfo;
+ private void setExportJobState(ExportJobState newState) {
+ this.state = newState;
}
-
+ // TODO(ftw): delete
public synchronized Thread getDoExportingThread() {
return doExportingThread;
}
+ // TODO(ftw): delete
public synchronized void setDoExportingThread(Thread isExportingThread) {
this.doExportingThread = isExportingThread;
}
+ // TODO(ftw): delete
public synchronized void setStmtExecutor(int idx, StmtExecutor executor) {
this.stmtExecutorList.set(idx, executor);
}
@@ -537,51 +495,20 @@ public class ExportJob implements Writable {
return this.stmtExecutorList.get(idx);
}
- public List<TScanRangeLocations> getTabletLocations() {
- return tabletLocations;
- }
-
- public List<Pair<TNetworkAddress, String>> getSnapshotPaths() {
- return this.snapshotPaths;
- }
-
- public void addSnapshotPath(Pair<TNetworkAddress, String> snapshotPath) {
- this.snapshotPaths.add(snapshotPath);
- }
-
- public String getSql() {
- return sql;
- }
-
- public ExportExportingTask getTask() {
- return task;
- }
-
- public void setTask(ExportExportingTask task) {
- this.task = task;
- }
-
- public TableName getTableName() {
- return tableName;
- }
-
- public SessionVariable getSessionVariables() {
- return sessionVariables;
- }
-
+ // TODO(ftw): delete
public synchronized void cancel(ExportFailMsg.CancelType type, String msg)
{
if (msg != null) {
failMsg = new ExportFailMsg(type, msg);
}
// maybe user cancel this job
- if (task != null && state == JobState.EXPORTING && stmtExecutorList !=
null) {
+ if (task != null && state == ExportJobState.EXPORTING &&
stmtExecutorList != null) {
for (int idx = 0; idx < stmtExecutorList.size(); ++idx) {
stmtExecutorList.get(idx).cancel();
}
}
- if (updateState(ExportJob.JobState.CANCELLED, false)) {
+ if (updateState(ExportJobState.CANCELLED, false)) {
// release snapshot
// Status releaseSnapshotStatus = releaseSnapshotPaths();
// if (!releaseSnapshotStatus.ok()) {
@@ -592,23 +519,150 @@ public class ExportJob implements Writable {
}
}
+ public synchronized void updateExportJobState(ExportJobState newState,
Long taskId,
+ List<OutfileInfo> outfileInfoList, ExportFailMsg.CancelType type,
String msg) throws JobException {
+ switch (newState) {
+ case PENDING:
+ throw new JobException("Can not update ExportJob state to
'PENDING', job id: [{}], task id: [{}]",
+ id, taskId);
+ case EXPORTING:
+ exportExportJob();
+ break;
+ case CANCELLED:
+ cancelExportTask(type, msg);
+ break;
+ case FINISHED:
+ finishExportTask(taskId, outfileInfoList);
+ break;
+ default:
+ return;
+ }
+ }
+
+ public void cancelReplayedExportJob(ExportFailMsg.CancelType type, String
msg) {
+ setExportJobState(ExportJobState.CANCELLED);
+ failMsg = new ExportFailMsg(type, msg);
+ }
+
+ private void cancelExportTask(ExportFailMsg.CancelType type, String msg)
throws JobException {
+ if (getState() == ExportJobState.CANCELLED) {
+ return;
+ }
+
+ if (getState() == ExportJobState.FINISHED) {
+ throw new JobException("Job {} has finished, can not been
cancelled", id);
+ }
+
+ if (getState() == ExportJobState.PENDING) {
+ startTimeMs = System.currentTimeMillis();
+ }
+
+ // we need cancel all task
+ taskIdToExecutor.keySet().forEach(id -> {
+ try {
+ register.cancelTask(id);
+ } catch (JobException e) {
+ LOG.warn("cancel export task {} exception: {}", id, e);
+ }
+ });
+
+ cancelExportJobUnprotected(type, msg);
+ }
+
+ private void cancelExportJobUnprotected(ExportFailMsg.CancelType type,
String msg) {
+ setExportJobState(ExportJobState.CANCELLED);
+ finishTimeMs = System.currentTimeMillis();
+ failMsg = new ExportFailMsg(type, msg);
+ Env.getCurrentEnv().getEditLog().logExportUpdateState(id,
ExportJobState.CANCELLED);
+ }
+
+ // TODO(ftw): delete
public synchronized boolean finish(List<OutfileInfo> outfileInfoList) {
outfileInfo = GsonUtils.GSON.toJson(outfileInfoList);
- if (updateState(ExportJob.JobState.FINISHED)) {
+ if (updateState(ExportJobState.FINISHED)) {
return true;
}
return false;
}
- public synchronized boolean updateState(ExportJob.JobState newState) {
+ private void exportExportJob() {
+ // The first exportTaskExecutor will set state to EXPORTING,
+ // other exportTaskExecutors do not need to set up state.
+ if (getState() == ExportJobState.EXPORTING) {
+ return;
+ }
+ setExportJobState(ExportJobState.EXPORTING);
+ // if isReplay == true, startTimeMs will be read from the log
+ startTimeMs = System.currentTimeMillis();
+ }
+
+ private void finishExportTask(Long taskId, List<OutfileInfo>
outfileInfoList) throws JobException {
+ if (getState() == ExportJobState.CANCELLED) {
+ throw new JobException("Job [{}] has been cancelled, can not
finish this task: {}", id, taskId);
+ }
+
+ allOutfileInfo.add(outfileInfoList);
+ ++finishedTaskCount;
+
+ // calculate progress
+ int tmpProgress = finishedTaskCount * 100 / jobExecutorList.size();
+ if (finishedTaskCount * 100 / jobExecutorList.size() >= 100) {
+ progress = 99;
+ } else {
+ progress = tmpProgress;
+ }
+
+ // if all task finished
+ if (finishedTaskCount == jobExecutorList.size()) {
+ finishExportJobUnprotected();
+ }
+ }
+
+ private void finishExportJobUnprotected() {
+ progress = 100;
+ setExportJobState(ExportJobState.FINISHED);
+ finishTimeMs = System.currentTimeMillis();
+ outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
+ Env.getCurrentEnv().getEditLog().logExportUpdateState(id,
ExportJobState.FINISHED);
+ }
+
+ public void replayExportJobState(ExportJobState newState) {
+ switch (newState) {
+ // We do not persist EXPORTING state in new version of metadata,
+ // but EXPORTING state may still exist in older versions of
metadata.
+ // So if isReplay == true and newState == EXPORTING, we set
newState = CANCELLED.
+ case EXPORTING:
+ // We do not need IN_QUEUE state in new version of export
+ // but IN_QUEUE state may still exist in older versions of
metadata.
+ // So if isReplay == true and newState == IN_QUEUE, we set
newState = CANCELLED.
+ case IN_QUEUE:
+ newState = ExportJobState.CANCELLED;
+ break;
+ case PENDING:
+ case CANCELLED:
+ progress = 0;
+ break;
+ case FINISHED:
+ progress = 100;
+ break;
+ default:
+ Preconditions.checkState(false, "wrong job state: " +
newState.name());
+ break;
+ }
+ setExportJobState(newState);
+ }
+
+ // TODO(ftw): delete
+ public synchronized boolean updateState(ExportJobState newState) {
return this.updateState(newState, false);
}
- public synchronized boolean updateState(ExportJob.JobState newState,
boolean isReplay) {
+ // TODO(ftw): delete
+ public synchronized boolean updateState(ExportJobState newState, boolean
isReplay) {
// We do not persist EXPORTING state in new version of metadata,
// but EXPORTING state may still exist in older versions of metadata.
// So if isReplay == true and newState == EXPORTING, we just ignore
this update.
- if (isFinalState() || (isReplay && newState == JobState.EXPORTING)) {
+ if (isFinalState() || (isReplay && newState ==
ExportJobState.EXPORTING)) {
return false;
}
state = newState;
@@ -618,7 +672,7 @@ public class ExportJob implements Writable {
progress = 0;
break;
case EXPORTING:
- // if isReplay == true, startTimeMs will be read from log
+ // if isReplay == true, startTimeMs will be read from LOG
if (!isReplay) {
startTimeMs = System.currentTimeMillis();
}
@@ -630,7 +684,7 @@ public class ExportJob implements Writable {
progress = 100;
break;
case CANCELLED:
- // if isReplay == true, finishTimeMs will be read from log
+ // if isReplay == true, finishTimeMs will be read from LOG
if (!isReplay) {
finishTimeMs = System.currentTimeMillis();
}
@@ -640,27 +694,19 @@ public class ExportJob implements Writable {
break;
}
// we only persist Pending/Cancel/Finish state
- if (!isReplay && newState != JobState.IN_QUEUE && newState !=
JobState.EXPORTING) {
+ if (!isReplay && newState != ExportJobState.IN_QUEUE && newState !=
ExportJobState.EXPORTING) {
Env.getCurrentEnv().getEditLog().logExportUpdateState(id,
newState);
}
return true;
}
public synchronized boolean isFinalState() {
- return this.state == ExportJob.JobState.CANCELLED || this.state ==
ExportJob.JobState.FINISHED;
+ return this.state == ExportJobState.CANCELLED || this.state ==
ExportJobState.FINISHED;
}
public boolean isExpired(long curTime) {
return (curTime - createTimeMs) / 1000 >
Config.history_job_keep_max_second
- && (state == ExportJob.JobState.CANCELLED || state ==
ExportJob.JobState.FINISHED);
- }
-
- public String getLabel() {
- return label;
- }
-
- public String getQueryId() {
- return queryId;
+ && (state == ExportJobState.CANCELLED || state ==
ExportJobState.FINISHED);
}
@Override
@@ -737,7 +783,7 @@ public class ExportJob implements Writable {
}
}
- state = JobState.valueOf(Text.readString(in));
+ state = ExportJobState.valueOf(Text.readString(in));
createTimeMs = in.readLong();
startTimeMs = in.readLong();
finishTimeMs = in.readLong();
@@ -793,132 +839,4 @@ public class ExportJob implements Writable {
return false;
}
-
- public boolean isReplayed() {
- return isReplayed;
- }
-
- // for only persist op when switching job state.
- public static class StateTransfer implements Writable {
- @SerializedName("jobId")
- long jobId;
- @SerializedName("state")
- JobState state;
- @SerializedName("startTimeMs")
- private long startTimeMs;
- @SerializedName("finishTimeMs")
- private long finishTimeMs;
- @SerializedName("failMsg")
- private ExportFailMsg failMsg;
- @SerializedName("outFileInfo")
- private String outFileInfo;
-
- // used for reading from one log
- public StateTransfer() {
- this.jobId = -1;
- this.state = JobState.CANCELLED;
- this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN,
"");
- this.outFileInfo = "";
- }
-
- // used for persisting one log
- public StateTransfer(long jobId, JobState state) {
- this.jobId = jobId;
- this.state = state;
- ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId);
- this.startTimeMs = job.getStartTimeMs();
- this.finishTimeMs = job.getFinishTimeMs();
- this.failMsg = job.getFailMsg();
- this.outFileInfo = job.getOutfileInfo();
- }
-
- public long getJobId() {
- return jobId;
- }
-
- public JobState getState() {
- return state;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- String json = GsonUtils.GSON.toJson(this);
- Text.writeString(out, json);
- }
-
- public static StateTransfer read(DataInput in) throws IOException {
- if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120)
{
- StateTransfer transfer = new StateTransfer();
- transfer.readFields(in);
- return transfer;
- }
- String json = Text.readString(in);
- StateTransfer transfer = GsonUtils.GSON.fromJson(json,
ExportJob.StateTransfer.class);
- return transfer;
- }
-
- private void readFields(DataInput in) throws IOException {
- jobId = in.readLong();
- state = JobState.valueOf(Text.readString(in));
- }
-
- public long getStartTimeMs() {
- return startTimeMs;
- }
-
- public long getFinishTimeMs() {
- return finishTimeMs;
- }
-
- public String getOutFileInfo() {
- return outFileInfo;
- }
-
- public ExportFailMsg getFailMsg() {
- return failMsg;
- }
- }
-
- public static class OutfileInfo {
- @SerializedName("fileNumber")
- private String fileNumber;
- @SerializedName("totalRows")
- private String totalRows;
- @SerializedName("fileSize")
- private String fileSize;
- @SerializedName("url")
- private String url;
-
- public String getUrl() {
- return url;
- }
-
- public void setUrl(String url) {
- this.url = url;
- }
-
- public String getFileNumber() {
- return fileNumber;
- }
-
- public void setFileNumber(String fileNumber) {
- this.fileNumber = fileNumber;
- }
-
- public String getTotalRows() {
- return totalRows;
- }
-
- public void setTotalRows(String totalRows) {
- this.totalRows = totalRows;
- }
-
- public String getFileSize() {
- return fileSize;
- }
-
- public void setFileSize(String fileSize) {
- this.fileSize = fileSize;
- }
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java
new file mode 100644
index 0000000000..4fd4cabf80
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobState.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+public enum ExportJobState {
+
+ /**
+ * the initial state of export job.
+ */
+ PENDING,
+
+ /**
+ * When the export job is waiting to be scheduled.
+ */
+ IN_QUEUE,
+
+ /**
+ * When the export job is exporting, the EXPORTING state will be triggered.
+ */
+ EXPORTING,
+
+ /**
+ * When the export job is finished, the FINISHED state will be triggered.
+ */
+ FINISHED,
+
+ /**
+ * When the export job is cancelled, the CANCELLED state will be triggered.
+ */
+ CANCELLED,
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java
new file mode 100644
index 0000000000..06253b1f1e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJobStateTransfer.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+@Getter
+public class ExportJobStateTransfer implements Writable {
+ @SerializedName("jobId")
+ long jobId;
+ @SerializedName("state")
+ private ExportJobState state;
+ @SerializedName("startTimeMs")
+ private long startTimeMs;
+ @SerializedName("finishTimeMs")
+ private long finishTimeMs;
+ @SerializedName("failMsg")
+ private ExportFailMsg failMsg;
+ @SerializedName("outFileInfo")
+ private String outFileInfo;
+
+ // used for reading from one log
+ public ExportJobStateTransfer() {
+ this.jobId = -1;
+ this.state = ExportJobState.CANCELLED;
+ this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
+ this.outFileInfo = "";
+ }
+
+ // used for persisting one log
+ public ExportJobStateTransfer(long jobId, ExportJobState state) {
+ this.jobId = jobId;
+ this.state = state;
+ ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId);
+ this.startTimeMs = job.getStartTimeMs();
+ this.finishTimeMs = job.getFinishTimeMs();
+ this.failMsg = job.getFailMsg();
+ this.outFileInfo = job.getOutfileInfo();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static ExportJobStateTransfer read(DataInput in) throws IOException
{
+ if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) {
+ ExportJobStateTransfer transfer = new ExportJobStateTransfer();
+ transfer.readFields(in);
+ return transfer;
+ }
+ String json = Text.readString(in);
+ ExportJobStateTransfer transfer = GsonUtils.GSON.fromJson(json,
ExportJobStateTransfer.class);
+ return transfer;
+ }
+
+ private void readFields(DataInput in) throws IOException {
+ jobId = in.readLong();
+ state = ExportJobState.valueOf(Text.readString(in));
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 0e0b26f273..9a0bc7953f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -35,9 +35,9 @@ import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.ExportJob.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.task.MasterTask;
import org.apache.doris.task.MasterTaskExecutor;
@@ -69,8 +69,8 @@ public class ExportMgr extends MasterDaemon {
// lock is private and must use after db lock
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
- private Map<Long, ExportJob> idToJob = Maps.newHashMap(); // exportJobId
to exportJob
- private Map<String, Long> labelToJobId = Maps.newHashMap();
+ private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap(); //
exportJobId to exportJob
+ private Map<String, Long> labelToExportJobId = Maps.newHashMap();
private MasterTaskExecutor exportingExecutor;
@@ -103,7 +103,7 @@ public class ExportMgr extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
- List<ExportJob> pendingJobs = getExportJobs(JobState.PENDING);
+ List<ExportJob> pendingJobs = getExportJobs(ExportJobState.PENDING);
List<ExportJob> newInQueueJobs = Lists.newArrayList();
for (ExportJob job : pendingJobs) {
if (handlePendingJobs(job)) {
@@ -128,7 +128,7 @@ public class ExportMgr extends MasterDaemon {
private boolean handlePendingJobs(ExportJob job) {
// because maybe this job has been cancelled by user.
- if (job.getState() != JobState.PENDING) {
+ if (job.getState() != ExportJobState.PENDING) {
return false;
}
@@ -136,11 +136,12 @@ public class ExportMgr extends MasterDaemon {
// If the job is created from replay thread, all plan info will be
lost.
// so the job has to be cancelled.
String failMsg = "FE restarted or Master changed during exporting.
Job must be cancelled.";
- job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
+ // job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
+ job.cancelReplayedExportJob(ExportFailMsg.CancelType.RUN_FAIL,
failMsg);
return false;
}
- if (job.updateState(JobState.IN_QUEUE)) {
+ if (job.updateState(ExportJobState.IN_QUEUE)) {
LOG.info("Exchange pending status to in_queue status success. job:
{}", job);
return true;
}
@@ -148,7 +149,7 @@ public class ExportMgr extends MasterDaemon {
}
public List<ExportJob> getJobs() {
- return Lists.newArrayList(idToJob.values());
+ return Lists.newArrayList(exportIdToJob.values());
}
public void addExportJob(ExportStmt stmt) throws Exception {
@@ -156,7 +157,7 @@ public class ExportMgr extends MasterDaemon {
ExportJob job = createJob(jobId, stmt);
writeLock();
try {
- if (labelToJobId.containsKey(job.getLabel())) {
+ if (labelToExportJobId.containsKey(job.getLabel())) {
throw new LabelAlreadyUsedException(job.getLabel());
}
unprotectAddJob(job);
@@ -167,6 +168,28 @@ public class ExportMgr extends MasterDaemon {
LOG.info("add export job. {}", job);
}
+ public void addExportJobAndRegisterTask(ExportStmt stmt) throws Exception {
+ ExportJob job = stmt.getExportJob();
+ long jobId = Env.getCurrentEnv().getNextId();
+ job.setId(jobId);
+ writeLock();
+ try {
+ if (labelToExportJobId.containsKey(job.getLabel())) {
+ throw new LabelAlreadyUsedException(job.getLabel());
+ }
+ unprotectAddJob(job);
+ job.getJobExecutorList().forEach(executor -> {
+ Long taskId = ExportJob.register.registerTask(executor);
+ executor.setTaskId(taskId);
+ job.getTaskIdToExecutor().put(taskId, executor);
+ });
+ Env.getCurrentEnv().getEditLog().logExportCreate(job);
+ } finally {
+ writeUnlock();
+ }
+ LOG.info("add export job. {}", job);
+ }
+
public void cancelExportJob(CancelExportStmt stmt) throws DdlException,
AnalysisException {
// List of export jobs waiting to be cancelled
List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt);
@@ -178,14 +201,20 @@ public class ExportMgr extends MasterDaemon {
if (matchExportJobs.isEmpty()) {
throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)");
}
- for (ExportJob exportJob : matchExportJobs) {
- exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
+ try {
+ for (ExportJob exportJob : matchExportJobs) {
+ // exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
+ exportJob.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
+ ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
+ }
+ } catch (JobException e) {
+ throw new AnalysisException(e.getMessage());
}
}
public void unprotectAddJob(ExportJob job) {
- idToJob.put(job.getId(), job);
- labelToJobId.putIfAbsent(job.getLabel(), job.getId());
+ exportIdToJob.put(job.getId(), job);
+ labelToExportJobId.putIfAbsent(job.getLabel(), job.getId());
}
private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException {
@@ -224,28 +253,28 @@ public class ExportMgr extends MasterDaemon {
};
}
- private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception {
- ExportJob job = new ExportJob(jobId);
- job.setJob(stmt);
- return job;
+ private ExportJob createJob(long jobId, ExportStmt stmt) {
+ ExportJob exportJob = stmt.getExportJob();
+ exportJob.setId(jobId);
+ return exportJob;
}
public ExportJob getJob(long jobId) {
- ExportJob job = null;
+ ExportJob job;
readLock();
try {
- job = idToJob.get(jobId);
+ job = exportIdToJob.get(jobId);
} finally {
readUnlock();
}
return job;
}
- public List<ExportJob> getExportJobs(ExportJob.JobState state) {
+ public List<ExportJob> getExportJobs(ExportJobState state) {
List<ExportJob> result = Lists.newArrayList();
readLock();
try {
- for (ExportJob job : idToJob.values()) {
+ for (ExportJob job : exportIdToJob.values()) {
if (job.getState() == state) {
result.add(job);
}
@@ -260,7 +289,7 @@ public class ExportMgr extends MasterDaemon {
// used for `show export` statement
// NOTE: jobid and states may both specified, or only one of them, or neither
public List<List<String>> getExportJobInfosByIdOrState(
- long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJob.JobState> states,
+ long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJobState> states,
ArrayList<OrderByPair> orderByPairs, long limit) throws AnalysisException {
long resultNum = limit == -1L ? Integer.MAX_VALUE : limit;
@@ -273,9 +302,9 @@ public class ExportMgr extends MasterDaemon {
readLock();
try {
int counter = 0;
- for (ExportJob job : idToJob.values()) {
+ for (ExportJob job : exportIdToJob.values()) {
long id = job.getId();
- ExportJob.JobState state = job.getState();
+ ExportJobState state = job.getState();
String jobLabel = job.getLabel();
if (job.getDbId() != dbId) {
@@ -345,7 +374,7 @@ public class ExportMgr extends MasterDaemon {
readLock();
try {
int counter = 0;
- for (ExportJob job : idToJob.values()) {
+ for (ExportJob job : exportIdToJob.values()) {
// check auth
if (isJobShowable(job)) {
exportJobInfos.add(composeExportJobInfo(job));
@@ -406,7 +435,7 @@ public class ExportMgr extends MasterDaemon {
// task infos
Map<String, Object> infoMap = Maps.newHashMap();
- List<String> partitions = job.getPartitions();
+ List<String> partitions = job.getPartitionNames();
if (partitions == null) {
partitions = Lists.newArrayList();
partitions.add("*");
@@ -422,7 +451,7 @@ public class ExportMgr extends MasterDaemon {
infoMap.put("format", job.getFormat());
infoMap.put("line_delimiter", job.getLineDelimiter());
infoMap.put("columns", job.getColumns());
- infoMap.put("tablet_num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
+ infoMap.put("tablet_num", job.getTabletsNum());
infoMap.put("max_file_size", job.getMaxFileSize());
infoMap.put("delete_existing_files", job.getDeleteExistingFiles());
jobInfo.add(new Gson().toJson(infoMap));
@@ -435,7 +464,7 @@ public class ExportMgr extends MasterDaemon {
jobInfo.add(job.getTimeoutSecond());
// error msg
- if (job.getState() == ExportJob.JobState.CANCELLED) {
+ if (job.getState() == ExportJobState.CANCELLED) {
ExportFailMsg failMsg = job.getFailMsg();
jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
} else {
@@ -443,7 +472,7 @@ public class ExportMgr extends MasterDaemon {
}
// outfileInfo
- if (job.getState() == JobState.FINISHED) {
+ if (job.getState() == ExportJobState.FINISHED) {
jobInfo.add(job.getOutfileInfo());
} else {
jobInfo.add(FeConstants.null_string);
@@ -457,15 +486,15 @@ public class ExportMgr extends MasterDaemon {
writeLock();
try {
- Iterator<Map.Entry<Long, ExportJob>> iter = idToJob.entrySet().iterator();
+ Iterator<Map.Entry<Long, ExportJob>> iter = exportIdToJob.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Long, ExportJob> entry = iter.next();
ExportJob job = entry.getValue();
if ((currentTimeMs - job.getCreateTimeMs()) / 1000 > Config.history_job_keep_max_second
- && (job.getState() == ExportJob.JobState.CANCELLED
- || job.getState() == ExportJob.JobState.FINISHED)) {
+ && (job.getState() == ExportJobState.CANCELLED
+ || job.getState() == ExportJobState.FINISHED)) {
iter.remove();
- labelToJobId.remove(job.getLabel(), job.getId());
+ labelToExportJobId.remove(job.getLabel(), job.getId());
}
}
} finally {
@@ -482,11 +511,12 @@ public class ExportMgr extends MasterDaemon {
}
}
- public void replayUpdateJobState(ExportJob.StateTransfer stateTransfer) {
+ public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
readLock();
try {
- ExportJob job = idToJob.get(stateTransfer.getJobId());
- job.updateState(stateTransfer.getState(), true);
+ ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
+ // job.updateState(stateTransfer.getState(), true);
+ job.replayExportJobState(stateTransfer.getState());
job.setStartTimeMs(stateTransfer.getStartTimeMs());
job.setFinishTimeMs(stateTransfer.getFinishTimeMs());
job.setFailMsg(stateTransfer.getFailMsg());
@@ -496,11 +526,11 @@ public class ExportMgr extends MasterDaemon {
}
}
- public long getJobNum(ExportJob.JobState state, long dbId) {
+ public long getJobNum(ExportJobState state, long dbId) {
int size = 0;
readLock();
try {
- for (ExportJob job : idToJob.values()) {
+ for (ExportJob job : exportIdToJob.values()) {
if (job.getState() == state && job.getDbId() == dbId) {
++size;
}
@@ -511,11 +541,11 @@ public class ExportMgr extends MasterDaemon {
return size;
}
- public long getJobNum(ExportJob.JobState state) {
+ public long getJobNum(ExportJobState state) {
int size = 0;
readLock();
try {
- for (ExportJob job : idToJob.values()) {
+ for (ExportJob job : exportIdToJob.values()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(),
PrivPredicate.LOAD)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
new file mode 100644
index 0000000000..5fdc6962fc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.analysis.OutFileClause;
+import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.load.ExportFailMsg.CancelType;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.scheduler.exception.JobException;
+import org.apache.doris.scheduler.executor.TransientTaskExecutor;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.Lists;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class ExportTaskExecutor implements TransientTaskExecutor {
+
+ List<SelectStmt> selectStmtLists;
+
+ ExportJob exportJob;
+
+ @Setter
+ Long taskId;
+
+ private StmtExecutor stmtExecutor;
+
+ private AtomicBoolean isCanceled;
+
+ private AtomicBoolean isFinished;
+
+ ExportTaskExecutor(List<SelectStmt> selectStmtLists, ExportJob exportJob) {
+ this.selectStmtLists = selectStmtLists;
+ this.exportJob = exportJob;
+ this.isCanceled = new AtomicBoolean(false);
+ this.isFinished = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void execute() throws JobException {
+ if (isCanceled.get()) {
+ throw new JobException("Export executor has been canceled, task id: {}", taskId);
+ }
+ exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null);
+ List<OutfileInfo> outfileInfoList = Lists.newArrayList();
+ for (int idx = 0; idx < selectStmtLists.size(); ++idx) {
+ if (isCanceled.get()) {
+ throw new JobException("Export executor has been canceled, task id: {}", taskId);
+ }
+ // check the version of tablets
+ try {
+ Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
+ exportJob.getTableName().getDb());
+ OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
+ table.readLock();
+ try {
+ SelectStmt selectStmt = selectStmtLists.get(idx);
+ List<Long> tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
+ for (Long tabletId : tabletIds) {
+ TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(
+ tabletId);
+ Partition partition = table.getPartition(tabletMeta.getPartitionId());
+ long nowVersion = partition.getVisibleVersion();
+ long oldVersion = exportJob.getPartitionToVersion().get(partition.getName());
+ if (nowVersion != oldVersion) {
+ exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+ CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed");
+ throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}, "
+ + "now version = {}", exportJob.getId(), tabletId, oldVersion, nowVersion);
+ }
+ }
+ } finally {
+ table.readUnlock();
+ }
+ } catch (AnalysisException e) {
+ exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+ ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
+ throw new JobException(e);
+ }
+
+ try (AutoCloseConnectContext r = buildConnectContext()) {
+ stmtExecutor = new StmtExecutor(r.connectContext, selectStmtLists.get(idx));
+ stmtExecutor.execute();
+ if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) {
+ exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+ ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage());
+ return;
+ }
+ OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
+ outfileInfoList.add(outfileInfo);
+ } catch (Exception e) {
+ exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
+ ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
+ throw new JobException(e);
+ } finally {
+ stmtExecutor.addProfileToSpan();
+ }
+ }
+ if (isCanceled.get()) {
+ throw new JobException("Export executor has been canceled, task id: {}", taskId);
+ }
+ exportJob.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null);
+ isFinished.getAndSet(true);
+ }
+
+ @Override
+ public void cancel() throws JobException {
+ if (isFinished.get()) {
+ throw new JobException("Export executor has finished, task id: {}", taskId);
+ }
+ isCanceled.getAndSet(true);
+ if (stmtExecutor != null) {
+ stmtExecutor.cancel();
+ }
+ }
+
+ private AutoCloseConnectContext buildConnectContext() {
+ ConnectContext connectContext = new ConnectContext();
+ connectContext.setSessionVariable(exportJob.getSessionVariables());
+ connectContext.setEnv(Env.getCurrentEnv());
+ connectContext.setDatabase(exportJob.getTableName().getDb());
+ connectContext.setQualifiedUser(exportJob.getQualifiedUser());
+ connectContext.setCurrentUserIdentity(exportJob.getUserIdentity());
+ UUID uuid = UUID.randomUUID();
+ TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+ connectContext.setQueryId(queryId);
+ connectContext.setStartTime();
+ connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+ return new AutoCloseConnectContext(connectContext);
+ }
+
+ private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
+ OutfileInfo outfileInfo = new OutfileInfo();
+ outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
+ outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
+ outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes");
+ outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL));
+ return outfileInfo;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
new file mode 100644
index 0000000000..b9befd9d32
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/OutfileInfo.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+
+@Data
+public class OutfileInfo {
+
+ @SerializedName("fileNumber")
+ private String fileNumber;
+
+ @SerializedName("totalRows")
+ private String totalRows;
+
+ @SerializedName("fileSize")
+ private String fileSize;
+
+ @SerializedName("url")
+ private String url;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 68d2153556..9a1841be69 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -61,6 +61,8 @@ import org.apache.doris.journal.local.LocalJournal;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
+import org.apache.doris.load.ExportJobStateTransfer;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
@@ -365,7 +367,7 @@ public class EditLog {
break;
}
case OperationType.OP_EXPORT_UPDATE_STATE: {
- ExportJob.StateTransfer op = (ExportJob.StateTransfer) journal.getData();
+ ExportJobStateTransfer op = (ExportJobStateTransfer) journal.getData();
ExportMgr exportMgr = env.getExportMgr();
exportMgr.replayUpdateJobState(op);
break;
@@ -1461,8 +1463,8 @@ public class EditLog {
logEdit(OperationType.OP_EXPORT_CREATE, job);
}
- public void logExportUpdateState(long jobId, ExportJob.JobState newState) {
- ExportJob.StateTransfer transfer = new ExportJob.StateTransfer(jobId, newState);
+ public void logExportUpdateState(long jobId, ExportJobState newState) {
+ ExportJobStateTransfer transfer = new ExportJobStateTransfer(jobId, newState);
logEdit(OperationType.OP_EXPORT_UPDATE_STATE, transfer);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 11190edc8b..7637c3869d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -181,7 +181,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.external.iceberg.IcebergTableCreationRecord;
import org.apache.doris.load.DeleteHandler;
-import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadJob;
@@ -1922,8 +1922,8 @@ public class ShowExecutor {
ExportMgr exportMgr = env.getExportMgr();
- Set<ExportJob.JobState> states = null;
- ExportJob.JobState state = showExportStmt.getJobState();
+ Set<ExportJobState> states = null;
+ ExportJobState state = showExportStmt.getJobState();
if (state != null) {
states = Sets.newHashSet(state);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index cdd0d66f89..3bbd96408a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2213,7 +2213,8 @@ public class StmtExecutor {
private void handleExportStmt() throws Exception {
ExportStmt exportStmt = (ExportStmt) parsedStmt;
- context.getEnv().getExportMgr().addExportJob(exportStmt);
+ // context.getEnv().getExportMgr().addExportJob(exportStmt);
+ context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt);
}
private void handleCtasStmt() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index 48f3ce609e..1b17806c71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -30,8 +30,8 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.load.ExportFailMsg;
import org.apache.doris.load.ExportFailMsg.CancelType;
import org.apache.doris.load.ExportJob;
-import org.apache.doris.load.ExportJob.JobState;
-import org.apache.doris.load.ExportJob.OutfileInfo;
+import org.apache.doris.load.ExportJobState;
+import org.apache.doris.load.OutfileInfo;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
@@ -69,9 +69,9 @@ public class ExportExportingTask extends MasterTask {
private ExportFailMsg failMsg;
- private ExportJob.OutfileInfo outfileInfo;
+ private OutfileInfo outfileInfo;
- public ExportResult(boolean isFailed, ExportFailMsg failMsg, ExportJob.OutfileInfo outfileInfo) {
+ public ExportResult(boolean isFailed, ExportFailMsg failMsg, OutfileInfo outfileInfo) {
this.isFailed = isFailed;
this.failMsg = failMsg;
this.outfileInfo = outfileInfo;
@@ -93,11 +93,11 @@ public class ExportExportingTask extends MasterTask {
@Override
protected void exec() {
- if (job.getState() == JobState.IN_QUEUE) {
+ if (job.getState() == ExportJobState.IN_QUEUE) {
handleInQueueState();
}
- if (job.getState() != ExportJob.JobState.EXPORTING) {
+ if (job.getState() != ExportJobState.EXPORTING) {
return;
}
LOG.info("begin execute export job in exporting state. job: {}", job);
@@ -112,7 +112,7 @@ public class ExportExportingTask extends MasterTask {
List<SelectStmt> selectStmtList = job.getSelectStmtList();
int completeTaskNum = 0;
- List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
+ List<OutfileInfo> outfileInfoList = Lists.newArrayList();
int parallelNum = selectStmtList.size();
CompletionService<ExportResult> completionService = new ExecutorCompletionService<>(exportExecPool);
@@ -122,7 +122,7 @@ public class ExportExportingTask extends MasterTask {
final int idx = i;
completionService.submit(() -> {
// maybe user cancelled this job
- if (job.getState() != JobState.EXPORTING) {
+ if (job.getState() != ExportJobState.EXPORTING) {
return new ExportResult(true, null, null);
}
try {
@@ -162,7 +162,7 @@ public class ExportExportingTask extends MasterTask {
return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
r.connectContext.getState().getErrorMessage()), null);
}
- ExportJob.OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
+ OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
return new ExportResult(false, null, outfileInfo);
} catch (Exception e) {
return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
@@ -250,8 +250,8 @@ public class ExportExportingTask extends MasterTask {
return new AutoCloseConnectContext(connectContext);
}
- private ExportJob.OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
- ExportJob.OutfileInfo outfileInfo = new ExportJob.OutfileInfo();
+ private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
+ OutfileInfo outfileInfo = new OutfileInfo();
outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes");
@@ -274,7 +274,7 @@ public class ExportExportingTask extends MasterTask {
// return;
// }
- if (job.updateState(ExportJob.JobState.EXPORTING)) {
+ if (job.updateState(ExportJobState.EXPORTING)) {
LOG.info("Exchange pending status to exporting status success. job: {}", job);
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
index c06cae0500..1e49f207d0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.utframe.TestWithFeService;
@@ -154,12 +155,12 @@ public class CancelExportStmtTest extends TestWithFeService {
List<ExportJob> exportJobList2 = Lists.newLinkedList();
ExportJob job1 = new ExportJob();
ExportJob job2 = new ExportJob();
- job2.updateState(ExportJob.JobState.CANCELLED, true);
+ job2.updateState(ExportJobState.CANCELLED, true);
ExportJob job3 = new ExportJob();
- job3.updateState(ExportJob.JobState.EXPORTING, false);
+ job3.updateState(ExportJobState.EXPORTING, false);
ExportJob job4 = new ExportJob();
ExportJob job5 = new ExportJob();
- job5.updateState(ExportJob.JobState.IN_QUEUE, false);
+ job5.updateState(ExportJobState.IN_QUEUE, false);
exportJobList1.add(job1);
exportJobList1.add(job2);
exportJobList1.add(job3);
@@ -188,15 +189,6 @@ public class CancelExportStmtTest extends TestWithFeService {
Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1);
- stateStringLiteral = new StringLiteral("IN_QUEUE");
- stateEqPredicate =
- new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
- stmt = new CancelExportStmt(null, stateEqPredicate);
- stmt.analyze(analyzer);
- filter = ExportMgr.buildCancelJobFilter(stmt);
-
- Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1);
-
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 7d1cd5a47a..e7209d58a4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.VariableAnnotation;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.load.ExportJob;
+import org.apache.doris.load.ExportJobState;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.utframe.TestWithFeService;
@@ -171,14 +172,14 @@ public class SessionVariablesTest extends TestWithFeService {
ExportStmt exportStmt = (ExportStmt)
parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext);
- ExportJob job = new ExportJob(1234);
- job.setJob(exportStmt);
+ ExportJob job = exportStmt.getExportJob();
+ job.setId(1234);
new Expectations(job) {
{
job.getState();
minTimes = 0;
- result = ExportJob.JobState.EXPORTING;
+ result = ExportJobState.EXPORTING;
}
};
@@ -201,14 +202,14 @@ public class SessionVariablesTest extends TestWithFeService {
ExportStmt exportStmt = (ExportStmt)
parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext);
- ExportJob job = new ExportJob(1234);
- job.setJob(exportStmt);
+ ExportJob job = exportStmt.getExportJob();
+ job.setId(1234);
new Expectations(job) {
{
job.getState();
minTimes = 0;
- result = ExportJob.JobState.EXPORTING;
+ result = ExportJobState.EXPORTING;
}
};
diff --git a/regression-test/suites/export_p0/test_export_basic.groovy b/regression-test/suites/export_p0/test_export_basic.groovy
index 95805db491..162b63065e 100644
--- a/regression-test/suites/export_p0/test_export_basic.groovy
+++ b/regression-test/suites/export_p0/test_export_basic.groovy
@@ -69,7 +69,8 @@ suite("test_export_basic", "p0") {
PARTITION between_20_70 VALUES [("20"),("70")),
PARTITION more_than_70 VALUES LESS THAN ("151")
)
- DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ DISTRIBUTED BY HASH(id) BUCKETS 3
+ PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
diff --git a/regression-test/suites/export_p2/test_export_with_hdfs.groovy b/regression-test/suites/export_p2/test_export_with_hdfs.groovy
index 205b1ffd71..3be2794cbd 100644
--- a/regression-test/suites/export_p2/test_export_with_hdfs.groovy
+++ b/regression-test/suites/export_p2/test_export_with_hdfs.groovy
@@ -37,7 +37,8 @@ suite("test_export_with_hdfs", "p2") {
PARTITION between_20_70 VALUES [("20"),("70")),
PARTITION more_than_70 VALUES LESS THAN ("151")
)
- DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ DISTRIBUTED BY HASH(id) BUCKETS 3
+ PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
@@ -62,8 +63,9 @@ suite("test_export_with_hdfs", "p2") {
if (res[0][2] == "FINISHED") {
def json = parseJson(res[0][11])
assert json instanceof List
- assertEquals("1", json.fileNumber[0])
- return json.url[0];
+ assertEquals("1", json.fileNumber[0][0])
+ log.info("outfile_path: ${json.url[0][0]}")
+ return json.url[0][0];
} else if (res[0][2] == "CANCELLED") {
throw new IllegalStateException("""export failed: ${res[0][10]}""")
} else {
diff --git a/regression-test/suites/export_p2/test_export_with_s3.groovy b/regression-test/suites/export_p2/test_export_with_s3.groovy
index a26dde3238..82aa831cdd 100644
--- a/regression-test/suites/export_p2/test_export_with_s3.groovy
+++ b/regression-test/suites/export_p2/test_export_with_s3.groovy
@@ -38,7 +38,8 @@ suite("test_export_with_s3", "p2") {
PARTITION between_20_70 VALUES [("20"),("70")),
PARTITION more_than_70 VALUES LESS THAN ("151")
)
- DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+ DISTRIBUTED BY HASH(id) BUCKETS 3
+ PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
@@ -63,9 +64,9 @@ suite("test_export_with_s3", "p2") {
if (res[0][2] == "FINISHED") {
def json = parseJson(res[0][11])
assert json instanceof List
- assertEquals("1", json.fileNumber[0])
- log.info("outfile_path: ${json.url[0]}")
- return json.url[0];
+ assertEquals("1", json.fileNumber[0][0])
+ log.info("outfile_path: ${json.url[0][0]}")
+ return json.url[0][0];
} else if (res[0][2] == "CANCELLED") {
throw new IllegalStateException("""export failed: ${res[0][10]}""")
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]