chamikaramj commented on a change in pull request #4149: [BEAM-3060] Add 
Compressed TextIOIT
URL: https://github.com/apache/beam/pull/4149#discussion_r152900718
 
 

 ##########
 File path: 
sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
 ##########
 @@ -83,25 +90,82 @@ private static String appendTimestamp(String 
filenamePrefix) {
     return String.format("%s_%s", filenamePrefix, new Date().getTime());
   }
 
-  @Test
-  public void writeThenReadAll() {
-    PCollection<String> testFilenames = pipeline
-        .apply("Generate sequence", 
GenerateSequence.from(0).to(numberOfTextLines))
-        .apply("Produce text lines", ParDo.of(new 
DeterministicallyConstructTestTextLineFn()))
-        .apply("Write content to files", 
TextIO.write().to(filenamePrefix).withOutputFilenames())
-        .getPerDestinationOutputFilenames().apply(Values.<String>create());
+  /** IO IT with no compression. */
+  @RunWith(JUnit4.class)
+  public static class UncompressedTextIOIT {
+
+    @Rule
+    public TestPipeline pipeline = TestPipeline.create();
+
+    @Test
+    public void writeThenReadAll() {
+      PCollection<String> testFilenames = pipeline
+          .apply("Generate sequence", 
GenerateSequence.from(0).to(numberOfTextLines))
+          .apply("Produce text lines", ParDo.of(new 
DeterministicallyConstructTestTextLineFn()))
+          .apply("Write content to files", 
TextIO.write().to(filenamePrefix).withOutputFilenames())
+          .getPerDestinationOutputFilenames().apply(Values.<String>create());
+
+      PCollection<String> consolidatedHashcode = testFilenames
+          .apply("Read all files", TextIO.readAll())
+          .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+      String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+      PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+      testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn())
+          
.withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+
+      pipeline.run().waitUntilFinish();
+    }
+  }
+
+  /** IO IT with various compression types. */
+  @RunWith(Parameterized.class)
+  public static class CompressedTextIOIT {
+
+    @Rule
+    public TestPipeline pipeline = TestPipeline.create();
+
+    @Parameterized.Parameters()
+    public static Iterable<Compression> data() {
+      return ImmutableList.<Compression>builder()
+          .add(GZIP)
+          .add(DEFLATE)
+          .add(BZIP2)
+          .build();
+    }
+
+    @Parameterized.Parameter()
+    public Compression compression;
+
+    @Test
+    public void writeThenReadAllWithCompression() {
+      TextIO.TypedWrite<String, Object> write = TextIO
+          .write()
+          .to(filenamePrefix)
+          .withOutputFilenames()
+          .withCompression(compression);
+
+      TextIO.ReadAll read = TextIO.readAll().withCompression(AUTO);
 
-    PCollection<String> consolidatedHashcode = testFilenames
-        .apply("Read all files", TextIO.readAll())
-        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+      PCollection<String> testFilenames = pipeline
 
 Review comment:
   This and uncompressed version have the same pipeline. Can't we share to code 
between tests (and keep the same test class TextIOIT) and add "compression 
type" as a parameter to the test (a Maven -D parameter for the perfkit based 
runs) ?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to