Repository: beam Updated Branches: refs/heads/master 00ea3f7d7 -> 013f11885
[BEAM-59] Beam GcsFileSystem: port expand() from GcsUtil for glob matching. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/993cd0c7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/993cd0c7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/993cd0c7 Branch: refs/heads/master Commit: 993cd0c7bcd161cbb794651a5594499e1dbe0c47 Parents: 00ea3f7 Author: Pei He <pe...@google.com> Authored: Mon Feb 13 17:17:55 2017 -0800 Committer: Pei He <pe...@google.com> Committed: Wed Feb 15 16:31:16 2017 -0800 ---------------------------------------------------------------------- .../DataflowPipelineTranslatorTest.java | 2 - .../runners/dataflow/DataflowRunnerTest.java | 2 - .../java/org/apache/beam/sdk/util/GcsUtil.java | 104 +++++----- .../beam/sdk/util/GcsPathValidatorTest.java | 2 - sdks/java/io/google-cloud-platform/pom.xml | 5 + .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 62 ++++++ .../sdk/io/gcp/storage/GcsFileSystemTest.java | 189 +++++++++++++++++++ 7 files changed, 313 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/993cd0c7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 84b585a..2ff1032 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -30,7 +30,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -161,7 +160,6 @@ public class DataflowPipelineTranslatorTest implements Serializable { } }); when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); - when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowRunner.class); http://git-wip-us.apache.org/repos/asf/beam/blob/993cd0c7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 4fff1c6..b2bc319 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -168,7 +168,6 @@ public class DataflowRunnerTest { StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); } }); - when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true); when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() { @Override public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { @@ -238,7 +237,6 @@ public class DataflowRunnerTest { StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); } }); - when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true); when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() { @Override public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { http://git-wip-us.apache.org/repos/asf/beam/blob/993cd0c7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 44c49bc..6345867 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -160,15 +160,68 @@ public class GcsUtil { * Returns true if the given GCS pattern is supported otherwise fails with an * exception. */ - public boolean isGcsPatternSupported(String gcsPattern) { + public static boolean isGcsPatternSupported(String gcsPattern) { if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) { throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": " + " recursive wildcards are not supported."); } - return true; } + /** + * Returns the prefix portion of the glob that doesn't contain wildcards. + */ + public static String getGlobPrefix(String globExp) { + checkArgument(isGcsPatternSupported(globExp)); + Matcher m = GLOB_PREFIX.matcher(globExp); + checkArgument( + m.matches(), + String.format("Glob expression: [%s] is not expandable.", globExp)); + return m.group("PREFIX"); + } + + /** + * Expands glob expressions to regular expressions. + * + * @param globExp the glob expression to expand + * @return a string with the regular expression this glob expands to + */ + public static String globToRegexp(String globExp) { + StringBuilder dst = new StringBuilder(); + char[] src = globExp.toCharArray(); + int i = 0; + while (i < src.length) { + char c = src[i++]; + switch (c) { + case '*': + dst.append("[^/]*"); + break; + case '?': + dst.append("[^/]"); + break; + case '.': + case '+': + case '{': + case '}': + case '(': + case ')': + case '|': + case '^': + case '$': + // These need to be escaped in regular expressions + dst.append('\\').append(c); + break; + case '\\': + i = doubleSlashes(dst, src, i); + break; + default: + dst.append(c); + break; + } + } + return dst.toString(); + } + private GcsUtil( Storage storageClient, HttpRequestInitializer httpRequestInitializer, @@ -192,10 +245,9 @@ public class GcsUtil { */ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException { checkArgument(isGcsPatternSupported(gcsPattern.getObject())); - Matcher m = GLOB_PREFIX.matcher(gcsPattern.getObject()); Pattern p = null; String prefix = null; - if (!m.matches()) { + if (!GLOB_PREFIX.matcher(gcsPattern.getObject()).matches()) { // Not a glob. try { // Use a get request to fetch the metadata of the object, and ignore the return value. @@ -208,7 +260,7 @@ public class GcsUtil { } } else { // Part before the first wildcard character. - prefix = m.group("PREFIX"); + prefix = getGlobPrefix(gcsPattern.getObject()); p = Pattern.compile(globToRegexp(gcsPattern.getObject())); } @@ -668,48 +720,6 @@ public class GcsUtil { return storageClient.batch(httpRequestInitializer); } - /** - * Expands glob expressions to regular expressions. - * - * @param globExp the glob expression to expand - * @return a string with the regular expression this glob expands to - */ - static String globToRegexp(String globExp) { - StringBuilder dst = new StringBuilder(); - char[] src = globExp.toCharArray(); - int i = 0; - while (i < src.length) { - char c = src[i++]; - switch (c) { - case '*': - dst.append("[^/]*"); - break; - case '?': - dst.append("[^/]"); - break; - case '.': - case '+': - case '{': - case '}': - case '(': - case ')': - case '|': - case '^': - case '$': - // These need to be escaped in regular expressions - dst.append('\\').append(c); - break; - case '\\': - i = doubleSlashes(dst, src, i); - break; - default: - dst.append(c); - break; - } - } - return dst.toString(); - } - private static int doubleSlashes(StringBuilder dst, char[] src, int i) { // Emit the next character without special interpretation dst.append('\\'); http://git-wip-us.apache.org/repos/asf/beam/blob/993cd0c7/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java index 286490d..dc36319 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.util; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.when; import org.apache.beam.sdk.options.GcsOptions; @@ -45,7 +44,6 @@ public class GcsPathValidatorTest { public void setUp() throws Exception { MockitoAnnotations.initMocks(this); when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); - when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); options.setGcpCredential(new TestCredential()); options.setGcsUtil(mockGcsUtil); http://git-wip-us.apache.org/repos/asf/beam/blob/993cd0c7/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index da345b4..95a524f 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -100,6 +100,11 @@ </dependency> <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-storage</artifactId> + </dependency> + + <dependency> <groupId>com.google.auto.service</groupId> <artifactId>auto-service</artifactId> <optional>true</optional> http://git-wip-us.apache.org/repos/asf/beam/blob/993cd0c7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java index 16c4f93..1811fec 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java @@ -19,22 +19,36 @@ package org.apache.beam.sdk.io.gcp.storage; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import java.io.IOException; +import java.math.BigInteger; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.Collection; +import java.util.LinkedList; import java.util.List; +import java.util.regex.Pattern; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link FileSystem} implementation for Google Cloud Storage. */ class GcsFileSystem extends FileSystem<GcsResourceId> { + private static final Logger LOG = LoggerFactory.getLogger(GcsFileSystem.class); + private final GcsOptions options; GcsFileSystem(GcsOptions options) { @@ -76,6 +90,41 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds)); } + /** + * Expands a pattern into {@link MatchResult}. + * + * @throws IllegalArgumentException if {@code gcsPattern} does not contain globs. + */ + @VisibleForTesting + MatchResult expand(GcsPath gcsPattern) throws IOException { + String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject()); + Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject())); + + LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), + prefix, p.toString()); + + String pageToken = null; + List<Metadata> results = new LinkedList<>(); + do { + Objects objects = options.getGcsUtil().listObjects(gcsPattern.getBucket(), prefix, pageToken); + if (objects.getItems() == null) { + break; + } + + // Filter objects based on the regex. + for (StorageObject o : objects.getItems()) { + String name = o.getName(); + // Skip directories, which end with a slash. + if (p.matcher(name).matches() && !name.endsWith("/")) { + LOG.debug("Matched object: {}", name); + results.add(toMetadata(o)); + } + } + pageToken = objects.getNextPageToken(); + } while (pageToken != null); + return MatchResult.create(Status.OK, results.toArray(new Metadata[results.size()])); + } + private List<String> toFilenames(Collection<GcsResourceId> resources) { return FluentIterable.from(resources) .transform( @@ -86,4 +135,17 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { }}) .toList(); } + + private Metadata toMetadata(StorageObject storageObject) { + // TODO: Address https://issues.apache.org/jira/browse/BEAM-1494 + // It is incorrect to set IsReadSeekEfficient true for files with content encoding set to gzip. + Metadata.Builder ret = Metadata.builder() + .setIsReadSeekEfficient(true) + .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject))); + BigInteger size = storageObject.getSize(); + if (size != null) { + ret.setSizeBytes(size.longValue()); + } + return ret.build(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/993cd0c7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java new file mode 100644 index 0000000..4deb7b3 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.storage; + +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.when; + +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link GcsFileSystem}. + */ +@RunWith(JUnit4.class) +public class GcsFileSystemTest { + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + @Mock + private GcsUtil mockGcsUtil; + private GcsOptions gcsOptions; + private GcsFileSystem gcsFileSystem; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + gcsOptions = PipelineOptionsFactory.as(GcsOptions.class); + gcsOptions.setGcsUtil(mockGcsUtil); + gcsFileSystem = new GcsFileSystem(gcsOptions); + } + + @Test + public void testGlobExpansion() throws IOException { + Objects modelObjects = new Objects(); + List<StorageObject> items = new ArrayList<>(); + // A directory + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); + + // Files within the directory + items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)); + items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */)); + items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */)); + items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 4L /* fileSize */)); + items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 5L /* fileSize */)); + items.add(createStorageObject( + "gs://testbucket/testotherdirectory/file4name", 6L /* fileSize */)); + + modelObjects.setItems(items); + + when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class))) + .thenReturn(modelObjects); + + // Test patterns. + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); + List<String> expectedFiles = ImmutableList.of( + "gs://testbucket/testdirectory/file1name", + "gs://testbucket/testdirectory/file2name", + "gs://testbucket/testdirectory/file3name"); + + assertThat( + expectedFiles, + contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); + List<String> expectedFiles = ImmutableList.of( + "gs://testbucket/testdirectory/file1name", + "gs://testbucket/testdirectory/file2name", + "gs://testbucket/testdirectory/file3name"); + + assertThat( + expectedFiles, + contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*"); + List<String> expectedFiles = ImmutableList.of( + "gs://testbucket/testdirectory/file1name", + "gs://testbucket/testdirectory/file2name", + "gs://testbucket/testdirectory/file3name"); + + assertThat( + expectedFiles, + contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name"); + List<String> expectedFiles = ImmutableList.of( + "gs://testbucket/testdirectory/file1name", + "gs://testbucket/testdirectory/file2name", + "gs://testbucket/testdirectory/file3name"); + + assertThat( + expectedFiles, + contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); + } + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name"); + List<String> expectedFiles = ImmutableList.of( + "gs://testbucket/testdirectory/file1name", + "gs://testbucket/testdirectory/file2name", + "gs://testbucket/testdirectory/file3name", + "gs://testbucket/testotherdirectory/file4name"); + + assertThat( + expectedFiles, + contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); + } + } + + @Test + public void testExpandNonGlob() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Glob expression: [testdirectory/otherfile] is not expandable."); + gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); + } + + // Patterns that contain recursive wildcards ('**') are not supported. + @Test + public void testRecursiveGlobExpansionFails() throws IOException { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Unsupported wildcard usage"); + gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**")); + } + + private StorageObject createStorageObject(String gcsFilename, long fileSize) { + GcsPath gcsPath = GcsPath.fromUri(gcsFilename); + return new StorageObject() + .setBucket(gcsPath.getBucket()) + .setName(gcsPath.getObject()) + .setSize(BigInteger.valueOf(fileSize)); + } + + private List<String> toFilenames(MatchResult matchResult) throws IOException { + return FluentIterable + .from(matchResult.metadata()) + .transform(new Function<Metadata, String>() { + @Override + public String apply(Metadata metadata) { + return ((GcsResourceId) metadata.resourceId()).getGcsPath().toString(); + }}) + .toList(); + } +}