Repository: beam Updated Branches: refs/heads/master 027dd777d -> 5bfd3e049
[BEAM-59] Move GcsFileSystem to gcp-core It is used by both runner and IO, so should be in core. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2a4ae2b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2a4ae2b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2a4ae2b Branch: refs/heads/master Commit: b2a4ae2b307b8c540ff8a40878521bf3d5e532ff Parents: 027dd77 Author: Dan Halperin <dhalp...@google.com> Authored: Tue May 2 11:08:16 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue May 2 16:17:57 2017 -0700 ---------------------------------------------------------------------- .../src/main/resources/beam/findbugs-filter.xml | 2 +- .../extensions/gcp/storage/GcsFileSystem.java | 266 ++++++++++++++++++ .../gcp/storage/GcsFileSystemRegistrar.java | 43 +++ .../extensions/gcp/storage/GcsResourceId.java | 128 +++++++++ .../extensions/gcp/storage/package-info.java | 21 ++ .../gcp/storage/GcsFileSystemRegistrarTest.java | 52 ++++ .../gcp/storage/GcsFileSystemTest.java | 274 +++++++++++++++++++ .../gcp/storage/GcsResourceIdTest.java | 169 ++++++++++++ sdks/java/io/google-cloud-platform/pom.xml | 5 - .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 266 ------------------ .../io/gcp/storage/GcsFileSystemRegistrar.java | 43 --- .../beam/sdk/io/gcp/storage/GcsResourceId.java | 128 --------- .../beam/sdk/io/gcp/storage/package-info.java | 21 -- .../gcp/storage/GcsFileSystemRegistrarTest.java | 52 ---- .../sdk/io/gcp/storage/GcsFileSystemTest.java | 274 ------------------- .../sdk/io/gcp/storage/GcsResourceIdTest.java | 169 ------------ 16 files changed, 954 insertions(+), 959 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git 
a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index d1d8b4d..28bbc3c 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -203,7 +203,7 @@ </Match> <Match> - <Class name="org.apache.beam.sdk.io.gcp.storage.GcsResourceId"/> + <Class name="org.apache.beam.sdk.extensions.gcp.storage.GcsResourceId"/> <Method name="getCurrentDirectory" /> <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/> <!-- http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java new file mode 100644 index 0000000..69dd8fc --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * {@link FileSystem} implementation for Google Cloud Storage.
 *
 * <p>Handles resources with the {@code gs} scheme. Every storage operation is delegated to the
 * {@link GcsUtil} returned by {@link GcsOptions#getGcsUtil()}.
 */
class GcsFileSystem extends FileSystem<GcsResourceId> {
  private static final Logger LOG = LoggerFactory.getLogger(GcsFileSystem.class);

  private final GcsOptions options;

  /**
   * Creates a file system that performs all operations through {@code options.getGcsUtil()}.
   *
   * @throws NullPointerException if {@code options} is null
   */
  GcsFileSystem(GcsOptions options) {
    this.options = checkNotNull(options, "options");
  }

  /**
   * Returns one {@link MatchResult} per spec, in the same order as {@code specs}.
   *
   * <p>Specs containing glob characters are expanded by listing objects (see {@link #expand});
   * the remaining specs are looked up together via {@link GcsUtil#getObjects}. The two groups of
   * results are then merged back into the caller's original order.
   */
  @Override
  protected List<MatchResult> match(List<String> specs) throws IOException {
    List<GcsPath> gcsPaths = toGcsPaths(specs);

    List<GcsPath> globs = Lists.newArrayList();
    List<GcsPath> nonGlobs = Lists.newArrayList();
    // Records, per input position, which group the spec went to so results can be re-merged.
    List<Boolean> isGlobBooleans = Lists.newArrayList();

    for (GcsPath path : gcsPaths) {
      if (GcsUtil.isGlob(path)) {
        globs.add(path);
        isGlobBooleans.add(true);
      } else {
        nonGlobs.add(path);
        isGlobBooleans.add(false);
      }
    }

    Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator();
    Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator();

    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
    for (Boolean isGlob : isGlobBooleans) {
      if (isGlob) {
        checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next.");
        ret.add(globsMatchResults.next());
      } else {
        checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next.");
        ret.add(nonGlobsMatchResults.next());
      }
    }
    checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults.");
    checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults.");
    return ret.build();
  }

  /** Creates the object at {@code resourceId} and returns a channel for writing its contents. */
  @Override
  protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions)
      throws IOException {
    return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
  }

  /** Returns a channel for reading the object at {@code resourceId}. */
  @Override
  protected ReadableByteChannel open(GcsResourceId resourceId) throws IOException {
    return options.getGcsUtil().open(resourceId.getGcsPath());
  }

  /**
   * Renames by copying every source to its destination and then deleting the sources.
   *
   * <p>The copy and the delete are separate calls, so a failure between them can leave both the
   * source and destination objects present.
   */
  @Override
  protected void rename(
      List<GcsResourceId> srcResourceIds,
      List<GcsResourceId> destResourceIds) throws IOException {
    copy(srcResourceIds, destResourceIds);
    delete(srcResourceIds);
  }

  /** Removes all of {@code resourceIds} through a single {@link GcsUtil#remove} call. */
  @Override
  protected void delete(Collection<GcsResourceId> resourceIds) throws IOException {
    options.getGcsUtil().remove(toFilenames(resourceIds));
  }

  /**
   * Builds a {@link GcsResourceId} for a resource that may not exist yet.
   *
   * <p>Directory specs get a trailing {@code '/'} appended when missing; file specs must not end
   * with {@code '/'}.
   *
   * @throws IllegalArgumentException if {@code isDirectory} is false and the spec ends with '/'
   */
  @Override
  protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
    String spec = singleResourceSpec;
    if (isDirectory) {
      // Directories are identified by a trailing '/' (see GcsResourceId#isDirectory).
      if (!spec.endsWith("/")) {
        spec += '/';
      }
    } else {
      checkArgument(
          !spec.endsWith("/"),
          "Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.",
          spec);
    }
    return GcsResourceId.fromGcsPath(GcsPath.fromUri(spec));
  }

  /** Copies each source to the destination at the same index via {@link GcsUtil#copy}. */
  @Override
  protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
      throws IOException {
    options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
  }

  /** Returns {@code "gs"}. */
  @Override
  protected String getScheme() {
    return "gs";
  }

  /**
   * Expands each glob independently and in order. An {@link IOException} from one expansion is
   * reported as a {@link Status#ERROR} result for that glob instead of failing the whole call.
   */
  private List<MatchResult> matchGlobs(List<GcsPath> globs) {
    // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
    return FluentIterable.from(globs)
        .transform(new Function<GcsPath, MatchResult>() {
          @Override
          public MatchResult apply(GcsPath gcsPath) {
            try {
              return expand(gcsPath);
            } catch (IOException e) {
              return MatchResult.create(Status.ERROR, e);
            }
          }
        })
        .toList();
  }

  /**
   * Expands a pattern into {@link MatchResult}.
   *
   * <p>Lists every object in the pattern's bucket under its literal (non-glob) prefix, page by
   * page, and keeps the objects whose names match the pattern. Names ending with {@code '/'}
   * (directory placeholders) are skipped.
   *
   * @throws IllegalArgumentException if {@code gcsPattern} does not contain globs (raised by the
   *     {@link GcsUtil} glob helpers).
   */
  @VisibleForTesting
  MatchResult expand(GcsPath gcsPattern) throws IOException {
    String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject());
    Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject()));

    // Pass the Pattern itself so SLF4J only renders it when debug logging is enabled.
    LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
        prefix, p);

    String pageToken = null;
    List<Metadata> results = new LinkedList<>();
    do {
      Objects objects = options.getGcsUtil().listObjects(gcsPattern.getBucket(), prefix, pageToken);
      List<StorageObject> items = objects.getItems();
      // Do not treat a page with no items as the last page; only a missing next-page token ends
      // the listing.
      if (items != null) {
        // Filter objects based on the regex.
        for (StorageObject o : items) {
          String name = o.getName();
          // Skip directories, which end with a slash.
          if (p.matcher(name).matches() && !name.endsWith("/")) {
            LOG.debug("Matched object: {}", name);
            results.add(toMetadata(o));
          }
        }
      }
      pageToken = objects.getNextPageToken();
    } while (pageToken != null);
    return MatchResult.create(Status.OK, results);
  }

  /**
   * Returns {@link MatchResult MatchResults} for the given {@link GcsPath GcsPaths}.
   *
   * <p>The number of returned {@link MatchResult MatchResults} equals to the number of given
   * {@link GcsPath GcsPaths}. Each {@link MatchResult} contains one {@link Metadata}.
   */
  @VisibleForTesting
  List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
    List<StorageObjectOrIOException> results = options.getGcsUtil().getObjects(gcsPaths);

    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
    for (StorageObjectOrIOException result : results) {
      ret.add(toMatchResult(result));
    }
    return ret.build();
  }

  /**
   * Maps a single lookup outcome to a {@link MatchResult}.
   *
   * <p>{@link FileNotFoundException} becomes {@link Status#NOT_FOUND}, any other
   * {@link IOException} becomes {@link Status#ERROR}, and a found object becomes
   * {@link Status#OK} with one {@link Metadata} entry.
   */
  private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
    @Nullable IOException exception = objectOrException.ioException();
    if (exception instanceof FileNotFoundException) {
      return MatchResult.create(Status.NOT_FOUND, exception);
    } else if (exception != null) {
      return MatchResult.create(Status.ERROR, exception);
    } else {
      StorageObject object = objectOrException.storageObject();
      assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics.
      return MatchResult.create(Status.OK, ImmutableList.of(toMetadata(object)));
    }
  }

  /** Builds {@link Metadata} for an object; the size is set only when GCS reports one. */
  private Metadata toMetadata(StorageObject storageObject) {
    // TODO: Address https://issues.apache.org/jira/browse/BEAM-1494
    // It is incorrect to set IsReadSeekEfficient true for files with content encoding set to gzip.
    Metadata.Builder ret = Metadata.builder()
        .setIsReadSeekEfficient(true)
        .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject)));
    BigInteger size = storageObject.getSize();
    if (size != null) {
      ret.setSizeBytes(size.longValue());
    }
    return ret.build();
  }

  /** Converts resource ids to their {@code gs://} path strings, preserving order. */
  private List<String> toFilenames(Collection<GcsResourceId> resources) {
    return FluentIterable.from(resources)
        .transform(
            new Function<GcsResourceId, String>() {
              @Override
              public String apply(GcsResourceId resource) {
                return resource.getGcsPath().toString();
              }
            })
        .toList();
  }

  /** Parses each spec with {@link GcsPath#fromUri}, preserving order. */
  private List<GcsPath> toGcsPaths(Collection<String> specs) {
    return FluentIterable.from(specs)
        .transform(new Function<String, GcsPath>() {
          @Override
          public GcsPath apply(String spec) {
            return GcsPath.fromUri(spec);
          }
        })
        .toList();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java new file mode 100644 index 0000000..9f5980a --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import javax.annotation.Nonnull; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.FileSystemRegistrar; +import org.apache.beam.sdk.options.PipelineOptions; +
/**
 * Makes {@link GcsFileSystem} discoverable through {@link java.util.ServiceLoader}, using
 * {@link AutoService} to generate the {@link FileSystemRegistrar} service entry.
 */
@AutoService(FileSystemRegistrar.class)
public class GcsFileSystemRegistrar implements FileSystemRegistrar {

  /**
   * Returns a single-element list holding a {@link GcsFileSystem} configured from the
   * {@link GcsOptions} view of {@code options}.
   *
   * @throws NullPointerException if {@code options} is null
   */
  @Override
  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
    PipelineOptions nonNullOptions = checkNotNull(
        options,
        "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
    GcsOptions gcsOptions = nonNullOptions.as(GcsOptions.class);
    return ImmutableList.<FileSystem>of(new GcsFileSystem(gcsOptions));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java new file mode 100644 index 0000000..e53e5fa --- /dev/null +++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.fs.ResolveOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +
/**
 * {@link ResourceId} implementation for Google Cloud Storage.
 *
 * <p>Wraps a {@link GcsPath}. A path whose string form ends with {@code '/'} is treated as a
 * directory.
 */
public class GcsResourceId implements ResourceId {

  private final GcsPath gcsPath;

  /**
   * Wraps {@code gcsPath} in a {@link GcsResourceId}.
   *
   * @throws NullPointerException if {@code gcsPath} is null
   */
  static GcsResourceId fromGcsPath(GcsPath gcsPath) {
    checkNotNull(gcsPath, "gcsPath");
    return new GcsResourceId(gcsPath);
  }

  private GcsResourceId(GcsPath gcsPath) {
    this.gcsPath = gcsPath;
  }

  /**
   * Resolves {@code other} against this directory. The result is a file or a directory depending
   * on {@code resolveOptions}; directory results always end with {@code '/'}.
   *
   * @throws IllegalStateException if this resource is not a directory
   * @throws IllegalArgumentException if {@code resolveOptions} is neither
   *     {@link StandardResolveOptions#RESOLVE_FILE} nor
   *     {@link StandardResolveOptions#RESOLVE_DIRECTORY}, or if a file is requested and
   *     {@code other} ends with {@code '/'}
   */
  @Override
  public GcsResourceId resolve(String other, ResolveOptions resolveOptions) {
    // Guava message templates are rendered only when a check fails, so the common successful
    // path does no string formatting (previously String.format ran on every call).
    checkState(
        isDirectory(),
        "Expected the gcsPath is a directory, but had [%s].",
        gcsPath);
    checkArgument(
        resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
            || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
        "ResolveOptions: [%s] is not supported.",
        resolveOptions);
    if (resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)) {
      checkArgument(
          !other.endsWith("/"),
          "The resolved file: [%s] should not end with '/'.", other);
      return fromGcsPath(gcsPath.resolve(other));
    } else {
      // StandardResolveOptions.RESOLVE_DIRECTORY
      if (other.endsWith("/")) {
        // other already contains the delimiter for gcs.
        // It is not recommended for callers to set the delimiter.
        // However, we consider it as a valid input.
        return fromGcsPath(gcsPath.resolve(other));
      } else {
        return fromGcsPath(gcsPath.resolve(other + "/"));
      }
    }
  }

  /**
   * Returns this resource if it is a directory, otherwise its parent.
   *
   * @throws IllegalStateException if the path has no parent
   */
  @Override
  public GcsResourceId getCurrentDirectory() {
    if (isDirectory()) {
      return this;
    } else {
      GcsPath parent = gcsPath.getParent();
      checkState(
          parent != null,
          "Failed to get the current directory for path: [%s].",
          gcsPath);
      return fromGcsPath(parent);
    }
  }

  /** Returns true when the wrapped path ends with {@code '/'}. */
  @Override
  public boolean isDirectory() {
    return gcsPath.endsWith("/");
  }

  /** Returns {@code "gs"}. */
  @Override
  public String getScheme() {
    return "gs";
  }

  /**
   * Returns the last name element of the path, or {@code null} when the path has at most one name
   * element or no file name.
   */
  @Override
  @Nullable public String getFilename() {
    if (gcsPath.getNameCount() <= 1) {
      return null;
    } else {
      GcsPath gcsFilename = gcsPath.getFileName();
      return gcsFilename == null ? null : gcsFilename.toString();
    }
  }

  GcsPath getGcsPath() {
    return gcsPath;
  }

  @Override
  public String toString() {
    return gcsPath.toString();
  }

  /** Two ids are equal exactly when their wrapped {@link GcsPath GcsPaths} are equal. */
  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof GcsResourceId)) {
      return false;
    }
    GcsResourceId other = (GcsResourceId) obj;
    return this.gcsPath.equals(other.gcsPath);
  }

  @Override
  public int hashCode() {
    return gcsPath.hashCode();
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java new file mode 100644 index 0000000..ee8552f --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +/** + * Defines the {@link org.apache.beam.sdk.io.FileSystem} implementation for Google Cloud Storage. + */ +package org.apache.beam.sdk.extensions.gcp.storage; http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java new file mode 100644 index 0000000..c9ce1e5 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.util.ServiceLoader; +import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.FileSystemRegistrar; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +
/**
 * Tests for {@link GcsFileSystemRegistrar}.
 */
@RunWith(JUnit4.class)
public class GcsFileSystemRegistrarTest {

  /**
   * Verifies that {@link ServiceLoader} discovers {@link GcsFileSystemRegistrar} and that it
   * produces exactly one {@link GcsFileSystem}.
   */
  @Test
  public void testServiceLoader() {
    Iterable<FileSystemRegistrar> discovered =
        Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator());
    for (FileSystemRegistrar candidate : discovered) {
      if (!(candidate instanceof GcsFileSystemRegistrar)) {
        continue;
      }
      Iterable<FileSystem> fileSystems = candidate.fromOptions(PipelineOptionsFactory.create());
      assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class)));
      return;
    }
    fail("Expected to find " + GcsFileSystemRegistrar.class);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java new file mode 100644 index 0000000..37ff9c8 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; +import static org.mockito.Mockito.when; + +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Before; +import org.junit.Rule; +import 
