This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch v0.7.x-test-front
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit 23148e1e74a564320fb68a5b2d56296717b5928a
Author: Xavier Bai <[email protected]>
AuthorDate: Wed Mar 6 11:17:58 2024 +0800

    [WAP][AWS] Sweep S3 Object and load Glue's IAM information separately
---
 .../server/AmoroStaticAwsCredentialsProvider.java  |  66 ++++++++++++
 core/pom.xml                                       |   6 ++
 .../java/com/webex/arctic/io/S3SweepFileIO.java    | 109 ++++++++++++++++++++
 core/src/main/java/com/webex/arctic/io/S3URI.java  | 112 +++++++++++++++++++++
 4 files changed, 293 insertions(+)

diff --git 
a/ams/server/src/main/java/com/webex/arctic/server/AmoroStaticAwsCredentialsProvider.java
 
b/ams/server/src/main/java/com/webex/arctic/server/AmoroStaticAwsCredentialsProvider.java
new file mode 100644
index 000000000..46767b564
--- /dev/null
+++ 
b/ams/server/src/main/java/com/webex/arctic/server/AmoroStaticAwsCredentialsProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webex.arctic.server;
+
+import com.google.common.base.MoreObjects;
+import org.apache.commons.lang.StringUtils;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.utils.Validate;
+
+import java.util.Map;
+
+public class AmoroStaticAwsCredentialsProvider implements 
AwsCredentialsProvider {
+
+  private final AwsCredentials credentials;
+
+  private AmoroStaticAwsCredentialsProvider(AwsCredentials credentials) {
+    this.credentials = Validate.notNull(credentials, "Credentials must not be 
null.");
+  }
+
+  @Override
+  public AwsCredentials resolveCredentials() {
+    return credentials;
+  }
+
+  public static AwsCredentialsProvider create(Map<String, String> properties) {
+    String ak = properties.get(SdkSystemSetting.AWS_ACCESS_KEY_ID.property());
+    String sk = 
properties.get(SdkSystemSetting.AWS_SECRET_ACCESS_KEY.property());
+    String session = 
properties.get(SdkSystemSetting.AWS_SESSION_TOKEN.property());
+
+    if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) {
+      if (StringUtils.isBlank(session)) {
+        return new 
AmoroStaticAwsCredentialsProvider(AwsBasicCredentials.create(ak, sk));
+      } else {
+        return new 
AmoroStaticAwsCredentialsProvider(AwsSessionCredentials.create(ak, sk, 
session));
+      }
+    } else {
+      return DefaultCredentialsProvider.builder().build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this).add("credentials", 
credentials).toString();
+  }
+}
diff --git a/core/pom.xml b/core/pom.xml
index ef98bf57a..bcf023693 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -166,6 +166,12 @@
             <artifactId>lucene-core</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.iceberg</groupId>
