Remove IOChannelUtils from PackageUtil * Staging location as a new directory
* Add GcsCreateOptions to override the default upload buffer size value Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7d6ddc2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7d6ddc2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7d6ddc2 Branch: refs/heads/DSL_SQL Commit: a7d6ddc2a669392fd808a24e31f7cd45742eaa43 Parents: 3b61f6a Author: Vikas Kedigehalli <vika...@google.com> Authored: Fri Apr 28 18:07:31 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed May 3 08:09:13 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/util/GcsStager.java | 26 ++-- .../beam/runners/dataflow/util/PackageUtil.java | 38 +++-- .../runners/dataflow/DataflowRunnerTest.java | 20 ++- .../runners/dataflow/util/PackageUtilTest.java | 143 ++++++++++++------- .../org/apache/beam/sdk/io/FileSystems.java | 28 ++-- .../gcp/storage/GcsCreateOptions.java | 56 ++++++++ .../extensions/gcp/storage/GcsFileSystem.java | 7 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 12 +- 8 files changed, 224 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java index 53822e3..d18e306 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java @@ -22,14 +22,12 @@ import static 
com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.api.services.storage.Storage; import java.util.List; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; -import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.util.MimeTypes; /** * Utility class for staging files to GCS. @@ -49,22 +47,24 @@ public class GcsStager implements Stager { @Override public List<DataflowPackage> stageFiles() { checkNotNull(options.getStagingLocation()); - List<String> filesToStage = options.getFilesToStage(); String windmillBinary = options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary(); if (windmillBinary != null) { - filesToStage.add("windmill_main=" + windmillBinary); + options.getFilesToStage().add("windmill_main=" + windmillBinary); } + int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024); checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0"); uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024); - Storage.Builder storageBuilder = Transport.newStorageClient(options); - GcsUtil util = GcsUtilFactory.create( - storageBuilder.build(), - storageBuilder.getHttpRequestInitializer(), - options.getExecutorService(), - uploadSizeBytes); + + GcsCreateOptions createOptions = GcsCreateOptions.builder() + .setGcsUploadBufferSizeBytes(uploadSizeBytes) + .setMimeType(MimeTypes.BINARY) + .build(); + return PackageUtil.stageClasspathElements( - options.getFilesToStage(), options.getStagingLocation(), util); + options.getFilesToStage(), + 
options.getStagingLocation(), + createOptions); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 0d52c5d..5ddcd29 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -51,14 +51,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.GcsIOChannelFactory; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.ZipFiles; -import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,7 +107,8 @@ class PackageUtil { // Create the DataflowPackage with staging name and location. 
String uniqueName = getUniqueContentName(source, hash); - String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName); + String resourcePath = FileSystems.matchNewResource(stagingPath, true) + .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE).toString(); DataflowPackage target = new DataflowPackage(); target.setName(overridePackageName != null ? overridePackageName : uniqueName); target.setLocation(resourcePath); @@ -181,14 +179,9 @@ class PackageUtil { } } - private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil) + private static WritableByteChannel makeWriter(String target, CreateOptions createOptions) throws IOException { - IOChannelFactory factory = IOChannelUtils.getFactory(target); - if (factory instanceof GcsIOChannelFactory) { - return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY); - } else { - return factory.create(target, MimeTypes.BINARY); - } + return FileSystems.create(FileSystems.matchNewResource(target, false), createOptions); } /** @@ -197,7 +190,7 @@ class PackageUtil { */ private static void stageOnePackage( PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached, - Sleeper retrySleeper, GcsUtil gcsUtil) { + Sleeper retrySleeper, CreateOptions createOptions) { String source = attributes.getSourcePath(); String target = attributes.getDataflowPackage().getLocation(); @@ -205,7 +198,7 @@ class PackageUtil { // always using MimeTypes.BINARY? 
try { try { - long remoteLength = IOChannelUtils.getSizeBytes(target); + long remoteLength = FileSystems.matchSingleFileSpec(target).sizeBytes(); if (remoteLength == attributes.getSize()) { LOG.debug("Skipping classpath element already staged: {} at {}", attributes.getSourcePath(), target); @@ -221,7 +214,7 @@ class PackageUtil { while (true) { try { LOG.debug("Uploading classpath element {} to {}", source, target); - try (WritableByteChannel writer = makeWriter(target, gcsUtil)) { + try (WritableByteChannel writer = makeWriter(target, createOptions)) { copyContent(source, writer); } numUploaded.incrementAndGet(); @@ -262,12 +255,12 @@ class PackageUtil { * @return A list of cloud workflow packages, each representing a classpath element. */ static List<DataflowPackage> stageClasspathElements( - Collection<String> classpathElements, String stagingPath, GcsUtil gcsUtil) { + Collection<String> classpathElements, String stagingPath, CreateOptions createOptions) { ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32)); try { - return stageClasspathElements( - classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil); + return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT, + executorService, createOptions); } finally { executorService.shutdown(); } @@ -276,7 +269,8 @@ class PackageUtil { // Visible for testing. 
static List<DataflowPackage> stageClasspathElements( Collection<String> classpathElements, final String stagingPath, - final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) { + final Sleeper retrySleeper, ListeningExecutorService executorService, + final CreateOptions createOptions) { LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to " + "prepare for execution.", classpathElements.size()); @@ -314,7 +308,7 @@ class PackageUtil { futures.add(executorService.submit(new Runnable() { @Override public void run() { - stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil); + stageOnePackage(attributes, numUploaded, numCached, retrySleeper, createOptions); } })); } http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index d011994..fa106ac 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -33,6 +33,8 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; @@ -46,6 +48,7 @@ import com.google.api.services.dataflow.model.ListJobsResponse; import com.google.common.base.Throwables; 
import com.google.common.collect.ImmutableList; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; @@ -449,8 +452,7 @@ public class DataflowRunnerTest { @Test public void testRunWithFiles() throws IOException { - // Test that the function DataflowRunner.stageFiles works as - // expected. + // Test that the function DataflowRunner.stageFiles works as expected. final String cloudDataflowDataset = "somedataset"; // Create some temporary files. @@ -461,6 +463,10 @@ public class DataflowRunnerTest { String overridePackageName = "alias.txt"; + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(GcsUtil.StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setFilesToStage(ImmutableList.of( temp1.getAbsolutePath(), @@ -475,6 +481,16 @@ public class DataflowRunnerTest { options.setGcsUtil(mockGcsUtil); options.setGcpCredential(new TestCredential()); + when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt())) + .then(new Answer<SeekableByteChannel>() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); + } + }); + Pipeline p = buildDataflowPipeline(options); DataflowPipelineJob job = (DataflowPipelineJob) p.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 877832c..4ae3a77 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -49,6 +50,7 @@ import com.google.api.client.testing.http.MockHttpTransport; import com.google.api.client.testing.http.MockLowLevelHttpRequest; import com.google.api.client.testing.http.MockLowLevelHttpResponse; import com.google.api.services.dataflow.model.DataflowPackage; +import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -58,6 +60,7 @@ import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.math.BigInteger; import java.nio.channels.Channels; import java.nio.channels.Pipe; import java.nio.channels.Pipe.SinkChannel; @@ -70,12 +73,17 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import 
org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.testing.RegexMatcher; import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; +import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.hamcrest.Matchers; import org.junit.Before; @@ -104,6 +112,7 @@ public class PackageUtilTest { // 128 bits, base64 encoded is 171 bits, rounds to 22 bytes private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}"; + private CreateOptions createOptions; @Before public void setUp() { @@ -111,8 +120,8 @@ public class PackageUtilTest { GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); pipelineOptions.setGcsUtil(mockGcsUtil); - - IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions); + FileSystems.setDefaultConfigInWorkers(pipelineOptions); + createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build(); } private File makeFileWithContents(String name, String contents) throws Exception { @@ -122,7 +131,8 @@ public class PackageUtilTest { return tmpFile; } - static final String STAGING_PATH = GcsPath.fromComponents("somebucket", "base/path").toString(); + static final GcsPath STAGING_GCS_PATH = GcsPath.fromComponents("somebucket", "base/path/"); + static final String STAGING_PATH = STAGING_GCS_PATH.toString(); private static PackageAttributes makePackageAttributes(File file, String overridePackageName) { return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName); } @@ -135,7 +145,7 @@ public class PackageUtilTest { DataflowPackage target = attr.getDataflowPackage(); assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt")); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + 
target.getName())); + assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); assertThat(attr.getSize(), equalTo((long) contents.length())); } @@ -145,7 +155,7 @@ public class PackageUtilTest { DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage(); assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN)); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); + assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); } @Test @@ -154,7 +164,7 @@ public class PackageUtilTest { DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage(); assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar")); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); + assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); } @Test @@ -203,8 +213,10 @@ public class PackageUtilTest { @Test public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - // all files will be present and cached so no upload needed. 
- when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length()); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))).thenReturn( + ImmutableList.of(StorageObjectOrIOException.create( + createStorageObject(STAGING_PATH, tmpFile.length()))) + ); List<String> classpathElements = Lists.newLinkedList(); for (int i = 0; i < 1005; ++i) { @@ -212,8 +224,7 @@ public class PackageUtilTest { classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath()); } - PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil); - + PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, createOptions); logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow"); } @@ -222,20 +233,22 @@ public class PackageUtilTest { Pipe pipe = Pipe.open(); String contents = "This is a test!"; File tmpFile = makeFileWithContents("file.txt", contents); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); + when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); verifyNoMoreInteractions(mockGcsUtil); assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt")); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); + assertThat(target.getLocation(), equalTo(STAGING_PATH + 
target.getName())); assertThat(new LineReader(Channels.newReader(pipe.source(), "UTF-8")).readLine(), equalTo(contents)); } @@ -244,8 +257,10 @@ public class PackageUtilTest { public void testStagingPreservesClasspath() throws Exception { File smallFile = makeFileWithContents("small.txt", "small"); File largeFile = makeFileWithContents("large.txt", "large contents"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); + when(mockGcsUtil.create(any(GcsPath.class), anyString())) .thenAnswer(new Answer<SinkChannel>() { @Override @@ -256,7 +271,7 @@ public class PackageUtilTest { List<DataflowPackage> targets = PackageUtil.stageClasspathElements( ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()), - STAGING_PATH, mockGcsUtil); + STAGING_PATH, createOptions); // Verify that the packages are returned small, then large, matching input order even though // the large file would be uploaded first. 
assertThat(targets.get(0).getName(), startsWith("small")); @@ -272,14 +287,15 @@ public class PackageUtilTest { makeFileWithContents("folder/file.txt", "This is a test!"); makeFileWithContents("folder/directory/file.txt", "This is also a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions); - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); verifyNoMoreInteractions(mockGcsUtil); @@ -299,28 +315,30 @@ public class PackageUtilTest { Pipe pipe = Pipe.open(); File tmpDirectory = tmpFolder.newFolder("folder"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); verifyNoMoreInteractions(mockGcsUtil); 
assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar")); - assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + target.getName())); + assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); assertNull(new ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry()); } @Test(expected = RuntimeException.class) public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())) .thenThrow(new IOException("Fake Exception: Upload error")); @@ -328,9 +346,9 @@ public class PackageUtilTest { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), - mockGcsUtil); + createOptions); } finally { - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString()); verifyNoMoreInteractions(mockGcsUtil); } @@ -339,8 +357,9 @@ public class PackageUtilTest { @Test public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())) .thenThrow(new IOException("Failed to write to GCS 
path " + STAGING_PATH, googleJsonResponseException( @@ -350,7 +369,7 @@ public class PackageUtilTest { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), - mockGcsUtil); + createOptions); fail("Expected RuntimeException"); } catch (RuntimeException e) { assertThat("Expected RuntimeException wrapping IOException.", @@ -364,7 +383,7 @@ public class PackageUtilTest { "Stale credentials can be resolved by executing 'gcloud auth application-default " + "login'"))); } finally { - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); verifyNoMoreInteractions(mockGcsUtil); } @@ -374,8 +393,9 @@ public class PackageUtilTest { public void testPackageUploadEventuallySucceeds() throws Exception { Pipe pipe = Pipe.open(); File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())) .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails .thenReturn(pipe.sink()); // second attempt succeeds @@ -383,9 +403,9 @@ public class PackageUtilTest { try { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper, - MoreExecutors.newDirectExecutorService(), mockGcsUtil); + MoreExecutors.newDirectExecutorService(), createOptions); } finally { - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString()); 
verifyNoMoreInteractions(mockGcsUtil); } @@ -394,12 +414,14 @@ public class PackageUtilTest { @Test public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length()); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + createStorageObject(STAGING_PATH, tmpFile.length())))); - PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + PackageUtil.stageClasspathElements(ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, + createOptions); - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verifyNoMoreInteractions(mockGcsUtil); } @@ -411,13 +433,15 @@ public class PackageUtilTest { tmpFolder.newFolder("folder", "directory"); makeFileWithContents("folder/file.txt", "This is a test!"); makeFileWithContents("folder/directory/file.txt", "This is also a test!"); - when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + createStorageObject(STAGING_PATH, Long.MAX_VALUE)))); when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions); - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); verifyNoMoreInteractions(mockGcsUtil); } @@ -428,30 +452,31 @@ public class PackageUtilTest { File tmpFile = makeFileWithContents("file.txt", "This is a test!"); final 
String overriddenName = "alias.txt"; - when(mockGcsUtil.fileSize(any(GcsPath.class))) - .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))) + .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( + new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List<DataflowPackage> targets = PackageUtil.stageClasspathElements( ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH, - mockGcsUtil); + createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); - verify(mockGcsUtil).fileSize(any(GcsPath.class)); + verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); verifyNoMoreInteractions(mockGcsUtil); assertThat(target.getName(), equalTo(overriddenName)); assertThat(target.getLocation(), - RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + ".txt")); + RegexMatcher.matches(STAGING_PATH + "file-" + HASH_PATTERN + ".txt")); } @Test public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception { - String nonExistentFile = - IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file"); + String nonExistentFile = FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true) + .resolve("non-existent-file", StandardResolveOptions.RESOLVE_FILE).toString(); assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements( - ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil)); + ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions)); } /** @@ -485,4 +510,12 @@ public class PackageUtilTest { HttpResponse response = request.execute(); return GoogleJsonResponseException.from(jsonFactory, response); } + + private StorageObject createStorageObject(String gcsFilename, long fileSize) { + GcsPath gcsPath = GcsPath.fromUri(gcsFilename); + return new StorageObject() + 
.setBucket(gcsPath.getBucket()) + .setName(gcsPath.getObject()) + .setSize(BigInteger.valueOf(fileSize)); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index e4f00ea..0110a0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -32,6 +32,8 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.collect.TreeMultimap; + +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -111,26 +113,30 @@ public class FileSystems { * * @param spec a resource specification that matches exactly one result. * @return the {@link Metadata} for the specified resource. + * @throws FileNotFoundException if the file resource is not found. * @throws IOException in the event of an error in the inner call to {@link #match}, * or if the given spec does not match exactly 1 result. 
*/ public static Metadata matchSingleFileSpec(String spec) throws IOException { List<MatchResult> matches = FileSystems.match(Collections.singletonList(spec)); MatchResult matchResult = Iterables.getOnlyElement(matches); - if (matchResult.status() != Status.OK) { + if (matchResult.status() == Status.NOT_FOUND) { + throw new FileNotFoundException(String.format("File spec %s not found", spec)); + } else if (matchResult.status() != Status.OK) { throw new IOException( String.format("Error matching file spec %s: status %s", spec, matchResult.status())); + } else { + List<Metadata> metadata = matchResult.metadata(); + if (metadata.size() != 1) { + throw new IOException( + String.format( + "Expecting spec %s to match exactly one file, but matched %s: %s", + spec, + metadata.size(), + metadata)); + } + return metadata.get(0); } - List<Metadata> metadata = matchResult.metadata(); - if (metadata.size() != 1) { - throw new IOException( - String.format( - "Expecting spec %s to match exactly one file, but matched %s: %s", - spec, - metadata.size(), - metadata)); - } - return metadata.get(0); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java new file mode 100644 index 0000000..dbfe960 --- /dev/null +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.gcp.storage; + +import com.google.auto.value.AutoValue; +import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.fs.CreateOptions; + +/** + * An abstract class that contains common configuration options for creating resources. + */ +@AutoValue +public abstract class GcsCreateOptions extends CreateOptions { + + /** + * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for + * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the + * restrictions and performance implications of this value. + */ + @Nullable + public abstract Integer gcsUploadBufferSizeBytes(); + + // TODO: Add other GCS options when needed. + + /** + * Returns a {@link GcsCreateOptions.Builder}. + */ + public static GcsCreateOptions.Builder builder() { + return new AutoValue_GcsCreateOptions.Builder(); + } + + /** + * A builder for {@link GcsCreateOptions}. 
+ */ + @AutoValue.Builder + public abstract static class Builder extends CreateOptions.Builder<GcsCreateOptions.Builder> { + public abstract GcsCreateOptions build(); + public abstract GcsCreateOptions.Builder setGcsUploadBufferSizeBytes(@Nullable Integer bytes); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java index 69dd8fc..38b8347 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -102,7 +102,12 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { @Override protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions) throws IOException { - return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType()); + if (createOptions instanceof GcsCreateOptions) { + return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType(), + ((GcsCreateOptions) createOptions).gcsUploadBufferSizeBytes()); + } else { + return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType()); + } } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index c8e6839..ee2e231 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -436,8 +436,16 @@ public class GcsUtil { * @param type the type of object, eg "text/plain". * @return a Callable object that encloses the operation. */ - public WritableByteChannel create(GcsPath path, - String type) throws IOException { + public WritableByteChannel create(GcsPath path, String type) throws IOException { + return create(path, type, uploadBufferSizeBytes); + } + + /** + * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding + * {@code uploadBufferSizeBytes}. + */ + public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes) + throws IOException { GoogleCloudStorageWriteChannel channel = new GoogleCloudStorageWriteChannel( executorService, storageClient,