This is an automated email from the ASF dual-hosted git repository.

atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 75045970cd S3 Ingestion from non-default endpoints (#11798)
75045970cd is described below

commit 75045970cde40562d9445ba4a9d27705742b29ab
Author: Atul Mohan <[email protected]>
AuthorDate: Fri Jul 15 11:03:34 2022 -0700

    S3 Ingestion from non-default endpoints (#11798)
    
    * Add endpoint support for s3inputsource
    
    * Changes to tests
    
    * Fix docs
    
    * Fix config
    
    * Fix inspections
    
    * Fix spelling
    
    * Remove password from toString
---
 .../apache/druid/common/aws/AWSClientConfig.java   |  11 ++
 .../apache/druid/common/aws/AWSEndpointConfig.java |   9 ++
 .../apache/druid/common/aws/AWSProxyConfig.java    |  10 ++
 docs/ingestion/native-batch-input-source.md        |  42 +++++
 .../apache/druid/data/input/s3/S3InputSource.java  | 142 +++++++++++++---
 .../druid/storage/s3/S3StorageDruidModule.java     |  58 +------
 .../java/org/apache/druid/storage/s3/S3Utils.java  |  58 +++++++
 .../druid/data/input/s3/S3InputSourceTest.java     | 179 ++++++++++++++++++++-
 website/.spelling                                  |   3 +
 9 files changed, 431 insertions(+), 81 deletions(-)

diff --git 
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
 
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
index 5a0a8b0afb..7886bd0a9e 100644
--- 
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
+++ 
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSClientConfig.java
@@ -55,4 +55,15 @@ public class AWSClientConfig
   {
     return forceGlobalBucketAccessEnabled;
   }
+
+  @Override
+  public String toString()
+  {
+    return "AWSClientConfig{" +
+           "protocol='" + protocol + '\'' +
+           ", disableChunkedEncoding=" + disableChunkedEncoding +
+           ", enablePathStyleAccess=" + enablePathStyleAccess +
+           ", forceGlobalBucketAccessEnabled=" + 
forceGlobalBucketAccessEnabled +
+           '}';
+  }
 }
diff --git 
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java
 
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java
index 80216d9711..302ca09821 100644
--- 
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java
+++ 
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSEndpointConfig.java
@@ -44,4 +44,13 @@ public class AWSEndpointConfig
   {
     return signingRegion;
   }
+
+  @Override
+  public String toString()
+  {
+    return "AWSEndpointConfig{" +
+           "url='" + url + '\'' +
+           ", signingRegion='" + signingRegion + '\'' +
+           '}';
+  }
 }
diff --git 
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java
 
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java
index 085e810834..5153a74239 100644
--- 
a/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java
+++ 
b/cloud/aws-common/src/main/java/org/apache/druid/common/aws/AWSProxyConfig.java
@@ -54,4 +54,14 @@ public class AWSProxyConfig
   {
     return password;
   }
+
+  @Override
+  public String toString()
+  {
+    return "AWSProxyConfig{" +
+           "host='" + host + '\'' +
+           ", port=" + port +
+           ", username='" + username + '\'' +
+           '}';
+  }
 }
diff --git a/docs/ingestion/native-batch-input-source.md 
b/docs/ingestion/native-batch-input-source.md
index 8ce42073dc..f4b92bdfe7 100644
--- a/docs/ingestion/native-batch-input-source.md
+++ b/docs/ingestion/native-batch-input-source.md
@@ -138,6 +138,45 @@ Sample specs:
 ...
 ```
 
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "s3",
+        "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
+        "endpointConfig": {
+             "url" : "s3-store.aws.com",
+             "signingRegion" : "us-west-2"
+         },
+         "clientConfig": {
+             "protocol" : "http",
+             "disableChunkedEncoding" : true,
+             "enablePathStyleAccess" : true,
+             "forceGlobalBucketAccessEnabled" : false
+         },
+         "proxyConfig": {
+             "host" : "proxy-s3.aws.com",
+             "port" : 8888,
+             "username" : "admin",
+             "password" : "admin"
+         },
+
+        "properties": {
+          "accessKeyId": "KLJ78979SDFdS2",
+          "secretAccessKey": "KLS89s98sKJHKJKJH8721lljkd",
+          "assumeRoleArn": "arn:aws:iam::2981002874992:role/role-s3"
+        }
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      ...
+    },
+...
+```
+
+
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should be `s3`.|None|yes|
@@ -145,6 +184,9 @@ Sample specs:
 |prefixes|JSON array of URI prefixes for the locations of S3 objects to be 
