Remove IoChannelUtils from PackageUtil

* Staging location as a new directory

* Add GcsCreateOptions to override the default upload buffer size value


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7d6ddc2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7d6ddc2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7d6ddc2

Branch: refs/heads/DSL_SQL
Commit: a7d6ddc2a669392fd808a24e31f7cd45742eaa43
Parents: 3b61f6a
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Fri Apr 28 18:07:31 2017 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed May 3 08:09:13 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/util/GcsStager.java   |  26 ++--
 .../beam/runners/dataflow/util/PackageUtil.java |  38 +++--
 .../runners/dataflow/DataflowRunnerTest.java    |  20 ++-
 .../runners/dataflow/util/PackageUtilTest.java  | 143 ++++++++++++-------
 .../org/apache/beam/sdk/io/FileSystems.java     |  28 ++--
 .../gcp/storage/GcsCreateOptions.java           |  56 ++++++++
 .../extensions/gcp/storage/GcsFileSystem.java   |   7 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  12 +-
 8 files changed, 224 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
index 53822e3..d18e306 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java
@@ -22,14 +22,12 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.dataflow.model.DataflowPackage;
-import com.google.api.services.storage.Storage;
 import java.util.List;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
-import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.MimeTypes;
 
 /**
  * Utility class for staging files to GCS.
@@ -49,22 +47,24 @@ public class GcsStager implements Stager {
   @Override
   public List<DataflowPackage> stageFiles() {
     checkNotNull(options.getStagingLocation());
-    List<String> filesToStage = options.getFilesToStage();
     String windmillBinary =
         
options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary();
     if (windmillBinary != null) {
-      filesToStage.add("windmill_main=" + windmillBinary);
+      options.getFilesToStage().add("windmill_main=" + windmillBinary);
     }
+
     int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 
1024 * 1024);
     checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
     uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);
-    Storage.Builder storageBuilder = Transport.newStorageClient(options);
-    GcsUtil util = GcsUtilFactory.create(
-        storageBuilder.build(),
-        storageBuilder.getHttpRequestInitializer(),
-        options.getExecutorService(),
-        uploadSizeBytes);
+
+    GcsCreateOptions createOptions = GcsCreateOptions.builder()
+        .setGcsUploadBufferSizeBytes(uploadSizeBytes)
+        .setMimeType(MimeTypes.BINARY)
+        .build();
+
     return PackageUtil.stageClasspathElements(
-        options.getFilesToStage(), options.getStagingLocation(), util);
+        options.getFilesToStage(),
+        options.getStagingLocation(),
+        createOptions);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
index 0d52c5d..5ddcd29 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java
@@ -51,14 +51,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.ZipFiles;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +107,8 @@ class PackageUtil {
 
       // Create the DataflowPackage with staging name and location.
       String uniqueName = getUniqueContentName(source, hash);
-      String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName);
+      String resourcePath = FileSystems.matchNewResource(stagingPath, true)
+          .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE).toString();
       DataflowPackage target = new DataflowPackage();
       target.setName(overridePackageName != null ? overridePackageName : 
uniqueName);
       target.setLocation(resourcePath);
@@ -181,14 +179,9 @@ class PackageUtil {
     }
   }
 
-  private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil)
+  private static WritableByteChannel makeWriter(String target, CreateOptions 
createOptions)
       throws IOException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(target);
-    if (factory instanceof GcsIOChannelFactory) {
-      return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY);
-    } else {
-      return factory.create(target, MimeTypes.BINARY);
-    }
+    return FileSystems.create(FileSystems.matchNewResource(target, false), 
createOptions);
   }
 
   /**
@@ -197,7 +190,7 @@ class PackageUtil {
    */
   private static void stageOnePackage(
       PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger 
numCached,
-      Sleeper retrySleeper, GcsUtil gcsUtil) {
+      Sleeper retrySleeper, CreateOptions createOptions) {
     String source = attributes.getSourcePath();
     String target = attributes.getDataflowPackage().getLocation();
 
@@ -205,7 +198,7 @@ class PackageUtil {
     // always using MimeTypes.BINARY?
     try {
       try {
-        long remoteLength = IOChannelUtils.getSizeBytes(target);
+        long remoteLength = 
FileSystems.matchSingleFileSpec(target).sizeBytes();
         if (remoteLength == attributes.getSize()) {
           LOG.debug("Skipping classpath element already staged: {} at {}",
               attributes.getSourcePath(), target);
@@ -221,7 +214,7 @@ class PackageUtil {
       while (true) {
         try {
           LOG.debug("Uploading classpath element {} to {}", source, target);
-          try (WritableByteChannel writer = makeWriter(target, gcsUtil)) {
+          try (WritableByteChannel writer = makeWriter(target, createOptions)) 
{
             copyContent(source, writer);
           }
           numUploaded.incrementAndGet();
@@ -262,12 +255,12 @@ class PackageUtil {
    * @return A list of cloud workflow packages, each representing a classpath 
element.
    */
   static List<DataflowPackage> stageClasspathElements(
-      Collection<String> classpathElements, String stagingPath, GcsUtil 
gcsUtil) {
+      Collection<String> classpathElements, String stagingPath, CreateOptions 
createOptions) {
     ListeningExecutorService executorService =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
     try {
-      return stageClasspathElements(
-          classpathElements, stagingPath, Sleeper.DEFAULT, executorService, 
gcsUtil);
+      return stageClasspathElements(classpathElements, stagingPath, 
Sleeper.DEFAULT,
+          executorService, createOptions);
     } finally {
       executorService.shutdown();
     }
@@ -276,7 +269,8 @@ class PackageUtil {
   // Visible for testing.
   static List<DataflowPackage> stageClasspathElements(
       Collection<String> classpathElements, final String stagingPath,
-      final Sleeper retrySleeper, ListeningExecutorService executorService, 
final GcsUtil gcsUtil) {
+      final Sleeper retrySleeper, ListeningExecutorService executorService,
+      final CreateOptions createOptions) {
     LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging 
location to "
         + "prepare for execution.", classpathElements.size());
 
@@ -314,7 +308,7 @@ class PackageUtil {
       futures.add(executorService.submit(new Runnable() {
         @Override
         public void run() {
-          stageOnePackage(attributes, numUploaded, numCached, retrySleeper, 
gcsUtil);
+          stageOnePackage(attributes, numUploaded, numCached, retrySleeper, 
createOptions);
         }
       }));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d011994..fa106ac 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -33,6 +33,8 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
@@ -46,6 +48,7 @@ import 
com.google.api.services.dataflow.model.ListJobsResponse;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -449,8 +452,7 @@ public class DataflowRunnerTest {
 
   @Test
   public void testRunWithFiles() throws IOException {
-    // Test that the function DataflowRunner.stageFiles works as
-    // expected.
+    // Test that the function DataflowRunner.stageFiles works as expected.
     final String cloudDataflowDataset = "somedataset";
 
     // Create some temporary files.
@@ -461,6 +463,10 @@ public class DataflowRunnerTest {
 
     String overridePackageName = "alias.txt";
 
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(GcsUtil.StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
+
     DataflowPipelineOptions options = 
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setFilesToStage(ImmutableList.of(
         temp1.getAbsolutePath(),
@@ -475,6 +481,16 @@ public class DataflowRunnerTest {
     options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
 
+    when(mockGcsUtil.create(any(GcsPath.class), anyString(), anyInt()))
+        .then(new Answer<SeekableByteChannel>() {
+          @Override
+          public SeekableByteChannel answer(InvocationOnMock invocation) 
throws Throwable {
+            return FileChannel.open(
+                Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
+
     Pipeline p = buildDataflowPipeline(options);
 
     DataflowPipelineJob job = (DataflowPipelineJob) p.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
index 877832c..4ae3a77 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -49,6 +50,7 @@ import com.google.api.client.testing.http.MockHttpTransport;
 import com.google.api.client.testing.http.MockLowLevelHttpRequest;
 import com.google.api.client.testing.http.MockLowLevelHttpResponse;
 import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.storage.model.StorageObject;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -58,6 +60,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.channels.Channels;
 import java.nio.channels.Pipe;
 import java.nio.channels.Pipe.SinkChannel;
@@ -70,12 +73,17 @@ import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.testing.RegexMatcher;
 import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
+import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -104,6 +112,7 @@ public class PackageUtilTest {
 
   // 128 bits, base64 encoded is 171 bits, rounds to 22 bytes
   private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}";
+  private CreateOptions createOptions;
 
   @Before
   public void setUp() {
@@ -111,8 +120,8 @@ public class PackageUtilTest {
 
     GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
     pipelineOptions.setGcsUtil(mockGcsUtil);
-
-    IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+    FileSystems.setDefaultConfigInWorkers(pipelineOptions);
+    createOptions = 
StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build();
   }
 
   private File makeFileWithContents(String name, String contents) throws 
Exception {
@@ -122,7 +131,8 @@ public class PackageUtilTest {
     return tmpFile;
   }
 
-  static final String STAGING_PATH = GcsPath.fromComponents("somebucket", 
"base/path").toString();
+  static final GcsPath STAGING_GCS_PATH = GcsPath.fromComponents("somebucket", 
"base/path/");
+  static final String STAGING_PATH = STAGING_GCS_PATH.toString();
   private static PackageAttributes makePackageAttributes(File file, String 
overridePackageName) {
     return PackageUtil.createPackageAttributes(file, STAGING_PATH, 
overridePackageName);
   }
@@ -135,7 +145,7 @@ public class PackageUtilTest {
     DataflowPackage target = attr.getDataflowPackage();
 
     assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + 
".txt"));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + 
target.getName()));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
     assertThat(attr.getSize(), equalTo((long) contents.length()));
   }
 
@@ -145,7 +155,7 @@ public class PackageUtilTest {
     DataflowPackage target = makePackageAttributes(tmpFile, 
null).getDataflowPackage();
 
     assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + 
target.getName()));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
   }
 
   @Test
@@ -154,7 +164,7 @@ public class PackageUtilTest {
     DataflowPackage target = makePackageAttributes(tmpDirectory, 
null).getDataflowPackage();
 
     assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN 
+ ".jar"));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + 
target.getName()));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
   }
 
   @Test
@@ -203,8 +213,10 @@ public class PackageUtilTest {
   @Test
   public void testPackageUploadWithLargeClasspathLogsWarning() throws 
Exception {
     File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    // all files will be present and cached so no upload needed.
-    
when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class))).thenReturn(
+        ImmutableList.of(StorageObjectOrIOException.create(
+            createStorageObject(STAGING_PATH, tmpFile.length())))
+    );
 
     List<String> classpathElements = Lists.newLinkedList();
     for (int i = 0; i < 1005; ++i) {
@@ -212,8 +224,7 @@ public class PackageUtilTest {
       classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath());
     }
 
-    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, 
mockGcsUtil);
-
+    PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, 
createOptions);
     logged.verifyWarn("Your classpath contains 1005 elements, which Google 
Cloud Dataflow");
   }
 
@@ -222,20 +233,22 @@ public class PackageUtilTest {
     Pipe pipe = Pipe.open();
     String contents = "This is a test!";
     File tmpFile = makeFileWithContents("file.txt", contents);
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
+
     when(mockGcsUtil.create(any(GcsPath.class), 
anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, 
mockGcsUtil);
+        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, 
createOptions);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
     verifyNoMoreInteractions(mockGcsUtil);
 
     assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + 
".txt"));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + 
target.getName()));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
     assertThat(new LineReader(Channels.newReader(pipe.source(), 
"UTF-8")).readLine(),
         equalTo(contents));
   }
@@ -244,8 +257,10 @@ public class PackageUtilTest {
   public void testStagingPreservesClasspath() throws Exception {
     File smallFile = makeFileWithContents("small.txt", "small");
     File largeFile = makeFileWithContents("large.txt", "large contents");
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
+
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
         .thenAnswer(new Answer<SinkChannel>() {
           @Override
@@ -256,7 +271,7 @@ public class PackageUtilTest {
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
         ImmutableList.of(smallFile.getAbsolutePath(), 
largeFile.getAbsolutePath()),
-        STAGING_PATH, mockGcsUtil);
+        STAGING_PATH, createOptions);
     // Verify that the packages are returned small, then large, matching input 
order even though
     // the large file would be uploaded first.
     assertThat(targets.get(0).getName(), startsWith("small"));
@@ -272,14 +287,15 @@ public class PackageUtilTest {
     makeFileWithContents("folder/file.txt", "This is a test!");
     makeFileWithContents("folder/directory/file.txt", "This is also a test!");
 
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), 
anyString())).thenReturn(pipe.sink());
 
     PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, 
mockGcsUtil);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, 
createOptions);
 
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
     verifyNoMoreInteractions(mockGcsUtil);
 
@@ -299,28 +315,30 @@ public class PackageUtilTest {
     Pipe pipe = Pipe.open();
     File tmpDirectory = tmpFolder.newFolder("folder");
 
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), 
anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, 
mockGcsUtil);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, 
createOptions);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
     verifyNoMoreInteractions(mockGcsUtil);
 
     assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN 
+ ".jar"));
-    assertThat(target.getLocation(), equalTo(STAGING_PATH + '/' + 
target.getName()));
+    assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName()));
     assertNull(new 
ZipInputStream(Channels.newInputStream(pipe.source())).getNextEntry());
   }
 
   @Test(expected = RuntimeException.class)
   public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception {
     File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
         .thenThrow(new IOException("Fake Exception: Upload error"));
 
@@ -328,9 +346,9 @@ public class PackageUtilTest {
       PackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()),
           STAGING_PATH, fastNanoClockAndSleeper, 
MoreExecutors.newDirectExecutorService(),
-          mockGcsUtil);
+          createOptions);
     } finally {
-      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
       verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString());
       verifyNoMoreInteractions(mockGcsUtil);
     }
