chamikaramj commented on a change in pull request #4149: [BEAM-3060] Add
Compressed TextIOIT
URL: https://github.com/apache/beam/pull/4149#discussion_r152900718
##########
File path:
sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
##########
@@ -83,25 +90,82 @@ private static String appendTimestamp(String
filenamePrefix) {
return String.format("%s_%s", filenamePrefix, new Date().getTime());
}
- @Test
- public void writeThenReadAll() {
- PCollection<String> testFilenames = pipeline
- .apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
- .apply("Produce text lines", ParDo.of(new
DeterministicallyConstructTestTextLineFn()))
- .apply("Write content to files",
TextIO.write().to(filenamePrefix).withOutputFilenames())
- .getPerDestinationOutputFilenames().apply(Values.<String>create());
+ /** IO IT with no compression. */
+ @RunWith(JUnit4.class)
+ public static class UncompressedTextIOIT {
+
+ @Rule
+ public TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ public void writeThenReadAll() {
+ PCollection<String> testFilenames = pipeline
+ .apply("Generate sequence",
GenerateSequence.from(0).to(numberOfTextLines))
+ .apply("Produce text lines", ParDo.of(new
DeterministicallyConstructTestTextLineFn()))
+ .apply("Write content to files",
TextIO.write().to(filenamePrefix).withOutputFilenames())
+ .getPerDestinationOutputFilenames().apply(Values.<String>create());
+
+ PCollection<String> consolidatedHashcode = testFilenames
+ .apply("Read all files", TextIO.readAll())
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+ String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+ PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+ testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn())
+
.withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+
+ pipeline.run().waitUntilFinish();
+ }
+ }
+
+ /** IO IT with various compression types. */
+ @RunWith(Parameterized.class)
+ public static class CompressedTextIOIT {
+
+ @Rule
+ public TestPipeline pipeline = TestPipeline.create();
+
+ @Parameterized.Parameters()
+ public static Iterable<Compression> data() {
+ return ImmutableList.<Compression>builder()
+ .add(GZIP)
+ .add(DEFLATE)
+ .add(BZIP2)
+ .build();
+ }
+
+ @Parameterized.Parameter()
+ public Compression compression;
+
+ @Test
+ public void writeThenReadAllWithCompression() {
+ TextIO.TypedWrite<String, Object> write = TextIO
+ .write()
+ .to(filenamePrefix)
+ .withOutputFilenames()
+ .withCompression(compression);
+
+ TextIO.ReadAll read = TextIO.readAll().withCompression(AUTO);
- PCollection<String> consolidatedHashcode = testFilenames
- .apply("Read all files", TextIO.readAll())
- .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+ PCollection<String> testFilenames = pipeline
Review comment:
This and uncompressed version have the same pipeline. Can't we share to code
between tests (and keep the same test class TextIOIT) and add "compression
type" as a parameter to the test (a Maven -D parameter for the perfkit based
runs) ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services