This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 36eeb107125 [opt](tvf) refine the class of ExternalFileTableValuedFunction (#24706) (#25384)
36eeb107125 is described below
commit 36eeb1071250eac6074e9dd423b0a5ad3e69548f
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Oct 17 14:29:02 2023 +0800
[opt](tvf) refine the class of ExternalFileTableValuedFunction (#24706) (#25384)
backport #24706
---
.../org/apache/doris/analysis/DataDescription.java | 16 +-
.../org/apache/doris/analysis/OutFileClause.java | 6 +-
.../org/apache/doris/analysis/S3TvfLoadStmt.java | 11 +-
.../org/apache/doris/analysis/StorageBackend.java | 42 ++++
.../java/org/apache/doris/common/FeConstants.java | 11 +-
.../doris/common/util/FileFormatConstants.java | 57 ++++++
.../apache/doris/common/util/FileFormatUtils.java | 108 ++++++++++
.../java/org/apache/doris/common/util/Util.java | 14 +-
.../org/apache/doris/planner/ResultFileSink.java | 6 +-
.../doris/planner/external/FileQueryScanNode.java | 11 +-
.../doris/planner/external/LoadScanProvider.java | 6 +-
.../apache/doris/planner/external/TVFScanNode.java | 4 -
.../ExternalFileTableValuedFunction.java | 226 +++++++--------------
.../tablefunction/HdfsTableValuedFunction.java | 65 +++---
.../HttpStreamTableValuedFunction.java | 25 +--
.../tablefunction/LocalTableValuedFunction.java | 39 ++--
.../doris/tablefunction/S3TableValuedFunction.java | 133 ++++++------
.../ExternalFileTableValuedFunctionTest.java | 10 +-
.../export_p2/test_export_max_file_size.groovy | 4 +-
.../suites/export_p2/test_export_with_hdfs.groovy | 2 +-
.../test_outfile_orc_max_file_size.groovy | 1 -
.../hive/test_different_parquet_types.groovy | 16 +-
.../external_table_p0/tvf/test_hdfs_tvf.groovy | 27 +--
.../tvf/test_hdfs_tvf_compression.groovy | 13 --
.../tvf/test_path_partition_keys.groovy | 5 -
.../tvf/test_s3_tvf_compression.groovy | 6 +
.../external_table_p2/tvf/test_tvf_p2.groovy | 18 +-
.../tvf/test_tvf_view_count_p2.groovy | 3 +-
.../external_table_p2/tvf/test_tvf_view_p2.groovy | 6 +-
29 files changed, 463 insertions(+), 428 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index fc867847f18..618c80df5c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -28,8 +28,8 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.loadv2.LoadTask;
@@ -1110,13 +1110,13 @@ public class DataDescription implements InsertStmt.DataDesc {
// file format
// note(tsy): for historical reason, file format here must be string type rather than TFileFormatType
if (fileFormat != null) {
- if (!fileFormat.equalsIgnoreCase("parquet")
- && !fileFormat.equalsIgnoreCase(FeConstants.csv)
- && !fileFormat.equalsIgnoreCase("orc")
- && !fileFormat.equalsIgnoreCase("json")
- && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
- && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
- && !fileFormat.equalsIgnoreCase("hive_text")) {
+ if (!fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_PARQUET)
+ && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV)
+ && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+ && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)
+ && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ORC)
+ && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_JSON)
+ && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_HIVE_TEXT)) {
throw new AnalysisException("File Format Type " + fileFormat + " is invalid.");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 742e1636fdc..7d5c4433564 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -23,10 +23,10 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.Util;
@@ -242,11 +242,11 @@ public class OutFileClause {
fileFormatType = TFileFormatType.FORMAT_ORC;
break;
case "csv_with_names":
- headerType = FeConstants.csv_with_names;
+ headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "csv_with_names_and_types":
- headerType = FeConstants.csv_with_names_and_types;
+ headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java
index a6f864a8c06..ac83f26d49b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java
@@ -24,9 +24,9 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.datasource.property.constants.S3Properties.Env;
-import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import com.google.common.annotations.VisibleForTesting;
@@ -145,7 +145,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
final List<String> filePaths = dataDescription.getFilePaths();
Preconditions.checkState(filePaths.size() == 1, "there should be only one file path");
final String s3FilePath = filePaths.get(0);
- params.put(S3TableValuedFunction.S3_URI, s3FilePath);
+ params.put(S3TableValuedFunction.PROP_URI, s3FilePath);
final Map<String, String> dataDescProp = dataDescription.getProperties();
if (dataDescProp != null) {
@@ -153,7 +153,8 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
}
final String format = Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT);
- params.put(ExternalFileTableValuedFunction.FORMAT, format);
+ params.put(FileFormatConstants.PROP_FORMAT, format);
+
if (isCsvFormat(format)) {
parseSeparator(dataDescription.getColumnSeparatorObj(), params);
parseSeparator(dataDescription.getLineDelimiterObj(), params);
@@ -161,7 +162,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
List<String> columnsFromPath = dataDescription.getColumnsFromPath();
if (columnsFromPath != null) {
- params.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS,
+ params.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS,
String.join(",", columnsFromPath));
}
@@ -189,7 +190,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
} catch (AnalysisException e) {
throw new DdlException(String.format("failed to parse separator:%s", separator), e);
}
- tvfParams.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, separator.getSeparator());
+ tvfParams.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, separator.getSeparator());
}
private static boolean isCsvFormat(String format) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 5d6c33c45e7..5d069ea4b23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.common.util.URI;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.thrift.TStorageBackendType;
@@ -34,6 +35,47 @@ public class StorageBackend implements ParseNode {
private String location;
private StorageDesc storageDesc;
+ public static void checkPath(String path, StorageBackend.StorageType type) throws AnalysisException {
+ if (Strings.isNullOrEmpty(path)) {
+ throw new AnalysisException("No destination path specified.");
+ }
+ checkUri(URI.create(path), type);
+ }
+
+ public static void checkUri(URI uri, StorageBackend.StorageType type) throws AnalysisException {
+ String schema = uri.getScheme();
+ if (schema == null) {
+ throw new AnalysisException(
+ "Invalid export path, there is no schema of URI found. please check your path.");
+ }
+ if (type == StorageBackend.StorageType.BROKER) {
+ if (!schema.equalsIgnoreCase("bos")
+ && !schema.equalsIgnoreCase("afs")
+ && !schema.equalsIgnoreCase("hdfs")
+ && !schema.equalsIgnoreCase("viewfs")
+ && !schema.equalsIgnoreCase("ofs")
+ && !schema.equalsIgnoreCase("obs")
+ && !schema.equalsIgnoreCase("oss")
+ && !schema.equalsIgnoreCase("s3a")
+ && !schema.equalsIgnoreCase("cosn")
+ && !schema.equalsIgnoreCase("gfs")
+ && !schema.equalsIgnoreCase("jfs")
+ && !schema.equalsIgnoreCase("gs")) {
+ throw new AnalysisException("Invalid broker path. please use
valid 'hdfs://', 'viewfs://', 'afs://',"
+ + " 'bos://', 'ofs://', 'obs://', 'oss://', 's3a://',
'cosn://', 'gfs://', 'gs://'"
+ + " or 'jfs://' path.");
+ }
+ } else if (type == StorageBackend.StorageType.S3 &&
!schema.equalsIgnoreCase("s3")) {
+ throw new AnalysisException("Invalid export path. please use valid
's3://' path.");
+ } else if (type == StorageBackend.StorageType.HDFS &&
!schema.equalsIgnoreCase("hdfs")
+ && !schema.equalsIgnoreCase("viewfs")) {
+ throw new AnalysisException("Invalid export path. please use valid
'HDFS://' or 'viewfs://' path.");
+ } else if (type == StorageBackend.StorageType.LOCAL &&
!schema.equalsIgnoreCase("file")) {
+ throw new AnalysisException(
+ "Invalid export path. please use valid '" +
OutFileClause.LOCAL_FILE_PREFIX + "' path.");
+ }
+ }
+
public StorageBackend(String storageName, String location,
StorageType storageType, Map<String, String> properties) {
this.storageDesc = new StorageDesc(storageName, storageType, properties);
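The new static checkers can be exercised on their own; a minimal sketch (destination paths are placeholders, and the FE classpath providing org.apache.doris.analysis.StorageBackend is assumed):

    // Accepted: the scheme matches the requested storage type.
    StorageBackend.checkPath("hdfs://nameservice1/user/doris/export", StorageBackend.StorageType.HDFS);
    // Throws AnalysisException: an S3 destination must use an s3:// scheme.
    StorageBackend.checkPath("hdfs://nameservice1/user/doris/export", StorageBackend.StorageType.S3);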
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 0845d593e24..487d50283d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -20,9 +20,6 @@ package org.apache.doris.common;
import org.apache.doris.persist.meta.FeMetaFormat;
public class FeConstants {
- // Database and table's default configurations, we will never change them
- public static short default_replication_num = 3;
-
// The default value of bucket setting && auto bucket without estimate_partition_size
public static int default_bucket_num = 10;
@@ -39,7 +36,6 @@ public class FeConstants {
public static int heartbeat_interval_second = 5;
public static int checkpoint_interval_second = 60; // 1 minutes
- public static int ip_check_interval_second = 5;
// dpp version
public static String dpp_version = "3_2_0";
@@ -69,12 +65,6 @@ public class FeConstants {
public static long tablet_checker_interval_ms = 20 * 1000L;
public static long tablet_schedule_interval_ms = 1000L;
- public static String csv = "csv";
- public static String csv_with_names = "csv_with_names";
- public static String csv_with_names_and_types = "csv_with_names_and_types";
-
- public static String text = "hive_text";
-
public static String FS_PREFIX_S3 = "s3";
public static String FS_PREFIX_S3A = "s3a";
public static String FS_PREFIX_S3N = "s3n";
@@ -90,6 +80,7 @@ public class FeConstants {
public static String FS_PREFIX_HDFS = "hdfs";
public static String FS_PREFIX_VIEWFS = "viewfs";
public static String FS_PREFIX_FILE = "file";
+
public static final String INTERNAL_DB_NAME = "__internal_schema";
public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
new file mode 100644
index 00000000000..7d60222d299
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import java.util.regex.Pattern;
+
+public class FileFormatConstants {
+ public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
+ public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
+ public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+ public static final String FORMAT_CSV = "csv";
+ public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
+ public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
+ public static final String FORMAT_HIVE_TEXT = "hive_text";
+ public static final String FORMAT_PARQUET = "parquet";
+ public static final String FORMAT_ORC = "orc";
+ public static final String FORMAT_JSON = "json";
+ public static final String FORMAT_AVRO = "avro";
+ public static final String FORMAT_WAL = "wal";
+
+ public static final String PROP_FORMAT = "format";
+ public static final String PROP_COLUMN_SEPARATOR = "column_separator";
+ public static final String PROP_LINE_DELIMITER = "line_delimiter";
+ public static final String PROP_JSON_ROOT = "json_root";
+ public static final String PROP_JSON_PATHS = "jsonpaths";
+ public static final String PROP_STRIP_OUTER_ARRAY = "strip_outer_array";
+ public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line";
+ public static final String PROP_NUM_AS_STRING = "num_as_string";
+ public static final String PROP_FUZZY_PARSE = "fuzzy_parse";
+ public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+ public static final String PROP_SKIP_LINES = "skip_lines";
+ public static final String PROP_CSV_SCHEMA = "csv_schema";
+ public static final String PROP_COMPRESS_TYPE = "compress_type";
+ public static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys";
+
+ // decimal(p,s)
+ public static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
+ // datetime(p)
+ public static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");
+
+}
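As a standalone illustration of how the two type patterns are meant to be consumed (only java.util.regex is needed; the input value is an example):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    Pattern decimal = Pattern.compile("decimal\\((\\d+),(\\d+)\\)"); // same regex as DECIMAL_TYPE_PATTERN
    Matcher m = decimal.matcher("decimal(10,2)");
    if (m.find()) {
        int precision = Integer.parseInt(m.group(1)); // 10
        int scale = Integer.parseInt(m.group(2));     // 2
    }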
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
new file mode 100644
index 00000000000..0b646a00b16
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeNameFormat;
+
+import com.google.common.base.Strings;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+public class FileFormatUtils {
+
+ public static boolean isCsv(String formatStr) {
+ return FileFormatConstants.FORMAT_CSV.equalsIgnoreCase(formatStr)
+ || FileFormatConstants.FORMAT_CSV_WITH_NAMES.equalsIgnoreCase(formatStr)
+ || FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES.equalsIgnoreCase(formatStr)
+ || FileFormatConstants.FORMAT_HIVE_TEXT.equalsIgnoreCase(formatStr);
+ }
+
+ // public for unit test
+ public static void parseCsvSchema(List<Column> csvSchema, String csvSchemaStr)
+ throws AnalysisException {
+ if (Strings.isNullOrEmpty(csvSchemaStr)) {
+ return;
+ }
+ // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)"
+ String[] schemaStrs = csvSchemaStr.split(";");
+ try {
+ for (String schemaStr : schemaStrs) {
+ String[] kv = schemaStr.replace(" ", "").split(":");
+ if (kv.length != 2) {
+ throw new AnalysisException("invalid csv schema: " +
csvSchemaStr);
+ }
+ Column column = null;
+ String name = kv[0].toLowerCase();
+ FeNameFormat.checkColumnName(name);
+ String type = kv[1].toLowerCase();
+ if (type.equals("tinyint")) {
+ column = new Column(name, PrimitiveType.TINYINT, true);
+ } else if (type.equals("smallint")) {
+ column = new Column(name, PrimitiveType.SMALLINT, true);
+ } else if (type.equals("int")) {
+ column = new Column(name, PrimitiveType.INT, true);
+ } else if (type.equals("bigint")) {
+ column = new Column(name, PrimitiveType.BIGINT, true);
+ } else if (type.equals("largeint")) {
+ column = new Column(name, PrimitiveType.LARGEINT, true);
+ } else if (type.equals("float")) {
+ column = new Column(name, PrimitiveType.FLOAT, true);
+ } else if (type.equals("double")) {
+ column = new Column(name, PrimitiveType.DOUBLE, true);
+ } else if (type.startsWith("decimal")) {
+ // regex decimal(p, s)
+ Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(type);
+ if (!matcher.find()) {
+ throw new AnalysisException("invalid decimal type: " + type);
+ }
+ int precision = Integer.parseInt(matcher.group(1));
+ int scale = Integer.parseInt(matcher.group(2));
+ column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
+ "");
+ } else if (type.equals("date")) {
+ column = new Column(name, ScalarType.createDateType(), false, null, true, null, "");
+ } else if (type.startsWith("datetime")) {
+ int scale = 0;
+ if (!type.equals("datetime")) {
+ // regex datetime(s)
+ Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(type);
+ if (!matcher.find()) {
+ throw new AnalysisException("invalid datetime type: " + type);
+ }
+ scale = Integer.parseInt(matcher.group(1));
+ }
+ column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
+ } else if (type.equals("string")) {
+ column = new Column(name, PrimitiveType.STRING, true);
+ } else if (type.equals("boolean")) {
+ column = new Column(name, PrimitiveType.BOOLEAN, true);
+ } else {
+ throw new AnalysisException("unsupported column type: " +
type);
+ }
+ csvSchema.add(column);
+ }
+ } catch (Exception e) {
+ throw new AnalysisException("invalid csv schema: " +
e.getMessage());
+ }
+ }
+}
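A minimal usage sketch of the extracted helper (illustrative only; the schema string follows the "name:type;..." format documented in the code above):

    List<Column> csvSchema = Lists.newArrayList();
    // Fills csvSchema with: k1 INT, k2 DECIMALV3(10,2), k3 DATETIMEV2(6).
    FileFormatUtils.parseCsvSchema(csvSchema, "k1:int;k2:decimal(10,2);k3:datetime(6)");
    // An unknown type such as "k4:blob" raises AnalysisException("unsupported column type: ...").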
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index 2cdc3f1972f..a6924f784fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.ConnectContext;
@@ -551,17 +550,18 @@ public class Util {
public static TFileFormatType getFileFormatTypeFromName(String formatName) {
String lowerFileFormat = Objects.requireNonNull(formatName).toLowerCase();
- if (lowerFileFormat.equals("parquet")) {
+ if (lowerFileFormat.equals(FileFormatConstants.FORMAT_PARQUET)) {
return TFileFormatType.FORMAT_PARQUET;
- } else if (lowerFileFormat.equals("orc")) {
+ } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ORC)) {
return TFileFormatType.FORMAT_ORC;
- } else if (lowerFileFormat.equals("json")) {
+ } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_JSON)) {
return TFileFormatType.FORMAT_JSON;
// csv/csv_with_name/csv_with_names_and_types treat as csv format
- } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names)
- || lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
+ } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV)
+ || lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+ || lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)
// TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
- || lowerFileFormat.equals(FeConstants.text)) {
+ || lowerFileFormat.equals(FileFormatConstants.FORMAT_HIVE_TEXT)) {
return TFileFormatType.FORMAT_CSV_PLAIN;
} else {
return TFileFormatType.FORMAT_UNKNOWN;
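The resulting mapping, sketched with example inputs (return values follow the branches above; unrecognized names return a sentinel rather than throwing):

    Util.getFileFormatTypeFromName("parquet");        // FORMAT_PARQUET
    Util.getFileFormatTypeFromName("csv_with_names"); // FORMAT_CSV_PLAIN, all csv variants collapse
    Util.getFileFormatTypeFromName("hive_text");      // FORMAT_CSV_PLAIN
    Util.getFileFormatTypeFromName("sequencefile");   // FORMAT_UNKNOWN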
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 6d7031f61c7..d9213360583 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -20,7 +20,7 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleId;
-import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
@@ -65,8 +65,8 @@ public class ResultFileSink extends DataSink {
public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause,
ArrayList<String> labels) {
this(exchNodeId, outFileClause);
- if (outFileClause.getHeaderType().equals(FeConstants.csv_with_names)
- || outFileClause.getHeaderType().equals(FeConstants.csv_with_names_and_types)) {
+ if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+ || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter());
}
headerType = outFileClause.getHeaderType();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 67e09cbf205..7b57d9b0454 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -292,7 +292,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
for (Split split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
TFileType locationType = getLocationType(fileSplit.getPath().toString());
- setLocationPropertiesIfNecessary(locationType, fileSplit, locationProperties);
+ setLocationPropertiesIfNecessary(locationType, locationProperties);
TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from hive partitions.
@@ -368,7 +368,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
scanRangeLocations.size(), (System.currentTimeMillis() - start));
}
- private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit fileSplit,
+ private void setLocationPropertiesIfNecessary(TFileType locationType,
Map<String, String> locationProperties) throws UserException {
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
if (!params.isSetHdfsParams()) {
@@ -455,13 +455,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected abstract Map<String, String> getLocationProperties() throws UserException;
- // eg: hdfs://namenode s3://buckets
- protected String getFsName(FileSplit split) {
- String fullPath = split.getPath().toUri().toString();
- String filePath = split.getPath().toUri().getPath();
- return fullPath.replace(filePath, "");
- }
-
protected static Optional<TFileType> getTFileType(String location) {
if (location != null && !location.isEmpty()) {
if (S3Util.isObjStorage(location)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index d36958ad88b..52bb119d7a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -27,9 +27,9 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.Load;
@@ -131,8 +131,8 @@ public class LoadScanProvider {
private String getHeaderType(String formatType) {
if (formatType != null) {
- if (formatType.equalsIgnoreCase(FeConstants.csv_with_names) || formatType.equalsIgnoreCase(
- FeConstants.csv_with_names_and_types)) {
+ if (formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+ || formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
return formatType;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index c069aa43c30..6cdd9d9834f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -81,10 +81,6 @@ public class TVFScanNode extends FileQueryScanNode {
numNodes = backendPolicy.numBackends();
}
- protected String getFsName(FileSplit split) {
- return tableValuedFunction.getFsName();
- }
-
@Override
public TFileAttributes getFileAttributes() throws UserException {
return tableValuedFunction.getFileAttributes();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 9ab79eecaaa..8fddb508466 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -32,12 +32,12 @@ import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.common.util.FileFormatConstants;
+import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.external.TVFScanNode;
@@ -67,6 +67,7 @@ import org.apache.doris.thrift.TTextSerdeType;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -80,8 +81,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@@ -89,43 +88,22 @@ import java.util.stream.Collectors;
*/
public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf {
public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class);
- protected static String DEFAULT_COLUMN_SEPARATOR = ",";
- protected static String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
- protected static final String DEFAULT_LINE_DELIMITER = "\n";
- public static final String FORMAT = "format";
- public static final String COLUMN_SEPARATOR = "column_separator";
- public static final String LINE_DELIMITER = "line_delimiter";
- protected static final String JSON_ROOT = "json_root";
- protected static final String JSON_PATHS = "jsonpaths";
- protected static final String STRIP_OUTER_ARRAY = "strip_outer_array";
- protected static final String READ_JSON_BY_LINE = "read_json_by_line";
- protected static final String NUM_AS_STRING = "num_as_string";
- protected static final String FUZZY_PARSE = "fuzzy_parse";
- protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
- protected static final String SKIP_LINES = "skip_lines";
- protected static final String CSV_SCHEMA = "csv_schema";
- protected static final String COMPRESS_TYPE = "compress_type";
- public static final String PATH_PARTITION_KEYS = "path_partition_keys";
- // decimal(p,s)
- private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
- // datetime(p)
- private static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");
protected static final ImmutableSet<String> FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder<String>()
- .add(FORMAT)
- .add(JSON_ROOT)
- .add(JSON_PATHS)
- .add(STRIP_OUTER_ARRAY)
- .add(READ_JSON_BY_LINE)
- .add(NUM_AS_STRING)
- .add(FUZZY_PARSE)
- .add(COLUMN_SEPARATOR)
- .add(LINE_DELIMITER)
- .add(TRIM_DOUBLE_QUOTES)
- .add(SKIP_LINES)
- .add(CSV_SCHEMA)
- .add(COMPRESS_TYPE)
- .add(PATH_PARTITION_KEYS)
+ .add(FileFormatConstants.PROP_FORMAT)
+ .add(FileFormatConstants.PROP_JSON_ROOT)
+ .add(FileFormatConstants.PROP_JSON_PATHS)
+ .add(FileFormatConstants.PROP_STRIP_OUTER_ARRAY)
+ .add(FileFormatConstants.PROP_READ_JSON_BY_LINE)
+ .add(FileFormatConstants.PROP_NUM_AS_STRING)
+ .add(FileFormatConstants.PROP_FUZZY_PARSE)
+ .add(FileFormatConstants.PROP_COLUMN_SEPARATOR)
+ .add(FileFormatConstants.PROP_LINE_DELIMITER)
+ .add(FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES)
+ .add(FileFormatConstants.PROP_SKIP_LINES)
+ .add(FileFormatConstants.PROP_CSV_SCHEMA)
+ .add(FileFormatConstants.PROP_COMPRESS_TYPE)
+ .add(FileFormatConstants.PROP_PATH_PARTITION_KEYS)
.build();
// Columns got from file and path(if has)
@@ -137,17 +115,16 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
private List<String> pathPartitionKeys;
protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
- protected Map<String, String> locationProperties;
+ protected Map<String, String> locationProperties = Maps.newHashMap();
protected String filePath;
-
- private TFileFormatType fileFormatType;
+ protected TFileFormatType fileFormatType;
private TFileCompressType compressionType;
private String headerType = "";
private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
- private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
- private String lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private String columnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
+ private String lineDelimiter = FileFormatConstants.DEFAULT_LINE_DELIMITER;
private String jsonRoot = "";
private String jsonPaths = "";
private boolean stripOuterArray;
@@ -175,20 +152,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
return locationProperties;
}
- public List<Column> getCsvSchema() {
- return csvSchema;
- }
-
- public String getFsName() {
- TFileType fileType = getTFileType();
- if (fileType == TFileType.FILE_HDFS) {
- return locationProperties.get(HdfsResource.HADOOP_FS_NAME);
- } else if (fileType == TFileType.FILE_S3) {
- return locationProperties.get(S3Properties.ENDPOINT);
- }
- return "";
- }
-
public List<String> getPathPartitionKeys() {
return pathPartitionKeys;
}
@@ -203,25 +166,29 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
}
}
- //The keys in the passed validParams map need to be lowercase.
- protected void parseProperties(Map<String, String> validParams) throws AnalysisException {
- String formatString = validParams.getOrDefault(FORMAT, "");
- String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;
+ //The keys in properties map need to be lowercase.
+ protected Map<String, String> parseCommonProperties(Map<String, String> properties) throws AnalysisException {
+ // Copy the properties, because we will remove the key from properties.
+ Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+ copiedProps.putAll(properties);
+
+ String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "");
+ String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
switch (formatString) {
case "csv":
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "hive_text":
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
- defaultColumnSeparator = DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR;
+ defaultColumnSeparator = FileFormatConstants.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR;
this.textSerdeType = TTextSerdeType.HIVE_TEXT_SERDE;
break;
case "csv_with_names":
- this.headerType = FeConstants.csv_with_names;
+ this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "csv_with_names_and_types":
- this.headerType = FeConstants.csv_with_names_and_types;
+ this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "parquet":
@@ -240,113 +207,61 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
throw new AnalysisException("format:" + formatString + " is not supported.");
}
- columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR, defaultColumnSeparator);
+ columnSeparator = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COLUMN_SEPARATOR,
+ defaultColumnSeparator);
if (Strings.isNullOrEmpty(columnSeparator)) {
throw new AnalysisException("column_separator can not be empty.");
}
columnSeparator = Separator.convertSeparator(columnSeparator);
- lineDelimiter = validParams.getOrDefault(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
+ lineDelimiter = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_LINE_DELIMITER,
+ FileFormatConstants.DEFAULT_LINE_DELIMITER);
if (Strings.isNullOrEmpty(lineDelimiter)) {
throw new AnalysisException("line_delimiter can not be empty.");
}
lineDelimiter = Separator.convertSeparator(lineDelimiter);
- jsonRoot = validParams.getOrDefault(JSON_ROOT, "");
- jsonPaths = validParams.getOrDefault(JSON_PATHS, "");
- readJsonByLine = Boolean.valueOf(validParams.get(READ_JSON_BY_LINE)).booleanValue();
- stripOuterArray = Boolean.valueOf(validParams.get(STRIP_OUTER_ARRAY)).booleanValue();
- numAsString = Boolean.valueOf(validParams.get(NUM_AS_STRING)).booleanValue();
- fuzzyParse = Boolean.valueOf(validParams.get(FUZZY_PARSE)).booleanValue();
- trimDoubleQuotes = Boolean.valueOf(validParams.get(TRIM_DOUBLE_QUOTES)).booleanValue();
- skipLines = Integer.valueOf(validParams.getOrDefault(SKIP_LINES, "0")).intValue();
-
+ jsonRoot = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_ROOT, "");
+ jsonPaths = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_PATHS, "");
+ readJsonByLine = Boolean.valueOf(
+ getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_READ_JSON_BY_LINE, "")).booleanValue();
+ stripOuterArray = Boolean.valueOf(
+ getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_STRIP_OUTER_ARRAY, "")).booleanValue();
+ numAsString = Boolean.valueOf(
+ getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_NUM_AS_STRING, "")).booleanValue();
+ fuzzyParse = Boolean.valueOf(
+ getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FUZZY_PARSE, "")).booleanValue();
+ trimDoubleQuotes = Boolean.valueOf(
+ getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES, "")).booleanValue();
+ skipLines = Integer.valueOf(
+ getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_SKIP_LINES, "0")).intValue();
+
+ String compressTypeStr = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COMPRESS_TYPE, "UNKNOWN");
try {
- compressionType = Util.getFileCompressType(validParams.getOrDefault(COMPRESS_TYPE, "UNKNOWN"));
+ compressionType = Util.getFileCompressType(compressTypeStr);
} catch (IllegalArgumentException e) {
- throw new AnalysisException("Compress type : " + validParams.get(COMPRESS_TYPE) + " is not supported.");
+ throw new AnalysisException("Compress type : " + compressTypeStr + " is not supported.");
}
- if (formatString.equals("csv") || formatString.equals("csv_with_names")
- || formatString.equals("csv_with_names_and_types")) {
- parseCsvSchema(csvSchema, validParams);
+ if (FileFormatUtils.isCsv(formatString)) {
+ FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps,
+ FileFormatConstants.PROP_CSV_SCHEMA, ""));
+ LOG.debug("get csv schema: {}", csvSchema);
}
- pathPartitionKeys = Optional.ofNullable(validParams.get(PATH_PARTITION_KEYS))
- .map(str ->
- Arrays.stream(str.split(","))
- .map(String::trim)
- .collect(Collectors.toList()))
+
+ pathPartitionKeys = Optional.ofNullable(
+ getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_PATH_PARTITION_KEYS, null))
+ .map(str -> Arrays.stream(str.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList()))
.orElse(Lists.newArrayList());
+
+ return copiedProps;
}
- // public for unit test
- public static void parseCsvSchema(List<Column> csvSchema, Map<String, String> validParams)
- throws AnalysisException {
- String csvSchemaStr = validParams.get(CSV_SCHEMA);
- if (Strings.isNullOrEmpty(csvSchemaStr)) {
- return;
- }
- // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)"
- String[] schemaStrs = csvSchemaStr.split(";");
- try {
- for (String schemaStr : schemaStrs) {
- String[] kv = schemaStr.replace(" ", "").split(":");
- if (kv.length != 2) {
- throw new AnalysisException("invalid csv schema: " +
csvSchemaStr);
- }
- Column column = null;
- String name = kv[0].toLowerCase();
- FeNameFormat.checkColumnName(name);
- String type = kv[1].toLowerCase();
- if (type.equals("tinyint")) {
- column = new Column(name, PrimitiveType.TINYINT, true);
- } else if (type.equals("smallint")) {
- column = new Column(name, PrimitiveType.SMALLINT, true);
- } else if (type.equals("int")) {
- column = new Column(name, PrimitiveType.INT, true);
- } else if (type.equals("bigint")) {
- column = new Column(name, PrimitiveType.BIGINT, true);
- } else if (type.equals("largeint")) {
- column = new Column(name, PrimitiveType.LARGEINT, true);
- } else if (type.equals("float")) {
- column = new Column(name, PrimitiveType.FLOAT, true);
- } else if (type.equals("double")) {
- column = new Column(name, PrimitiveType.DOUBLE, true);
- } else if (type.startsWith("decimal")) {
- // regex decimal(p, s)
- Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(type);
- if (!matcher.find()) {
- throw new AnalysisException("invalid decimal type: " +
type);
- }
- int precision = Integer.parseInt(matcher.group(1));
- int scale = Integer.parseInt(matcher.group(2));
- column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
- "");
- } else if (type.equals("date")) {
- column = new Column(name, ScalarType.createDateType(), false, null, true, null, "");
- } else if (type.startsWith("datetime")) {
- int scale = 0;
- if (!type.equals("datetime")) {
- // regex datetime(s)
- Matcher matcher = DATETIME_TYPE_PATTERN.matcher(type);
- if (!matcher.find()) {
- throw new AnalysisException("invalid datetime
type: " + type);
- }
- scale = Integer.parseInt(matcher.group(1));
- }
- column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
- } else if (type.equals("string")) {
- column = new Column(name, PrimitiveType.STRING, true);
- } else if (type.equals("boolean")) {
- column = new Column(name, PrimitiveType.BOOLEAN, true);
- } else {
- throw new AnalysisException("unsupported column type: " +
type);
- }
- csvSchema.add(column);
- }
- LOG.debug("get csv schema: {}", csvSchema);
- } catch (Exception e) {
- throw new AnalysisException("invalid csv schema: " +
e.getMessage());
- }
+ protected String getOrDefaultAndRemove(Map<String, String> props, String key, String defaultValue) {
+ String value = props.getOrDefault(key, defaultValue);
+ props.remove(key);
+ return value;
}
public List<TBrokerFileStatus> getFileStatuses() {
@@ -541,3 +456,4 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
}
+
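With this refactor every concrete TVF follows the same three-step constructor; a condensed, hypothetical subclass for illustration (only parseCommonProperties, getOrDefaultAndRemove and locationProperties are real names from the diff; the rest is a sketch):

    public class DemoTableValuedFunction extends ExternalFileTableValuedFunction {
        public DemoTableValuedFunction(Map<String, String> properties) throws AnalysisException {
            // 1. consume the common file-format keys; leftovers come back in a case-insensitive map
            Map<String, String> otherProps = super.parseCommonProperties(properties);
            // 2. consume source-specific keys, e.g. a required "uri"
            String uriStr = getOrDefaultAndRemove(otherProps, "uri", null);
            // 3. whatever remains is treated as location properties
            locationProperties.putAll(otherProps);
        }
    }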
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
index eb8e8f70f7b..051706ae474 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java
@@ -18,16 +18,14 @@
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.ExportStmt;
+import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.URI;
import org.apache.doris.thrift.TFileType;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import org.apache.commons.collections.map.CaseInsensitiveMap;
+import com.google.common.base.Strings;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -39,50 +37,41 @@ import java.util.Map;
*/
public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
public static final Logger LOG = LogManager.getLogger(HdfsTableValuedFunction.class);
-
public static final String NAME = "hdfs";
- public static final String HDFS_URI = "uri";
- // simple or kerberos
+ private static final String PROP_URI = "uri";
+
+ public HdfsTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+ init(properties);
+ }
- private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
- .add(HDFS_URI)
- .add(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
- .add(HdfsResource.HADOOP_FS_NAME)
- .add(HdfsResource.HADOOP_USER_NAME)
- .add(HdfsResource.HADOOP_KERBEROS_PRINCIPAL)
- .add(HdfsResource.HADOOP_KERBEROS_KEYTAB)
- .add(HdfsResource.HADOOP_SHORT_CIRCUIT)
- .add(HdfsResource.HADOOP_SOCKET_PATH)
- .build();
+ private void init(Map<String, String> properties) throws AnalysisException {
+ // 1. analyze common properties
+ Map<String, String> otherProps = super.parseCommonProperties(properties);
- private URI hdfsUri;
+ // 2. analyze uri
+ String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null);
+ if (Strings.isNullOrEmpty(uriStr)) {
+ throw new AnalysisException(String.format("Properties '%s' is
required.", PROP_URI));
+ }
+ URI uri = URI.create(uriStr);
+ StorageBackend.checkUri(uri, StorageType.HDFS);
+ filePath = uri.getScheme() + "://" + uri.getAuthority() + uri.getPath();
- public HdfsTableValuedFunction(Map<String, String> params) throws AnalysisException {
- Map<String, String> fileParams = new CaseInsensitiveMap();
- locationProperties = Maps.newHashMap();
- for (String key : params.keySet()) {
- String lowerKey = key.toLowerCase();
- if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
- fileParams.put(lowerKey, params.get(key));
- } else if (LOCATION_PROPERTIES.contains(lowerKey)) {
- locationProperties.put(lowerKey, params.get(key));
- } else if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) {
+ // 3. analyze other properties
+ for (String key : otherProps.keySet()) {
+ if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) {
// because HADOOP_FS_NAME contains upper and lower case
- locationProperties.put(HdfsResource.HADOOP_FS_NAME, params.get(key));
+ locationProperties.put(HdfsResource.HADOOP_FS_NAME, otherProps.get(key));
} else {
- locationProperties.put(key, params.get(key));
+ locationProperties.put(key, otherProps.get(key));
}
}
-
- if (!locationProperties.containsKey(HDFS_URI)) {
- throw new AnalysisException(String.format("Configuration '%s' is required.", HDFS_URI));
+ // If the user does not specify the HADOOP_FS_NAME, we will use the uri's scheme and authority
+ if (!locationProperties.containsKey(HdfsResource.HADOOP_FS_NAME)) {
+ locationProperties.put(HdfsResource.HADOOP_FS_NAME, uri.getScheme() + "://" + uri.getAuthority());
}
- ExportStmt.checkPath(locationProperties.get(HDFS_URI), StorageType.HDFS);
- hdfsUri = URI.create(locationProperties.get(HDFS_URI));
- filePath = locationProperties.get(HdfsResource.HADOOP_FS_NAME) + hdfsUri.getPath();
-
- super.parseProperties(fileParams);
+ // 4. parse file
parseFile();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
index 265045d7a6f..5044f045c31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
@@ -20,9 +20,9 @@ package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
-import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,24 +36,15 @@ public class HttpStreamTableValuedFunction extends ExternalFileTableValuedFuncti
private static final Logger LOG = LogManager.getLogger(HttpStreamTableValuedFunction.class);
public static final String NAME = "http_stream";
- public HttpStreamTableValuedFunction(Map<String, String> params) throws AnalysisException {
- Map<String, String> fileParams = new CaseInsensitiveMap();
- for (String key : params.keySet()) {
- String lowerKey = key.toLowerCase();
- if (!FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
- throw new AnalysisException(key + " is invalid property");
- }
- fileParams.put(lowerKey, params.get(key).toLowerCase());
- }
+ public HttpStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+ // 1. analyze common properties
+ super.parseCommonProperties(properties);
- String formatString = fileParams.getOrDefault(FORMAT, "").toLowerCase();
- if (formatString.equals("parquet")
- || formatString.equals("avro")
- || formatString.equals("orc")) {
- throw new AnalysisException("current http_stream does not yet
support parquet, avro and orc");
+ if (fileFormatType == TFileFormatType.FORMAT_PARQUET
+ || fileFormatType == TFileFormatType.FORMAT_AVRO
+ || fileFormatType == TFileFormatType.FORMAT_ORC) {
+ throw new AnalysisException("http_stream does not yet support
parquet, avro and orc");
}
-
- super.parseProperties(fileParams);
}
// =========== implement abstract methods of ExternalFileTableValuedFunction =================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
index 129c3f930c7..350621e3550 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
@@ -31,8 +31,6 @@ import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,42 +44,31 @@ import java.util.concurrent.TimeUnit;
*/
public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
private static final Logger LOG = LogManager.getLogger(LocalTableValuedFunction.class);
-
public static final String NAME = "local";
- public static final String FILE_PATH = "file_path";
- public static final String BACKEND_ID = "backend_id";
+ public static final String PROP_FILE_PATH = "file_path";
+ public static final String PROP_BACKEND_ID = "backend_id";
private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
- .add(FILE_PATH)
- .add(BACKEND_ID)
+ .add(PROP_FILE_PATH)
+ .add(PROP_BACKEND_ID)
.build();
private long backendId;
- public LocalTableValuedFunction(Map<String, String> params) throws AnalysisException {
- Map<String, String> fileParams = new CaseInsensitiveMap();
- locationProperties = Maps.newHashMap();
- for (String key : params.keySet()) {
- String lowerKey = key.toLowerCase();
- if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
- fileParams.put(lowerKey, params.get(key));
- } else if (LOCATION_PROPERTIES.contains(lowerKey)) {
- locationProperties.put(lowerKey, params.get(key));
- } else {
- throw new AnalysisException(key + " is invalid property");
- }
- }
+ public LocalTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+ // 1. analyze common properties
+ Map<String, String> otherProps = super.parseCommonProperties(properties);
+ // 2. analyze location properties
for (String key : LOCATION_PROPERTIES) {
- if (!locationProperties.containsKey(key)) {
- throw new AnalysisException(String.format("Configuration '%s'
is required.", key));
+ if (!otherProps.containsKey(key)) {
+ throw new AnalysisException(String.format("Property '%s' is
required.", key));
}
}
+ filePath = otherProps.get(PROP_FILE_PATH);
+ backendId = Long.parseLong(otherProps.get(PROP_BACKEND_ID));
- filePath = locationProperties.get(FILE_PATH);
- backendId = Long.parseLong(locationProperties.get(BACKEND_ID));
- super.parseProperties(fileParams);
-
+ // 3. parse file
getFileListFromBackend();
}
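A sketch of constructing the local TVF after the change (placeholder values; backend_id must name a live BE, and "format" is consumed by parseCommonProperties first):

    Map<String, String> props = Maps.newHashMap();
    props.put("format", "csv");
    props.put("file_path", "/tmp/demo.csv"); // PROP_FILE_PATH, required
    props.put("backend_id", "10001");        // PROP_BACKEND_ID, required
    LocalTableValuedFunction tvf = new LocalTableValuedFunction(props);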
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 504730daaee..9ad6232c4e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -30,10 +30,9 @@ import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.thrift.TFileType;
-import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -49,71 +48,46 @@ import java.util.Map;
*/
public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
public static final String NAME = "s3";
- public static final String S3_URI = "uri";
+ public static final String PROP_URI = "uri";
private static final ImmutableSet<String> DEPRECATED_KEYS =
- ImmutableSet.of("access_key", "secret_key", "session_token",
"region");
-
- private static final ImmutableSet<String> OPTIONAL_KEYS =
- ImmutableSet.of(S3Properties.SESSION_TOKEN, PropertyConverter.USE_PATH_STYLE, S3Properties.REGION,
- PATH_PARTITION_KEYS);
-
- private static final ImmutableSet<String> LOCATION_PROPERTIES = ImmutableSet.<String>builder()
- .add(S3_URI)
- .add(S3Properties.ENDPOINT)
- .addAll(DEPRECATED_KEYS)
- .addAll(S3Properties.TVF_REQUIRED_FIELDS)
- .addAll(OPTIONAL_KEYS)
- .build();
-
- private final S3URI s3uri;
- private final boolean forceVirtualHosted;
- private String virtualBucket = "";
+ ImmutableSet.of("access_key", "secret_key", "session_token",
"region",
+ "ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN", "REGION");
- public S3TableValuedFunction(Map<String, String> params) throws AnalysisException {
+ private String virtualBucket = "";
- Map<String, String> fileParams = new HashMap<>();
- for (Map.Entry<String, String> entry : params.entrySet()) {
- String key = entry.getKey();
- String lowerKey = key.toLowerCase();
- if (!LOCATION_PROPERTIES.contains(lowerKey) &&
!FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
- throw new AnalysisException("Invalid property: " + key);
- }
- if (DEPRECATED_KEYS.contains(lowerKey)) {
- lowerKey = S3Properties.S3_PREFIX + lowerKey;
- }
- fileParams.put(lowerKey, entry.getValue());
- }
+ public S3TableValuedFunction(Map<String, String> properties) throws AnalysisException {
+ // 1. analyze common properties
+ Map<String, String> otherProps = super.parseCommonProperties(properties);
- if (!fileParams.containsKey(S3_URI)) {
- throw new AnalysisException("Missing required property: " +
S3_URI);
+ // 2. analyze uri and other properties
+ String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null);
+ if (Strings.isNullOrEmpty(uriStr)) {
+ throw new AnalysisException(String.format("Properties '%s' is
required.", PROP_URI));
}
-
- forceVirtualHosted = isVirtualHosted(fileParams);
- s3uri = getS3Uri(fileParams);
- final String endpoint = forceVirtualHosted
- ? getEndpointAndSetVirtualBucket(params)
- : s3uri.getBucketScheme();
- if (!fileParams.containsKey(S3Properties.REGION)) {
+ forwardCompatibleDeprecatedKeys(otherProps);
+
+ String usePathStyle = getOrDefaultAndRemove(otherProps, PropertyConverter.USE_PATH_STYLE, "false");
+ boolean forceVirtualHosted = isVirtualHosted(uriStr, Boolean.parseBoolean(usePathStyle));
+ S3URI s3uri = getS3Uri(uriStr, forceVirtualHosted);
+ String endpoint = forceVirtualHosted
+ ? getEndpointAndSetVirtualBucket(s3uri, otherProps) : s3uri.getBucketScheme();
+ if (!otherProps.containsKey(S3Properties.REGION)) {
String region = S3Properties.getRegionOfEndpoint(endpoint);
- fileParams.put(S3Properties.REGION, region);
+ otherProps.put(S3Properties.REGION, region);
}
+ checkNecessaryS3Properties(otherProps);
CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint,
- fileParams.get(S3Properties.REGION),
- fileParams.get(S3Properties.ACCESS_KEY),
- fileParams.get(S3Properties.SECRET_KEY));
- if (fileParams.containsKey(S3Properties.SESSION_TOKEN)) {
-            credential.setSessionToken(fileParams.get(S3Properties.SESSION_TOKEN));
+ otherProps.get(S3Properties.REGION),
+ otherProps.get(S3Properties.ACCESS_KEY),
+ otherProps.get(S3Properties.SECRET_KEY));
+ if (otherProps.containsKey(S3Properties.SESSION_TOKEN)) {
+            credential.setSessionToken(otherProps.get(S3Properties.SESSION_TOKEN));
}
- // set S3 location properties
- // these five properties is necessary, no one can be lost.
locationProperties = S3Properties.credentialToMap(credential);
-        String usePathStyle = fileParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
-        this.locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(this.locationProperties));
-
- super.parseProperties(fileParams);
+        locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties));
if (forceVirtualHosted) {
             filePath = NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM
@@ -130,39 +104,59 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
}
}
-    private String getEndpointAndSetVirtualBucket(Map<String, String> params) throws AnalysisException {
-        Preconditions.checkState(forceVirtualHosted, "only invoked when force virtual hosted.");
- String[] fileds = s3uri.getVirtualBucket().split("\\.", 2);
- virtualBucket = fileds[0];
- if (fileds.length > 1) {
+ private void forwardCompatibleDeprecatedKeys(Map<String, String> props) {
+ for (String deprecatedKey : DEPRECATED_KEYS) {
+ String value = props.remove(deprecatedKey);
+ if (!Strings.isNullOrEmpty(value)) {
+ props.put("s3." + deprecatedKey.toLowerCase(), value);
+ }
+ }
+ }
+
+    private void checkNecessaryS3Properties(Map<String, String> props) throws AnalysisException {
+ if (Strings.isNullOrEmpty(props.get(S3Properties.REGION))) {
+            throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION));
+ }
+ if (Strings.isNullOrEmpty(props.get(S3Properties.ACCESS_KEY))) {
+            throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY));
+ }
+ if (Strings.isNullOrEmpty(props.get(S3Properties.SECRET_KEY))) {
+            throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.SECRET_KEY));
+ }
+ }
+
+    private String getEndpointAndSetVirtualBucket(S3URI s3uri, Map<String, String> props)
+ throws AnalysisException {
+ String[] fields = s3uri.getVirtualBucket().split("\\.", 2);
+ virtualBucket = fields[0];
+ if (fields.length > 1) {
             // At this point, s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg:
             //            uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt
             //            s3uri.getVirtualBucket() = my_bucket.cos.ap-beijing.myqcloud.com,
             // so we need separate virtualBucket and endpoint.
- return fileds[1];
- } else if (params.containsKey(S3Properties.ENDPOINT)) {
- return params.get(S3Properties.ENDPOINT);
+ return fields[1];
+ } else if (props.containsKey(S3Properties.ENDPOINT)) {
+ return props.get(S3Properties.ENDPOINT);
} else {
             throw new AnalysisException("can not parse endpoint, please check uri.");
}
}
- private boolean isVirtualHosted(Map<String, String> validParams) {
- String originUri = validParams.getOrDefault(S3_URI, "");
- if (originUri.toLowerCase().startsWith("s3")) {
+ private boolean isVirtualHosted(String uri, boolean usePathStyle) {
+ if (uri.toLowerCase().startsWith("s3")) {
// s3 protocol, default virtual-hosted style
return true;
} else {
             // not s3 protocol, forceVirtualHosted is determined by USE_PATH_STYLE.
-            return !Boolean.parseBoolean(validParams.get(PropertyConverter.USE_PATH_STYLE));
+ return !usePathStyle;
}
}
-    private S3URI getS3Uri(Map<String, String> validParams) throws AnalysisException {
+    private S3URI getS3Uri(String uri, boolean forceVirtualHosted) throws AnalysisException {
try {
- return S3URI.create(validParams.get(S3_URI), forceVirtualHosted);
+ return S3URI.create(uri, forceVirtualHosted);
} catch (UserException e) {
-            throw new AnalysisException("parse s3 uri failed, uri = " + validParams.get(S3_URI), e);
+            throw new AnalysisException("parse s3 uri failed, uri = " + uri, e);
}
}
@@ -189,3 +183,4 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
return "S3TableValuedFunction";
}
}
+
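For reference, after this refactor the s3() table-valued function resolves its properties as follows: "uri" is mandatory, credentials are read from the "s3."-prefixed keys (the bare access_key/ACCESS_KEY style keys are still accepted and rewritten to the "s3." form), and the region is taken from "s3.region" or derived from the endpoint. A minimal sketch of a query under these rules, with a placeholder bucket, endpoint and credentials rather than anything from this patch:

    select * from s3(
        "uri" = "https://my-bucket.s3.example-endpoint.com/path/data.csv",
        "s3.access_key" = "<access_key>",
        "s3.secret_key" = "<secret_key>",
        "format" = "csv",
        "column_separator" = ",");

If the region cannot be derived from the endpoint, "s3.region" can be supplied explicitly; "use_path_style" remains optional and defaults to "false".
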
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java
index f664415e6d5..e5b06bd5dd4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java
@@ -21,6 +21,8 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.util.FileFormatConstants;
+import org.apache.doris.common.util.FileFormatUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -35,12 +37,12 @@ public class ExternalFileTableValuedFunctionTest {
public void testCsvSchemaParse() {
Config.enable_date_conversion = true;
Map<String, String> properties = Maps.newHashMap();
- properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA,
+ properties.put(FileFormatConstants.PROP_CSV_SCHEMA,
"k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:bool;"
                        + "k8:char(10);k9:varchar(20);k10:date;k11:datetime;k12:decimal(10,2)");
List<Column> csvSchema = Lists.newArrayList();
try {
-            ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties);
+            FileFormatUtils.parseCsvSchema(csvSchema, properties.get(FileFormatConstants.PROP_CSV_SCHEMA));
Assert.fail();
} catch (AnalysisException e) {
e.printStackTrace();
@@ -48,11 +50,11 @@ public class ExternalFileTableValuedFunctionTest {
}
csvSchema.clear();
- properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA,
+ properties.put(FileFormatConstants.PROP_CSV_SCHEMA,
"k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:boolean;"
                        + "k8:string;k9:date;k10:datetime;k11:decimal(10, 2);k12:decimal( 38,10); k13:datetime(5)");
try {
-            ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties);
+            FileFormatUtils.parseCsvSchema(csvSchema, properties.get(FileFormatConstants.PROP_CSV_SCHEMA));
Assert.assertEquals(13, csvSchema.size());
Column decimalCol = csvSchema.get(10);
Assert.assertEquals(10, decimalCol.getPrecision());
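The schema string parsed in this test follows the "name:type;name:type" pattern handled by FileFormatUtils.parseCsvSchema. A hedged sketch of how such a schema is typically attached to a file TVF query (the csv_schema key and all values here are illustrative, not taken from this patch):

    select * from s3(
        "uri" = "https://my-bucket.s3.example-endpoint.com/no_header.csv",
        "s3.access_key" = "<access_key>",
        "s3.secret_key" = "<secret_key>",
        "format" = "csv",
        "csv_schema" = "k1:int;k2:bigint;k3:decimal(10,2);k4:datetime");
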
diff --git a/regression-test/suites/export_p2/test_export_max_file_size.groovy
b/regression-test/suites/export_p2/test_export_max_file_size.groovy
index 0efe3a82cfa..460bc130268 100644
--- a/regression-test/suites/export_p2/test_export_max_file_size.groovy
+++ b/regression-test/suites/export_p2/test_export_max_file_size.groovy
@@ -73,8 +73,8 @@ suite("test_export_max_file_size", "p2") {
insert into ${table_export_name}
select * from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}${load_data_path}",
- "fs.defaultFS" = "${fs}",
"hadoop.username" = "${user_name}",
+ "column_separator" = ",",
"format" = "csv");
"""
@@ -126,8 +126,8 @@ suite("test_export_max_file_size", "p2") {
insert into ${table_load_name}
select * from hdfs(
"uri" = "${outfile_url}${j}.csv",
- "fs.defaultFS" = "${fs}",
"hadoop.username" = "${user_name}",
+ "column_separator" = ",",
"format" = "csv");
"""
}
diff --git a/regression-test/suites/export_p2/test_export_with_hdfs.groovy
b/regression-test/suites/export_p2/test_export_with_hdfs.groovy
index 205b1ffd716..d108d355e29 100644
--- a/regression-test/suites/export_p2/test_export_with_hdfs.groovy
+++ b/regression-test/suites/export_p2/test_export_with_hdfs.groovy
@@ -97,8 +97,8 @@ suite("test_export_with_hdfs", "p2") {
// check data correctness
order_qt_select """ select * from hdfs(
"uri" = "${outfile_url}0.${file_suffix}",
- "fs.defaultFS" = "${fs}",
"hadoop.username" = "${user_name}",
+ "column_separator" = ",",
"format" = "${format}");
"""
}
diff --git
a/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy
b/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy
index 3f9abe2c2b2..42a2354dd95 100644
--- a/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy
+++ b/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy
@@ -58,7 +58,6 @@ suite("test_outfile_orc_max_file_size", "p2") {
insert into ${table_export_name}
select * from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}${load_data_path}",
- "fs.defaultFS" = "${fs}",
"hadoop.username" = "${user_name}",
"format" = "orc");
"""
diff --git
a/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy
b/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy
index beb3cd3e0cf..b9005037a5c 100644
---
a/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy
+++
b/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy
@@ -35,7 +35,7 @@ suite("test_different_parquet_types", "p0") {
logger.info("record res" + res1_2.toString())
def res1_3 = sql """
- select * from hdfs(\"uri" =
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_byte_array/delta_byte_array.parquet\",\"fs.defaultFS\"
= \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+ select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_byte_array/delta_byte_array.parquet\",\"format\"
= \"parquet\") limit 10
"""
logger.info("record res" + res1_3.toString())
}
@@ -58,7 +58,7 @@ suite("test_different_parquet_types", "p0") {
//return nothing,but no exception
def res3_3 = sql """
- select * from hdfs(\"uri" =
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_binary_packed/delta_binary_packed.parquet\",\"fs.defaultFS\"
= \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+ select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_binary_packed/delta_binary_packed.parquet\",\"format\"
= \"parquet\") limit 10
"""
logger.info("record res" + res3_3.toString())
}
@@ -76,7 +76,7 @@ suite("test_different_parquet_types", "p0") {
logger.info("record res" + res4_2.toString())
def res4_3 = sql """
- select * from hdfs(\"uri" =
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_required_column/delta_encoding_required_column.parquet\",\"fs.defaultFS\"
= \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+ select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_required_column/delta_encoding_required_column.parquet\",\"format\"
= \"parquet\") limit 10
"""
logger.info("record res" + res4_3.toString())
}
@@ -95,7 +95,7 @@ suite("test_different_parquet_types", "p0") {
logger.info("record res" + res5_2.toString())
def res5_3 = sql """
- select * from hdfs(\"uri" =
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_optional_column/delta_encoding_optional_column.parquet\",\"fs.defaultFS\"
= \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+ select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_optional_column/delta_encoding_optional_column.parquet\",\"format\"
= \"parquet\") limit 10
"""
logger.info("record res" + res5_3.toString())
}
@@ -114,7 +114,7 @@ suite("test_different_parquet_types", "p0") {
logger.info("record res" + res6_2.toString())
def res6_3 = sql """
- select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/datapage_v1-snappy-compressed-checksum/datapage_v1-snappy-compressed-checksum.parquet\",\"fs.defaultFS\"
= \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+ select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/datapage_v1-snappy-compressed-checksum/datapage_v1-snappy-compressed-checksum.parquet\",\"format\"
= \"parquet\") limit 10
"""
logger.info("record res" + res6_3.toString())
@@ -133,7 +133,7 @@ suite("test_different_parquet_types", "p0") {
logger.info("record res" + res7_2.toString())
def res7_3 = sql """
- select * from hdfs(\"uri" =
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/overflow_i16_page_cnt/overflow_i16_page_cnt.parquet\",\"fs.defaultFS\"
= \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+ select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/overflow_i16_page_cnt/overflow_i16_page_cnt.parquet\",\"format\"
= \"parquet\") limit 10
"""
logger.info("record res" + res7_3.toString())
}
@@ -152,7 +152,7 @@ suite("test_different_parquet_types", "p0") {
logger.info("record res" + res8_2.toString())
def res8_3 = sql """
- select * from hdfs(\"uri" =
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages/alltypes_tiny_pages.parquet\",\"fs.defaultFS\"
= \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+ select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages/alltypes_tiny_pages.parquet\",\"format\"
= \"parquet\") limit 10
"""
logger.info("record res" + res8_3.toString())
}
@@ -170,7 +170,7 @@ suite("test_different_parquet_types", "p0") {
logger.info("record res" + res9_2.toString())
def res9_3 = sql """
- select * from hdfs(\"uri" =
\"hdfs://127.0.0.1:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages_plain/alltypes_tiny_pages_plain.parquet\",\"fs.defaultFS\"
= \"hdfs://127.0.0.1:${hdfs_port}\",\"format\" = \"parquet\") limit 10
+ select * from hdfs(\"uri" =
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages_plain/alltypes_tiny_pages_plain.parquet\",\"format\"
= \"parquet\") limit 10
"""
logger.info("record res" + res9_3.toString())
}
diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
index 0535eb6505a..a4b9bdd71c8 100644
--- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy
@@ -32,8 +32,8 @@ suite("test_hdfs_tvf") {
format = "csv"
qt_csv_all_types """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
+ "column_separator" = ",",
"format" = "${format}") order by c1; """
@@ -41,15 +41,14 @@ suite("test_hdfs_tvf") {
format = "csv"
qt_csv_student """ select cast(c1 as INT) as id, c2 as name, c3 as
age from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
+ "column_separator" = ",",
"format" = "${format}") order by id; """
uri = "${defaultFS}" +
"/user/doris/preinstalled_data/csv_format_test/array_malformat.csv"
format = "csv"
qt_csv_array_malformat """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"column_separator" = "|") order by c1;
"""
@@ -57,7 +56,6 @@ suite("test_hdfs_tvf") {
uri = "${defaultFS}" +
"/user/doris/preinstalled_data/csv_format_test/array_normal.csv"
format = "csv"
qt_csv_array_normal """ select * from HDFS("uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"column_separator" = "|") order by c1; """
@@ -67,9 +65,9 @@ suite("test_hdfs_tvf") {
format = "csv"
qt_csv_with_compress_type """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
+ "column_separator" = ",",
"compress_type" = "GZ") order by c1; """
// test csv format infer compress type
@@ -77,8 +75,8 @@ suite("test_hdfs_tvf") {
format = "csv"
qt_csv_infer_compress_type """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
+ "column_separator" = ",",
"format" = "${format}") order by c1; """
// test csv_with_names file format
@@ -86,8 +84,8 @@ suite("test_hdfs_tvf") {
format = "csv_with_names"
qt_csv_names """ select cast(id as INT) as id, name, age from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
+ "column_separator" = ",",
"format" = "${format}") order by id; """
// test csv_with_names_and_types file format
@@ -95,8 +93,8 @@ suite("test_hdfs_tvf") {
format = "csv_with_names_and_types"
qt_csv_names_types """ select cast(id as INT) as id, name, age
from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
+ "column_separator" = ",",
"format" = "${format}") order by id; """
@@ -105,7 +103,6 @@ suite("test_hdfs_tvf") {
format = "parquet"
qt_parquet """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}") order by s_suppkey limit
20; """
@@ -114,7 +111,6 @@ suite("test_hdfs_tvf") {
format = "orc"
qt_orc """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}") order by p_partkey limit
20; """
@@ -124,7 +120,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_json """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "false",
@@ -135,7 +130,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_json_limit1 """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "false",
@@ -145,7 +139,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_json_limit2 """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "true",
@@ -154,7 +147,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_json_limit3 """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "false",
@@ -163,7 +155,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_json_limit4 """ select * from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "false",
@@ -175,7 +166,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_json_root """ select cast(id as INT) as id, city, cast(code as
INT) as code from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "false",
@@ -187,7 +177,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_json_paths """ select cast(id as INT) as id, cast(code as INT)
as code from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "false",
@@ -199,7 +188,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_one_array """ select cast(id as INT) as id, city, cast(code as
INT) as code from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "true",
@@ -211,7 +199,6 @@ suite("test_hdfs_tvf") {
format = "json"
qt_cast """ select cast(id as INT) as id, city, cast(code as INT)
as code from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "false",
@@ -240,7 +227,6 @@ suite("test_hdfs_tvf") {
select cast (id as INT) as id, city, cast (code as INT) as
code
from HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}",
"strip_outer_array" = "false",
@@ -256,7 +242,6 @@ suite("test_hdfs_tvf") {
format = "parquet"
qt_desc """ desc function HDFS(
"uri" = "${uri}",
- "fs.defaultFS"= "${defaultFS}",
"hadoop.username" = "${hdfsUserName}",
"format" = "${format}"); """
} finally {
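The regression-test changes above drop the now-redundant "fs.defaultFS" property, since the hdfs() TVF takes the namenode address from the "uri" itself, and pass "column_separator" explicitly for the comma-separated CSV fixtures. A minimal sketch of the resulting call shape, with a placeholder host, port and path:

    select * from hdfs(
        "uri" = "hdfs://namenode-host:8020/path/to/data.csv",
        "hadoop.username" = "hadoop",
        "format" = "csv",
        "column_separator" = ",");
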
diff --git
a/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy
b/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy
index 40dc3c24405..d71e07487ca 100644
---
a/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy
+++
b/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy
@@ -30,7 +30,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
qt_gz_1 """
select ${select_field} from HDFS(
"uri" = "${baseUri}/dt=gzip/000000_0.gz",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"column_separator" = '\001',
@@ -40,7 +39,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
qt_gz_2 """
desc function HDFS(
"uri" = "${baseUri}/dt=gzip/000000_0.gz",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"column_separator" = '\001',
@@ -52,7 +50,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select ${select_field} from
HDFS(
"uri" = "${baseUri}/dt=bzip2/000000_0.bz2",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"column_separator" = '\001',
@@ -64,7 +61,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select ${select_field} from
HDFS(
"uri" = "${baseUri}/dt=deflate/000000_0_copy_1.deflate",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"column_separator" = '\001',
@@ -75,7 +71,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select c7 from
HDFS(
"uri" = "${baseUri}/dt=deflate/000000_0_copy_1.deflate",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"column_separator" = '\001',
@@ -88,7 +83,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select ${select_field} from
HDFS(
"uri" = "${baseUri}/dt=plain/000000_0",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"column_separator" = '\001',
@@ -99,7 +93,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select c3,c4,c10 from
HDFS(
"uri" = "${baseUri}/dt=plain/000000_0",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"column_separator" = '\001',
@@ -114,7 +107,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select count(*) from
HDFS(
"uri" =
"${test_data_dir}/test_data/ckbench_hits.part-00000.snappy.parquet",
- "fs.defaultFS" = "${baseFs}",
"format" = "parquet"
);
"""
@@ -124,7 +116,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select count(*) from
HDFS(
"uri" =
"${test_data_dir}/test_data/ckbench_hits.part-00000.snappy.parquet",
- "fs.defaultFS" = "${baseFs}",
"format" = "parquet"
);
"""
@@ -135,7 +126,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select count(*) from
HDFS(
"uri" = "${test_data_dir}/test_data/ckbench_hits.000000_0.orc",
- "fs.defaultFS" = "${baseFs}",
"format" = "orc"
);
"""
@@ -145,7 +135,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select count(*) from
HDFS(
"uri" = "${test_data_dir}/test_data/ckbench_hits.000000_0.orc",
- "fs.defaultFS" = "${baseFs}",
"format" = "orc"
);
"""
@@ -156,7 +145,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select count(*) from
HDFS(
"uri" =
"${test_data_dir}/test_data/tpcds_catalog_returns_data-m-00000.txt",
- "fs.defaultFS" = "${baseFs}",
"format" = "csv"
);
"""
@@ -166,7 +154,6 @@ suite("test_hdfs_tvf_compression",
"p2,external,tvf,external_remote,external_rem
select count(*) from
HDFS(
"uri" =
"${test_data_dir}/test_data/tpcds_catalog_returns_data-m-00000.txt",
- "fs.defaultFS" = "${baseFs}",
"format" = "csv"
);
"""
diff --git
a/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy
b/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy
index ad572936aec..dcd98af2f9d 100644
---
a/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy
+++
b/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy
@@ -27,7 +27,6 @@ suite("test_path_partition_keys",
"p2,external,tvf,external_remote,external_remo
order_qt_hdfs_1 """
select * from HDFS(
"uri" = "${baseUri}/dt1=cyw/*",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"path_partition_keys"="dt1" ) order by c1,c2 ;
@@ -36,7 +35,6 @@ suite("test_path_partition_keys",
"p2,external,tvf,external_remote,external_remo
order_qt_hdfs_2 """
select * from HDFS(
"uri" = "${baseUri}/dt1=cyw/*",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"path_partition_keys"="dt1") where dt1!="cyw" order by c1,c2 limit
3;
@@ -45,7 +43,6 @@ suite("test_path_partition_keys",
"p2,external,tvf,external_remote,external_remo
order_qt_hdfs_3 """
select dt1,c1,count(*) from HDFS(
"uri" = "${baseUri}/dt1=hello/*",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"path_partition_keys"="dt1") group by c1,dt1 order by c1;
@@ -54,7 +51,6 @@ suite("test_path_partition_keys",
"p2,external,tvf,external_remote,external_remo
order_qt_hdfs_4 """
select * from HDFS(
"uri" = "${baseUri}/dt2=two/dt1=hello/*",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"path_partition_keys"="dt1") order by c1;
@@ -63,7 +59,6 @@ suite("test_path_partition_keys",
"p2,external,tvf,external_remote,external_remo
order_qt_hdfs_5 """
select * from HDFS(
"uri" = "${baseUri}/dt2=two/dt1=cyw/*",
- "fs.defaultFS"= "${baseFs}",
"hadoop.username" = "hadoop",
"format" = "csv",
"path_partition_keys"="dt2,dt1");
diff --git
a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
b/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
index 57cfdb136d0..279fcb5e8a5 100644
---
a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
+++
b/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy
@@ -34,6 +34,7 @@ suite("test_s3_tvf_compression",
"p2,external,tvf,external_remote,external_remot
"s3.secret_key" = "${sk}",
"REGION" = "${region}",
"FORMAT" = "csv",
+ "column_separator" = ",",
"use_path_style" = "true",
"compress_type" ="${compress_type}") order by c1,c2,c3,c4,c5 limit 20;
"""
@@ -47,6 +48,7 @@ suite("test_s3_tvf_compression",
"p2,external,tvf,external_remote,external_remot
"s3.secret_key" = "${sk}",
"REGION" = "${region}",
"FORMAT" = "csv",
+ "column_separator" = ",",
"use_path_style" = "true",
"compress_type" ="${compress_type}") order by cast(c1 as int),c4 limit
20;
"""
@@ -62,6 +64,7 @@ suite("test_s3_tvf_compression",
"p2,external,tvf,external_remote,external_remot
"s3.secret_key" = "${sk}",
"REGION" = "${region}",
"FORMAT" = "csv",
+ "column_separator" = ",",
"use_path_style" = "true",
"compress_type" ="${compress_type}") order by c1,c2,c3,c4,c5 limit 15;
"""
@@ -75,6 +78,7 @@ suite("test_s3_tvf_compression",
"p2,external,tvf,external_remote,external_remot
"s3.secret_key" = "${sk}",
"REGION" = "${region}",
"FORMAT" = "csv",
+ "column_separator" = ",",
"use_path_style" = "true",
"compress_type" ="${compress_type}") where c1!="100" order by
cast(c4 as date),c1 limit 13;
"""
@@ -90,6 +94,7 @@ suite("test_s3_tvf_compression",
"p2,external,tvf,external_remote,external_remot
"s3.secret_key" = "${sk}",
"REGION" = "${region}",
"FORMAT" = "csv",
+ "column_separator" = ",",
"use_path_style" = "true",
"compress_type" ="${compress_type}FRAME") order by c1,c2,c3,c4,c5
limit 14;
"""
@@ -103,6 +108,7 @@ suite("test_s3_tvf_compression",
"p2,external,tvf,external_remote,external_remot
"s3.secret_key" = "${sk}",
"REGION" = "${region}",
"FORMAT" = "csv",
+ "column_separator" = ",",
"use_path_style" = "true",
"compress_type" ="${compress_type}FRAME") where
c3="buHDwfGeNHfpRFdNaogneddi" order by c3,c1 limit 14;
"""
diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
index 202bc6b9148..08776efd8ec 100644
--- a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
@@ -23,43 +23,41 @@ suite("test_tvf_p2", "p2") {
qt_eof_check """select * from hdfs(
"uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/bad_store_sales.parquet",
- "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}",
"format" = "parquet")
where ss_store_sk = 4 and ss_addr_sk is null order by ss_item_sk"""
// array_ancestor_null.parquet is parquet file whose values in the
array column are all nulls in a page
qt_array_ancestor_null """select count(list_double_col) from hdfs(
"uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/array_ancestor_null.parquet",
- "format" = "parquet",
- "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+ "format" = "parquet");
+ """
// all_nested_types.parquet is parquet file that contains all complext
types
qt_nested_types_parquet """select count(array0), count(array1),
count(array2), count(array3), count(struct0), count(struct1), count(map0)
from hdfs(
"uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/all_nested_types.parquet",
- "format" = "parquet",
- "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+ "format" = "parquet");
+ """
// all_nested_types.orc is orc file that contains all complext types
qt_nested_types_orc """select count(array0), count(array1),
count(array2), count(array3), count(struct0), count(struct1), count(map0)
from hdfs(
"uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/orc/all_nested_types.orc",
- "format" = "orc",
- "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+ "format" = "orc");
+ """
// a row of complex type may be stored across more pages
qt_row_cross_pages """select count(id), count(m1), count(m2)
from hdfs(
"uri" =
"hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
- "format" = "parquet",
- "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
+ "format" = "parquet");
+ """
// viewfs
qt_viewfs """select count(id), count(m1), count(m2)
from hdfs(
"uri" =
"viewfs://my-cluster/ns1/catalog/tvf/parquet/row_cross_pages.parquet",
"format" = "parquet",
- "fs.defaultFS" = "viewfs://my-cluster",
"fs.viewfs.mounttable.my-cluster.link./ns1" =
"hdfs://${nameNodeHost}:${hdfsPort}/",
"fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")"""
}
diff --git
a/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy
b/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy
index 6510571427e..32b9bac9c70 100644
--- a/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy
@@ -26,8 +26,7 @@ suite("test_tvf_view_count_p2",
"p2,external,tvf,external_remote,external_remote
sql """use test_tvf_view_count_p2"""
sql """set enable_nereids_planner=false"""
sql """create view tvf_view_count as select * from hdfs (
-            "uri"="hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0",
- "fs.defaultFS"="hdfs://${nameNodeHost}:${hdfsPort}",
+            "uri"="hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0",
"hadoop.username" = "hadoop",
"format"="parquet");"""
diff --git
a/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy
b/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy
index 2323fcaff8a..8939154bb53 100644
--- a/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy
+++ b/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy
@@ -26,8 +26,7 @@ suite("test_tvf_view_p2",
"p2,external,tvf,external_remote,external_remote_tvf")
sql """use test_tvf_view_p2"""
sql """set enable_fallback_to_original_planner=false"""
sql """create view tvf_view as select * from hdfs (
-            "uri"="hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0",
- "fs.defaultFS"="hdfs://${nameNodeHost}:${hdfsPort}",
+            "uri"="hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0",
"hadoop.username" = "hadoop",
"format"="parquet");"""
@@ -48,8 +47,7 @@ suite("test_tvf_view_p2",
"p2,external,tvf,external_remote,external_remote_tvf")
}
explain{
sql("select * from hdfs (\n" +
-            "    \"uri\"=\"hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0\",\n" +
-            "    \"fs.defaultFS\"=\"hdfs://${nameNodeHost}:${hdfsPort}\",\n" +
+            "    \"uri\"=\"hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0\",\n" +
" \"hadoop.username\" = \"hadoop\",\n" +
" \"format\"=\"parquet\")")
contains("_table_valued_function_hdfs.p_partkey")