This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3808fb99b2994fd35fb11de90b378a95bedff5ba Author: Siyang Tang <[email protected]> AuthorDate: Thu Sep 14 21:42:28 2023 +0800 [enhancement](broker-load) support compress type for old broker load, and split compress type from file format (#23882) --- .../Load/BROKER-LOAD.md | 4 + .../Load/BROKER-LOAD.md | 4 + fe/fe-core/src/main/cup/sql_parser.cup | 15 ++- .../org/apache/doris/analysis/DataDescription.java | 88 +++++++++----- .../java/org/apache/doris/common/util/Util.java | 46 +++++--- .../org/apache/doris/load/BrokerFileGroup.java | 11 -- .../doris/planner/external/FileGroupInfo.java | 31 ++--- .../doris/planner/external/LoadScanProvider.java | 34 ++---- fe/fe-core/src/main/jflex/sql_scanner.flex | 1 + .../load_p0/broker_load/test_compress_type.out | 4 + .../suites/load_p0/broker_load/ddl/basic_data.sql | 29 +++++ .../load_p0/broker_load/ddl/basic_data_drop.sql | 1 + .../load_p0/broker_load/test_compress_type.groovy | 129 +++++++++++++++++++++ 13 files changed, 292 insertions(+), 105 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 0cbfc8fd54..a54b7c815e 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -67,6 +67,7 @@ WITH BROKER broker_name [COLUMNS TERMINATED BY "column_separator"] [LINES TERMINATED BY "line_delimiter"] [FORMAT AS "file_type"] + [COMPRESS_TYPE AS "compress_type"] [(column_list)] [COLUMNS FROM PATH AS (c1, c2, ...)] [SET (column_mapping)] @@ -105,6 +106,9 @@ WITH BROKER broker_name Specifies the file type, CSV, PARQUET and ORC formats are supported. Default is CSV. + - `COMPRESS_TYPE AS` + Specifies the file compress type, GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP + - `column list` Used to specify the column order in the original file. For a detailed introduction to this part, please refer to the [Column Mapping, Conversion and Filtering](../../../../data-operate/import/import-scenes/load-data-convert.md) document. 
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 6580fdf8f5..907f7cdf67 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -67,6 +67,7 @@ WITH BROKER broker_name [COLUMNS TERMINATED BY "column_separator"] [FORMAT AS "file_type"] [LINES TERMINATED BY "line_delimiter"] + [COMPRESS_TYPE AS "compress_type"] [(column_list)] [COLUMNS FROM PATH AS (c1, c2, ...)] [SET (column_mapping)] @@ -105,6 +106,9 @@ WITH BROKER broker_name 指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。 + - `COMPRESS_TYPE AS` + 指定文件压缩类型, 支持GZ/BZ2/LZ4FRAME。 + - `column list` 用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 [列的映射,转换与过滤](../../../../data-operate/import/import-scenes/load-data-convert.md) 文档。 diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 99758c1dbd..8c2e457451 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -303,6 +303,7 @@ terminal String KW_COMMITTED, KW_COMPACT, KW_COMPLETE, + KW_COMPRESS_TYPE, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, @@ -891,8 +892,8 @@ nonterminal GroupByClause group_by_clause, grouping_elements; // nonterminal String keyword, ident, ident_or_text, variable_name, charset_name_or_default, old_or_new_charset_name_or_default, opt_collate, - collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format, time_unit, - literal_or_ident; + collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format, opt_file_compress_type, + time_unit, literal_or_ident; nonterminal PassVar text_or_password; // sync job @@ -2407,6 +2408,7 @@ data_desc ::= opt_field_term:colSep opt_line_term:lineDelimiter opt_file_format:fileFormat + opt_file_compress_type:fileCompressType opt_col_list:colList opt_columns_from_path:columnsFromPath opt_col_mapping_list:colMappingList @@ -2416,7 +2418,7 @@ data_desc ::= sequence_col_clause:sequenceColName opt_properties:properties {: - RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, lineDelimiter, fileFormat, + RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, lineDelimiter, fileFormat, fileCompressType, columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties); :} | opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName @@ -2523,6 +2525,13 @@ opt_file_format ::= {: RESULT = format; :} ; +opt_file_compress_type ::= + /* Empty */ + {: RESULT = null; :} + | KW_COMPRESS_TYPE KW_AS ident_or_text:compress_type + {: RESULT = compress_type; :} + ; + opt_columns_from_path ::= /* Empty */ {: RESULT = null; :} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index c60cbb76af..fc867847f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -28,6 +28,7 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; 
import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.Util; @@ -173,22 +174,22 @@ public class DataDescription implements InsertStmt.DataDesc { } public DataDescription(String tableName, - PartitionNames partitionNames, - List<String> filePaths, - List<String> columns, - Separator columnSeparator, - String fileFormat, - List<String> columnsFromPath, - boolean isNegative, - List<Expr> columnMappingList, - Expr fileFilterExpr, - Expr whereExpr, - LoadTask.MergeType mergeType, - Expr deleteCondition, - String sequenceColName, - Map<String, String> properties) { + PartitionNames partitionNames, + List<String> filePaths, + List<String> columns, + Separator columnSeparator, + String fileFormat, + List<String> columnsFromPath, + boolean isNegative, + List<Expr> columnMappingList, + Expr fileFilterExpr, + Expr whereExpr, + LoadTask.MergeType mergeType, + Expr deleteCondition, + String sequenceColName, + Map<String, String> properties) { this(tableName, partitionNames, filePaths, columns, columnSeparator, null, - fileFormat, columnsFromPath, isNegative, columnMappingList, fileFilterExpr, whereExpr, + fileFormat, null, columnsFromPath, isNegative, columnMappingList, fileFilterExpr, whereExpr, mergeType, deleteCondition, sequenceColName, properties); } @@ -199,6 +200,7 @@ public class DataDescription implements InsertStmt.DataDesc { Separator columnSeparator, Separator lineDelimiter, String fileFormat, + String compressType, List<String> columnsFromPath, boolean isNegative, List<Expr> columnMappingList, @@ -215,6 +217,7 @@ public class DataDescription implements InsertStmt.DataDesc { this.columnSeparator = columnSeparator; this.lineDelimiter = lineDelimiter; this.fileFormat = fileFormat; + this.compressType = Util.getFileCompressType(compressType); this.columnsFromPath = columnsFromPath; this.isNegative = isNegative; this.columnMappingList = columnMappingList; @@ -362,8 +365,8 @@ public class DataDescription implements InsertStmt.DataDesc { } public static void validateMappingFunction(String functionName, List<String> args, - Map<String, String> columnNameMap, - Column mappingColumn, boolean isHadoopLoad) throws AnalysisException { + Map<String, String> columnNameMap, + Column mappingColumn, boolean isHadoopLoad) throws AnalysisException { if (functionName.equalsIgnoreCase("alignment_timestamp")) { validateAlignmentTimestamp(args, columnNameMap); } else if (functionName.equalsIgnoreCase("strftime")) { @@ -1050,6 +1053,13 @@ public class DataDescription implements InsertStmt.DataDesc { if (isAnalyzed) { return; } + checkLoadPriv(fullDbName); + checkMergeType(); + analyzeWithoutCheckPriv(fullDbName); + isAnalyzed = true; + } + + private void checkMergeType() throws AnalysisException { if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) { throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE."); } @@ -1059,24 +1069,32 @@ public class DataDescription implements InsertStmt.DataDesc { if (mergeType != LoadTask.MergeType.APPEND && isNegative) { throw new AnalysisException("not support MERGE or DELETE with NEGATIVE."); } - checkLoadPriv(fullDbName); - analyzeWithoutCheckPriv(fullDbName); - if (isNegative && mergeType != LoadTask.MergeType.APPEND) { - throw new AnalysisException("Negative is only used when merge type is append."); - } - isAnalyzed = true; } public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException { + analyzeFilePaths(); + + analyzeLoadAttributes(); + + analyzeColumns(); + 
analyzeMultiLoadColumns(); + analyzeSequenceCol(fullDbName); + + if (properties != null) { + analyzeProperties(); + } + } + + private void analyzeFilePaths() throws AnalysisException { if (!isLoadFromTable()) { if (filePaths == null || filePaths.isEmpty()) { throw new AnalysisException("No file path in load statement."); } - for (int i = 0; i < filePaths.size(); ++i) { - filePaths.set(i, filePaths.get(i).trim()); - } + filePaths.replaceAll(String::trim); } + } + private void analyzeLoadAttributes() throws AnalysisException { if (columnSeparator != null) { columnSeparator.analyze(); } @@ -1089,12 +1107,18 @@ public class DataDescription implements InsertStmt.DataDesc { partitionNames.analyze(null); } - analyzeColumns(); - analyzeMultiLoadColumns(); - analyzeSequenceCol(fullDbName); - - if (properties != null) { - analyzeProperties(); + // file format + // note(tsy): for historical reason, file format here must be string type rather than TFileFormatType + if (fileFormat != null) { + if (!fileFormat.equalsIgnoreCase("parquet") + && !fileFormat.equalsIgnoreCase(FeConstants.csv) + && !fileFormat.equalsIgnoreCase("orc") + && !fileFormat.equalsIgnoreCase("json") + && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names) + && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types) + && !fileFormat.equalsIgnoreCase("hive_text")) { + throw new AnalysisException("File Format Type " + fileFormat + " is invalid."); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index 0dff4b6caa..2cdc3f1972 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.qe.ConnectContext; @@ -50,6 +51,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.function.LongUnaryOperator; import java.util.function.Predicate; @@ -534,29 +536,38 @@ public class Util { @NotNull - public static TFileFormatType getFileFormatType(String path) { + public static TFileFormatType getFileFormatTypeFromPath(String path) { String lowerCasePath = path.toLowerCase(); - if (lowerCasePath.endsWith(".parquet") || lowerCasePath.endsWith(".parq")) { + if (lowerCasePath.contains(".parquet") || lowerCasePath.contains(".parq")) { return TFileFormatType.FORMAT_PARQUET; - } else if (lowerCasePath.endsWith(".gz")) { - return TFileFormatType.FORMAT_CSV_GZ; - } else if (lowerCasePath.endsWith(".bz2")) { - return TFileFormatType.FORMAT_CSV_BZ2; - } else if (lowerCasePath.endsWith(".lz4")) { - return TFileFormatType.FORMAT_CSV_LZ4FRAME; - } else if (lowerCasePath.endsWith(".lzo")) { - return TFileFormatType.FORMAT_CSV_LZOP; - } else if (lowerCasePath.endsWith(".lzo_deflate")) { - return TFileFormatType.FORMAT_CSV_LZO; - } else if (lowerCasePath.endsWith(".deflate")) { - return TFileFormatType.FORMAT_CSV_DEFLATE; - } else if (lowerCasePath.endsWith(".snappy")) { - return TFileFormatType.FORMAT_CSV_SNAPPYBLOCK; + } else if (lowerCasePath.contains(".orc")) { + return 
TFileFormatType.FORMAT_ORC; + } else if (lowerCasePath.contains(".json")) { + return TFileFormatType.FORMAT_JSON; } else { return TFileFormatType.FORMAT_CSV_PLAIN; } } + public static TFileFormatType getFileFormatTypeFromName(String formatName) { + String lowerFileFormat = Objects.requireNonNull(formatName).toLowerCase(); + if (lowerFileFormat.equals("parquet")) { + return TFileFormatType.FORMAT_PARQUET; + } else if (lowerFileFormat.equals("orc")) { + return TFileFormatType.FORMAT_ORC; + } else if (lowerFileFormat.equals("json")) { + return TFileFormatType.FORMAT_JSON; + // csv/csv_with_name/csv_with_names_and_types treat as csv format + } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names) + || lowerFileFormat.equals(FeConstants.csv_with_names_and_types) + // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. + || lowerFileFormat.equals(FeConstants.text)) { + return TFileFormatType.FORMAT_CSV_PLAIN; + } else { + return TFileFormatType.FORMAT_UNKNOWN; + } + } + /** * Infer {@link TFileCompressType} from file name. * @@ -585,6 +596,9 @@ public class Util { } public static TFileCompressType getFileCompressType(String compressType) { + if (Strings.isNullOrEmpty(compressType)) { + return TFileCompressType.UNKNOWN; + } final String upperCaseType = compressType.toUpperCase(); return TFileCompressType.valueOf(upperCaseType); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 332dcfb049..c193ab8f77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -34,7 +34,6 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -214,16 +213,6 @@ public class BrokerFileGroup implements Writable { escape = dataDescription.getEscape(); fileFormat = dataDescription.getFileFormat(); - if (fileFormat != null) { - if (!fileFormat.equalsIgnoreCase("parquet") && !fileFormat.equalsIgnoreCase(FeConstants.csv) - && !fileFormat.equalsIgnoreCase("orc") - && !fileFormat.equalsIgnoreCase("json") - && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names) - && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types) - && !fileFormat.equalsIgnoreCase("hive_text")) { - throw new DdlException("File Format Type " + fileFormat + " is invalid."); - } - } columnSeparator = dataDescription.getColumnSeparator(); if (columnSeparator == null) { if (fileFormat != null && fileFormat.equalsIgnoreCase("hive_text")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java index f626bb71c2..80fd901345 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java @@ -24,7 +24,6 @@ import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; 
import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.common.util.Util; @@ -207,6 +206,9 @@ public class FileGroupInfo { // header_type TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); context.params.setFormatType(formatType); + context.params.setCompressType( + Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path) + ); List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, context.fileGroup.getColumnNamesFromPath()); // Assign scan range locations only for broker load. @@ -293,26 +295,15 @@ public class FileGroupInfo { } private TFileFormatType formatType(String fileFormat, String path) throws UserException { - if (fileFormat != null) { - if (fileFormat.equalsIgnoreCase("parquet")) { - return TFileFormatType.FORMAT_PARQUET; - } else if (fileFormat.equalsIgnoreCase("orc")) { - return TFileFormatType.FORMAT_ORC; - } else if (fileFormat.equalsIgnoreCase("json")) { - return TFileFormatType.FORMAT_JSON; - // csv/csv_with_name/csv_with_names_and_types treat as csv format - } else if (fileFormat.equalsIgnoreCase(FeConstants.csv) || fileFormat.toLowerCase() - .equals(FeConstants.csv_with_names) || fileFormat.toLowerCase() - .equals(FeConstants.csv_with_names_and_types) - // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. - || fileFormat.equalsIgnoreCase(FeConstants.text)) { - return TFileFormatType.FORMAT_CSV_PLAIN; - } else { - throw new UserException("Not supported file format: " + fileFormat); - } + if (fileFormat == null) { + // get file format by the file path + return Util.getFileFormatTypeFromPath(path); } - - return Util.getFileFormatType(path); + TFileFormatType formatType = Util.getFileFormatTypeFromName(fileFormat); + if (formatType == TFileFormatType.FORMAT_UNKNOWN) { + throw new UserException("Not supported file format: " + fileFormat); + } + return formatType; } private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index 7f8c85d196..d36958ad88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -88,7 +88,7 @@ public class LoadScanProvider { ctx.timezone = analyzer.getTimezone(); TFileScanRangeParams params = new TFileScanRangeParams(); - params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat(), "")); + params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat())); params.setCompressType(fileGroupInfo.getFileGroup().getCompressType()); params.setStrictMode(fileGroupInfo.isStrictMode()); if (fileGroupInfo.getFileGroup().getFileFormat() != null @@ -211,7 +211,7 @@ public class LoadScanProvider { List<Integer> srcSlotIds = Lists.newArrayList(); Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds, - formatType(context.fileGroup.getFileFormat(), ""), fileGroupInfo.getHiddenColumns(), + formatType(context.fileGroup.getFileFormat()), fileGroupInfo.getHiddenColumns(), fileGroupInfo.isPartialUpdate()); int columnCountFromPath = 0; @@ -242,28 +242,16 @@ public class LoadScanProvider { 
.equalsIgnoreCase(Column.DELETE_SIGN); } - private TFileFormatType formatType(String fileFormat, String path) throws UserException { - if (fileFormat != null) { - String lowerFileFormat = fileFormat.toLowerCase(); - if (lowerFileFormat.equals("parquet")) { - return TFileFormatType.FORMAT_PARQUET; - } else if (lowerFileFormat.equals("orc")) { - return TFileFormatType.FORMAT_ORC; - } else if (lowerFileFormat.equals("json")) { - return TFileFormatType.FORMAT_JSON; - // csv/csv_with_name/csv_with_names_and_types treat as csv format - } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names) - || lowerFileFormat.equals(FeConstants.csv_with_names_and_types) - // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. - || lowerFileFormat.equals(FeConstants.text)) { - return TFileFormatType.FORMAT_CSV_PLAIN; - } else { - throw new UserException("Not supported file format: " + fileFormat); - } - } else { - // get file format by the suffix of file - return Util.getFileFormatType(path); + private TFileFormatType formatType(String fileFormat) throws UserException { + if (fileFormat == null) { + // get file format by the file path + return TFileFormatType.FORMAT_CSV_PLAIN; + } + TFileFormatType formatType = Util.getFileFormatTypeFromName(fileFormat); + if (formatType == TFileFormatType.FORMAT_UNKNOWN) { + throw new UserException("Not supported file format: " + fileFormat); } + return formatType; } public TableIf getTargetTable() { diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 56f8faae18..4f4f3392dc 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -231,6 +231,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR)); keywordMap.put("force", new Integer(SqlParserSymbols.KW_FORCE)); keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT)); + keywordMap.put("compress_type", new Integer(SqlParserSymbols.KW_COMPRESS_TYPE)); keywordMap.put("free", new Integer(SqlParserSymbols.KW_FREE)); keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM)); keywordMap.put("frontend", new Integer(SqlParserSymbols.KW_FRONTEND)); diff --git a/regression-test/data/load_p0/broker_load/test_compress_type.out b/regression-test/data/load_p0/broker_load/test_compress_type.out new file mode 100644 index 0000000000..e66359296c --- /dev/null +++ b/regression-test/data/load_p0/broker_load/test_compress_type.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +240 + diff --git a/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql b/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql new file mode 100644 index 0000000000..41c3660e11 --- /dev/null +++ b/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql @@ -0,0 +1,29 @@ +CREATE TABLE basic_data +( + k00 INT NOT NULL, + k01 DATE NOT NULL, + k02 BOOLEAN NULL, + k03 TINYINT NULL, + k04 SMALLINT NULL, + k05 INT NULL, + k06 BIGINT NULL, + k07 LARGEINT NULL, + k08 FLOAT NULL, + k09 DOUBLE NULL, + k10 DECIMAL(9,1) NULL, + k11 DECIMALV3(9,1) NULL, + k12 DATETIME NULL, + k13 DATEV2 NULL, + k14 DATETIMEV2 NULL, + k15 CHAR NULL, + k16 VARCHAR NULL, + k17 STRING NULL, + k18 JSON NULL + +) +DUPLICATE KEY(k00) +DISTRIBUTED BY HASH(k00) BUCKETS 32 +PROPERTIES ( + "bloom_filter_columns"="k05", + "replication_num" = "1" +); \ No newline at end of file diff --git a/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql b/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql new file mode 100644 index 0000000000..c4ad606d9b --- /dev/null +++ b/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS basic_data \ No newline at end of file diff --git a/regression-test/suites/load_p0/broker_load/test_compress_type.groovy b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy new file mode 100644 index 0000000000..50d4e34cd9 --- /dev/null +++ b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_compress_type", "load_p0") { + def tableName = "basic_data" + + // GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP + def compressTypes = [ + "COMPRESS_TYPE AS \"GZ\"", + "COMPRESS_TYPE AS \"BZ2\"", + "COMPRESS_TYPE AS \"LZ4FRAME\"", + "COMPRESS_TYPE AS \"GZ\"", + "COMPRESS_TYPE AS \"BZ2\"", + "COMPRESS_TYPE AS \"LZ4FRAME\"", + "", + "", + "", + "", + "", + "", + ] + + def fileFormat = [ + "FORMAT AS \"CSV\"", + "FORMAT AS \"CSV\"", + "FORMAT AS \"CSV\"", + "", + "", + "", + "FORMAT AS \"CSV\"", + "FORMAT AS \"CSV\"", + "FORMAT AS \"CSV\"", + "", + "", + "" + ] + + def paths = [ + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2", + "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4" + ] + def labels = [] + + def ak = getS3AK() + def sk = getS3SK() + + + def i = 0 + sql new File("""${ context.file.parent }/ddl/basic_data_drop.sql""").text + sql new File("""${ context.file.parent }/ddl/basic_data.sql""").text + for (String compressType : compressTypes) { + def label = "test_s3_load_compress" + UUID.randomUUID().toString().replace("-", "0") + i + labels.add(label) + def format_str = fileFormat[i] + def path = paths[i] + def sql_str = """ + LOAD LABEL $label ( + DATA INFILE("$path") + INTO TABLE $tableName + COLUMNS TERMINATED BY "|" + $format_str + $compressType + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com", + "AWS_REGION" = "ap-beijing" + ) + properties( + "use_new_load_scan_node" = "true" + ) + """ + logger.info("submit sql: ${sql_str}"); + sql """${sql_str}""" + logger.info("Submit load with lable: $label, table: $tableName, path: $path") + ++i + } + + i = 0 + for (String label in labels) { + def max_try_milli_secs = 600000 + while (max_try_milli_secs > 0) { + String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + logger.info("Load FINISHED " + label) + break + } + if (result[0][2].equals("CANCELLED")) { + def reason = result[0][7] + logger.info("load failed, index: $i, reason:$reason") + assertTrue(1 == 2) + break; + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertTrue(1 == 2, "load Timeout: $label") + } + } + i++ + } + + qt_sql """ select count(*) from ${tableName} """ +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
