This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 79bb70101a [bugfix][S3 File]: Change the [SCHEMA] attribute of the
[S3CONF class] to be non-static to avoid being reassigned after deserialization
(#6717)
79bb70101a is described below
commit 79bb70101a5056de7cd44197ce6a613bf5ad3a01
Author: Leon Yoah <[email protected]>
AuthorDate: Wed Apr 17 09:50:54 2024 +0800
[bugfix][S3 File]: Change the [SCHEMA] attribute of the [S3CONF class] to be
non-static to avoid being reassigned after deserialization (#6717)
---
.../seatunnel/file/s3/config/S3Conf.java | 41 +++++++---------------
1 file changed, 13 insertions(+), 28 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
index dbc05323b4..cc35141866 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
@@ -31,7 +31,7 @@ public class S3Conf extends HadoopConf {
private static final String HDFS_S3A_IMPL =
"org.apache.hadoop.fs.s3a.S3AFileSystem";
private static final String S3A_SCHEMA = "s3a";
private static final String DEFAULT_SCHEMA = "s3n";
- private static String SCHEMA = DEFAULT_SCHEMA;
+ private String schema = DEFAULT_SCHEMA;
@Override
public String getFsHdfsImpl() {
@@ -40,7 +40,11 @@ public class S3Conf extends HadoopConf {
@Override
public String getSchema() {
- return SCHEMA;
+ return this.schema;
+ }
+
+ public void setSchema(String schema) {
+ this.schema = schema;
}
private S3Conf(String hdfsNameKey) {
@@ -49,13 +53,13 @@ public class S3Conf extends HadoopConf {
public static HadoopConf buildWithConfig(Config config) {
- HadoopConf hadoopConf = new
S3Conf(config.getString(S3ConfigOptions.S3_BUCKET.key()));
String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());
+ S3Conf hadoopConf = new S3Conf(bucketName);
if (bucketName.startsWith(S3A_SCHEMA)) {
- SCHEMA = S3A_SCHEMA;
+ hadoopConf.setSchema(S3A_SCHEMA);
}
HashMap<String, String> s3Options = new HashMap<>();
- putS3SK(s3Options, config);
+ hadoopConf.putS3SK(s3Options, config);
if (CheckConfigUtil.isValidParam(config,
S3ConfigOptions.S3_PROPERTIES.key())) {
config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
.forEach((key, value) -> s3Options.put(key,
String.valueOf(value.unwrapped())));
@@ -73,30 +77,11 @@ public class S3Conf extends HadoopConf {
public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig
readonlyConfig) {
Config config = readonlyConfig.toConfig();
- HadoopConf hadoopConf = new
S3Conf(readonlyConfig.get(S3ConfigOptions.S3_BUCKET));
- String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
- if (bucketName.startsWith(S3A_SCHEMA)) {
- SCHEMA = S3A_SCHEMA;
- }
- HashMap<String, String> s3Options = new HashMap<>();
- putS3SK(s3Options, config);
- if (CheckConfigUtil.isValidParam(config,
S3ConfigOptions.S3_PROPERTIES.key())) {
- config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
- .forEach((key, value) -> s3Options.put(key,
String.valueOf(value.unwrapped())));
- }
-
- s3Options.put(
- S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
-
readonlyConfig.get(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER).getProvider());
- s3Options.put(
- S3ConfigOptions.FS_S3A_ENDPOINT.key(),
- readonlyConfig.get(S3ConfigOptions.FS_S3A_ENDPOINT));
- hadoopConf.setExtraOptions(s3Options);
- return hadoopConf;
+ return buildWithConfig(config);
}
private String switchHdfsImpl() {
- switch (SCHEMA) {
+ switch (this.schema) {
case S3A_SCHEMA:
return HDFS_S3A_IMPL;
default:
@@ -104,14 +89,14 @@ public class S3Conf extends HadoopConf {
}
}
- private static void putS3SK(Map<String, String> s3Options, Config config) {
+ private void putS3SK(Map<String, String> s3Options, Config config) {
if (!CheckConfigUtil.isValidParam(config,
S3ConfigOptions.S3_ACCESS_KEY.key())
&& !CheckConfigUtil.isValidParam(config,
S3ConfigOptions.S3_SECRET_KEY.key())) {
return;
}
String accessKey =
config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());
String secretKey =
config.getString(S3ConfigOptions.S3_SECRET_KEY.key());
- if (S3A_SCHEMA.equals(SCHEMA)) {
+ if (S3A_SCHEMA.equals(this.schema)) {
s3Options.put("fs.s3a.access.key", accessKey);
s3Options.put("fs.s3a.secret.key", secretKey);
return;