This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6e7369e [BEAM-4684] Add integration test for support of @RequiresStableInput (#6220) 6e7369e is described below commit 6e7369ee4dbc3e94be7261a5f0515266577772d4 Author: Yueyang Qiu <robiny...@gmail.com> AuthorDate: Thu Sep 13 15:23:05 2018 -0700 [BEAM-4684] Add integration test for support of @RequiresStableInput (#6220) --- runners/google-cloud-dataflow-java/build.gradle | 21 +++ sdks/java/core/build.gradle | 2 + .../beam/sdk/testing/FileChecksumMatcher.java | 2 +- .../beam/sdk/testing/SerializableMatchers.java | 2 +- .../sdk/util/FilePatternMatchingShardedFile.java | 140 ++++++++++++++++++ .../apache/beam/sdk/util/NumberedShardedFile.java | 16 --- .../org/apache/beam/sdk/RequiresStableInputIT.java | 160 +++++++++++++++++++++ .../util/FilePatternMatchingShardedFileTest.java | 143 ++++++++++++++++++ 8 files changed, 468 insertions(+), 18 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 9682e94..168ac97 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -48,6 +48,7 @@ test { configurations { validatesRunner + coreSDKJavaIntegrationTest examplesJavaIntegrationTest googleCloudPlatformIntegrationTest } @@ -89,6 +90,8 @@ dependencies { validatesRunner project(path: project.path, configuration: "shadow") validatesRunner library.java.hamcrest_core validatesRunner library.java.hamcrest_library + coreSDKJavaIntegrationTest project(path: project.path, configuration: "shadow") + coreSDKJavaIntegrationTest project(path: ":beam-sdks-java-core", configuration: "shadowTest") examplesJavaIntegrationTest project(path: project.path, configuration: "shadow") examplesJavaIntegrationTest project(path: ":beam-examples-java", configuration: "shadowTest") googleCloudPlatformIntegrationTest project(path: project.path, configuration: "shadow") @@ -175,11 +178,29 @@ task examplesJavaIntegrationTest(type: Test) { useJUnit { } } +task coreSDKJavaIntegrationTest(type: Test) { + group = "Verification" + def dataflowProject = project.findProperty('dataflowProject') ?: 'apache-beam-testing' + def dataflowTempRoot = project.findProperty('dataflowTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests' + systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ + "--runner=TestDataflowRunner", + "--project=${dataflowProject}", + "--tempRoot=${dataflowTempRoot}", + ]) + + include '**/*IT.class' + maxParallelForks 4 + classpath = configurations.coreSDKJavaIntegrationTest + testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs) + useJUnit { } +} + task postCommit { group = "Verification" description = "Various integration tests using the Dataflow runner." dependsOn googleCloudPlatformIntegrationTest dependsOn examplesJavaIntegrationTest + dependsOn coreSDKJavaIntegrationTest } def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing' diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index b7c3659..3fb2d55 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -72,5 +72,7 @@ dependencies { shadowTest library.java.guava_testlib shadowTest library.java.slf4j_jdk14 shadowTest library.java.mockito_core + shadowTest library.java.hamcrest_core + shadowTest library.java.hamcrest_library shadowTest "com.esotericsoftware.kryo:kryo:2.21" } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java index e2755bd..0655c89 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/FileChecksumMatcher.java @@ -151,7 +151,7 @@ public class FileChecksumMatcher extends TypeSafeMatcher<PipelineResult> return actualChecksum; } - private String computeHash(@Nonnull List<String> strs) { + private static String computeHash(@Nonnull List<String> strs) { if (strs.isEmpty()) { return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java index fd4aaa3..e99de1e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java @@ -54,7 +54,7 @@ import org.hamcrest.Matchers; * iterable is undefined, use a matcher like {@code kv(equalTo("some key"), containsInAnyOrder(1, 2, * 3))}. */ -class SerializableMatchers implements Serializable { +public class SerializableMatchers implements Serializable { // Serializable only because of capture by anonymous inner classes private SerializableMatchers() {} // not instantiable diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java new file mode 100644 index 0000000..bbd674f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFile.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.io.CharStreams; +import java.io.IOException; +import java.io.Reader; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A sharded file which matches a given file pattern. Note that the file pattern must match at least + * one file. + * + * <p>Note that file matching should only occur once the file system is in a stable state and + * guaranteed to provide a consistent result during file pattern matching. + */ +public class FilePatternMatchingShardedFile implements ShardedFile { + + private static final Logger LOG = LoggerFactory.getLogger(FilePatternMatchingShardedFile.class); + + private static final int MAX_READ_RETRIES = 4; + private static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L); + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(DEFAULT_SLEEP_DURATION) + .withMaxRetries(MAX_READ_RETRIES); + + private final String filePattern; + + /** + * Constructs an {@link FilePatternMatchingShardedFile} for the given file pattern. Note that the + * file pattern must match at least one file. + * + * <p>Note that file matching should only occur once the file system is in a stable state and + * guaranteed to provide a consistent result during file pattern matching. + */ + public FilePatternMatchingShardedFile(String filePattern) { + checkArgument( + !Strings.isNullOrEmpty(filePattern), + "Expected valid file path, but received %s", + filePattern); + this.filePattern = filePattern; + } + + @Override + public List<String> readFilesWithRetries(Sleeper sleeper, BackOff backOff) + throws IOException, InterruptedException { + IOException lastException = null; + + do { + try { + Collection<Metadata> files = FileSystems.match(filePattern).metadata(); + LOG.debug( + "Found file(s) {} by matching the path: {}", + files + .stream() + .map(Metadata::resourceId) + .map(ResourceId::getFilename) + .collect(Collectors.joining(",")), + filePattern); + if (files.isEmpty()) { + continue; + } + // Read data from file paths + return readLines(files); + } catch (IOException e) { + // Ignore and retry + lastException = e; + LOG.warn("Error in file reading. Ignore and retry."); + } + } while (BackOffUtils.next(sleeper, backOff)); + // Failed after max retries + throw new IOException( + String.format("Unable to read file(s) after retrying %d times", MAX_READ_RETRIES), + lastException); + } + + /** Discovers all shards of this file using the provided {@link Sleeper} and {@link BackOff}. */ + public List<String> readFilesWithRetries() throws IOException, InterruptedException { + return readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()); + } + + @Override + public String toString() { + return String.format("sharded file matching pattern: %s", filePattern); + } + + /** + * Reads all the lines of all the files. + * + * <p>Not suitable for use except in testing of small data, since the data size may be far more + * than can be reasonably processed serially, in-memory, by a single thread. + */ + @VisibleForTesting + List<String> readLines(Collection<Metadata> files) throws IOException { + List<String> allLines = Lists.newArrayList(); + int i = 1; + for (Metadata file : files) { + try (Reader reader = + Channels.newReader(FileSystems.open(file.resourceId()), StandardCharsets.UTF_8.name())) { + List<String> lines = CharStreams.readLines(reader); + allLines.addAll(lines); + LOG.debug("[{} of {}] Read {} lines from file: {}", i, files.size(), lines.size(), file); + } + i++; + } + return allLines; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java index c5832eb..d24080f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java @@ -25,20 +25,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.hash.HashCode; -import com.google.common.hash.Hashing; import com.google.common.io.CharStreams; import java.io.IOException; import java.io.Reader; import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.annotation.Nonnull; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.joda.time.Duration; @@ -204,16 +200,4 @@ public class NumberedShardedFile implements ShardedFile { } return false; } - - private String computeHash(@Nonnull List<String> strs) { - if (strs.isEmpty()) { - return Hashing.sha1().hashString("", StandardCharsets.UTF_8).toString(); - } - - List<HashCode> hashCodes = new ArrayList<>(); - for (String str : strs) { - hashCodes.add(Hashing.sha1().hashString(str, StandardCharsets.UTF_8)); - } - return Hashing.combineUnordered(hashCodes).toString(); - } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java new file mode 100644 index 0000000..617a7b1 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.UUID; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.FileChecksumMatcher; +import org.apache.beam.sdk.testing.SerializableMatchers; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.util.FilePatternMatchingShardedFile; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration test for the support of {@link + * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput} annotation. + */ +@RunWith(JUnit4.class) +public class RequiresStableInputIT { + + private static final String VALUE = "value"; + // SHA-1 hash of string "value" + private static final String VALUE_CHECKSUM = "f32b67c7e26342af42efabc674d441dca0a281c5"; + + private static class PairWithRandomKeyFn extends SimpleFunction<String, KV<String, String>> { + @Override + public KV<String, String> apply(String value) { + String key = UUID.randomUUID().toString(); + return KV.of(key, value); + } + } + + private static class MakeSideEffectAndThenFailFn extends DoFn<KV<String, String>, String> { + private final String outputPrefix; + + private MakeSideEffectAndThenFailFn(String outputPrefix) { + this.outputPrefix = outputPrefix; + } + + @RequiresStableInput + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + MatchResult matchResult = FileSystems.match(outputPrefix + "*"); + boolean firstTime = (matchResult.metadata().size() == 0); + + KV<String, String> kv = c.element(); + writeTextToFileSideEffect(kv.getValue(), outputPrefix + kv.getKey()); + if (firstTime) { + throw new Exception( + "Deliberate failure: should happen only once for each application of the DoFn" + + "within the transform graph."); + } + } + + private static void writeTextToFileSideEffect(String text, String filename) throws IOException { + ResourceId rid = FileSystems.matchNewResource(filename, false); + WritableByteChannel chan = FileSystems.create(rid, "text/plain"); + chan.write(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8))); + chan.close(); + } + } + + @BeforeClass + public static void setup() { + PipelineOptionsFactory.register(TestPipelineOptions.class); + } + + /** + * Test for the support of {@link org.apache.beam.sdk.transforms.DoFn.RequiresStableInput} in both + * {@link ParDo.SingleOutput} and {@link ParDo.MultiOutput}. + * + * <p>In each test, a singleton string value is paired with a random key. In the following + * transform, the value is written to a file, whose path is specified by the random key, and then + * the transform fails. When the pipeline retries, the latter transform should receive the same + * input from the former transform, because its {@link DoFn} is annotated with {@link + * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}, and it will not fail due to presence + * of the file. Therefore, only one file for each transform is expected. + */ + @Test + public void testParDoRequiresStableInput() { + TestPipelineOptions options = + TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class); + + ResourceId outputDir = + FileSystems.matchNewResource(options.getTempRoot(), true) + .resolve( + String.format("requires-stable-input-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), + StandardResolveOptions.RESOLVE_DIRECTORY); + String singleOutputPrefix = + outputDir + .resolve("pardo-single-output", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("key-", StandardResolveOptions.RESOLVE_FILE) + .toString(); + String multiOutputPrefix = + outputDir + .resolve("pardo-multi-output", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("key-", StandardResolveOptions.RESOLVE_FILE) + .toString(); + + options.setOnSuccessMatcher( + SerializableMatchers.allOf( + new FileChecksumMatcher( + VALUE_CHECKSUM, new FilePatternMatchingShardedFile(singleOutputPrefix + "*")), + new FileChecksumMatcher( + VALUE_CHECKSUM, new FilePatternMatchingShardedFile(multiOutputPrefix + "*")))); + + Pipeline p = Pipeline.create(options); + + PCollection<String> singleton = p.apply("CreatePCollectionOfOneValue", Create.of(VALUE)); + singleton + .apply("Single-PairWithRandomKey", MapElements.via(new PairWithRandomKeyFn())) + .apply( + "Single-MakeSideEffectAndThenFail", + ParDo.of(new MakeSideEffectAndThenFailFn(singleOutputPrefix))); + singleton + .apply("Multi-PairWithRandomKey", MapElements.via(new PairWithRandomKeyFn())) + .apply( + "Multi-MakeSideEffectAndThenFail", + ParDo.of(new MakeSideEffectAndThenFailFn(multiOutputPrefix)) + .withOutputTags(new TupleTag<>(), TupleTagList.empty())); + + p.run().waitUntilFinish(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java new file mode 100644 index 0000000..064754f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FilePatternMatchingShardedFileTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import com.google.common.io.Files; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.io.LocalResources; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link FilePatternMatchingShardedFile}. */ +@RunWith(JUnit4.class) +public class FilePatternMatchingShardedFileTest { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public ExpectedException thrown = ExpectedException.none(); + private final Sleeper fastClock = + millis -> { + // No sleep. + }; + private final BackOff backOff = FilePatternMatchingShardedFile.BACK_OFF_FACTORY.backoff(); + private String filePattern; + + @Before + public void setup() throws IOException { + filePattern = + LocalResources.fromFile(tmpFolder.getRoot(), true) + .resolve("*", StandardResolveOptions.RESOLVE_FILE) + .toString(); + } + + @Test + public void testPreconditionFilePathIsNull() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(containsString("Expected valid file path, but received")); + new FilePatternMatchingShardedFile(null); + } + + @Test + public void testPreconditionFilePathIsEmpty() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(containsString("Expected valid file path, but received")); + new FilePatternMatchingShardedFile(""); + } + + @Test + public void testReadMultipleShards() throws Exception { + String contents1 = "To be or not to be, ", + contents2 = "it is not a question.", + contents3 = "should not be included"; + + File tmpFile1 = tmpFolder.newFile("result-000-of-002"); + File tmpFile2 = tmpFolder.newFile("result-001-of-002"); + File tmpFile3 = tmpFolder.newFile("tmp"); + Files.write(contents1, tmpFile1, StandardCharsets.UTF_8); + Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); + Files.write(contents3, tmpFile3, StandardCharsets.UTF_8); + + filePattern = + LocalResources.fromFile(tmpFolder.getRoot(), true) + .resolve("result-*", StandardResolveOptions.RESOLVE_FILE) + .toString(); + FilePatternMatchingShardedFile shardedFile = new FilePatternMatchingShardedFile(filePattern); + + assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); + } + + @Test + public void testReadMultipleShardsWithoutShardNumber() throws Exception { + String contents1 = "To be or not to be, "; + String contents2 = "it is not a question."; + + File tmpFile1 = tmpFolder.newFile("result"); + File tmpFile2 = tmpFolder.newFile("tmp"); + Files.write(contents1, tmpFile1, StandardCharsets.UTF_8); + Files.write(contents2, tmpFile2, StandardCharsets.UTF_8); + + FilePatternMatchingShardedFile shardedFile = new FilePatternMatchingShardedFile(filePattern); + + assertThat(shardedFile.readFilesWithRetries(), containsInAnyOrder(contents1, contents2)); + } + + @Test + public void testReadEmpty() throws Exception { + File emptyFile = tmpFolder.newFile("result-000-of-001"); + Files.write("", emptyFile, StandardCharsets.UTF_8); + FilePatternMatchingShardedFile shardedFile = new FilePatternMatchingShardedFile(filePattern); + + assertThat(shardedFile.readFilesWithRetries(), empty()); + } + + @Test + public void testReadWithRetriesFailsSinceFilesystemError() throws Exception { + File tmpFile = tmpFolder.newFile(); + Files.write("Test for file checksum verifier.", tmpFile, StandardCharsets.UTF_8); + FilePatternMatchingShardedFile shardedFile = + spy(new FilePatternMatchingShardedFile(filePattern)); + doThrow(IOException.class).when(shardedFile).readLines(anyCollection()); + + thrown.expect(IOException.class); + thrown.expectMessage(containsString("Unable to read file(s) after retrying")); + shardedFile.readFilesWithRetries(fastClock, backOff); + } + + @Test + public void testReadWithRetriesFailsWhenOutputDirEmpty() throws Exception { + FilePatternMatchingShardedFile shardedFile = new FilePatternMatchingShardedFile(filePattern); + + thrown.expect(IOException.class); + thrown.expectMessage(containsString("Unable to read file(s) after retrying")); + shardedFile.readFilesWithRetries(fastClock, backOff); + } +}