This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 01bef4b  [Load] Add "LOAD WITH HDFS" mode, and make hdfs_reader support hdfs ha (#6161)
01bef4b is described below

commit 01bef4b40d2df3b4def828423ab0a0d9fdd29d86
Author: pengxiangyu <[email protected]>
AuthorDate: Sat Jul 10 10:11:52 2021 +0800

    [Load] Add "LOAD WITH HDFS" model, and make hdfs_reader support hdfs ha 
(#6161)
    
    Support loading data from HDFS via the `LOAD WITH HDFS` syntax, reading the data directly through libhdfs3
---
 be/src/exec/hdfs_file_reader.cpp                   |  4 +-
 .../Data Manipulation/BROKER LOAD.md               | 48 +++++++++++++++-
 .../Data Manipulation/BROKER LOAD.md               | 46 +++++++++++++++
 .../org/apache/doris/common/util/BrokerUtil.java   | 66 ++++++++++++++++++++++
 .../org/apache/doris/planner/BrokerScanNode.java   |  6 ++
 gensrc/thrift/PlanNodes.thrift                     | 13 ++---
 6 files changed, 172 insertions(+), 11 deletions(-)

diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp
index 95c819a..6c05840 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/exec/hdfs_file_reader.cpp
@@ -30,9 +30,7 @@ HdfsFileReader::HdfsFileReader(THdfsParams hdfs_params, const std::string& path,
           _file_size(-1),
           _hdfs_fs(nullptr),
           _hdfs_file(nullptr) {
-    std::stringstream namenode_ss;
-    namenode_ss << "hdfs://" << _hdfs_params.host << ":" << _hdfs_params.port;
-    _namenode = namenode_ss.str();
+    _namenode = _hdfs_params.fs_name;
 }
 
 HdfsFileReader::~HdfsFileReader() {
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 90a7fde..092a88a 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md     
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md     
@@ -218,6 +218,22 @@ under the License.
                 "AWS_SECRET_KEY"="",
                 "AWS_REGION" = ""
             )
+        6. If using LOAD WITH HDFS, you need to specify the following attributes
+            (
+                "fs.defaultFS" = "",
+                "hdfs_user"="",
+                "kerb_principal" = "",
+                "kerb_ticket_cache_path" = "",
+                "kerb_token" = ""
+            )
+            fs.defaultFS: the defaultFS of the hdfs cluster
+            hdfs_user: the username used to connect to the hdfs cluster
+            namenode HA:
+            By configuring namenode HA, the client can automatically identify the new namenode when a namenode switch occurs
+            dfs.nameservices: the name of the hdfs service, user-defined, e.g.: "dfs.nameservices" = "my_ha"
+            dfs.ha.namenodes.xxx: user-defined namenode names, multiple names separated by commas, where xxx is the user-defined name from dfs.nameservices, e.g.: "dfs.ha.namenodes.my_ha" = "my_nn"
+            dfs.namenode.rpc-address.xxx.nn: the RPC address of a namenode, where nn is one of the namenode names configured in dfs.ha.namenodes.xxx, e.g.: "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
+            dfs.client.failover.proxy.provider: the provider the client uses to connect to the namenode, default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
 
     4. opt_properties
 
@@ -529,7 +545,37 @@ under the License.
         properties("fuzzy_parse"="true", "strip_outer_array"="true")
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); 
  
-     
+
+    16. LOAD WITH HDFS, normal HDFS cluster
+        LOAD LABEL example_db.label_filter
+        (
+            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+            INTO TABLE `tbl1`
+            COLUMNS TERMINATED BY ","
+            (k1,k2,v1,v2)
+        ) 
+        with HDFS (
+            "fs.defaultFS"="hdfs://testFs",
+            "hdfs_user"="user"
+        );
+    17. LOAD WITH HDFS, HDFS cluster with HA
+        LOAD LABEL example_db.label_filter
+        (
+            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+            INTO TABLE `tbl1`
+            COLUMNS TERMINATED BY ","
+            (k1,k2,v1,v2)
+        ) 
+        with HDFS (
+            "fs.defaultFS"="hdfs://testFs",
+            "hdfs_user"="user"
+            "dfs.nameservices"="my_ha",
+            "dfs.ha.namenodes.xxx"="my_nn1,my_nn2",
+            "dfs.namenode.rpc-address.xxx.my_nn1"="host1:port",
+            "dfs.namenode.rpc-address.xxx.my_nn2"="host2:port",
+            
"dfs.client.failover.proxy.provider.xxx"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+        );
+
 ## keyword
 
     BROKER,LOAD
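
The HA properties documented above map one-to-one onto Hadoop client configuration. A minimal Java sketch of that mapping (not part of this commit; host names, ports, and the user are placeholders), using the same client calls the new HDFS branch of BrokerUtil.parseFile makes:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    public class HdfsHaListExample {
        public static void main(String[] args) throws Exception {
            // The HA properties from example 17 above; hosts/ports are placeholders.
            Configuration conf = new Configuration();
            conf.set("dfs.nameservices", "my_ha");
            conf.set("dfs.ha.namenodes.my_ha", "my_nn1,my_nn2");
            conf.set("dfs.namenode.rpc-address.my_ha.my_nn1", "host1:8020");
            conf.set("dfs.namenode.rpc-address.my_ha.my_nn2", "host2:8020");
            conf.set("dfs.client.failover.proxy.provider.my_ha",
                    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

            // fs.defaultFS names the logical nameservice rather than a single
            // namenode; the failover proxy provider finds the active one.
            FileSystem fs = FileSystem.get(new URI("hdfs://my_ha"), conf, "hdfs_user");
            for (FileStatus status : fs.listStatus(new Path("/user/data"))) {
                System.out.println(status.getPath() + " " + status.getLen());
            }
            fs.close();
        }
    }
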
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 18240f6..b9edbfb 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md  
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md  
@@ -217,6 +217,22 @@ under the License.
                 "AWS_SECRET_KEY"="",
                 "AWS_REGION" = ""
             )
+        6. If connecting to remote storage directly via the HDFS protocol, you need to specify the following attributes
+            (
+                "fs.defaultFS" = "",
+                "hdfs_user"="",
+                "kerb_principal" = "",
+                "kerb_ticket_cache_path" = "",
+                "kerb_token" = ""
+            )
+            fs.defaultFS: the defaultFS of the hdfs cluster
+            hdfs_user: the username used to connect to the hdfs cluster
+            namenode HA:
+            By configuring namenode HA, the client can automatically identify the new namenode when a namenode switch occurs
+            dfs.nameservices: the name of the hdfs service, user-defined, e.g.: "dfs.nameservices" = "my_ha"
+            dfs.ha.namenodes.xxx: user-defined namenode names, multiple names separated by commas, where xxx is the user-defined name from dfs.nameservices, e.g.: "dfs.ha.namenodes.my_ha" = "my_nn"
+            dfs.namenode.rpc-address.xxx.nn: the RPC address of a namenode, where nn is one of the namenode names configured in dfs.ha.namenodes.xxx, e.g.: "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
+            dfs.client.failover.proxy.provider: the provider the client uses to connect to the namenode, default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
         
     4. opt_properties
 
@@ -547,6 +563,36 @@ under the License.
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
 
+    16. LOAD WITH HDFS, normal HDFS cluster
+        LOAD LABEL example_db.label_filter
+        (
+            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+            INTO TABLE `tbl1`
+            COLUMNS TERMINATED BY ","
+            (k1,k2,v1,v2)
+        ) 
+        with HDFS (
+            "fs.defaultFS"="hdfs://testFs",
+            "hdfs_user"="user"
+        );
+    17. LOAD WITH HDFS, HDFS cluster with HA
+        LOAD LABEL example_db.label_filter
+        (
+            DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+            INTO TABLE `tbl1`
+            COLUMNS TERMINATED BY ","
+            (k1,k2,v1,v2)
+        ) 
+        with HDFS (
+            "fs.defaultFS"="hdfs://testFs",
+            "hdfs_user"="user"
+            "dfs.nameservices"="my_ha",
+            "dfs.ha.namenodes.xxx"="my_nn1,my_nn2",
+            "dfs.namenode.rpc-address.xxx.my_nn1"="host1:port",
+            "dfs.namenode.rpc-address.xxx.my_nn2"="host2:port",
+            
"dfs.client.failover.proxy.provider.xxx"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+        );
+
 ## keyword
 
     BROKER,LOAD
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index c936e90..cc73098 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -48,31 +48,69 @@ import org.apache.doris.thrift.TBrokerOperationStatus;
 import org.apache.doris.thrift.TBrokerOperationStatusCode;
 import org.apache.doris.thrift.TBrokerPReadRequest;
 import org.apache.doris.thrift.TBrokerPWriteRequest;
+import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TBrokerReadResponse;
 import org.apache.doris.thrift.TBrokerRenamePathRequest;
 import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.THdfsConf;
+import org.apache.doris.thrift.THdfsParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloBrokerService;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 public class BrokerUtil {
     private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
 
     private static int READ_BUFFER_SIZE_B = 1024 * 1024;
+    private static String HDFS_FS_KEY = "fs.defaultFS";
+    private static String HDFS_USER_KEY = "hdfs_user";
+    private static String HDFS_KERB_PRINCIPAL = "kerb_principal";
+    private static String HDFS_KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
+    private static String HDFS_KERB_TOKEN = "kerb_token";
+
+    public static void generateHdfsParam(Map<String, String> properties, TBrokerRangeDesc rangeDesc) {
+        rangeDesc.setHdfsParams(new THdfsParams());
+        rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>());
+        for (Map.Entry<String, String> property : properties.entrySet()) {
+            if (property.getKey().equalsIgnoreCase(HDFS_FS_KEY)) {
+                rangeDesc.hdfs_params.setFsName(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HDFS_USER_KEY)) {
+                rangeDesc.hdfs_params.setUser(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_PRINCIPAL)) {
+                rangeDesc.hdfs_params.setKerbPrincipal(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TICKET_CACHE_PATH)) {
+                rangeDesc.hdfs_params.setKerbTicketCachePath(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TOKEN)) {
+                rangeDesc.hdfs_params.setToken(property.getValue());
+            } else {
+                THdfsConf hdfsConf = new THdfsConf();
+                hdfsConf.setKey(property.getKey());
+                hdfsConf.setValue(property.getValue());
+                rangeDesc.hdfs_params.hdfs_conf.add(hdfsConf);
+            }
+        }
+    }
 
     /**
      * Parse file status in path with broker, except directory
@@ -127,6 +165,34 @@ public class BrokerUtil {
                    fileStatuses.add(new TBrokerFileStatus(r.getName(), !r.isFile(), r.getSize(), r.isFile()));
                 }
             }
+        } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
+            if (!brokerDesc.getProperties().containsKey(HDFS_FS_KEY)
+                    || !brokerDesc.getProperties().containsKey(HDFS_USER_KEY)) {
+                throw new UserException(String.format(
+                        "The properties of hdfs are invalid. %s and %s are needed", HDFS_FS_KEY, HDFS_USER_KEY));
+            }
+            String hdfsFsName = brokerDesc.getProperties().get(HDFS_FS_KEY);
+            String user = brokerDesc.getProperties().get(HDFS_USER_KEY);
+            Configuration conf = new Configuration();
+            for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
+                if (propEntry.getKey().equals(HDFS_FS_KEY) || propEntry.getKey().equals(HDFS_USER_KEY)) {
+                    continue;
+                }
+                conf.set(propEntry.getKey(), propEntry.getValue());
+            }
+            try {
+                FileSystem fs = FileSystem.get(new URI(hdfsFsName), conf, user);
+                FileStatus[] statusList = fs.listStatus(new Path(path));
+                for (FileStatus status : statusList) {
+                    if (status.isFile()) {
+                        fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
+                                status.isDirectory(), status.getLen(), status.isFile()));
+                    }
+                }
+            } catch (IOException | InterruptedException | URISyntaxException e) {
+                LOG.warn("hdfs check error: ", e);
+                throw new UserException(e.getMessage());
+            }
         }
     }
 
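
A usage sketch for the new generateHdfsParam helper (a hypothetical caller, mirroring what BrokerScanNode does below; the thrift setters are the ones visible in this diff):

    import org.apache.doris.common.util.BrokerUtil;
    import org.apache.doris.thrift.TBrokerRangeDesc;

    import java.util.HashMap;
    import java.util.Map;

    public class GenerateHdfsParamExample {
        public static void main(String[] args) {
            Map<String, String> properties = new HashMap<>();
            properties.put("fs.defaultFS", "hdfs://my_ha"); // -> hdfs_params.fs_name
            properties.put("hdfs_user", "user");            // -> hdfs_params.user
            properties.put("dfs.nameservices", "my_ha");    // unrecognized -> hdfs_params.hdfs_conf

            TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
            BrokerUtil.generateHdfsParam(properties, rangeDesc);
            // rangeDesc.hdfs_params now carries fs_name and user plus one generic
            // THdfsConf entry; the BE's HdfsFileReader uses fs_name as the namenode URI.
        }
    }
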
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index e3df23e..269f570 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -502,6 +502,12 @@ public class BrokerScanNode extends LoadScanNode {
         rangeDesc.setFileSize(fileStatus.size);
         rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
         rangeDesc.setColumnsFromPath(columnsFromPath);
+        // set hdfs params for hdfs file type.
+        switch (brokerDesc.getFileType()) {
+            case FILE_HDFS:
+                BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc);
+                break;
+        }
         return rangeDesc;
     }
 
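
The switch above currently has a single case; an equivalent guard, shown only as a sketch of the dispatch (TFileType is assumed to be the enum behind brokerDesc.getFileType()):

    // Equivalent to the switch in createBrokerRangeDesc above: populate
    // THdfsParams only when the load reads directly from HDFS.
    if (brokerDesc.getFileType() == TFileType.FILE_HDFS) {
        BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc);
    }
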
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 320b9e0..d3b2cd0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -114,13 +114,12 @@ struct THdfsConf {
 }
 
 struct THdfsParams {
-    1: optional string host
-    2: optional i32 port
-    3: optional string user
-    4: optional string kerb_principal
-    5: optional string kerb_ticket_cache_path
-    6: optional string token
-    7: optional list<THdfsConf> hdfs_conf
+    1: optional string fs_name
+    2: optional string user
+    3: optional string kerb_principal
+    4: optional string kerb_ticket_cache_path
+    5: optional string token
+    6: optional list<THdfsConf> hdfs_conf
 }
 
 // One broker range information.

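The thrift change above is the crux of the HA support: THdfsParams no longer carries host/port for the FE to pin a single namenode; it carries the full fs_name, which passes through untouched to the BE (which, per the first hunk, now uses it verbatim). A hedged Java illustration using the generated thrift API implied by this diff:

    import org.apache.doris.thrift.THdfsConf;
    import org.apache.doris.thrift.THdfsParams;

    import java.util.Collections;

    public class HdfsParamsExample {
        public static void main(String[] args) {
            // Previously the BE concatenated "hdfs://" + host + ":" + port;
            // now fs_name is taken as-is, so a logical HA nameservice works too.
            THdfsParams params = new THdfsParams();
            params.setFsName("hdfs://my_ha");
            params.setUser("hdfs_user");

            THdfsConf nameservices = new THdfsConf();
            nameservices.setKey("dfs.nameservices");
            nameservices.setValue("my_ha");
            params.setHdfsConf(Collections.singletonList(nameservices));
        }
    }
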
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
