lgajowy commented on a change in pull request #4149: [BEAM-3060] Add Compressed TextIOIT
URL: https://github.com/apache/beam/pull/4149#discussion_r152999363
##########
File path:
sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
##########
@@ -83,25 +90,82 @@ private static String appendTimestamp(String filenamePrefix) {
return String.format("%s_%s", filenamePrefix, new Date().getTime());
}
- @Test
- public void writeThenReadAll() {
- PCollection<String> testFilenames = pipeline
- .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
- .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
- .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames())
- .getPerDestinationOutputFilenames().apply(Values.<String>create());
+ /** IO IT with no compression. */
+ @RunWith(JUnit4.class)
+ public static class UncompressedTextIOIT {
+
+ @Rule
+ public TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ public void writeThenReadAll() {
+ PCollection<String> testFilenames = pipeline
+ .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
+ .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
+ .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames())
+ .getPerDestinationOutputFilenames().apply(Values.<String>create());
+
+ PCollection<String> consolidatedHashcode = testFilenames
+ .apply("Read all files", TextIO.readAll())
+ .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+ String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
+ PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+ testFilenames.apply("Delete test files", ParDo.of(new DeleteFileFn())
+ .withSideInputs(consolidatedHashcode.apply(View.<String>asSingleton())));
+
+ pipeline.run().waitUntilFinish();
+ }
+ }
+
+ /** IO IT with various compression types. */
+ @RunWith(Parameterized.class)
+ public static class CompressedTextIOIT {
+
+ @Rule
+ public TestPipeline pipeline = TestPipeline.create();
+
+ @Parameterized.Parameters()
+ public static Iterable<Compression> data() {
+ return ImmutableList.<Compression>builder()
+ .add(GZIP)
+ .add(DEFLATE)
+ .add(BZIP2)
+ .build();
+ }
+
+ @Parameterized.Parameter()
+ public Compression compression;
+
+ @Test
+ public void writeThenReadAllWithCompression() {
+ TextIO.TypedWrite<String, Object> write = TextIO
+ .write()
+ .to(filenamePrefix)
+ .withOutputFilenames()
+ .withCompression(compression);
+
+ TextIO.ReadAll read = TextIO.readAll().withCompression(AUTO);
- PCollection<String> consolidatedHashcode = testFilenames
- .apply("Read all files", TextIO.readAll())
- .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+ PCollection<String> testFilenames = pipeline
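For context on what the parameterized test verifies: the compressed write-then-read round trip can be illustrated with plain `java.util.zip`, independent of Beam. This is a minimal sketch covering GZIP only; `CompressionRoundTrip` and its methods are illustrative names, not code from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressionRoundTrip {

  // Compress a string, analogous to what a GZIP-compressed write produces.
  static byte[] gzip(String text) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
      out.write(text.getBytes(StandardCharsets.UTF_8));
    }
    return bytes.toByteArray();
  }

  // Decompress, analogous to reading the files back with compression detection.
  static String gunzip(byte[] data) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) > 0) {
        bytes.write(buf, 0, n);
      }
    }
    return bytes.toString("UTF-8");
  }

  public static void main(String[] args) throws IOException {
    String lines = "line_0\nline_1\nline_2\n";
    // The IT asserts equality of hashcodes; here we compare the content directly.
    System.out.println(lines.equals(gunzip(gzip(lines)))); // prints: true
  }
}
```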
Review comment:
I think it's hard to do right now without modifying perfkit's code. As we checked, perfkit ignores `-D` parameters because it builds the `mvn verify` command itself from the parameters passed to it. I think this could be done in a future contribution; we will file a bug report against perfkit soon.
I think the best solution (at least for now) is to leave the compression type in pipeline options. We pass those to perfkit either way (through `beam_it_options`) and, more importantly imo, compressionType is very test-specific (just like numberOfRecords). WDYT?
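The suggestion above, keeping compressionType in pipeline options next to numberOfRecords, means the value arrives as a `--compressionType=...` argument inside `beam_it_options`. A stdlib-only sketch of that style of parsing follows; `OptionParseSketch` is a hypothetical name, and the real tests would go through Beam's `PipelineOptionsFactory` instead.

```java
import java.util.HashMap;
import java.util.Map;

public class OptionParseSketch {

  // Collect "--key=value" arguments, the shape in which test options
  // such as --numberOfRecords and --compressionType are passed.
  static Map<String, String> parse(String[] args) {
    Map<String, String> options = new HashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      if (arg.startsWith("--") && eq > 2) {
        options.put(arg.substring(2, eq), arg.substring(eq + 1));
      }
    }
    return options;
  }

  public static void main(String[] args) {
    Map<String, String> options =
        parse(new String[] {"--numberOfRecords=1000", "--compressionType=GZIP"});
    System.out.println(options.get("compressionType")); // prints: GZIP
  }
}
```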
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services