org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +
/**
 * Tests for {@link GcsFileSystem}.
 */
@RunWith(JUnit4.class)
public class GcsFileSystemTest {

  @Rule
  public transient ExpectedException thrown = ExpectedException.none();
  @Mock
  private GcsUtil mockGcsUtil;
  private GcsOptions gcsOptions;
  private GcsFileSystem gcsFileSystem;

  @Before
  public void setUp() {
    MockitoAnnotations.initMocks(this);
    gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
    gcsOptions.setGcsUtil(mockGcsUtil);
    gcsFileSystem = new GcsFileSystem(gcsOptions);
  }

  /** A mix of glob and non-glob specs yields results in the original spec order. */
  @Test
  public void testMatch() throws Exception {
    Objects modelObjects = new Objects();
    List<StorageObject> items = new ArrayList<>();
    // A directory
    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));

    // Files within the directory
    items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */));

    modelObjects.setItems(items);
    when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
        .thenReturn(modelObjects);

    List<GcsPath> gcsPaths = ImmutableList.of(
        GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"),
        GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));

    when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(
        ImmutableList.of(
            StorageObjectOrIOException.create(new FileNotFoundException()),
            StorageObjectOrIOException.create(
                createStorageObject("gs://testbucket/testdirectory/otherfile", 4L))));

    List<String> specs = ImmutableList.of(
        "gs://testbucket/testdirectory/file[1-3]*",
        "gs://testbucket/testdirectory/non-exist-file",
        "gs://testbucket/testdirectory/otherfile");
    List<MatchResult> matchResults = gcsFileSystem.match(specs);
    assertEquals(3, matchResults.size());
    assertEquals(Status.OK, matchResults.get(0).status());
    assertMatchedFilenames(
        ImmutableList.of(
            "gs://testbucket/testdirectory/file1name",
            "gs://testbucket/testdirectory/file2name",
            "gs://testbucket/testdirectory/file3name"),
        matchResults.get(0));
    assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
    assertEquals(Status.OK, matchResults.get(2).status());
    assertMatchedFilenames(
        ImmutableList.of("gs://testbucket/testdirectory/otherfile"),
        matchResults.get(2));
  }

  /** Each glob form expands to exactly the matching files, in listing order. */
  @Test
  public void testGlobExpansion() throws IOException {
    Objects modelObjects = new Objects();
    List<StorageObject> items = new ArrayList<>();
    // A directory
    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));

    // Files within the directory
    items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 4L /* fileSize */));
    items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 5L /* fileSize */));
    items.add(createStorageObject(
        "gs://testbucket/testotherdirectory/file4name", 6L /* fileSize */));

    modelObjects.setItems(items);

    when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
        .thenReturn(modelObjects);

    List<String> file1To3 = ImmutableList.of(
        "gs://testbucket/testdirectory/file1name",
        "gs://testbucket/testdirectory/file2name",
        "gs://testbucket/testdirectory/file3name");

    // Trailing '*' within the last path segment.
    assertMatchedFilenames(
        file1To3,
        gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/file*")));

    // Leading '*' within the last path segment (previously this slot duplicated the case above).
    assertMatchedFilenames(
        ImmutableList.of(
            "gs://testbucket/testdirectory/otherfile",
            "gs://testbucket/testdirectory/anotherfile"),
        gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/*file")));

    // Character class.
    assertMatchedFilenames(
        file1To3,
        gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*")));

    // Single-character wildcard.
    assertMatchedFilenames(
        file1To3,
        gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/file?name")));

    // Wildcards in more than one path segment.
    assertMatchedFilenames(
        ImmutableList.of(
            "gs://testbucket/testdirectory/file1name",
            "gs://testbucket/testdirectory/file2name",
            "gs://testbucket/testdirectory/file3name",
            "gs://testbucket/testotherdirectory/file4name"),
        gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test*ectory/fi*name")));
  }

  @Test
  public void testExpandNonGlob() throws Exception {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Glob expression: [testdirectory/otherfile] is not expandable.");
    gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
  }

  // Patterns that contain recursive wildcards ('**') are not supported.
  @Test
  public void testRecursiveGlobExpansionFails() throws IOException {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage("Unsupported wildcard usage");
    gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**"));
  }

  /** Found, missing and failed lookups map to OK, NOT_FOUND and ERROR respectively. */
  @Test
  public void testMatchNonGlobs() throws Exception {
    List<StorageObjectOrIOException> items = new ArrayList<>();
    // Files within the directory
    items.add(StorageObjectOrIOException.create(
        createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)));
    items.add(StorageObjectOrIOException.create(new FileNotFoundException()));
    items.add(StorageObjectOrIOException.create(new IOException()));
    items.add(StorageObjectOrIOException.create(
        createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */)));

    List<GcsPath> gcsPaths = ImmutableList.of(
        GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
        GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
        GcsPath.fromUri("gs://testbucket/testdirectory/file3name"),
        GcsPath.fromUri("gs://testbucket/testdirectory/file4name"));

    when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items);
    List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths);

    assertEquals(4, matchResults.size());
    assertMatchedFilenames(
        ImmutableList.of("gs://testbucket/testdirectory/file1name"),
        matchResults.get(0));
    assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
    assertEquals(Status.ERROR, matchResults.get(2).status());
    assertMatchedFilenames(
        ImmutableList.of("gs://testbucket/testdirectory/file4name"),
        matchResults.get(3));
  }

  /** Builds a {@link StorageObject} for {@code gcsFilename} with the given size. */
  private StorageObject createStorageObject(String gcsFilename, long fileSize) {
    GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
    return new StorageObject()
        .setBucket(gcsPath.getBucket())
        .setName(gcsPath.getObject())
        .setSize(BigInteger.valueOf(fileSize));
  }

  /** Extracts the {@code gs://} path of every matched resource, in order. */
  private List<String> toFilenames(MatchResult matchResult) throws IOException {
    return FluentIterable
        .from(matchResult.metadata())
        .transform(new Function<Metadata, String>() {
          @Override
          public String apply(Metadata metadata) {
            return ((GcsResourceId) metadata.resourceId()).getGcsPath().toString();
          }
        })
        .toList();
  }

  /**
   * Asserts that {@code result} matched exactly {@code expected}, in order. The actual file names
   * are passed as hamcrest's "actual" value so failure messages label each side correctly.
   */
  private void assertMatchedFilenames(List<String> expected, MatchResult result)
      throws IOException {
    assertThat(toFilenames(result), contains(expected.toArray()));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java new file mode 100644 index 0000000..b245610 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link GcsResourceId}. + */ +@RunWith(JUnit4.class) +public class GcsResourceIdTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testResolve() throws Exception { + // Tests for common gcs paths. + assertEquals( + toResourceIdentifier("gs://bucket/tmp/aa"), + toResourceIdentifier("gs://bucket/tmp/") + .resolve("aa", StandardResolveOptions.RESOLVE_FILE)); + assertEquals( + toResourceIdentifier("gs://bucket/tmp/aa/bb/cc/"), + toResourceIdentifier("gs://bucket/tmp/") + .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("cc", StandardResolveOptions.RESOLVE_DIRECTORY)); + + // Tests absolute path. + assertEquals( + toResourceIdentifier("gs://bucket/tmp/aa"), + toResourceIdentifier("gs://bucket/tmp/bb/") + .resolve("gs://bucket/tmp/aa", StandardResolveOptions.RESOLVE_FILE)); + + // Tests bucket with no ending '/'. 
+ assertEquals( + toResourceIdentifier("gs://my_bucket/tmp"), + toResourceIdentifier("gs://my_bucket") + .resolve("tmp", StandardResolveOptions.RESOLVE_FILE)); + + // Tests path with unicode + assertEquals( + toResourceIdentifier("gs://bucket/è¾åº ç®å½/è¾åº æ件01.txt"), + toResourceIdentifier("gs://bucket/è¾åº ç®å½/") + .resolve("è¾åº æ件01.txt", StandardResolveOptions.RESOLVE_FILE)); + } + + @Test + public void testResolveHandleBadInputs() throws Exception { + assertEquals( + toResourceIdentifier("gs://my_bucket/tmp/"), + toResourceIdentifier("gs://my_bucket/") + .resolve("tmp/", StandardResolveOptions.RESOLVE_DIRECTORY)); + } + + @Test + public void testResolveInvalidInputs() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("The resolved file: [tmp/] should not end with '/'."); + toResourceIdentifier("gs://my_bucket/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE); + } + + @Test + public void testResolveInvalidNotDirectory() throws Exception { + ResourceId tmpDir = toResourceIdentifier("gs://my_bucket/") + .resolve("tmp dir", StandardResolveOptions.RESOLVE_FILE); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Expected the gcsPath is a directory, but had [gs://my_bucket/tmp dir]."); + tmpDir.resolve("aa", StandardResolveOptions.RESOLVE_FILE); + } + + @Test + public void testGetCurrentDirectory() throws Exception { + // Tests gcs paths. + assertEquals( + toResourceIdentifier("gs://my_bucket/tmp dir/"), + toResourceIdentifier("gs://my_bucket/tmp dir/").getCurrentDirectory()); + + // Tests path with unicode. + assertEquals( + toResourceIdentifier("gs://my_bucket/è¾åº ç®å½/"), + toResourceIdentifier("gs://my_bucket/è¾åº ç®å½/æ件01.txt").getCurrentDirectory()); + + // Tests bucket with no ending '/'. 
+ assertEquals( + toResourceIdentifier("gs://my_bucket/"), + toResourceIdentifier("gs://my_bucket").getCurrentDirectory()); + } + + @Test + public void testIsDirectory() throws Exception { + assertTrue(toResourceIdentifier("gs://my_bucket/tmp dir/").isDirectory()); + assertTrue(toResourceIdentifier("gs://my_bucket/").isDirectory()); + assertTrue(toResourceIdentifier("gs://my_bucket").isDirectory()); + + assertFalse(toResourceIdentifier("gs://my_bucket/file").isDirectory()); + } + + @Test + public void testInvalidGcsPath() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid GCS URI: gs://"); + toResourceIdentifier("gs://"); + } + + @Test + public void testGetScheme() throws Exception { + // Tests gcs paths. + assertEquals("gs", toResourceIdentifier("gs://my_bucket/tmp dir/").getScheme()); + + // Tests bucket with no ending '/'. + assertEquals("gs", toResourceIdentifier("gs://my_bucket").getScheme()); + } + + @Test + public void testEquals() throws Exception { + assertEquals( + toResourceIdentifier("gs://my_bucket/tmp/"), + toResourceIdentifier("gs://my_bucket/tmp/")); + + assertNotEquals( + toResourceIdentifier("gs://my_bucket/tmp"), + toResourceIdentifier("gs://my_bucket/tmp/")); + } + + @Test + public void testGetFilename() throws Exception { + assertEquals(toResourceIdentifier("gs://my_bucket/").getFilename(), null); + assertEquals(toResourceIdentifier("gs://my_bucket/abc").getFilename(), + "abc"); + assertEquals(toResourceIdentifier("gs://my_bucket/abc/").getFilename(), + "abc"); + assertEquals(toResourceIdentifier("gs://my_bucket/abc/xyz.txt").getFilename(), + "xyz.txt"); + } + + private GcsResourceId toResourceIdentifier(String str) throws Exception { + return GcsResourceId.fromGcsPath(GcsPath.fromUri(str)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 3bdc5d0..9051d98 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -88,11 +88,6 @@ <dependency> <groupId>com.google.apis</groupId> - <artifactId>google-api-services-storage</artifactId> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> <artifactId>google-api-services-pubsub</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java deleted file mode 100644 index 2663864..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.storage; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.math.BigInteger; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.regex.Pattern; -import javax.annotation.Nullable; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.io.FileSystem; -import org.apache.beam.sdk.io.fs.CreateOptions; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.MatchResult.Status; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link FileSystem} implementation for Google Cloud Storage. 
- */ -class GcsFileSystem extends FileSystem<GcsResourceId> { - private static final Logger LOG = LoggerFactory.getLogger(GcsFileSystem.class); - - private final GcsOptions options; - - GcsFileSystem(GcsOptions options) { - this.options = checkNotNull(options, "options"); - } - - @Override - protected List<MatchResult> match(List<String> specs) throws IOException { - List<GcsPath> gcsPaths = toGcsPaths(specs); - - List<GcsPath> globs = Lists.newArrayList(); - List<GcsPath> nonGlobs = Lists.newArrayList(); - List<Boolean> isGlobBooleans = Lists.newArrayList(); - - for (GcsPath path : gcsPaths) { - if (GcsUtil.isGlob(path)) { - globs.add(path); - isGlobBooleans.add(true); - } else { - nonGlobs.add(path); - isGlobBooleans.add(false); - } - } - - Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator(); - Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator(); - - ImmutableList.Builder<MatchResult> ret = ImmutableList.builder(); - for (Boolean isGlob : isGlobBooleans) { - if (isGlob) { - checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next."); - ret.add(globsMatchResults.next()); - } else { - checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next."); - ret.add(nonGlobsMatchResults.next()); - } - } - checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults."); - checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults."); - return ret.build(); - } - - @Override - protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions) - throws IOException { - return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType()); - } - - @Override - protected ReadableByteChannel open(GcsResourceId resourceId) throws IOException { - return options.getGcsUtil().open(resourceId.getGcsPath()); - } - - @Override - protected void rename( - List<GcsResourceId> srcResourceIds, - 
List<GcsResourceId> destResourceIds) throws IOException { - copy(srcResourceIds, destResourceIds); - delete(srcResourceIds); - } - - @Override - protected void delete(Collection<GcsResourceId> resourceIds) throws IOException { - options.getGcsUtil().remove(toFilenames(resourceIds)); - } - - @Override - protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { - if (isDirectory) { - if (!singleResourceSpec.endsWith("/")) { - singleResourceSpec += '/'; - } - } else { - checkArgument( - !singleResourceSpec.endsWith("/"), - "Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.", - singleResourceSpec); - } - GcsPath path = GcsPath.fromUri(singleResourceSpec); - return GcsResourceId.fromGcsPath(path); - } - - @Override - protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds) - throws IOException { - options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds)); - } - - @Override - protected String getScheme() { - return "gs"; - } - - private List<MatchResult> matchGlobs(List<GcsPath> globs) { - // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503. - return FluentIterable.from(globs) - .transform(new Function<GcsPath, MatchResult>() { - @Override - public MatchResult apply(GcsPath gcsPath) { - try { - return expand(gcsPath); - } catch (IOException e) { - return MatchResult.create(Status.ERROR, e); - } - }}) - .toList(); - } - - /** - * Expands a pattern into {@link MatchResult}. - * - * @throws IllegalArgumentException if {@code gcsPattern} does not contain globs. 
- */ - @VisibleForTesting - MatchResult expand(GcsPath gcsPattern) throws IOException { - String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject()); - Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject())); - - LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), - prefix, p.toString()); - - String pageToken = null; - List<Metadata> results = new LinkedList<>(); - do { - Objects objects = options.getGcsUtil().listObjects(gcsPattern.getBucket(), prefix, pageToken); - if (objects.getItems() == null) { - break; - } - - // Filter objects based on the regex. - for (StorageObject o : objects.getItems()) { - String name = o.getName(); - // Skip directories, which end with a slash. - if (p.matcher(name).matches() && !name.endsWith("/")) { - LOG.debug("Matched object: {}", name); - results.add(toMetadata(o)); - } - } - pageToken = objects.getNextPageToken(); - } while (pageToken != null); - return MatchResult.create(Status.OK, results); - } - - /** - * Returns {@link MatchResult MatchResults} for the given {@link GcsPath GcsPaths}. - * - *<p>The number of returned {@link MatchResult MatchResults} equals to the number of given - * {@link GcsPath GcsPaths}. Each {@link MatchResult} contains one {@link Metadata}. 
- */ - @VisibleForTesting - List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException { - List<StorageObjectOrIOException> results = options.getGcsUtil().getObjects(gcsPaths); - - ImmutableList.Builder<MatchResult> ret = ImmutableList.builder(); - for (StorageObjectOrIOException result : results) { - ret.add(toMatchResult(result)); - } - return ret.build(); - } - - private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) { - @Nullable IOException exception = objectOrException.ioException(); - if (exception instanceof FileNotFoundException) { - return MatchResult.create(Status.NOT_FOUND, exception); - } else if (exception != null) { - return MatchResult.create(Status.ERROR, exception); - } else { - StorageObject object = objectOrException.storageObject(); - assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics. - return MatchResult.create(Status.OK, ImmutableList.of(toMetadata(object))); - } - } - - private Metadata toMetadata(StorageObject storageObject) { - // TODO: Address https://issues.apache.org/jira/browse/BEAM-1494 - // It is incorrect to set IsReadSeekEfficient true for files with content encoding set to gzip. 
- Metadata.Builder ret = Metadata.builder() - .setIsReadSeekEfficient(true) - .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject))); - BigInteger size = storageObject.getSize(); - if (size != null) { - ret.setSizeBytes(size.longValue()); - } - return ret.build(); - } - - private List<String> toFilenames(Collection<GcsResourceId> resources) { - return FluentIterable.from(resources) - .transform( - new Function<GcsResourceId, String>() { - @Override - public String apply(GcsResourceId resource) { - return resource.getGcsPath().toString(); - }}) - .toList(); - } - - private List<GcsPath> toGcsPaths(Collection<String> specs) { - return FluentIterable.from(specs) - .transform(new Function<String, GcsPath>() { - @Override - public GcsPath apply(String spec) { - return GcsPath.fromUri(spec); - }}) - .toList(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java deleted file mode 100644 index 1d4e4ad..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.storage; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; -import javax.annotation.Nonnull; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.io.FileSystem; -import org.apache.beam.sdk.io.FileSystemRegistrar; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * {@link AutoService} registrar for the {@link GcsFileSystem}. - */ -@AutoService(FileSystemRegistrar.class) -public class GcsFileSystemRegistrar implements FileSystemRegistrar { - - @Override - public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) { - checkNotNull( - options, - "Expect the runner have called FileSystems.setDefaultConfigInWorkers()."); - return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java deleted file mode 100644 index 29215e7..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java +++ /dev/null @@ -1,128 +0,0 
@@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.storage; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.fs.ResolveOptions; -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; -import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.util.gcsfs.GcsPath; - -/** - * {@link ResourceId} implementation for Google Cloud Storage. 
- */ -public class GcsResourceId implements ResourceId { - - private final GcsPath gcsPath; - - static GcsResourceId fromGcsPath(GcsPath gcsPath) { - checkNotNull(gcsPath, "gcsPath"); - return new GcsResourceId(gcsPath); - } - - private GcsResourceId(GcsPath gcsPath) { - this.gcsPath = gcsPath; - } - - @Override - public GcsResourceId resolve(String other, ResolveOptions resolveOptions) { - checkState( - isDirectory(), - String.format("Expected the gcsPath is a directory, but had [%s].", gcsPath)); - checkArgument( - resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE) - || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY), - String.format("ResolveOptions: [%s] is not supported.", resolveOptions)); - if (resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)) { - checkArgument( - !other.endsWith("/"), - "The resolved file: [%s] should not end with '/'.", other); - return fromGcsPath(gcsPath.resolve(other)); - } else { - // StandardResolveOptions.RESOLVE_DIRECTORY - if (other.endsWith("/")) { - // other already contains the delimiter for gcs. - // It is not recommended for callers to set the delimiter. - // However, we consider it as a valid input. - return fromGcsPath(gcsPath.resolve(other)); - } else { - return fromGcsPath(gcsPath.resolve(other + "/")); - } - } - } - - @Override - public GcsResourceId getCurrentDirectory() { - if (isDirectory()) { - return this; - } else { - GcsPath parent = gcsPath.getParent(); - checkState( - parent != null, - String.format("Failed to get the current directory for path: [%s].", gcsPath)); - return fromGcsPath(parent); - } - } - - @Override - public boolean isDirectory() { - return gcsPath.endsWith("/"); - } - - @Override - public String getScheme() { - return "gs"; - } - - @Override - @Nullable public String getFilename() { - if (gcsPath.getNameCount() <= 1) { - return null; - } else { - GcsPath gcsFilename = gcsPath.getFileName(); - return gcsFilename == null ? 
null : gcsFilename.toString(); - } - } - - GcsPath getGcsPath() { - return gcsPath; - } - - @Override - public String toString() { - return gcsPath.toString(); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof GcsResourceId)) { - return false; - } - GcsResourceId other = (GcsResourceId) obj; - return this.gcsPath.equals(other.gcsPath); - } - - @Override - public int hashCode() { - return gcsPath.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java deleted file mode 100644 index b5378be..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Defines IO connectors for Google Cloud Storage. 
- */ -package org.apache.beam.sdk.io.gcp.storage; http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java deleted file mode 100644 index 2fc337a..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.storage; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -import com.google.common.collect.Lists; -import java.util.ServiceLoader; -import org.apache.beam.sdk.io.FileSystem; -import org.apache.beam.sdk.io.FileSystemRegistrar; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link GcsFileSystemRegistrar}. - */ -@RunWith(JUnit4.class) -public class GcsFileSystemRegistrarTest { - - @Test - public void testServiceLoader() { - for (FileSystemRegistrar registrar - : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { - if (registrar instanceof GcsFileSystemRegistrar) { - Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create()); - assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class))); - return; - } - } - fail("Expected to find " + GcsFileSystemRegistrar.class); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java deleted file mode 100644 index cc2e0c4..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.storage; - -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isNull; -import static org.mockito.Mockito.when; - -import com.google.api.services.storage.model.Objects; -import com.google.api.services.storage.model.StorageObject; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.MatchResult.Status; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import 
org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link GcsFileSystem}. - */ -@RunWith(JUnit4.class) -public class GcsFileSystemTest { - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - @Mock - private GcsUtil mockGcsUtil; - private GcsOptions gcsOptions; - private GcsFileSystem gcsFileSystem; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - gcsOptions = PipelineOptionsFactory.as(GcsOptions.class); - gcsOptions.setGcsUtil(mockGcsUtil); - gcsFileSystem = new GcsFileSystem(gcsOptions); - } - - @Test - public void testMatch() throws Exception { - Objects modelObjects = new Objects(); - List<StorageObject> items = new ArrayList<>(); - // A directory - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); - - // Files within the directory - items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */)); - - modelObjects.setItems(items); - when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class))) - .thenReturn(modelObjects); - - List<GcsPath> gcsPaths = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"), - GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); - - when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn( - ImmutableList.of( - StorageObjectOrIOException.create(new 
FileNotFoundException()), - StorageObjectOrIOException.create( - createStorageObject("gs://testbucket/testdirectory/otherfile", 4L)))); - - List<String> specs = ImmutableList.of( - "gs://testbucket/testdirectory/file[1-3]*", - "gs://testbucket/testdirectory/non-exist-file", - "gs://testbucket/testdirectory/otherfile"); - List<MatchResult> matchResults = gcsFileSystem.match(specs); - assertEquals(3, matchResults.size()); - assertEquals(Status.OK, matchResults.get(0).status()); - assertThat( - ImmutableList.of( - "gs://testbucket/testdirectory/file1name", - "gs://testbucket/testdirectory/file2name", - "gs://testbucket/testdirectory/file3name"), - contains(toFilenames(matchResults.get(0)).toArray())); - assertEquals(Status.NOT_FOUND, matchResults.get(1).status()); - assertEquals(Status.OK, matchResults.get(2).status()); - assertThat( - ImmutableList.of("gs://testbucket/testdirectory/otherfile"), - contains(toFilenames(matchResults.get(2)).toArray())); - - } - - @Test - public void testGlobExpansion() throws IOException { - Objects modelObjects = new Objects(); - List<StorageObject> items = new ArrayList<>(); - // A directory - items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); - - // Files within the directory - items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 4L /* fileSize */)); - items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 5L /* fileSize */)); - items.add(createStorageObject( - "gs://testbucket/testotherdirectory/file4name", 6L /* fileSize */)); - - modelObjects.setItems(items); - - when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class))) - .thenReturn(modelObjects); - - 
// Test patterns. - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); - List<String> expectedFiles = ImmutableList.of( - "gs://testbucket/testdirectory/file1name", - "gs://testbucket/testdirectory/file2name", - "gs://testbucket/testdirectory/file3name"); - - assertThat( - expectedFiles, - contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); - List<String> expectedFiles = ImmutableList.of( - "gs://testbucket/testdirectory/file1name", - "gs://testbucket/testdirectory/file2name", - "gs://testbucket/testdirectory/file3name"); - - assertThat( - expectedFiles, - contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*"); - List<String> expectedFiles = ImmutableList.of( - "gs://testbucket/testdirectory/file1name", - "gs://testbucket/testdirectory/file2name", - "gs://testbucket/testdirectory/file3name"); - - assertThat( - expectedFiles, - contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name"); - List<String> expectedFiles = ImmutableList.of( - "gs://testbucket/testdirectory/file1name", - "gs://testbucket/testdirectory/file2name", - "gs://testbucket/testdirectory/file3name"); - - assertThat( - expectedFiles, - contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); - } - - { - GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name"); - List<String> expectedFiles = ImmutableList.of( - "gs://testbucket/testdirectory/file1name", - "gs://testbucket/testdirectory/file2name", - "gs://testbucket/testdirectory/file3name", - "gs://testbucket/testotherdirectory/file4name"); - - assertThat( - expectedFiles, - contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); - } - } - - @Test - public void testExpandNonGlob() throws Exception { - 
thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Glob expression: [testdirectory/otherfile] is not expandable."); - gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); - } - - // Patterns that contain recursive wildcards ('**') are not supported. - @Test - public void testRecursiveGlobExpansionFails() throws IOException { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unsupported wildcard usage"); - gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**")); - } - - @Test - public void testMatchNonGlobs() throws Exception { - List<StorageObjectOrIOException> items = new ArrayList<>(); - // Files within the directory - items.add(StorageObjectOrIOException.create( - createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */))); - items.add(StorageObjectOrIOException.create(new FileNotFoundException())); - items.add(StorageObjectOrIOException.create(new IOException())); - items.add(StorageObjectOrIOException.create( - createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */))); - - List<GcsPath> gcsPaths = ImmutableList.of( - GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file3name"), - GcsPath.fromUri("gs://testbucket/testdirectory/file4name")); - - when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items); - List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths); - - assertEquals(4, matchResults.size()); - assertThat( - ImmutableList.of("gs://testbucket/testdirectory/file1name"), - contains(toFilenames(matchResults.get(0)).toArray())); - assertEquals(Status.NOT_FOUND, matchResults.get(1).status()); - assertEquals(Status.ERROR, matchResults.get(2).status()); - assertThat( - ImmutableList.of("gs://testbucket/testdirectory/file4name"), - contains(toFilenames(matchResults.get(3)).toArray())); - } - - 
private StorageObject createStorageObject(String gcsFilename, long fileSize) { - GcsPath gcsPath = GcsPath.fromUri(gcsFilename); - return new StorageObject() - .setBucket(gcsPath.getBucket()) - .setName(gcsPath.getObject()) - .setSize(BigInteger.valueOf(fileSize)); - } - - private List<String> toFilenames(MatchResult matchResult) throws IOException { - return FluentIterable - .from(matchResult.metadata()) - .transform(new Function<Metadata, String>() { - @Override - public String apply(Metadata metadata) { - return ((GcsResourceId) metadata.resourceId()).getGcsPath().toString(); - }}) - .toList(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java deleted file mode 100644 index 702e754..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.storage; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; - -import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; -import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link GcsResourceId}. - */ -@RunWith(JUnit4.class) -public class GcsResourceIdTest { - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testResolve() throws Exception { - // Tests for common gcs paths. - assertEquals( - toResourceIdentifier("gs://bucket/tmp/aa"), - toResourceIdentifier("gs://bucket/tmp/") - .resolve("aa", StandardResolveOptions.RESOLVE_FILE)); - assertEquals( - toResourceIdentifier("gs://bucket/tmp/aa/bb/cc/"), - toResourceIdentifier("gs://bucket/tmp/") - .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY) - .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY) - .resolve("cc", StandardResolveOptions.RESOLVE_DIRECTORY)); - - // Tests absolute path. - assertEquals( - toResourceIdentifier("gs://bucket/tmp/aa"), - toResourceIdentifier("gs://bucket/tmp/bb/") - .resolve("gs://bucket/tmp/aa", StandardResolveOptions.RESOLVE_FILE)); - - // Tests bucket with no ending '/'. 
- assertEquals( - toResourceIdentifier("gs://my_bucket/tmp"), - toResourceIdentifier("gs://my_bucket") - .resolve("tmp", StandardResolveOptions.RESOLVE_FILE)); - - // Tests path with unicode - assertEquals( - toResourceIdentifier("gs://bucket/è¾åº ç®å½/è¾åº æ件01.txt"), - toResourceIdentifier("gs://bucket/è¾åº ç®å½/") - .resolve("è¾åº æ件01.txt", StandardResolveOptions.RESOLVE_FILE)); - } - - @Test - public void testResolveHandleBadInputs() throws Exception { - assertEquals( - toResourceIdentifier("gs://my_bucket/tmp/"), - toResourceIdentifier("gs://my_bucket/") - .resolve("tmp/", StandardResolveOptions.RESOLVE_DIRECTORY)); - } - - @Test - public void testResolveInvalidInputs() throws Exception { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("The resolved file: [tmp/] should not end with '/'."); - toResourceIdentifier("gs://my_bucket/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE); - } - - @Test - public void testResolveInvalidNotDirectory() throws Exception { - ResourceId tmpDir = toResourceIdentifier("gs://my_bucket/") - .resolve("tmp dir", StandardResolveOptions.RESOLVE_FILE); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Expected the gcsPath is a directory, but had [gs://my_bucket/tmp dir]."); - tmpDir.resolve("aa", StandardResolveOptions.RESOLVE_FILE); - } - - @Test - public void testGetCurrentDirectory() throws Exception { - // Tests gcs paths. - assertEquals( - toResourceIdentifier("gs://my_bucket/tmp dir/"), - toResourceIdentifier("gs://my_bucket/tmp dir/").getCurrentDirectory()); - - // Tests path with unicode. - assertEquals( - toResourceIdentifier("gs://my_bucket/è¾åº ç®å½/"), - toResourceIdentifier("gs://my_bucket/è¾åº ç®å½/æ件01.txt").getCurrentDirectory()); - - // Tests bucket with no ending '/'. 
- assertEquals( - toResourceIdentifier("gs://my_bucket/"), - toResourceIdentifier("gs://my_bucket").getCurrentDirectory()); - } - - @Test - public void testIsDirectory() throws Exception { - assertTrue(toResourceIdentifier("gs://my_bucket/tmp dir/").isDirectory()); - assertTrue(toResourceIdentifier("gs://my_bucket/").isDirectory()); - assertTrue(toResourceIdentifier("gs://my_bucket").isDirectory()); - - assertFalse(toResourceIdentifier("gs://my_bucket/file").isDirectory()); - } - - @Test - public void testInvalidGcsPath() throws Exception { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Invalid GCS URI: gs://"); - toResourceIdentifier("gs://"); - } - - @Test - public void testGetScheme() throws Exception { - // Tests gcs paths. - assertEquals("gs", toResourceIdentifier("gs://my_bucket/tmp dir/").getScheme()); - - // Tests bucket with no ending '/'. - assertEquals("gs", toResourceIdentifier("gs://my_bucket").getScheme()); - } - - @Test - public void testEquals() throws Exception { - assertEquals( - toResourceIdentifier("gs://my_bucket/tmp/"), - toResourceIdentifier("gs://my_bucket/tmp/")); - - assertNotEquals( - toResourceIdentifier("gs://my_bucket/tmp"), - toResourceIdentifier("gs://my_bucket/tmp/")); - } - - @Test - public void testGetFilename() throws Exception { - assertEquals(toResourceIdentifier("gs://my_bucket/").getFilename(), null); - assertEquals(toResourceIdentifier("gs://my_bucket/abc").getFilename(), - "abc"); - assertEquals(toResourceIdentifier("gs://my_bucket/abc/").getFilename(), - "abc"); - assertEquals(toResourceIdentifier("gs://my_bucket/abc/xyz.txt").getFilename(), - "xyz.txt"); - } - - private GcsResourceId toResourceIdentifier(String str) throws Exception { - return GcsResourceId.fromGcsPath(GcsPath.fromUri(str)); - } -}