This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c23a577f34 [Hotfix][Hive Connector] Fix Hive hdfs-site.xml and
hive-site.xml not be load error (#7069)
c23a577f34 is described below
commit c23a577f3442a65fc8e396a6feaaf1ce4e261ca9
Author: Eric <[email protected]>
AuthorDate: Mon Jul 1 14:33:55 2024 +0800
[Hotfix][Hive Connector] Fix Hive hdfs-site.xml and hive-site.xml not be
load error (#7069)
---
docs/en/connector-v2/sink/Hive.md | 2 +
docs/en/connector-v2/source/Hive.md | 164 +++++++++++++++++++--
.../seatunnel/file/config/HadoopConf.java | 3 +-
.../hive/exception/HiveConnectorErrorCode.java | 1 +
.../seatunnel/hive/storage/AbstractStorage.java | 95 ++++++------
.../seatunnel/hive/utils/HiveMetaStoreProxy.java | 58 ++++++--
6 files changed, 245 insertions(+), 78 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index dac4a814c2..023bb38ddb 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -176,6 +176,7 @@ sink {
metastore_uri = "thrift://ctyun7:9083"
hive.hadoop.conf = {
bucket = "s3a://mybucket"
+
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
}
}
```
@@ -258,6 +259,7 @@ sink {
hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
hive.hadoop.conf = {
bucket="s3://ws-package"
+
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
}
}
}
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index bb7b851409..da70cf7aa3 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -33,19 +33,21 @@ Read all the data in a split in a pollNext call. What
splits are read will be sa
## Options
-| name | type | required | default value |
-|----------------------|--------|----------|----------------|
-| table_name | string | yes | - |
-| metastore_uri | string | yes | - |
-| krb5_path | string | no | /etc/krb5.conf |
-| kerberos_principal | string | no | - |
-| kerberos_keytab_path | string | no | - |
-| hdfs_site_path | string | no | - |
-| hive_site_path | string | no | - |
-| read_partitions | list | no | - |
-| read_columns | list | no | - |
-| compress_codec | string | no | none |
-| common-options | | no | - |
+| name | type | required | default value |
+|-----------------------|--------|----------|----------------|
+| table_name | string | yes | - |
+| metastore_uri | string | yes | - |
+| krb5_path | string | no | /etc/krb5.conf |
+| kerberos_principal | string | no | - |
+| kerberos_keytab_path | string | no | - |
+| hdfs_site_path | string | no | - |
+| hive_site_path | string | no | - |
+| hive.hadoop.conf | Map | no | - |
+| hive.hadoop.conf-path | string | no | - |
+| read_partitions | list | no | - |
+| read_columns | list | no | - |
+| compress_codec | string | no | none |
+| common-options | | no | - |
### table_name [string]
@@ -59,6 +61,14 @@ Hive metastore uri
The path of `hdfs-site.xml`, used to load ha configuration of namenodes
+### hive.hadoop.conf [map]
+
+Properties in hadoop conf ('core-site.xml', 'hdfs-site.xml', 'hive-site.xml')
+
+### hive.hadoop.conf-path [string]
+
+The specified loading path for the 'core-site.xml', 'hdfs-site.xml',
'hive-site.xml' files
+
### read_partitions [list]
The target partitions that user want to read from hive table, if user does not
set this parameter, it will read all the data from hive table.
@@ -128,6 +138,134 @@ Source plugin common parameters, please refer to [Source
Common Options](common-
```
+## Hive on s3
+
+### Step 1
+
+Create the lib dir for the Hive connector on EMR.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Download the jars from Maven Central into the lib dir.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
+wget
https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your environment on emr to the lib dir.
+
+```shell
+cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 4
+
+Run the case.
+
+```shell
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_s3"
+ metastore_uri =
"thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+ hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+ hive.hadoop.conf = {
+ bucket="s3://ws-package"
+
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
+ }
+ read_columns = ["pk_id", "name", "score"]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_s3_sink"
+ metastore_uri =
"thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+ hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+ hive.hadoop.conf = {
+ bucket="s3://ws-package"
+
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
+ }
+ }
+}
+```
+
+## Hive on oss
+
+### Step 1
+
+Create the lib dir for hive of emr.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Download the jars from Maven Central into the lib dir.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget
https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your environment on emr to the lib dir and delete the
conflicting jar.
+
+```shell
+cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
+```
+
+### Step 4
+
+Run the case.
+
+```shell
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_oss"
+ metastore_uri =
"thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ hive.hadoop.conf = {
+ bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+ }
+ }
+}
+
+sink {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_oss_sink"
+ metastore_uri =
"thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ hive.hadoop.conf = {
+ bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+ }
+ }
+}
+```
+
## Changelog
### 2.2.0-beta 2022-09-26
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index 3a88fa3b3d..c7a2182753 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
@@ -62,7 +63,7 @@ public class HadoopConf implements Serializable {
removeUnwantedOverwritingProps(extraOptions);
extraOptions.forEach(configuration::set);
}
- if (hdfsSitePath != null) {
+ if (StringUtils.isNotBlank(hdfsSitePath)) {
Configuration hdfsSiteConfiguration = new Configuration();
hdfsSiteConfiguration.addResource(new Path(hdfsSitePath));
unsetUnwantedOverwritingProps(hdfsSiteConfiguration);
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
index a0923acc1b..bf3268bdbe 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
@@ -25,6 +25,7 @@ public enum HiveConnectorErrorCode implements
SeaTunnelErrorCode {
GET_HIVE_TABLE_INFORMATION_FAILED(
"HIVE-03", "Get hive table information from hive metastore service
failed"),
HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"),
+ LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED("HIVE-05", "Load hive base hadoop
config failed"),
;
private final String code;
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
index 22f9e61880..7f453c936b 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
@@ -23,8 +23,11 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -63,57 +66,49 @@ public abstract class AbstractStorage implements Storage {
* @return
*/
protected Configuration loadHiveBaseHadoopConfig(ReadonlyConfig
readonlyConfig) {
- Configuration configuration = new Configuration();
- // Try to load from hadoop_conf_path(The Bucket configuration is
typically in core-site.xml)
- Optional<String> hadoopConfPath =
readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH);
- if (hadoopConfPath.isPresent()) {
- HADOOP_CONF_FILES.forEach(
- confFile -> {
- java.nio.file.Path path =
Paths.get(hadoopConfPath.get(), confFile);
- if (Files.exists(path)) {
- try {
-
configuration.addResource(path.toUri().toURL());
- } catch (IOException e) {
- log.warn(
- "Error adding Hadoop resource {},
resource was not added",
- path,
- e);
- }
- }
- });
- }
- readonlyConfig
- .getOptional(BaseSinkConfig.HDFS_SITE_PATH)
- .ifPresent(
- hdfsSitePath -> {
- try {
- configuration.addResource(new
File(hdfsSitePath).toURI().toURL());
- } catch (IOException e) {
- log.warn(
- "Error adding Hadoop resource {},
resource was not added",
- hdfsSitePath,
- e);
+ try {
+ Configuration configuration = new Configuration();
+ // Try to load from hadoop_conf_path(The Bucket configuration is
typically in
+ // core-site.xml)
+ Optional<String> hadoopConfPath =
+ readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH);
+ if (hadoopConfPath.isPresent()) {
+ HADOOP_CONF_FILES.forEach(
+ confFile -> {
+ java.nio.file.Path path =
Paths.get(hadoopConfPath.get(), confFile);
+ if (Files.exists(path)) {
+ try {
+
configuration.addResource(path.toUri().toURL());
+ } catch (IOException e) {
+ log.warn(
+ "Error adding Hadoop resource {},
resource was not added",
+ path,
+ e);
+ }
}
});
- readonlyConfig
- .getOptional(HiveConfig.HIVE_SITE_PATH)
- .ifPresent(
- hiveSitePath -> {
- try {
- configuration.addResource(new
File(hiveSitePath).toURI().toURL());
- } catch (IOException e) {
- log.warn(
- "Error adding Hadoop resource {},
resource was not added",
- hiveSitePath,
- e);
- }
- });
- // Try to load from hadoopConf
- Optional<Map<String, String>> hadoopConf =
- readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
- if (hadoopConf.isPresent()) {
- hadoopConf.get().forEach((k, v) -> configuration.set(k, v));
+ }
+ String hiveSitePath =
readonlyConfig.get(HiveConfig.HIVE_SITE_PATH);
+ String hdfsSitePath =
readonlyConfig.get(HdfsSourceConfigOptions.HDFS_SITE_PATH);
+ if (StringUtils.isNotBlank(hdfsSitePath)) {
+ configuration.addResource(new
File(hdfsSitePath).toURI().toURL());
+ }
+
+ if (StringUtils.isNotBlank(hiveSitePath)) {
+ configuration.addResource(new
File(hiveSitePath).toURI().toURL());
+ }
+ // Try to load from hadoopConf
+ Optional<Map<String, String>> hadoopConf =
+ readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
+ if (hadoopConf.isPresent()) {
+ hadoopConf.get().forEach((k, v) -> configuration.set(k, v));
+ }
+ return configuration;
+ } catch (Exception e) {
+ String errorMsg = String.format("Failed to load hadoop
configuration, please check it");
+ log.error(errorMsg + ":" + ExceptionUtils.getMessage(e));
+ throw new HiveConnectorException(
+
HiveConnectorErrorCode.LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED, e);
}
- return configuration;
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index d4d8ca3b7f..62d917ca0d 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
+import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
@@ -37,7 +40,10 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
+import java.io.IOException;
import java.net.MalformedURLException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
@@ -45,36 +51,60 @@ import java.util.Objects;
public class HiveMetaStoreProxy {
private HiveMetaStoreClient hiveMetaStoreClient;
private static volatile HiveMetaStoreProxy INSTANCE = null;
+ private static final List<String> HADOOP_CONF_FILES =
ImmutableList.of("hive-site.xml");
private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
String metastoreUri =
readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
+ String hiveHadoopConfigPath =
readonlyConfig.get(HiveConfig.HADOOP_CONF_PATH);
+ String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH);
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", metastoreUri);
try {
- if
(StringUtils.isNotEmpty(readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH))) {
- String hiveSitePath =
readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH);
+ if (StringUtils.isNotBlank(hiveHadoopConfigPath)) {
+ HADOOP_CONF_FILES.forEach(
+ confFile -> {
+ java.nio.file.Path path =
Paths.get(hiveHadoopConfigPath, confFile);
+ if (Files.exists(path)) {
+ try {
+ hiveConf.addResource(path.toUri().toURL());
+ } catch (IOException e) {
+ log.warn(
+ "Error adding Hadoop resource {},
resource was not added",
+ path,
+ e);
+ }
+ }
+ });
+ }
+
+ if (StringUtils.isNotBlank(hiveSitePath)) {
hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
}
+
+ log.info("hive client conf:{}", hiveConf);
if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) {
- Configuration hadoopConfig = new Configuration();
- hadoopConfig.set("hadoop.security.authentication", "kerberos");
+ // login Kerberos
+ Configuration authConf = new Configuration();
+ authConf.set("hadoop.security.authentication", "kerberos");
this.hiveMetaStoreClient =
HadoopLoginFactory.loginWithKerberos(
- hadoopConfig,
-
readonlyConfig.get(BaseSourceConfigOptions.KRB5_PATH),
-
readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_PRINCIPAL),
-
readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH),
- (configuration, userGroupInformation) ->
- new HiveMetaStoreClient(hiveConf));
+ authConf,
+
readonlyConfig.get(HdfsSourceConfigOptions.KRB5_PATH),
+
readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL),
+
readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH),
+ (conf, userGroupInformation) -> {
+ return new HiveMetaStoreClient(hiveConf);
+ });
return;
}
if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) {
this.hiveMetaStoreClient =
HadoopLoginFactory.loginWithRemoteUser(
new Configuration(),
-
readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER),
- (configuration, userGroupInformation) ->
- new HiveMetaStoreClient(hiveConf));
+
readonlyConfig.get(HdfsSourceConfigOptions.REMOTE_USER),
+ (conf, userGroupInformation) -> {
+ return new HiveMetaStoreClient(hiveConf);
+ });
return;
}
this.hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);