@@ -339,8 +357,9 @@ public class PackageUtilTest {
   @Test
   public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() 
throws Exception {
     File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
         .thenThrow(new IOException("Failed to write to GCS path " + 
STAGING_PATH,
             googleJsonResponseException(
@@ -350,7 +369,7 @@ public class PackageUtilTest {
       PackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()),
           STAGING_PATH, fastNanoClockAndSleeper, 
MoreExecutors.newDirectExecutorService(),
-          mockGcsUtil);
+          createOptions);
       fail("Expected RuntimeException");
     } catch (RuntimeException e) {
       assertThat("Expected RuntimeException wrapping IOException.",
@@ -364,7 +383,7 @@ public class PackageUtilTest {
                   "Stale credentials can be resolved by executing 'gcloud auth 
application-default "
                       + "login'")));
     } finally {
-      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
       verify(mockGcsUtil).create(any(GcsPath.class), anyString());
       verifyNoMoreInteractions(mockGcsUtil);
     }
@@ -374,8 +393,9 @@ public class PackageUtilTest {
   public void testPackageUploadEventuallySucceeds() throws Exception {
     Pipe pipe = Pipe.open();
     File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
         .thenThrow(new IOException("Fake Exception: 410 Gone")) // First 
attempt fails
         .thenReturn(pipe.sink());                               // second 
attempt succeeds
@@ -383,9 +403,9 @@ public class PackageUtilTest {
     try {
       PackageUtil.stageClasspathElements(
           ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, 
fastNanoClockAndSleeper,
-          MoreExecutors.newDirectExecutorService(), mockGcsUtil);
+          MoreExecutors.newDirectExecutorService(), createOptions);
     } finally {
-      verify(mockGcsUtil).fileSize(any(GcsPath.class));
+      verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
       verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString());
       verifyNoMoreInteractions(mockGcsUtil);
     }
@@ -394,12 +414,14 @@ public class PackageUtilTest {
   @Test
   public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws 
Exception {
     File tmpFile = makeFileWithContents("file.txt", "This is a test!");
-    
when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length());
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            createStorageObject(STAGING_PATH, tmpFile.length()))));
 