ingested. Empty objects starting with one of the given prefixes will be 
skipped.|None|`uris` or `prefixes` or `objects` must be set|
 |objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or 
`objects` must be set|
 |filter|A wildcard filter for files. See 
[here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter)
 for more information. Files matching the filter criteria are considered for 
ingestion. Files not matching the filter criteria are ignored.|None|no|
+| endpointConfig |Config for overriding the default S3 endpoint and signing 
region. This would allow ingesting data from a different S3 store. Please see 
[s3 
config](../development/extensions-core/s3.md#connecting-to-s3-configuration) 
for more information.|None|No (defaults will be used if not given)
+| clientConfig |S3 client properties for the overridden s3 endpoint. This is 
used in conjunction with `endpointConfig`. Please see [s3 
config](../development/extensions-core/s3.md#connecting-to-s3-configuration) 
for more information.|None|No (defaults will be used if not given)
+| proxyConfig |Properties for specifying proxy information for the overridden 
s3 endpoint. This is used in conjunction with `clientConfig`. Please see [s3 
config](../development/extensions-core/s3.md#connecting-to-s3-configuration) 
for more information.|None|No (defaults will be used if not given)
 |properties|Properties Object for overriding the default S3 configuration. See 
below for more information.|None|No (defaults will be used if not given)
 
 Note that the S3 input source will skip all empty objects only when `prefixes` 
is specified.
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index e94b679a25..549df7247e 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -19,10 +19,12 @@
 
 package org.apache.druid.data.input.s3;
 
+import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
 import 
com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
@@ -35,6 +37,9 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.Iterators;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.druid.common.aws.AWSClientConfig;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputSplit;
@@ -69,24 +74,30 @@ public class S3InputSource extends CloudObjectInputSource
   @JsonProperty("properties")
   private final S3InputSourceConfig s3InputSourceConfig;
   private final S3InputDataConfig inputDataConfig;
+  private final AWSProxyConfig awsProxyConfig;
+  private final AWSClientConfig awsClientConfig;
+  private final AWSEndpointConfig awsEndpointConfig;
   private final AWSCredentialsProvider awsCredentialsProvider;
   private int maxRetries;
 
   /**
    * Constructor for S3InputSource
-   * @param s3Client                The default ServerSideEncryptingAmazonS3 
client built with all default configs
-   *                                from Guice. This injected singleton client 
is use when {@param s3InputSourceConfig}
-   *                                is not provided and hence, we can skip 
building a new client from
-   *                                {@param s3ClientBuilder}
-   * @param s3ClientBuilder         Use for building a new s3Client to use 
instead of the default injected
-   *                                {@param s3Client}. The configurations of 
the client can be changed
-   *                                before being built
-   * @param inputDataConfig         Stores the configuration for options 
related to reading input data
-   * @param uris                    User provided uris to read input data
-   * @param prefixes                User provided prefixes to read input data
-   * @param objects                 User provided cloud objects values to read 
input data
-   * @param s3InputSourceConfig     User provided properties for overriding 
the default S3 configuration
    *
+   * @param s3Client            The default ServerSideEncryptingAmazonS3 
client built with all default configs
+   *                            from Guice. This injected singleton client is 
used when {@param s3InputSourceConfig}
+   *                            is not provided and hence, we can skip 
building a new client from
+   *                            {@param s3ClientBuilder}
+   * @param s3ClientBuilder     Used for building a new s3Client to use instead 
of the default injected
+   *                            {@param s3Client}. The configurations of the 
client can be changed
+   *                            before being built
+   * @param inputDataConfig     Stores the configuration for options related 
to reading input data
+   * @param uris                User provided uris to read input data
+   * @param prefixes            User provided prefixes to read input data
+   * @param objects             User provided cloud objects values to read 
input data
+   * @param s3InputSourceConfig User provided properties for overriding the 
default S3 credentials
+   * @param awsProxyConfig      User provided proxy information for the 
overridden s3 client
+   * @param awsEndpointConfig   User provided s3 endpoint and region for 
overriding the default S3 endpoint
+   * @param awsClientConfig     User provided properties for the S3 client 
with the overridden endpoint
    */
   @JsonCreator
   public S3InputSource(
@@ -98,6 +109,9 @@ public class S3InputSource extends CloudObjectInputSource
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
       @JsonProperty("filter") @Nullable String filter,
       @JsonProperty("properties") @Nullable S3InputSourceConfig 
s3InputSourceConfig,
+      @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
+      @JsonProperty("endpointConfig") @Nullable AWSEndpointConfig 
awsEndpointConfig,
+      @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig,
       @JacksonInject AWSCredentialsProvider awsCredentialsProvider
   )
   {
@@ -105,9 +119,39 @@ public class S3InputSource extends CloudObjectInputSource
     this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, 
"S3DataSegmentPusherConfig");
     Preconditions.checkNotNull(s3Client, "s3Client");
     this.s3InputSourceConfig = s3InputSourceConfig;
+    this.awsProxyConfig = awsProxyConfig;
+    this.awsClientConfig = awsClientConfig;
+    this.awsEndpointConfig = awsEndpointConfig;
+
     this.s3ClientSupplier = Suppliers.memoize(
         () -> {
           if (s3ClientBuilder != null && s3InputSourceConfig != null) {
+            if (awsEndpointConfig != null && awsEndpointConfig.getUrl() != 
null) {
+              s3ClientBuilder
+                  .getAmazonS3ClientBuilder().setEndpointConfiguration(
+                      new AwsClientBuilder.EndpointConfiguration(
+                          awsEndpointConfig.getUrl(),
+                          awsEndpointConfig.getSigningRegion()
+                      ));
+              if (awsClientConfig != null) {
+                s3ClientBuilder
+                    .getAmazonS3ClientBuilder()
+                    
.withChunkedEncodingDisabled(awsClientConfig.isDisableChunkedEncoding())
+                    
.withPathStyleAccessEnabled(awsClientConfig.isEnablePathStyleAccess())
+                    
.withForceGlobalBucketAccessEnabled(awsClientConfig.isForceGlobalBucketAccessEnabled());
+
+                if (awsProxyConfig != null) {
+                  final Protocol protocol = 
S3Utils.determineProtocol(awsClientConfig, awsEndpointConfig);
+                  s3ClientBuilder
+                      .getAmazonS3ClientBuilder()
+                      .withClientConfiguration(S3Utils.setProxyConfig(
+                          s3ClientBuilder.getAmazonS3ClientBuilder()
+                                         .getClientConfiguration(),
+                          awsProxyConfig
+                      ).withProtocol(protocol));
+                }
+              }
+            }
             if (s3InputSourceConfig.isCredentialsConfigured()) {
               if (s3InputSourceConfig.getAssumeRoleArn() == null) {
                 s3ClientBuilder
@@ -142,10 +186,25 @@ public class S3InputSource extends CloudObjectInputSource
       List<URI> prefixes,
       List<CloudObjectLocation> objects,
       String filter,
-      S3InputSourceConfig s3InputSourceConfig
+      S3InputSourceConfig s3InputSourceConfig,
+      AWSProxyConfig awsProxyConfig,
+      AWSEndpointConfig awsEndpointConfig,
+      AWSClientConfig awsClientConfig
   )
   {
-    this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, 
filter, s3InputSourceConfig, null);
+    this(s3Client,
+         s3ClientBuilder,
+         inputDataConfig,
+         uris,
+         prefixes,
+         objects,
+         filter,
+         s3InputSourceConfig,
+         awsProxyConfig,
+         awsEndpointConfig,
+         awsClientConfig,
+         null
+    );
   }
 
   @VisibleForTesting
@@ -158,10 +217,26 @@ public class S3InputSource extends CloudObjectInputSource
       List<CloudObjectLocation> objects,
       String filter,
       S3InputSourceConfig s3InputSourceConfig,
+      AWSProxyConfig awsProxyConfig,
+      AWSEndpointConfig awsEndpointConfig,
+      AWSClientConfig awsClientConfig,
       int maxRetries
   )
   {
-    this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, 
filter, s3InputSourceConfig, null);
+    this(
+        s3Client,
+        s3ClientBuilder,
+        inputDataConfig,
+        uris,
+        prefixes,
+        objects,
+        filter,
+        s3InputSourceConfig,
+        awsProxyConfig,
+        awsEndpointConfig,
+        awsClientConfig,
+        null
+    );
     this.maxRetries = maxRetries;
   }
 
@@ -175,8 +250,9 @@ public class S3InputSource extends CloudObjectInputSource
     if (assumeRoleArn != null) {
       String roleSessionName = StringUtils.format("druid-s3-input-source-%s", 
UUID.randomUUID().toString());
       AWSSecurityTokenService securityTokenService = 
AWSSecurityTokenServiceClientBuilder.standard()
-                                                                          
.withCredentials(awsCredentialsProvider)
-                                                                          
.build();
+                                                                               
          .withCredentials(
+                                                                               
              awsCredentialsProvider)
+                                                                               
          .build();
       STSAssumeRoleSessionCredentialsProvider.Builder 
roleCredentialsProviderBuilder;
       roleCredentialsProviderBuilder = new 
STSAssumeRoleSessionCredentialsProvider
           .Builder(assumeRoleArn, 
roleSessionName).withStsClient(securityTokenService);
@@ -207,6 +283,27 @@ public class S3InputSource extends CloudObjectInputSource
     return s3InputSourceConfig;
   }
 
+  @Nullable
+  @JsonProperty("proxyConfig")
+  public AWSProxyConfig getAwsProxyConfig()
+  {
+    return awsProxyConfig;
+  }
+
+  @Nullable
+  @JsonProperty("clientConfig")
+  public AWSClientConfig getAwsClientConfig()
+  {
+    return awsClientConfig;
+  }
+
+  @Nullable
+  @JsonProperty("endpointConfig")
+  public AWSEndpointConfig getAwsEndpointConfig()
+  {
+    return awsEndpointConfig;
+  }
+
   @Override
   protected InputEntity createEntity(CloudObjectLocation location)
   {
@@ -240,6 +337,9 @@ public class S3InputSource extends CloudObjectInputSource
         split.get(),
         getFilter(),
         getS3InputSourceConfig(),
+        getAwsProxyConfig(),
+        getAwsEndpointConfig(),
+        getAwsClientConfig(),
         awsCredentialsProvider
     );
   }
@@ -263,7 +363,10 @@ public class S3InputSource extends CloudObjectInputSource
       return false;
     }
     S3InputSource that = (S3InputSource) o;
-    return Objects.equals(s3InputSourceConfig, that.s3InputSourceConfig);
+    return Objects.equals(s3InputSourceConfig, that.s3InputSourceConfig) &&
+           Objects.equals(awsProxyConfig, that.awsProxyConfig) &&
+           Objects.equals(awsClientConfig, that.awsClientConfig) &&
+           Objects.equals(awsEndpointConfig, that.awsEndpointConfig);
   }
 
   @Override
@@ -275,6 +378,9 @@ public class S3InputSource extends CloudObjectInputSource
            ", objects=" + getObjects() +
            ", filter=" + getFilter() +
            ", s3InputSourceConfig=" + getS3InputSourceConfig() +
+           ", awsProxyConfig=" + getAwsProxyConfig() +
+           ", awsEndpointConfig=" + getAwsEndpointConfig() +
+           ", awsClientConfig=" + getAwsClientConfig() +
            '}';
   }
 
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
index 69feb8cedb..3747088aeb 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -43,12 +43,8 @@ import org.apache.druid.guice.Binders;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.URIs;
 import org.apache.druid.java.util.common.logger.Logger;
 
