morningman commented on code in PR #21911:
URL: https://github.com/apache/doris/pull/21911#discussion_r1266955976


##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -119,8 +114,8 @@ public enum JobState {
     private String columnSeparator;
     @SerializedName("lineDelimiter")
     private String lineDelimiter;
-    @SerializedName("partitions")
-    private List<String> partitions;
+    @SerializedName("partitionsNames")

Review Comment:
   It is dangerous to rename a persisted field.
   You can use the `alternate = {"xx"}` attribute of `SerializedName` to keep it compatible.



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -266,17 +270,83 @@ private void generateQueryStmt() {
             }
         }
 
-        List<TableRef> tableRefList = Lists.newArrayList();
-        tableRefList.add(this.tableRef);
-        FromClause fromClause = new FromClause(tableRefList);
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
splitTablets(stmt);
+        LOG.info("Export task is split into {} outfile statements.", 
tableRefListPerQuery.size());
+        for (int i = 0; i < tableRefListPerQuery.size(); i++) {
+            LOG.info("Outfile clause {} is responsible for tables: {}", i,
+                    tableRefListPerQuery.get(i).get(0).getSampleTabletIds());
+        }
+        for (ArrayList<TableRef> tableRefList : tableRefListPerQuery) {
+            FromClause fromClause = new FromClause(tableRefList);
+            // generate outfile clause
+            OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
+            SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
+                    null, null, LimitElement.NO_LIMIT);
+            selectStmt.setOutFileClause(outfile);
+            selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
+            selectStmtList.add(selectStmt);
+        }
+    }
+
+    private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) 
throws UserException {
+        // get tablets
+        Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb());
+        OlapTable table = 
db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl());
+        List<Long> tabletIdList = Lists.newArrayList();
+        table.readLock();
+        try {
+            Collection<Partition> partitions = new ArrayList<Partition>();
+            // get partitions
+            // user specifies partitions, already checked in ExportStmt
+            if (partitionNames != null) {
+                for (String partName : partitionNames) {
+                    partitions.add(table.getPartition(partName));
+                }
+            } else {
+                partitions = table.getPartitions();
+            }
+
+            // get tablets
+            for (Partition partition : partitions) {
+                partition.getVisibleVersion();
+                partitionToVersion.put(partition.getName(), 
partition.getVisibleVersion());
+                for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
+                    tabletIdList.addAll(index.getTabletIdsInOrder());
+                }
+            }
+        } finally {
+            table.readUnlock();
+        }
+
+        Integer tabletsAllNum = tabletIdList.size();
+        Integer tabletsNumPerQuery = tabletsAllNum / this.parallelNum;
+        Integer tabletsNumPerQueryRemainder = tabletsAllNum - 
tabletsNumPerQuery * this.parallelNum;
 
