This is an automated email from the ASF dual-hosted git repository.
atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 75045970cd S3 Ingestion from non-default endpoints (#11798)
75045970cd is described below
commit 75045970cde40562d9445ba4a9d27705742b29ab
Author: Atul Mohan <[email protected]>
AuthorDate: Fri Jul 15 11:03:34 2022 -0700
S3 Ingestion from non-default endpoints (#11798)
* Add endpoint support for s3inputsource
* Changes to tests
* Fix docs
* Fix config
* Fix inspections
* Fix spelling
* Remove password from toString
---
.../apache/druid/common/aws/AWSClientConfig.java | 11 ++
.../apache/druid/common/aws/AWSEndpointConfig.java | 9 ++
.../apache/druid/common/aws/AWSProxyConfig.java | 10 ++
docs/ingestion/native-batch-input-source.md | 42 +++++
.../apache/druid/data/input/s3/S3InputSource.java | 142 +++++++++++++---
.../druid/storage/s3/S3StorageDruidModule.java | 58 +------
.../java/org/apache/druid/storage/s3/S3Utils.java | 58 +++++++
.../druid/data/input/s3/S3InputSourceTest.java | 179 ++++++++++++++++++++-
website/.spelling | 3 +
9 files changed, 431 insertions(+), 81 deletions(-)
diff --git
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
index 5a0a8b0afb..7886bd0a9e 100644
---
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
+++
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
@@ -55,4 +55,15 @@ public class AWSClientConfig
{
return forceGlobalBucketAccessEnabled;
}
+
+ @Override
+ public String toString()
+ {
+ return "AWSClientConfig{" +
+ "protocol='" + protocol + '\'' +
+ ", disableChunkedEncoding=" + disableChunkedEncoding +
+ ", enablePathStyleAccess=" + enablePathStyleAccess +
+ ", forceGlobalBucketAccessEnabled=" +
forceGlobalBucketAccessEnabled +
+ '}';
+ }
}
diff --git
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java
index 80216d9711..302ca09821 100644
---
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java
+++
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java
@@ -44,4 +44,13 @@ public class AWSEndpointConfig
{
return signingRegion;
}
+
+ @Override
+ public String toString()
+ {
+ return "AWSEndpointConfig{" +
+ "url='" + url + '\'' +
+ ", signingRegion='" + signingRegion + '\'' +
+ '}';
+ }
}
diff --git
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java
index 085e810834..5153a74239 100644
---
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java
+++
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java
@@ -54,4 +54,14 @@ public class AWSProxyConfig
{
return password;
}
+
+ @Override
+ public String toString()
+ {
+ return "AWSProxyConfig{" +
+ "host='" + host + '\'' +
+ ", port=" + port +
+ ", username='" + username + '\'' +
+ '}';
+ }
}
diff --git a/docs/ingestion/native-batch-input-source.md
b/docs/ingestion/native-batch-input-source.md
index 8ce42073dc..f4b92bdfe7 100644
--- a/docs/ingestion/native-batch-input-source.md
+++ b/docs/ingestion/native-batch-input-source.md
@@ -138,6 +138,45 @@ Sample specs:
...
```
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "s3",
+ "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
+ "endpointConfig": {
+ "url" : "s3-store.aws.com",
+ "signingRegion" : "us-west-2"
+ },
+ "clientConfig": {
+ "protocol" : "http",
+ "disableChunkedEncoding" : true,
+ "enablePathStyleAccess" : true,
+ "forceGlobalBucketAccessEnabled" : false
+ },
+ "proxyConfig": {
+ "host" : "proxy-s3.aws.com",
+ "port" : 8888,
+ "username" : "admin",
+ "password" : "admin"
+ },
+
+ "properties": {
+ "accessKeyId": "KLJ78979SDFdS2",
+ "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd",
+ "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3"
+ }
+ },
+ "inputFormat": {
+ "type": "json"
+ },
+ ...
+ },
+...
+```
+
+
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `s3`.|None|yes|
@@ -145,6 +184,9 @@ Sample specs:
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be
ingested. Empty objects starting with one of the given prefixes will be
skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or
`objects` must be set|
|filter|A wildcard filter for files. See
[here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter)
for more information. Files matching the filter criteria are considered for
ingestion. Files not matching the filter criteria are ignored.|None|no|
+| endpointConfig |Config for overriding the default S3 endpoint and signing
region. This would allow ingesting data from a different S3 store. Please see
[s3
config](../development/extensions-core/s3.md#connecting-to-s3-configuration)
for more information.|None|No (defaults will be used if not given)
+| clientConfig |S3 client properties for the overridden s3 endpoint. This is
used in conjunction with `endpointConfig`. Please see [s3
config](../development/extensions-core/s3.md#connecting-to-s3-configuration)
for more information.|None|No (defaults will be used if not given)
+| proxyConfig |Properties for specifying proxy information for the overridden
s3 endpoint. This is used in conjunction with `clientConfig`. Please see [s3
config](../development/extensions-core/s3.md#connecting-to-s3-configuration)
for more information.|None|No (defaults will be used if not given)
|properties|Properties Object for overriding the default S3 configuration. See
below for more information.|None|No (defaults will be used if not given)
Note that the S3 input source will skip all empty objects only when `prefixes`
is specified.
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index e94b679a25..549df7247e 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -19,10 +19,12 @@
package org.apache.druid.data.input.s3;
+import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import
com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
@@ -35,6 +37,9 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
+import org.apache.druid.common.aws.AWSClientConfig;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@@ -69,24 +74,30 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("properties")
private final S3InputSourceConfig s3InputSourceConfig;
private final S3InputDataConfig inputDataConfig;
+ private final AWSProxyConfig awsProxyConfig;
+ private final AWSClientConfig awsClientConfig;
+ private final AWSEndpointConfig awsEndpointConfig;
private final AWSCredentialsProvider awsCredentialsProvider;
private int maxRetries;
/**
* Constructor for S3InputSource
- * @param s3Client The default ServerSideEncryptingAmazonS3
client built with all default configs
- * from Guice. This injected singleton client
is use when {@param s3InputSourceConfig}
- * is not provided and hence, we can skip
building a new client from
- * {@param s3ClientBuilder}
- * @param s3ClientBuilder Use for building a new s3Client to use
instead of the default injected
- * {@param s3Client}. The configurations of
the client can be changed
- * before being built
- * @param inputDataConfig Stores the configuration for options
related to reading input data
- * @param uris User provided uris to read input data
- * @param prefixes User provided prefixes to read input data
- * @param objects User provided cloud objects values to read
input data
- * @param s3InputSourceConfig User provided properties for overriding
the default S3 configuration
*
+ * @param s3Client The default ServerSideEncryptingAmazonS3
client built with all default configs
+ * from Guice. This injected singleton client is
used when {@param s3InputSourceConfig}
+ * is not provided and hence, we can skip
building a new client from
+ * {@param s3ClientBuilder}
+ * @param s3ClientBuilder Use for building a new s3Client to use instead
of the default injected
+ * {@param s3Client}. The configurations of the
client can be changed
+ * before being built
+ * @param inputDataConfig Stores the configuration for options related
to reading input data
+ * @param uris User provided uris to read input data
+ * @param prefixes User provided prefixes to read input data
+ * @param objects User provided cloud objects values to read
input data
+ * @param s3InputSourceConfig User provided properties for overriding the
default S3 credentials
+ * @param awsProxyConfig User provided proxy information for the
overridden s3 client
+ * @param awsEndpointConfig User provided s3 endpoint and region for
overriding the default S3 endpoint
+ * @param awsClientConfig User provided properties for the S3 client
with the overridden endpoint
*/
@JsonCreator
public S3InputSource(
@@ -98,6 +109,9 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter,
@JsonProperty("properties") @Nullable S3InputSourceConfig
s3InputSourceConfig,
+ @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
+ @JsonProperty("endpointConfig") @Nullable AWSEndpointConfig
awsEndpointConfig,
+ @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig,
@JacksonInject AWSCredentialsProvider awsCredentialsProvider
)
{
@@ -105,9 +119,39 @@ public class S3InputSource extends CloudObjectInputSource
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig,
"S3DataSegmentPusherConfig");
Preconditions.checkNotNull(s3Client, "s3Client");
this.s3InputSourceConfig = s3InputSourceConfig;
+ this.awsProxyConfig = awsProxyConfig;
+ this.awsClientConfig = awsClientConfig;
+ this.awsEndpointConfig = awsEndpointConfig;
+
this.s3ClientSupplier = Suppliers.memoize(
() -> {
if (s3ClientBuilder != null && s3InputSourceConfig != null) {
+ if (awsEndpointConfig != null && awsEndpointConfig.getUrl() !=
null) {
+ s3ClientBuilder
+ .getAmazonS3ClientBuilder().setEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(
+ awsEndpointConfig.getUrl(),
+ awsEndpointConfig.getSigningRegion()
+ ));
+ if (awsClientConfig != null) {
+ s3ClientBuilder
+ .getAmazonS3ClientBuilder()
+
.withChunkedEncodingDisabled(awsClientConfig.isDisableChunkedEncoding())
+
.withPathStyleAccessEnabled(awsClientConfig.isEnablePathStyleAccess())
+
.withForceGlobalBucketAccessEnabled(awsClientConfig.isForceGlobalBucketAccessEnabled());
+
+ if (awsProxyConfig != null) {
+ final Protocol protocol =
S3Utils.determineProtocol(awsClientConfig, awsEndpointConfig);
+ s3ClientBuilder
+ .getAmazonS3ClientBuilder()
+ .withClientConfiguration(S3Utils.setProxyConfig(
+ s3ClientBuilder.getAmazonS3ClientBuilder()
+ .getClientConfiguration(),
+ awsProxyConfig
+ ).withProtocol(protocol));
+ }
+ }
+ }
if (s3InputSourceConfig.isCredentialsConfigured()) {
if (s3InputSourceConfig.getAssumeRoleArn() == null) {
s3ClientBuilder
@@ -142,10 +186,25 @@ public class S3InputSource extends CloudObjectInputSource
List<URI> prefixes,
List<CloudObjectLocation> objects,
String filter,
- S3InputSourceConfig s3InputSourceConfig
+ S3InputSourceConfig s3InputSourceConfig,
+ AWSProxyConfig awsProxyConfig,
+ AWSEndpointConfig awsEndpointConfig,
+ AWSClientConfig awsClientConfig
)
{
- this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects,
filter, s3InputSourceConfig, null);
+ this(s3Client,
+ s3ClientBuilder,
+ inputDataConfig,
+ uris,
+ prefixes,
+ objects,
+ filter,
+ s3InputSourceConfig,
+ awsProxyConfig,
+ awsEndpointConfig,
+ awsClientConfig,
+ null
+ );
}
@VisibleForTesting
@@ -158,10 +217,26 @@ public class S3InputSource extends CloudObjectInputSource
List<CloudObjectLocation> objects,
String filter,
S3InputSourceConfig s3InputSourceConfig,
+ AWSProxyConfig awsProxyConfig,
+ AWSEndpointConfig awsEndpointConfig,
+ AWSClientConfig awsClientConfig,
int maxRetries
)
{
- this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects,
filter, s3InputSourceConfig, null);
+ this(
+ s3Client,
+ s3ClientBuilder,
+ inputDataConfig,
+ uris,
+ prefixes,
+ objects,
+ filter,
+ s3InputSourceConfig,
+ awsProxyConfig,
+ awsEndpointConfig,
+ awsClientConfig,
+ null
+ );
this.maxRetries = maxRetries;
}
@@ -175,8 +250,9 @@ public class S3InputSource extends CloudObjectInputSource
if (assumeRoleArn != null) {
String roleSessionName = StringUtils.format("druid-s3-input-source-%s",
UUID.randomUUID().toString());
AWSSecurityTokenService securityTokenService =
AWSSecurityTokenServiceClientBuilder.standard()
-
.withCredentials(awsCredentialsProvider)
-
.build();
+
.withCredentials(
+
awsCredentialsProvider)
+
.build();
STSAssumeRoleSessionCredentialsProvider.Builder
roleCredentialsProviderBuilder;
roleCredentialsProviderBuilder = new
STSAssumeRoleSessionCredentialsProvider
.Builder(assumeRoleArn,
roleSessionName).withStsClient(securityTokenService);
@@ -207,6 +283,27 @@ public class S3InputSource extends CloudObjectInputSource
return s3InputSourceConfig;
}
+ @Nullable
+ @JsonProperty("proxyConfig")
+ public AWSProxyConfig getAwsProxyConfig()
+ {
+ return awsProxyConfig;
+ }
+
+ @Nullable
+ @JsonProperty("clientConfig")
+ public AWSClientConfig getAwsClientConfig()
+ {
+ return awsClientConfig;
+ }
+
+ @Nullable
+ @JsonProperty("endpointConfig")
+ public AWSEndpointConfig getAwsEndpointConfig()
+ {
+ return awsEndpointConfig;
+ }
+
@Override
protected InputEntity createEntity(CloudObjectLocation location)
{
@@ -240,6 +337,9 @@ public class S3InputSource extends CloudObjectInputSource
split.get(),
getFilter(),
getS3InputSourceConfig(),
+ getAwsProxyConfig(),
+ getAwsEndpointConfig(),
+ getAwsClientConfig(),
awsCredentialsProvider
);
}
@@ -263,7 +363,10 @@ public class S3InputSource extends CloudObjectInputSource
return false;
}
S3InputSource that = (S3InputSource) o;
- return Objects.equals(s3InputSourceConfig, that.s3InputSourceConfig);
+ return Objects.equals(s3InputSourceConfig, that.s3InputSourceConfig) &&
+ Objects.equals(awsProxyConfig, that.awsProxyConfig) &&
+ Objects.equals(awsClientConfig, that.awsClientConfig) &&
+ Objects.equals(awsEndpointConfig, that.awsEndpointConfig);
}
@Override
@@ -275,6 +378,9 @@ public class S3InputSource extends CloudObjectInputSource
", objects=" + getObjects() +
", filter=" + getFilter() +
", s3InputSourceConfig=" + getS3InputSourceConfig() +
+ ", awsProxyConfig=" + getAwsProxyConfig() +
+ ", awsEndpointConfig=" + getAwsEndpointConfig() +
+ ", awsClientConfig=" + getAwsClientConfig() +
'}';
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
index 69feb8cedb..3747088aeb 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -43,12 +43,8 @@ import org.apache.druid.guice.Binders;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.URIs;
import org.apache.druid.java.util.common.logger.Logger;
-import javax.annotation.Nullable;
-import java.net.URI;
import java.util.List;
/**
@@ -62,56 +58,6 @@ public class S3StorageDruidModule implements DruidModule
private static final Logger log = new Logger(S3StorageDruidModule.class);
- private static ClientConfiguration setProxyConfig(ClientConfiguration conf,
AWSProxyConfig proxyConfig)
- {
- if (StringUtils.isNotEmpty(proxyConfig.getHost())) {
- conf.setProxyHost(proxyConfig.getHost());
- }
- if (proxyConfig.getPort() != -1) {
- conf.setProxyPort(proxyConfig.getPort());
- }
- if (StringUtils.isNotEmpty(proxyConfig.getUsername())) {
- conf.setProxyUsername(proxyConfig.getUsername());
- }
- if (StringUtils.isNotEmpty(proxyConfig.getPassword())) {
- conf.setProxyPassword(proxyConfig.getPassword());
- }
- return conf;
- }
-
- @Nullable
- private static Protocol parseProtocol(@Nullable String protocol)
- {
- if (protocol == null) {
- return null;
- }
-
- if (protocol.equalsIgnoreCase("http")) {
- return Protocol.HTTP;
- } else if (protocol.equalsIgnoreCase("https")) {
- return Protocol.HTTPS;
- } else {
- throw new IAE("Unknown protocol[%s]", protocol);
- }
- }
-
- private static Protocol determineProtocol(AWSClientConfig clientConfig,
AWSEndpointConfig endpointConfig)
- {
- final Protocol protocolFromClientConfig =
parseProtocol(clientConfig.getProtocol());
- final String endpointUrl = endpointConfig.getUrl();
- if (StringUtils.isNotEmpty(endpointUrl)) {
- //noinspection ConstantConditions
- final URI uri = URIs.parse(endpointUrl,
protocolFromClientConfig.toString());
- final Protocol protocol = parseProtocol(uri.getScheme());
- if (protocol != null && (protocol != protocolFromClientConfig)) {
- log.warn("[%s] protocol will be used for endpoint [%s]", protocol,
endpointUrl);
- }
- return protocol;
- } else {
- return protocolFromClientConfig;
- }
- }
-
@Override
public List<? extends Module> getJacksonModules()
{
@@ -188,11 +134,11 @@ public class S3StorageDruidModule implements DruidModule
)
{
final ClientConfiguration configuration = new
ClientConfigurationFactory().getConfig();
- final Protocol protocol = determineProtocol(clientConfig, endpointConfig);
+ final Protocol protocol = S3Utils.determineProtocol(clientConfig,
endpointConfig);
final AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client
.builder()
.withCredentials(provider)
- .withClientConfiguration(setProxyConfig(configuration,
proxyConfig).withProtocol(protocol))
+ .withClientConfiguration(S3Utils.setProxyConfig(configuration,
proxyConfig).withProtocol(protocol))
.withChunkedEncodingDisabled(clientConfig.isDisableChunkedEncoding())
.withPathStyleAccessEnabled(clientConfig.isEnablePathStyleAccess())
.withForceGlobalBucketAccessEnabled(clientConfig.isForceGlobalBucketAccessEnabled());
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index eb40d4aa7a..a017698497 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -20,6 +20,8 @@
package org.apache.druid.storage.s3;
import com.amazonaws.AmazonClientException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
@@ -34,14 +36,20 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.aws.AWSClientConfig;
import org.apache.druid.common.aws.AWSClientUtil;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.URIs;
import org.apache.druid.java.util.common.logger.Logger;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -314,4 +322,54 @@ public class S3Utils
log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
service.putObject(putObjectRequest);
}
+
+ @Nullable
+ private static Protocol parseProtocol(@Nullable String protocol)
+ {
+ if (protocol == null) {
+ return null;
+ }
+
+ if (protocol.equalsIgnoreCase("http")) {
+ return Protocol.HTTP;
+ } else if (protocol.equalsIgnoreCase("https")) {
+ return Protocol.HTTPS;
+ } else {
+ throw new IAE("Unknown protocol[%s]", protocol);
+ }
+ }
+
+ public static Protocol determineProtocol(AWSClientConfig clientConfig,
AWSEndpointConfig endpointConfig)
+ {
+ final Protocol protocolFromClientConfig =
parseProtocol(clientConfig.getProtocol());
+ final String endpointUrl = endpointConfig.getUrl();
+ if (org.apache.commons.lang.StringUtils.isNotEmpty(endpointUrl)) {
+ //noinspection ConstantConditions
+ final URI uri = URIs.parse(endpointUrl,
protocolFromClientConfig.toString());
+ final Protocol protocol = parseProtocol(uri.getScheme());
+ if (protocol != null && (protocol != protocolFromClientConfig)) {
+ log.warn("[%s] protocol will be used for endpoint [%s]", protocol,
endpointUrl);
+ }
+ return protocol;
+ } else {
+ return protocolFromClientConfig;
+ }
+ }
+
+ public static ClientConfiguration setProxyConfig(ClientConfiguration conf,
AWSProxyConfig proxyConfig)
+ {
+ if (org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getHost()))
{
+ conf.setProxyHost(proxyConfig.getHost());
+ }
+ if (proxyConfig.getPort() != -1) {
+ conf.setProxyPort(proxyConfig.getPort());
+ }
+ if
(org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getUsername())) {
+ conf.setProxyUsername(proxyConfig.getUsername());
+ }
+ if
(org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getPassword())) {
+ conf.setProxyPassword(proxyConfig.getPassword());
+ }
+ return conf;
+ }
}
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 739e3a5889..359de18039 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.data.input.s3;
+import com.amazonaws.ClientConfiguration;
import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
@@ -43,7 +44,10 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
+import org.apache.druid.common.aws.AWSClientConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@@ -96,6 +100,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
{
private static final ObjectMapper MAPPER = createS3ObjectMapper();
private static final AmazonS3Client S3_CLIENT =
EasyMock.createMock(AmazonS3Client.class);
+ private static final ClientConfiguration CLIENT_CONFIGURATION =
EasyMock.createMock(ClientConfiguration.class);
private static final ServerSideEncryptingAmazonS3.Builder
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER =
EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class);
private static final AmazonS3ClientBuilder AMAZON_S3_CLIENT_BUILDER =
AmazonS3Client.builder();
@@ -145,6 +150,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new
S3InputSourceConfig(
new DefaultPasswordProvider("myKey"), new
DefaultPasswordProvider("mySecret"), null, null);
+ private static final AWSEndpointConfig ENDPOINT_CONFIG = new
AWSEndpointConfig();
+ private static final AWSProxyConfig PROXY_CONFIG = new AWSProxyConfig();
+ private static final AWSClientConfig CLIENT_CONFIG = new AWSClientConfig();
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
@@ -175,6 +183,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
+ null,
+ null,
null
);
@@ -195,6 +206,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
null,
null,
+ null,
+ null,
+ null,
null
);
@@ -215,7 +229,11 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
"*.parquet",
+ null,
+ null,
+ null,
null
+
);
Assert.assertEquals(
@@ -235,6 +253,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
+ null,
+ null,
null
);
final S3InputSource serdeWithUris =
MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
@@ -252,6 +273,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
null,
null,
+ null,
+ null,
+ null,
null
);
final S3InputSource serdeWithPrefixes =
@@ -270,6 +294,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
EXPECTED_LOCATION,
null,
+ null,
+ null,
+ null,
null
);
final S3InputSource serdeWithPrefixes =
@@ -283,6 +310,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder())
.andStubReturn(AMAZON_S3_CLIENT_BUILDER);
+ AMAZON_S3_CLIENT_BUILDER.withClientConfiguration(CLIENT_CONFIGURATION);
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
.andReturn(SERVICE);
EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
@@ -294,7 +322,10 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
EXPECTED_LOCATION,
null,
- CLOUD_CONFIG_PROPERTIES
+ CLOUD_CONFIG_PROPERTIES,
+ null,
+ null,
+ null
);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes),
S3InputSource.class);
@@ -305,7 +336,73 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
}
@Test
- public void
testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCrediential()
+ public void testS3InputSourceUseEndPointClientProxy()
+ {
+ S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret =
EasyMock.createMock(S3InputSourceConfig.class);
+ AWSEndpointConfig mockAwsEndpointConfig =
EasyMock.createMock(AWSEndpointConfig.class);
+ AWSClientConfig mockAwsClientConfig =
EasyMock.createMock(AWSClientConfig.class);
+ AWSProxyConfig mockAwsProxyConfig =
EasyMock.createMock(AWSProxyConfig.class);
+
+ EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
+ EasyMock.reset(mockAwsEndpointConfig);
+ EasyMock.reset(mockAwsClientConfig);
+ EasyMock.reset(mockAwsProxyConfig);
+
+ EasyMock.expect(mockAwsEndpointConfig.getUrl()).andStubReturn("endpoint");
+
EasyMock.expect(mockAwsEndpointConfig.getSigningRegion()).andStubReturn("region");
+
+
EasyMock.expect(mockAwsClientConfig.isDisableChunkedEncoding()).andStubReturn(false);
+
EasyMock.expect(mockAwsClientConfig.isEnablePathStyleAccess()).andStubReturn(false);
+
EasyMock.expect(mockAwsClientConfig.isForceGlobalBucketAccessEnabled()).andStubReturn(true);
+ EasyMock.expect(mockAwsClientConfig.getProtocol()).andStubReturn("http");
+
+ EasyMock.expect(mockAwsProxyConfig.getHost()).andStubReturn("");
+ EasyMock.expect(mockAwsProxyConfig.getPort()).andStubReturn(-1);
+ EasyMock.expect(mockAwsProxyConfig.getUsername()).andStubReturn("");
+ EasyMock.expect(mockAwsProxyConfig.getPassword()).andStubReturn("");
+
+
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null);
+
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
+ .andStubReturn(false);
+ EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
+ EasyMock.replay(mockAwsEndpointConfig);
+ EasyMock.replay(mockAwsClientConfig);
+ EasyMock.replay(mockAwsProxyConfig);
+
+ EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+
+
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder())
+ .andStubReturn(AMAZON_S3_CLIENT_BUILDER);
+
+ AMAZON_S3_CLIENT_BUILDER.withClientConfiguration(CLIENT_CONFIGURATION);
+
+ EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
+ .andReturn(SERVICE);
+ EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ final S3InputSource withPrefixes = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ EXPECTED_LOCATION,
+ null,
+ mockConfigPropertiesWithoutKeyAndSecret,
+ mockAwsProxyConfig,
+ mockAwsEndpointConfig,
+ mockAwsClientConfig
+ );
+ Assert.assertNotNull(withPrefixes);
+ // This is to force the s3ClientSupplier to initialize the
ServerSideEncryptingAmazonS3
+ withPrefixes.createEntity(new CloudObjectLocation("bucket", "path"));
+ EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ EasyMock.verify(mockAwsEndpointConfig);
+ EasyMock.verify(mockAwsClientConfig);
+ EasyMock.verify(mockAwsProxyConfig);
+ }
+
+ @Test
+ public void
testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCredential()
{
S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret =
EasyMock.createMock(S3InputSourceConfig.class);
EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
@@ -325,7 +422,10 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
EXPECTED_LOCATION,
null,
- mockConfigPropertiesWithoutKeyAndSecret
+ mockConfigPropertiesWithoutKeyAndSecret,
+ PROXY_CONFIG,
+ ENDPOINT_CONFIG,
+ CLIENT_CONFIG
);
Assert.assertNotNull(withPrefixes);
// This is to force the s3ClientSupplier to initialize the
ServerSideEncryptingAmazonS3
@@ -335,7 +435,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
}
@Test
- public void testSerdeS3ClientLazyInitializedWithCrediential() throws
Exception
+ public void testSerdeS3ClientLazyInitializedWithCredential() throws Exception
{
// Amazon S3 builder should not build anything as we did not make any call
that requires the S3 client
EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
@@ -348,7 +448,10 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
EXPECTED_LOCATION,
null,
- CLOUD_CONFIG_PROPERTIES
+ CLOUD_CONFIG_PROPERTIES,
+ null,
+ null,
+ null
);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes),
S3InputSource.class);
@@ -357,7 +460,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
}
@Test
- public void testSerdeS3ClientLazyInitializedWithoutCrediential() throws Exception
+ public void testSerdeS3ClientLazyInitializedWithoutCredential() throws Exception
{
// Amazon S3 builder should not build anything as we did not make any call that requires the S3 client
EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
@@ -370,8 +473,10 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
EXPECTED_LOCATION,
null,
+ null,
+ null,
+ null,
null
-
);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
@@ -390,6 +495,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(),
EXPECTED_LOCATION,
null,
+ null,
+ null,
+ null,
null
);
final S3InputSource serdeWithPrefixes =
@@ -410,6 +518,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
+ null,
+ null,
null
);
}
@@ -427,6 +538,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
EXPECTED_OBJECTS,
null,
+ null,
+ null,
+ null,
null
);
}
@@ -444,6 +558,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
EXPECTED_OBJECTS,
null,
+ null,
+ null,
+ null,
null
);
}
@@ -461,6 +578,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
null,
null,
+ null,
+ null,
+ null,
null
);
}
@@ -478,6 +598,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
EXPECTED_LOCATION,
null,
+ null,
+ null,
+ null,
null
);
}
@@ -495,6 +618,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
ImmutableList.of(),
null,
+ null,
+ null,
+ null,
null
);
}
@@ -512,6 +638,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
EXPECTED_LOCATION,
null,
+ null,
+ null,
+ null,
null
);
}
@@ -527,6 +656,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
+ null,
+ null,
null
);
@@ -549,6 +681,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
"*.csv",
+ null,
+ null,
+ null,
null
);
@@ -571,6 +706,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
OBJECTS_BEFORE_FILTER,
"*.csv",
+ null,
+ null,
+ null,
null
);
@@ -593,6 +731,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
EXPECTED_OBJECTS,
null,
+ null,
+ null,
+ null,
null
);
@@ -620,6 +761,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
null,
null,
+ null,
+ null,
+ null,
null
);
@@ -648,6 +792,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
null,
"*.csv",
+ null,
+ null,
+ null,
null
);
@@ -676,6 +823,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
null,
null,
+ null,
+ null,
+ null,
null
);
@@ -707,6 +857,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
PREFIXES,
null,
null,
+ null,
+ null,
+ null,
null
);
@@ -737,6 +890,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
+ null,
+ null,
+ null,
null
);
@@ -769,6 +925,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
+ null,
+ null,
+ null,
null
);
@@ -813,6 +972,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
null,
null,
null,
+ null,
+ null,
+ null,
3 // only have three retries since they are slow
);
@@ -857,6 +1019,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
null,
+ null,
+ null,
+ null,
null
);
diff --git a/website/.spelling b/website/.spelling
index 88a474f667..84a523e141 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -232,6 +232,7 @@ broadcasted
checksums
classpath
clickstream
+clientConfig
codebase
codec
colocated
@@ -266,6 +267,7 @@ druid–kubernetes-extensions
e.g.
encodings
endian
+endpointConfig
enum
expr
failover
@@ -378,6 +380,7 @@ procs
programmatically
proto
proxied
+proxyConfig
QPS
quantile
quantiles
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]