[ https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270080#comment-16270080 ]

ASF GitHub Bot commented on BEAM-2500:
--------------------------------------

jacobmarble commented on a change in pull request #4080: [BEAM-2500] Add S3 FileSystem to Java SDK
URL: https://github.com/apache/beam/pull/4080#discussion_r153688054
 
 

 ##########
 File path: sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
 ##########
 @@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.aws.s3;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class S3FileSystem extends FileSystem<S3ResourceId> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
+
+  // Amazon S3 API docs: Each part must be at least 5 MB in size, except the last part.
+  private static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES = 5 * 1024 * 1024;
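+  // Default to a larger 64 MB buffer when the JVM has at least 512 MB of heap;
+  // otherwise fall back to the 5 MB minimum to limit memory pressure.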
+  private static final int DEFAULT_UPLOAD_BUFFER_SIZE_BYTES =
+      Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
+          ? MINIMUM_UPLOAD_BUFFER_SIZE_BYTES
+          : 64 * 1024 * 1024;
+  private static final int MAX_THREADS_PER_CONCURRENT_COPY = 3;
+
+  // S3 API, delete-objects: "You may specify up to 1000 keys."
+  private static final int MAX_DELETE_OBJECTS_PER_REQUEST = 1000;
+
+  // Non-final for testing.
+  private AmazonS3 amazonS3;
+  private final String storageClass;
+  private final int s3UploadBufferSizeBytes;
+  private final int threadPoolSize;
+
+  S3FileSystem(S3Options options) {
+    checkNotNull(options, "options");
+
+    checkArgument(!Strings.isNullOrEmpty(options.getAwsAccessKeyId()),
+        "--awsAccessKeyId is required");
+    checkArgument(!Strings.isNullOrEmpty(options.getAwsSecretAccessKey()),
+        "--awsSecretAccessKey is required");
+    checkArgument(!Strings.isNullOrEmpty(options.getAwsRegion()), "--awsRegion is required");
+
+    AWSStaticCredentialsProvider credentialsProvider =
+        new AWSStaticCredentialsProvider(
+            new BasicAWSCredentials(options.getAwsAccessKeyId(), options.getAwsSecretAccessKey()));
+    amazonS3 =
+        AmazonS3ClientBuilder.standard()
+            .withCredentials(credentialsProvider)
+            .withRegion(options.getAwsRegion())
+            .build();
+
+    this.storageClass = checkNotNull(options.getS3StorageClass(), "storageClass");
+
+    int uploadBufferSizeBytes;
+    if (options.getS3UploadBufferSizeBytes() != null) {
+      uploadBufferSizeBytes = options.getS3UploadBufferSizeBytes();
+    } else {
+      uploadBufferSizeBytes = DEFAULT_UPLOAD_BUFFER_SIZE_BYTES;
+    }
+    this.s3UploadBufferSizeBytes =
+        Math.max(MINIMUM_UPLOAD_BUFFER_SIZE_BYTES, uploadBufferSizeBytes);
+
+    checkArgument(options.getS3ThreadPoolSize() > 0, "threadPoolSize");
+    this.threadPoolSize = options.getS3ThreadPoolSize();
+  }
+
+  @Override
+  protected String getScheme() {
+    return S3ResourceId.SCHEME;
+  }
+
+  @VisibleForTesting
+  void setAmazonS3Client(AmazonS3 amazonS3) {
+    this.amazonS3 = amazonS3;
+  }
+
+  @VisibleForTesting
+  int getS3UploadBufferSizeBytes() {
+    return s3UploadBufferSizeBytes;
+  }
+
+  @Override
+  protected List<MatchResult> match(List<String> specs) throws IOException {
+    List<S3ResourceId> paths =
+        FluentIterable.from(specs)
+            .transform(
+                new Function<String, S3ResourceId>() {
+                  @Override
+                  public S3ResourceId apply(String spec) {
+                    return S3ResourceId.fromUri(spec);
+                  }
+                })
+            .toList();
+    List<S3ResourceId> globs = Lists.newArrayList();
+    List<S3ResourceId> nonGlobs = Lists.newArrayList();
+    List<Boolean> isGlobBooleans = Lists.newArrayList();
+
+    for (S3ResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expected more elements in globMatches.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expected more elements in nonGlobMatches.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expected no more elements in globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expected no more elements in nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  private List<MatchResult> matchGlobPaths(Collection<S3ResourceId> paths)
+      throws IOException {
+    List<Callable<MatchResult>> tasks = new ArrayList<>(paths.size());
+    for (final S3ResourceId path : paths) {
+      tasks.add(
+          new Callable<MatchResult>() {
+            @Override
+            public MatchResult call() {
+              return matchGlobPath(path);
+            }
+          });
+    }
+
+    return callTasks(tasks);
+  }
+
+  /**
+   * Gets a {@link MatchResult} representing all objects that match the given
+   * wildcard-containing path.
+   */
+  @VisibleForTesting
+  MatchResult matchGlobPath(S3ResourceId path) {
+    // The S3 API can list objects, filtered by prefix, but not by wildcard.
+    // Here, we find the longest prefix without wildcard "*",
+    // then filter the results with a regex.
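+    // For example, "s3://bucket/path/to/*.txt" lists objects under the prefix
+    // "path/to/" and keeps only keys matching the regex "path/to/[^/]*\.txt".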
+    checkArgument(path.isWildcard(), "isWildcard");
+    String keyPrefix = path.getKeyNonWildcardPrefix();
+    Pattern wildcardRegexp = Pattern.compile(wildcardToRegexp(path.getKey()));
+
+    LOG.debug(
+        "matching files in bucket {}, prefix {} against pattern {}",
+        path.getBucket(),
+        keyPrefix,
+        wildcardRegexp.toString());
+
+    ImmutableList.Builder<MatchResult.Metadata> results = ImmutableList.builder();
+    String continuationToken = null;
+
+    do {
+      ListObjectsV2Request request =
+          new ListObjectsV2Request()
+              .withBucketName(path.getBucket())
+              .withPrefix(keyPrefix)
+              .withContinuationToken(continuationToken);
+      ListObjectsV2Result result;
+      try {
+        result = amazonS3.listObjectsV2(request);
+      } catch (AmazonClientException e) {
+        return MatchResult.create(MatchResult.Status.ERROR, new IOException(e));
+      }
+      continuationToken = result.getNextContinuationToken();
+
+      for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
+        // Filter against regex.
+        if (wildcardRegexp.matcher(objectSummary.getKey()).matches()) {
+          S3ResourceId objectPath =
+              S3ResourceId.fromComponents(objectSummary.getBucketName(), objectSummary.getKey());
+          LOG.debug("Matched S3 object: {}", objectPath);
+          results.add(createBeamMetadata(objectPath, objectSummary.getSize()));
+        }
+      }
+    } while (continuationToken != null);
+
+    return MatchResult.create(MatchResult.Status.OK, results.build());
+  }
+
+  private List<MatchResult> matchNonGlobPaths(Collection<S3ResourceId> paths)
+      throws IOException {
+    List<Callable<MatchResult>> tasks = new ArrayList<>(paths.size());
+    for (final S3ResourceId path : paths) {
+      tasks.add(
+          new Callable<MatchResult>() {
+            @Override
+            public MatchResult call() {
+              return matchNonGlobPath(path);
+            }
+          });
+    }
+
+    return callTasks(tasks);
+  }
+
+  @VisibleForTesting
+  MatchResult matchNonGlobPath(S3ResourceId path) {
+    ObjectMetadata s3Metadata;
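+    // getObjectMetadata issues a HEAD Object request; a 404 status means the object does not exist.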
+    try {
+      s3Metadata = amazonS3.getObjectMetadata(path.getBucket(), path.getKey());
+    } catch (AmazonClientException e) {
+      if (e instanceof AmazonS3Exception && ((AmazonS3Exception) e).getStatusCode() == 404) {
+        return MatchResult.create(MatchResult.Status.NOT_FOUND, new FileNotFoundException());
+      }
+      return MatchResult.create(MatchResult.Status.ERROR, new IOException(e));
+    }
+    return MatchResult.create(
+        MatchResult.Status.OK,
+        ImmutableList.of(createBeamMetadata(path, s3Metadata.getContentLength())));
+  }
+
+  private static MatchResult.Metadata createBeamMetadata(S3ResourceId path, long sizeBytes) {
+    // TODO: Address https://issues.apache.org/jira/browse/BEAM-1494
+    // It is incorrect to set IsReadSeekEfficient to true for files whose content encoding is gzip.
+    return MatchResult.Metadata.builder()
+        .setIsReadSeekEfficient(true)
+        .setResourceId(path)
+        .setSizeBytes(sizeBytes)
+        .build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
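+   *
+   * <p>For example, {@code "foo*.txt"} expands to {@code "foo[^/]*\.txt"},
+   * {@code "?"} expands to {@code "[^/]"}, and {@code "**"} expands to {@code ".*"}.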
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {
+    StringBuilder dst = new StringBuilder();
+    char[] src = globExp.replace("**/*", "**").toCharArray();
+    int i = 0;
+    while (i < src.length) {
+      char c = src[i++];
+      switch (c) {
+        case '*':
+          // One char lookahead for **
+          if (i < src.length && src[i] == '*') {
+            dst.append(".*");
+            ++i;
+          } else {
+            dst.append("[^/]*");
+          }
+          break;
+        case '?':
+          dst.append("[^/]");
+          break;
+        case '.':
+        case '+':
+        case '{':
+        case '}':
+        case '(':
+        case ')':
+        case '|':
+        case '^':
+        case '$':
+          // These need to be escaped in regular expressions
+          dst.append('\\').append(c);
+          break;
+        case '\\':
+          i = doubleSlashes(dst, src, i);
+          break;
+        default:
+          dst.append(c);
+          break;
+      }
+    }
+    return dst.toString();
+  }
+
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append("\\\\");
+    if (i != src.length) {
+      dst.append(src[i]);
+      i++;
+    } else {
+      // A backslash at the very end is treated like an escaped backslash
+      dst.append('\\');
+    }
+    return i;
+  }
+
+  @Override
+  protected WritableByteChannel create(S3ResourceId resourceId, CreateOptions createOptions)
+      throws IOException {
+    return new S3WritableByteChannel(amazonS3, resourceId, storageClass, s3UploadBufferSizeBytes);
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for S3 as an Apache Beam FileSystem
> -----------------------------------------------
>
>                 Key: BEAM-2500
>                 URL: https://issues.apache.org/jira/browse/BEAM-2500
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>            Reporter: Luke Cwik
>            Assignee: Jacob Marble
>            Priority: Minor
>         Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with
> an S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3
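
A minimal sketch of that existing Hadoop-based route (illustrative only: the
class name S3ViaHadoopExample is hypothetical, and the fs.s3a.* property names
come from the hadoop-aws "s3a" connector, not from this PR; assumes the
hadoop-file-system module and hadoop-aws are on the classpath):

    import java.util.Collections;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    public class S3ViaHadoopExample {
      // Builds pipeline options that route s3a:// paths through Beam's HadoopFileSystem.
      public static HadoopFileSystemOptions s3Options(String accessKey, String secretKey) {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.access.key", accessKey);
        conf.set("fs.s3a.secret.key", secretKey);

        HadoopFileSystemOptions options =
            PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
        options.setHdfsConfiguration(Collections.singletonList(conf));
        return options;
      }
    }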



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
