This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 08e71e5f84 [fix](multi-catalog)compatible with hdfs HA empty prefix (#22480)
08e71e5f84 is described below

commit 08e71e5f8489a93262b30b1371fb168a590d0b60
Author: slothever <[email protected]>
AuthorDate: Wed Aug 2 09:52:20 2023 +0800

    [fix](multi-catalog)compatible with hdfs HA empty prefix (#22480)
    
    cherry-pick from master,
    #22424
---
 docs/en/docs/lakehouse/multi-catalog/iceberg.md    | 15 +++++++
 docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md | 15 +++++++
 .../org/apache/doris/catalog/HdfsResource.java     |  1 +
 .../java/org/apache/doris/common/util/S3Util.java  | 50 ++++++++++++++++++++--
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  7 +--
 .../planner/external/iceberg/IcebergScanNode.java  |  7 +--
 .../planner/external/iceberg/IcebergSplit.java     |  6 ++-
 7 files changed, 91 insertions(+), 10 deletions(-)
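
For context, a rough sketch of the prefix handling this patch introduces (condensed from the S3Util.normalizedHdfsPath changes in the diff below; the real method additionally checks that the URI has no host and falls back to a local path when no nameservice is configured, and the nameservice value here is only a placeholder):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: mirrors the replace rules added to S3Util below,
// not the actual Doris method.
public class HdfsHaPrefixSketch {
    static String normalize(String location, Map<String, String> props) {
        String normalizedPrefix = "hdfs://";
        String brokenPrefix = "hdfs:/";
        // 'hdfs:/path' is first rewritten to 'hdfs://path'
        if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
            location = location.replace(brokenPrefix, normalizedPrefix);
        }
        // the HA nameservice from the catalog properties becomes the host:
        // 'hdfs://path' -> 'hdfs://<nameservice>/path'
        String nameService = props.get("dfs.nameservices");
        if (nameService != null && !nameService.isEmpty()) {
            return location.replace(normalizedPrefix, normalizedPrefix + nameService + "/");
        }
        return location;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("dfs.nameservices", "your-nameservice");
        // prints hdfs://your-nameservice/user/hive/warehouse/t/part-0.orc
        System.out.println(normalize("hdfs:/user/hive/warehouse/t/part-0.orc", props));
    }
}
```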

diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md
index 4509fbc4fa..54af57bee8 100644
--- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md
@@ -93,11 +93,26 @@ see [Alibaba Cloud DLF Catalog](dlf.md)
 
 This method needs to provide REST services in advance, and users need to implement the REST interface for obtaining Iceberg metadata.
 
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+    'type'='iceberg',
+    'iceberg.catalog.type'='rest',
+    'uri' = 'http://172.21.0.1:8181'
+);
+```
+
+If the data is on HDFS and High Availability (HA) is set up, you also need to add the HA configuration to the Catalog.
+
 ```sql
 CREATE CATALOG iceberg PROPERTIES (
     'type'='iceberg',
     'iceberg.catalog.type'='rest',
     'uri' = 'http://172.21.0.1:8181',
+    'dfs.nameservices'='your-nameservice',
+    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.1:8020',
+    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.2:8020',
+    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
 );
 ```
 
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
index c93fca28e5..ab5b447005 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
@@ -93,11 +93,26 @@ Iceberg 属性详情参见 [Iceberg Glue Catalog](https://iceberg.apache.org/doc
 
 This method requires a REST service to be provided in advance, and users need to implement the REST interface for obtaining Iceberg metadata.
 
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+    'type'='iceberg',
+    'iceberg.catalog.type'='rest',
+    'uri' = 'http://172.21.0.1:8181'
+);
+```
+
+If the data is stored on HDFS with High Availability (HA) enabled, you also need to add the HDFS HA configuration to the Catalog:
+
 ```sql
 CREATE CATALOG iceberg PROPERTIES (
     'type'='iceberg',
     'iceberg.catalog.type'='rest',
     'uri' = 'http://172.21.0.1:8181',
+    'dfs.nameservices'='your-nameservice',
+    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.1:8020',
+    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.2:8020',
+    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
 );
 ```
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index cdfb169590..9735f2f059 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -53,6 +53,7 @@ public class HdfsResource extends Resource {
     public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
     public static String DSF_NAMESERVICES = "dfs.nameservices";
+    public static final String HDFS_PREFIX = "hdfs:";
 
     @SerializedName(value = "properties")
     private Map<String, String> properties;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
index 64c897c306..623e699fb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.credentials.CloudCredential;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -38,7 +40,9 @@ import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3Configuration;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.Duration;
+import java.util.Map;
 
 public class S3Util {
     private static final Logger LOG = LogManager.getLogger(S3Util.class);
@@ -63,7 +67,7 @@ public class S3Util {
      * @param location origin location
      * @return metadata location path. just convert when storage is compatible with s3 client.
      */
-    public static String convertToS3IfNecessary(String location) {
+    public static String convertToS3IfNecessary(String location, Map<String, String> props) {
         LOG.debug("try convert location to s3 prefix: " + location);
         if (isObjStorageUseS3Client(location)) {
             int pos = location.indexOf("://");
@@ -72,6 +76,46 @@ public class S3Util {
             }
             return "s3" + location.substring(pos);
         }
+        return normalizedLocation(location, props);
+    }
+
+    private static String normalizedLocation(String location, Map<String, String> props) {
+        try {
+            if (location.startsWith(HdfsResource.HDFS_PREFIX)) {
+                return normalizedHdfsPath(location, props);
+            }
+            return location;
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    private static String normalizedHdfsPath(String location, Map<String, String> props) throws URISyntaxException {
+        URI normalizedUri = new URI(location);
+        // compatible with 'hdfs:///' or 'hdfs:/'
+        if (StringUtils.isEmpty(normalizedUri.getHost())) {
+            String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
+            String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
+            if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
+                location = location.replace(brokenPrefix, normalizedPrefix);
+            }
+            // Need add hdfs host to location
+            String host = props.get(HdfsResource.DSF_NAMESERVICES);
+            if (StringUtils.isNotEmpty(host)) {
+                // Replace 'hdfs://key/' to 'hdfs://name_service/key/'
+                // Or hdfs:///abc to hdfs://name_service/abc
+                return location.replace(normalizedPrefix, normalizedPrefix + host + "/");
+            } else {
+                // 'hdfs://null/' equals the 'hdfs:///'
+                if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
+                    // Do not support hdfs:///location
+                    throw new RuntimeException("Invalid location with empty host: " + location);
+                } else {
+                    // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE.
+                    return location.replace(normalizedPrefix, "/");
+                }
+            }
+        }
         return location;
     }
 
@@ -80,7 +124,7 @@ public class S3Util {
      * @param location origin split path
      * @return BE scan range path
      */
-    public static Path toScanRangeLocation(String location) {
+    public static Path toScanRangeLocation(String location, Map<String, String> props) {
         // All storage will use s3 client on BE.
         if (isObjStorage(location)) {
             int pos = location.indexOf("://");
@@ -95,7 +139,7 @@ public class S3Util {
                 location = "s3" + location.substring(pos);
             }
         }
-        return new Path(location);
+        return new Path(normalizedLocation(location, props));
     }
 
     public static boolean isHdfsOnOssEndpoint(String location) {
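
To illustrate how the widened signatures above are meant to be called, a hypothetical snippet (the class name and sample path are made up; it assumes the doris-fe and Hadoop classes are on the classpath and that the catalog properties carry the HA keys shown in the docs above):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.doris.common.util.S3Util;
import org.apache.hadoop.fs.Path;

public class HaLocationDemo {
    public static void main(String[] args) {
        // Catalog properties as they would come from CREATE CATALOG ... PROPERTIES (...)
        Map<String, String> props = new HashMap<>();
        props.put("dfs.nameservices", "your-nameservice");

        // A split location written without a host ('hdfs:/...') should now resolve
        // against the HA nameservice instead of being passed through with an empty prefix.
        Path scanPath = S3Util.toScanRangeLocation("hdfs:/warehouse/db.db/t/part-0.orc", props);
        System.out.println(scanPath); // expected: hdfs://your-nameservice/warehouse/db.db/t/part-0.orc
    }
}
```
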
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 0bd190d945..e1fa35d07e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -378,7 +378,7 @@ public class HiveMetaStoreCache {
             RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
             for (RemoteFile remoteFile : locatedFiles.files()) {
                 Path srcPath = remoteFile.getPath();
-                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString());
+                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
                 if (!convertedPath.toString().equals(srcPath.toString())) {
                     remoteFile.setPath(convertedPath);
                 }
@@ -403,7 +403,7 @@ public class HiveMetaStoreCache {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
-            String finalLocation = S3Util.convertToS3IfNecessary(key.location);
+            String finalLocation = S3Util.convertToS3IfNecessary(key.location, catalog.getProperties());
             // disable the fs cache in FileSystem, or it will always from new FileSystem
             // and save it in cache when calling FileInputFormat.setInputPaths().
             try {
@@ -437,7 +437,8 @@ public class HiveMetaStoreCache {
                     for (int i = 0; i < splits.length; i++) {
                         org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
                         // todo: get modification time
-                        Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString());
+                        Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(),
+                                    catalog.getProperties());
                         result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
                     }
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 3d3634fb66..23bd919461 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -125,7 +125,7 @@ public class IcebergScanNode extends FileQueryScanNode {
             for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
                 TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
                 String deleteFilePath = filter.getDeleteFilePath();
-                deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath).toString());
+                deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString());
                 if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
                     fileDesc.setContent(FileContent.POSITION_DELETES.id());
                     IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -188,13 +188,14 @@ public class IcebergScanNode extends FileQueryScanNode {
                  TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
             combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
                 String dataFilePath = splitTask.file().path().toString();
-                Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath);
+                Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
                 IcebergSplit split = new IcebergSplit(
                         finalDataFilePath,
                         splitTask.start(),
                         splitTask.length(),
                         splitTask.file().fileSizeInBytes(),
-                        new String[0]);
+                        new String[0],
+                        source.getCatalog().getProperties());
                 split.setFormatVersion(formatVersion);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 9064017088..de3f2ec6aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -23,16 +23,20 @@ import lombok.Data;
 import org.apache.hadoop.fs.Path;
 
 import java.util.List;
+import java.util.Map;
 
 @Data
 public class IcebergSplit extends FileSplit {
     // File path will be changed if the file is modified, so there's no need to get modification time.
-    public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
+    public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
+                        Map<String, String> config) {
         super(file, start, length, fileLength, hosts, null);
+        this.config = config;
     }
 
     private Integer formatVersion;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
+    private Map<String, String> config;
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