-    PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, 
mockGcsUtil);
+    
PackageUtil.stageClasspathElements(ImmutableList.of(tmpFile.getAbsolutePath()), 
STAGING_PATH,
+        createOptions);
 
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
     verifyNoMoreInteractions(mockGcsUtil);
   }
 
@@ -411,13 +433,15 @@ public class PackageUtilTest {
     tmpFolder.newFolder("folder", "directory");
     makeFileWithContents("folder/file.txt", "This is a test!");
     makeFileWithContents("folder/directory/file.txt", "This is also a test!");
-    when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(Long.MAX_VALUE);
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            createStorageObject(STAGING_PATH, Long.MAX_VALUE))));
     when(mockGcsUtil.create(any(GcsPath.class), 
anyString())).thenReturn(pipe.sink());
 
     PackageUtil.stageClasspathElements(
-        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, 
mockGcsUtil);
+        ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, 
createOptions);
 
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
     verifyNoMoreInteractions(mockGcsUtil);
   }
@@ -428,30 +452,31 @@ public class PackageUtilTest {
     File tmpFile = makeFileWithContents("file.txt", "This is a test!");
     final String overriddenName = "alias.txt";
 
-    when(mockGcsUtil.fileSize(any(GcsPath.class)))
-        .thenThrow(new FileNotFoundException("some/path"));
+    when(mockGcsUtil.getObjects(anyListOf(GcsPath.class)))
+        .thenReturn(ImmutableList.of(StorageObjectOrIOException.create(
+            new FileNotFoundException("some/path"))));
     when(mockGcsUtil.create(any(GcsPath.class), 
anyString())).thenReturn(pipe.sink());
 
     List<DataflowPackage> targets = PackageUtil.stageClasspathElements(
         ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), 
STAGING_PATH,
-        mockGcsUtil);
+        createOptions);
     DataflowPackage target = Iterables.getOnlyElement(targets);
 
