This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f61e6483bf [enhancement](broker-load) support compress type for old broker load, and split compress type from file format (#23882)
f61e6483bf is described below
commit f61e6483bf809b6a9f4d7900d87445d2a7b25cbe
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Sep 14 21:42:28 2023 +0800
[enhancement](broker-load) support compress type for old broker load, and split compress type from file format (#23882)
---
.../Load/BROKER-LOAD.md | 4 +
.../Load/BROKER-LOAD.md | 4 +
fe/fe-core/src/main/cup/sql_parser.cup | 15 ++-
.../org/apache/doris/analysis/DataDescription.java | 88 +++++++++-----
.../java/org/apache/doris/common/util/Util.java | 46 +++++---
.../org/apache/doris/load/BrokerFileGroup.java | 11 --
.../doris/planner/external/FileGroupInfo.java | 31 ++---
.../doris/planner/external/LoadScanProvider.java | 34 ++----
fe/fe-core/src/main/jflex/sql_scanner.flex | 1 +
.../load_p0/broker_load/test_compress_type.out | 4 +
.../suites/load_p0/broker_load/ddl/basic_data.sql | 29 +++++
.../load_p0/broker_load/ddl/basic_data_drop.sql | 1 +
.../load_p0/broker_load/test_compress_type.groovy | 129 +++++++++++++++++++++
13 files changed, 292 insertions(+), 105 deletions(-)
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index dac37e7a15..b6922bc41c 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -67,6 +67,7 @@ WITH BROKER broker_name
[COLUMNS TERMINATED BY "column_separator"]
[LINES TERMINATED BY "line_delimiter"]
[FORMAT AS "file_type"]
+ [COMPRESS_TYPE AS "compress_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[SET (column_mapping)]
@@ -105,6 +106,9 @@ WITH BROKER broker_name
  Specifies the file type. CSV, PARQUET and ORC formats are supported. Default is CSV.
+ - `COMPRESS_TYPE AS`
+    Specifies the file compression type. GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP are supported.
+
- `column list`
  Used to specify the column order in the original file. For a detailed introduction to this part, please refer to the [Column Mapping, Conversion and Filtering](../../../../data-operate/import/import-scenes/load-data-convert.md) document.
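For illustration, a broker load statement using the new clause might look like the following sketch; the database, table, file path, and broker name are placeholders, and the clause order follows the syntax block above:

    LOAD LABEL example_db.label_compress_gz
    (
        DATA INFILE("hdfs://host:port/path/basic_data.csv.gz")
        INTO TABLE basic_data
        COLUMNS TERMINATED BY "|"
        FORMAT AS "CSV"
        COMPRESS_TYPE AS "GZ"
    )
    WITH BROKER broker_name;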
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 962108240f..9e3ecc80ce 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -67,6 +67,7 @@ WITH BROKER broker_name
[COLUMNS TERMINATED BY "column_separator"]
[LINES TERMINATED BY "line_delimiter"]
[FORMAT AS "file_type"]
+ [COMPRESS_TYPE AS "compress_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[SET (column_mapping)]
@@ -105,6 +106,9 @@ WITH BROKER broker_name
  Specifies the file type. CSV, PARQUET and ORC formats are supported. Default is CSV.
+ - `COMPRESS_TYPE AS`
+    Specifies the file compression type. GZ/BZ2/LZ4FRAME are supported.
+
- `column list`
  Used to specify the column order in the original file. For a detailed introduction to this part, please refer to the [Column Mapping, Conversion and Filtering](../../../../data-operate/import/import-scenes/load-data-convert.md) document.
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 50066f9c8c..f0e83f0964 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -304,6 +304,7 @@ terminal String
KW_COMMITTED,
KW_COMPACT,
KW_COMPLETE,
+ KW_COMPRESS_TYPE,
KW_CONFIG,
KW_CONNECTION,
KW_CONNECTION_ID,
@@ -904,8 +905,8 @@ nonterminal GroupByClause group_by_clause, grouping_elements;
 nonterminal String keyword, ident, ident_or_text, variable_name, charset_name_or_default, old_or_new_charset_name_or_default, opt_collate,
-    collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format, time_unit,
-    literal_or_ident;
+    collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format, opt_file_compress_type,
+    time_unit, literal_or_ident;
nonterminal PassVar text_or_password;
// sync job
@@ -2433,6 +2434,7 @@ data_desc ::=
opt_field_term:colSep
opt_line_term:lineDelimiter
opt_file_format:fileFormat
+ opt_file_compress_type:fileCompressType
opt_col_list:colList
opt_columns_from_path:columnsFromPath
opt_col_mapping_list:colMappingList
@@ -2442,7 +2444,7 @@ data_desc ::=
sequence_col_clause:sequenceColName
opt_properties:properties
{:
-                RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, lineDelimiter, fileFormat,
+                RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, lineDelimiter, fileFormat, fileCompressType,
                         columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, mergeType, deleteExpr, sequenceColName, properties);
:}
| opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
@@ -2549,6 +2551,13 @@ opt_file_format ::=
{: RESULT = format; :}
;
+opt_file_compress_type ::=
+ /* Empty */
+ {: RESULT = null; :}
+ | KW_COMPRESS_TYPE KW_AS ident_or_text:compress_type
+ {: RESULT = compress_type; :}
+ ;
+
opt_columns_from_path ::=
/* Empty */
{: RESULT = null; :}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index c60cbb76af..fc867847f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -28,6 +28,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.Util;
@@ -173,22 +174,22 @@ public class DataDescription implements InsertStmt.DataDesc {
}
public DataDescription(String tableName,
- PartitionNames partitionNames,
- List<String> filePaths,
- List<String> columns,
- Separator columnSeparator,
- String fileFormat,
- List<String> columnsFromPath,
- boolean isNegative,
- List<Expr> columnMappingList,
- Expr fileFilterExpr,
- Expr whereExpr,
- LoadTask.MergeType mergeType,
- Expr deleteCondition,
- String sequenceColName,
- Map<String, String> properties) {
+ PartitionNames partitionNames,
+ List<String> filePaths,
+ List<String> columns,
+ Separator columnSeparator,
+ String fileFormat,
+ List<String> columnsFromPath,
+ boolean isNegative,
+ List<Expr> columnMappingList,
+ Expr fileFilterExpr,
+ Expr whereExpr,
+ LoadTask.MergeType mergeType,
+ Expr deleteCondition,
+ String sequenceColName,
+ Map<String, String> properties) {
         this(tableName, partitionNames, filePaths, columns, columnSeparator, null,
-                fileFormat, columnsFromPath, isNegative, columnMappingList, fileFilterExpr, whereExpr,
+                fileFormat, null, columnsFromPath, isNegative, columnMappingList, fileFilterExpr, whereExpr,
mergeType, deleteCondition, sequenceColName, properties);
}
@@ -199,6 +200,7 @@ public class DataDescription implements InsertStmt.DataDesc {
Separator columnSeparator,
Separator lineDelimiter,
String fileFormat,
+ String compressType,
List<String> columnsFromPath,
boolean isNegative,
List<Expr> columnMappingList,
@@ -215,6 +217,7 @@ public class DataDescription implements InsertStmt.DataDesc {
this.columnSeparator = columnSeparator;
this.lineDelimiter = lineDelimiter;
this.fileFormat = fileFormat;
+ this.compressType = Util.getFileCompressType(compressType);
this.columnsFromPath = columnsFromPath;
this.isNegative = isNegative;
this.columnMappingList = columnMappingList;
@@ -362,8 +365,8 @@ public class DataDescription implements InsertStmt.DataDesc {
}
     public static void validateMappingFunction(String functionName, List<String> args,
-                                               Map<String, String> columnNameMap,
-                                               Column mappingColumn, boolean isHadoopLoad) throws AnalysisException {
+                                                      Map<String, String> columnNameMap,
+                                                      Column mappingColumn, boolean isHadoopLoad) throws AnalysisException {
if (functionName.equalsIgnoreCase("alignment_timestamp")) {
validateAlignmentTimestamp(args, columnNameMap);
} else if (functionName.equalsIgnoreCase("strftime")) {
@@ -1050,6 +1053,13 @@ public class DataDescription implements InsertStmt.DataDesc {
if (isAnalyzed) {
return;
}
+ checkLoadPriv(fullDbName);
+ checkMergeType();
+ analyzeWithoutCheckPriv(fullDbName);
+ isAnalyzed = true;
+ }
+
+ private void checkMergeType() throws AnalysisException {
if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
throw new AnalysisException("not support DELETE ON clause when
merge type is not MERGE.");
}
@@ -1059,24 +1069,32 @@ public class DataDescription implements InsertStmt.DataDesc {
if (mergeType != LoadTask.MergeType.APPEND && isNegative) {
throw new AnalysisException("not support MERGE or DELETE with
NEGATIVE.");
}
- checkLoadPriv(fullDbName);
- analyzeWithoutCheckPriv(fullDbName);
- if (isNegative && mergeType != LoadTask.MergeType.APPEND) {
- throw new AnalysisException("Negative is only used when merge type
is append.");
- }
- isAnalyzed = true;
}
     public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException {
+ analyzeFilePaths();
+
+ analyzeLoadAttributes();
+
+ analyzeColumns();
+ analyzeMultiLoadColumns();
+ analyzeSequenceCol(fullDbName);
+
+ if (properties != null) {
+ analyzeProperties();
+ }
+ }
+
+ private void analyzeFilePaths() throws AnalysisException {
if (!isLoadFromTable()) {
if (filePaths == null || filePaths.isEmpty()) {
throw new AnalysisException("No file path in load statement.");
}
- for (int i = 0; i < filePaths.size(); ++i) {
- filePaths.set(i, filePaths.get(i).trim());
- }
+ filePaths.replaceAll(String::trim);
}
+ }
+ private void analyzeLoadAttributes() throws AnalysisException {
if (columnSeparator != null) {
columnSeparator.analyze();
}
@@ -1089,12 +1107,18 @@ public class DataDescription implements InsertStmt.DataDesc {
partitionNames.analyze(null);
}
- analyzeColumns();
- analyzeMultiLoadColumns();
- analyzeSequenceCol(fullDbName);
-
- if (properties != null) {
- analyzeProperties();
+ // file format
+        // note(tsy): for historical reason, file format here must be string type rather than TFileFormatType
+ if (fileFormat != null) {
+ if (!fileFormat.equalsIgnoreCase("parquet")
+ && !fileFormat.equalsIgnoreCase(FeConstants.csv)
+ && !fileFormat.equalsIgnoreCase("orc")
+ && !fileFormat.equalsIgnoreCase("json")
+ && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
+                    && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
+                    && !fileFormat.equalsIgnoreCase("hive_text")) {
+                throw new AnalysisException("File Format Type " + fileFormat + " is invalid.");
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 0dff4b6caa..2cdc3f1972 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.ConnectContext;
@@ -50,6 +51,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
@@ -534,29 +536,38 @@ public class Util {
@NotNull
- public static TFileFormatType getFileFormatType(String path) {
+ public static TFileFormatType getFileFormatTypeFromPath(String path) {
String lowerCasePath = path.toLowerCase();
-        if (lowerCasePath.endsWith(".parquet") || lowerCasePath.endsWith(".parq")) {
+        if (lowerCasePath.contains(".parquet") || lowerCasePath.contains(".parq")) {
return TFileFormatType.FORMAT_PARQUET;
- } else if (lowerCasePath.endsWith(".gz")) {
- return TFileFormatType.FORMAT_CSV_GZ;
- } else if (lowerCasePath.endsWith(".bz2")) {
- return TFileFormatType.FORMAT_CSV_BZ2;
- } else if (lowerCasePath.endsWith(".lz4")) {
- return TFileFormatType.FORMAT_CSV_LZ4FRAME;
- } else if (lowerCasePath.endsWith(".lzo")) {
- return TFileFormatType.FORMAT_CSV_LZOP;
- } else if (lowerCasePath.endsWith(".lzo_deflate")) {
- return TFileFormatType.FORMAT_CSV_LZO;
- } else if (lowerCasePath.endsWith(".deflate")) {
- return TFileFormatType.FORMAT_CSV_DEFLATE;
- } else if (lowerCasePath.endsWith(".snappy")) {
- return TFileFormatType.FORMAT_CSV_SNAPPYBLOCK;
+ } else if (lowerCasePath.contains(".orc")) {
+ return TFileFormatType.FORMAT_ORC;
+ } else if (lowerCasePath.contains(".json")) {
+ return TFileFormatType.FORMAT_JSON;
} else {
return TFileFormatType.FORMAT_CSV_PLAIN;
}
}
+    public static TFileFormatType getFileFormatTypeFromName(String formatName) {
+        String lowerFileFormat = Objects.requireNonNull(formatName).toLowerCase();
+ if (lowerFileFormat.equals("parquet")) {
+ return TFileFormatType.FORMAT_PARQUET;
+ } else if (lowerFileFormat.equals("orc")) {
+ return TFileFormatType.FORMAT_ORC;
+ } else if (lowerFileFormat.equals("json")) {
+ return TFileFormatType.FORMAT_JSON;
+ // csv/csv_with_name/csv_with_names_and_types treat as csv format
+        } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names)
+                || lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
+                // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
+ || lowerFileFormat.equals(FeConstants.text)) {
+ return TFileFormatType.FORMAT_CSV_PLAIN;
+ } else {
+ return TFileFormatType.FORMAT_UNKNOWN;
+ }
+ }
+
/**
* Infer {@link TFileCompressType} from file name.
*
@@ -585,6 +596,9 @@ public class Util {
}
public static TFileCompressType getFileCompressType(String compressType) {
+ if (Strings.isNullOrEmpty(compressType)) {
+ return TFileCompressType.UNKNOWN;
+ }
final String upperCaseType = compressType.toUpperCase();
return TFileCompressType.valueOf(upperCaseType);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 332dcfb049..c193ab8f77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -34,7 +34,6 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -214,16 +213,6 @@ public class BrokerFileGroup implements Writable {
escape = dataDescription.getEscape();
fileFormat = dataDescription.getFileFormat();
- if (fileFormat != null) {
- if (!fileFormat.equalsIgnoreCase("parquet") &&
!fileFormat.equalsIgnoreCase(FeConstants.csv)
- && !fileFormat.equalsIgnoreCase("orc")
- && !fileFormat.equalsIgnoreCase("json")
- && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
- &&
!fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
- && !fileFormat.equalsIgnoreCase("hive_text")) {
- throw new DdlException("File Format Type " + fileFormat + " is
invalid.");
- }
- }
columnSeparator = dataDescription.getColumnSeparator();
if (columnSeparator == null) {
             if (fileFormat != null && fileFormat.equalsIgnoreCase("hive_text")) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index f626bb71c2..80fd901345 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
@@ -207,6 +206,9 @@ public class FileGroupInfo {
// header_type
         TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
         context.params.setFormatType(formatType);
+        context.params.setCompressType(
+                Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path)
+        );
         List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
                 context.fileGroup.getColumnNamesFromPath());
// Assign scan range locations only for broker load.
@@ -293,26 +295,15 @@ public class FileGroupInfo {
}
     private TFileFormatType formatType(String fileFormat, String path) throws UserException {
- if (fileFormat != null) {
- if (fileFormat.equalsIgnoreCase("parquet")) {
- return TFileFormatType.FORMAT_PARQUET;
- } else if (fileFormat.equalsIgnoreCase("orc")) {
- return TFileFormatType.FORMAT_ORC;
- } else if (fileFormat.equalsIgnoreCase("json")) {
- return TFileFormatType.FORMAT_JSON;
-            // csv/csv_with_name/csv_with_names_and_types treat as csv format
-            } else if (fileFormat.equalsIgnoreCase(FeConstants.csv) || fileFormat.toLowerCase()
-                    .equals(FeConstants.csv_with_names) || fileFormat.toLowerCase()
-                    .equals(FeConstants.csv_with_names_and_types)
-                    // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
- || fileFormat.equalsIgnoreCase(FeConstants.text)) {
- return TFileFormatType.FORMAT_CSV_PLAIN;
- } else {
- throw new UserException("Not supported file format: " +
fileFormat);
- }
+ if (fileFormat == null) {
+ // get file format by the file path
+ return Util.getFileFormatTypeFromPath(path);
}
-
- return Util.getFileFormatType(path);
+        TFileFormatType formatType = Util.getFileFormatTypeFromName(fileFormat);
+        if (formatType == TFileFormatType.FORMAT_UNKNOWN) {
+            throw new UserException("Not supported file format: " + fileFormat);
+ }
+ return formatType;
}
     private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
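The Util.getOrInferCompressType call added above keeps COMPRESS_TYPE optional: when the clause is omitted, the compression appears to be inferred from the file suffix, which the regression test below also exercises. A minimal SQL sketch with placeholder object names:

    -- COMPRESS_TYPE omitted: GZ is inferred from the ".gz" suffix
    LOAD LABEL example_db.label_infer_compress
    (
        DATA INFILE("s3://bucket/path/basic_data.csv.gz")
        INTO TABLE basic_data
        COLUMNS TERMINATED BY "|"
    )
    WITH BROKER broker_name;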
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 7f8c85d196..d36958ad88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -88,7 +88,7 @@ public class LoadScanProvider {
ctx.timezone = analyzer.getTimezone();
TFileScanRangeParams params = new TFileScanRangeParams();
-        params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat(), ""));
+        params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat()));
params.setCompressType(fileGroupInfo.getFileGroup().getCompressType());
params.setStrictMode(fileGroupInfo.isStrictMode());
if (fileGroupInfo.getFileGroup().getFileFormat() != null
@@ -211,7 +211,7 @@ public class LoadScanProvider {
List<Integer> srcSlotIds = Lists.newArrayList();
         Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(),
                 context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds,
-                formatType(context.fileGroup.getFileFormat(), ""), fileGroupInfo.getHiddenColumns(),
+                formatType(context.fileGroup.getFileFormat()), fileGroupInfo.getHiddenColumns(),
fileGroupInfo.isPartialUpdate());
int columnCountFromPath = 0;
@@ -242,28 +242,16 @@ public class LoadScanProvider {
.equalsIgnoreCase(Column.DELETE_SIGN);
}
-    private TFileFormatType formatType(String fileFormat, String path) throws UserException {
- if (fileFormat != null) {
- String lowerFileFormat = fileFormat.toLowerCase();
- if (lowerFileFormat.equals("parquet")) {
- return TFileFormatType.FORMAT_PARQUET;
- } else if (lowerFileFormat.equals("orc")) {
- return TFileFormatType.FORMAT_ORC;
- } else if (lowerFileFormat.equals("json")) {
- return TFileFormatType.FORMAT_JSON;
-            // csv/csv_with_name/csv_with_names_and_types treat as csv format
-            } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names)
-                    || lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
-                    // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
- || lowerFileFormat.equals(FeConstants.text)) {
- return TFileFormatType.FORMAT_CSV_PLAIN;
- } else {
-                throw new UserException("Not supported file format: " + fileFormat);
- }
- } else {
- // get file format by the suffix of file
- return Util.getFileFormatType(path);
+    private TFileFormatType formatType(String fileFormat) throws UserException {
+ if (fileFormat == null) {
+ // get file format by the file path
+ return TFileFormatType.FORMAT_CSV_PLAIN;
+ }
+        TFileFormatType formatType = Util.getFileFormatTypeFromName(fileFormat);
+ if (formatType == TFileFormatType.FORMAT_UNKNOWN) {
+            throw new UserException("Not supported file format: " + fileFormat);
}
+ return formatType;
}
public TableIf getTargetTable() {
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index e6802642dd..9845c2c8df 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -235,6 +235,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR));
keywordMap.put("force", new Integer(SqlParserSymbols.KW_FORCE));
keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT));
+ keywordMap.put("compress_type", new
Integer(SqlParserSymbols.KW_COMPRESS_TYPE));
keywordMap.put("free", new Integer(SqlParserSymbols.KW_FREE));
keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM));
keywordMap.put("frontend", new Integer(SqlParserSymbols.KW_FRONTEND));
diff --git a/regression-test/data/load_p0/broker_load/test_compress_type.out b/regression-test/data/load_p0/broker_load/test_compress_type.out
new file mode 100644
index 0000000000..e66359296c
--- /dev/null
+++ b/regression-test/data/load_p0/broker_load/test_compress_type.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+240
+
diff --git a/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql b/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql
new file mode 100644
index 0000000000..41c3660e11
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql
@@ -0,0 +1,29 @@
+CREATE TABLE basic_data
+(
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL
+
+)
+DUPLICATE KEY(k00)
+DISTRIBUTED BY HASH(k00) BUCKETS 32
+PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql b/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql
new file mode 100644
index 0000000000..c4ad606d9b
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS basic_data
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/broker_load/test_compress_type.groovy b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy
new file mode 100644
index 0000000000..50d4e34cd9
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_compress_type", "load_p0") {
+ def tableName = "basic_data"
+
+ // GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP
+ def compressTypes = [
+ "COMPRESS_TYPE AS \"GZ\"",
+ "COMPRESS_TYPE AS \"BZ2\"",
+ "COMPRESS_TYPE AS \"LZ4FRAME\"",
+ "COMPRESS_TYPE AS \"GZ\"",
+ "COMPRESS_TYPE AS \"BZ2\"",
+ "COMPRESS_TYPE AS \"LZ4FRAME\"",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ ]
+
+ def fileFormat = [
+ "FORMAT AS \"CSV\"",
+ "FORMAT AS \"CSV\"",
+ "FORMAT AS \"CSV\"",
+ "",
+ "",
+ "",
+ "FORMAT AS \"CSV\"",
+ "FORMAT AS \"CSV\"",
+ "FORMAT AS \"CSV\"",
+ "",
+ "",
+ ""
+ ]
+
+ def paths = [
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
+        "s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4"
+ ]
+ def labels = []
+
+ def ak = getS3AK()
+ def sk = getS3SK()
+
+
+ def i = 0
+ sql new File("""${ context.file.parent }/ddl/basic_data_drop.sql""").text
+ sql new File("""${ context.file.parent }/ddl/basic_data.sql""").text
+ for (String compressType : compressTypes) {
+ def label = "test_s3_load_compress" +
UUID.randomUUID().toString().replace("-", "0") + i
+ labels.add(label)
+ def format_str = fileFormat[i]
+ def path = paths[i]
+ def sql_str = """
+ LOAD LABEL $label (
+ DATA INFILE("$path")
+ INTO TABLE $tableName
+ COLUMNS TERMINATED BY "|"
+ $format_str
+ $compressType
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "$ak",
+ "AWS_SECRET_KEY" = "$sk",
+ "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+ "AWS_REGION" = "ap-beijing"
+ )
+ properties(
+ "use_new_load_scan_node" = "true"
+ )
+ """
+ logger.info("submit sql: ${sql_str}");
+ sql """${sql_str}"""
+ logger.info("Submit load with lable: $label, table: $tableName, path:
$path")
+ ++i
+ }
+
+ i = 0
+ for (String label in labels) {
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+            String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
+ if (result[0][2].equals("FINISHED")) {
+ logger.info("Load FINISHED " + label)
+ break
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ def reason = result[0][7]
+ logger.info("load failed, index: $i, reason:$reason")
+ assertTrue(1 == 2)
+ break;
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertTrue(1 == 2, "load Timeout: $label")
+ }
+ }
+ i++
+ }
+
+ qt_sql """ select count(*) from ${tableName} """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]