Repository: beam Updated Branches: refs/heads/master ec1782c18 -> 5130cab4a
Fixes TextIO and AvroIO tests of watchForNewFiles * AvroIO: The test needs to specify a trigger to make sure that files are really generated continuously, so that the testing of watchForNewFiles is non-vacuous. * TextIO: Files were generated by manual code, and sometimes the writing of a file could race with TextIO reading it. TextIO might then see the same file with two different sizes, count it as two different files (two Metadata objects for the same filename with different sizes are not equal), and read the file twice. It makes sense to address that underlying problem separately, e.g. by allowing a key extractor to be specified in the Watch transform, but that is outside the scope of this PR. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/35183c7d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/35183c7d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/35183c7d Branch: refs/heads/master Commit: 35183c7d77614f07ed2e643690e07f1b0741efcc Parents: ec1782c Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Oct 6 13:29:10 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Mon Oct 9 11:32:51 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 21 +++++--- .../org/apache/beam/sdk/io/TextIOReadTest.java | 56 +++++++------------- 2 files changed, 34 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/35183c7d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 695e196..3976392 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -59,7 +59,6 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -81,10 +80,12 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; @@ -307,25 +308,32 @@ public class AvroIOTest { for (int i = 0; i < 7; ++i) { (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i)); } - writePipeline.apply( + // Configure windowing of the input so that it fires every time a new element is generated, + // so that files are written continuously. 
+ Window<Long> window = Window.<Long>into(FixedWindows.of(Duration.millis(100))) + .withAllowedLateness(Duration.ZERO) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes(); + readPipeline.apply( "Sequence first", GenerateSequence.from(0).to(3).withRate(1, Duration.millis(300))) + .apply("Window first", window) .apply("Map first", MapElements.via(mapFn)) .apply( "Write first", AvroIO.write(GenericClass.class) .to(tmpFolder.getRoot().getAbsolutePath() + "/first") - .withNumShards(2)); - writePipeline.apply( + .withNumShards(2).withWindowedWrites()); + readPipeline.apply( "Sequence second", GenerateSequence.from(3).to(7).withRate(1, Duration.millis(300))) + .apply("Window second", window) .apply("Map second", MapElements.via(mapFn)) .apply( "Write second", AvroIO.write(GenericClass.class) .to(tmpFolder.getRoot().getAbsolutePath() + "/second") - .withNumShards(3)); - PipelineResult writeRes = writePipeline.run(); + .withNumShards(3).withWindowedWrites()); // Test read(), readAll(), parse(), and parseAllGenericRecords() with watchForNewFiles(). 
PAssert.that( @@ -374,7 +382,6 @@ public class AvroIOTest { .containsInAnyOrder(Iterables.concat(firstValues, secondValues)); readPipeline.run(); - writeRes.waitUntilFinish(); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/35183c7d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index f7bb12c..e4fca47 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -60,7 +60,6 @@ import java.util.Set; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; - import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; @@ -74,9 +73,14 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesSplittableParDo; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; @@ -827,50 +831,30 @@ public class TextIOReadTest { public void testReadWatchForNewFiles() throws IOException, 
InterruptedException { final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch"); basePath.toFile().mkdir(); + + p.apply(GenerateSequence.from(0).to(10).withRate(1, Duration.millis(100))) + .apply( + Window.<Long>into(FixedWindows.of(Duration.millis(150))) + .withAllowedLateness(Duration.ZERO) + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .discardingFiredPanes()) + .apply(ToString.elements()) + .apply( + TextIO.write() + .to(basePath.resolve("data").toString()) + .withNumShards(1) + .withWindowedWrites()); + PCollection<String> lines = p.apply( TextIO.read() .from(basePath.resolve("*").toString()) - // Make sure that compression type propagates into readAll() - .withCompression(ZIP) .watchForNewFiles( Duration.millis(100), Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))); - Thread writer = - new Thread() { - @Override - public void run() { - try { - Thread.sleep(1000); - writeToFile( - Arrays.asList("a.1", "a.2"), - tempFolder, - basePath.resolve("fileA").toString(), - ZIP); - Thread.sleep(300); - writeToFile( - Arrays.asList("b.1", "b.2"), - tempFolder, - basePath.resolve("fileB").toString(), - ZIP); - Thread.sleep(300); - writeToFile( - Arrays.asList("c.1", "c.2"), - tempFolder, - basePath.resolve("fileC").toString(), - ZIP); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - } - }; - writer.start(); - - PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", "c.2"); + PAssert.that(lines).containsInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); p.run(); - - writer.join(); } } }