-import javax.annotation.Nullable;
-import java.net.URI;
 import java.util.List;
 
 /**
@@ -62,56 +58,6 @@ public class S3StorageDruidModule implements DruidModule
 
   private static final Logger log = new Logger(S3StorageDruidModule.class);
 
-  private static ClientConfiguration setProxyConfig(ClientConfiguration conf, 
AWSProxyConfig proxyConfig)
-  {
-    if (StringUtils.isNotEmpty(proxyConfig.getHost())) {
-      conf.setProxyHost(proxyConfig.getHost());
-    }
-    if (proxyConfig.getPort() != -1) {
-      conf.setProxyPort(proxyConfig.getPort());
-    }
-    if (StringUtils.isNotEmpty(proxyConfig.getUsername())) {
-      conf.setProxyUsername(proxyConfig.getUsername());
-    }
-    if (StringUtils.isNotEmpty(proxyConfig.getPassword())) {
-      conf.setProxyPassword(proxyConfig.getPassword());
-    }
-    return conf;
-  }
-
-  @Nullable
-  private static Protocol parseProtocol(@Nullable String protocol)
-  {
-    if (protocol == null) {
-      return null;
-    }
-
-    if (protocol.equalsIgnoreCase("http")) {
-      return Protocol.HTTP;
-    } else if (protocol.equalsIgnoreCase("https")) {
-      return Protocol.HTTPS;
-    } else {
-      throw new IAE("Unknown protocol[%s]", protocol);
-    }
-  }
-
-  private static Protocol determineProtocol(AWSClientConfig clientConfig, 
AWSEndpointConfig endpointConfig)
-  {
-    final Protocol protocolFromClientConfig = 
parseProtocol(clientConfig.getProtocol());
-    final String endpointUrl = endpointConfig.getUrl();
-    if (StringUtils.isNotEmpty(endpointUrl)) {
-      //noinspection ConstantConditions
-      final URI uri = URIs.parse(endpointUrl, 
protocolFromClientConfig.toString());
-      final Protocol protocol = parseProtocol(uri.getScheme());
-      if (protocol != null && (protocol != protocolFromClientConfig)) {
-        log.warn("[%s] protocol will be used for endpoint [%s]", protocol, 
endpointUrl);
-      }
-      return protocol;
-    } else {
-      return protocolFromClientConfig;
-    }
-  }
-
   @Override
   public List<? extends Module> getJacksonModules()
   {
@@ -188,11 +134,11 @@ public class S3StorageDruidModule implements DruidModule
   )
   {
     final ClientConfiguration configuration = new 
ClientConfigurationFactory().getConfig();
-    final Protocol protocol = determineProtocol(clientConfig, endpointConfig);
+    final Protocol protocol = S3Utils.determineProtocol(clientConfig, 
endpointConfig);
     final AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client
         .builder()
         .withCredentials(provider)
-        .withClientConfiguration(setProxyConfig(configuration, 
proxyConfig).withProtocol(protocol))
+        .withClientConfiguration(S3Utils.setProxyConfig(configuration, 
proxyConfig).withProtocol(protocol))
         .withChunkedEncodingDisabled(clientConfig.isDisableChunkedEncoding())
         .withPathStyleAccessEnabled(clientConfig.isEnablePathStyleAccess())
         
.withForceGlobalBucketAccessEnabled(clientConfig.isForceGlobalBucketAccessEnabled());
diff --git 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
index eb40d4aa7a..a017698497 100644
--- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
+++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java
@@ -20,6 +20,8 @@
 package org.apache.druid.storage.s3;
 
 import com.amazonaws.AmazonClientException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.model.AccessControlList;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
@@ -34,14 +36,20 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.aws.AWSClientConfig;
 import org.apache.druid.common.aws.AWSClientUtil;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.RetryUtils.Task;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.URIs;
 import org.apache.druid.java.util.common.logger.Logger;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -314,4 +322,54 @@ public class S3Utils
     log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
     service.putObject(putObjectRequest);
   }
+
+  @Nullable
+  private static Protocol parseProtocol(@Nullable String protocol)
+  {
+    if (protocol == null) {
+      return null;
+    }
+
+    if (protocol.equalsIgnoreCase("http")) {
+      return Protocol.HTTP;
+    } else if (protocol.equalsIgnoreCase("https")) {
+      return Protocol.HTTPS;
+    } else {
+      throw new IAE("Unknown protocol[%s]", protocol);
+    }
+  }
+
+  public static Protocol determineProtocol(AWSClientConfig clientConfig, 
AWSEndpointConfig endpointConfig)
+  {
+    final Protocol protocolFromClientConfig = 
parseProtocol(clientConfig.getProtocol());
+    final String endpointUrl = endpointConfig.getUrl();
+    if (org.apache.commons.lang.StringUtils.isNotEmpty(endpointUrl)) {
+      //noinspection ConstantConditions
+      final URI uri = URIs.parse(endpointUrl, 
protocolFromClientConfig.toString());
+      final Protocol protocol = parseProtocol(uri.getScheme());
+      if (protocol != null && (protocol != protocolFromClientConfig)) {
+        log.warn("[%s] protocol will be used for endpoint [%s]", protocol, 
endpointUrl);
+      }
+      return protocol;
+    } else {
+      return protocolFromClientConfig;
+    }
+  }
+
+  public static ClientConfiguration setProxyConfig(ClientConfiguration conf, 
AWSProxyConfig proxyConfig)
+  {
+    if (org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getHost())) 
{
+      conf.setProxyHost(proxyConfig.getHost());
+    }
+    if (proxyConfig.getPort() != -1) {
+      conf.setProxyPort(proxyConfig.getPort());
+    }
+    if 
(org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getUsername())) {
+      conf.setProxyUsername(proxyConfig.getUsername());
+    }
+    if 
(org.apache.commons.lang.StringUtils.isNotEmpty(proxyConfig.getPassword())) {
+      conf.setProxyPassword(proxyConfig.getPassword());
+    }
+    return conf;
+  }
 }
diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 739e3a5889..359de18039 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.data.input.s3;
 
+import com.amazonaws.ClientConfiguration;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
@@ -43,7 +44,10 @@ import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Provides;
+import org.apache.druid.common.aws.AWSClientConfig;
 import org.apache.druid.common.aws.AWSCredentialsUtils;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -96,6 +100,7 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
 {
   private static final ObjectMapper MAPPER = createS3ObjectMapper();
   private static final AmazonS3Client S3_CLIENT = 
EasyMock.createMock(AmazonS3Client.class);
+  private static final ClientConfiguration CLIENT_CONFIGURATION = 
EasyMock.createMock(ClientConfiguration.class);
   private static final ServerSideEncryptingAmazonS3.Builder 
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER =
       EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class);
   private static final AmazonS3ClientBuilder AMAZON_S3_CLIENT_BUILDER = 
AmazonS3Client.builder();
@@ -145,6 +150,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
 
   private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new 
S3InputSourceConfig(
       new DefaultPasswordProvider("myKey"), new 
DefaultPasswordProvider("mySecret"), null, null);
+  private static final AWSEndpointConfig ENDPOINT_CONFIG = new 
AWSEndpointConfig();
+  private static final AWSProxyConfig PROXY_CONFIG = new AWSProxyConfig();
+  private static final AWSClientConfig CLIENT_CONFIG = new AWSClientConfig();
 
   private static final List<CloudObjectLocation> EXPECTED_LOCATION =
       ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
@@ -175,6 +183,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -195,6 +206,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -215,7 +229,11 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         null,
         "*.parquet",
+        null,
+        null,
+        null,
         null
+
     );
 
     Assert.assertEquals(
@@ -235,6 +253,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
     final S3InputSource serdeWithUris = 
MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
@@ -252,6 +273,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
     final S3InputSource serdeWithPrefixes =
@@ -270,6 +294,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         EXPECTED_LOCATION,
         null,
+        null,
+        null,
+        null,
         null
     );
     final S3InputSource serdeWithPrefixes =
@@ -283,6 +310,7 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
     EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
     
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder())
             .andStubReturn(AMAZON_S3_CLIENT_BUILDER);
+    AMAZON_S3_CLIENT_BUILDER.withClientConfiguration(CLIENT_CONFIGURATION);
     EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
             .andReturn(SERVICE);
     EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
@@ -294,7 +322,10 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         EXPECTED_LOCATION,
         null,
-        CLOUD_CONFIG_PROPERTIES
+        CLOUD_CONFIG_PROPERTIES,
+        null,
+        null,
+        null
     );
     final S3InputSource serdeWithPrefixes =
         MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), 
S3InputSource.class);
@@ -305,7 +336,73 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
   }
 
   @Test
-  public void 
testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCrediential()
+  public void testS3InputSourceUseEndPointClientProxy()
+  {
+    S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = 
EasyMock.createMock(S3InputSourceConfig.class);
+    AWSEndpointConfig mockAwsEndpointConfig = 
EasyMock.createMock(AWSEndpointConfig.class);
+    AWSClientConfig mockAwsClientConfig = 
EasyMock.createMock(AWSClientConfig.class);
+    AWSProxyConfig mockAwsProxyConfig = 
EasyMock.createMock(AWSProxyConfig.class);
+
+    EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
+    EasyMock.reset(mockAwsEndpointConfig);
+    EasyMock.reset(mockAwsClientConfig);
+    EasyMock.reset(mockAwsProxyConfig);
+
+    EasyMock.expect(mockAwsEndpointConfig.getUrl()).andStubReturn("endpoint");
+    
EasyMock.expect(mockAwsEndpointConfig.getSigningRegion()).andStubReturn("region");
+
+    
EasyMock.expect(mockAwsClientConfig.isDisableChunkedEncoding()).andStubReturn(false);
+    
EasyMock.expect(mockAwsClientConfig.isEnablePathStyleAccess()).andStubReturn(false);
+    
EasyMock.expect(mockAwsClientConfig.isForceGlobalBucketAccessEnabled()).andStubReturn(true);
+    EasyMock.expect(mockAwsClientConfig.getProtocol()).andStubReturn("http");
+
+    EasyMock.expect(mockAwsProxyConfig.getHost()).andStubReturn("");
+    EasyMock.expect(mockAwsProxyConfig.getPort()).andStubReturn(-1);
+    EasyMock.expect(mockAwsProxyConfig.getUsername()).andStubReturn("");
+    EasyMock.expect(mockAwsProxyConfig.getPassword()).andStubReturn("");
+
+    
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null);
+    
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
+            .andStubReturn(false);
+    EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
+    EasyMock.replay(mockAwsEndpointConfig);
+    EasyMock.replay(mockAwsClientConfig);
+    EasyMock.replay(mockAwsProxyConfig);
+
+    EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+
+    
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder())
+            .andStubReturn(AMAZON_S3_CLIENT_BUILDER);
+
+    AMAZON_S3_CLIENT_BUILDER.withClientConfiguration(CLIENT_CONFIGURATION);
+
+    EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
+            .andReturn(SERVICE);
+    EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    final S3InputSource withPrefixes = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        null,
+        EXPECTED_LOCATION,
+        null,
+        mockConfigPropertiesWithoutKeyAndSecret,
+        mockAwsProxyConfig,
+        mockAwsEndpointConfig,
+        mockAwsClientConfig
+    );
+    Assert.assertNotNull(withPrefixes);
+    // This is to force the s3ClientSupplier to initialize the 
ServerSideEncryptingAmazonS3
+    withPrefixes.createEntity(new CloudObjectLocation("bucket", "path"));
+    EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    EasyMock.verify(mockAwsEndpointConfig);
+    EasyMock.verify(mockAwsClientConfig);
+    EasyMock.verify(mockAwsProxyConfig);
+  }
+
+  @Test
+  public void 
testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCredential()
   {
     S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = 
EasyMock.createMock(S3InputSourceConfig.class);
     EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
@@ -325,7 +422,10 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         EXPECTED_LOCATION,
         null,
-        mockConfigPropertiesWithoutKeyAndSecret
+        mockConfigPropertiesWithoutKeyAndSecret,
+        PROXY_CONFIG,
+        ENDPOINT_CONFIG,
+        CLIENT_CONFIG
     );
     Assert.assertNotNull(withPrefixes);
     // This is to force the s3ClientSupplier to initialize the 
ServerSideEncryptingAmazonS3
@@ -335,7 +435,7 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
   }
 
   @Test
-  public void testSerdeS3ClientLazyInitializedWithCrediential() throws 
Exception
+  public void testSerdeS3ClientLazyInitializedWithCredential() throws Exception
   {
     // Amazon S3 builder should not build anything as we did not make any call 
that requires the S3 client
     EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
@@ -348,7 +448,10 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         EXPECTED_LOCATION,
         null,
-        CLOUD_CONFIG_PROPERTIES
+        CLOUD_CONFIG_PROPERTIES,
+        null,
+        null,
+        null
     );
     final S3InputSource serdeWithPrefixes =
         MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), 
S3InputSource.class);
@@ -357,7 +460,7 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
   }
 
   @Test
-  public void testSerdeS3ClientLazyInitializedWithoutCrediential() throws 
Exception
+  public void testSerdeS3ClientLazyInitializedWithoutCredential() throws 
Exception
   {
     // Amazon S3 builder should not build anything as we did not make any call 
that requires the S3 client
     EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
@@ -370,8 +473,10 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         EXPECTED_LOCATION,
         null,
+        null,
+        null,
+        null,
         null
-
     );
     final S3InputSource serdeWithPrefixes =
         MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), 
S3InputSource.class);
@@ -390,6 +495,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         ImmutableList.of(),
         EXPECTED_LOCATION,
         null,
+        null,
+        null,
+        null,
         null
     );
     final S3InputSource serdeWithPrefixes =
@@ -410,6 +518,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
   }
@@ -427,6 +538,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         EXPECTED_OBJECTS,
         null,
+        null,
+        null,
+        null,
         null
     );
   }
@@ -444,6 +558,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         EXPECTED_OBJECTS,
         null,
+        null,
+        null,
+        null,
         null
     );
   }
@@ -461,6 +578,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
   }
@@ -478,6 +598,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         EXPECTED_LOCATION,
         null,
+        null,
+        null,
+        null,
         null
     );
   }
@@ -495,6 +618,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         ImmutableList.of(),
         null,
+        null,
+        null,
+        null,
         null
     );
   }
@@ -512,6 +638,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         EXPECTED_LOCATION,
         null,
+        null,
+        null,
+        null,
         null
     );
   }
@@ -527,6 +656,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -549,6 +681,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         null,
         "*.csv",
+        null,
+        null,
+        null,
         null
     );
 
@@ -571,6 +706,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         OBJECTS_BEFORE_FILTER,
         "*.csv",
+        null,
+        null,
+        null,
         null
     );
 
@@ -593,6 +731,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         EXPECTED_OBJECTS,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -620,6 +761,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -648,6 +792,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         null,
         "*.csv",
+        null,
+        null,
+        null,
         null
     );
 
@@ -676,6 +823,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -707,6 +857,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         PREFIXES,
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -737,6 +890,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -769,6 +925,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
@@ -813,6 +972,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         null,
         null,
         null,
+        null,
+        null,
+        null,
         3 // only have three retries since they are slow
     );
 
@@ -857,6 +1019,9 @@ public class S3InputSourceTest extends 
InitializedNullHandlingTest
         ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
         null,
         null,
+        null,
+        null,
+        null,
         null
     );
 
diff --git a/website/.spelling b/website/.spelling
index 88a474f667..84a523e141 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -232,6 +232,7 @@ broadcasted
 checksums
 classpath
 clickstream
+clientConfig
 codebase
 codec
 colocated
@@ -266,6 +267,7 @@ druid–kubernetes-extensions
 e.g.
 encodings
 endian
+endpointConfig
 enum
 expr
 failover
@@ -378,6 +380,7 @@ procs
 programmatically
 proto
 proxied
+proxyConfig
 QPS
 quantile
 quantiles


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to