This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new babc2af50ee Replace ExportTsFile By SubscriptionTsFile (#14812)
babc2af50ee is described below
commit babc2af50eef5b4812e125edcf17812e3e0237ca
Author: Summer <[email protected]>
AuthorDate: Fri Feb 21 18:26:16 2025 +0800
Replace ExportTsFile By SubscriptionTsFile (#14812)
* Replace ExportTsFile By SubscriptionTsFile
* add dependency logback
* update description in options
* update description in options
* it
* Remove the impact on other consumers when one consumer fails to open
* add space
* add space+1
---------
Co-authored-by: 2b3c511 <[email protected]>
---
.../apache/iotdb/tools/it/ExportTsFileTestIT.java | 32 +-
iotdb-client/cli/pom.xml | 4 +
.../org/apache/iotdb/tool/common/Constants.java | 30 +-
.../org/apache/iotdb/tool/common/OptionsUtil.java | 112 +++++
.../org/apache/iotdb/tool/tsfile/ExportTsFile.java | 539 +++------------------
.../subscription/AbstractSubscriptionTsFile.java | 74 +++
.../tool/tsfile/subscription/CommonParam.java | 212 ++++++++
.../subscription/SubscriptionTableTsFile.java | 186 +++++++
.../subscription/SubscriptionTreeTsFile.java | 183 +++++++
9 files changed, 884 insertions(+), 488 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
index 57b76e55e81..b85ef9d6adf 100644
--- a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
@@ -76,7 +76,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
@Override
protected void testOnWindows() throws IOException {
- final String[] output = {"!!!Warning:Tablet is empty,no data can be exported."};
+ final String[] output = {"Export TsFile Count: 0"};
ProcessBuilder builder =
new ProcessBuilder(
"cmd.exe",
@@ -90,10 +90,8 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
"root",
"-pw",
"root",
- "-t",
- "target",
- "-q",
- "select * from root.test.t2 where time > 1 and time < 1000000000000",
+ "-path",
+ "root.test.t2.**",
"&",
"exit",
"%^errorlevel%");
@@ -102,7 +100,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
prepareData();
- final String[] output1 = {"Export completely!"};
+ final String[] output1 = {"Export TsFile Count: "};
ProcessBuilder builder1 =
new ProcessBuilder(
"cmd.exe",
@@ -116,10 +114,8 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
"root",
"-pw",
"root",
- "-t",
- "target",
- "-q",
- "select * from root.test.t2 where time > 1 and time < 1000000000000",
+ "-path",
+ "root.test.t2.**",
"&",
"exit",
"%^errorlevel%");
@@ -129,7 +125,7 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
@Override
protected void testOnUnix() throws IOException {
- final String[] output = {"!!!Warning:Tablet is empty,no data can be exported."};
+ final String[] output = {"Export TsFile Count: 0"};
// -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
ProcessBuilder builder =
new ProcessBuilder(
@@ -143,16 +139,14 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
"root",
"-pw",
"root",
- "-t",
- "target",
- "-q",
- "select * from root.**");
+ "-path",
+ "root.**");
builder.environment().put("CLASSPATH", libPath);
testOutput(builder, output, 0);
prepareData();
- final String[] output1 = {"Export completely!"};
+ final String[] output1 = {"Export TsFile Count: "};
// -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
ProcessBuilder builder1 =
new ProcessBuilder(
@@ -166,10 +160,8 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
"root",
"-pw",
"root",
- "-t",
- "target",
- "-q",
- "select * from root.**");
+ "-path",
+ "root.**");
builder1.environment().put("CLASSPATH", libPath);
testOutput(builder1, output1, 0);
}
diff --git a/iotdb-client/cli/pom.xml b/iotdb-client/cli/pom.xml
index 600684bc4ec..665928d57de 100644
--- a/iotdb-client/cli/pom.xml
+++ b/iotdb-client/cli/pom.xml
@@ -98,6 +98,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
index ee86564d733..429ffd143e9 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tool.common;
import org.apache.tsfile.enums.TSDataType;
+import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
@@ -95,14 +96,14 @@ public class Constants {
public static final String DB_ARGS = "db";
public static final String DB_NAME = "database";
public static final String DB_DESC =
- "The database to be exported,Only takes effect when sql_dialect is of type table.(optional)";
+ "The database to be exported,only takes effect when sql_dialect is table.(optional)";
public static final String TABLE_ARGS = "table";
public static final String TABLE_DESC =
- "The table to be exported,only takes effect when sql_dialect is of type table";
+ "The table to be exported,only takes effect when sql_dialect is table.(optional)";
public static final String TABLE_DESC_EXPORT =
TABLE_DESC
- + ".If the '- q' parameter is specified, this parameter does not take effect. If the export type is tsfile or sql, this parameter is required. (optional)";
+ + ".If the '-q' parameter is specified, this parameter does not take effect. If the export type is tsfile or sql, this parameter is required. (optional)";
public static final String TABLE_DESC_IMPORT = TABLE_DESC + " and file_type is csv. (optional)";
public static final String DATATYPE_BOOLEAN = "boolean";
@@ -177,12 +178,14 @@ public class Constants {
public static final String TARGET_DIR_NAME = "target";
public static final String TARGET_DIR_ARGS_NAME = "target_directory";
public static final String TARGET_DIR_DESC = "Target file directory (required)";
+ public static final String TARGET_DIR_SUBSCRIPTION_DESC =
+ "Target file directory.default ./target (optional)";
public static final String QUERY_COMMAND_ARGS = "q";
public static final String QUERY_COMMAND_NAME = "query";
public static final String QUERY_COMMAND_ARGS_NAME = "query_command";
public static final String QUERY_COMMAND_DESC =
- "The query command that you want to execute.If sql-dialect is of type table The 'q' parameter is only applicable to export types of CSV, and is not available for other types.If the '- q' parameter is not empty, then the parameters' creatTime ',' EndTime 'and' table 'are not effective.(optional)";
+ "The query command that you want to execute.If sql_dialect is table The 'q' parameter is only applicable to export types of CSV, and is not available for other types.If the '- q' parameter is not empty, then the parameters' creatTime ',' EndTime 'and' table 'are not effective.(optional)";
public static final String TARGET_FILE_ARGS = "pfn";
public static final String TARGET_FILE_NAME = "prefix_file_name";
@@ -211,6 +214,10 @@ public class Constants {
public static final String[] TIME_FORMAT =
new String[] {"default", "long", "number", "timestamp"};
+ public static final String PATH_ARGS = "path";
+ public static final String PATH_DESC =
+ "The path to be exported,only takes effect when sql_dialect is tree.(optional)";
+
public static final long memoryThreshold = 10 * 1024 * 1024;
public static final String[] STRING_TIME_FORMAT =
@@ -253,6 +260,21 @@ public class Constants {
"yyyy.MM.dd'T'HH:mm:ss"
};
+ public static final String SUBSCRIPTION_CLI_PREFIX = "Export TsFile";
+ public static final int MAX_RETRY_TIMES = 2;
+ public static final String LOOSE_RANGE = "";
+ public static final boolean STRICT = false;
+ public static final String MODE = "snapshot";
+ public static final boolean AUTO_COMMIT = false;
+ public static final String TABLE_MODEL = "table";
+ public static final long AUTO_COMMIT_INTERVAL = 5000;
+ public static final long POLL_MESSAGE_TIMEOUT = 10000;
+ public static final String TOPIC_NAME_PREFIX = "topic_";
+ public static final String GROUP_NAME_PREFIX = "group_";
+ public static final String HANDLER = "TsFileHandler";
+ public static final String CONSUMER_NAME_PREFIX = "consumer_";
+ public static final SimpleDateFormat DATE_FORMAT_VIEW = new SimpleDateFormat("yyyyMMddHHmmssSSS");
+
// import constants
public static final String IMPORT_CLI_PREFIX = "Import Data";
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
index aeddf05e011..bde4831ea9f 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
@@ -912,4 +912,116 @@ public class OptionsUtil extends Constants {
options.addOption(opTimestampPrecision);
return options;
}
+
+ public static Options createSubscriptionTsFileOptions() {
+ Options options = new Options();
+
+ Option opSqlDialect =
+ Option.builder(SQL_DIALECT_ARGS)
+ .longOpt(SQL_DIALECT_ARGS)
+ .argName(SQL_DIALECT_ARGS)
+ .hasArg()
+ .desc(SQL_DIALECT_DESC)
+ .build();
+ options.addOption(opSqlDialect);
+
+ Option opHost =
+ Option.builder(HOST_ARGS)
+ .longOpt(HOST_NAME)
+ .argName(HOST_NAME)
+ .hasArg()
+ .desc(HOST_DESC)
+ .build();
+ options.addOption(opHost);
+
+ Option opPort =
+ Option.builder(PORT_ARGS)
+ .longOpt(PORT_NAME)
+ .argName(PORT_NAME)
+ .hasArg()
+ .desc(PORT_DESC)
+ .build();
+ options.addOption(opPort);
+
+ Option opUsername =
+ Option.builder(USERNAME_ARGS)
+ .longOpt(USERNAME_NAME)
+ .argName(USERNAME_NAME)
+ .hasArg()
+ .desc(USERNAME_DESC)
+ .build();
+ options.addOption(opUsername);
+
+ Option opPassword =
+ Option.builder(PW_ARGS)
+ .longOpt(PW_NAME)
+ .optionalArg(true)
+ .argName(PW_NAME)
+ .hasArg()
+ .desc(PW_DESC)
+ .build();
+ options.addOption(opPassword);
+
+ Option opPath =
+ Option.builder(PATH_ARGS)
+ .longOpt(PATH_ARGS)
+ .argName(PATH_ARGS)
+ .hasArg()
+ .desc(PATH_DESC)
+ .build();
+ options.addOption(opPath);
+
+ Option opDatabase =
+ Option.builder(DB_ARGS).longOpt(DB_NAME).argName(DB_ARGS).hasArg().desc(DB_DESC).build();
+ options.addOption(opDatabase);
+
+ Option opTable =
+ Option.builder(TABLE_ARGS)
+ .longOpt(TABLE_ARGS)
+ .argName(TABLE_ARGS)
+ .hasArg()
+ .desc(TABLE_DESC)
+ .build();
+ options.addOption(opTable);
+
+ Option opStartTime =
+ Option.builder(START_TIME_ARGS)
+ .longOpt(START_TIME_ARGS)
+ .argName(START_TIME_ARGS)
+ .hasArg()
+ .desc(START_TIME_DESC)
+ .build();
+ options.addOption(opStartTime);
+
+ Option opEndTime =
+ Option.builder(END_TIME_ARGS)
+ .longOpt(END_TIME_ARGS)
+ .argName(END_TIME_ARGS)
+ .hasArg()
+ .desc(END_TIME_DESC)
+ .build();
+ options.addOption(opEndTime);
+
+ Option opFile =
+ Option.builder(TARGET_DIR_ARGS)
+ .longOpt(TARGET_DIR_NAME)
+ .argName(TARGET_DIR_ARGS_NAME)
+ .hasArg()
+ .desc(TARGET_DIR_SUBSCRIPTION_DESC)
+ .build();
+ options.addOption(opFile);
+
+ Option opThreadNum =
+ Option.builder(THREAD_NUM_ARGS)
+ .longOpt(THREAD_NUM_NAME)
+ .argName(THREAD_NUM_NAME)
+ .hasArg()
+ .desc(THREAD_NUM_DESC)
+ .build();
+ options.addOption(opThreadNum);
+
+ Option opHelp = Option.builder(HELP_ARGS).longOpt(HELP_ARGS).hasArg().desc(HELP_DESC).build();
+ options.addOption(opHelp);
+ return options;
+ }
}
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
index 7be26207b16..71794ce244c 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
@@ -19,495 +19,106 @@
package org.apache.iotdb.tool.tsfile;
-import org.apache.iotdb.cli.type.ExitType;
-import org.apache.iotdb.cli.utils.CliContext;
import org.apache.iotdb.cli.utils.IoTPrinter;
-import org.apache.iotdb.cli.utils.JlineUtils;
-import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tool.common.Constants;
+import org.apache.iotdb.tool.common.OptionsUtil;
+import org.apache.iotdb.tool.tsfile.subscription.AbstractSubscriptionTsFile;
+import org.apache.iotdb.tool.tsfile.subscription.CommonParam;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.write.WriteProcessException;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
-import org.apache.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.tsfile.read.common.Field;
-import org.apache.tsfile.read.common.Path;
-import org.apache.tsfile.read.common.RowRecord;
-import org.apache.tsfile.write.TsFileWriter;
-import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
-import org.apache.tsfile.write.schema.MeasurementSchema;
-import org.jline.reader.LineReader;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
-public class ExportTsFile extends AbstractTsFileTool {
-
- private static final String TARGET_DIR_ARGS = "t";
- private static final String TARGET_DIR_NAME = "targetDirectory";
- private static final String TARGET_DIR_NAME_BACK = "target";
-
- private static final String TARGET_FILE_ARGS = "tfn";
- private static final String TARGET_FILE_NAME = "targetFileName";
- private static final String TARGET_FILE_ARGS_BACK = "pfn";
-
- private static final String SQL_FILE_ARGS = "s";
- private static final String SQL_FILE_NAME = "sourceSqlFile";
- private static final String QUERY_COMMAND_ARGS = "q";
- private static final String QUERY_COMMAND_NAME = "queryCommand";
- private static final String DUMP_FILE_NAME_DEFAULT = "dump";
- private static final String TSFILEDB_CLI_PREFIX = "ExportTsFile";
-
- private static String targetDirectory;
- private static String targetFile = DUMP_FILE_NAME_DEFAULT;
- private static String queryCommand;
- private static String sqlFile;
-
- private static long timeout = -1;
+public class ExportTsFile {
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
-
- @SuppressWarnings({
- "squid:S3776",
- "squid:S2093"
- }) // Suppress high Cognitive Complexity warning, ignore try-with-resources
- /* main function of export tsFile tool. */
- public static void main(String[] args) {
- int exitCode = getCommandLine(args);
- exportTsfile(exitCode);
- }
-
- public static void exportTsfile(int exitCode) {
- try {
- session = new Session(host, Integer.parseInt(port), username, password);
- session.open(false);
-
- if (queryCommand == null) {
- String sql;
-
- if (sqlFile == null) {
- LineReader lineReader =
- JlineUtils.getLineReader(
- new CliContext(System.in, System.out, System.err,
ExitType.EXCEPTION),
- username,
- host,
- port);
- sql = lineReader.readLine(TSFILEDB_CLI_PREFIX + "> please input
query: ");
- ioTPrinter.println(sql);
- String[] values = sql.trim().split(";");
- for (int i = 0; i < values.length; i++) {
- legalCheck(values[i]);
- dumpResult(values[i], i);
- }
-
- } else {
- dumpFromSqlFile(sqlFile);
- }
- } else {
- legalCheck(queryCommand);
- dumpResult(queryCommand, 0);
+ private static CommonParam commonParam = CommonParam.getInstance();
+
+ public static void main(String[] args) throws Exception {
+ Logger logger =
+ (Logger) LoggerFactory.getLogger("org.apache.iotdb.session.subscription.consumer.base");
+ logger.setLevel(Level.ERROR);
+ Options options = OptionsUtil.createSubscriptionTsFileOptions();
+ parseParams(args, options);
+ if (StringUtils.isEmpty(commonParam.getPath())) {
+ commonParam.setSqlDialect(Constants.TABLE_MODEL);
+ }
+ AbstractSubscriptionTsFile.setSubscriptionSession();
+ String nowFormat = Constants.DATE_FORMAT_VIEW.format(System.currentTimeMillis());
+ String topicName = Constants.TOPIC_NAME_PREFIX + nowFormat;
+ String groupId = Constants.GROUP_NAME_PREFIX + nowFormat;
+ commonParam.getSubscriptionTsFile().createTopics(topicName);
+ commonParam.getSubscriptionTsFile().createConsumers(groupId);
+ commonParam.getSubscriptionTsFile().subscribe(topicName);
+ ExecutorService executor = Executors.newFixedThreadPool(commonParam.getConsumerCount());
+ commonParam.getSubscriptionTsFile().consumerPoll(executor, topicName);
+ executor.shutdown();
+ while (true) {
+ if (executor.isTerminated()) {
+ break;
}
-
- } catch (IOException e) {
- ioTPrinter.println("Failed to operate on file, because " +
e.getMessage());
- exitCode = CODE_ERROR;
- } catch (IoTDBConnectionException e) {
- ioTPrinter.println("Connect failed because " + e.getMessage());
- exitCode = CODE_ERROR;
- } finally {
- if (session != null) {
- try {
- session.close();
- } catch (IoTDBConnectionException e) {
- exitCode = CODE_ERROR;
- ioTPrinter.println(
- "Encounter an error when closing session, error is: " +
e.getMessage());
- }
- }
- }
- System.exit(exitCode);
- }
-
- public ExportTsFile(CommandLine commandLine) {
- try {
- parseBasicParams(commandLine);
- parseSpecialParamsBack(commandLine);
- } catch (ArgsErrorException e) {
- ioTPrinter.println("Invalid args: " + e.getMessage());
- System.exit(CODE_ERROR);
}
+ commonParam.getSubscriptionTsFile().doClean();
+ ioTPrinter.println("Export TsFile Count: " + commonParam.getCountFile().get());
}
- protected static int getCommandLine(String[] args) {
- createOptions();
+ private static void parseParams(String[] args, Options options) {
HelpFormatter hf = new HelpFormatter();
- CommandLine commandLine = null;
- CommandLineParser parser = new DefaultParser();
- hf.setOptionComparator(null); // avoid reordering
- hf.setWidth(MAX_HELP_CONSOLE_WIDTH);
-
- if (args == null || args.length == 0) {
- ioTPrinter.println("Too few params input, please check the following
hint.");
- hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
- }
+ CommandLine cli = null;
+ CommandLineParser cliParser = new DefaultParser();
try {
- commandLine = parser.parse(options, args);
- } catch (ParseException e) {
- ioTPrinter.println(e.getMessage());
- hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
- }
- if (commandLine.hasOption(HELP_ARGS)) {
- hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
- }
- int exitCode = CODE_OK;
- try {
- parseBasicParams(commandLine);
- parseSpecialParams(commandLine);
- } catch (ArgsErrorException e) {
- ioTPrinter.println("Invalid args: " + e.getMessage());
- exitCode = CODE_ERROR;
- }
- return exitCode;
- }
-
- private static void legalCheck(String sql) {
- String sqlLower = sql.toLowerCase();
- if (sqlLower.contains("count(")
- || sqlLower.contains("sum(")
- || sqlLower.contains("avg(")
- || sqlLower.contains("extreme(")
- || sqlLower.contains("max_value(")
- || sqlLower.contains("min_value(")
- || sqlLower.contains("first_value(")
- || sqlLower.contains("last_value(")
- || sqlLower.contains("max_time(")
- || sqlLower.contains("min_time(")
- || sqlLower.contains("stddev(")
- || sqlLower.contains("stddev_pop(")
- || sqlLower.contains("stddev_samp(")
- || sqlLower.contains("variance(")
- || sqlLower.contains("var_pop(")
- || sqlLower.contains("var_samp(")
- || sqlLower.contains("max_by(")
- || sqlLower.contains("min_by(")) {
- ioTPrinter.println("The sql you entered is invalid, please don't use
aggregate query.");
- System.exit(CODE_ERROR);
- }
- }
-
- private static void parseSpecialParams(CommandLine commandLine) throws
ArgsErrorException {
- targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME,
commandLine);
- queryCommand = commandLine.getOptionValue(QUERY_COMMAND_ARGS);
- targetFile = commandLine.getOptionValue(TARGET_FILE_ARGS);
- sqlFile = commandLine.getOptionValue(SQL_FILE_ARGS);
- String timeoutString = commandLine.getOptionValue(TIMEOUT_ARGS);
- if (timeoutString != null) {
- timeout = Long.parseLong(timeoutString);
- }
- if (targetFile == null) {
- targetFile = DUMP_FILE_NAME_DEFAULT;
- }
-
- if (!targetDirectory.endsWith("/") && !targetDirectory.endsWith("\\")) {
- targetDirectory += File.separator;
- }
- }
-
- private static void parseSpecialParamsBack(CommandLine commandLine) throws
ArgsErrorException {
- targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_NAME_BACK,
commandLine);
- queryCommand = commandLine.getOptionValue(QUERY_COMMAND_ARGS);
- targetFile = commandLine.getOptionValue(TARGET_FILE_ARGS_BACK);
- String timeoutString = commandLine.getOptionValue(TIMEOUT_ARGS);
- if (timeoutString != null) {
- timeout = Long.parseLong(timeoutString);
- }
- if (targetFile == null) {
- targetFile = DUMP_FILE_NAME_DEFAULT;
- }
-
- if (!targetDirectory.endsWith("/") && !targetDirectory.endsWith("\\")) {
- targetDirectory += File.separator;
- }
- }
-
- /**
- * commandline option create.
- *
- * @return object Options
- */
- private static void createOptions() {
- createBaseOptions();
-
- Option opTargetFile =
- Option.builder(TARGET_DIR_ARGS)
- .required()
- .argName(TARGET_DIR_NAME)
- .hasArg()
- .desc("Target File Directory (required)")
- .build();
- options.addOption(opTargetFile);
-
- Option targetFileName =
- Option.builder(TARGET_FILE_ARGS)
- .argName(TARGET_FILE_NAME)
- .hasArg()
- .desc("Export file name (optional)")
- .build();
- options.addOption(targetFileName);
-
- Option opSqlFile =
- Option.builder(SQL_FILE_ARGS)
- .argName(SQL_FILE_NAME)
- .hasArg()
- .desc("SQL File Path (optional)")
- .build();
- options.addOption(opSqlFile);
-
- Option opQuery =
- Option.builder(QUERY_COMMAND_ARGS)
- .argName(QUERY_COMMAND_NAME)
- .hasArg()
- .desc("The query command that you want to execute. (optional)")
- .build();
- options.addOption(opQuery);
-
- Option opHelp =
- Option.builder(HELP_ARGS)
- .longOpt(HELP_ARGS)
- .hasArg(false)
- .desc("Display help information")
- .build();
- options.addOption(opHelp);
-
- Option opTimeout =
- Option.builder(TIMEOUT_ARGS)
- .argName(TIMEOUT_NAME)
- .hasArg()
- .desc("Timeout for session query")
- .build();
- options.addOption(opTimeout);
- }
-
- /**
- * This method will be called, if the query commands are written in a sql
file.
- *
- * @param filePath:file path
- * @throws IOException: exception
- */
- private static void dumpFromSqlFile(String filePath) throws IOException {
- try (BufferedReader reader = new BufferedReader(new FileReader(filePath)))
{
- String sql;
- int i = 0;
- while ((sql = reader.readLine()) != null) {
- legalCheck(sql);
- dumpResult(sql, i++);
+ cli = cliParser.parse(options, args);
+ if (cli.hasOption(Constants.SQL_DIALECT_ARGS)) {
+ commonParam.setSqlDialect(cli.getOptionValue(Constants.SQL_DIALECT_ARGS));
}
- }
- }
-
- /**
- * Dump files from database to tsFile.
- *
- * @param sql export the result of executing the sql
- */
- private static void dumpResult(String sql, int index) {
- final String path = targetDirectory + targetFile + index + ".tsfile";
- try (SessionDataSet sessionDataSet = session.executeQueryStatement(sql,
timeout)) {
- long start = System.currentTimeMillis();
- boolean isComplete = writeWithTablets(sessionDataSet, path);
- if (isComplete) {
- long end = System.currentTimeMillis();
- ioTPrinter.println("Export completely!cost: " + (end - start) + "
ms.");
+ if (cli.hasOption(Constants.HOST_ARGS)) {
+ commonParam.setSrcHost(cli.getOptionValue(Constants.HOST_ARGS));
}
- } catch (StatementExecutionException
- | IoTDBConnectionException
- | IOException
- | WriteProcessException e) {
- ioTPrinter.println("Cannot dump result because: " + e.getMessage());
- }
- }
-
- private static void collectSchemas(
- List<String> columnNames,
- List<String> columnTypes,
- Map<String, List<IMeasurementSchema>> deviceSchemaMap,
- Set<String> alignedDevices,
- Map<String, List<Integer>> deviceColumnIndices)
- throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < columnNames.size(); i++) {
- String column = columnNames.get(i);
- if (!column.startsWith("root.")) {
- continue;
+ if (cli.hasOption(Constants.PORT_ARGS)) {
+ commonParam.setSrcPort(Integer.valueOf(cli.getOptionValue(Constants.PORT_ARGS)));
}
- TSDataType tsDataType = getTsDataType(columnTypes.get(i));
- Path path = new Path(column, true);
- String deviceId = path.getDeviceString();
- // query whether the device is aligned or not
- try (SessionDataSet deviceDataSet =
- session.executeQueryStatement("show devices " + deviceId, timeout)) {
- List<Field> deviceList = deviceDataSet.next().getFields();
- if (deviceList.size() > 1 &&
"true".equals(deviceList.get(1).getStringValue())) {
- alignedDevices.add(deviceId);
- }
+ if (cli.hasOption(Constants.USERNAME_ARGS)) {
+ commonParam.setSrcUserName(cli.getOptionValue(Constants.USERNAME_ARGS));
}
-
- // query timeseries metadata
- MeasurementSchema measurementSchema =
- new MeasurementSchema(path.getMeasurement(), tsDataType);
- List<Field> seriesList =
- session.executeQueryStatement("show timeseries " + column,
timeout).next().getFields();
-
measurementSchema.setEncoding(TSEncoding.valueOf(seriesList.get(4).getStringValue()));
- measurementSchema.setCompressionType(
- CompressionType.valueOf(seriesList.get(5).getStringValue()));
-
- deviceSchemaMap.computeIfAbsent(deviceId, key -> new
ArrayList<>()).add(measurementSchema);
- deviceColumnIndices.computeIfAbsent(deviceId, key -> new
ArrayList<>()).add(i);
- }
- }
-
- private static List<Tablet> constructTablets(
- Map<String, List<IMeasurementSchema>> deviceSchemaMap,
- Set<String> alignedDevices,
- TsFileWriter tsFileWriter)
- throws WriteProcessException {
- List<Tablet> tabletList = new ArrayList<>(deviceSchemaMap.size());
- for (Map.Entry<String, List<IMeasurementSchema>> stringListEntry :
deviceSchemaMap.entrySet()) {
- String deviceId = stringListEntry.getKey();
- List<IMeasurementSchema> schemaList = stringListEntry.getValue();
- Tablet tablet = new Tablet(deviceId, schemaList);
- tablet.initBitMaps();
- Path path = new Path(tablet.getDeviceId());
- if (alignedDevices.contains(tablet.getDeviceId())) {
- tsFileWriter.registerAlignedTimeseries(path, schemaList);
- } else {
- tsFileWriter.registerTimeseries(path, schemaList);
+ if (cli.hasOption(Constants.PW_ARGS)) {
+ commonParam.setSrcPassword(cli.getOptionValue(Constants.PW_ARGS));
}
- tabletList.add(tablet);
- }
- return tabletList;
- }
-
- private static void writeWithTablets(
- SessionDataSet sessionDataSet,
- List<Tablet> tabletList,
- Set<String> alignedDevices,
- TsFileWriter tsFileWriter,
- Map<String, List<Integer>> deviceColumnIndices)
- throws IoTDBConnectionException,
- StatementExecutionException,
- IOException,
- WriteProcessException {
- while (sessionDataSet.hasNext()) {
- RowRecord rowRecord = sessionDataSet.next();
- List<Field> fields = rowRecord.getFields();
-
- for (Tablet tablet : tabletList) {
- String deviceId = tablet.getDeviceId();
- List<Integer> columnIndices = deviceColumnIndices.get(deviceId);
- int rowIndex = tablet.getRowSize();
- tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
- List<IMeasurementSchema> schemas = tablet.getSchemas();
-
- for (int i = 0, columnIndicesSize = columnIndices.size(); i <
columnIndicesSize; i++) {
- Integer columnIndex = columnIndices.get(i);
- IMeasurementSchema measurementSchema = schemas.get(i);
- // -1 for time not in fields
- Object value = fields.get(columnIndex -
1).getObjectValue(measurementSchema.getType());
- tablet.addValue(measurementSchema.getMeasurementName(), rowIndex,
value);
- }
-
- if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
- writeToTsFile(alignedDevices, tsFileWriter, tablet);
- tablet.initBitMaps();
- tablet.reset();
- }
+ if (cli.hasOption(Constants.PATH_ARGS)) {
+ commonParam.setPath(cli.getOptionValue(Constants.PATH_ARGS));
}
- }
-
- for (Tablet tablet : tabletList) {
- if (tablet.getRowSize() != 0) {
- writeToTsFile(alignedDevices, tsFileWriter, tablet);
+ if (cli.hasOption(Constants.DB_ARGS)) {
+ commonParam.setDatabase(cli.getOptionValue(Constants.DB_ARGS));
}
- }
- }
-
- @SuppressWarnings({
- "squid:S3776",
- "squid:S6541"
- }) // Suppress high Cognitive Complexity warning, Suppress many task in one
method warning
- public static Boolean writeWithTablets(SessionDataSet sessionDataSet, String
filePath)
- throws IOException,
- IoTDBConnectionException,
- StatementExecutionException,
- WriteProcessException {
- List<String> columnNames = sessionDataSet.getColumnNames();
- List<String> columnTypes = sessionDataSet.getColumnTypes();
- File f = FSFactoryProducer.getFSFactory().getFile(filePath);
- if (f.exists()) {
- Files.delete(f.toPath());
- }
- boolean isEmpty = false;
- try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
- // device -> column indices in columnNames
- Map<String, List<Integer>> deviceColumnIndices = new HashMap<>();
- Set<String> alignedDevices = new HashSet<>();
- Map<String, List<IMeasurementSchema>> deviceSchemaMap = new
LinkedHashMap<>();
-
- collectSchemas(
- columnNames, columnTypes, deviceSchemaMap, alignedDevices,
deviceColumnIndices);
-
- List<Tablet> tabletList = constructTablets(deviceSchemaMap,
alignedDevices, tsFileWriter);
-
- if (!tabletList.isEmpty()) {
- writeWithTablets(
- sessionDataSet, tabletList, alignedDevices, tsFileWriter,
deviceColumnIndices);
- tsFileWriter.flush();
- } else {
- isEmpty = true;
+ if (cli.hasOption(Constants.TABLE_ARGS)) {
+ commonParam.setTable(cli.getOptionValue(Constants.TABLE_ARGS));
}
+ if (cli.hasOption(Constants.TARGET_DIR_ARGS)) {
+ commonParam.setTargetDir(cli.getOptionValue(Constants.TARGET_DIR_ARGS));
+ }
+ if (cli.hasOption(Constants.START_TIME_ARGS)) {
+ commonParam.setStartTime(cli.getOptionValue(Constants.START_TIME_ARGS));
+ }
+ if (cli.hasOption(Constants.END_TIME_ARGS)) {
+ commonParam.setEndTime(cli.getOptionValue(Constants.END_TIME_ARGS));
+ }
+ if (cli.hasOption(Constants.THREAD_NUM_ARGS)) {
+ commonParam.setConsumerCount(
+ Integer.valueOf(cli.getOptionValue(Constants.THREAD_NUM_ARGS)));
+ }
+ } catch (ParseException e) {
+ ioTPrinter.println(e.getMessage());
+ hf.printHelp(Constants.SUBSCRIPTION_CLI_PREFIX, options, true);
+ System.exit(Constants.CODE_ERROR);
}
- if (isEmpty) {
- ioTPrinter.println("!!!Warning:Tablet is empty,no data can be
exported.");
- return false;
- }
- return true;
- }
-
- private static void writeToTsFile(
- Set<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
- throws IOException, WriteProcessException {
- if (deviceFilterSet.contains(tablet.getDeviceId())) {
- tsFileWriter.writeAligned(tablet);
- } else {
- tsFileWriter.writeTree(tablet);
- }
- }
-
- private static TSDataType getTsDataType(String type) {
- return TSDataType.valueOf(type);
}
}
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java
new file mode 100644
index 00000000000..60d38af45f2
--- /dev/null
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile.subscription;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.subscription.SubscriptionTableSessionBuilder;
+import org.apache.iotdb.session.subscription.SubscriptionTreeSessionBuilder;
+import org.apache.iotdb.tool.common.Constants;
+
+import java.util.concurrent.ExecutorService;
+
+public abstract class AbstractSubscriptionTsFile {
+
+  /** Console printer shared by the subscription exporters. */
+  protected static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+
+  /** Shared holder of the export parameters. */
+  protected static CommonParam commonParam = CommonParam.getInstance();
+
+  /**
+   * Registers the exporter that matches the configured SQL dialect, builds the
+   * corresponding subscription session from the configured source host, port,
+   * user name and password, stores it in {@link CommonParam} and opens it.
+   *
+   * <p>The table-model exporter is used when the dialect equals {@code
+   * Constants.TABLE_MODEL} ignoring case; every other dialect selects the
+   * tree-model exporter.
+   *
+   * @throws IoTDBConnectionException if opening the session fails
+   */
+  public static void setSubscriptionSession() throws IoTDBConnectionException {
+    final boolean treeModel =
+        !Constants.TABLE_MODEL.equalsIgnoreCase(CommonParam.getSqlDialect());
+
+    if (treeModel) {
+      CommonParam.setSubscriptionTsFile(new SubscriptionTreeTsFile());
+      CommonParam.setTreeSubs(
+          new SubscriptionTreeSessionBuilder()
+              .host(CommonParam.getSrcHost())
+              .port(CommonParam.getSrcPort())
+              .username(CommonParam.getSrcUserName())
+              .password(CommonParam.getSrcPassword())
+              .thriftMaxFrameSize(SessionConfig.DEFAULT_MAX_FRAME_SIZE)
+              .build());
+      CommonParam.getTreeSubs().open();
+      return;
+    }
+
+    CommonParam.setSubscriptionTsFile(new SubscriptionTableTsFile());
+    CommonParam.setTableSubs(
+        new SubscriptionTableSessionBuilder()
+            .host(CommonParam.getSrcHost())
+            .port(CommonParam.getSrcPort())
+            .username(CommonParam.getSrcUserName())
+            .password(CommonParam.getSrcPassword())
+            .thriftMaxFrameSize(SessionConfig.DEFAULT_MAX_FRAME_SIZE)
+            .build());
+    CommonParam.getTableSubs().open();
+  }
+
+  /**
+   * Creates the topic the export consumes from.
+   *
+   * @param topicName name of the topic to create
+   */
+  public abstract void createTopics(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  /** Cleans up after the export; what is cleaned is defined by the implementation. */
+  public abstract void doClean() throws Exception;
+
+  /**
+   * Creates the pull consumers used by the export.
+   *
+   * @param groupId consumer group the consumers join
+   */
+  public abstract void createConsumers(String groupId);
+
+  /**
+   * Subscribes the created consumers to a topic.
+   *
+   * @param topicName name of the topic to subscribe to
+   */
+  public abstract void subscribe(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  /**
+   * Submits the polling work of the created consumers to {@code executor}.
+   *
+   * @param executor executor that runs the polling tasks
+   * @param topicName name of the topic being consumed
+   */
+  public abstract void consumerPoll(ExecutorService executor, String topicName);
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/CommonParam.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/CommonParam.java
new file mode 100644
index 00000000000..34b26a6db02
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/CommonParam.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile.subscription;
+
+import org.apache.iotdb.session.subscription.ISubscriptionTableSession;
+import org.apache.iotdb.session.subscription.ISubscriptionTreeSession;
+import
org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
+import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Shared holder of the parameters and runtime objects used by the subscription based TsFile
+ * export.
+ *
+ * <p>All state lives in static fields, so the object returned by {@link #getInstance()} carries
+ * no state of its own; it only offers instance-style access to the static accessors.
+ *
+ * <p>NOTE(review): the fields are plain statics without synchronization; this presumably relies
+ * on being populated before the polling tasks are submitted - confirm against the caller.
+ */
+public class CommonParam {
+
+  /** Number of TsFiles exported so far; incremented by the polling tasks. */
+  private static final AtomicInteger countFile = new AtomicInteger(0);
+
+  private static String path = "root.**";
+  private static String table = ".*";
+  private static String database = ".*";
+
+  /** First consumer index used by the exporters; has no setter. */
+  private static final int startIndex = 0;
+
+  private static int consumerCount = 8;
+  private static ISubscriptionTableSession tableSubs;
+  private static ISubscriptionTreeSession treeSubs;
+
+  /** Path pattern used by the tree exporter when {@link #path} is blank; has no setter. */
+  private static final String pathFull = "root.**";
+
+  private static String srcHost = "127.0.0.1";
+  private static int srcPort = 6667;
+  private static String srcUserName = "root";
+  private static String srcPassword = "root";
+  private static String sqlDialect = "tree";
+  private static String startTime = "";
+  private static String endTime = "";
+  private static String targetDir = "target";
+  private static List<ISubscriptionTablePullConsumer> pullTableConsumers;
+  private static List<SubscriptionTreePullConsumer> pullTreeConsumers;
+
+  private static AbstractSubscriptionTsFile subscriptionTsFile;
+
+  private static CommonParam instance;
+
+  /**
+   * Returns the lazily created shared instance.
+   *
+   * @return the single {@code CommonParam} created by this method
+   */
+  public static synchronized CommonParam getInstance() {
+    if (null == instance) {
+      instance = new CommonParam();
+    }
+    return instance;
+  }
+
+  /** @return the tree-model path pattern to export (default {@code "root.**"}) */
+  public static String getPath() {
+    return path;
+  }
+
+  /** @param path the tree-model path pattern to export */
+  public static void setPath(String path) {
+    CommonParam.path = path;
+  }
+
+  /** @return the table-model table pattern to export (default {@code ".*"}) */
+  public static String getTable() {
+    return table;
+  }
+
+  /** @param table the table-model table pattern to export */
+  public static void setTable(String table) {
+    CommonParam.table = table;
+  }
+
+  /** @return the table-model database pattern to export (default {@code ".*"}) */
+  public static String getDatabase() {
+    return database;
+  }
+
+  /** @param database the table-model database pattern to export */
+  public static void setDatabase(String database) {
+    CommonParam.database = database;
+  }
+
+  /** @return the first consumer index, always {@code 0} */
+  public static int getStartIndex() {
+    return startIndex;
+  }
+
+  /** @return the number of consumers to create (default {@code 8}) */
+  public static int getConsumerCount() {
+    return consumerCount;
+  }
+
+  /** @param consumerCount the number of consumers to create */
+  public static void setConsumerCount(int consumerCount) {
+    CommonParam.consumerCount = consumerCount;
+  }
+
+  /** @return the table-model subscription session, or {@code null} if none was set */
+  public static ISubscriptionTableSession getTableSubs() {
+    return tableSubs;
+  }
+
+  /** @param tableSubs the table-model subscription session */
+  public static void setTableSubs(ISubscriptionTableSession tableSubs) {
+    CommonParam.tableSubs = tableSubs;
+  }
+
+  /** @return the tree-model subscription session, or {@code null} if none was set */
+  public static ISubscriptionTreeSession getTreeSubs() {
+    return treeSubs;
+  }
+
+  /** @param treeSubs the tree-model subscription session */
+  public static void setTreeSubs(ISubscriptionTreeSession treeSubs) {
+    CommonParam.treeSubs = treeSubs;
+  }
+
+  /** @return the fallback tree-model path pattern, always {@code "root.**"} */
+  public static String getPathFull() {
+    return pathFull;
+  }
+
+  /** @return the source host (default {@code "127.0.0.1"}) */
+  public static String getSrcHost() {
+    return srcHost;
+  }
+
+  /** @param srcHost the source host */
+  public static void setSrcHost(String srcHost) {
+    CommonParam.srcHost = srcHost;
+  }
+
+  /** @return the source port (default {@code 6667}) */
+  public static int getSrcPort() {
+    return srcPort;
+  }
+
+  /** @param srcPort the source port */
+  public static void setSrcPort(int srcPort) {
+    CommonParam.srcPort = srcPort;
+  }
+
+  /** @return the user name used for the source (default {@code "root"}) */
+  public static String getSrcUserName() {
+    return srcUserName;
+  }
+
+  /** @param srcUserName the user name used for the source */
+  public static void setSrcUserName(String srcUserName) {
+    CommonParam.srcUserName = srcUserName;
+  }
+
+  /** @return the password used for the source (default {@code "root"}) */
+  public static String getSrcPassword() {
+    return srcPassword;
+  }
+
+  /** @param srcPassword the password used for the source */
+  public static void setSrcPassword(String srcPassword) {
+    CommonParam.srcPassword = srcPassword;
+  }
+
+  /** @return the SQL dialect (default {@code "tree"}) */
+  public static String getSqlDialect() {
+    return sqlDialect;
+  }
+
+  /** @param sqlDialect the SQL dialect */
+  public static void setSqlDialect(String sqlDialect) {
+    CommonParam.sqlDialect = sqlDialect;
+  }
+
+  /** @return the export start time as given by the user, empty by default */
+  public static String getStartTime() {
+    return startTime;
+  }
+
+  /** @param startTime the export start time */
+  public static void setStartTime(String startTime) {
+    CommonParam.startTime = startTime;
+  }
+
+  /** @return the export end time as given by the user, empty by default */
+  public static String getEndTime() {
+    return endTime;
+  }
+
+  /** @param endTime the export end time */
+  public static void setEndTime(String endTime) {
+    CommonParam.endTime = endTime;
+  }
+
+  /** @return the directory exported files are written to (default {@code "target"}) */
+  public static String getTargetDir() {
+    return targetDir;
+  }
+
+  /** @param targetDir the directory exported files are written to */
+  public static void setTargetDir(String targetDir) {
+    CommonParam.targetDir = targetDir;
+  }
+
+  /** @return the table-model pull consumers, or {@code null} if none were set */
+  public static List<ISubscriptionTablePullConsumer> getPullTableConsumers() {
+    return pullTableConsumers;
+  }
+
+  /** @param pullTableConsumers the table-model pull consumers */
+  public static void setPullTableConsumers(
+      List<ISubscriptionTablePullConsumer> pullTableConsumers) {
+    CommonParam.pullTableConsumers = pullTableConsumers;
+  }
+
+  /** @return the tree-model pull consumers, or {@code null} if none were set */
+  public static List<SubscriptionTreePullConsumer> getPullTreeConsumers() {
+    return pullTreeConsumers;
+  }
+
+  /** @param pullTreeConsumers the tree-model pull consumers */
+  public static void setPullTreeConsumers(List<SubscriptionTreePullConsumer> pullTreeConsumers) {
+    CommonParam.pullTreeConsumers = pullTreeConsumers;
+  }
+
+  /** @return the exporter selected for the configured dialect, or {@code null} if none was set */
+  public static AbstractSubscriptionTsFile getSubscriptionTsFile() {
+    return subscriptionTsFile;
+  }
+
+  /** @param subscriptionTsFile the exporter selected for the configured dialect */
+  public static void setSubscriptionTsFile(AbstractSubscriptionTsFile subscriptionTsFile) {
+    CommonParam.subscriptionTsFile = subscriptionTsFile;
+  }
+
+  /** @return the shared counter of exported TsFiles */
+  public static AtomicInteger getCountFile() {
+    return countFile;
+  }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
new file mode 100644
index 00000000000..0c9c3268c94
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile.subscription;
+
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import
org.apache.iotdb.session.subscription.consumer.ISubscriptionTablePullConsumer;
+import
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumer;
+import
org.apache.iotdb.session.subscription.consumer.table.SubscriptionTablePullConsumerBuilder;
+import org.apache.iotdb.session.subscription.model.Topic;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import org.apache.iotdb.tool.common.Constants;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+import static java.lang.System.out;
+
+/** Table-model exporter that pulls TsFiles through subscription pull consumers. */
+public class SubscriptionTableTsFile extends AbstractSubscriptionTsFile {
+
+  /**
+   * Creates topic {@code topicName} on the table-model subscription session.
+   *
+   * <p>Mode, format, strictness and loose-range attributes come from {@link Constants}. Start
+   * time, end time, database and table are each added only when the configured value is
+   * non-blank, so a blank or {@code null} value is never written into the attributes ({@link
+   * Properties} rejects {@code null} values).
+   *
+   * @param topicName name of the topic to create
+   * @throws IoTDBConnectionException if creating the topic fails on the connection
+   * @throws StatementExecutionException if the server rejects the topic creation
+   */
+  @Override
+  public void createTopics(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Properties properties = new Properties();
+    properties.put(TopicConstant.MODE_KEY, Constants.MODE);
+    properties.put(TopicConstant.FORMAT_KEY, Constants.HANDLER);
+    properties.put(TopicConstant.STRICT_KEY, Constants.STRICT);
+    properties.put(TopicConstant.LOOSE_RANGE_KEY, Constants.LOOSE_RANGE);
+    putIfNotBlank(properties, TopicConstant.START_TIME_KEY, CommonParam.getStartTime());
+    putIfNotBlank(properties, TopicConstant.END_TIME_KEY, CommonParam.getEndTime());
+    putIfNotBlank(properties, TopicConstant.DATABASE_KEY, CommonParam.getDatabase());
+    putIfNotBlank(properties, TopicConstant.TABLE_KEY, CommonParam.getTable());
+    CommonParam.getTableSubs().createTopic(topicName, properties);
+  }
+
+  /** Adds {@code key -> value} to {@code properties} unless {@code value} is blank. */
+  private static void putIfNotBlank(Properties properties, String key, String value) {
+    if (StringUtils.isNotBlank(value)) {
+      properties.put(key, value);
+    }
+  }
+
+  /**
+   * Deletes each consumer's {@code targetDir/groupId/consumerId} directory, drops the topics
+   * listed by the session and closes the session.
+   *
+   * <p>Dropping is best effort: a topic that cannot be dropped is reported and skipped. The
+   * session is closed even when listing or dropping topics fails.
+   *
+   * <p>NOTE(review): every topic returned by {@code getTopics()} is dropped, not only the one
+   * created by this export - confirm this is intended on shared servers.
+   *
+   * @throws Exception if deleting a directory, listing topics or closing the session fails
+   */
+  @Override
+  public void doClean() throws Exception {
+    List<ISubscriptionTablePullConsumer> pullTableConsumers =
+        CommonParam.getPullTableConsumers();
+    // The list is only set by createConsumers; tolerate cleanup before that happened.
+    if (pullTableConsumers != null) {
+      for (int i = CommonParam.getStartIndex(); i < pullTableConsumers.size(); i++) {
+        SubscriptionTablePullConsumer consumer =
+            (SubscriptionTablePullConsumer) pullTableConsumers.get(i);
+        File consumerDir =
+            new File(
+                CommonParam.getTargetDir()
+                    + File.separator
+                    + consumer.getConsumerGroupId()
+                    + File.separator
+                    + consumer.getConsumerId());
+        if (consumerDir.exists()) {
+          FileUtils.deleteFileOrDirectory(consumerDir);
+        }
+      }
+    }
+    try {
+      for (Topic topic : CommonParam.getTableSubs().getTopics()) {
+        try {
+          CommonParam.getTableSubs().dropTopicIfExists(topic.getTopicName());
+        } catch (Exception e) {
+          ioTPrinter.println(
+              "Failed to drop topic " + topic.getTopicName() + ": " + e.getMessage());
+        }
+      }
+    } finally {
+      CommonParam.getTableSubs().close();
+    }
+  }
+
+  /**
+   * Builds the configured number of pull consumers for {@code groupId}, opens them and keeps
+   * only those that opened successfully; the consumer count is updated to the number kept.
+   *
+   * @param groupId consumer group the consumers join
+   */
+  @Override
+  public void createConsumers(String groupId) {
+    // A negative count would make the ArrayList constructor throw.
+    CommonParam.setPullTableConsumers(
+        new ArrayList<>(Math.max(0, CommonParam.getConsumerCount())));
+    for (int i = CommonParam.getStartIndex(); i < CommonParam.getConsumerCount(); i++) {
+      CommonParam.getPullTableConsumers()
+          .add(
+              new SubscriptionTablePullConsumerBuilder()
+                  .host(CommonParam.getSrcHost())
+                  .port(CommonParam.getSrcPort())
+                  .consumerId(Constants.CONSUMER_NAME_PREFIX + i)
+                  .consumerGroupId(groupId)
+                  .autoCommit(Constants.AUTO_COMMIT)
+                  .autoCommitIntervalMs(Constants.AUTO_COMMIT_INTERVAL)
+                  .fileSaveDir(CommonParam.getTargetDir())
+                  .buildTablePullConsumer());
+    }
+    // One consumer failing to open must not affect the others: drop it and go on.
+    CommonParam.getPullTableConsumers()
+        .removeIf(
+            consumer -> {
+              try {
+                consumer.open();
+                return false;
+              } catch (SubscriptionException e) {
+                ioTPrinter.println("Failed to open consumer: " + e.getMessage());
+                return true;
+              }
+            });
+    CommonParam.setConsumerCount(CommonParam.getPullTableConsumers().size());
+  }
+
+  /**
+   * Subscribes every created consumer to {@code topicName}. A failure of one consumer is
+   * printed and does not stop the remaining consumers from subscribing.
+   *
+   * @param topicName name of the topic to subscribe to
+   */
+  @Override
+  public void subscribe(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (ISubscriptionTablePullConsumer consumer : CommonParam.getPullTableConsumers()) {
+      try {
+        consumer.subscribe(topicName);
+      } catch (Exception e) {
+        e.printStackTrace(out);
+      }
+    }
+  }
+
+  /**
+   * Submits one polling task per consumer to {@code executor}.
+   *
+   * @param executor executor that runs the polling tasks
+   * @param topicName topic the consumers unsubscribe from once they stop polling
+   */
+  @Override
+  public void consumerPoll(ExecutorService executor, String topicName) {
+    List<ISubscriptionTablePullConsumer> pullTableConsumers =
+        CommonParam.getPullTableConsumers();
+    for (int i = CommonParam.getStartIndex(); i < pullTableConsumers.size(); i++) {
+      final SubscriptionTablePullConsumer consumer =
+          (SubscriptionTablePullConsumer) pullTableConsumers.get(i);
+      executor.submit(() -> pollAndExport(consumer, topicName));
+    }
+  }
+
+  /**
+   * Polls {@code consumer} and moves each received TsFile into {@code targetDir/groupId} until
+   * {@code Constants.MAX_RETRY_TIMES} polls came back empty or failed, then unsubscribes.
+   *
+   * <p>Failed rounds count toward the same limit as empty ones; otherwise a poll that keeps
+   * throwing would spin forever.
+   */
+  private void pollAndExport(SubscriptionTablePullConsumer consumer, String topicName) {
+    final String groupDir =
+        CommonParam.getTargetDir() + File.separator + consumer.getConsumerGroupId();
+    int retryCount = 0;
+    while (true) {
+      boolean emptyOrFailed;
+      try {
+        List<SubscriptionMessage> messages =
+            consumer.poll(Duration.ofMillis(Constants.POLL_MESSAGE_TIMEOUT));
+        // NOTE(review): messages are committed before their files are moved, so a failed
+        // move is presumably not redelivered - confirm this ordering is intended.
+        consumer.commitSync(messages);
+        for (final SubscriptionMessage message : messages) {
+          SubscriptionTsFileHandler fp = message.getTsFileHandler();
+          ioTPrinter.println(fp.getFile().getName());
+          fp.moveFile(Paths.get(groupDir, fp.getPath().getFileName().toString()));
+          CommonParam.getCountFile().incrementAndGet();
+        }
+        emptyOrFailed = messages.isEmpty();
+      } catch (Exception e) {
+        e.printStackTrace(System.out);
+        emptyOrFailed = true;
+      }
+      if (emptyOrFailed) {
+        retryCount++;
+        if (retryCount >= Constants.MAX_RETRY_TIMES) {
+          break;
+        }
+      }
+    }
+    try {
+      consumer.unsubscribe(topicName);
+    } catch (Exception e) {
+      // Report it here; otherwise it would only be stored in the discarded Future.
+      e.printStackTrace(System.out);
+    }
+  }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
new file mode 100644
index 00000000000..72a50f1a65d
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile.subscription;
+
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.subscription.config.TopicConstant;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import
org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
+import org.apache.iotdb.session.subscription.model.Topic;
+import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import org.apache.iotdb.tool.common.Constants;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+import static java.lang.System.out;
+
+/** Tree-model exporter that pulls TsFiles through subscription pull consumers. */
+public class SubscriptionTreeTsFile extends AbstractSubscriptionTsFile {
+
+  /**
+   * Creates topic {@code topicName} on the tree-model subscription session.
+   *
+   * <p>Mode, format, strictness and loose-range attributes come from {@link Constants}. Start
+   * and end time are added only when non-blank. The path attribute is the configured path, or
+   * the full-path fallback from {@link CommonParam} when the configured path is blank.
+   *
+   * @param topicName name of the topic to create
+   * @throws IoTDBConnectionException if creating the topic fails on the connection
+   * @throws StatementExecutionException if the server rejects the topic creation
+   */
+  @Override
+  public void createTopics(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Properties properties = new Properties();
+    properties.put(TopicConstant.MODE_KEY, Constants.MODE);
+    properties.put(TopicConstant.FORMAT_KEY, Constants.HANDLER);
+    properties.put(TopicConstant.STRICT_KEY, Constants.STRICT);
+    properties.put(TopicConstant.LOOSE_RANGE_KEY, Constants.LOOSE_RANGE);
+    if (StringUtils.isNotBlank(CommonParam.getStartTime())) {
+      properties.put(TopicConstant.START_TIME_KEY, CommonParam.getStartTime());
+    }
+    if (StringUtils.isNotBlank(CommonParam.getEndTime())) {
+      properties.put(TopicConstant.END_TIME_KEY, CommonParam.getEndTime());
+    }
+    final String pathPattern =
+        StringUtils.isNotBlank(CommonParam.getPath())
+            ? CommonParam.getPath()
+            : CommonParam.getPathFull();
+    properties.put(TopicConstant.PATH_KEY, pathPattern);
+    CommonParam.getTreeSubs().createTopic(topicName, properties);
+  }
+
+  /**
+   * Deletes each consumer's {@code targetDir/groupId/consumerId} directory, drops the topics
+   * listed by the session and closes the session.
+   *
+   * <p>Dropping is best effort: a topic that cannot be dropped is reported and skipped. The
+   * session is closed even when listing or dropping topics fails.
+   *
+   * <p>NOTE(review): every topic returned by {@code getTopics()} is dropped, not only the one
+   * created by this export - confirm this is intended on shared servers.
+   *
+   * @throws Exception if deleting a directory, listing topics or closing the session fails
+   */
+  @Override
+  public void doClean() throws Exception {
+    List<SubscriptionTreePullConsumer> pullTreeConsumers = CommonParam.getPullTreeConsumers();
+    // The list is only set by createConsumers; tolerate cleanup before that happened.
+    if (pullTreeConsumers != null) {
+      for (int i = CommonParam.getStartIndex(); i < pullTreeConsumers.size(); i++) {
+        SubscriptionTreePullConsumer consumer = pullTreeConsumers.get(i);
+        File consumerDir =
+            new File(
+                CommonParam.getTargetDir()
+                    + File.separator
+                    + consumer.getConsumerGroupId()
+                    + File.separator
+                    + consumer.getConsumerId());
+        if (consumerDir.exists()) {
+          FileUtils.deleteFileOrDirectory(consumerDir);
+        }
+      }
+    }
+    try {
+      for (Topic topic : CommonParam.getTreeSubs().getTopics()) {
+        try {
+          CommonParam.getTreeSubs().dropTopicIfExists(topic.getTopicName());
+        } catch (Exception e) {
+          ioTPrinter.println(
+              "Failed to drop topic " + topic.getTopicName() + ": " + e.getMessage());
+        }
+      }
+    } finally {
+      CommonParam.getTreeSubs().close();
+    }
+  }
+
+  /**
+   * Builds the configured number of pull consumers for {@code groupId}, opens them and keeps
+   * only those that opened successfully; the consumer count is updated to the number kept.
+   *
+   * @param groupId consumer group the consumers join
+   */
+  @Override
+  public void createConsumers(String groupId) {
+    // A negative count would make the ArrayList constructor throw.
+    CommonParam.setPullTreeConsumers(
+        new ArrayList<>(Math.max(0, CommonParam.getConsumerCount())));
+    for (int i = CommonParam.getStartIndex(); i < CommonParam.getConsumerCount(); i++) {
+      CommonParam.getPullTreeConsumers()
+          .add(
+              new SubscriptionTreePullConsumer.Builder()
+                  .host(CommonParam.getSrcHost())
+                  .port(CommonParam.getSrcPort())
+                  .consumerId(Constants.CONSUMER_NAME_PREFIX + i)
+                  .consumerGroupId(groupId)
+                  .autoCommit(Constants.AUTO_COMMIT)
+                  .autoCommitIntervalMs(Constants.AUTO_COMMIT_INTERVAL)
+                  .fileSaveDir(CommonParam.getTargetDir())
+                  .buildPullConsumer());
+    }
+    // One consumer failing to open must not affect the others: drop it and go on.
+    CommonParam.getPullTreeConsumers()
+        .removeIf(
+            consumer -> {
+              try {
+                consumer.open();
+                return false;
+              } catch (SubscriptionException e) {
+                ioTPrinter.println("Failed to open consumer: " + e.getMessage());
+                return true;
+              }
+            });
+    CommonParam.setConsumerCount(CommonParam.getPullTreeConsumers().size());
+  }
+
+  /**
+   * Subscribes every created consumer to {@code topicName}. A failure of one consumer is
+   * printed and does not stop the remaining consumers from subscribing.
+   *
+   * @param topicName name of the topic to subscribe to
+   */
+  @Override
+  public void subscribe(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (SubscriptionTreePullConsumer consumer : CommonParam.getPullTreeConsumers()) {
+      try {
+        consumer.subscribe(topicName);
+      } catch (Exception e) {
+        e.printStackTrace(out);
+      }
+    }
+  }
+
+  /**
+   * Submits one polling task per consumer to {@code executor}.
+   *
+   * @param executor executor that runs the polling tasks
+   * @param topicName topic the consumers unsubscribe from once they stop polling
+   */
+  @Override
+  public void consumerPoll(ExecutorService executor, String topicName) {
+    List<SubscriptionTreePullConsumer> pullTreeConsumers = CommonParam.getPullTreeConsumers();
+    for (int i = CommonParam.getStartIndex(); i < pullTreeConsumers.size(); i++) {
+      final SubscriptionTreePullConsumer consumer = pullTreeConsumers.get(i);
+      executor.submit(() -> pollAndExport(consumer, topicName));
+    }
+  }
+
+  /**
+   * Polls {@code consumer} and moves each received TsFile into {@code targetDir/groupId} until
+   * {@code Constants.MAX_RETRY_TIMES} polls came back empty or failed, then unsubscribes.
+   *
+   * <p>Failed rounds count toward the same limit as empty ones; otherwise a poll that keeps
+   * throwing would spin forever.
+   */
+  private void pollAndExport(SubscriptionTreePullConsumer consumer, String topicName) {
+    int retryCount = 0;
+    while (true) {
+      boolean emptyOrFailed;
+      try {
+        List<SubscriptionMessage> messages =
+            consumer.poll(Duration.ofMillis(Constants.POLL_MESSAGE_TIMEOUT));
+        // NOTE(review): messages are committed before their files are moved, so a failed
+        // move is presumably not redelivered - confirm this ordering is intended.
+        consumer.commitSync(messages);
+        for (final SubscriptionMessage message : messages) {
+          SubscriptionTsFileHandler fp = message.getTsFileHandler();
+          ioTPrinter.println(fp.getFile().getName());
+          fp.moveFile(
+              Paths.get(
+                  CommonParam.getTargetDir() + File.separator + consumer.getConsumerGroupId(),
+                  fp.getPath().getFileName().toString()));
+          CommonParam.getCountFile().incrementAndGet();
+        }
+        emptyOrFailed = messages.isEmpty();
+      } catch (Exception e) {
+        e.printStackTrace(System.out);
+        emptyOrFailed = true;
+      }
+      if (emptyOrFailed) {
+        retryCount++;
+        if (retryCount >= Constants.MAX_RETRY_TIMES) {
+          break;
+        }
+      }
+    }
+    try {
+      consumer.unsubscribe(topicName);
+    } catch (Exception e) {
+      // Report it here; otherwise it would only be stored in the discarded Future.
+      e.printStackTrace(System.out);
+    }
+  }
+}