>From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20568?usp=email )
Change subject: [NO ISSUE][*DB][STO] Enable round/robin dns resolution for async client ...................................................................... [NO ISSUE][*DB][STO] Enable round/robin dns resolution for async client Ext-ref: MB-69061 Change-Id: I93fce9317774b368965f224179ca177dfbf0a45c Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20568 Tested-by: Michael Blow <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/RoundRobinAddressResolverGroup.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java A asterixdb/asterix-cloud/src/main/java/software/amazon/awssdk/http/nio/netty/internal/CustomResolverBootstrapProvider.java M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java 6 files changed, 238 insertions(+), 20 deletions(-) Approvals: Jenkins: Verified Ali Alsuliman: Looks good to me, approved Michael Blow: Looks good to me, but someone else must approve; Verified diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/RoundRobinAddressResolverGroup.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/RoundRobinAddressResolverGroup.java new file mode 100644 index 0000000..50f11bf --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/RoundRobinAddressResolverGroup.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients.aws.s3; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.netty.resolver.AbstractAddressResolver; +import io.netty.resolver.AddressResolver; +import io.netty.resolver.AddressResolverGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Promise; + +public final class RoundRobinAddressResolverGroup extends AddressResolverGroup<InetSocketAddress> { + private static final Logger LOGGER = LogManager.getLogger(); + + @Override + protected AddressResolver<InetSocketAddress> newResolver(EventExecutor eventExecutor) throws Exception { + return new AbstractAddressResolver<>(eventExecutor) { + static final AtomicInteger counter = new AtomicInteger(new Random().nextInt()); + + @Override + protected boolean doIsResolved(InetSocketAddress address) { + return !address.isUnresolved(); + } + + @Override + protected void doResolve(InetSocketAddress unresolved, Promise<InetSocketAddress> promise) { + try { + InetAddress[] all = 
InetAddress.getAllByName(unresolved.getHostString()); + int index = Math.floorMod(counter.getAndIncrement(), all.length); + InetSocketAddress resolved = new InetSocketAddress(all[index], unresolved.getPort()); + LOGGER.debug("resolved {} ({}/{})", resolved, index + 1, all.length); + promise.setSuccess(resolved); + } catch (Throwable t) { + promise.setFailure(t); + } + } + + @Override + protected void doResolveAll(InetSocketAddress unresolved, Promise<List<InetSocketAddress>> promise) { + try { + List<InetSocketAddress> list = Stream.of(InetAddress.getAllByName(unresolved.getHostString())) + .map(addr -> new InetSocketAddress(addr, unresolved.getPort())).toList(); + promise.setSuccess(list); + } catch (Throwable t) { + promise.setFailure(t); + } + } + + }; + } +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java index a75eb4b..c9046a7 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java @@ -49,12 +49,13 @@ private final boolean storageListEventuallyConsistent; private final int s3ReadTimeoutInSeconds; private final S3ParallelDownloaderClientType parallelDownloaderClientType; + private final boolean roundRobinDnsResolver; public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, - long profilerLogInterval, int writeBufferSize, - S3ParallelDownloaderClientType parallelDownloaderClientType) { + long profilerLogInterval, int writeBufferSize, S3ParallelDownloaderClientType parallelDownloaderClientType, + boolean roundRobinDnsResolver) { this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0, false, false, - false, 0, 0, -1, parallelDownloaderClientType); + false, 0, 0, -1, 
parallelDownloaderClientType, roundRobinDnsResolver); } private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, @@ -62,7 +63,7 @@ int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, boolean forcePathStyle, boolean disableSslVerify, boolean storageListEventuallyConsistent, int requestsMaxPendingHttpConnections, int requestsHttpConnectionAcquireTimeout, int s3ReadTimeoutInSeconds, - S3ParallelDownloaderClientType parallelDownloaderClientType) { + S3ParallelDownloaderClientType parallelDownloaderClientType, boolean roundRobinDnsResolver) { this.region = Objects.requireNonNull(region, "region"); this.endpoint = endpoint; this.prefix = Objects.requireNonNull(prefix, "prefix"); @@ -80,6 +81,7 @@ this.storageListEventuallyConsistent = storageListEventuallyConsistent; this.s3ReadTimeoutInSeconds = s3ReadTimeoutInSeconds; this.parallelDownloaderClientType = parallelDownloaderClientType; + this.roundRobinDnsResolver = roundRobinDnsResolver; } public static S3ClientConfig of(CloudProperties cloudProperties) { @@ -92,7 +94,8 @@ cloudProperties.isStorageListEventuallyConsistent(), cloudProperties.getRequestsMaxPendingHttpConnections(), cloudProperties.getRequestsHttpConnectionAcquireTimeout(), cloudProperties.getS3ReadTimeoutInSeconds(), - S3ParallelDownloaderClientType.valueOf(cloudProperties.getS3ParallelDownloaderClientType())); + S3ParallelDownloaderClientType.valueOf(cloudProperties.getS3ParallelDownloaderClientType()), + cloudProperties.useRoundRobinDnsResolver()); } public enum S3ParallelDownloaderClientType { @@ -125,7 +128,7 @@ boolean anonymousAuth = false; return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, - S3ParallelDownloaderClientType.ASYNC); + S3ParallelDownloaderClientType.ASYNC, false); } public String getRegion() { @@ -200,4 +203,8 @@ public int getS3ReadTimeoutInSeconds() { return s3ReadTimeoutInSeconds; } + + public boolean useRoundRobinDnsResolver() { + 
return roundRobinDnsResolver; + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java index bfb52c9..c1a92a6 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java @@ -37,10 +37,14 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.control.nc.io.IOManager; import org.apache.hyracks.util.annotations.ThreadSafe; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup; +import software.amazon.awssdk.http.nio.netty.internal.CustomResolverBootstrapProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; @@ -56,22 +60,29 @@ import software.amazon.awssdk.transfer.s3.model.FileDownload; import software.amazon.awssdk.utils.AttributeMap; +/** + * Parallel S3 downloader with optional custom DNS resolution. 
+ */ @ThreadSafe class S3ParallelDownloader extends AbstractParallelDownloader { + private static final Logger LOGGER = LogManager.getLogger(); private final String bucket; private final IOManager ioManager; private final S3AsyncClient s3AsyncClient; private final S3TransferManager transferManager; private final S3ClientConfig config; private final IRequestProfilerLimiter profiler; + // Shared event loop group (reused across Netty client instances if multiple downloaders are created) + private static final SdkEventLoopGroup SHARED_EVENT_LOOP = SdkEventLoopGroup.builder() + .numberOfThreads(Math.max(2, Runtime.getRuntime().availableProcessors())).build(); S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfilerLimiter profiler) { this.bucket = bucket; this.ioManager = ioManager; this.config = config; this.profiler = profiler; - s3AsyncClient = createAsyncClient(config); - transferManager = createS3TransferManager(s3AsyncClient); + this.s3AsyncClient = createAsyncClient(config); + this.transferManager = createS3TransferManager(s3AsyncClient); } @Override @@ -94,6 +105,7 @@ public void close() { transferManager.close(); s3AsyncClient.close(); + // Do NOT close SHARED_EVENT_LOOP here (shared globally). Provide a separate shutdown hook if needed. 
} private void downloadFilesAndWait(Collection<FileReference> toDownload) @@ -187,32 +199,44 @@ builder.credentialsProvider(config.createCredentialsProvider()); builder.region(Region.of(config.getRegion())); builder.forcePathStyle(config.isForcePathStyle()); - AttributeMap.Builder customHttpConfigBuilder = AttributeMap.builder(); if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) { builder.endpointOverride(URI.create(config.getEndpoint())); } + AttributeMap.Builder httpOptions = AttributeMap.builder(); if (config.isDisableSslVerify()) { - customHttpConfigBuilder.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true); + httpOptions.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true); } if (config.getRequestsMaxHttpConnections() > 0) { - customHttpConfigBuilder.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, - config.getRequestsMaxHttpConnections()); + httpOptions.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, config.getRequestsMaxHttpConnections()); } if (config.getRequestsMaxPendingHttpConnections() > 0) { - customHttpConfigBuilder.put(SdkHttpConfigurationOption.MAX_PENDING_CONNECTION_ACQUIRES, + httpOptions.put(SdkHttpConfigurationOption.MAX_PENDING_CONNECTION_ACQUIRES, config.getRequestsMaxPendingHttpConnections()); } if (config.getRequestsHttpConnectionAcquireTimeout() > 0) { - customHttpConfigBuilder.put(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT, + httpOptions.put(SdkHttpConfigurationOption.CONNECTION_ACQUIRE_TIMEOUT, Duration.ofSeconds(config.getRequestsHttpConnectionAcquireTimeout())); } if (config.getS3ReadTimeoutInSeconds() > 0) { - customHttpConfigBuilder.put(SdkHttpConfigurationOption.READ_TIMEOUT, + httpOptions.put(SdkHttpConfigurationOption.READ_TIMEOUT, Duration.ofSeconds(config.getS3ReadTimeoutInSeconds())); } - SdkAsyncHttpClient nettyHttpClient = - NettyNioAsyncHttpClient.builder().buildWithDefaults(customHttpConfigBuilder.build()); - builder.httpClient(nettyHttpClient); + + 
NettyNioAsyncHttpClient.Builder nettyBuilder = + NettyNioAsyncHttpClient.builder().eventLoopGroup(SHARED_EVENT_LOOP); + + SdkAsyncHttpClient nettyClient = nettyBuilder.buildWithDefaults(httpOptions.build()); + if (config.useRoundRobinDnsResolver()) { + try { + CustomResolverBootstrapProvider.bindTo(nettyClient, new RoundRobinAddressResolverGroup()); + } catch (Exception e) { + LOGGER.warn( + "failed to bind RoundRobinDnsBootstrapProvider to Netty client, falling back to default resolver", + e); + } + } + + builder.httpClient(nettyClient); return builder.build(); } @@ -230,4 +254,5 @@ private S3TransferManager createS3TransferManager(S3AsyncClient s3AsyncClient) { return S3TransferManager.builder().s3Client(s3AsyncClient).build(); } + } diff --git a/asterixdb/asterix-cloud/src/main/java/software/amazon/awssdk/http/nio/netty/internal/CustomResolverBootstrapProvider.java b/asterixdb/asterix-cloud/src/main/java/software/amazon/awssdk/http/nio/netty/internal/CustomResolverBootstrapProvider.java new file mode 100644 index 0000000..4bf9386 --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/software/amazon/awssdk/http/nio/netty/internal/CustomResolverBootstrapProvider.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// original copyright appears below +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import java.lang.reflect.Field; +import java.net.InetSocketAddress; + +import io.netty.bootstrap.Bootstrap; +import io.netty.resolver.AddressResolverGroup; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup; + +/** + * A Bootstrap provider that installs a caller-supplied {@link AddressResolverGroup} on every Bootstrap it creates, + * replacing whichever resolver the base {@link BootstrapProvider} configured. Use {@link #bindTo} to swap an instance + * into an existing Netty async HTTP client (via reflection on the client's internal fields).
+ */ +public class CustomResolverBootstrapProvider extends BootstrapProvider { + private final AddressResolverGroup<InetSocketAddress> customResolverGroup; + + CustomResolverBootstrapProvider(SdkEventLoopGroup sdkEventLoopGroup, NettyConfiguration nettyConfiguration, + SdkChannelOptions sdkChannelOptions, AddressResolverGroup<InetSocketAddress> customResolverGroup) { + super(sdkEventLoopGroup, nettyConfiguration, sdkChannelOptions); + this.customResolverGroup = customResolverGroup; + } + + public static void bindTo(SdkAsyncHttpClient nettyClient, + AddressResolverGroup<InetSocketAddress> customResolverGroup) throws Exception { + + Field poolsField = getAccessibleField(nettyClient.getClass(), "pools"); + Field bootstrapProviderField = getAccessibleField(AwaitCloseChannelPoolMap.class, "bootstrapProvider"); + Object poolsInstance = poolsField.get(nettyClient); + Object bootstrapProviderInstance = bootstrapProviderField.get(poolsInstance); + SdkEventLoopGroup sdkEventLoopGroup = + (SdkEventLoopGroup) getAccessibleField(BootstrapProvider.class, "sdkEventLoopGroup") + .get(bootstrapProviderInstance); + NettyConfiguration nettyConfiguration = + (NettyConfiguration) getAccessibleField(BootstrapProvider.class, "nettyConfiguration") + .get(bootstrapProviderInstance); + SdkChannelOptions sdkChannelOptions = + (SdkChannelOptions) getAccessibleField(BootstrapProvider.class, "sdkChannelOptions") + .get(bootstrapProviderInstance); + bootstrapProviderField.set(poolsInstance, new CustomResolverBootstrapProvider(sdkEventLoopGroup, + nettyConfiguration, sdkChannelOptions, customResolverGroup)); + } + + private static Field getAccessibleField(Class<?> clazz, String fieldName) throws NoSuchFieldException { + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + return field; + } + + /** + * Creates a Bootstrap for a specific host and port with an unresolved InetSocketAddress as the remoteAddress. 
+ * + * @param host The unresolved remote hostname + * @param port The remote port + * @param useNonBlockingDnsResolver Passed through to the base provider; whichever resolver it selects is then replaced + * by the custom resolver group supplied at construction, so this flag does not choose the resolver. + * @return A newly created Bootstrap using the configuration this provider was initialized with, having an unresolved + * remote address and the custom resolver group installed. + */ + public Bootstrap createBootstrap(String host, int port, Boolean useNonBlockingDnsResolver) { + Bootstrap bootstrap = super.createBootstrap(host, port, useNonBlockingDnsResolver); + bootstrap.resolver(customResolverGroup); + return bootstrap; + } + +} diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java index 5f8ee8c..382bb4f 100644 --- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java @@ -68,7 +68,7 @@ LOGGER.info("Client created successfully"); int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE); S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0, - writeBufferSize, S3ClientConfig.S3ParallelDownloaderClientType.ASYNC); + writeBufferSize, S3ClientConfig.S3ParallelDownloaderClientType.ASYNC, false); CLOUD_CLIENT = new S3CloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java index 50ec1f0..abe6560 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java @@ -82,7 +82,8 @@
CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE(STRING, (Function<IApplicationConfig, String>) app -> { String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT); return (endpoint == null || endpoint.isEmpty()) ? "crt" : "async"; - }); + }), + CLOUD_STORAGE_S3_USE_ROUND_ROBIN_DNS_RESOLVER(BOOLEAN, false),; private final IOptionType interpreter; private final Object defaultValue; @@ -202,6 +203,9 @@ return "The read timeout (in seconds) for S3 sync client (-1 means SDK default)"; case CLOUD_STORAGE_S3_PARALLEL_DOWNLOADER_CLIENT_TYPE: return "The S3 client to use for parallel downloads (crt, async or sync)"; + case CLOUD_STORAGE_S3_USE_ROUND_ROBIN_DNS_RESOLVER: + return "Whether or not to use a round-robin DNS resolver for S3 client connections. Currently" + + " only applicable when using the async S3 client for parallel downloads."; default: throw new IllegalStateException("NYI: " + this); } @@ -336,4 +340,8 @@ public int getS3ReadTimeoutInSeconds() { return accessor.getInt(Option.CLOUD_STORAGE_S3_CLIENT_READ_TIMEOUT); } + + public boolean useRoundRobinDnsResolver() { + return accessor.getBoolean(Option.CLOUD_STORAGE_S3_USE_ROUND_ROBIN_DNS_RESOLVER); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20568?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: I93fce9317774b368965f224179ca177dfbf0a45c Gerrit-Change-Number: 20568 Gerrit-PatchSet: 4 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]>
