pabloem commented on a change in pull request #12581:
URL: https://github.com/apache/beam/pull/12581#discussion_r473193248



##########
File path: 
sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/options/BlobstoreOptions.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.options;
+
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.HttpPipeline;
+import com.azure.core.http.policy.HttpPipelinePolicy;
+import com.azure.core.util.Configuration;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.storage.blob.models.CustomerProvidedKey;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import 
org.apache.beam.sdk.io.azure.blobstore.DefaultBlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+// TODO: Tag each option with @Default or @Nullable
+
+@Experimental(Kind.FILESYSTEM)
+/** Options used to configure Microsoft Azure Blob Storage. */
+public interface BlobstoreOptions extends PipelineOptions {
+
+  @Description(
+      "Factory class that should be created and used to create a builder of 
Azure Blobstore client."
+          + "Override the default value if you need a Azure client with custom 
properties.")
+  @Default.Class(DefaultBlobstoreClientBuilderFactory.class)
+  Class<? extends BlobstoreClientBuilderFactory> 
getBlobstoreClientFactoryClass();
+
+  void setBlobstoreClientFactoryClass(
+      Class<? extends BlobstoreClientBuilderFactory> 
blobstoreClientFactoryClass);
+
+  @Description("Adds a pipeline policy to apply on each request sent to the 
blob service client.")
+  @Nullable
+  HttpPipelinePolicy getPipelinePolicy();
+
+  void setPipelinePolicy(HttpPipelinePolicy pipelinePolicy);
+
+  /** The client configuration instance that should be used to configure Azure 
service clients. */
+  @Description(
+      "The configuration instance used to retrieve environment configuration 
values "
+          + "when building an Azure Blobstore client. Set only those that need 
custom changes.")
+  @Default.InstanceFactory(BlobstoreOptions.ConfigurationFactory.class)
+  @Nullable
+  Configuration getEnvironmentConfiguration();
+
+  void setEnvironmentConfiguration(Configuration configuration);
+
+  /** Default Azure client configuration. */
+  class ConfigurationFactory implements DefaultValueFactory<Configuration> {
+
+    @Override
+    public Configuration create(PipelineOptions options) {
+      return new Configuration();
+    }
+  }
+

Review comment:
       The Environment Configuration property is failing to serialize, which 
prevents me from running the pipeline on DirectRunner. We can work 
around this by writing a custom JSON serializer (like the one in S3). But maybe 
we could remove `environmentConfiguration` for now and add it back in a later change?

##########
File path: 
sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
  */
 package org.apache.beam.sdk.io.azure.blobstore;
 
+import static java.nio.channels.Channels.newChannel;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+  private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+      ImmutableSet.of("gzip");
+
+  private Supplier<BlobServiceClient> client;
+  private final BlobstoreOptions options;
+
+  AzureBlobStoreFileSystem(BlobstoreOptions options) {
+    this.options = checkNotNull(options, "options");
+
+    BlobServiceClientBuilder builder =
+        InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+            .fromClass(options.getBlobstoreClientFactoryClass())
+            .build()
+            .createBuilder(options);
+
+    // The Supplier is to make sure we don't call .build() unless we are 
actually using Azure.
+    client = Suppliers.memoize(builder::buildClient);
+  }
+
+  @VisibleForTesting
+  void setClient(BlobServiceClient client) {
+    this.client = Suppliers.ofInstance(client);
+  }
+
+  @VisibleForTesting
+  BlobServiceClient getClient() {
+    return client.get();
+  }
+
   @Override
   protected String getScheme() {
-    return "azfs";
+    return AzfsResourceId.SCHEME;
   }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    // TODO
-    return null;
+  protected List<MatchResult> match(List<String> specs) {
+    List<AzfsResourceId> paths =
+        
specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+    List<AzfsResourceId> globs = new ArrayList<>();
+    List<AzfsResourceId> nonGlobs = new ArrayList<>();
+    List<Boolean> isGlobBooleans = new ArrayList<>();
+
+    for (AzfsResourceId path : paths) {
+      if (path.isWildcard()) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+    Iterator<MatchResult> nonGlobMatches = 
matchNonGlobPaths(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globMatches.hasNext(), "Expect globMatches has next.");
+        matchResults.add(globMatches.next());
+      } else {
+        checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has 
next.");
+        matchResults.add(nonGlobMatches.next());
+      }
+    }
+    checkState(!globMatches.hasNext(), "Expect no more elements in 
globMatches.");
+    checkState(!nonGlobMatches.hasNext(), "Expect no more elements in 
nonGlobMatches.");
+
+    return matchResults.build();
+  }
+
+  /**
+   * Expands glob expressions to regular expressions.
+   *
+   * @param globExp the glob expression to expand
+   * @return a string with the regular expression this glob expands to
+   */
+  @VisibleForTesting
+  static String wildcardToRegexp(String globExp) {
+    StringBuilder dst = new StringBuilder();
+    char[] src = globExp.replace("**/*", "**").toCharArray();
+    int i = 0;
+    while (i < src.length) {
+      char c = src[i++];
+      switch (c) {
+        case '*':
+          // One char lookahead for **
+          if (i < src.length && src[i] == '*') {
+            dst.append(".*");
+            ++i;
+          } else {
+            dst.append("[^/]*");
+          }
+          break;
+        case '?':
+          dst.append("[^/]");
+          break;
+        case '.':
+        case '+':
+        case '{':
+        case '}':
+        case '(':
+        case ')':
+        case '|':
+        case '^':
+        case '$':
+          // These need to be escaped in regular expressions
+          dst.append('\\').append(c);
+          break;
+        case '\\':
+          i = doubleSlashes(dst, src, i);
+          break;
+        default:
+          dst.append(c);
+          break;
+      }
+    }
+    return dst.toString();
+  }
+
+  private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+    // Emit the next character without special interpretation
+    dst.append("\\\\");
+    if ((i - 1) != src.length) {
+      dst.append(src[i]);
+      i++;
+    } else {
+      // A backslash at the very end is treated like an escaped backslash
+      dst.append('\\');
+    }
+    return i;
+  }
+
+  private List<MatchResult> matchGlobPaths(List<AzfsResourceId> globs) {
+    return FluentIterable.from(globs).transform(this::expand).toList();
+  }
+
+  /** Expands a pattern into {@link MatchResult}. */
+  @VisibleForTesting
+  MatchResult expand(AzfsResourceId azfsPattern) {
+
+    checkArgument(azfsPattern.isWildcard(), "is Wildcard");
+    String blobPrefix = azfsPattern.getBlobNonWildcardPrefix();
+    Pattern wildcardAsRegexp = 
Pattern.compile(wildcardToRegexp(azfsPattern.getBlob()));
+
+    LOG.debug(
+        "matching files in container {}, prefix {} against pattern {}",
+        azfsPattern.getContainer(),
+        blobPrefix,
+        wildcardAsRegexp.toString());
+
+    ListBlobsOptions listOptions = new 
ListBlobsOptions().setPrefix(blobPrefix);
+    Duration timeout = Duration.ZERO.plusMinutes(1);
+
+    String account = azfsPattern.getAccount();
+    String container = azfsPattern.getContainer();
+    BlobContainerClient blobContainerClient = 
client.get().getBlobContainerClient(container);
+    PagedIterable<BlobItem> blobs = blobContainerClient.listBlobs(listOptions, 
timeout);
+    List<MatchResult.Metadata> results = new ArrayList<>();
+
+    blobs.forEach(
+        blob -> {
+          String name = blob.getName();
+          if (wildcardAsRegexp.matcher(name).matches() && !name.endsWith("/")) 
{
+            LOG.debug("Matched object: {}", name);
+
+            BlobProperties properties = 
blobContainerClient.getBlobClient(name).getProperties();
+            AzfsResourceId rid =
+                AzfsResourceId.fromComponents(account, container, name)
+                    .withSize(properties.getBlobSize())
+                    
.withLastModified(Date.from(properties.getLastModified().toInstant()));
+
+            results.add(toMetadata(rid, properties.getContentEncoding()));
+          }
+        });
+
+    return MatchResult.create(MatchResult.Status.OK, results);

Review comment:
       Perfect — thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to