This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 97fa840324 [feature](multi-catalog)support iceberg hadoop catalog external table query (#22949)
97fa840324 is described below
commit 97fa840324088a10b31b2e14f7d19f03fd0e067c
Author: slothever <[email protected]>
AuthorDate: Sun Aug 20 19:29:25 2023 +0800
[feature](multi-catalog)support iceberg hadoop catalog external table query (#22949)
support iceberg hadoop catalog external table query
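For illustration, end-to-end use of the new catalog type over Doris's MySQL protocol might look as follows. This is a hedged sketch, not part of the commit: the FE address, credentials, warehouse path, and database/table names are hypothetical.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class IcebergHadoopCatalogDemo {
    public static void main(String[] args) throws Exception {
        // Doris speaks the MySQL protocol, so a stock MySQL JDBC driver suffices
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/", "root", "");
             Statement stmt = conn.createStatement()) {
            // Create an Iceberg catalog backed by a Hadoop (HDFS) warehouse
            stmt.execute("CREATE CATALOG IF NOT EXISTS iceberg_hadoop PROPERTIES ("
                    + "'type' = 'iceberg',"
                    + "'iceberg.catalog.type' = 'hadoop',"
                    + "'warehouse' = 'hdfs://your-host:8020/user/iceberg/warehouse')");
            // Switch to the catalog and query an external Iceberg table
            stmt.execute("SWITCH iceberg_hadoop");
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT count(*) FROM your_db.your_table")) {
                while (rs.next()) {
                    System.out.println(rs.getLong(1));
                }
            }
        }
    }
}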
---
docs/en/docs/lakehouse/multi-catalog/iceberg.md | 59 ++++++++++++++++++--
docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md | 61 ++++++++++++++++++---
.../org/apache/doris/catalog/HdfsResource.java | 1 +
.../datasource/iceberg/IcebergExternalCatalog.java | 1 +
.../iceberg/IcebergExternalCatalogFactory.java | 2 +
.../iceberg/IcebergHadoopExternalCatalog.java | 62 ++++++++++++++++++++++
.../org/apache/doris/persist/gson/GsonUtils.java | 2 +
.../planner/external/iceberg/IcebergScanNode.java | 23 ++++++--
...est_external_catalog_iceberg_hadoop_catalog.out | 17 ++++++
..._external_catalog_iceberg_hadoop_catalog.groovy | 43 +++++++++++++++
10 files changed, 256 insertions(+), 15 deletions(-)
diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md
index 54af57bee8..2baa05770f 100644
--- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md
@@ -53,7 +53,30 @@ CREATE CATALOG iceberg PROPERTIES (
### Create Catalog based on Iceberg API
-Use the Iceberg API to access metadata, and support services such as Hive, REST, DLF and Glue as Iceberg's Catalog.
+Use the Iceberg API to access metadata, and support services such as Hadoop File System, Hive, REST, DLF and Glue as Iceberg's Catalog.
+
+#### Hadoop Catalog
+
+```sql
+CREATE CATALOG iceberg_hadoop PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type' = 'hadoop',
+ 'warehouse' = 'hdfs://your-host:8020/dir/key'
+);
+```
+
+```sql
+CREATE CATALOG iceberg_hadoop_ha PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type' = 'hadoop',
+ 'warehouse' = 'hdfs://your-nameservice/dir/key',
+ 'dfs.nameservices'='your-nameservice',
+ 'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+ 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
+ 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
+    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
+);
+```
#### Hive Metastore
@@ -133,16 +156,42 @@ CREATE CATALOG iceberg PROPERTIES (
`hive.metastore.uris`: Dataproc Metastore URI,See in Metastore Services :[Dataproc Metastore Services](https://console.cloud.google.com/dataproc/metastore).
-### Iceberg On S3
+### Iceberg On Object Storage
If the data is stored on S3, the following parameters can be used in properties:
```
"s3.access_key" = "ak"
"s3.secret_key" = "sk"
-"s3.endpoint" = "http://endpoint-uri"
-"s3.region" = "your-region"
-"s3.credentials.provider" = "provider-class-name" //
可选,默认凭证类基于BasicAWSCredentials实现。
+"s3.endpoint" = "s3.us-east-1.amazonaws.com"
+"s3.region" = "us-east-1"
+```
+
+The data is stored on Alibaba Cloud OSS:
+
+```
+"oss.access_key" = "ak"
+"oss.secret_key" = "sk"
+"oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
+"oss.region" = "oss-cn-beijing"
+```
+
+The data is stored on Tencent Cloud COS:
+
+```
+"cos.access_key" = "ak"
+"cos.secret_key" = "sk"
+"cos.endpoint" = "cos.ap-beijing.myqcloud.com"
+"cos.region" = "ap-beijing"
+```
+
+The data is stored on Huawei Cloud OBS:
+
+```
+"obs.access_key" = "ak"
+"obs.secret_key" = "sk"
+"obs.endpoint" = "obs.cn-north-4.myhuaweicloud.com"
+"obs.region" = "cn-north-4"
```
## Column type mapping
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
index ab5b447005..3e6a4826d0 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
@@ -53,7 +53,30 @@ CREATE CATALOG iceberg PROPERTIES (
### 基于Iceberg API创建Catalog
-使用Iceberg API访问元数据的方式,支持Hive、REST、Glue、DLF等服务作为Iceberg的Catalog。
+使用Iceberg API访问元数据的方式,支持Hadoop File System、Hive、REST、Glue、DLF等服务作为Iceberg的Catalog。
+
+#### Hadoop Catalog
+
+```sql
+CREATE CATALOG iceberg_hadoop PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type' = 'hadoop',
+ 'warehouse' = 'hdfs://your-host:8020/dir/key'
+);
+```
+
+```sql
+CREATE CATALOG iceberg_hadoop_ha PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type' = 'hadoop',
+ 'warehouse' = 'hdfs://your-nameservice/dir/key',
+ 'dfs.nameservices'='your-nameservice',
+ 'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+ 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
+ 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
+    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
+);
+```
#### Hive Metastore
@@ -133,16 +156,42 @@ CREATE CATALOG iceberg PROPERTIES (
`hive.metastore.uris`: Dataproc Metastore 服务开放的接口,在 Metastore 管理页面获取:[Dataproc Metastore Services](https://console.cloud.google.com/dataproc/metastore).
-### Iceberg On S3
+### Iceberg On Object Storage
-若数据存放在S3上,properties中可以使用以下参数
+若数据存放在S3上,properties中可以使用以下参数:
```
"s3.access_key" = "ak"
"s3.secret_key" = "sk"
-"s3.endpoint" = "http://endpoint-uri"
-"s3.region" = "your-region"
-"s3.credentials.provider" = "provider-class-name" //
可选,默认凭证类基于BasicAWSCredentials实现。
+"s3.endpoint" = "s3.us-east-1.amazonaws.com"
+"s3.region" = "us-east-1"
+```
+
+数据存放在阿里云OSS上:
+
+```
+"oss.access_key" = "ak"
+"oss.secret_key" = "sk"
+"oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com"
+"oss.region" = "oss-cn-beijing"
+```
+
+数据存放在腾讯云COS上:
+
+```
+"cos.access_key" = "ak"
+"cos.secret_key" = "sk"
+"cos.endpoint" = "cos.ap-beijing.myqcloud.com"
+"cos.region" = "ap-beijing"
+```
+
+数据存放在华为云OBS上:
+
+```
+"obs.access_key" = "ak"
+"obs.secret_key" = "sk"
+"obs.endpoint" = "obs.cn-north-4.myhuaweicloud.com"
+"obs.region" = "cn-north-4"
```
## 列类型映射
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index 1f87f93e9d..d2a03aaf90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -54,6 +54,7 @@ public class HdfsResource extends Resource {
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static String DSF_NAMESERVICES = "dfs.nameservices";
public static final String HDFS_PREFIX = "hdfs:";
+ public static final String HDFS_FILE_PREFIX = "hdfs://";
@SerializedName(value = "properties")
private Map<String, String> properties;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 50816b77a0..8df4acfc8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -44,6 +44,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
public static final String ICEBERG_REST = "rest";
public static final String ICEBERG_HMS = "hms";
+ public static final String ICEBERG_HADOOP = "hadoop";
public static final String ICEBERG_GLUE = "glue";
public static final String ICEBERG_DLF = "dlf";
protected String icebergCatalogType;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
index 6ea5a3a73b..e8f593f293 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
@@ -39,6 +39,8 @@ public class IcebergExternalCatalogFactory {
                return new IcebergGlueExternalCatalog(catalogId, name, resource, props, comment);
            case IcebergExternalCatalog.ICEBERG_DLF:
                return new IcebergDLFExternalCatalog(catalogId, name, resource, props, comment);
+            case IcebergExternalCatalog.ICEBERG_HADOOP:
+                return new IcebergHadoopExternalCatalog(catalogId, name, resource, props, comment);
            default:
                throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " value: " + catalogType);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java
new file mode 100644
index 0000000000..06d1a4caaa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.property.PropertyConverter;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog {
+
+    public IcebergHadoopExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
+                                        String comment) {
+        super(catalogId, name, comment);
+        props = PropertyConverter.convertToMetaProperties(props);
+        String warehouse = props.get(CatalogProperties.WAREHOUSE_LOCATION);
+        Preconditions.checkArgument(StringUtils.isNotEmpty(warehouse),
+                "Cannot initialize Iceberg HadoopCatalog because 'warehouse' must not be null or empty");
+        String nameService = StringUtils.substringBetween(warehouse, HdfsResource.HDFS_FILE_PREFIX, "/");
+        if (StringUtils.isEmpty(nameService)) {
+            throw new IllegalArgumentException("Unrecognized 'warehouse' location format"
+                    + " because name service is required.");
+        }
+        catalogProperty = new CatalogProperty(resource, props);
+        catalogProperty.addProperty(HdfsResource.HADOOP_FS_NAME, HdfsResource.HDFS_FILE_PREFIX + nameService);
+    }
+
+ @Override
+ protected void initLocalObjectsImpl() {
+ icebergCatalogType = ICEBERG_HADOOP;
+ HadoopCatalog hadoopCatalog = new HadoopCatalog();
+ hadoopCatalog.setConf(getConfiguration());
+        // initialize hadoop catalog
+ Map<String, String> catalogProperties = new HashMap<>();
+        String warehouse = catalogProperty.getProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
+ catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
+ hadoopCatalog.initialize(icebergCatalogType, catalogProperties);
+ catalog = hadoopCatalog;
+ }
+}
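For reference, the constructor above derives the default filesystem name it registers under HdfsResource.HADOOP_FS_NAME from the 'warehouse' property. A minimal standalone sketch of that parsing, assuming HDFS_FILE_PREFIX is "hdfs://" as added to HdfsResource above; the class name and warehouse value are illustrative, not part of the commit:

import org.apache.commons.lang3.StringUtils;

public class WarehouseParseSketch {
    public static void main(String[] args) {
        String warehouse = "hdfs://my-nameservice/user/iceberg/warehouse"; // illustrative
        // The text between "hdfs://" and the next "/" is the name service (or host:port)
        String nameService = StringUtils.substringBetween(warehouse, "hdfs://", "/");
        if (StringUtils.isEmpty(nameService)) {
            throw new IllegalArgumentException("Unrecognized 'warehouse' location format");
        }
        // Registered on the catalog so relative file paths can be re-qualified later
        System.out.println("fs name = hdfs://" + nameService); // prints: fs name = hdfs://my-nameservice
    }
}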
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 892ffbeca5..196e68bad3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -68,6 +68,7 @@ import org.apache.doris.datasource.iceberg.IcebergDLFExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
@@ -205,6 +206,7 @@ public class GsonUtils {
                .registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName())
                .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName())
                .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName())
+                .registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName())
                .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
                .registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
                .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 7293982ebb..c89c606b8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
@@ -111,6 +112,7 @@ public class IcebergScanNode extends FileQueryScanNode {
case IcebergExternalCatalog.ICEBERG_REST:
case IcebergExternalCatalog.ICEBERG_DLF:
case IcebergExternalCatalog.ICEBERG_GLUE:
+ case IcebergExternalCatalog.ICEBERG_HADOOP:
                source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange);
break;
default:
@@ -194,7 +196,7 @@ public class IcebergScanNode extends FileQueryScanNode {
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
        long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
        HashSet<String> partitionPathSet = new HashSet<>();
-        String dataPath = icebergTable.location() + icebergTable.properties()
+        String dataPath = normalizeLocation(icebergTable.location()) + icebergTable.properties()
                .getOrDefault(TableProperties.WRITE_DATA_LOCATION, DEFAULT_DATA_PATH);
boolean isPartitionedTable = icebergTable.spec().isPartitioned();
@@ -202,7 +204,7 @@ public class IcebergScanNode extends FileQueryScanNode {
        try (CloseableIterable<CombinedScanTask> combinedScanTasks =
                TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
            combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
-                String dataFilePath = splitTask.file().path().toString();
+                String dataFilePath = normalizeLocation(splitTask.file().path().toString());
// Counts the number of partitions read
if (isPartitionedTable) {
@@ -311,8 +313,21 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType(String location) throws UserException {
-        return getTFileType(location).orElseThrow(() ->
-            new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name()));
+        final String fLocation = normalizeLocation(location);
+        return getTFileType(fLocation).orElseThrow(() ->
+            new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name()));
+ }
+
+ private String normalizeLocation(String location) {
+ Map<String, String> props = source.getCatalog().getProperties();
+        String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
+ if (icebergCatalogType.equalsIgnoreCase("hadoop")) {
+ if (!location.startsWith(HdfsResource.HDFS_PREFIX)) {
+ String fsName = props.get(HdfsResource.HADOOP_FS_NAME);
+ location = fsName + location;
+ }
+ }
+ return location;
}
@Override
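The normalizeLocation logic above re-qualifies Hadoop-catalog file paths that lack an HDFS scheme by prepending the catalog's registered fs name. A standalone sketch of the same idea; the class name and values below are hypothetical, not part of the commit:

public class NormalizeLocationSketch {
    // Hadoop-catalog data file paths may arrive without a scheme, so the
    // catalog's registered fs name is prepended before the path is used.
    static String normalize(String fsName, String location) {
        if (!location.startsWith("hdfs:")) {
            return fsName + location;
        }
        return location;
    }

    public static void main(String[] args) {
        String fsName = "hdfs://my-nameservice"; // illustrative
        // Unqualified path gets the prefix attached
        System.out.println(normalize(fsName, "/warehouse/db/tbl/data/00000-0.parquet"));
        // Already-qualified path passes through unchanged
        System.out.println(normalize(fsName, "hdfs://my-nameservice/warehouse/db/tbl/data/00000-0.parquet"));
    }
}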
diff --git a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
new file mode 100644
index 0000000000..fa1a58f6f1
--- /dev/null
+++ b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
@@ -0,0 +1,17 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+2879562
+
+-- !q02 --
+1
+3
+5
+6
+7
+8
+11
+
+-- !q03 --
+1	Customer#000000001	j5JsirBM9P	MOROCCO  0	MOROCCO	AFRICA	25-989-741-2988	BUILDING
+3	Customer#000000003	fkRGN8n	ARGENTINA7	ARGENTINA	AMERICA	11-719-748-3364	AUTOMOBILE
+5	Customer#000000005	hwBtxkoBF qSW4KrI	CANADA   5	CANADA	AMERICA	13-750-942-6364	HOUSEHOLD
diff --git a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
new file mode 100644
index 0000000000..b35a799b28
--- /dev/null
+++ b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_iceberg_hadoop_catalog", "p2,external,iceberg,external_remote,external_remote_iceberg") {
+ String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String iceberg_catalog_name = "test_external_iceberg_catalog_hadoop"
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+ String extHdfsPort = context.config.otherConfigs.get("extHdfsPort")
+ sql """drop catalog if exists ${iceberg_catalog_name};"""
+ sql """
+ create catalog if not exists ${iceberg_catalog_name} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hadoop',
+            'warehouse' = 'hdfs://${extHiveHmsHost}:${extHdfsPort}/usr/hive/warehouse/hadoop_catalog'
+ );
+ """
+
+ sql """switch ${iceberg_catalog_name};"""
+ def q01 = {
+ qt_q01 """ select count(*) from iceberg_hadoop_catalog """
+            qt_q02 """ select c_custkey from iceberg_hadoop_catalog group by c_custkey order by c_custkey limit 7 """
+            qt_q03 """ select * from iceberg_hadoop_catalog order by c_custkey limit 3 """
+ }
+
+ sql """ use `multi_catalog`; """
+ q01()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]