This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f61e6483bf [enhancement](broker-load) support compress type for old 
broker load, and split compress type from file format (#23882)
f61e6483bf is described below

commit f61e6483bf809b6a9f4d7900d87445d2a7b25cbe
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Sep 14 21:42:28 2023 +0800

    [enhancement](broker-load) support compress type for old broker load, and 
split compress type from file format (#23882)
---
 .../Load/BROKER-LOAD.md                            |   4 +
 .../Load/BROKER-LOAD.md                            |   4 +
 fe/fe-core/src/main/cup/sql_parser.cup             |  15 ++-
 .../org/apache/doris/analysis/DataDescription.java |  88 +++++++++-----
 .../java/org/apache/doris/common/util/Util.java    |  46 +++++---
 .../org/apache/doris/load/BrokerFileGroup.java     |  11 --
 .../doris/planner/external/FileGroupInfo.java      |  31 ++---
 .../doris/planner/external/LoadScanProvider.java   |  34 ++----
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   1 +
 .../load_p0/broker_load/test_compress_type.out     |   4 +
 .../suites/load_p0/broker_load/ddl/basic_data.sql  |  29 +++++
 .../load_p0/broker_load/ddl/basic_data_drop.sql    |   1 +
 .../load_p0/broker_load/test_compress_type.groovy  | 129 +++++++++++++++++++++
 13 files changed, 292 insertions(+), 105 deletions(-)

diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index dac37e7a15..b6922bc41c 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -67,6 +67,7 @@ WITH BROKER broker_name
   [COLUMNS TERMINATED BY "column_separator"]
   [LINES TERMINATED BY "line_delimiter"]
   [FORMAT AS "file_type"]
+  [COMPRESS_TYPE AS "compress_type"]
   [(column_list)]
   [COLUMNS FROM PATH AS (c1, c2, ...)]
   [SET (column_mapping)]
@@ -105,6 +106,9 @@ WITH BROKER broker_name
 
     Specifies the file type, CSV, PARQUET and ORC formats are supported. 
Default is CSV.
 
+  - `COMPRESS_TYPE AS`
+    Specifies the file compression type. Supported values: GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP.
+
   - `column list`
 
     Used to specify the column order in the original file. For a detailed 
introduction to this part, please refer to the [Column Mapping, Conversion and 
Filtering](../../../../data-operate/import/import-scenes/load-data-convert.md) 
document.
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
index 962108240f..9e3ecc80ce 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md
@@ -67,6 +67,7 @@ WITH BROKER broker_name
   [COLUMNS TERMINATED BY "column_separator"]
   [LINES TERMINATED BY "line_delimiter"]
   [FORMAT AS "file_type"]
+  [COMPRESS_TYPE AS "compress_type"]
   [(column_list)]
   [COLUMNS FROM PATH AS (c1, c2, ...)]
   [SET (column_mapping)]
@@ -105,6 +106,9 @@ WITH BROKER broker_name
 
     指定文件类型,支持 CSV、PARQUET 和 ORC 格式。默认为 CSV。
 
+  - `COMPRESS_TYPE AS`
+    指定文件压缩类型,支持 GZ/BZ2/LZ4FRAME。
+
   - `column list`
 
     用于指定原始文件中的列顺序。关于这部分详细介绍,可以参阅 
[列的映射,转换与过滤](../../../../data-operate/import/import-scenes/load-data-convert.md)
 文档。
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 50066f9c8c..f0e83f0964 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -304,6 +304,7 @@ terminal String
     KW_COMMITTED,
     KW_COMPACT,
     KW_COMPLETE,
+    KW_COMPRESS_TYPE,
     KW_CONFIG,
     KW_CONNECTION,
     KW_CONNECTION_ID,
@@ -904,8 +905,8 @@ nonterminal GroupByClause group_by_clause, 
grouping_elements;
 //
 nonterminal String keyword, ident, ident_or_text, variable_name,
         charset_name_or_default, old_or_new_charset_name_or_default, 
opt_collate,
-        collation_name_or_default, type_func_name_keyword, type_function_name, 
opt_file_format, time_unit,
-        literal_or_ident;
+        collation_name_or_default, type_func_name_keyword, type_function_name, 
opt_file_format, opt_file_compress_type,
+        time_unit, literal_or_ident;
 nonterminal PassVar text_or_password;
 
 // sync job
@@ -2433,6 +2434,7 @@ data_desc ::=
     opt_field_term:colSep
     opt_line_term:lineDelimiter
     opt_file_format:fileFormat
+    opt_file_compress_type:fileCompressType
     opt_col_list:colList
     opt_columns_from_path:columnsFromPath
     opt_col_mapping_list:colMappingList
@@ -2442,7 +2444,7 @@ data_desc ::=
     sequence_col_clause:sequenceColName
     opt_properties:properties
     {:
-        RESULT = new DataDescription(tableName, partitionNames, files, 
colList, colSep, lineDelimiter, fileFormat,
+        RESULT = new DataDescription(tableName, partitionNames, files, 
colList, colSep, lineDelimiter, fileFormat, fileCompressType,
         columnsFromPath, isNeg, colMappingList, preFilterExpr, whereExpr, 
mergeType, deleteExpr, sequenceColName, properties);
     :}
     | opt_merge_type:mergeType KW_DATA KW_FROM KW_TABLE ident:srcTableName
@@ -2549,6 +2551,13 @@ opt_file_format ::=
     {: RESULT = format; :}
     ;
 
+opt_file_compress_type ::=
+    /* Empty */
+    {: RESULT = null; :}
+    | KW_COMPRESS_TYPE KW_AS ident_or_text:compress_type
+    {: RESULT = compress_type; :}
+    ;
+
 opt_columns_from_path ::=
     /* Empty */
     {: RESULT = null; :}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index c60cbb76af..fc867847f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -28,6 +28,7 @@ import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.Util;
@@ -173,22 +174,22 @@ public class DataDescription implements 
InsertStmt.DataDesc {
     }
 
     public DataDescription(String tableName,
-            PartitionNames partitionNames,
-            List<String> filePaths,
-            List<String> columns,
-            Separator columnSeparator,
-            String fileFormat,
-            List<String> columnsFromPath,
-            boolean isNegative,
-            List<Expr> columnMappingList,
-            Expr fileFilterExpr,
-            Expr whereExpr,
-            LoadTask.MergeType mergeType,
-            Expr deleteCondition,
-            String sequenceColName,
-            Map<String, String> properties) {
+                           PartitionNames partitionNames,
+                           List<String> filePaths,
+                           List<String> columns,
+                           Separator columnSeparator,
+                           String fileFormat,
+                           List<String> columnsFromPath,
+                           boolean isNegative,
+                           List<Expr> columnMappingList,
+                           Expr fileFilterExpr,
+                           Expr whereExpr,
+                           LoadTask.MergeType mergeType,
+                           Expr deleteCondition,
+                           String sequenceColName,
+                           Map<String, String> properties) {
         this(tableName, partitionNames, filePaths, columns, columnSeparator, 
null,
-                fileFormat, columnsFromPath, isNegative, columnMappingList, 
fileFilterExpr, whereExpr,
+                fileFormat, null, columnsFromPath, isNegative, 
columnMappingList, fileFilterExpr, whereExpr,
                 mergeType, deleteCondition, sequenceColName, properties);
     }
 
@@ -199,6 +200,7 @@ public class DataDescription implements InsertStmt.DataDesc 
{
                            Separator columnSeparator,
                            Separator lineDelimiter,
                            String fileFormat,
+                           String compressType,
                            List<String> columnsFromPath,
                            boolean isNegative,
                            List<Expr> columnMappingList,
@@ -215,6 +217,7 @@ public class DataDescription implements InsertStmt.DataDesc 
{
         this.columnSeparator = columnSeparator;
         this.lineDelimiter = lineDelimiter;
         this.fileFormat = fileFormat;
+        this.compressType = Util.getFileCompressType(compressType);
         this.columnsFromPath = columnsFromPath;
         this.isNegative = isNegative;
         this.columnMappingList = columnMappingList;
@@ -362,8 +365,8 @@ public class DataDescription implements InsertStmt.DataDesc 
{
     }
 
     public static void validateMappingFunction(String functionName, 
List<String> args,
-            Map<String, String> columnNameMap,
-            Column mappingColumn, boolean isHadoopLoad) throws 
AnalysisException {
+                                               Map<String, String> 
columnNameMap,
+                                               Column mappingColumn, boolean 
isHadoopLoad) throws AnalysisException {
         if (functionName.equalsIgnoreCase("alignment_timestamp")) {
             validateAlignmentTimestamp(args, columnNameMap);
         } else if (functionName.equalsIgnoreCase("strftime")) {
@@ -1050,6 +1053,13 @@ public class DataDescription implements 
InsertStmt.DataDesc {
         if (isAnalyzed) {
             return;
         }
+        checkLoadPriv(fullDbName);
+        checkMergeType();
+        analyzeWithoutCheckPriv(fullDbName);
+        isAnalyzed = true;
+    }
+
+    private void checkMergeType() throws AnalysisException {
         if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
             throw new AnalysisException("not support DELETE ON clause when 
merge type is not MERGE.");
         }
@@ -1059,24 +1069,32 @@ public class DataDescription implements 
InsertStmt.DataDesc {
         if (mergeType != LoadTask.MergeType.APPEND && isNegative) {
             throw new AnalysisException("not support MERGE or DELETE with 
NEGATIVE.");
         }
-        checkLoadPriv(fullDbName);
-        analyzeWithoutCheckPriv(fullDbName);
-        if (isNegative && mergeType != LoadTask.MergeType.APPEND) {
-            throw new AnalysisException("Negative is only used when merge type 
is append.");
-        }
-        isAnalyzed = true;
     }
 
     public void analyzeWithoutCheckPriv(String fullDbName) throws 
AnalysisException {
+        analyzeFilePaths();
+
+        analyzeLoadAttributes();
+
+        analyzeColumns();
+        analyzeMultiLoadColumns();
+        analyzeSequenceCol(fullDbName);
+
+        if (properties != null) {
+            analyzeProperties();
+        }
+    }
+
+    private void analyzeFilePaths() throws AnalysisException {
         if (!isLoadFromTable()) {
             if (filePaths == null || filePaths.isEmpty()) {
                 throw new AnalysisException("No file path in load statement.");
             }
-            for (int i = 0; i < filePaths.size(); ++i) {
-                filePaths.set(i, filePaths.get(i).trim());
-            }
+            filePaths.replaceAll(String::trim);
         }
+    }
 
+    private void analyzeLoadAttributes() throws AnalysisException {
         if (columnSeparator != null) {
             columnSeparator.analyze();
         }
@@ -1089,12 +1107,18 @@ public class DataDescription implements 
InsertStmt.DataDesc {
             partitionNames.analyze(null);
         }
 
-        analyzeColumns();
-        analyzeMultiLoadColumns();
-        analyzeSequenceCol(fullDbName);
-
-        if (properties != null) {
-            analyzeProperties();
+        // file format
+        // note(tsy): for historical reason, file format here must be string 
type rather than TFileFormatType
+        if (fileFormat != null) {
+            if (!fileFormat.equalsIgnoreCase("parquet")
+                    && !fileFormat.equalsIgnoreCase(FeConstants.csv)
+                    && !fileFormat.equalsIgnoreCase("orc")
+                    && !fileFormat.equalsIgnoreCase("json")
+                    && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
+                    && 
!fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
+                    && !fileFormat.equalsIgnoreCase("hive_text")) {
+                throw new AnalysisException("File Format Type " + fileFormat + 
" is invalid.");
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 0dff4b6caa..2cdc3f1972 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.qe.ConnectContext;
@@ -50,6 +51,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.function.LongUnaryOperator;
 import java.util.function.Predicate;
@@ -534,29 +536,38 @@ public class Util {
 
 
     @NotNull
-    public static TFileFormatType getFileFormatType(String path) {
+    public static TFileFormatType getFileFormatTypeFromPath(String path) {
         String lowerCasePath = path.toLowerCase();
-        if (lowerCasePath.endsWith(".parquet") || 
lowerCasePath.endsWith(".parq")) {
+        if (lowerCasePath.contains(".parquet") || 
lowerCasePath.contains(".parq")) {
             return TFileFormatType.FORMAT_PARQUET;
-        } else if (lowerCasePath.endsWith(".gz")) {
-            return TFileFormatType.FORMAT_CSV_GZ;
-        } else if (lowerCasePath.endsWith(".bz2")) {
-            return TFileFormatType.FORMAT_CSV_BZ2;
-        } else if (lowerCasePath.endsWith(".lz4")) {
-            return TFileFormatType.FORMAT_CSV_LZ4FRAME;
-        } else if (lowerCasePath.endsWith(".lzo")) {
-            return TFileFormatType.FORMAT_CSV_LZOP;
-        } else if (lowerCasePath.endsWith(".lzo_deflate")) {
-            return TFileFormatType.FORMAT_CSV_LZO;
-        } else if (lowerCasePath.endsWith(".deflate")) {
-            return TFileFormatType.FORMAT_CSV_DEFLATE;
-        } else if (lowerCasePath.endsWith(".snappy")) {
-            return TFileFormatType.FORMAT_CSV_SNAPPYBLOCK;
+        } else if (lowerCasePath.contains(".orc")) {
+            return TFileFormatType.FORMAT_ORC;
+        } else if (lowerCasePath.contains(".json")) {
+            return TFileFormatType.FORMAT_JSON;
         } else {
             return TFileFormatType.FORMAT_CSV_PLAIN;
         }
     }
 
+    public static TFileFormatType getFileFormatTypeFromName(String formatName) 
{
+        String lowerFileFormat = 
Objects.requireNonNull(formatName).toLowerCase();
+        if (lowerFileFormat.equals("parquet")) {
+            return TFileFormatType.FORMAT_PARQUET;
+        } else if (lowerFileFormat.equals("orc")) {
+            return TFileFormatType.FORMAT_ORC;
+        } else if (lowerFileFormat.equals("json")) {
+            return TFileFormatType.FORMAT_JSON;
+            // csv/csv_with_name/csv_with_names_and_types treat as csv format
+        } else if (lowerFileFormat.equals(FeConstants.csv) || 
lowerFileFormat.equals(FeConstants.csv_with_names)
+                || lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
+                // TODO: Add TEXTFILE to TFileFormatType to Support hive text 
file format.
+                || lowerFileFormat.equals(FeConstants.text)) {
+            return TFileFormatType.FORMAT_CSV_PLAIN;
+        } else {
+            return TFileFormatType.FORMAT_UNKNOWN;
+        }
+    }
+
     /**
      * Infer {@link TFileCompressType} from file name.
      *
@@ -585,6 +596,9 @@ public class Util {
     }
 
     public static TFileCompressType getFileCompressType(String compressType) {
+        if (Strings.isNullOrEmpty(compressType)) {
+            return TFileCompressType.UNKNOWN;
+        }
         final String upperCaseType = compressType.toUpperCase();
         return TFileCompressType.valueOf(upperCaseType);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 332dcfb049..c193ab8f77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -34,7 +34,6 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
@@ -214,16 +213,6 @@ public class BrokerFileGroup implements Writable {
         escape = dataDescription.getEscape();
 
         fileFormat = dataDescription.getFileFormat();
-        if (fileFormat != null) {
-            if (!fileFormat.equalsIgnoreCase("parquet") && 
!fileFormat.equalsIgnoreCase(FeConstants.csv)
-                    && !fileFormat.equalsIgnoreCase("orc")
-                    && !fileFormat.equalsIgnoreCase("json")
-                    && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
-                    && 
!fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
-                    && !fileFormat.equalsIgnoreCase("hive_text")) {
-                throw new DdlException("File Format Type " + fileFormat + " is 
invalid.");
-            }
-        }
         columnSeparator = dataDescription.getColumnSeparator();
         if (columnSeparator == null) {
             if (fileFormat != null && 
fileFormat.equalsIgnoreCase("hive_text")) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index f626bb71c2..80fd901345 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.Util;
@@ -207,6 +206,9 @@ public class FileGroupInfo {
             // header_type
             TFileFormatType formatType = 
formatType(context.fileGroup.getFileFormat(), fileStatus.path);
             context.params.setFormatType(formatType);
+            context.params.setCompressType(
+                    
Util.getOrInferCompressType(context.fileGroup.getCompressType(), 
fileStatus.path)
+            );
             List<String> columnsFromPath = 
BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
             // Assign scan range locations only for broker load.
@@ -293,26 +295,15 @@ public class FileGroupInfo {
     }
 
     private TFileFormatType formatType(String fileFormat, String path) throws 
UserException {
-        if (fileFormat != null) {
-            if (fileFormat.equalsIgnoreCase("parquet")) {
-                return TFileFormatType.FORMAT_PARQUET;
-            } else if (fileFormat.equalsIgnoreCase("orc")) {
-                return TFileFormatType.FORMAT_ORC;
-            } else if (fileFormat.equalsIgnoreCase("json")) {
-                return TFileFormatType.FORMAT_JSON;
-                // csv/csv_with_name/csv_with_names_and_types treat as csv 
format
-            } else if (fileFormat.equalsIgnoreCase(FeConstants.csv) || 
fileFormat.toLowerCase()
-                    .equals(FeConstants.csv_with_names) || 
fileFormat.toLowerCase()
-                    .equals(FeConstants.csv_with_names_and_types)
-                    // TODO: Add TEXTFILE to TFileFormatType to Support hive 
text file format.
-                    || fileFormat.equalsIgnoreCase(FeConstants.text)) {
-                return TFileFormatType.FORMAT_CSV_PLAIN;
-            } else {
-                throw new UserException("Not supported file format: " + 
fileFormat);
-            }
+        if (fileFormat == null) {
+            // get file format by the file path
+            return Util.getFileFormatTypeFromPath(path);
         }
-
-        return Util.getFileFormatType(path);
+        TFileFormatType formatType = 
Util.getFileFormatTypeFromName(fileFormat);
+        if (formatType == TFileFormatType.FORMAT_UNKNOWN) {
+            throw new UserException("Not supported file format: " + 
fileFormat);
+        }
+        return formatType;
     }
 
     private TFileRangeDesc createFileRangeDesc(long curFileOffset, 
TBrokerFileStatus fileStatus, long rangeBytes,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 7f8c85d196..d36958ad88 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -88,7 +88,7 @@ public class LoadScanProvider {
         ctx.timezone = analyzer.getTimezone();
 
         TFileScanRangeParams params = new TFileScanRangeParams();
-        
params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat(), 
""));
+        
params.setFormatType(formatType(fileGroupInfo.getFileGroup().getFileFormat()));
         params.setCompressType(fileGroupInfo.getFileGroup().getCompressType());
         params.setStrictMode(fileGroupInfo.isStrictMode());
         if (fileGroupInfo.getFileGroup().getFileFormat() != null
@@ -211,7 +211,7 @@ public class LoadScanProvider {
         List<Integer> srcSlotIds = Lists.newArrayList();
         Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, 
context.fileGroup.getColumnToHadoopFunction(),
                 context.exprMap, analyzer, context.srcTupleDescriptor, 
context.srcSlotDescByName, srcSlotIds,
-                formatType(context.fileGroup.getFileFormat(), ""), 
fileGroupInfo.getHiddenColumns(),
+                formatType(context.fileGroup.getFileFormat()), 
fileGroupInfo.getHiddenColumns(),
                 fileGroupInfo.isPartialUpdate());
 
         int columnCountFromPath = 0;
@@ -242,28 +242,16 @@ public class LoadScanProvider {
                 .equalsIgnoreCase(Column.DELETE_SIGN);
     }
 
-    private TFileFormatType formatType(String fileFormat, String path) throws 
UserException {
-        if (fileFormat != null) {
-            String lowerFileFormat = fileFormat.toLowerCase();
-            if (lowerFileFormat.equals("parquet")) {
-                return TFileFormatType.FORMAT_PARQUET;
-            } else if (lowerFileFormat.equals("orc")) {
-                return TFileFormatType.FORMAT_ORC;
-            } else if (lowerFileFormat.equals("json")) {
-                return TFileFormatType.FORMAT_JSON;
-                // csv/csv_with_name/csv_with_names_and_types treat as csv 
format
-            } else if (lowerFileFormat.equals(FeConstants.csv) || 
lowerFileFormat.equals(FeConstants.csv_with_names)
-                    || 
lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
-                    // TODO: Add TEXTFILE to TFileFormatType to Support hive 
text file format.
-                    || lowerFileFormat.equals(FeConstants.text)) {
-                return TFileFormatType.FORMAT_CSV_PLAIN;
-            } else {
-                throw new UserException("Not supported file format: " + 
fileFormat);
-            }
-        } else {
-            // get file format by the suffix of file
-            return Util.getFileFormatType(path);
+    private TFileFormatType formatType(String fileFormat) throws UserException 
{
+        if (fileFormat == null) {
+            // no explicit file format: default to plain CSV (this method has no path to infer from)
+            return TFileFormatType.FORMAT_CSV_PLAIN;
+        }
+        TFileFormatType formatType = 
Util.getFileFormatTypeFromName(fileFormat);
+        if (formatType == TFileFormatType.FORMAT_UNKNOWN) {
+            throw new UserException("Not supported file format: " + 
fileFormat);
         }
+        return formatType;
     }
 
     public TableIf getTargetTable() {
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex 
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index e6802642dd..9845c2c8df 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -235,6 +235,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR));
         keywordMap.put("force", new Integer(SqlParserSymbols.KW_FORCE));
         keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT));
+        keywordMap.put("compress_type", new 
Integer(SqlParserSymbols.KW_COMPRESS_TYPE));
         keywordMap.put("free", new Integer(SqlParserSymbols.KW_FREE));
         keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM));
         keywordMap.put("frontend", new Integer(SqlParserSymbols.KW_FRONTEND));
diff --git a/regression-test/data/load_p0/broker_load/test_compress_type.out 
b/regression-test/data/load_p0/broker_load/test_compress_type.out
new file mode 100644
index 0000000000..e66359296c
--- /dev/null
+++ b/regression-test/data/load_p0/broker_load/test_compress_type.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+240
+
diff --git a/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql 
b/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql
new file mode 100644
index 0000000000..41c3660e11
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/ddl/basic_data.sql
@@ -0,0 +1,29 @@
+CREATE TABLE basic_data
+(
+    k00 INT             NOT NULL,
+    k01 DATE            NOT NULL,
+    k02 BOOLEAN         NULL,
+    k03 TINYINT         NULL,
+    k04 SMALLINT        NULL,
+    k05 INT             NULL,
+    k06 BIGINT          NULL,
+    k07 LARGEINT        NULL,
+    k08 FLOAT           NULL,
+    k09 DOUBLE          NULL,
+    k10 DECIMAL(9,1)    NULL,
+    k11 DECIMALV3(9,1)  NULL,
+    k12 DATETIME        NULL,
+    k13 DATEV2          NULL,
+    k14 DATETIMEV2      NULL,
+    k15 CHAR            NULL,
+    k16 VARCHAR         NULL,
+    k17 STRING          NULL,
+    k18 JSON            NULL
+
+)
+DUPLICATE KEY(k00)
+DISTRIBUTED BY HASH(k00) BUCKETS 32
+PROPERTIES (
+    "bloom_filter_columns"="k05",
+    "replication_num" = "1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql 
b/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql
new file mode 100644
index 0000000000..c4ad606d9b
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/ddl/basic_data_drop.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS basic_data
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/broker_load/test_compress_type.groovy 
b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy
new file mode 100644
index 0000000000..50d4e34cd9
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_compress_type.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_compress_type", "load_p0") {
+    def tableName = "basic_data"
+
+    // GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP
+    def compressTypes = [
+            "COMPRESS_TYPE AS \"GZ\"",
+            "COMPRESS_TYPE AS \"BZ2\"",
+            "COMPRESS_TYPE AS \"LZ4FRAME\"",
+            "COMPRESS_TYPE AS \"GZ\"",
+            "COMPRESS_TYPE AS \"BZ2\"",
+            "COMPRESS_TYPE AS \"LZ4FRAME\"",
+            "",
+            "",
+            "",
+            "",
+            "",
+            "",
+    ]
+
+    def fileFormat = [
+            "FORMAT AS \"CSV\"",
+            "FORMAT AS \"CSV\"",
+            "FORMAT AS \"CSV\"",
+            "",
+            "",
+            "",
+            "FORMAT AS \"CSV\"",
+            "FORMAT AS \"CSV\"",
+            "FORMAT AS \"CSV\"",
+            "",
+            "",
+            ""
+    ]
+
+    def paths = [
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.gz",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.bz2",
+            
"s3://doris-build-1308700295/regression/load/data/basic_data.csv.lz4"
+    ]
+    def labels = []
+
+    def ak = getS3AK()
+    def sk = getS3SK()
+
+
+    def i = 0
+    sql new File("""${ context.file.parent }/ddl/basic_data_drop.sql""").text
+    sql new File("""${ context.file.parent }/ddl/basic_data.sql""").text
+    for (String compressType : compressTypes) {
+        def label = "test_s3_load_compress" + 
UUID.randomUUID().toString().replace("-", "0") + i
+        labels.add(label)
+        def format_str = fileFormat[i]
+        def path = paths[i]
+        def sql_str = """
+            LOAD LABEL $label (
+                DATA INFILE("$path")
+                INTO TABLE $tableName
+                COLUMNS TERMINATED BY "|"
+                $format_str
+                $compressType
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+                "AWS_REGION" = "ap-beijing"
+            )
+            properties(
+                "use_new_load_scan_node" = "true"
+            )
+            """
+        logger.info("submit sql: ${sql_str}");
+        sql """${sql_str}"""
+        logger.info("Submit load with lable: $label, table: $tableName, path: 
$path")
+        ++i
+    }
+
+    i = 0
+    for (String label in labels) {
+        def max_try_milli_secs = 600000
+        while (max_try_milli_secs > 0) {
+            String[][] result = sql """ show load where label="$label" order 
by createtime desc limit 1; """
+            if (result[0][2].equals("FINISHED")) {
+                logger.info("Load FINISHED " + label)
+                break
+            }
+            if (result[0][2].equals("CANCELLED")) {
+                def reason = result[0][7]
+                logger.info("load failed, index: $i, reason:$reason")
+                assertTrue(1 == 2)
+                break;
+            }
+            Thread.sleep(1000)
+            max_try_milli_secs -= 1000
+            if(max_try_milli_secs <= 0) {
+                assertTrue(1 == 2, "load Timeout: $label")
+            }
+        }
+        i++
+    }
+
+    qt_sql """ select count(*) from ${tableName} """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to