ettirapp commented on a change in pull request #12581:
URL: https://github.com/apache/beam/pull/12581#discussion_r473344165
##########
File path:
sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java
##########
@@ -17,67 +17,438 @@
*/
package org.apache.beam.sdk.io.azure.blobstore;
+import static java.nio.channels.Channels.newChannel;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobProperties;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.SharedAccessAccountPolicy;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.azure.options.BlobstoreClientBuilderFactory;
+import org.apache.beam.sdk.io.azure.options.BlobstoreOptions;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.util.InstanceBuilder;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class AzureBlobStoreFileSystem extends FileSystem<AzfsResourceId> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AzureBlobStoreFileSystem.class);
+
+ private static final ImmutableSet<String> NON_READ_SEEK_EFFICIENT_ENCODINGS =
+ ImmutableSet.of("gzip");
+
+ private Supplier<BlobServiceClient> client;
+ private final BlobstoreOptions options;
+
+ AzureBlobStoreFileSystem(BlobstoreOptions options) {
+ this.options = checkNotNull(options, "options");
+
+ BlobServiceClientBuilder builder =
+ InstanceBuilder.ofType(BlobstoreClientBuilderFactory.class)
+ .fromClass(options.getBlobstoreClientFactoryClass())
+ .build()
+ .createBuilder(options);
+
+ // The Supplier is to make sure we don't call .build() unless we are
actually using Azure.
+ client = Suppliers.memoize(builder::buildClient);
+ }
+
+ @VisibleForTesting
+ void setClient(BlobServiceClient client) {
+ this.client = Suppliers.ofInstance(client);
+ }
+
+ @VisibleForTesting
+ BlobServiceClient getClient() {
+ return client.get();
+ }
+
@Override
protected String getScheme() {
- return "azfs";
+ return AzfsResourceId.SCHEME;
}
@Override
- protected List<MatchResult> match(List<String> specs) throws IOException {
- // TODO
- return null;
+ protected List<MatchResult> match(List<String> specs) {
+ List<AzfsResourceId> paths =
+
specs.stream().map(AzfsResourceId::fromUri).collect(Collectors.toList());
+ List<AzfsResourceId> globs = new ArrayList<>();
+ List<AzfsResourceId> nonGlobs = new ArrayList<>();
+ List<Boolean> isGlobBooleans = new ArrayList<>();
+
+ for (AzfsResourceId path : paths) {
+ if (path.isWildcard()) {
+ globs.add(path);
+ isGlobBooleans.add(true);
+ } else {
+ nonGlobs.add(path);
+ isGlobBooleans.add(false);
+ }
+ }
+
+ Iterator<MatchResult> globMatches = matchGlobPaths(globs).iterator();
+ Iterator<MatchResult> nonGlobMatches =
matchNonGlobPaths(nonGlobs).iterator();
+
+ ImmutableList.Builder<MatchResult> matchResults = ImmutableList.builder();
+ for (Boolean isGlob : isGlobBooleans) {
+ if (isGlob) {
+ checkState(globMatches.hasNext(), "Expect globMatches has next.");
+ matchResults.add(globMatches.next());
+ } else {
+ checkState(nonGlobMatches.hasNext(), "Expect nonGlobMatches has
next.");
+ matchResults.add(nonGlobMatches.next());
+ }
+ }
+ checkState(!globMatches.hasNext(), "Expect no more elements in
globMatches.");
+ checkState(!nonGlobMatches.hasNext(), "Expect no more elements in
nonGlobMatches.");
+
+ return matchResults.build();
+ }
+
+ /**
+ * Expands glob expressions to regular expressions.
+ *
+ * @param globExp the glob expression to expand
+ * @return a string with the regular expression this glob expands to
+ */
+ @VisibleForTesting
+ static String wildcardToRegexp(String globExp) {
+ StringBuilder dst = new StringBuilder();
+ char[] src = globExp.replace("**/*", "**").toCharArray();
+ int i = 0;
+ while (i < src.length) {
+ char c = src[i++];
+ switch (c) {
+ case '*':
+ // One char lookahead for **
+ if (i < src.length && src[i] == '*') {
+ dst.append(".*");
+ ++i;
+ } else {
+ dst.append("[^/]*");
+ }
+ break;
+ case '?':
+ dst.append("[^/]");
+ break;
+ case '.':
+ case '+':
+ case '{':
+ case '}':
+ case '(':
+ case ')':
+ case '|':
+ case '^':
+ case '$':
+ // These need to be escaped in regular expressions
+ dst.append('\\').append(c);
+ break;
+ case '\\':
+ i = doubleSlashes(dst, src, i);
+ break;
+ default:
+ dst.append(c);
+ break;
+ }
+ }
+ return dst.toString();
+ }
+
+ private static int doubleSlashes(StringBuilder dst, char[] src, int i) {
+ // Emit the next character without special interpretation
+ dst.append("\\\\");
+ if ((i - 1) != src.length) {
Review comment:
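As we discussed, there seems to be an error in this method.
`wildcardToRegexp` has already advanced `i` past the backslash before calling it, so `i <= src.length`. That makes `(i - 1) != src.length` always true, and a backslash at the very end of the glob is never detected (presumably `i < src.length` was intended).
We will resolve it in a follow-up PR.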
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org