This is an automated email from the ASF dual-hosted git repository. xuba pushed a commit to branch v0.7.x-test-front in repository https://gitbox.apache.org/repos/asf/amoro.git
commit 23148e1e74a564320fb68a5b2d56296717b5928a
Author: Xavier Bai <[email protected]>
AuthorDate: Wed Mar 6 11:17:58 2024 +0800

    [WAP][AWS] Sweep S3 Object and load Glue's IAM information separately
---
 .../server/AmoroStaticAwsCredentialsProvider.java |  82 ++++++++++
 core/pom.xml                                      |   6 +
 .../java/com/webex/arctic/io/S3SweepFileIO.java   | 168 +++++++++++++++++++++
 core/src/main/java/com/webex/arctic/io/S3URI.java | 112 ++++++++++++++
 4 files changed, 368 insertions(+)

diff --git a/ams/server/src/main/java/com/webex/arctic/server/AmoroStaticAwsCredentialsProvider.java b/ams/server/src/main/java/com/webex/arctic/server/AmoroStaticAwsCredentialsProvider.java
new file mode 100644
index 000000000..46767b564
--- /dev/null
+++ b/ams/server/src/main/java/com/webex/arctic/server/AmoroStaticAwsCredentialsProvider.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webex.arctic.server;
+
+import com.google.common.base.MoreObjects;
+import org.apache.commons.lang.StringUtils;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.utils.Validate;
+
+import java.util.Map;
+
+/**
+ * An {@link AwsCredentialsProvider} that always returns the same credentials, fixed when the
+ * provider is created by {@link #create(Map)}.
+ */
+public class AmoroStaticAwsCredentialsProvider implements AwsCredentialsProvider {
+
+  private final AwsCredentials credentials;
+
+  private AmoroStaticAwsCredentialsProvider(AwsCredentials credentials) {
+    this.credentials = Validate.notNull(credentials, "Credentials must not be null.");
+  }
+
+  @Override
+  public AwsCredentials resolveCredentials() {
+    return credentials;
+  }
+
+  /**
+   * Creates a credentials provider from the credentials configured in {@code properties}.
+   *
+   * <p>The access key id, secret access key and session token are looked up under the property
+   * names of the corresponding {@link SdkSystemSetting} constants. When both keys are non-blank,
+   * the returned provider serves them as basic credentials, or as session credentials if a
+   * non-blank session token is present too; otherwise the SDK's {@link DefaultCredentialsProvider}
+   * is returned.
+   *
+   * @param properties configuration holding the credentials
+   * @return a provider of the configured credentials, or the SDK's default provider
+   */
+  public static AwsCredentialsProvider create(Map<String, String> properties) {
+    String ak = properties.get(SdkSystemSetting.AWS_ACCESS_KEY_ID.property());
+    String sk = properties.get(SdkSystemSetting.AWS_SECRET_ACCESS_KEY.property());
+    String session = properties.get(SdkSystemSetting.AWS_SESSION_TOKEN.property());
+
+    if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) {
+      if (StringUtils.isBlank(session)) {
+        return new AmoroStaticAwsCredentialsProvider(AwsBasicCredentials.create(ak, sk));
+      } else {
+        return new AmoroStaticAwsCredentialsProvider(AwsSessionCredentials.create(ak, sk, session));
+      }
+    } else {
+      return DefaultCredentialsProvider.builder().build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this).add("credentials", credentials).toString();
+  }
+}
diff --git a/core/pom.xml b/core/pom.xml
index ef98bf57a..bcf023693 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -166,6 +166,12 @@
             <artifactId>lucene-core</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.iceberg</groupId>