-    verify(mockGcsUtil).fileSize(any(GcsPath.class));
+    verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class));
     verify(mockGcsUtil).create(any(GcsPath.class), anyString());
     verifyNoMoreInteractions(mockGcsUtil);
 
     assertThat(target.getName(), equalTo(overriddenName));
     assertThat(target.getLocation(),
-        RegexMatcher.matches(STAGING_PATH + "/file-" + HASH_PATTERN + ".txt"));
+        RegexMatcher.matches(STAGING_PATH + "file-" + HASH_PATTERN + ".txt"));
   }
 
   @Test
   public void testPackageUploadIsSkippedWithNonExistentResource() throws 
Exception {
-    String nonExistentFile =
-        IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), 
"non-existent-file");
+    String nonExistentFile = 
FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true)
+        .resolve("non-existent-file", 
StandardResolveOptions.RESOLVE_FILE).toString();
     assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements(
-        ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil));
+        ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions));
   }
 
   /**
@@ -485,4 +510,12 @@ public class PackageUtilTest {
     HttpResponse response = request.execute();
     return GoogleJsonResponseException.from(jsonFactory, response);
   }
+
+  private StorageObject createStorageObject(String gcsFilename, long fileSize) 
{
+    GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
+    return new StorageObject()
+        .setBucket(gcsPath.getBucket())
+        .setName(gcsPath.getObject())
+        .setSize(BigInteger.valueOf(fileSize));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index e4f00ea..0110a0c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -32,6 +32,8 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultimap;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
@@ -111,26 +113,30 @@ public class FileSystems {
    *
    * @param spec a resource specification that matches exactly one result.
    * @return the {@link Metadata} for the specified resource.
+   * @throws FileNotFoundException if the file resource is not found.
    * @throws IOException in the event of an error in the inner call to {@link 
#match},
    * or if the given spec does not match exactly 1 result.
    */
   public static Metadata matchSingleFileSpec(String spec) throws IOException {
     List<MatchResult> matches = 
FileSystems.match(Collections.singletonList(spec));
     MatchResult matchResult = Iterables.getOnlyElement(matches);
-    if (matchResult.status() != Status.OK) {
+    if (matchResult.status() == Status.NOT_FOUND) {
+      throw new FileNotFoundException(String.format("File spec %s not found", 
spec));
+    } else if (matchResult.status() != Status.OK) {
       throw new IOException(
           String.format("Error matching file spec %s: status %s", spec, 
matchResult.status()));
+    } else {
+      List<Metadata> metadata = matchResult.metadata();
+      if (metadata.size() != 1) {
+        throw new IOException(
+            String.format(
+                "Expecting spec %s to match exactly one file, but matched %s: 
%s",
+                spec,
+                metadata.size(),
+                metadata));
+      }
+      return metadata.get(0);
     }
-    List<Metadata> metadata = matchResult.metadata();
-    if (metadata.size() != 1) {
-      throw new IOException(
-        String.format(
-            "Expecting spec %s to match exactly one file, but matched %s: %s",
-            spec,
-            metadata.size(),
-            metadata));
-    }
-    return metadata.get(0);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
new file mode 100644
index 0000000..dbfe960
--- /dev/null
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsCreateOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+
+/**
+ * An abstract class that contains common configuration options for creating 
resources.
+ */
+@AutoValue
+public abstract class GcsCreateOptions extends CreateOptions {
+
+  /**
+   * The buffer size (in bytes) to use when uploading files to GCS. Please see 
the documentation for
+   * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more 
information on the
+   * restrictions and performance implications of this value.
+   */
+  @Nullable
+  public abstract Integer gcsUploadBufferSizeBytes();
+
+  // TODO: Add other GCS options when needed.
+
+  /**
+   * Returns a {@link GcsCreateOptions.Builder}.
+   */
+  public static GcsCreateOptions.Builder builder() {
+    return new AutoValue_GcsCreateOptions.Builder();
+  }
+
+  /**
+   * A builder for {@link GcsCreateOptions}.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder extends 
CreateOptions.Builder<GcsCreateOptions.Builder> {
+    public abstract GcsCreateOptions build();
+    public abstract GcsCreateOptions.Builder 
setGcsUploadBufferSizeBytes(@Nullable Integer bytes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
index 69dd8fc..38b8347 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -102,7 +102,12 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
   @Override
   protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions 
createOptions)
       throws IOException {
-    return options.getGcsUtil().create(resourceId.getGcsPath(), 
createOptions.mimeType());
+    if (createOptions instanceof GcsCreateOptions) {
+      return options.getGcsUtil().create(resourceId.getGcsPath(), 
createOptions.mimeType(),
+          ((GcsCreateOptions) createOptions).gcsUploadBufferSizeBytes());
+    } else {
+      return options.getGcsUtil().create(resourceId.getGcsPath(), 
createOptions.mimeType());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a7d6ddc2/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index c8e6839..ee2e231 100644
--- 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ 
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -436,8 +436,16 @@ public class GcsUtil {
    * @param type the type of object, eg "text/plain".
    * @return a Callable object that encloses the operation.
    */
-  public WritableByteChannel create(GcsPath path,
-      String type) throws IOException {
+  public WritableByteChannel create(GcsPath path, String type) throws 
IOException {
+    return create(path, type, uploadBufferSizeBytes);
+  }
+
+  /**
+   * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding
+   * {code uploadBufferSizeBytes}.
+   */
+  public WritableByteChannel create(GcsPath path, String type, Integer 
uploadBufferSizeBytes)
+      throws IOException {
     GoogleCloudStorageWriteChannel channel = new 
GoogleCloudStorageWriteChannel(
         executorService,
         storageClient,

Reply via email to