Repository: beam
Updated Branches:
  refs/heads/master ec1782c18 -> 5130cab4a


Fixes TextIO and AvroIO tests of watchForNewFiles

* AvroIO: The test needs to specify a trigger so that files are actually
generated continuously and the testing of watchForNewFiles is non-vacuous.

* TextIO: The files were generated by manual code, so the writing of a
file could sometimes race with TextIO reading it. TextIO might then see
the same file with two different sizes, count it as two different files
(two Metadata objects for the same filename with different sizes are not
equal), and read the file twice.

It makes sense to address the duplicate-file problem separately, e.g. by
allowing a key extractor to be specified in the Watch transform, but that
is outside the scope of this PR.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/35183c7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/35183c7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/35183c7d

Branch: refs/heads/master
Commit: 35183c7d77614f07ed2e643690e07f1b0741efcc
Parents: ec1782c
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Oct 6 13:29:10 2017 -0700
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Mon Oct 9 11:32:51 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 21 +++++---
 .../org/apache/beam/sdk/io/TextIOReadTest.java  | 56 +++++++-------------
 2 files changed, 34 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/35183c7d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 695e196..3976392 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -59,7 +59,6 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -81,10 +80,12 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
@@ -307,25 +308,32 @@ public class AvroIOTest {
     for (int i = 0; i < 7; ++i) {
       (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i));
     }
-    writePipeline.apply(
+    // Configure windowing of the input so that it fires every time a new 
element is generated,
+    // so that files are written continuously.
+    Window<Long> window = 
Window.<Long>into(FixedWindows.of(Duration.millis(100)))
+        .withAllowedLateness(Duration.ZERO)
+        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+        .discardingFiredPanes();
+    readPipeline.apply(
             "Sequence first",
             GenerateSequence.from(0).to(3).withRate(1, Duration.millis(300)))
+        .apply("Window first", window)
         .apply("Map first", MapElements.via(mapFn))
         .apply(
             "Write first",
             AvroIO.write(GenericClass.class)
                 .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
-                .withNumShards(2));
-    writePipeline.apply(
+                .withNumShards(2).withWindowedWrites());
+    readPipeline.apply(
             "Sequence second",
             GenerateSequence.from(3).to(7).withRate(1, Duration.millis(300)))
+        .apply("Window second", window)
         .apply("Map second", MapElements.via(mapFn))
         .apply(
             "Write second",
             AvroIO.write(GenericClass.class)
                 .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
-                .withNumShards(3));
-    PipelineResult writeRes = writePipeline.run();
+                .withNumShards(3).withWindowedWrites());
 
     // Test read(), readAll(), parse(), and parseAllGenericRecords() with 
watchForNewFiles().
     PAssert.that(
@@ -374,7 +382,6 @@ public class AvroIOTest {
         .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
 
     readPipeline.run();
-    writeRes.waitUntilFinish();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/35183c7d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index f7bb12c..e4fca47 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -60,7 +60,6 @@ import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
-
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
@@ -74,9 +73,14 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
@@ -827,50 +831,30 @@ public class TextIOReadTest {
     public void testReadWatchForNewFiles() throws IOException, 
InterruptedException {
       final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch");
       basePath.toFile().mkdir();
+
+      p.apply(GenerateSequence.from(0).to(10).withRate(1, 
Duration.millis(100)))
+          .apply(
+              Window.<Long>into(FixedWindows.of(Duration.millis(150)))
+                  .withAllowedLateness(Duration.ZERO)
+                  
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+                  .discardingFiredPanes())
+          .apply(ToString.elements())
+          .apply(
+              TextIO.write()
+                  .to(basePath.resolve("data").toString())
+                  .withNumShards(1)
+                  .withWindowedWrites());
+
       PCollection<String> lines =
           p.apply(
               TextIO.read()
                   .from(basePath.resolve("*").toString())
-                  // Make sure that compression type propagates into readAll()
-                  .withCompression(ZIP)
                   .watchForNewFiles(
                       Duration.millis(100),
                       
Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))));
 
-      Thread writer =
-        new Thread() {
-          @Override
-          public void run() {
-            try {
-              Thread.sleep(1000);
-              writeToFile(
-                Arrays.asList("a.1", "a.2"),
-                tempFolder,
-                basePath.resolve("fileA").toString(),
-                ZIP);
-              Thread.sleep(300);
-              writeToFile(
-                Arrays.asList("b.1", "b.2"),
-                tempFolder,
-                basePath.resolve("fileB").toString(),
-                ZIP);
-              Thread.sleep(300);
-              writeToFile(
-                Arrays.asList("c.1", "c.2"),
-                tempFolder,
-                basePath.resolve("fileC").toString(),
-                ZIP);
-            } catch (IOException | InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        };
-      writer.start();
-
-      PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", 
"c.1", "c.2");
+      PAssert.that(lines).containsInAnyOrder("0", "1", "2", "3", "4", "5", 
"6", "7", "8", "9");
       p.run();
-
-      writer.join();
     }
   }
 }

Reply via email to