This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 01bef4b [Load] Add "LOAD WITH HDFS" model, and make hdfs_reader
support hdfs ha (#6161)
01bef4b is described below
commit 01bef4b40d2df3b4def828423ab0a0d9fdd29d86
Author: pengxiangyu <[email protected]>
AuthorDate: Sat Jul 10 10:11:52 2021 +0800
[Load] Add "LOAD WITH HDFS" model, and make hdfs_reader support hdfs ha
(#6161)
Support load data from HDFS by using `LOAD WITH HDFS` syntax and read data
directly via libhdfs3
---
be/src/exec/hdfs_file_reader.cpp | 4 +-
.../Data Manipulation/BROKER LOAD.md | 48 +++++++++++++++-
.../Data Manipulation/BROKER LOAD.md | 46 +++++++++++++++
.../org/apache/doris/common/util/BrokerUtil.java | 66 ++++++++++++++++++++++
.../org/apache/doris/planner/BrokerScanNode.java | 6 ++
gensrc/thrift/PlanNodes.thrift | 13 ++---
6 files changed, 172 insertions(+), 11 deletions(-)
diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp
index 95c819a..6c05840 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/exec/hdfs_file_reader.cpp
@@ -30,9 +30,7 @@ HdfsFileReader::HdfsFileReader(THdfsParams hdfs_params, const
std::string& path,
_file_size(-1),
_hdfs_fs(nullptr),
_hdfs_file(nullptr) {
- std::stringstream namenode_ss;
- namenode_ss << "hdfs://" << _hdfs_params.host << ":" << _hdfs_params.port;
- _namenode = namenode_ss.str();
+ _namenode = _hdfs_params.fs_name;
}
HdfsFileReader::~HdfsFileReader() {
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 90a7fde..092a88a 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -218,6 +218,22 @@ under the License.
"AWS_SECRET_KEY"="",
"AWS_REGION" = ""
)
+     6. If loading directly via HDFS (LOAD WITH HDFS), you need to specify the following attributes
+ (
+ "fs.defaultFS" = "",
+ "hdfs_user"="",
+ "kerb_principal" = "",
+ "kerb_ticket_cache_path" = "",
+ "kerb_token" = ""
+ )
+ fs.defaultFS: defaultFS
+ hdfs_user: hdfs user
+ namenode HA:
+         By configuring namenode HA, the new active namenode can be identified automatically when a namenode failover occurs
+ dfs.nameservices: hdfs service name, customize, eg:
"dfs.nameservices" = "my_ha"
+         dfs.ha.namenodes.xxx: Customize the names of the namenodes, separated by commas, where xxx is the custom name from dfs.nameservices, such as "dfs.ha.namenodes.my_ha" = "my_nn"
+         dfs.namenode.rpc-address.xxx.nn: Specify the RPC address of a namenode, where nn denotes a namenode name configured in dfs.ha.namenodes.xxx, such as: "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
+         dfs.client.failover.proxy.provider: Specify the provider for the client to connect to the namenode; the default is org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
4. opt_properties
@@ -529,7 +545,37 @@ under the License.
properties("fuzzy_parse"="true", "strip_outer_array"="true")
)
WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
+
+ 16. LOAD WITH HDFS, normal HDFS cluster
+ LOAD LABEL example_db.label_filter
+ (
+ DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+ INTO TABLE `tbl1`
+ COLUMNS TERMINATED BY ","
+ (k1,k2,v1,v2)
+ )
+ with HDFS (
+ "fs.defaultFS"="hdfs://testFs",
+ "hdfs_user"="user"
+ );
+ 17. LOAD WITH HDFS, hdfs ha
+ LOAD LABEL example_db.label_filter
+ (
+ DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+ INTO TABLE `tbl1`
+ COLUMNS TERMINATED BY ","
+ (k1,k2,v1,v2)
+ )
+ with HDFS (
+ "fs.defaultFS"="hdfs://testFs",
+ "hdfs_user"="user"
+ "dfs.nameservices"="my_ha",
+ "dfs.ha.namenodes.xxx"="my_nn1,my_nn2",
+ "dfs.namenode.rpc-address.xxx.my_nn1"="host1:port",
+ "dfs.namenode.rpc-address.xxx.my_nn2"="host2:port",
+
"dfs.client.failover.proxy.provider.xxx"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ );
+
## keyword
BROKER,LOAD
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER
LOAD.md
index 18240f6..b9edbfb 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
@@ -217,6 +217,22 @@ under the License.
"AWS_SECRET_KEY"="",
"AWS_REGION" = ""
)
+ 6. 如果使用HDFS协议直接连接远程存储时需要指定如下属性
+ (
+ "fs.defaultFS" = "",
+ "hdfs_user"="",
+ "kerb_principal" = "",
+ "kerb_ticket_cache_path" = "",
+ "kerb_token" = ""
+ )
+ fs.defaultFS: hdfs集群defaultFS
+ hdfs_user: 连接hdfs集群时使用的用户名
+ namenode HA:
+ 通过配置 namenode HA,可以在 namenode 切换时,自动识别到新的 namenode
+ dfs.nameservices: 指定 hdfs 服务的名字,自定义,如:"dfs.nameservices" = "my_ha"
+ dfs.ha.namenodes.xxx:自定义 namenode 的名字,多个名字以逗号分隔。其中 xxx 为
dfs.nameservices 中自定义的名字,如 "dfs.ha.namenodes.my_ha" = "my_nn"
+ dfs.namenode.rpc-address.xxx.nn:指定 namenode 的rpc地址信息。其中 nn 表示
dfs.ha.namenodes.xxx 中配置的 namenode 的名字,如:"dfs.namenode.rpc-address.my_ha.my_nn"
= "host:port"
+ dfs.client.failover.proxy.provider:指定 client 连接 namenode 的
provider,默认为:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
4. opt_properties
@@ -547,6 +563,36 @@ under the License.
)
WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+ 16. LOAD WITH HDFS, 普通HDFS集群
+ LOAD LABEL example_db.label_filter
+ (
+ DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+ INTO TABLE `tbl1`
+ COLUMNS TERMINATED BY ","
+ (k1,k2,v1,v2)
+ )
+ with HDFS (
+ "fs.defaultFS"="hdfs://testFs",
+ "hdfs_user"="user"
+ );
+ 17. LOAD WITH HDFS, 带ha的HDFS集群
+ LOAD LABEL example_db.label_filter
+ (
+ DATA INFILE("hdfs://host:port/user/data/*/test.txt")
+ INTO TABLE `tbl1`
+ COLUMNS TERMINATED BY ","
+ (k1,k2,v1,v2)
+ )
+ with HDFS (
+ "fs.defaultFS"="hdfs://testFs",
+ "hdfs_user"="user"
+ "dfs.nameservices"="my_ha",
+ "dfs.ha.namenodes.xxx"="my_nn1,my_nn2",
+ "dfs.namenode.rpc-address.xxx.my_nn1"="host1:port",
+ "dfs.namenode.rpc-address.xxx.my_nn2"="host2:port",
+
"dfs.client.failover.proxy.provider.xxx"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+ );
+
## keyword
BROKER,LOAD
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index c936e90..cc73098 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -48,31 +48,69 @@ import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPReadRequest;
import org.apache.doris.thrift.TBrokerPWriteRequest;
+import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TBrokerReadResponse;
import org.apache.doris.thrift.TBrokerRenamePathRequest;
import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.THdfsConf;
+import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.FileInputStream;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
public class BrokerUtil {
private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
private static int READ_BUFFER_SIZE_B = 1024 * 1024;
+ private static String HDFS_FS_KEY = "fs.defaultFS";
+ private static String HDFS_USER_KEY = "hdfs_user";
+ private static String HDFS_KERB_PRINCIPAL = "kerb_principal";
+ private static String HDFS_KERB_TICKET_CACHE_PATH =
"kerb_ticket_cache_path";
+ private static String HDFS_KERB_TOKEN = "kerb_token";
+
+ public static void generateHdfsParam(Map<String, String> properties,
TBrokerRangeDesc rangeDesc) {
+ rangeDesc.setHdfsParams(new THdfsParams());
+ rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>());
+ for (Map.Entry<String, String> property : properties.entrySet()) {
+ if (property.getKey().equalsIgnoreCase(HDFS_FS_KEY)) {
+ rangeDesc.hdfs_params.setFsName(property.getValue());
+ } else if (property.getKey().equalsIgnoreCase(HDFS_USER_KEY)) {
+ rangeDesc.hdfs_params.setUser(property.getValue());
+ } else if
(property.getKey().equalsIgnoreCase(HDFS_KERB_PRINCIPAL)) {
+ rangeDesc.hdfs_params.setKerbPrincipal(property.getValue());
+ } else if
(property.getKey().equalsIgnoreCase(HDFS_KERB_TICKET_CACHE_PATH)) {
+
rangeDesc.hdfs_params.setKerbTicketCachePath(property.getValue());
+ } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TOKEN)) {
+ rangeDesc.hdfs_params.setToken(property.getValue());
+ } else {
+ THdfsConf hdfsConf = new THdfsConf();
+ hdfsConf.setKey(property.getKey());
+ hdfsConf.setValue(property.getValue());
+ rangeDesc.hdfs_params.hdfs_conf.add(hdfsConf);
+ }
+ }
+ }
/**
* Parse file status in path with broker, except directory
@@ -127,6 +165,34 @@ public class BrokerUtil {
fileStatuses.add(new TBrokerFileStatus(r.getName(),
!r.isFile(), r.getSize(), r.isFile()));
}
}
+ } else if (brokerDesc.getStorageType() ==
StorageBackend.StorageType.HDFS) {
+ if (!brokerDesc.getProperties().containsKey(HDFS_FS_KEY)
+ || !brokerDesc.getProperties().containsKey(HDFS_USER_KEY))
{
+ throw new UserException(String.format(
+ "The properties of hdfs is invalid. %s and %s are
needed", HDFS_FS_KEY, HDFS_USER_KEY));
+ }
+ String hdfsFsName = brokerDesc.getProperties().get(HDFS_FS_KEY);
+ String user = brokerDesc.getProperties().get(HDFS_USER_KEY);
+ Configuration conf = new Configuration();
+ for (Map.Entry<String, String> propEntry :
brokerDesc.getProperties().entrySet()) {
+ if (propEntry.getKey().equals(HDFS_FS_KEY) ||
propEntry.getKey().equals(HDFS_USER_KEY)) {
+ continue;
+ }
+ conf.set(propEntry.getKey(), propEntry.getValue());
+ }
+ try {
+ FileSystem fs = FileSystem.get(new URI(hdfsFsName), conf,
user);
+ FileStatus[] statusList = fs.listStatus(new Path(path));
+ for (FileStatus status : statusList) {
+ if (status.isFile()) {
+ fileStatuses.add(new
TBrokerFileStatus(status.getPath().toUri().getPath(),
+ status.isDirectory(), status.getLen(),
status.isFile()));
+ }
+ }
+ } catch (IOException | InterruptedException | URISyntaxException
e) {
+ LOG.warn("hdfs check error: ", e);
+ throw new UserException(e.getMessage());
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index e3df23e..269f570 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -502,6 +502,12 @@ public class BrokerScanNode extends LoadScanNode {
rangeDesc.setFileSize(fileStatus.size);
rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile);
rangeDesc.setColumnsFromPath(columnsFromPath);
+ // set hdfs params for hdfs file type.
+ switch (brokerDesc.getFileType()) {
+ case FILE_HDFS:
+ BrokerUtil.generateHdfsParam(brokerDesc.getProperties(),
rangeDesc);
+ break;
+ }
return rangeDesc;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 320b9e0..d3b2cd0 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -114,13 +114,12 @@ struct THdfsConf {
}
struct THdfsParams {
- 1: optional string host
- 2: optional i32 port
- 3: optional string user
- 4: optional string kerb_principal
- 5: optional string kerb_ticket_cache_path
- 6: optional string token
- 7: optional list<THdfsConf> hdfs_conf
+ 1: optional string fs_name
+ 2: optional string user
+ 3: optional string kerb_principal
+ 4: optional string kerb_ticket_cache_path
+ 5: optional string token
+ 6: optional list<THdfsConf> hdfs_conf
}
// One broker range information.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]