This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e7369e  [BEAM-4684] Add integration test for support of 
@RequiresStableInput (#6220)
6e7369e is described below

commit 6e7369ee4dbc3e94be7261a5f0515266577772d4
Author: Yueyang Qiu <robiny...@gmail.com>
AuthorDate: Thu Sep 13 15:23:05 2018 -0700

    [BEAM-4684] Add integration test for support of @RequiresStableInput (#6220)
---
 runners/google-cloud-dataflow-java/build.gradle    |  21 +++
 sdks/java/core/build.gradle                        |   2 +
 .../beam/sdk/testing/FileChecksumMatcher.java      |   2 +-
 .../beam/sdk/testing/SerializableMatchers.java     |   2 +-
 .../sdk/util/FilePatternMatchingShardedFile.java   | 140 ++++++++++++++++++
 .../apache/beam/sdk/util/NumberedShardedFile.java  |  16 ---
 .../org/apache/beam/sdk/RequiresStableInputIT.java | 160 +++++++++++++++++++++
 .../util/FilePatternMatchingShardedFileTest.java   | 143 ++++++++++++++++++
 8 files changed, 468 insertions(+), 18 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index 9682e94..168ac97 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -48,6 +48,7 @@ test {
 
 configurations {
   validatesRunner
+  coreSDKJavaIntegrationTest
   examplesJavaIntegrationTest
   googleCloudPlatformIntegrationTest
 }
@@ -89,6 +90,8 @@ dependencies {
   validatesRunner project(path: project.path, configuration: "shadow")
   validatesRunner library.java.hamcrest_core
   validatesRunner library.java.hamcrest_library
+  coreSDKJavaIntegrationTest project(path: project.path, configuration: 
"shadow")
+  coreSDKJavaIntegrationTest project(path: ":beam-sdks-java-core", 
configuration: "shadowTest")
   examplesJavaIntegrationTest project(path: project.path, configuration: 
"shadow")
   examplesJavaIntegrationTest project(path: ":beam-examples-java", 
configuration: "shadowTest")
   googleCloudPlatformIntegrationTest project(path: project.path, 
configuration: "shadow")
@@ -175,11 +178,29 @@ task examplesJavaIntegrationTest(type: Test) {
   useJUnit { }
 }
 
+task coreSDKJavaIntegrationTest(type: Test) {
+  group = "Verification"
+  def dataflowProject = project.findProperty('dataflowProject') ?: 
'apache-beam-testing'
+  def dataflowTempRoot = project.findProperty('dataflowTempRoot') ?: 
'gs://temp-storage-for-end-to-end-tests'
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+          "--runner=TestDataflowRunner",
+          "--project=${dataflowProject}",
+          "--tempRoot=${dataflowTempRoot}",
+  ])
+
+  include '**/*IT.class'
+  maxParallelForks 4
+  classpath = configurations.coreSDKJavaIntegrationTest
+  testClassesDirs = 
files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
+  useJUnit { }
+}
+
 task postCommit {
   group = "Verification"
   description = "Various integration tests using the Dataflow runner."
   dependsOn googleCloudPlatformIntegrationTest
   dependsOn examplesJavaIntegrationTest
+  dependsOn coreSDKJavaIntegrationTest
 }
 
 def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index b7c3659..3fb2d55 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -72,5 +72,7 @@ dependencies {
   shadowTest library.java.guava_testlib
   shadowTest library.java.slf4j_jdk14
   shadowTest library.java.mockito_core
+  shadowTest library.java.hamcrest_core
+  shadowTest library.java.hamcrest_library
   shadowTest "com.esotericsoftware.kryo:kryo:2.21"
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
index e2755bd..0655c89 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -151,7 +151,7 @@ public class FileChecksumMatcher extends 
TypeSafeMatcher<PipelineResult>
     return actualChecksum;
   }
 
-  private String computeHash(@Nonnull List<String> strs) {
+  private static String computeHash(@Nonnull List<String> strs) {
     if (strs.isEmpty()) {
       return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
     }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
index fd4aaa3..e99de1e 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
@@ -54,7 +54,7 @@ import org.hamcrest.Matchers;
  * iterable is undefined, use a matcher like {@code kv(equalTo("some key"), 
containsInAnyOrder(1, 2,
  * 3))}.
  */
-class SerializableMatchers implements Serializable {
+public class SerializableMatchers implements Serializable {
 
   // Serializable only because of capture by anonymous inner classes
   private SerializableMatchers() {} // not instantiable
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
new file mode 100644
index 0000000..bbd674f
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.io.CharStreams;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sharded file which matches a given file pattern. Note that the file 
pattern must match at least
+ * one file.
+ *
+ * <p>Note that file matching should only occur once the file system is in a 
stable state and
+ * guaranteed to provide a consistent result during file pattern matching.
+ */
+public class FilePatternMatchingShardedFile implements ShardedFile {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilePatternMatchingShardedFile.class);
+
+  private static final int MAX_READ_RETRIES = 4;
+  private static final Duration DEFAULT_SLEEP_DURATION = 
Duration.standardSeconds(10L);
+  static final FluentBackoff BACK_OFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+          .withMaxRetries(MAX_READ_RETRIES);
+
+  private final String filePattern;
+
+  /**
+   * Constructs a {@link FilePatternMatchingShardedFile} for the given file pattern. Note that the
+   * file pattern must match at least one file.
+   *
+   * <p>Note that file matching should only occur once the file system is in a stable state and
+   * guaranteed to provide a consistent result during file pattern matching.
+   *
+   * @param filePattern the pattern to match; must be neither {@code null} nor empty
+   * @throws IllegalArgumentException if {@code filePattern} is {@code null} or empty
+   */
+  public FilePatternMatchingShardedFile(String filePattern) {
+    checkArgument(
+        !Strings.isNullOrEmpty(filePattern),
+        "Expected valid file path, but received %s",
+        filePattern);
+    this.filePattern = filePattern;
+  }
+
+  /**
+   * Matches the file pattern and reads every line of every matched file, retrying on failure.
+   *
+   * <p>An attempt fails when matching or reading throws an {@link IOException}, or when the
+   * pattern matches no files. After each failed attempt the supplied {@code backOff} decides
+   * whether another attempt is made.
+   *
+   * @param sleeper used to wait between attempts
+   * @param backOff controls the wait between attempts and when to give up
+   * @return the lines of all matched files
+   * @throws IOException if no attempt succeeds; its cause is the last I/O error seen, if any
+   * @throws InterruptedException propagated from the back-off wait between attempts
+   */
+  @Override
+  public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+      throws IOException, InterruptedException {
+    IOException lastException = null;
+
+    do {
+      try {
+        Collection<Metadata> files = FileSystems.match(filePattern).metadata();
+        LOG.debug(
+            "Found file(s) {} by matching the path: {}",
+            files
+                .stream()
+                .map(Metadata::resourceId)
+                .map(ResourceId::getFilename)
+                .collect(Collectors.joining(",")),
+            filePattern);
+        if (files.isEmpty()) {
+          // An empty match counts as a failed attempt; say so instead of retrying silently.
+          LOG.warn("No files matched the path: {}. Retrying.", filePattern);
+          continue;
+        }
+        // Read data from file paths
+        return readLines(files);
+      } catch (IOException e) {
+        // Remember the failure for the final error, and log it with its stack trace so that
+        // intermediate failures are diagnosable even when a later attempt succeeds.
+        lastException = e;
+        LOG.warn("Error in file reading. Ignore and retry.", e);
+      }
+    } while (BackOffUtils.next(sleeper, backOff));
+    // Failed after max retries
+    throw new IOException(
+        String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES),
+        lastException);
+  }
+
+  /**
+   * Reads all lines of all files matching the pattern, using {@link Sleeper#DEFAULT} and a new
+   * {@link BackOff} obtained from {@code BACK_OFF_FACTORY}.
+   */
+  public List<String> readFilesWithRetries() throws IOException, 
InterruptedException {
+    return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+  }
+
+  /** Returns a short description that includes the file pattern. */
+  @Override
+  public String toString() {
+    final String description = String.format("sharded file matching pattern: %s", filePattern);
+    return description;
+  }
+
+  /**
+   * Reads every line of every given file into a single list, one file after another.
+   *
+   * <p>Intended for tests over small data only: all content is read serially and held in
+   * memory by a single thread.
+   *
+   * @param files metadata of the files to read
+   * @return the lines of all files, in iteration order of {@code files}
+   * @throws IOException if any file cannot be opened or read
+   */
+  @VisibleForTesting
+  List<String> readLines(Collection<Metadata> files) throws IOException {
+    List<String> collected = Lists.newArrayList();
+    int position = 0;
+    for (Metadata shard : files) {
+      position++;
+      try (Reader shardReader =
+          Channels.newReader(
+              FileSystems.open(shard.resourceId()), StandardCharsets.UTF_8.name())) {
+        List<String> shardLines = CharStreams.readLines(shardReader);
+        collected.addAll(shardLines);
+        LOG.debug(
+            "[{} of {}] Read {} lines from file: {}",
+            position,
+            files.size(),
+            shardLines.size(),
+            shard);
+      }
+    }
+    return collected;
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
index c5832eb..d24080f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -25,20 +25,16 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
 import com.google.common.io.CharStreams;
 import java.io.IOException;
 import java.io.Reader;
 import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import javax.annotation.Nonnull;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.joda.time.Duration;
@@ -204,16 +200,4 @@ public class NumberedShardedFile implements ShardedFile {
     }
     return false;
   }
-
-  private String computeHash(@Nonnull List<String> strs) {
-    if (strs.isEmpty()) {
-      return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
-    }
-
-    List<HashCode> hashCodes = new ArrayList<>();
-    for (String str : strs) {
-      hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
-    }
-    return Hashing.combineUnordered(hashCodes).toString();
-  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
new file mode 100644
index 0000000..617a7b1
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.UUID;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
+import org.apache.beam.sdk.testing.SerializableMatchers;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.FilePatternMatchingShardedFile;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration test for the support of {@link
+ * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput} annotation.
+ */
+@RunWith(JUnit4.class)
+public class RequiresStableInputIT {
+
+  private static final String VALUE = "value";
+  // SHA-1 hash of string "value"
+  private static final String VALUE_CHECKSUM = 
"f32b67c7e26342af42efabc674d441dca0a281c5";
+
+  private static class PairWithRandomKeyFn extends SimpleFunction<String, 
KV<String, String>> {
+    @Override
+    public KV<String, String> apply(String value) {
+      String key = UUID.randomUUID().toString();
+      return KV.of(key, value);
+    }
+  }
+
+  private static class MakeSideEffectAndThenFailFn extends DoFn<KV<String, 
String>, String> {
+    private final String outputPrefix;
+
+    private MakeSideEffectAndThenFailFn(String outputPrefix) {
+      this.outputPrefix = outputPrefix;
+    }
+
+    /**
+     * Writes the element's value to a file named {@code outputPrefix + key}, and throws if no
+     * file matching {@code outputPrefix + "*"} existed before this call.
+     *
+     * <p>The existence check happens before the write, so the first invocation always leaves a
+     * file behind and then fails; a later invocation finds that file and completes normally.
+     *
+     * @throws Exception deliberately, when no previously written file is found
+     */
+    @RequiresStableInput
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      MatchResult matchResult = FileSystems.match(outputPrefix + "*");
+      boolean firstTime = matchResult.metadata().isEmpty();
+
+      KV<String, String> kv = c.element();
+      writeTextToFileSideEffect(kv.getValue(), outputPrefix + kv.getKey());
+      if (firstTime) {
+        // Trailing space keeps "DoFn" and "within" from running together in the message.
+        throw new Exception(
+            "Deliberate failure: should happen only once for each application of the DoFn "
+                + "within the transform graph.");
+      }
+    }
+
+    /**
+     * Creates the file {@code filename} and writes {@code text} to it as UTF-8.
+     *
+     * @param text the content to write
+     * @param filename the full path of the file to create
+     * @throws IOException if the file cannot be created or written
+     */
+    private static void writeTextToFileSideEffect(String text, String filename)
+        throws IOException {
+      ResourceId rid = FileSystems.matchNewResource(filename, false);
+      ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
+      // try-with-resources closes the channel even when a write fails.
+      try (WritableByteChannel chan = FileSystems.create(rid, "text/plain")) {
+        // A single write() may consume only part of the buffer, so loop until it is drained.
+        while (buffer.hasRemaining()) {
+          chan.write(buffer);
+        }
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setup() {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+  }
+
+  /**
+   * Test for the support of {@link 
org.apache.beam.sdk.transforms.DoFn.RequiresStableInput} in both
+   * {@link ParDo.SingleOutput} and {@link ParDo.MultiOutput}.
+   *
+   * <p>In each test, a singleton string value is paired with a random key. In 
the following
+   * transform, the value is written to a file, whose path is specified by the 
random key, and then
+   * the transform fails. When the pipeline retries, the latter transform 
should receive the same
+   * input from the former transform, because its {@link DoFn} is annotated 
with {@link
+   * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}, and it will not 
fail due to presence
+   * of the file. Therefore, only one file for each transform is expected.
+   */
+  @Test
+  public void testParDoRequiresStableInput() {
+    TestPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+
+    ResourceId outputDir =
+        FileSystems.matchNewResource(options.getTempRoot(), true)
+            .resolve(
+                String.format("requires-stable-input-%tF-%<tH-%<tM-%<tS-%<tL", 
new Date()),
+                StandardResolveOptions.RESOLVE_DIRECTORY);
+    String singleOutputPrefix =
+        outputDir
+            .resolve("pardo-single-output", 
StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("key-", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+    String multiOutputPrefix =
+        outputDir
+            .resolve("pardo-multi-output", 
StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("key-", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+
+    options.setOnSuccessMatcher(
+        SerializableMatchers.allOf(
+            new FileChecksumMatcher(
+                VALUE_CHECKSUM, new 
FilePatternMatchingShardedFile(singleOutputPrefix + "*")),
+            new FileChecksumMatcher(
+                VALUE_CHECKSUM, new 
FilePatternMatchingShardedFile(multiOutputPrefix + "*"))));
+
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<String> singleton = p.apply("CreatePCollectionOfOneValue", 
Create.of(VALUE));
+    singleton
+        .apply("Single-PairWithRandomKey", MapElements.via(new 
PairWithRandomKeyFn()))
+        .apply(
+            "Single-MakeSideEffectAndThenFail",
+            ParDo.of(new MakeSideEffectAndThenFailFn(singleOutputPrefix)));
+    singleton
+        .apply("Multi-PairWithRandomKey", MapElements.via(new 
PairWithRandomKeyFn()))
+        .apply(
+            "Multi-MakeSideEffectAndThenFail",
+            ParDo.of(new MakeSideEffectAndThenFailFn(multiOutputPrefix))
+                .withOutputTags(new TupleTag<>(), TupleTagList.empty()));
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java
new file mode 100644
index 0000000..064754f
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.io.LocalResources;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FilePatternMatchingShardedFile}. */
+@RunWith(JUnit4.class)
+public class FilePatternMatchingShardedFileTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private final Sleeper fastClock =
+      millis -> {
+        // No sleep.
+      };
+  private final BackOff backOff = 
FilePatternMatchingShardedFile.BACK_OFF_FACTORY.backoff();
+  private String filePattern;
+
+  @Before
+  public void setup() throws IOException {
+    filePattern =
+        LocalResources.fromFile(tmpFolder.getRoot(), true)
+            .resolve("*", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+  }
+
+  @Test
+  public void testPreconditionFilePathIsNull() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but 
received"));
+    new FilePatternMatchingShardedFile(null);
+  }
+
+  @Test
+  public void testPreconditionFilePathIsEmpty() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but 
received"));
+    new FilePatternMatchingShardedFile("");
+  }
+
+  @Test
+  public void testReadMultipleShards() throws Exception {
+    String contents1 = "To be or not to be, ",
+        contents2 = "it is not a question.",
+        contents3 = "should not be included";
+
+    File tmpFile1 = tmpFolder.newFile("result-000-of-002");
+    File tmpFile2 = tmpFolder.newFile("result-001-of-002");
+    File tmpFile3 = tmpFolder.newFile("tmp");
+    Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
+    Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
+    Files.write(contents3, tmpFile3, StandardCharsets.UTF_8);
+
+    filePattern =
+        LocalResources.fromFile(tmpFolder.getRoot(), true)
+            .resolve("result-*", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+    FilePatternMatchingShardedFile shardedFile = new 
FilePatternMatchingShardedFile(filePattern);
+
+    assertThat(shardedFile.readFilesWithRetries(), 
containsInAnyOrder(contents1, contents2));
+  }
+
+  @Test
+  public void testReadMultipleShardsWithoutShardNumber() throws Exception {
+    String contents1 = "To be or not to be, ";
+    String contents2 = "it is not a question.";
+
+    File tmpFile1 = tmpFolder.newFile("result");
+    File tmpFile2 = tmpFolder.newFile("tmp");
+    Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
+    Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
+
+    FilePatternMatchingShardedFile shardedFile = new 
FilePatternMatchingShardedFile(filePattern);
+
+    assertThat(shardedFile.readFilesWithRetries(), 
containsInAnyOrder(contents1, contents2));
+  }
+
+  @Test
+  public void testReadEmpty() throws Exception {
+    File emptyFile = tmpFolder.newFile("result-000-of-001");
+    Files.write("", emptyFile, StandardCharsets.UTF_8);
+    FilePatternMatchingShardedFile shardedFile = new 
FilePatternMatchingShardedFile(filePattern);
+
+    assertThat(shardedFile.readFilesWithRetries(), empty());
+  }
+
+  @Test
+  public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    Files.write("Test for file checksum verifier.", tmpFile, 
StandardCharsets.UTF_8);
+    FilePatternMatchingShardedFile shardedFile =
+        spy(new FilePatternMatchingShardedFile(filePattern));
+    doThrow(IOException.class).when(shardedFile).readLines(anyCollection());
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(containsString("Unable to read file(s) after 
retrying"));
+    shardedFile.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
+    FilePatternMatchingShardedFile shardedFile = new 
FilePatternMatchingShardedFile(filePattern);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(containsString("Unable to read file(s) after 
retrying"));
+    shardedFile.readFilesWithRetries(fastClock, backOff);
+  }
+}

Reply via email to