diff --git a/core/src/main/java/com/webex/arctic/io/S3SweepFileIO.java b/core/src/main/java/com/webex/arctic/io/S3SweepFileIO.java
new file mode 100644
index 000000000..814eb0a18
--- /dev/null
+++ b/core/src/main/java/com/webex/arctic/io/S3SweepFileIO.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webex.arctic.io;
+
+import org.apache.iceberg.aws.AwsClientFactory;
+import org.apache.iceberg.aws.S3FileIOAwsClientFactories;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.aws.s3.S3FileIOAwsClientFactory;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.DeleteMarkerEntry;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectVersion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An {@link S3FileIO} that, after deleting a file, also deletes every remaining version and delete
+ * marker of the file's S3 object by version id, so that a deleted file leaves no old versions
+ * behind in a bucket with versioning enabled.
+ */
+public class S3SweepFileIO extends S3FileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3SweepFileIO.class);
+  private SerializableSupplier<S3Client> clientSupplier;
+  private transient volatile S3Client s3Client;
+
+  @Override
+  public void initialize(Map<String, String> props) {
+    super.initialize(props);
+    if (clientSupplier == null) {
+      Object clientFactory = S3FileIOAwsClientFactories.initialize(props);
+      if (clientFactory instanceof S3FileIOAwsClientFactory) {
+        this.clientSupplier = ((S3FileIOAwsClientFactory) clientFactory)::s3;
+      }
+      if (clientFactory instanceof AwsClientFactory) {
+        this.clientSupplier = ((AwsClientFactory) clientFactory)::s3;
+      }
+    }
+  }
+
+  /**
+   * Deletes the file at {@code path}, then sweeps the remaining versions of its S3 object.
+   *
+   * <p>The sweep only runs when a delete marker exists for exactly this object key; it then deletes
+   * all versions and delete markers of that key by version id.
+   *
+   * @param path location of the file to delete
+   */
+  @Override
+  public void deleteFile(String path) {
+    super.deleteFile(path);
+
+    S3FileIOProperties properties = new S3FileIOProperties(this.properties());
+    S3URI location = new S3URI(path, properties.bucketToAccessPointMapping());
+
+    List<String> versionIds = new ArrayList<>();
+    List<String> deleteMarkerIds = new ArrayList<>();
+    collectVersionIds(location, versionIds, deleteMarkerIds);
+
+    // No delete marker for this key: presumably an unversioned bucket, nothing to sweep.
+    if (deleteMarkerIds.isEmpty()) {
+      return;
+    }
+
+    // Remove the remaining object versions first, then the delete markers themselves.
+    versionIds.stream()
+        .map(versionId -> buildDeleteRequest(location, versionId))
+        .forEach(s3Client()::deleteObject);
+    deleteMarkerIds.stream()
+        .map(markerId -> buildDeleteRequest(location, markerId))
+        .forEach(s3Client()::deleteObject);
+    LOG.debug(
+        "Swept {} version(s) and {} delete marker(s) of {}",
+        versionIds.size(),
+        deleteMarkerIds.size(),
+        path);
+  }
+
+  /**
+   * Collects the version ids of all object versions and delete markers whose key is exactly the key
+   * of {@code location}, reading further listing pages while they may still contain that key.
+   *
+   * @param location S3 location whose versions are listed
+   * @param versionIds receives the version ids of object versions
+   * @param deleteMarkerIds receives the version ids of delete markers
+   */
+  private void collectVersionIds(
+      S3URI location, List<String> versionIds, List<String> deleteMarkerIds) {
+    String key = location.key();
+    ListObjectVersionsRequest request =
+        ListObjectVersionsRequest.builder().bucket(location.bucket()).prefix(key).build();
+    ListObjectVersionsResponse response;
+    do {
+      response = s3Client().listObjectVersions(request);
+      // The listing matches by prefix: skip entries of other keys that start with this key.
+      for (ObjectVersion version : response.versions()) {
+        if (key.equals(version.key())) {
+          versionIds.add(version.versionId());
+        }
+      }
+      for (DeleteMarkerEntry marker : response.deleteMarkers()) {
+        if (key.equals(marker.key())) {
+          deleteMarkerIds.add(marker.versionId());
+        }
+      }
+      request =
+          request.toBuilder()
+              .keyMarker(response.nextKeyMarker())
+              .versionIdMarker(response.nextVersionIdMarker())
+              .build();
+      // S3 lists keys in lexicographic order, so this key precedes every longer key it prefixes;
+      // more of its entries can only follow while the continuation marker still names this key.
+    } while (Boolean.TRUE.equals(response.isTruncated()) && key.equals(response.nextKeyMarker()));
+  }
+
+  private DeleteObjectRequest buildDeleteRequest(S3URI location, String versionId) {
+    return DeleteObjectRequest.builder()
+        .bucket(location.bucket())
+        .key(location.key())
+        .versionId(versionId)
+        .build();
+  }
+
+  /**
+   * Returns the S3 client used to list and delete object versions, creating it on first use from
+   * the client factory resolved in {@link #initialize(Map)}.
+   *
+   * @return the lazily created S3 client
+   * @throws IllegalStateException if no client factory has been set by {@link #initialize(Map)}
+   */
+  public S3Client s3Client() {
+    if (s3Client == null) {
+      synchronized (this) {
+        if (s3Client == null) {
+          if (clientSupplier == null) {
+            throw new IllegalStateException("S3SweepFileIO has not been initialized");
+          }
+          s3Client = clientSupplier.get();
+        }
+      }
+    }
+    return s3Client;
+  }
+}
diff --git a/core/src/main/java/com/webex/arctic/io/S3URI.java b/core/src/main/java/com/webex/arctic/io/S3URI.java
new file mode 100644
index 000000000..7c1156805
--- /dev/null
+++ b/core/src/main/java/com/webex/arctic/io/S3URI.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webex.arctic.io;
+
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+/** Copy from org.apache.iceberg.aws.s3.S3URI */
+public class S3URI {
+  private static final String SCHEME_DELIM = "://";
+  private static final String PATH_DELIM = "/";
+  private static final String QUERY_DELIM = "\\?";
+  private static final String FRAGMENT_DELIM = "#";
+
+  private final String location;
+  private final String scheme;
+  private final String bucket;
+  private final String key;
+
+  /**
+   * Creates a new S3URI in the form of scheme://bucket/key?query#fragment
+   *
+   * <p>The URI supports any valid URI schemes to be backwards compatible with s3a and s3n, and also
+   * allows users to use S3FileIO with other S3-compatible object storage services like GCS.
+   *
+   * @param location fully qualified URI
+   */
+  S3URI(String location) {
+    this(location, ImmutableMap.of());
+  }
+
+  /**
+   * Creates a new S3URI in the form of scheme://(bucket|accessPoint)/key?query#fragment with
+   * additional information on accessPoints.
+   *
+   * <p>The URI supports any valid URI schemes to be backwards compatible with s3a and s3n, and also
+   * allows users to use S3FileIO with other S3-compatible object storage services like GCS.
+   *
+   * @param location fully qualified URI
+   * @param bucketToAccessPointMapping contains mapping of bucket to access point
+   */
+  S3URI(String location, Map<String, String> bucketToAccessPointMapping) {
+    Preconditions.checkNotNull(location, "Location cannot be null.");
+
+    this.location = location;
+    String[] schemeSplit = location.split(SCHEME_DELIM, -1);
+    ValidationException.check(
+        schemeSplit.length == 2, "Invalid S3 URI, cannot determine scheme: %s", location);
+    this.scheme = schemeSplit[0];
+
+    String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
+
+    this.bucket =
+        bucketToAccessPointMapping == null
+            ? authoritySplit[0]
+            : bucketToAccessPointMapping.getOrDefault(authoritySplit[0], authoritySplit[0]);
+
+    // Strip query and fragment if they exist
+    String path = authoritySplit.length > 1 ? authoritySplit[1] : "";
+    path = path.split(QUERY_DELIM, -1)[0];
+    path = path.split(FRAGMENT_DELIM, -1)[0];
+    this.key = path;
+  }
+
+  /** Returns S3 bucket name. */
+  public String bucket() {
+    return bucket;
+  }
+
+  /** Returns S3 object key name. */
+  public String key() {
+    return key;
+  }
+
+  /** Returns original, unmodified S3 URI location. */
+  public String location() {
+    return location;
+  }
+
+  /**
+   * Returns the original scheme provided in the location.
+   *
+   * @return uri scheme
+   */
+  public String scheme() {
+    return scheme;
+  }
+
+  @Override
+  public String toString() {
+    return location;
+  }
+}