-        SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
-                null, null, LimitElement.NO_LIMIT);
-        // generate outfile clause
-        OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
-        selectStmt.setOutFileClause(outfile);
-        selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
-        selectStmtList.add(selectStmt);
+        Integer start = 0;
+
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
Lists.newArrayList();
+
+        int outfileNum = this.parallelNum;
+        if (tabletsAllNum < this.parallelNum) {
+            outfileNum = tabletsAllNum;
+            LOG.warn("The number of tablets is smaller than parallel_num, set 
parallel_num to tablets num.");
+        }
+        for (int i = 0; i < outfileNum; ++i) {
+            Integer tabletsNum = tabletsNumPerQuery;
+            if (tabletsNumPerQueryRemainder > 0) {
+                tabletsNum = tabletsNum + 1;
+                --tabletsNumPerQueryRemainder;
+            }
+            ArrayList<Long> tablets = new 
ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
+            start += tabletsNum;
+            TableRef tblRef = new TableRef(this.tableRef.getName(), 
this.tableRef.getAlias(),

Review Comment:
   How can we make sure that the tablet ids selected here match the partition names?



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java:
##########
@@ -58,15 +58,30 @@
 //          BY BROKER 'broker_name' [( $broker_attrs)]
 public class ExportStmt extends StatementBase {
     private static final Logger LOG = LogManager.getLogger(ExportStmt.class);
-
-    public static final String TABLET_NUMBER_PER_TASK_PROP = 
"tablet_num_per_task";
+    public static final String PARALLEL_NUMBER = "parallel_num";

Review Comment:
   name it `parallelism`



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -266,17 +270,83 @@ private void generateQueryStmt() {
             }
         }
 
-        List<TableRef> tableRefList = Lists.newArrayList();
-        tableRefList.add(this.tableRef);
-        FromClause fromClause = new FromClause(tableRefList);
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
splitTablets(stmt);
+        LOG.info("Export task is split into {} outfile statements.", 
tableRefListPerQuery.size());
+        for (int i = 0; i < tableRefListPerQuery.size(); i++) {
+            LOG.info("Outfile clause {} is responsible for tables: {}", i,
+                    tableRefListPerQuery.get(i).get(0).getSampleTabletIds());
+        }
+        for (ArrayList<TableRef> tableRefList : tableRefListPerQuery) {
+            FromClause fromClause = new FromClause(tableRefList);
+            // generate outfile clause
+            OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
+            SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
+                    null, null, LimitElement.NO_LIMIT);
+            selectStmt.setOutFileClause(outfile);
+            selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
+            selectStmtList.add(selectStmt);
+        }
+    }
+
+    private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) 
throws UserException {
+        // get tablets
+        Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb());
+        OlapTable table = 
db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl());
+        List<Long> tabletIdList = Lists.newArrayList();
+        table.readLock();
+        try {
+            Collection<Partition> partitions = new ArrayList<Partition>();
+            // get partitions
+            // user specifies partitions, already checked in ExportStmt
+            if (partitionNames != null) {
+                for (String partName : partitionNames) {
+                    partitions.add(table.getPartition(partName));
+                }
+            } else {
+                partitions = table.getPartitions();
+            }
+
+            // get tablets
+            for (Partition partition : partitions) {
+                partition.getVisibleVersion();
+                partitionToVersion.put(partition.getName(), 
partition.getVisibleVersion());
+                for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {

Review Comment:
   ```suggestion
                   for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java:
##########
@@ -73,49 +111,105 @@ protected void exec() {
             job.setDoExportingThread(Thread.currentThread());
         }
 
-        List<QueryStmt> selectStmtList = job.getSelectStmtList();
-        boolean isFailed = false;
-        ExportFailMsg errorMsg = null;
+        List<SelectStmt> selectStmtList = job.getSelectStmtList();
         int completeTaskNum = 0;
         List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
+
+        int parallelNum = selectStmtList.size();
+        ThreadPoolExecutor exportExecPool = 
ThreadPoolManager.newDaemonFixedThreadPool(parallelNum,

Review Comment:
   Creating a new thread pool for each job is not safe. If users submit a lot of export 
jobs, there will be lots of threads and thread pools.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java:
##########
@@ -58,15 +58,30 @@
 //          BY BROKER 'broker_name' [( $broker_attrs)]
 public class ExportStmt extends StatementBase {
     private static final Logger LOG = LogManager.getLogger(ExportStmt.class);
-
-    public static final String TABLET_NUMBER_PER_TASK_PROP = 
"tablet_num_per_task";
+    public static final String PARALLEL_NUMBER = "parallel_num";
     public static final String LABEL = "label";
 
     private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
     private static final String DEFAULT_LINE_DELIMITER = "\n";
     private static final String DEFAULT_COLUMNS = "";
+    private static final String DEFAULT_PARALLEL_NUMBER = "1";
+
+    private static final ImmutableSet<String> PROPERTIES_SET = new 
ImmutableSet.Builder<String>()

Review Comment:
   Should this be a case-insensitive set?



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -266,17 +270,83 @@ private void generateQueryStmt() {
             }
         }
 
-        List<TableRef> tableRefList = Lists.newArrayList();
-        tableRefList.add(this.tableRef);
-        FromClause fromClause = new FromClause(tableRefList);
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
splitTablets(stmt);
+        LOG.info("Export task is split into {} outfile statements.", 
tableRefListPerQuery.size());
+        for (int i = 0; i < tableRefListPerQuery.size(); i++) {
+            LOG.info("Outfile clause {} is responsible for tables: {}", i,
+                    tableRefListPerQuery.get(i).get(0).getSampleTabletIds());
+        }
+        for (ArrayList<TableRef> tableRefList : tableRefListPerQuery) {
+            FromClause fromClause = new FromClause(tableRefList);
+            // generate outfile clause
+            OutFileClause outfile = new OutFileClause(this.exportPath, 
this.format, convertOutfileProperties());
+            SelectStmt selectStmt = new SelectStmt(list, fromClause, 
this.whereExpr, null,
+                    null, null, LimitElement.NO_LIMIT);
+            selectStmt.setOutFileClause(outfile);
+            selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
+            selectStmtList.add(selectStmt);
+        }
+    }
+
+    private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) 
throws UserException {
+        // get tablets
+        Database db = 
Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb());
+        OlapTable table = 
db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl());
+        List<Long> tabletIdList = Lists.newArrayList();
+        table.readLock();
+        try {
+            Collection<Partition> partitions = new ArrayList<Partition>();
+            // get partitions
+            // user specifies partitions, already checked in ExportStmt
+            if (partitionNames != null) {
+                for (String partName : partitionNames) {
+                    partitions.add(table.getPartition(partName));
+                }
+            } else {
+                partitions = table.getPartitions();

Review Comment:
   A table may contain thousands of partitions. I think we need a config to 
limit the max number of partitions, as a protection.



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -266,17 +270,83 @@ private void generateQueryStmt() {
             }
         }
 
-        List<TableRef> tableRefList = Lists.newArrayList();
-        tableRefList.add(this.tableRef);
-        FromClause fromClause = new FromClause(tableRefList);
+        ArrayList<ArrayList<TableRef>> tableRefListPerQuery = 
splitTablets(stmt);
+        LOG.info("Export task is split into {} outfile statements.", 
tableRefListPerQuery.size());
+        for (int i = 0; i < tableRefListPerQuery.size(); i++) {
+            LOG.info("Outfile clause {} is responsible for tables: {}", i,

Review Comment:
   ```suggestion
               LOG.debug("Outfile clause {} is responsible for tables: {}", i,
   ```
   
   And wrap it in an `if (LOG.isDebugEnabled())` check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to