diff --git a/core/src/main/java/com/webex/arctic/io/S3SweepFileIO.java 
b/core/src/main/java/com/webex/arctic/io/S3SweepFileIO.java
new file mode 100644
index 000000000..814eb0a18
--- /dev/null
+++ b/core/src/main/java/com/webex/arctic/io/S3SweepFileIO.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webex.arctic.io;
+
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.S3FileIOAwsClientFactories;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOAwsClientFactory;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteMarkerEntry;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse;
import software.amazon.awssdk.services.s3.model.ObjectVersion;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+
+public class S3SweepFileIO extends S3FileIO {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(S3SweepFileIO.class);
+  private SerializableSupplier<S3Client> clientSupplier;
+  private transient volatile S3Client s3Client;
+
+  @Override
+  public void initialize(Map<String, String> props) {
+    super.initialize(props);
+    if (clientSupplier == null) {
+      Object clientFactory = S3FileIOAwsClientFactories.initialize(props);
+      if (clientFactory instanceof S3FileIOAwsClientFactory) {
+        this.clientSupplier = ((S3FileIOAwsClientFactory) clientFactory)::s3;
+      }
+      if (clientFactory instanceof AwsClientFactory) {
+        this.clientSupplier = ((AwsClientFactory) clientFactory)::s3;
+      }
+    }
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    super.deleteFile(path);
+
+    S3FileIOProperties properties = new S3FileIOProperties(this.properties());
+    S3URI location = new S3URI(path, properties.bucketToAccessPointMapping());
+
+    ListObjectVersionsResponse response =
+        listObjectVersionsResponse(location.bucket(), location.key());
+    List<ObjectVersion> versions = response.versions();
+    List<DeleteMarkerEntry> deleteMarkers = response.deleteMarkers();
+
+    if (deleteMarkers.isEmpty()) {
+      return;
+    }
+
+    // delete versions
+    versions.stream()
+        .map(version -> buildDeleteRequest(location, version.versionId()))
+        .forEach(s3Client()::deleteObject);
+    // delete deleteMarkers
+    deleteMarkers.stream()
+        .map(marker -> buildDeleteRequest(location, marker.versionId()))
+        .forEach(s3Client()::deleteObject);
+  }
+
+  private DeleteObjectRequest buildDeleteRequest(S3URI location, String 
versionId) {
+    return DeleteObjectRequest.builder()
+        .bucket(location.bucket())
+        .key(location.key())
+        .versionId(versionId)
+        .build();
+  }
+
+  private ListObjectVersionsResponse listObjectVersionsResponse(String bucket, 
String objectKey) {
+    ListObjectVersionsRequest request =
+        
ListObjectVersionsRequest.builder().bucket(bucket).prefix(objectKey).build();
+    return s3Client().listObjectVersions(request);
+  }
+
+  public S3Client s3Client() {
+    if (s3Client == null) {
+      synchronized (this) {
+        if (s3Client == null) {
+          s3Client = clientSupplier.get();
+        }
+      }
+    }
+    return s3Client;
+  }
+}
diff --git a/core/src/main/java/com/webex/arctic/io/S3URI.java 
b/core/src/main/java/com/webex/arctic/io/S3URI.java
new file mode 100644
index 000000000..7c1156805
--- /dev/null
+++ b/core/src/main/java/com/webex/arctic/io/S3URI.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.webex.arctic.io;
+
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+/** Copy from org.apache.iceberg.aws.s3.S3URI */
+public class S3URI {
+  private static final String SCHEME_DELIM = "://";
+  private static final String PATH_DELIM = "/";
+  private static final String QUERY_DELIM = "\\?";
+  private static final String FRAGMENT_DELIM = "#";
+
+  private final String location;
+  private final String scheme;
+  private final String bucket;
+  private final String key;
+
+  /**
+   * Creates a new S3URI in the form of scheme://bucket/key?query#fragment
+   *
+   * <p>The URI supports any valid URI schemes to be backwards compatible with 
s3a and s3n, and also
+   * allows users to use S3FileIO with other S3-compatible object storage 
services like GCS.
+   *
+   * @param location fully qualified URI
+   */
+  S3URI(String location) {
+    this(location, ImmutableMap.of());
+  }
+
+  /**
+   * Creates a new S3URI in the form of 
scheme://(bucket|accessPoint)/key?query#fragment with
+   * additional information on accessPoints.
+   *
+   * <p>The URI supports any valid URI schemes to be backwards compatible with 
s3a and s3n, and also
+   * allows users to use S3FileIO with other S3-compatible object storage 
services like GCS.
+   *
+   * @param location fully qualified URI
+   * @param bucketToAccessPointMapping contains mapping of bucket to access 
point
+   */
+  S3URI(String location, Map<String, String> bucketToAccessPointMapping) {
+    Preconditions.checkNotNull(location, "Location cannot be null.");
+
+    this.location = location;
+    String[] schemeSplit = location.split(SCHEME_DELIM, -1);
+    ValidationException.check(
+        schemeSplit.length == 2, "Invalid S3 URI, cannot determine scheme: 
%s", location);
+    this.scheme = schemeSplit[0];
+
+    String[] authoritySplit = schemeSplit[1].split(PATH_DELIM, 2);
+
+    this.bucket =
+        bucketToAccessPointMapping == null
+            ? authoritySplit[0]
+            : bucketToAccessPointMapping.getOrDefault(authoritySplit[0], 
authoritySplit[0]);
+
+    // Strip query and fragment if they exist
+    String path = authoritySplit.length > 1 ? authoritySplit[1] : "";
+    path = path.split(QUERY_DELIM, -1)[0];
+    path = path.split(FRAGMENT_DELIM, -1)[0];
+    this.key = path;
+  }
+
+  /** Returns S3 bucket name. */
+  public String bucket() {
+    return bucket;
+  }
+
+  /** Returns S3 object key name. */
+  public String key() {
+    return key;
+  }
+
+  /** Returns original, unmodified S3 URI location. */
+  public String location() {
+    return location;
+  }
+
+  /**
+   * Returns the original scheme provided in the location.
+   *
+   * @return uri scheme
+   */
+  public String scheme() {
+    return scheme;
+  }
+
+  @Override
+  public String toString() {
+    return location;
+  }
+}

Reply via email to