This is an automated email from the ASF dual-hosted git repository.

critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new babc2af50ee Replace ExportTsFile By SubscriptionTsFile (#14812)
babc2af50ee is described below

commit babc2af50eef5b4812e125edcf17812e3e0237ca
Author: Summer <[email protected]>
AuthorDate: Fri Feb 21 18:26:16 2025 +0800

    Replace ExportTsFile By SubscriptionTsFile (#14812)
    
    * Replace ExportTsFile By SubscriptionTsFile
    
    * add dependency logback
    
    * update description in options
    
    * update description in options
    
    * it
    
    * Remove the impact on other consumers when one consumer fails to open
    
    * add space
    
    * add space+1
    
    ---------
    
    Co-authored-by: 2b3c511 <[email protected]>
---
 .../apache/iotdb/tools/it/ExportTsFileTestIT.java  |  32 +-
 iotdb-client/cli/pom.xml                           |   4 +
 .../org/apache/iotdb/tool/common/Constants.java    |  30 +-
 .../org/apache/iotdb/tool/common/OptionsUtil.java  | 112 +++++
 .../org/apache/iotdb/tool/tsfile/ExportTsFile.java | 539 +++------------------
 .../subscription/AbstractSubscriptionTsFile.java   |  74 +++
 .../tool/tsfile/subscription/CommonParam.java      | 212 ++++++++
 .../subscription/SubscriptionTableTsFile.java      | 186 +++++++
 .../subscription/SubscriptionTreeTsFile.java       | 183 +++++++
 9 files changed, 884 insertions(+), 488 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
index 57b76e55e81..b85ef9d6adf 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
@@ -76,7 +76,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
 
   @Override
   protected void testOnWindows() throws IOException {
-    final String[] output = {"!!!Warning:Tablet is empty,no data can be 
exported."};
+    final String[] output = {"Export TsFile Count: 0"};
     ProcessBuilder builder =
         new ProcessBuilder(
             "cmd.exe",
@@ -90,10 +90,8 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
             "root",
             "-pw",
             "root",
-            "-t",
-            "target",
-            "-q",
-            "select * from root.test.t2 where time > 1 and time < 
1000000000000",
+            "-path",
+            "root.test.t2.**",
             "&",
             "exit",
             "%^errorlevel%");
@@ -102,7 +100,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
 
     prepareData();
 
-    final String[] output1 = {"Export completely!"};
+    final String[] output1 = {"Export TsFile Count: "};
     ProcessBuilder builder1 =
         new ProcessBuilder(
             "cmd.exe",
@@ -116,10 +114,8 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
             "root",
             "-pw",
             "root",
-            "-t",
-            "target",
-            "-q",
-            "select * from root.test.t2 where time > 1 and time < 
1000000000000",
+            "-path",
+            "root.test.t2.**",
             "&",
             "exit",
             "%^errorlevel%");
@@ -129,7 +125,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
 
   @Override
   protected void testOnUnix() throws IOException {
-    final String[] output = {"!!!Warning:Tablet is empty,no data can be 
exported."};
+    final String[] output = {"Export TsFile Count: 0"};
     // -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
     ProcessBuilder builder =
         new ProcessBuilder(
@@ -143,16 +139,14 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
             "root",
             "-pw",
             "root",
-            "-t",
-            "target",
-            "-q",
-            "select * from root.**");
+            "-path",
+            "root.**");
     builder.environment().put("CLASSPATH", libPath);
     testOutput(builder, output, 0);
 
     prepareData();
 
-    final String[] output1 = {"Export completely!"};
+    final String[] output1 = {"Export TsFile Count: "};
     // -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
     ProcessBuilder builder1 =
         new ProcessBuilder(
@@ -166,10 +160,8 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
             "root",
             "-pw",
             "root",
-            "-t",
-            "target",
-            "-q",
-            "select * from root.**");
+            "-path",
+            "root.**");
     builder1.environment().put("CLASSPATH", libPath);
     testOutput(builder1, output1, 0);
   }
diff --git a/iotdb-client/cli/pom.xml b/iotdb-client/cli/pom.xml
index 600684bc4ec..665928d57de 100644
--- a/iotdb-client/cli/pom.xml
+++ b/iotdb-client/cli/pom.xml
@@ -98,6 +98,10 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.antlr</groupId>
             <artifactId>antlr4-runtime</artifactId>
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
index ee86564d733..429ffd143e9 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tool.common;
 
 import org.apache.tsfile.enums.TSDataType;
 
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -95,14 +96,14 @@ public class Constants {
   public static final String DB_ARGS = "db";
   public static final String DB_NAME = "database";
   public static final String DB_DESC =
-      "The database to be exported,Only takes effect when sql_dialect is of 
type table.(optional)";
+      "The database to be exported,only takes effect when sql_dialect is 
table.(optional)";
 
   public static final String TABLE_ARGS = "table";
   public static final String TABLE_DESC =
-      "The table to be exported,only takes effect when sql_dialect is of type 
table";
+      "The table to be exported,only takes effect when sql_dialect is 
table.(optional)";
   public static final String TABLE_DESC_EXPORT =
       TABLE_DESC
-          + ".If the '- q' parameter is specified, this parameter does not 
take effect. If the export type is tsfile or sql, this parameter is required. 
(optional)";
+          + ".If the '-q' parameter is specified, this parameter does not take 
effect. If the export type is tsfile or sql, this parameter is required. 
(optional)";
   public static final String TABLE_DESC_IMPORT = TABLE_DESC + " and file_type 
is csv. (optional)";
 
   public static final String DATATYPE_BOOLEAN = "boolean";
@@ -177,12 +178,14 @@ public class Constants {
   public static final String TARGET_DIR_NAME = "target";
   public static final String TARGET_DIR_ARGS_NAME = "target_directory";
   public static final String TARGET_DIR_DESC = "Target file directory 
(required)";
+  public static final String TARGET_DIR_SUBSCRIPTION_DESC =
+      "Target file directory.default ./target (optional)";
 
   public static final String QUERY_COMMAND_ARGS = "q";
   public static final String QUERY_COMMAND_NAME = "query";
   public static final String QUERY_COMMAND_ARGS_NAME = "query_command";
   public static final String QUERY_COMMAND_DESC =
-      "The query command that you want to execute.If sql-dialect is of type 
table The 'q' parameter is only applicable to export types of CSV, and is not 
available for other types.If the '- q' parameter is not empty, then the 
parameters' creatTime ',' EndTime 'and' table 'are not effective.(optional)";
+      "The query command that you want to execute.If sql_dialect is table The 
'q' parameter is only applicable to export types of CSV, and is not available 
for other types.If the '- q' parameter is not empty, then the parameters' 
creatTime ',' EndTime 'and' table 'are not effective.(optional)";
 
   public static final String TARGET_FILE_ARGS = "pfn";
   public static final String TARGET_FILE_NAME = "prefix_file_name";
@@ -211,6 +214,10 @@ public class Constants {
   public static final String[] TIME_FORMAT =
       new String[] {"default", "long", "number", "timestamp"};
 
+  public static final String PATH_ARGS = "path";
+  public static final String PATH_DESC =
+      "The path to be exported,only takes effect when sql_dialect is 
tree.(optional)";
+
   public static final long memoryThreshold = 10 * 1024 * 1024;
 
   public static final String[] STRING_TIME_FORMAT =
@@ -253,6 +260,21 @@ public class Constants {
         "yyyy.MM.dd'T'HH:mm:ss"
       };
 
+  public static final String SUBSCRIPTION_CLI_PREFIX = "Export TsFile";
+  public static final int MAX_RETRY_TIMES = 2;
+  public static final String LOOSE_RANGE = "";
+  public static final boolean STRICT = false;
+  public static final String MODE = "snapshot";
+  public static final boolean AUTO_COMMIT = false;
+  public static final String TABLE_MODEL = "table";
+  public static final long AUTO_COMMIT_INTERVAL = 5000;
+  public static final long POLL_MESSAGE_TIMEOUT = 10000;
+  public static final String TOPIC_NAME_PREFIX = "topic_";
+  public static final String GROUP_NAME_PREFIX = "group_";
+  public static final String HANDLER = "TsFileHandler";
+  public static final String CONSUMER_NAME_PREFIX = "consumer_";
+  public static final SimpleDateFormat DATE_FORMAT_VIEW = new 
SimpleDateFormat("yyyyMMddHHmmssSSS");
+
   // import constants
   public static final String IMPORT_CLI_PREFIX = "Import Data";
 
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
index aeddf05e011..bde4831ea9f 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
@@ -912,4 +912,116 @@ public class OptionsUtil extends Constants {
     options.addOption(opTimestampPrecision);
     return options;
   }
+
+  public static Options createSubscriptionTsFileOptions() {
+    Options options = new Options();
+
+    Option opSqlDialect =
+        Option.builder(SQL_DIALECT_ARGS)
+            .longOpt(SQL_DIALECT_ARGS)
+            .argName(SQL_DIALECT_ARGS)
+            .hasArg()
+            .desc(SQL_DIALECT_DESC)
+            .build();
+    options.addOption(opSqlDialect);
+
+    Option opHost =
+        Option.builder(HOST_ARGS)
+            .longOpt(HOST_NAME)
+            .argName(HOST_NAME)
+            .hasArg()
+            .desc(HOST_DESC)
+            .build();
+    options.addOption(opHost);
+
+    Option opPort =
+        Option.builder(PORT_ARGS)
+            .longOpt(PORT_NAME)
+            .argName(PORT_NAME)
+            .hasArg()
+            .desc(PORT_DESC)
+            .build();
+    options.addOption(opPort);
+
+    Option opUsername =
+        Option.builder(USERNAME_ARGS)
+            .longOpt(USERNAME_NAME)
+            .argName(USERNAME_NAME)
+            .hasArg()
+            .desc(USERNAME_DESC)
+            .build();
+    options.addOption(opUsername);
+
+    Option opPassword =
+        Option.builder(PW_ARGS)
+            .longOpt(PW_NAME)
+            .optionalArg(true)
+            .argName(PW_NAME)
+            .hasArg()
+            .desc(PW_DESC)
+            .build();
+    options.addOption(opPassword);
+
+    Option opPath =
+        Option.builder(PATH_ARGS)
+            .longOpt(PATH_ARGS)
+            .argName(PATH_ARGS)
+            .hasArg()
+            .desc(PATH_DESC)
+            .build();
+    options.addOption(opPath);
+
+    Option opDatabase =
+        
Option.builder(DB_ARGS).longOpt(DB_NAME).argName(DB_ARGS).hasArg().desc(DB_DESC).build();
+    options.addOption(opDatabase);
+
+    Option opTable =
+        Option.builder(TABLE_ARGS)
+            .longOpt(TABLE_ARGS)
+            .argName(TABLE_ARGS)
+            .hasArg()
+            .desc(TABLE_DESC)
+            .build();
+    options.addOption(opTable);
+
+    Option opStartTime =
+        Option.builder(START_TIME_ARGS)
+            .longOpt(START_TIME_ARGS)
+            .argName(START_TIME_ARGS)
+            .hasArg()
+            .desc(START_TIME_DESC)
+            .build();
+    options.addOption(opStartTime);
+
+    Option opEndTime =
+        Option.builder(END_TIME_ARGS)
+            .longOpt(END_TIME_ARGS)
+            .argName(END_TIME_ARGS)
+            .hasArg()
+            .desc(END_TIME_DESC)
+            .build();
+    options.addOption(opEndTime);
+
+    Option opFile =
+        Option.builder(TARGET_DIR_ARGS)
+            .longOpt(TARGET_DIR_NAME)
+            .argName(TARGET_DIR_ARGS_NAME)
+            .hasArg()
+            .desc(TARGET_DIR_SUBSCRIPTION_DESC)
+            .build();
+    options.addOption(opFile);
+
+    Option opThreadNum =
+        Option.builder(THREAD_NUM_ARGS)
+            .longOpt(THREAD_NUM_NAME)
+            .argName(THREAD_NUM_NAME)
+            .hasArg()
+            .desc(THREAD_NUM_DESC)
+            .build();
+    options.addOption(opThreadNum);
+
+    Option opHelp = 
Option.builder(HELP_ARGS).longOpt(HELP_ARGS).hasArg().desc(HELP_DESC).build();
+    options.addOption(opHelp);
+    return options;
+  }
 }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
index 7be26207b16..71794ce244c 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
@@ -19,495 +19,106 @@
 
 package org.apache.iotdb.tool.tsfile;
 
-import org.apache.iotdb.cli.type.ExitType;
-import org.apache.iotdb.cli.utils.CliContext;
 import org.apache.iotdb.cli.utils.IoTPrinter;
-import org.apache.iotdb.cli.utils.JlineUtils;
-import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tool.common.Constants;
+import org.apache.iotdb.tool.common.OptionsUtil;
+import org.apache.iotdb.tool.tsfile.subscription.AbstractSubscriptionTsFile;
+import org.apache.iotdb.tool.tsfile.subscription.CommonParam;
 
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.write.WriteProcessException;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
-import org.apache.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.tsfile.read.common.Field;
-import org.apache.tsfile.read.common.Path;
-import org.apache.tsfile.read.common.RowRecord;
-import org.apache.tsfile.write.TsFileWriter;
-import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
-import org.apache.tsfile.write.schema.MeasurementSchema;
-import org.jline.reader.LineReader;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-public class ExportTsFile extends AbstractTsFileTool {
-
-  private static final String TARGET_DIR_ARGS = "t";
-  private static final String TARGET_DIR_NAME = "targetDirectory";
-  private static final String TARGET_DIR_NAME_BACK = "target";
-
-  private static final String TARGET_FILE_ARGS = "tfn";
-  private static final String TARGET_FILE_NAME = "targetFileName";
-  private static final String TARGET_FILE_ARGS_BACK = "pfn";
-
-  private static final String SQL_FILE_ARGS = "s";
-  private static final String SQL_FILE_NAME = "sourceSqlFile";
-  private static final String QUERY_COMMAND_ARGS = "q";
-  private static final String QUERY_COMMAND_NAME = "queryCommand";
-  private static final String DUMP_FILE_NAME_DEFAULT = "dump";
-  private static final String TSFILEDB_CLI_PREFIX = "ExportTsFile";
-
-  private static String targetDirectory;
-  private static String targetFile = DUMP_FILE_NAME_DEFAULT;
-  private static String queryCommand;
-  private static String sqlFile;
-
-  private static long timeout = -1;
+public class ExportTsFile {
 
   private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
-
-  @SuppressWarnings({
-    "squid:S3776",
-    "squid:S2093"
-  }) // Suppress high Cognitive Complexity warning, ignore try-with-resources
-  /* main function of export tsFile tool. */
-  public static void main(String[] args) {
-    int exitCode = getCommandLine(args);
-    exportTsfile(exitCode);
-  }
-
-  public static void exportTsfile(int exitCode) {
-    try {
-      session = new Session(host, Integer.parseInt(port), username, password);
-      session.open(false);
-
-      if (queryCommand == null) {
-        String sql;
-
-        if (sqlFile == null) {
-          LineReader lineReader =
-              JlineUtils.getLineReader(
-                  new CliContext(System.in, System.out, System.err, 
ExitType.EXCEPTION),
-                  username,
-                  host,
-                  port);
-          sql = lineReader.readLine(TSFILEDB_CLI_PREFIX + "> please input 
query: ");
-          ioTPrinter.println(sql);
-          String[] values = sql.trim().split(";");
-          for (int i = 0; i < values.length; i++) {
-            legalCheck(values[i]);
-            dumpResult(values[i], i);
-          }
-
-        } else {
-          dumpFromSqlFile(sqlFile);
-        }
-      } else {
-        legalCheck(queryCommand);
-        dumpResult(queryCommand, 0);
+  private static CommonParam commonParam = CommonParam.getInstance();
+
+  public static void main(String[] args) throws Exception {
+    Logger logger =
+        (Logger) 
LoggerFactory.getLogger("org.apache.iotdb.session.subscription.consumer.base");
+    logger.setLevel(Level.ERROR);
+    Options options = OptionsUtil.createSubscriptionTsFileOptions();
+    parseParams(args, options);
+    if (StringUtils.isEmpty(commonParam.getPath())) {
+      commonParam.setSqlDialect(Constants.TABLE_MODEL);
+    }
+    AbstractSubscriptionTsFile.setSubscriptionSession();
+    String nowFormat = 
Constants.DATE_FORMAT_VIEW.format(System.currentTimeMillis());
+    String topicName = Constants.TOPIC_NAME_PREFIX + nowFormat;
+    String groupId = Constants.GROUP_NAME_PREFIX + nowFormat;
+    commonParam.getSubscriptionTsFile().createTopics(topicName);
+    commonParam.getSubscriptionTsFile().createConsumers(groupId);
+    commonParam.getSubscriptionTsFile().subscribe(topicName);
+    ExecutorService executor = 
Executors.newFixedThreadPool(commonParam.getConsumerCount());
+    commonParam.getSubscriptionTsFile().consumerPoll(executor, topicName);
+    executor.shutdown();
+    while (true) {
+      if (executor.isTerminated()) {
+        break;
       }
-
-    } catch (IOException e) {
-      ioTPrinter.println("Failed to operate on file, because " + 
e.getMessage());
-      exitCode = CODE_ERROR;
-    } catch (IoTDBConnectionException e) {
-      ioTPrinter.println("Connect failed because " + e.getMessage());
-      exitCode = CODE_ERROR;
-    } finally {
-      if (session != null) {
-        try {
-          session.close();
-        } catch (IoTDBConnectionException e) {
-          exitCode = CODE_ERROR;
-          ioTPrinter.println(
-              "Encounter an error when closing session, error is: " + 
e.getMessage());
-        }
-      }
-    }
-    System.exit(exitCode);
-  }
-
-  public ExportTsFile(CommandLine commandLine) {
-    try {
-      parseBasicParams(commandLine);
-      parseSpecialParamsBack(commandLine);
-    } catch (ArgsErrorException e) {
-      ioTPrinter.println("Invalid args: " + e.getMessage());
-      System.exit(CODE_ERROR);
     }
+    commonParam.getSubscriptionTsFile().doClean();
+    ioTPrinter.println("Export TsFile Count: " + 
commonParam.getCountFile().get());
   }
 
-  protected static int getCommandLine(String[] args) {
-    createOptions();
+  private static void parseParams(String[] args, Options options) {
     HelpFormatter hf = new HelpFormatter();
-    CommandLine commandLine = null;
-    CommandLineParser parser = new DefaultParser();
-    hf.setOptionComparator(null); // avoid reordering
-    hf.setWidth(MAX_HELP_CONSOLE_WIDTH);
-
-    if (args == null || args.length == 0) {
-      ioTPrinter.println("Too few params input, please check the following 
hint.");
-      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
-      System.exit(CODE_ERROR);
-    }
+    CommandLine cli = null;
+    CommandLineParser cliParser = new DefaultParser();
     try {
-      commandLine = parser.parse(options, args);
-    } catch (ParseException e) {
-      ioTPrinter.println(e.getMessage());
-      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
-      System.exit(CODE_ERROR);
-    }
-    if (commandLine.hasOption(HELP_ARGS)) {
-      hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
-      System.exit(CODE_ERROR);
-    }
-    int exitCode = CODE_OK;
-    try {
-      parseBasicParams(commandLine);
-      parseSpecialParams(commandLine);
-    } catch (ArgsErrorException e) {
-      ioTPrinter.println("Invalid args: " + e.getMessage());
-      exitCode = CODE_ERROR;
-    }
-    return exitCode;
-  }
-
-  private static void legalCheck(String sql) {
-    String sqlLower = sql.toLowerCase();
-    if (sqlLower.contains("count(")
-        || sqlLower.contains("sum(")
-        || sqlLower.contains("avg(")
-        || sqlLower.contains("extreme(")
-        || sqlLower.contains("max_value(")
-        || sqlLower.contains("min_value(")
-        || sqlLower.contains("first_value(")
-        || sqlLower.contains("last_value(")
-        || sqlLower.contains("max_time(")
-        || sqlLower.contains("min_time(")
-        || sqlLower.contains("stddev(")
-        || sqlLower.contains("stddev_pop(")
-        || sqlLower.contains("stddev_samp(")
-        || sqlLower.contains("variance(")
-        || sqlLower.contains("var_pop(")
-        || sqlLower.contains("var_samp(")
-        || sqlLower.contains("max_by(")
-        || sqlLower.contains("min_by(")) {
-      ioTPrinter.println("The sql you entered is invalid, please don't use 
aggregate query.");
-      System.exit(CODE_ERROR);
-    }
-  }
-
-  private static void parseSpecialParams(CommandLine commandLine) throws 
ArgsErrorException {
-    targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME, 
commandLine);
-    queryCommand = commandLine.getOptionValue(QUERY_COMMAND_ARGS);
-    targetFile = commandLine.getOptionValue(TARGET_FILE_ARGS);
-    sqlFile = commandLine.getOptionValue(SQL_FILE_ARGS);
-    String timeoutString = commandLine.getOptionValue(TIMEOUT_ARGS);
-    if (timeoutString != null) {
-      timeout = Long.parseLong(timeoutString);
-    }
-    if (targetFile == null) {
-      targetFile = DUMP_FILE_NAME_DEFAULT;
-    }
-
-    if (!targetDirectory.endsWith("/") && !targetDirectory.endsWith("\\")) {
-      targetDirectory += File.separator;
-    }
-  }
-
-  private static void parseSpecialParamsBack(CommandLine commandLine) throws 
ArgsErrorException {
-    targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME_BACK, 
commandLine);
-    queryCommand = commandLine.getOptionValue(QUERY_COMMAND_ARGS);
-    targetFile = commandLine.getOptionValue(TARGET_FILE_ARGS_BACK);
-    String timeoutString = commandLine.getOptionValue(TIMEOUT_ARGS);
-    if (timeoutString != null) {
-      timeout = Long.parseLong(timeoutString);
-    }
-    if (targetFile == null) {
-      targetFile = DUMP_FILE_NAME_DEFAULT;
-    }
-
-    if (!targetDirectory.endsWith("/") && !targetDirectory.endsWith("\\")) {
-      targetDirectory += File.separator;
-    }
-  }
-
-  /**
-   * commandline option create.
-   *
-   * @return object Options
-   */
-  private static void createOptions() {
-    createBaseOptions();
-
-    Option opTargetFile =
-        Option.builder(TARGET_DIR_ARGS)
-            .required()
-            .argName(TARGET_DIR_NAME)
-            .hasArg()
-            .desc("Target File Directory (required)")
-            .build();
-    options.addOption(opTargetFile);
-
-    Option targetFileName =
-        Option.builder(TARGET_FILE_ARGS)
-            .argName(TARGET_FILE_NAME)
-            .hasArg()
-            .desc("Export file name (optional)")
-            .build();
-    options.addOption(targetFileName);
-
-    Option opSqlFile =
-        Option.builder(SQL_FILE_ARGS)
-            .argName(SQL_FILE_NAME)
-            .hasArg()
-            .desc("SQL File Path (optional)")
-            .build();
-    options.addOption(opSqlFile);
-
-    Option opQuery =
-        Option.builder(QUERY_COMMAND_ARGS)
-            .argName(QUERY_COMMAND_NAME)
-            .hasArg()
-            .desc("The query command that you want to execute. (optional)")
-            .build();
-    options.addOption(opQuery);
-
-    Option opHelp =
-        Option.builder(HELP_ARGS)
-            .longOpt(HELP_ARGS)
-            .hasArg(false)
-            .desc("Display help information")
-            .build();
-    options.addOption(opHelp);
-
-    Option opTimeout =
-        Option.builder(TIMEOUT_ARGS)
-            .argName(TIMEOUT_NAME)
-            .hasArg()
-            .desc("Timeout for session query")
-            .build();
-    options.addOption(opTimeout);
-  }
-
-  /**
-   * This method will be called, if the query commands are written in a sql 
file.
-   *
-   * @param filePath:file path
-   * @throws IOException: exception
-   */
-  private static void dumpFromSqlFile(String filePath) throws IOException {
-    try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) 
{
-      String sql;
-      int i = 0;
-      while ((sql = reader.readLine()) != null) {
-        legalCheck(sql);
-        dumpResult(sql, i++);
+      cli = cliParser.parse(options, args);
+      if (cli.hasOption(Constants.SQL_DIALECT_ARGS)) {
+        
commonParam.setSqlDialect(cli.getOptionValue(Constants.SQL_DIALECT_ARGS));
       }
-    }
-  }
-
-  /**
-   * Dump files from database to tsFile.
-   *
-   * @param sql export the result of executing the sql
-   */
-  private static void dumpResult(String sql, int index) {
-    final String path = targetDirectory + targetFile + index + ".tsfile";
-    try (SessionDataSet sessionDataSet = session.executeQueryStatement(sql, 
timeout)) {
-      long start = System.currentTimeMillis();
-      boolean isComplete = writeWithTablets(sessionDataSet, path);
-      if (isComplete) {
-        long end = System.currentTimeMillis();
-        ioTPrinter.println("Export completely!cost: " + (end - start) + " 
ms.");
+      if (cli.hasOption(Constants.HOST_ARGS)) {
+        commonParam.setSrcHost(cli.getOptionValue(Constants.HOST_ARGS));
       }
-    } catch (StatementExecutionException
-        | IoTDBConnectionException
-        | IOException
-        | WriteProcessException e) {
-      ioTPrinter.println("Cannot dump result because: " + e.getMessage());
-    }
-  }
-
-  private static void collectSchemas(
-      List<String> columnNames,
-      List<String> columnTypes,
-      Map<String, List<IMeasurementSchema>> deviceSchemaMap,
-      Set<String> alignedDevices,
-      Map<String, List<Integer>> deviceColumnIndices)
-      throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < columnNames.size(); i++) {
-      String column = columnNames.get(i);
-      if (!column.startsWith("root.")) {
-        continue;
+      if (cli.hasOption(Constants.PORT_ARGS)) {
+        
commonParam.setSrcPort(Integer.valueOf(cli.getOptionValue(Constants.PORT_ARGS)));
       }
-      TSDataType tsDataType = getTsDataType(columnTypes.get(i));
-      Path path = new Path(column, true);
-      String deviceId = path.getDeviceString();
-      // query whether the device is aligned or not
-      try (SessionDataSet deviceDataSet =
-          session.executeQueryStatement("show devices " + deviceId, timeout)) {
-        List<Field> deviceList = deviceDataSet.next().getFields();
-        if (deviceList.size() > 1 && 
"true".equals(deviceList.get(1).getStringValue())) {
-          alignedDevices.add(deviceId);
-        }
+      if (cli.hasOption(Constants.USERNAME_ARGS)) {
+        
commonParam.setSrcUserName(cli.getOptionValue(Constants.USERNAME_ARGS));
       }
-
-      // query timeseries metadata
-      MeasurementSchema measurementSchema =
-          new MeasurementSchema(path.getMeasurement(), tsDataType);
-      List<Field> seriesList =
-          session.executeQueryStatement("show timeseries " + column, 
timeout).next().getFields();
-      
measurementSchema.setEncoding(TSEncoding.valueOf(seriesList.get(4).getStringValue()));
-      measurementSchema.setCompressionType(
-          CompressionType.valueOf(seriesList.get(5).getStringValue()));
-
-      deviceSchemaMap.computeIfAbsent(deviceId, key -> new 
ArrayList<>()).add(measurementSchema);
-      deviceColumnIndices.computeIfAbsent(deviceId, key -> new 
ArrayList<>()).add(i);
-    }
-  }
-
-  private static List<Tablet> constructTablets(
-      Map<String, List<IMeasurementSchema>> deviceSchemaMap,
-      Set<String> alignedDevices,
-      TsFileWriter tsFileWriter)
-      throws WriteProcessException {
-    List<Tablet> tabletList = new ArrayList<>(deviceSchemaMap.size());
-    for (Map.Entry<String, List<IMeasurementSchema>> stringListEntry : 
deviceSchemaMap.entrySet()) {
-      String deviceId = stringListEntry.getKey();
-      List<IMeasurementSchema> schemaList = stringListEntry.getValue();
-      Tablet tablet = new Tablet(deviceId, schemaList);
-      tablet.initBitMaps();
-      Path path = new Path(tablet.getDeviceId());
-      if (alignedDevices.contains(tablet.getDeviceId())) {
-        tsFileWriter.registerAlignedTimeseries(path, schemaList);
-      } else {
-        tsFileWriter.registerTimeseries(path, schemaList);
+      if (cli.hasOption(Constants.PW_ARGS)) {
+        commonParam.setSrcPassword(cli.getOptionValue(Constants.PW_ARGS));
       }
-      tabletList.add(tablet);
-    }
-    return tabletList;
-  }
-
-  private static void writeWithTablets(
-      SessionDataSet sessionDataSet,
-      List<Tablet> tabletList,
-      Set<String> alignedDevices,
-      TsFileWriter tsFileWriter,
-      Map<String, List<Integer>> deviceColumnIndices)
-      throws IoTDBConnectionException,
-          StatementExecutionException,
-          IOException,
-          WriteProcessException {
-    while (sessionDataSet.hasNext()) {
-      RowRecord rowRecord = sessionDataSet.next();
-      List<Field> fields = rowRecord.getFields();
-
-      for (Tablet tablet : tabletList) {
-        String deviceId = tablet.getDeviceId();
-        List<Integer> columnIndices = deviceColumnIndices.get(deviceId);
-        int rowIndex = tablet.getRowSize();
-        tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
-        List<IMeasurementSchema> schemas = tablet.getSchemas();
-
-        for (int i = 0, columnIndicesSize = columnIndices.size(); i < 
columnIndicesSize; i++) {
-          Integer columnIndex = columnIndices.get(i);
-          IMeasurementSchema measurementSchema = schemas.get(i);
-          // -1 for time not in fields
-          Object value = fields.get(columnIndex - 
1).getObjectValue(measurementSchema.getType());
-          tablet.addValue(measurementSchema.getMeasurementName(), rowIndex, 
value);
-        }
-
-        if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
-          writeToTsFile(alignedDevices, tsFileWriter, tablet);
-          tablet.initBitMaps();
-          tablet.reset();
-        }
+      if (cli.hasOption(Constants.PATH_ARGS)) {
+        commonParam.setPath(cli.getOptionValue(Constants.PATH_ARGS));
       }
-    }
-
-    for (Tablet tablet : tabletList) {
-      if (tablet.getRowSize() != 0) {
-        writeToTsFile(alignedDevices, tsFileWriter, tablet);
+      if (cli.hasOption(Constants.DB_ARGS)) {
+        commonParam.setDatabase(cli.getOptionValue(Constants.DB_ARGS));
       }
-    }
-  }
-
-  @SuppressWarnings({
-    "squid:S3776",
-    "squid:S6541"
-  }) // Suppress high Cognitive Complexity warning, Suppress many task in one 
method warning
-  public static Boolean writeWithTablets(SessionDataSet sessionDataSet, String 
filePath)
-      throws IOException,
-          IoTDBConnectionException,
-          StatementExecutionException,
-          WriteProcessException {
-    List<String> columnNames = sessionDataSet.getColumnNames();
-    List<String> columnTypes = sessionDataSet.getColumnTypes();
-    File f = FSFactoryProducer.getFSFactory().getFile(filePath);
-    if (f.exists()) {
-      Files.delete(f.toPath());
-    }
-    boolean isEmpty = false;
-    try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
-      // device -> column indices in columnNames
-      Map<String, List<Integer>> deviceColumnIndices = new HashMap<>();
-      Set<String> alignedDevices = new HashSet<>();
-      Map<String, List<IMeasurementSchema>> deviceSchemaMap = new 
LinkedHashMap<>();
-
-      collectSchemas(
-          columnNames, columnTypes, deviceSchemaMap, alignedDevices, 
deviceColumnIndices);
-
-      List<Tablet> tabletList = constructTablets(deviceSchemaMap, 
alignedDevices, tsFileWriter);
-
-      if (!tabletList.isEmpty()) {
-        writeWithTablets(
-            sessionDataSet, tabletList, alignedDevices, tsFileWriter, 
deviceColumnIndices);
-        tsFileWriter.flush();
-      } else {
-        isEmpty = true;
+      if (cli.hasOption(Constants.TABLE_ARGS)) {
+        commonParam.setTable(cli.getOptionValue(Constants.TABLE_ARGS));
       }
+      if (cli.hasOption(Constants.TARGET_DIR_ARGS)) {
+        
commonParam.setTargetDir(cli.getOptionValue(Constants.TARGET_DIR_ARGS));
+      }
+      if (cli.hasOption(Constants.START_TIME_ARGS)) {
+        
commonParam.setStartTime(cli.getOptionValue(Constants.START_TIME_ARGS));
+      }
+      if (cli.hasOption(Constants.END_TIME_ARGS)) {
+        commonParam.setEndTime(cli.getOptionValue(Constants.END_TIME_ARGS));
+      }
+      if (cli.hasOption(Constants.THREAD_NUM_ARGS)) {
+        commonParam.setConsumerCount(
+            Integer.valueOf(cli.getOptionValue(Constants.THREAD_NUM_ARGS)));
+      }
+    } catch (ParseException e) {
+      ioTPrinter.println(e.getMessage());
+      hf.printHelp(Constants.SUBSCRIPTION_CLI_PREFIX, options, true);
+      System.exit(Constants.CODE_ERROR);
     }
-    if (isEmpty) {
-      ioTPrinter.println("!!!Warning:Tablet is empty,no data can be 
exported.");
-      return false;
-    }
-    return true;
-  }
-
-  private static void writeToTsFile(
-      Set<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
-      throws IOException, WriteProcessException {
-    if (deviceFilterSet.contains(tablet.getDeviceId())) {
-      tsFileWriter.writeAligned(tablet);
-    } else {
-      tsFileWriter.writeTree(tablet);
-    }
-  }
-
-  private static TSDataType getTsDataType(String type) {
-    return TSDataType.valueOf(type);
   }
 }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java
new file mode 100644
index 00000000000..60d38af45f2
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile.subscription;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
+import org.apache.iotdb.session.subscription.SubscriptionTreeSessionBuilder;
+import org.apache.iotdb.tool.common.Constants;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Base class of the subscription-driven TsFile export. It owns the printer and the shared {@link
+ * CommonParam} used by both dialect-specific implementations, and decides which implementation and
+ * which subscription session to install.
+ */
+public abstract class AbstractSubscriptionTsFile {
+
+  protected static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+  protected static CommonParam commonParam = CommonParam.getInstance();
+
+  /**
+   * Installs the dialect-specific exporter in {@link CommonParam}, builds the matching
+   * subscription session from the configured source host/port/credentials and opens it.
+   *
+   * @throws IoTDBConnectionException declared for a failure while opening the session
+   */
+  public static void setSubscriptionSession() throws IoTDBConnectionException {
+    final String dialect = commonParam.getSqlDialect();
+    if (!Constants.TABLE_MODEL.equalsIgnoreCase(dialect)) {
+      // Every dialect other than the table model is served by the tree implementation.
+      installTreeSession();
+      return;
+    }
+    installTableSession();
+  }
+
+  /** Registers the table-model exporter, then builds, stores and opens its session. */
+  private static void installTableSession() throws IoTDBConnectionException {
+    commonParam.setSubscriptionTsFile(new SubscriptionTableTsFile());
+    commonParam.setTableSubs(
+        new SubscriptionTableSessionBuilder()
+            .host(commonParam.getSrcHost())
+            .port(commonParam.getSrcPort())
+            .username(commonParam.getSrcUserName())
+            .password(commonParam.getSrcPassword())
+            .thriftMaxFrameSize(SessionConfig.DEFAULT_MAX_FRAME_SIZE)
+            .build());
+    commonParam.getTableSubs().open();
+  }
+
+  /** Registers the tree-model exporter, then builds, stores and opens its session. */
+  private static void installTreeSession() throws IoTDBConnectionException {
+    commonParam.setSubscriptionTsFile(new SubscriptionTreeTsFile());
+    commonParam.setTreeSubs(
+        new SubscriptionTreeSessionBuilder()
+            .host(commonParam.getSrcHost())
+            .port(commonParam.getSrcPort())
+            .username(commonParam.getSrcUserName())
+            .password(commonParam.getSrcPassword())
+            .thriftMaxFrameSize(SessionConfig.DEFAULT_MAX_FRAME_SIZE)
+            .build());
+    commonParam.getTreeSubs().open();
+  }
+
+  /** Creates the topic named {@code topicName} on the source instance. */
+  public abstract void createTopics(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  /** Releases whatever the export created (see the concrete implementations). */
+  public abstract void doClean() throws Exception;
+
+  /** Creates the pull consumers that belong to consumer group {@code groupId}. */
+  public abstract void createConsumers(String groupId);
+
+  /** Subscribes the created consumers to {@code topicName}. */
+  public abstract void subscribe(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  /** Submits the polling work for {@code topicName} to {@code executor}. */
+  public abstract void consumerPoll(ExecutorService executor, String topicName);
+}
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/CommonParam.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/CommonParam.java
new file mode 100644
index 00000000000..34b26a6db02
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/CommonParam.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile.subscription;
+
+import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
+import org.apache.iotdb.session.subscription.ISubscriptionTreeSession;
+import 
org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
+import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Process-wide settings and live handles shared by the subscription-based TsFile export classes.
+ *
+ * <p>All state is held in static fields and every accessor is static; {@link #getInstance()} only
+ * hands out a shared object through which callers may invoke those static accessors. Values that
+ * have no setter are declared {@code final}.
+ */
+public class CommonParam {
+
+  /** Shared instance; created eagerly so no locking is needed in {@link #getInstance()}. */
+  private static final CommonParam INSTANCE = new CommonParam();
+
+  /** Counter of exported files, incremented by the polling tasks. */
+  private static final AtomicInteger countFile = new AtomicInteger(0);
+
+  /** Fallback path pattern used when {@link #path} is blank. */
+  private static final String PATH_FULL = "root.**";
+
+  /** Index from which the consumer loops start. */
+  private static final int START_INDEX = 0;
+
+  private static String path = "root.**";
+  private static String table = ".*";
+  private static String database = ".*";
+  private static int consumerCount = 8;
+  private static ISubscriptionTableSession tableSubs;
+  private static ISubscriptionTreeSession treeSubs;
+
+  private static String srcHost = "127.0.0.1";
+  private static int srcPort = 6667;
+  private static String srcUserName = "root";
+  private static String srcPassword = "root";
+  private static String sqlDialect = "tree";
+  private static String startTime = "";
+  private static String endTime = "";
+  private static String targetDir = "target";
+  private static List<ISubscriptionTablePullConsumer> pullTableConsumers;
+  private static List<SubscriptionTreePullConsumer> pullTreeConsumers;
+
+  private static AbstractSubscriptionTsFile subscriptionTsFile;
+
+  /**
+   * Returns the shared instance.
+   *
+   * @return the single shared {@code CommonParam}
+   */
+  public static CommonParam getInstance() {
+    return INSTANCE;
+  }
+
+  /** Returns the tree-model path pattern (default {@code root.**}). */
+  public static String getPath() {
+    return path;
+  }
+
+  public static void setPath(String path) {
+    CommonParam.path = path;
+  }
+
+  /** Returns the table pattern (default {@code .*}). */
+  public static String getTable() {
+    return table;
+  }
+
+  public static void setTable(String table) {
+    CommonParam.table = table;
+  }
+
+  /** Returns the database pattern (default {@code .*}). */
+  public static String getDatabase() {
+    return database;
+  }
+
+  public static void setDatabase(String database) {
+    CommonParam.database = database;
+  }
+
+  /** Returns the fixed start index ({@code 0}) of the consumer loops. */
+  public static int getStartIndex() {
+    return START_INDEX;
+  }
+
+  /** Returns the number of consumers to use (default {@code 8}). */
+  public static int getConsumerCount() {
+    return consumerCount;
+  }
+
+  public static void setConsumerCount(int consumerCount) {
+    CommonParam.consumerCount = consumerCount;
+  }
+
+  /** Returns the table-model subscription session, or {@code null} if none was set. */
+  public static ISubscriptionTableSession getTableSubs() {
+    return tableSubs;
+  }
+
+  public static void setTableSubs(ISubscriptionTableSession tableSubs) {
+    CommonParam.tableSubs = tableSubs;
+  }
+
+  /** Returns the tree-model subscription session, or {@code null} if none was set. */
+  public static ISubscriptionTreeSession getTreeSubs() {
+    return treeSubs;
+  }
+
+  public static void setTreeSubs(ISubscriptionTreeSession treeSubs) {
+    CommonParam.treeSubs = treeSubs;
+  }
+
+  /** Returns the constant fallback path pattern {@code root.**}. */
+  public static String getPathFull() {
+    return PATH_FULL;
+  }
+
+  /** Returns the source host (default {@code 127.0.0.1}). */
+  public static String getSrcHost() {
+    return srcHost;
+  }
+
+  public static void setSrcHost(String srcHost) {
+    CommonParam.srcHost = srcHost;
+  }
+
+  /** Returns the source port (default {@code 6667}). */
+  public static int getSrcPort() {
+    return srcPort;
+  }
+
+  public static void setSrcPort(int srcPort) {
+    CommonParam.srcPort = srcPort;
+  }
+
+  /** Returns the user name used to connect to the source (default {@code root}). */
+  public static String getSrcUserName() {
+    return srcUserName;
+  }
+
+  public static void setSrcUserName(String srcUserName) {
+    CommonParam.srcUserName = srcUserName;
+  }
+
+  /** Returns the password used to connect to the source (default {@code root}). */
+  public static String getSrcPassword() {
+    return srcPassword;
+  }
+
+  public static void setSrcPassword(String srcPassword) {
+    CommonParam.srcPassword = srcPassword;
+  }
+
+  /** Returns the SQL dialect (default {@code tree}). */
+  public static String getSqlDialect() {
+    return sqlDialect;
+  }
+
+  public static void setSqlDialect(String sqlDialect) {
+    CommonParam.sqlDialect = sqlDialect;
+  }
+
+  /** Returns the start-time bound as text; empty by default. */
+  public static String getStartTime() {
+    return startTime;
+  }
+
+  public static void setStartTime(String startTime) {
+    CommonParam.startTime = startTime;
+  }
+
+  /** Returns the end-time bound as text; empty by default. */
+  public static String getEndTime() {
+    return endTime;
+  }
+
+  public static void setEndTime(String endTime) {
+    CommonParam.endTime = endTime;
+  }
+
+  /** Returns the directory that receives exported files (default {@code target}). */
+  public static String getTargetDir() {
+    return targetDir;
+  }
+
+  public static void setTargetDir(String targetDir) {
+    CommonParam.targetDir = targetDir;
+  }
+
+  /** Returns the table-model pull consumers, or {@code null} if none were set. */
+  public static List<ISubscriptionTablePullConsumer> getPullTableConsumers() {
+    return pullTableConsumers;
+  }
+
+  public static void setPullTableConsumers(
+      List<ISubscriptionTablePullConsumer> pullTableConsumers) {
+    CommonParam.pullTableConsumers = pullTableConsumers;
+  }
+
+  /** Returns the tree-model pull consumers, or {@code null} if none were set. */
+  public static List<SubscriptionTreePullConsumer> getPullTreeConsumers() {
+    return pullTreeConsumers;
+  }
+
+  public static void setPullTreeConsumers(List<SubscriptionTreePullConsumer> pullTreeConsumers) {
+    CommonParam.pullTreeConsumers = pullTreeConsumers;
+  }
+
+  /** Returns the dialect-specific exporter, or {@code null} if none was set. */
+  public static AbstractSubscriptionTsFile getSubscriptionTsFile() {
+    return subscriptionTsFile;
+  }
+
+  public static void setSubscriptionTsFile(AbstractSubscriptionTsFile subscriptionTsFile) {
+    CommonParam.subscriptionTsFile = subscriptionTsFile;
+  }
+
+  /** Returns the shared counter of exported files. */
+  public static AtomicInteger getCountFile() {
+    return countFile;
+  }
+}
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
new file mode 100644
index 00000000000..0c9c3268c94
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile.subscription;
+
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import 
org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
+import 
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumer;
+import 
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
+import org.apache.iotdb.session.subscription.model.Topic;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import org.apache.iotdb.tool.common.Constants;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+import static java.lang.System.out;
+
+/**
+ * Table-model implementation of the subscription-based TsFile export: creates the topic, opens the
+ * pull consumers, and moves every polled TsFile below the configured target directory.
+ */
+public class SubscriptionTableTsFile extends AbstractSubscriptionTsFile {
+
+  /**
+   * Creates {@code topicName} with the export's fixed mode/format/strictness settings plus the
+   * optional time bounds and database/table patterns taken from {@link CommonParam}.
+   */
+  @Override
+  public void createTopics(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Properties properties = new Properties();
+    properties.put(TopicConstant.MODE_KEY, Constants.MODE);
+    properties.put(TopicConstant.FORMAT_KEY, Constants.HANDLER);
+    properties.put(TopicConstant.STRICT_KEY, Constants.STRICT);
+    properties.put(TopicConstant.LOOSE_RANGE_KEY, Constants.LOOSE_RANGE);
+    if (StringUtils.isNotBlank(commonParam.getStartTime())) {
+      properties.put(TopicConstant.START_TIME_KEY, commonParam.getStartTime());
+    }
+    if (StringUtils.isNotBlank(commonParam.getEndTime())) {
+      properties.put(TopicConstant.END_TIME_KEY, commonParam.getEndTime());
+    }
+    if (StringUtils.isNotBlank(commonParam.getDatabase())
+        || StringUtils.isNotBlank(commonParam.getTable())) {
+      properties.put(TopicConstant.DATABASE_KEY, commonParam.getDatabase());
+      properties.put(TopicConstant.TABLE_KEY, commonParam.getTable());
+    }
+    commonParam.getTableSubs().createTopic(topicName, properties);
+  }
+
+  /**
+   * Deletes each consumer's {@code targetDir/groupId/consumerId} directory, drops the topics
+   * returned by the session (best effort) and closes the session.
+   *
+   * @throws Exception if listing the topics or closing the session fails
+   */
+  @Override
+  public void doClean() throws Exception {
+    List<ISubscriptionTablePullConsumer> pullTableConsumers = commonParam.getPullTableConsumers();
+    // The consumer list is only assigned by createConsumers; tolerate cleanup running without it.
+    if (pullTableConsumers != null) {
+      for (int i = commonParam.getStartIndex(); i < pullTableConsumers.size(); i++) {
+        SubscriptionTablePullConsumer consumer =
+            (SubscriptionTablePullConsumer) pullTableConsumers.get(i);
+        String path =
+            commonParam.getTargetDir()
+                + File.separator
+                + consumer.getConsumerGroupId()
+                + File.separator
+                + consumer.getConsumerId();
+        File file = new File(path);
+        if (file.exists()) {
+          FileUtils.deleteFileOrDirectory(file);
+        }
+      }
+    }
+    try {
+      for (Topic topic : commonParam.getTableSubs().getTopics()) {
+        try {
+          commonParam.getTableSubs().dropTopicIfExists(topic.getTopicName());
+        } catch (Exception e) {
+          // Best effort: one topic that cannot be dropped must not stop the remaining cleanup,
+          // but the failure is reported instead of being silently discarded.
+          ioTPrinter.println(
+              "Failed to drop topic " + topic.getTopicName() + ": " + e.getMessage());
+        }
+      }
+    } finally {
+      // Close the session even when listing the topics failed.
+      commonParam.getTableSubs().close();
+    }
+  }
+
+  /**
+   * Builds the configured number of pull consumers for {@code groupId}, opens them, and keeps only
+   * those that opened successfully; the consumer count is updated to the number kept.
+   */
+  @Override
+  public void createConsumers(String groupId) {
+    commonParam.setPullTableConsumers(new ArrayList<>(CommonParam.getConsumerCount()));
+    for (int i = commonParam.getStartIndex(); i < commonParam.getConsumerCount(); i++) {
+      commonParam
+          .getPullTableConsumers()
+          .add(
+              new SubscriptionTablePullConsumerBuilder()
+                  .host(commonParam.getSrcHost())
+                  .port(commonParam.getSrcPort())
+                  .consumerId(Constants.CONSUMER_NAME_PREFIX + i)
+                  .consumerGroupId(groupId)
+                  .autoCommit(Constants.AUTO_COMMIT)
+                  .autoCommitIntervalMs(Constants.AUTO_COMMIT_INTERVAL)
+                  .fileSaveDir(commonParam.getTargetDir())
+                  .buildTablePullConsumer());
+    }
+    commonParam
+        .getPullTableConsumers()
+        .removeIf(
+            consumer -> {
+              try {
+                consumer.open();
+                return false;
+              } catch (SubscriptionException e) {
+                // A consumer that cannot open is dropped so the others keep working; say so.
+                ioTPrinter.println("Failed to open a consumer, skipping it: " + e.getMessage());
+                return true;
+              }
+            });
+    commonParam.setConsumerCount(commonParam.getPullTableConsumers().size());
+  }
+
+  /** Subscribes every consumer to {@code topicName}; a failing consumer is reported and skipped. */
+  @Override
+  public void subscribe(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<ISubscriptionTablePullConsumer> pullTableConsumers = commonParam.getPullTableConsumers();
+    for (int i = 0; i < pullTableConsumers.size(); i++) {
+      try {
+        pullTableConsumers.get(i).subscribe(topicName);
+      } catch (Exception e) {
+        e.printStackTrace(out);
+      }
+    }
+  }
+
+  /** Submits one polling task per consumer to {@code executor}. */
+  @Override
+  public void consumerPoll(ExecutorService executor, String topicName) {
+    List<ISubscriptionTablePullConsumer> pullTableConsumers = commonParam.getPullTableConsumers();
+    for (int i = commonParam.getStartIndex(); i < pullTableConsumers.size(); i++) {
+      SubscriptionTablePullConsumer consumer =
+          (SubscriptionTablePullConsumer) pullTableConsumers.get(i);
+      executor.submit(() -> pollUntilIdle(consumer, topicName));
+    }
+  }
+
+  /**
+   * Polls {@code consumer} until the number of empty or failed rounds reaches {@code
+   * Constants.MAX_RETRY_TIMES}, then unsubscribes it from {@code topicName}.
+   */
+  private void pollUntilIdle(SubscriptionTablePullConsumer consumer, String topicName) {
+    final String consumerGroupId = consumer.getConsumerGroupId();
+    int retryCount = 0;
+    while (true) {
+      try {
+        List<SubscriptionMessage> messages =
+            consumer.poll(Duration.ofMillis(Constants.POLL_MESSAGE_TIMEOUT));
+        consumer.commitSync(messages);
+        if (messages.isEmpty()) {
+          retryCount++;
+          if (retryCount >= Constants.MAX_RETRY_TIMES) {
+            break;
+          }
+        }
+        for (final SubscriptionMessage message : messages) {
+          moveToTargetDir(message, consumerGroupId);
+        }
+      } catch (Exception e) {
+        e.printStackTrace(System.out);
+        // Failed rounds count toward the limit too; otherwise a consumer whose poll keeps
+        // throwing would loop forever and the task would never finish.
+        retryCount++;
+        if (retryCount >= Constants.MAX_RETRY_TIMES) {
+          break;
+        }
+      }
+    }
+    try {
+      consumer.unsubscribe(topicName);
+    } catch (Exception e) {
+      // Report instead of letting the failure vanish inside the executor's Future.
+      e.printStackTrace(System.out);
+    }
+  }
+
+  /** Moves the TsFile carried by {@code message} to {@code targetDir/consumerGroupId}. */
+  private void moveToTargetDir(SubscriptionMessage message, String consumerGroupId) {
+    SubscriptionTsFileHandler fp = message.getTsFileHandler();
+    ioTPrinter.println(fp.getFile().getName());
+    try {
+      fp.moveFile(
+          Paths.get(
+              commonParam.getTargetDir() + File.separator + consumerGroupId,
+              fp.getPath().getFileName().toString()));
+      commonParam.getCountFile().incrementAndGet();
+    } catch (IOException e) {
+      // commitSync has already run for the whole batch, so keep going: aborting here would
+      // leave the remaining files of the batch unmoved.
+      e.printStackTrace(System.out);
+    }
+  }
+}
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
new file mode 100644
index 00000000000..72a50f1a65d
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile.subscription;
+
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import 
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import org.apache.iotdb.session.subscription.model.Topic;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import org.apache.iotdb.tool.common.Constants;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+import static java.lang.System.out;
+
+/**
+ * Tree-model implementation of the subscription-based TsFile export: creates the topic, opens the
+ * pull consumers, and moves every polled TsFile below the configured target directory.
+ */
+public class SubscriptionTreeTsFile extends AbstractSubscriptionTsFile {
+
+  /**
+   * Creates {@code topicName} with the export's fixed mode/format/strictness settings, the optional
+   * time bounds, and the configured path pattern (falling back to the full pattern when blank).
+   */
+  @Override
+  public void createTopics(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Properties properties = new Properties();
+    properties.put(TopicConstant.MODE_KEY, Constants.MODE);
+    properties.put(TopicConstant.FORMAT_KEY, Constants.HANDLER);
+    properties.put(TopicConstant.STRICT_KEY, Constants.STRICT);
+    properties.put(TopicConstant.LOOSE_RANGE_KEY, Constants.LOOSE_RANGE);
+    if (StringUtils.isNotBlank(commonParam.getStartTime())) {
+      properties.put(TopicConstant.START_TIME_KEY, commonParam.getStartTime());
+    }
+    if (StringUtils.isNotBlank(commonParam.getEndTime())) {
+      properties.put(TopicConstant.END_TIME_KEY, commonParam.getEndTime());
+    }
+    properties.put(
+        TopicConstant.PATH_KEY,
+        StringUtils.isNotBlank(commonParam.getPath())
+            ? commonParam.getPath()
+            : commonParam.getPathFull());
+    commonParam.getTreeSubs().createTopic(topicName, properties);
+  }
+
+  /**
+   * Deletes each consumer's {@code targetDir/groupId/consumerId} directory, drops the topics
+   * returned by the session (best effort) and closes the session.
+   *
+   * @throws Exception if listing the topics or closing the session fails
+   */
+  @Override
+  public void doClean() throws Exception {
+    List<SubscriptionTreePullConsumer> pullTreeConsumers = commonParam.getPullTreeConsumers();
+    // The consumer list is only assigned by createConsumers; tolerate cleanup running without it.
+    if (pullTreeConsumers != null) {
+      for (int i = commonParam.getStartIndex(); i < pullTreeConsumers.size(); i++) {
+        SubscriptionTreePullConsumer consumer = pullTreeConsumers.get(i);
+        String path =
+            commonParam.getTargetDir()
+                + File.separator
+                + consumer.getConsumerGroupId()
+                + File.separator
+                + consumer.getConsumerId();
+        File file = new File(path);
+        if (file.exists()) {
+          FileUtils.deleteFileOrDirectory(file);
+        }
+      }
+    }
+    try {
+      for (Topic topic : commonParam.getTreeSubs().getTopics()) {
+        try {
+          commonParam.getTreeSubs().dropTopicIfExists(topic.getTopicName());
+        } catch (Exception e) {
+          // Best effort: one topic that cannot be dropped must not stop the remaining cleanup,
+          // but the failure is reported instead of being silently discarded.
+          ioTPrinter.println(
+              "Failed to drop topic " + topic.getTopicName() + ": " + e.getMessage());
+        }
+      }
+    } finally {
+      // Close the session even when listing the topics failed.
+      commonParam.getTreeSubs().close();
+    }
+  }
+
+  /**
+   * Builds the configured number of pull consumers for {@code groupId}, opens them, and keeps only
+   * those that opened successfully; the consumer count is updated to the number kept.
+   */
+  @Override
+  public void createConsumers(String groupId) {
+    commonParam.setPullTreeConsumers(new ArrayList<>(commonParam.getConsumerCount()));
+    for (int i = commonParam.getStartIndex(); i < commonParam.getConsumerCount(); i++) {
+      commonParam
+          .getPullTreeConsumers()
+          .add(
+              new SubscriptionTreePullConsumer.Builder()
+                  .host(commonParam.getSrcHost())
+                  .port(commonParam.getSrcPort())
+                  .consumerId(Constants.CONSUMER_NAME_PREFIX + i)
+                  .consumerGroupId(groupId)
+                  .autoCommit(Constants.AUTO_COMMIT)
+                  .autoCommitIntervalMs(Constants.AUTO_COMMIT_INTERVAL)
+                  .fileSaveDir(commonParam.getTargetDir())
+                  .buildPullConsumer());
+    }
+    commonParam
+        .getPullTreeConsumers()
+        .removeIf(
+            consumer -> {
+              try {
+                consumer.open();
+                return false;
+              } catch (SubscriptionException e) {
+                // A consumer that cannot open is dropped so the others keep working; say so.
+                ioTPrinter.println(
+                    "Failed to open consumer "
+                        + consumer.getConsumerId()
+                        + ", skipping it: "
+                        + e.getMessage());
+                return true;
+              }
+            });
+    commonParam.setConsumerCount(commonParam.getPullTreeConsumers().size());
+  }
+
+  /** Subscribes every consumer to {@code topicName}; a failing consumer is reported and skipped. */
+  @Override
+  public void subscribe(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<SubscriptionTreePullConsumer> pullTreeConsumers = commonParam.getPullTreeConsumers();
+    for (int i = 0; i < pullTreeConsumers.size(); i++) {
+      try {
+        pullTreeConsumers.get(i).subscribe(topicName);
+      } catch (Exception e) {
+        e.printStackTrace(out);
+      }
+    }
+  }
+
+  /** Submits one polling task per consumer to {@code executor}. */
+  @Override
+  public void consumerPoll(ExecutorService executor, String topicName) {
+    List<SubscriptionTreePullConsumer> pullTreeConsumers = commonParam.getPullTreeConsumers();
+    for (int i = commonParam.getStartIndex(); i < pullTreeConsumers.size(); i++) {
+      SubscriptionTreePullConsumer consumer = pullTreeConsumers.get(i);
+      executor.submit(() -> pollUntilIdle(consumer, topicName));
+    }
+  }
+
+  /**
+   * Polls {@code consumer} until the number of empty or failed rounds reaches {@code
+   * Constants.MAX_RETRY_TIMES}, then unsubscribes it from {@code topicName}.
+   */
+  private void pollUntilIdle(SubscriptionTreePullConsumer consumer, String topicName) {
+    int retryCount = 0;
+    while (true) {
+      try {
+        List<SubscriptionMessage> messages =
+            consumer.poll(Duration.ofMillis(Constants.POLL_MESSAGE_TIMEOUT));
+        consumer.commitSync(messages);
+        if (messages.isEmpty()) {
+          retryCount++;
+          if (retryCount >= Constants.MAX_RETRY_TIMES) {
+            break;
+          }
+        }
+        for (final SubscriptionMessage message : messages) {
+          moveToTargetDir(message, consumer.getConsumerGroupId());
+        }
+      } catch (Exception e) {
+        e.printStackTrace(System.out);
+        // Failed rounds count toward the limit too; otherwise a consumer whose poll keeps
+        // throwing would loop forever and the task would never finish.
+        retryCount++;
+        if (retryCount >= Constants.MAX_RETRY_TIMES) {
+          break;
+        }
+      }
+    }
+    try {
+      consumer.unsubscribe(topicName);
+    } catch (Exception e) {
+      // Report instead of letting the failure vanish inside the executor's Future.
+      e.printStackTrace(System.out);
+    }
+  }
+
+  /** Moves the TsFile carried by {@code message} to {@code targetDir/consumerGroupId}. */
+  private void moveToTargetDir(SubscriptionMessage message, String consumerGroupId) {
+    SubscriptionTsFileHandler fp = message.getTsFileHandler();
+    ioTPrinter.println(fp.getFile().getName());
+    try {
+      fp.moveFile(
+          Paths.get(
+              commonParam.getTargetDir() + File.separator + consumerGroupId,
+              fp.getPath().getFileName().toString()));
+      commonParam.getCountFile().incrementAndGet();
+    } catch (IOException e) {
+      // commitSync has already run for the whole batch, so keep going: aborting here would
+      // leave the remaining files of the batch unmoved.
+      e.printStackTrace(System.out);
+    }
+  }
+}

Reply via email to