[ 
https://issues.apache.org/jira/browse/BEAM-4684?focusedWorklogId=144100&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144100
 ]

ASF GitHub Bot logged work on BEAM-4684:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Sep/18 22:23
            Start Date: 13/Sep/18 22:23
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #6220: [BEAM-4684] Add 
integration test for support of @RequiresStableInput
URL: https://github.com/apache/beam/pull/6220
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:

diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index 9682e94604a..168ac9745fc 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -48,6 +48,7 @@ test {
 
 configurations {
   validatesRunner
+  coreSDKJavaIntegrationTest
   examplesJavaIntegrationTest
   googleCloudPlatformIntegrationTest
 }
@@ -89,6 +90,8 @@ dependencies {
   validatesRunner project(path: project.path, configuration: "shadow")
   validatesRunner library.java.hamcrest_core
   validatesRunner library.java.hamcrest_library
+  coreSDKJavaIntegrationTest project(path: project.path, configuration: 
"shadow")
+  coreSDKJavaIntegrationTest project(path: ":beam-sdks-java-core", 
configuration: "shadowTest")
   examplesJavaIntegrationTest project(path: project.path, configuration: 
"shadow")
   examplesJavaIntegrationTest project(path: ":beam-examples-java", 
configuration: "shadowTest")
   googleCloudPlatformIntegrationTest project(path: project.path, 
configuration: "shadow")
@@ -175,11 +178,29 @@ task examplesJavaIntegrationTest(type: Test) {
   useJUnit { }
 }
 
+task coreSDKJavaIntegrationTest(type: Test) {
+  group = "Verification"
+  def dataflowProject = project.findProperty('dataflowProject') ?: 
'apache-beam-testing'
+  def dataflowTempRoot = project.findProperty('dataflowTempRoot') ?: 
'gs://temp-storage-for-end-to-end-tests'
+  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+          "--runner=TestDataflowRunner",
+          "--project=${dataflowProject}",
+          "--tempRoot=${dataflowTempRoot}",
+  ])
+
+  include '**/*IT.class'
+  maxParallelForks 4
+  classpath = configurations.coreSDKJavaIntegrationTest
+  testClassesDirs = 
files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs)
+  useJUnit { }
+}
+
 task postCommit {
   group = "Verification"
   description = "Various integration tests using the Dataflow runner."
   dependsOn googleCloudPlatformIntegrationTest
   dependsOn examplesJavaIntegrationTest
+  dependsOn coreSDKJavaIntegrationTest
 }
 
 def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index b7c3659c196..3fb2d55c802 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -72,5 +72,7 @@ dependencies {
   shadowTest library.java.guava_testlib
   shadowTest library.java.slf4j_jdk14
   shadowTest library.java.mockito_core
+  shadowTest library.java.hamcrest_core
+  shadowTest library.java.hamcrest_library
   shadowTest "com.esotericsoftware.kryo:kryo:2.21"
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
index e2755bd3386..0655c892112 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java
@@ -151,7 +151,7 @@ private String getActualChecksum() {
     return actualChecksum;
   }
 
-  private String computeHash(@Nonnull List<String> strs) {
+  private static String computeHash(@Nonnull List<String> strs) {
     if (strs.isEmpty()) {
       return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
     }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
index fd4aaa349c5..e99de1e79c4 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java
@@ -54,7 +54,7 @@
  * iterable is undefined, use a matcher like {@code kv(equalTo("some key"), 
containsInAnyOrder(1, 2,
  * 3))}.
  */
-class SerializableMatchers implements Serializable {
+public class SerializableMatchers implements Serializable {
 
   // Serializable only because of capture by anonymous inner classes
   private SerializableMatchers() {} // not instantiable
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
new file mode 100644
index 00000000000..bbd674f0162
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.io.CharStreams;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A sharded file which matches a given file pattern. Note that the file 
pattern must match at least
+ * one file.
+ *
+ * <p>Note that file matching should only occur once the file system is in a 
stable state and
+ * guaranteed to provide a consistent result during file pattern matching.
+ */
+public class FilePatternMatchingShardedFile implements ShardedFile {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FilePatternMatchingShardedFile.class);
+
+  private static final int MAX_READ_RETRIES = 4;
+  private static final Duration DEFAULT_SLEEP_DURATION = 
Duration.standardSeconds(10L);
+  static final FluentBackoff BACK_OFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+          .withMaxRetries(MAX_READ_RETRIES);
+
+  private final String filePattern;
+
+  /**
+   * Constructs an {@link FilePatternMatchingShardedFile} for the given file 
pattern. Note that the
+   * file pattern must match at least one file.
+   *
+   * <p>Note that file matching should only occur once the file system is in a 
stable state and
+   * guaranteed to provide a consistent result during file pattern matching.
+   */
+  public FilePatternMatchingShardedFile(String filePattern) {
+    checkArgument(
+        !Strings.isNullOrEmpty(filePattern),
+        "Expected valid file path, but received %s",
+        filePattern);
+    this.filePattern = filePattern;
+  }
+
+  @Override
+  public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff)
+      throws IOException, InterruptedException {
+    IOException lastException = null;
+
+    do {
+      try {
+        Collection<Metadata> files = FileSystems.match(filePattern).metadata();
+        LOG.debug(
+            "Found file(s) {} by matching the path: {}",
+            files
+                .stream()
+                .map(Metadata::resourceId)
+                .map(ResourceId::getFilename)
+                .collect(Collectors.joining(",")),
+            filePattern);
+        if (files.isEmpty()) {
+          continue;
+        }
+        // Read data from file paths
+        return readLines(files);
+      } catch (IOException e) {
+        // Ignore and retry
+        lastException = e;
+        LOG.warn("Error in file reading. Ignore and retry.");
+      }
+    } while (BackOffUtils.next(sleeper, backOff));
+    // Failed after max retries
+    throw new IOException(
+        String.format("Unable to read file(s) after retrying %d times", 
MAX_READ_RETRIES),
+        lastException);
+  }
+
+  /** Discovers all shards of this file using the provided {@link Sleeper} and 
{@link BackOff}. */
+  public List<String> readFilesWithRetries() throws IOException, 
InterruptedException {
+    return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff());
+  }
+
+  @Override
+  public String toString() {
+    return String.format("sharded file matching pattern: %s", filePattern);
+  }
+
+  /**
+   * Reads all the lines of all the files.
+   *
+   * <p>Not suitable for use except in testing of small data, since the data 
size may be far more
+   * than can be reasonably processed serially, in-memory, by a single thread.
+   */
+  @VisibleForTesting
+  List<String> readLines(Collection<Metadata> files) throws IOException {
+    List<String> allLines = Lists.newArrayList();
+    int i = 1;
+    for (Metadata file : files) {
+      try (Reader reader =
+          Channels.newReader(FileSystems.open(file.resourceId()), 
StandardCharsets.UTF_8.name())) {
+        List<String> lines = CharStreams.readLines(reader);
+        allLines.addAll(lines);
+        LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), 
lines.size(), file);
+      }
+      i++;
+    }
+    return allLines;
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
index c5832ebaf68..d24080f21cd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -25,20 +25,16 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
 import com.google.common.io.CharStreams;
 import java.io.IOException;
 import java.io.Reader;
 import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import javax.annotation.Nonnull;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.joda.time.Duration;
@@ -204,16 +200,4 @@ boolean checkTotalNumOfFiles(Collection<Metadata> files) {
     }
     return false;
   }
-
-  private String computeHash(@Nonnull List<String> strs) {
-    if (strs.isEmpty()) {
-      return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString();
-    }
-
-    List<HashCode> hashCodes = new ArrayList<>();
-    for (String str : strs) {
-      hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8));
-    }
-    return Hashing.combineUnordered(hashCodes).toString();
-  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
new file mode 100644
index 00000000000..617a7b1428a
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Date;
+import java.util.UUID;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
+import org.apache.beam.sdk.testing.SerializableMatchers;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.FilePatternMatchingShardedFile;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration test for the support of {@link
+ * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput} annotation.
+ */
+@RunWith(JUnit4.class)
+public class RequiresStableInputIT {
+
+  private static final String VALUE = "value";
+  // SHA-1 hash of string "value"
+  private static final String VALUE_CHECKSUM = 
"f32b67c7e26342af42efabc674d441dca0a281c5";
+
+  private static class PairWithRandomKeyFn extends SimpleFunction<String, 
KV<String, String>> {
+    @Override
+    public KV<String, String> apply(String value) {
+      String key = UUID.randomUUID().toString();
+      return KV.of(key, value);
+    }
+  }
+
+  private static class MakeSideEffectAndThenFailFn extends DoFn<KV<String, 
String>, String> {
+    private final String outputPrefix;
+
+    private MakeSideEffectAndThenFailFn(String outputPrefix) {
+      this.outputPrefix = outputPrefix;
+    }
+
+    @RequiresStableInput
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      MatchResult matchResult = FileSystems.match(outputPrefix + "*");
+      boolean firstTime = (matchResult.metadata().size() == 0);
+
+      KV<String, String> kv = c.element();
+      writeTextToFileSideEffect(kv.getValue(), outputPrefix + kv.getKey());
+      if (firstTime) {
+        throw new Exception(
+            "Deliberate failure: should happen only once for each application 
of the DoFn"
+                + "within the transform graph.");
+      }
+    }
+
+    private static void writeTextToFileSideEffect(String text, String 
filename) throws IOException {
+      ResourceId rid = FileSystems.matchNewResource(filename, false);
+      WritableByteChannel chan = FileSystems.create(rid, "text/plain");
+      chan.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
+      chan.close();
+    }
+  }
+
+  @BeforeClass
+  public static void setup() {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+  }
+
+  /**
+   * Test for the support of {@link 
org.apache.beam.sdk.transforms.DoFn.RequiresStableInput} in both
+   * {@link ParDo.SingleOutput} and {@link ParDo.MultiOutput}.
+   *
+   * <p>In each test, a singleton string value is paired with a random key. In 
the following
+   * transform, the value is written to a file, whose path is specified by the 
random key, and then
+   * the transform fails. When the pipeline retries, the latter transform 
should receive the same
+   * input from the former transform, because its {@link DoFn} is annotated 
with {@link
+   * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}, and it will not 
fail due to presence
+   * of the file. Therefore, only one file for each transform is expected.
+   */
+  @Test
+  public void testParDoRequiresStableInput() {
+    TestPipelineOptions options =
+        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+
+    ResourceId outputDir =
+        FileSystems.matchNewResource(options.getTempRoot(), true)
+            .resolve(
+                String.format("requires-stable-input-%tF-%<tH-%<tM-%<tS-%<tL", 
new Date()),
+                StandardResolveOptions.RESOLVE_DIRECTORY);
+    String singleOutputPrefix =
+        outputDir
+            .resolve("pardo-single-output", 
StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("key-", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+    String multiOutputPrefix =
+        outputDir
+            .resolve("pardo-multi-output", 
StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("key-", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+
+    options.setOnSuccessMatcher(
+        SerializableMatchers.allOf(
+            new FileChecksumMatcher(
+                VALUE_CHECKSUM, new 
FilePatternMatchingShardedFile(singleOutputPrefix + "*")),
+            new FileChecksumMatcher(
+                VALUE_CHECKSUM, new 
FilePatternMatchingShardedFile(multiOutputPrefix + "*"))));
+
+    Pipeline p = Pipeline.create(options);
+
+    PCollection<String> singleton = p.apply("CreatePCollectionOfOneValue", 
Create.of(VALUE));
+    singleton
+        .apply("Single-PairWithRandomKey", MapElements.via(new 
PairWithRandomKeyFn()))
+        .apply(
+            "Single-MakeSideEffectAndThenFail",
+            ParDo.of(new MakeSideEffectAndThenFailFn(singleOutputPrefix)));
+    singleton
+        .apply("Multi-PairWithRandomKey", MapElements.via(new 
PairWithRandomKeyFn()))
+        .apply(
+            "Multi-MakeSideEffectAndThenFail",
+            ParDo.of(new MakeSideEffectAndThenFailFn(multiOutputPrefix))
+                .withOutputTags(new TupleTag<>(), TupleTagList.empty()));
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java
new file mode 100644
index 00000000000..064754f47c8
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.io.LocalResources;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link FilePatternMatchingShardedFile}. */
+@RunWith(JUnit4.class)
+public class FilePatternMatchingShardedFileTest {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private final Sleeper fastClock =
+      millis -> {
+        // No sleep.
+      };
+  private final BackOff backOff = 
FilePatternMatchingShardedFile.BACK_OFF_FACTORY.backoff();
+  private String filePattern;
+
+  @Before
+  public void setup() throws IOException {
+    filePattern =
+        LocalResources.fromFile(tmpFolder.getRoot(), true)
+            .resolve("*", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+  }
+
+  @Test
+  public void testPreconditionFilePathIsNull() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but 
received"));
+    new FilePatternMatchingShardedFile(null);
+  }
+
+  @Test
+  public void testPreconditionFilePathIsEmpty() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("Expected valid file path, but 
received"));
+    new FilePatternMatchingShardedFile("");
+  }
+
+  @Test
+  public void testReadMultipleShards() throws Exception {
+    String contents1 = "To be or not to be, ",
+        contents2 = "it is not a question.",
+        contents3 = "should not be included";
+
+    File tmpFile1 = tmpFolder.newFile("result-000-of-002");
+    File tmpFile2 = tmpFolder.newFile("result-001-of-002");
+    File tmpFile3 = tmpFolder.newFile("tmp");
+    Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
+    Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
+    Files.write(contents3, tmpFile3, StandardCharsets.UTF_8);
+
+    filePattern =
+        LocalResources.fromFile(tmpFolder.getRoot(), true)
+            .resolve("result-*", StandardResolveOptions.RESOLVE_FILE)
+            .toString();
+    FilePatternMatchingShardedFile shardedFile = new 
FilePatternMatchingShardedFile(filePattern);
+
+    assertThat(shardedFile.readFilesWithRetries(), 
containsInAnyOrder(contents1, contents2));
+  }
+
+  @Test
+  public void testReadMultipleShardsWithoutShardNumber() throws Exception {
+    String contents1 = "To be or not to be, ";
+    String contents2 = "it is not a question.";
+
+    File tmpFile1 = tmpFolder.newFile("result");
+    File tmpFile2 = tmpFolder.newFile("tmp");
+    Files.write(contents1, tmpFile1, StandardCharsets.UTF_8);
+    Files.write(contents2, tmpFile2, StandardCharsets.UTF_8);
+
+    FilePatternMatchingShardedFile shardedFile = new 
FilePatternMatchingShardedFile(filePattern);
+
+    assertThat(shardedFile.readFilesWithRetries(), 
containsInAnyOrder(contents1, contents2));
+  }
+
+  @Test
+  public void testReadEmpty() throws Exception {
+    File emptyFile = tmpFolder.newFile("result-000-of-001");
+    Files.write("", emptyFile, StandardCharsets.UTF_8);
+    FilePatternMatchingShardedFile shardedFile = new 
FilePatternMatchingShardedFile(filePattern);
+
+    assertThat(shardedFile.readFilesWithRetries(), empty());
+  }
+
+  @Test
+  public void testReadWithRetriesFailsSinceFilesystemError() throws Exception {
+    File tmpFile = tmpFolder.newFile();
+    Files.write("Test for file checksum verifier.", tmpFile, 
StandardCharsets.UTF_8);
+    FilePatternMatchingShardedFile shardedFile =
+        spy(new FilePatternMatchingShardedFile(filePattern));
+    doThrow(IOException.class).when(shardedFile).readLines(anyCollection());
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(containsString("Unable to read file(s) after 
retrying"));
+    shardedFile.readFilesWithRetries(fastClock, backOff);
+  }
+
+  @Test
+  public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception {
+    FilePatternMatchingShardedFile shardedFile = new 
FilePatternMatchingShardedFile(filePattern);
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(containsString("Unable to read file(s) after 
retrying"));
+    shardedFile.readFilesWithRetries(fastClock, backOff);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 144100)
    Time Spent: 3.5h  (was: 3h 20m)

> Support @RequiresStableInput on Dataflow runner in Java SDK
> -----------------------------------------------------------
>
>                 Key: BEAM-4684
>                 URL: https://issues.apache.org/jira/browse/BEAM-4684
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-dataflow
>            Reporter: Yueyang Qiu
>            Assignee: Yueyang Qiu
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to