This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5593731566e [fix][refactor] refactor schema init of external table and some parquet issues (#30325)
5593731566e is described below
commit 5593731566e8fb1ad2a554bc5ec1c48fb8f994ba
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Jan 31 18:33:30 2024 +0800
[fix][refactor] refactor schema init of external table and some parquet issues (#30325)
1. Skip parquet files whose length is only 4 bytes: PAR1 (a sketch of this check follows after this list)
2. Refactor the schema init method of iceberg/hudi/hive tables in the hms catalog
   1. Remove some redundant overloads of `getIcebergTable`
   2. Fix the issue described in #23771
3. Support HoodieParquetInputFormatBase, treating it as a normal hive table format
4. When listing files, skip all hidden dirs and files
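
For context on item 1: a parquet file starts and ends with the 4-byte magic number "PAR1", so a file whose total size does not exceed those 4 bytes cannot contain a footer or any data. Below is a minimal, self-contained Java sketch of that rule; the real check lives in be/src/vec/exec/format/parquet/vparquet_reader.cpp, and the helper name here is illustrative only.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ParquetEmptyFileCheck {
    // "PAR1" occupies 4 bytes at both the head and the tail of a parquet file.
    private static final int PARQUET_MAGIC_LEN = 4;

    // Hypothetical helper mirroring the BE-side rule: a file no larger than
    // the magic number (e.g. a bare "PAR1") is treated as an empty file.
    static boolean isEffectivelyEmptyParquetFile(Path file) throws IOException {
        return Files.size(file) <= PARQUET_MAGIC_LEN;
    }
}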
---
.gitignore | 2 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 9 ++-
.../doris/catalog/HiveMetaStoreClientHelper.java | 2 +-
.../doris/catalog/external/HMSExternalTable.java | 73 +++++++++---------
.../catalog/external/IcebergExternalTable.java | 76 +-----------------
.../doris/datasource/hive/HiveMetaStoreCache.java | 18 ++---
.../datasource/iceberg/IcebergExternalCatalog.java | 2 +-
.../apache/doris/external/hive/util/HiveUtil.java | 44 ++---------
.../doris/external/iceberg/util/IcebergUtils.java | 90 ++++++++++++++++++++--
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 4 +
.../planner/external/iceberg/IcebergApiSource.java | 9 +--
.../planner/external/iceberg/IcebergHMSSource.java | 4 +-
.../external/iceberg/IcebergMetadataCache.java | 80 ++++---------------
.../planner/external/iceberg/IcebergScanNode.java | 3 +-
.../doris/statistics/util/StatisticsUtil.java | 2 +-
15 files changed, 175 insertions(+), 243 deletions(-)
diff --git a/.gitignore b/.gitignore
index a8ad35b55d5..2513a796189 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,6 +54,8 @@ thirdparty/installed*
thirdparty/doris-thirdparty*.tar.xz
docker/thirdparties/docker-compose/mysql/data
+docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
+docker/thirdparties/docker-compose/hive/scripts/paimon1
fe_plugins/output
fe_plugins/**/.factorypath
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6ae4ea2f5bc..dd8d01f4bb1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -238,8 +238,11 @@ Status ParquetReader::_open_file() {
}
if (_file_metadata == nullptr) {
SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
- if (_file_reader->size() == 0) {
- return Status::EndOfFile("open file failed, empty parquet file: " + _scan_range.path);
+ if (_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
+ // Some systems may generate a parquet file with only 4 bytes: PAR1.
+ // Consider such a file empty.
+ return Status::EndOfFile("open file failed, empty parquet file {} with size: {}",
+ _scan_range.path, _file_reader->size());
}
size_t meta_size = 0;
if (_meta_cache == nullptr) {
@@ -928,4 +931,4 @@ int64_t ParquetReader::_get_column_start_offset(const
tparquet::ColumnMetaData&
}
return column.data_page_offset;
}
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 96419cd0b0a..7d0419fcbf9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -770,7 +770,7 @@ public class HiveMetaStoreClientHelper {
try {
hudiSchema =
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
} catch (Exception e) {
- throw new RuntimeException("Cannot get hudi table schema.");
+ throw new RuntimeException("Cannot get hudi table schema.", e);
}
return hudiSchema;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index e1037ffd025..028928eafce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.statistics.AnalysisInfo;
@@ -62,8 +63,6 @@ import
org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -87,8 +86,10 @@ import java.util.stream.Collectors;
public class HMSExternalTable extends ExternalTable implements
MTMVRelatedTableIf {
private static final Logger LOG =
LogManager.getLogger(HMSExternalTable.class);
- private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
- private static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
+ public static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
+ public static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
+ public static final Set<String> SUPPORTED_HUDI_FILE_FORMATS;
+
private static final Map<StatsType, String> MAP_SPARK_STATS_TO_DORIS;
private static final String TBL_PROP_TXN_PROPERTIES =
"transactional_properties";
private static final String TBL_PROP_INSERT_ONLY = "insert_only";
@@ -111,13 +112,16 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.mapred.TextInputFormat");
+ // Some hudi tables use HoodieParquetInputFormatBase as the input format,
+ // but we can't treat them as hudi tables.
+ // So add it to SUPPORTED_HIVE_FILE_FORMATS and treat it as a hive table.
+ // Then Doris will just list the files from the location and read the parquet files directly.
+ SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hudi.hadoop.HoodieParquetInputFormatBase");
SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
}
- private static final Set<String> SUPPORTED_HUDI_FILE_FORMATS;
-
static {
SUPPORTED_HUDI_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HUDI_FILE_FORMATS.add("org.apache.hudi.hadoop.HoodieParquetInputFormat");
@@ -405,10 +409,6 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return ((HMSExternalCatalog) catalog).getHiveMetastoreUris();
}
- public String getHiveVersion() {
- return ((HMSExternalCatalog) catalog).getHiveVersion();
- }
-
public Map<String, String> getCatalogProperties() {
return catalog.getProperties();
}
@@ -454,32 +454,28 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return initSchema();
}
-
@Override
public List<Column> initSchema() {
makeSureInitialized();
List<Column> columns;
- List<FieldSchema> schema = ((HMSExternalCatalog) catalog).getClient().getSchema(dbName, name);
if (dlaType.equals(DLAType.ICEBERG)) {
- columns = getIcebergSchema(schema);
+ columns = getIcebergSchema();
} else if (dlaType.equals(DLAType.HUDI)) {
- columns = getHudiSchema(schema);
+ columns = getHudiSchema();
} else {
- List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
- for (FieldSchema field : schema) {
- tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
- HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
- true, field.getComment(), true, -1));
- }
- columns = tmpSchema;
+ columns = getHiveSchema();
}
initPartitionColumns(columns);
return columns;
}
- public List<Column> getHudiSchema(List<FieldSchema> hmsSchema) {
+ private List<Column> getIcebergSchema() {
+ return IcebergUtils.getSchema(catalog, dbName, name);
+ }
+
+ private List<Column> getHudiSchema() {
org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this);
- List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
+ List<Column> tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
for (org.apache.avro.Schema.Field hudiField : hudiSchema.getFields()) {
String columnName = hudiField.name().toLowerCase(Locale.ROOT);
tmpSchema.add(new Column(columnName, HudiUtils.fromAvroHudiTypeToDorisType(hudiField.schema()),
@@ -488,6 +484,19 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return tmpSchema;
}
+ private List<Column> getHiveSchema() {
+ List<Column> columns;
+ List<FieldSchema> schema = ((HMSExternalCatalog) catalog).getClient().getSchema(dbName, name);
+ List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
+ for (FieldSchema field : schema) {
+ tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
+ HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
+ true, field.getComment(), true, -1));
+ }
+ columns = tmpSchema;
+ return columns;
+ }
+
@Override
public long getCacheRowCount() {
//Cached accurate information
@@ -528,20 +537,6 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return 1;
}
- private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
- Table icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this);
- Schema schema = icebergTable.schema();
- List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
- for (FieldSchema field : hmsSchema) {
- tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
- HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
- IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
- true, null, true, false, null, field.getComment(), true, null,
- schema.caseInsensitiveFindField(field.getName()).fieldId(), null));
- }
- return tmpSchema;
- }
-
private void initPartitionColumns(List<Column> schema) {
List<String> partitionKeys =
remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
.collect(Collectors.toList());
@@ -598,7 +593,9 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return getHiveColumnStats(colName);
case ICEBERG:
return StatisticsUtil.getIcebergColumnStats(colName,
- Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
+ Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
+ catalog, dbName, name
+ ));
default:
LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
}
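
To summarize the HMSExternalTable changes above: initSchema() is now a plain dispatch on the DLA type, with each branch delegating to a dedicated private helper instead of sharing one HMS FieldSchema list. A condensed restatement of the resulting control flow, taken from the hunks above:

@Override
public List<Column> initSchema() {
    makeSureInitialized();
    List<Column> columns;
    if (dlaType.equals(DLAType.ICEBERG)) {
        columns = getIcebergSchema();   // delegates to IcebergUtils.getSchema(catalog, dbName, name)
    } else if (dlaType.equals(DLAType.HUDI)) {
        columns = getHudiSchema();      // reads the avro schema via HiveMetaStoreClientHelper
    } else {
        columns = getHiveSchema();      // falls back to the HMS FieldSchema list
    }
    initPartitionColumns(columns);
    return columns;
}

Judging from the removed getIcebergSchema(List<FieldSchema>) overload, this is also the fix for #23771: iceberg columns are now derived from the iceberg table's own metadata rather than from the HMS schema.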
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index be99e26de62..be320fc9268 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -17,12 +17,10 @@
package org.apache.doris.catalog.external;
-import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
@@ -33,21 +31,11 @@ import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
-import com.google.common.collect.Lists;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.types.Types;
-
import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
import java.util.Optional;
public class IcebergExternalTable extends ExternalTable {
-
- // https://iceberg.apache.org/spec/#schemas-and-data-types
- // All time and timestamp values are stored with microsecond precision
- public static final int ICEBERG_DATETIME_SCALE_MS = 6;
-
public IcebergExternalTable(long id, String name, String dbName,
IcebergExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE);
}
@@ -65,66 +53,8 @@ public class IcebergExternalTable extends ExternalTable {
@Override
public List<Column> initSchema() {
- return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
- Schema schema = ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name).schema();
- List<Types.NestedField> columns = schema.columns();
- List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
- for (Types.NestedField field : columns) {
- tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
- icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
- schema.caseInsensitiveFindField(field.name()).fieldId()));
- }
- return tmpSchema;
- });
- }
-
- private Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
- switch (primitive.typeId()) {
- case BOOLEAN:
- return Type.BOOLEAN;
- case INTEGER:
- return Type.INT;
- case LONG:
- return Type.BIGINT;
- case FLOAT:
- return Type.FLOAT;
- case DOUBLE:
- return Type.DOUBLE;
- case STRING:
- case BINARY:
- case UUID:
- return Type.STRING;
- case FIXED:
- Types.FixedType fixed = (Types.FixedType) primitive;
- return ScalarType.createCharType(fixed.length());
- case DECIMAL:
- Types.DecimalType decimal = (Types.DecimalType) primitive;
- return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
- case DATE:
- return ScalarType.createDateV2Type();
- case TIMESTAMP:
- return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
- case TIME:
- return Type.UNSUPPORTED;
- default:
- throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
- }
- }
-
- protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
- if (type.isPrimitiveType()) {
- return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
- }
- switch (type.typeId()) {
- case LIST:
- Types.ListType list = (Types.ListType) type;
- return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
- case MAP:
- case STRUCT:
- return Type.UNSUPPORTED;
- default:
- throw new IllegalArgumentException("Cannot transform unknown type: " + type);
- }
+ makeSureInitialized();
+ return IcebergUtils.getSchema(catalog, dbName, name);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 56fffc41ddd..ebe8d692c75 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -358,14 +358,14 @@ public class HiveMetaStoreCache {
}
// Get File Status by using FileSystem API.
- private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
- JobConf jobConf,
- List<String> partitionValues,
- String bindBrokerName) throws UserException {
+ private FileCacheValue getFileCache(String location, String inputFormat,
+ JobConf jobConf,
+ List<String> partitionValues,
+ String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
- location, bindBrokerName), jobConf, bindBrokerName));
+ location, bindBrokerName), jobConf, bindBrokerName));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
try {
// For Tez engine, it may generate subdirectories for "union" query.
@@ -425,12 +425,12 @@ public class HiveMetaStoreCache {
FileInputFormat.setInputPaths(jobConf, finalLocation.get());
try {
FileCacheValue result;
- InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
if (key.useSelfSplitter) {
- result = getFileCache(finalLocation.get(), inputFormat, jobConf,
- key.getPartitionValues(), key.bindBrokerName);
+ result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
+ key.getPartitionValues(), key.bindBrokerName);
} else {
+ InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
InputSplit[] splits;
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
if (!Strings.isNullOrEmpty(remoteUser)) {
@@ -1082,7 +1082,7 @@ public class HiveMetaStoreCache {
private static boolean isGeneratedPath(String name) {
return "_temporary".equals(name) // generated by spark
|| "_imapala_insert_staging".equals(name) // generated by impala
- || name.startsWith(".hive-staging"); // generated by hive
+ || name.startsWith("."); // generated by hive or hidden file
}
}
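
The isGeneratedPath() change above is what implements item 4 of the commit message: any entry whose name starts with "." is now skipped during listing, in addition to the engine-specific staging names. A small self-contained sketch of the filter (isGeneratedPath mirrors the hunk above; the surrounding stream helper is illustrative, not HiveMetaStoreCache code):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HiddenPathFilter {
    // Mirrors the predicate in the hunk above.
    private static boolean isGeneratedPath(String name) {
        return "_temporary".equals(name)                  // generated by spark
                || "_imapala_insert_staging".equals(name) // generated by impala
                || name.startsWith(".");                  // generated by hive, or any hidden entry
    }

    // Illustrative helper: keep only visible entries when listing a location.
    static List<String> visibleEntries(Stream<String> names) {
        return names.filter(n -> !isGeneratedPath(n)).collect(Collectors.toList());
    }
}

For example, ".hive-staging_123", ".DS_Store" and "_temporary" are all dropped, while "part-00000.parquet" is kept.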
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index c8ff468ab29..fadc60913be 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -106,6 +106,6 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
- .getIcebergTable(catalog, id, dbName, tblName, getProperties());
+ .getIcebergTable(this, dbName, tblName);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index deb048b5943..7ed44620a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -21,16 +21,14 @@ import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
-import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -46,10 +44,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -134,7 +129,6 @@ public final class HiveUtil {
}
private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) {
-
switch (hiveTypeInfo.getCategory()) {
case PRIMITIVE: {
PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo)
hiveTypeInfo;
@@ -190,39 +184,14 @@ public final class HiveUtil {
}
}
- public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
- String location, JobConf jobConf) throws UserException {
+ public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat,
+ String location, JobConf jobConf) throws UserException {
if (remoteFileSystem instanceof BrokerFileSystem) {
- return ((BrokerFileSystem) remoteFileSystem)
- .isSplittable(location, inputFormat.getClass().getCanonicalName());
- }
-
- // ORC uses a custom InputFormat but is always splittable
- if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
- return true;
- }
- // use reflection to get isSplitable method on FileInputFormat
- // ATTN: the method name is actually "isSplitable", but the right spell is "isSplittable"
- Method method = null;
- for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
- try {
- method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
- break;
- } catch (NoSuchMethodException ignored) {
- LOG.debug("Class {} doesn't contain isSplitable method.", clazz);
- }
+ return ((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat);
}
- if (method == null) {
- return false;
- }
- Path path = new Path(location);
- try {
- method.setAccessible(true);
- return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
- } catch (InvocationTargetException | IllegalAccessException | IOException e) {
- throw new RuntimeException(e);
- }
+ // All supported hive input formats are splittable
+ return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
}
public static String getHivePartitionValue(String part) {
@@ -236,5 +205,4 @@ public final class HiveUtil {
throw new RuntimeException(e);
}
}
-
}
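
The isSplittable() rewrite above drops the reflective isSplitable probe entirely: since every input format Doris accepts for hive tables (parquet, orc, text, and now HoodieParquetInputFormatBase) is known to be splittable, a set-membership test on the input format class name is sufficient. A runnable sketch of that assumption, using the format names registered in this commit:

import java.util.Set;

public class SplittableCheckDemo {
    public static void main(String[] args) {
        // Sample of HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS after this commit.
        Set<String> supported = Set.of(
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
                "org.apache.hadoop.mapred.TextInputFormat",
                "org.apache.hudi.hadoop.HoodieParquetInputFormatBase");
        // Splittability is now just membership; unknown formats are not splittable.
        System.out.println(supported.contains("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")); // true
        System.out.println(supported.contains("some.custom.InputFormat")); // false
    }
}

This is also why getFileCache() now takes the input format as a plain String (see the HiveMetaStoreCache diff above): the InputFormat object no longer needs to be instantiated on the listing path.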
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 78a0df2ee6d..8a6864aba3a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -33,9 +33,17 @@ import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.thrift.TExprOpcode;
+import com.google.common.collect.Lists;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -45,19 +53,17 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.util.Locale;
/**
* Iceberg utils
*/
public class IcebergUtils {
private static final Logger LOG = LogManager.getLogger(IcebergUtils.class);
- private static ThreadLocal<Integer> columnIdThreadLocal = new ThreadLocal<Integer>() {
- @Override
- public Integer initialValue() {
- return 0;
- }
- };
- static long MILLIS_TO_NANO_TIME = 1000;
+ private static long MILLIS_TO_NANO_TIME = 1000;
+ // https://iceberg.apache.org/spec/#schemas-and-data-types
+ // All time and timestamp values are stored with microsecond precision
+ private static final int ICEBERG_DATETIME_SCALE_MS = 6;
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {
@@ -238,4 +244,74 @@ public class IcebergUtils {
}
return slotRef;
}
+
+ private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
+ switch (primitive.typeId()) {
+ case BOOLEAN:
+ return Type.BOOLEAN;
+ case INTEGER:
+ return Type.INT;
+ case LONG:
+ return Type.BIGINT;
+ case FLOAT:
+ return Type.FLOAT;
+ case DOUBLE:
+ return Type.DOUBLE;
+ case STRING:
+ case BINARY:
+ case UUID:
+ return Type.STRING;
+ case FIXED:
+ Types.FixedType fixed = (Types.FixedType) primitive;
+ return ScalarType.createCharType(fixed.length());
+ case DECIMAL:
+ Types.DecimalType decimal = (Types.DecimalType) primitive;
+ return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
+ case DATE:
+ return ScalarType.createDateV2Type();
+ case TIMESTAMP:
+ return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
+ case TIME:
+ return Type.UNSUPPORTED;
+ default:
+ throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
+ }
+ }
+
+ public static Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
+ if (type.isPrimitiveType()) {
+ return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
+ }
+ switch (type.typeId()) {
+ case LIST:
+ Types.ListType list = (Types.ListType) type;
+ return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
+ case MAP:
+ case STRUCT:
+ return Type.UNSUPPORTED;
+ default:
+ throw new IllegalArgumentException("Cannot transform unknown type: " + type);
+ }
+ }
+
+ /**
+ * Get the iceberg schema from the catalog and convert it to a doris schema
+ */
+ public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
+ return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
+ org.apache.iceberg.Table icebergTable = Env.getCurrentEnv()
+ .getExtMetaCacheMgr()
+ .getIcebergMetadataCache()
+ .getIcebergTable(catalog, dbName, name);
+ Schema schema = icebergTable.schema();
+ List<Types.NestedField> columns = schema.columns();
+ List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+ for (Types.NestedField field : columns) {
+ tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+ IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+ schema.caseInsensitiveFindField(field.name()).fieldId()));
+ }
+ return tmpSchema;
+ });
+ }
}
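
With the block above, HMSExternalTable, IcebergExternalTable, and the statistics code all obtain iceberg columns through the single IcebergUtils.getSchema() entry point, which loads the cached table under the catalog's UGI and converts each NestedField while preserving the iceberg field id. As a standalone toy mirroring the shape of the primitive mapping above (the real method returns doris Type objects; this demo just prints type names and is not Doris code):

public class IcebergTypeMappingDemo {
    // Pairs taken from the icebergPrimitiveTypeToDorisType hunk above.
    static String toDorisTypeName(String icebergTypeId) {
        switch (icebergTypeId) {
            case "BOOLEAN": return "BOOLEAN";
            case "INTEGER": return "INT";
            case "LONG": return "BIGINT";
            case "STRING":
            case "BINARY":
            case "UUID": return "STRING";
            case "DATE": return "DATEV2";
            case "TIMESTAMP": return "DATETIMEV2(6)"; // ICEBERG_DATETIME_SCALE_MS = 6
            default: return "UNSUPPORTED";
        }
    }

    public static void main(String[] args) {
        System.out.println(toDorisTypeName("TIMESTAMP")); // DATETIMEV2(6)
    }
}

Note that the microsecond timestamp scale (ICEBERG_DATETIME_SCALE_MS = 6) now lives next to the conversion that uses it, instead of on IcebergExternalTable.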
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 1a39f729738..9b5added1b9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -148,6 +148,10 @@ public class MTMVTask extends AbstractTask {
LOG.info("mtmv task run, taskId: {}", super.getTaskId());
try {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
+ if (LOG.isDebugEnabled()) {
+ String taskSessionContext = ctx.getSessionVariable().toJson().toJSONString();
+ LOG.debug("mtmv task session variable, taskId: {}, session: {}", super.getTaskId(), taskSessionContext);
+ }
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
index 73ac8ed7b39..6eed310927c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
@@ -48,11 +47,9 @@ public class IcebergApiSource implements IcebergSource {
this.icebergExtTable = table;
this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
- ((IcebergExternalCatalog) icebergExtTable.getCatalog()).getCatalog(),
- icebergExtTable.getCatalog().getId(),
- icebergExtTable.getDbName(),
- icebergExtTable.getName(),
- icebergExtTable.getCatalog().getProperties());
+ icebergExtTable.getCatalog(),
+ icebergExtTable.getDbName(),
+ icebergExtTable.getName());
this.desc = desc;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
index 478e78c0d0e..62c96930f00 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
@@ -47,7 +47,9 @@ public class IcebergHMSSource implements IcebergSource {
this.desc = desc;
this.columnNameToRange = columnNameToRange;
this.icebergTable =
- Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(hmsTable);
+ Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+ .getIcebergTable(hmsTable.getCatalog(),
+ hmsTable.getDbName(), hmsTable.getName());
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
index 91a208202d0..9f273ca6305 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
@@ -19,9 +19,7 @@ package org.apache.doris.planner.external.iceberg;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
-import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
@@ -72,76 +70,44 @@ public class IcebergMetadataCache {
return ifPresent;
}
- Table icebergTable = getIcebergTable(key, catalog, params.getDatabase(), params.getTable());
+ Table icebergTable = getIcebergTable(catalog, params.getDatabase(), params.getTable());
List<Snapshot> snaps = Lists.newArrayList();
Iterables.addAll(snaps, icebergTable.snapshots());
snapshotListCache.put(key, snaps);
return snaps;
}
- public Table getIcebergTable(IcebergMetadataCacheKey key, CatalogIf catalog, String dbName, String tbName)
- throws UserException {
+ public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) {
+ IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName);
Table cacheTable = tableCache.getIfPresent(key);
if (cacheTable != null) {
return cacheTable;
}
- Table icebergTable;
+ Catalog icebergCatalog;
if (catalog instanceof HMSExternalCatalog) {
HMSExternalCatalog ctg = (HMSExternalCatalog) catalog;
- icebergTable = createIcebergTable(
- ctg.getHiveMetastoreUris(),
- ctg.getCatalogProperty().getHadoopProperties(),
- dbName,
- tbName,
- ctg.getProperties());
+ icebergCatalog = createIcebergHiveCatalog(
+ ctg.getHiveMetastoreUris(),
+ ctg.getCatalogProperty().getHadoopProperties(),
+ ctg.getProperties());
} else if (catalog instanceof IcebergExternalCatalog) {
- IcebergExternalCatalog extCatalog = (IcebergExternalCatalog) catalog;
- icebergTable = getIcebergTable(
- extCatalog.getCatalog(), extCatalog.getId(), dbName, tbName, extCatalog.getProperties());
+ icebergCatalog = ((IcebergExternalCatalog) catalog).getCatalog();
} else {
- throw new UserException("Only support 'hms' and 'iceberg' type for iceberg table");
- }
- tableCache.put(key, icebergTable);
- return icebergTable;
- }
-
- public Table getIcebergTable(IcebergSource icebergSource) throws MetaNotFoundException {
- return icebergSource.getIcebergTable();
- }
-
- public Table getIcebergTable(HMSExternalTable hmsTable) {
- IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
- hmsTable.getCatalog().getId(),
- hmsTable.getDbName(),
- hmsTable.getName());
- Table table = tableCache.getIfPresent(key);
- if (table != null) {
- return table;
+ throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
}
- Table icebergTable = createIcebergTable(hmsTable);
+ Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(catalog.getId(),
+ () -> icebergCatalog.loadTable(TableIdentifier.of(dbName, tbName)));
+ initIcebergTableFileIO(icebergTable, catalog.getProperties());
tableCache.put(key, icebergTable);
-
return icebergTable;
}
- public Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
+ private Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
Map<String, String> props) {
- IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
- catalogId,
- dbName,
- tbName);
- Table cacheTable = tableCache.getIfPresent(key);
- if (cacheTable != null) {
- return cacheTable;
- }
-
Table table = HiveMetaStoreClientHelper.ugiDoAs(catalogId,
() -> catalog.loadTable(TableIdentifier.of(dbName, tbName)));
initIcebergTableFileIO(table, props);
-
- tableCache.put(key, table);
-
return table;
}
@@ -190,14 +156,12 @@ public class IcebergMetadataCache {
});
}
- private Table createIcebergTable(String uri, Map<String, String> hdfsConf, String db, String tbl,
- Map<String, String> props) {
+ private Catalog createIcebergHiveCatalog(String uri, Map<String, String> hdfsConf, Map<String, String> props) {
// set hdfs configure
Configuration conf = new HdfsConfiguration();
for (Map.Entry<String, String> entry : hdfsConf.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
-
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(conf);
@@ -211,20 +175,10 @@ public class IcebergMetadataCache {
catalogProperties.put("uri", uri);
hiveCatalog.initialize("hive", catalogProperties);
}
- Table table = HiveMetaStoreClientHelper.ugiDoAs(conf, () -> hiveCatalog.loadTable(TableIdentifier.of(db, tbl)));
- initIcebergTableFileIO(table, props);
- return table;
- }
-
- private Table createIcebergTable(HMSExternalTable hmsTable) {
- return createIcebergTable(hmsTable.getMetastoreUri(),
- hmsTable.getHadoopProperties(),
- hmsTable.getDbName(),
- hmsTable.getName(),
- hmsTable.getCatalogProperties());
+ return hiveCatalog;
}
- private void initIcebergTableFileIO(Table table, Map<String, String> props) {
+ private static void initIcebergTableFileIO(Table table, Map<String, String> props) {
Map<String, String> ioConf = new HashMap<>();
table.properties().forEach((key, value) -> {
if (key.startsWith("io.")) {
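
Net effect of this file's changes: getIcebergTable(catalog, dbName, tbName) is the only public lookup left, keyed purely by (catalogId, dbName, tbName), and the HMS and iceberg branches differ only in how the iceberg Catalog is obtained. A condensed sketch of the unified flow follows; resolveCatalog is an illustrative name folding the if/else from the hunk above, not a method in this commit.

public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) {
    IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName);
    Table cached = tableCache.getIfPresent(key);
    if (cached != null) {
        return cached; // one cache, one key shape, for both catalog kinds
    }
    // HMS catalog -> createIcebergHiveCatalog(uris, hadoopProps, props);
    // iceberg catalog -> ((IcebergExternalCatalog) catalog).getCatalog()
    Catalog icebergCatalog = resolveCatalog(catalog);
    Table table = HiveMetaStoreClientHelper.ugiDoAs(catalog.getId(),
            () -> icebergCatalog.loadTable(TableIdentifier.of(dbName, tbName)));
    initIcebergTableFileIO(table, catalog.getProperties());
    tableCache.put(key, table);
    return table;
}

Callers that previously needed an HMSExternalTable, an IcebergSource, or a raw (Catalog, id, db, tbl, props) tuple (see the removed overloads above) now all go through this one method.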
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index a335ccfa021..18b745e402a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.TableIf;
@@ -128,7 +127,7 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
protected void doInitialize() throws UserException {
- icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(source);
+ icebergTable = source.getIcebergTable();
super.doInitialize();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 40ea594819c..65c4800c585 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -609,7 +609,7 @@ public class StatisticsUtil {
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
- .getIcebergTable(table);
+ .getIcebergTable(table.getCatalog(), table.getDbName(), table.getName());
TableScan tableScan = icebergTable.newScan().includeColumnStats();
for (FileScanTask task : tableScan.planFiles()) {
rowCount += task.file().recordCount();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]