This is an automated email from the ASF dual-hosted git repository. chamikara pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 381f434 [BEAM-3060] Add Compressed TextIOIT new 3b79b62 This closes 4149 381f434 is described below commit 381f4348c1f97fe5d3c31469fe856e575150ad2d Author: Łukasz Gajowy <lukasz.gaj...@polidea.com> AuthorDate: Mon Nov 20 17:00:54 2017 +0100 [BEAM-3060] Add Compressed TextIOIT --- .../beam/sdk/io/common/IOTestPipelineOptions.java | 6 ++++ .../java/org/apache/beam/sdk/io/text/TextIOIT.java | 33 ++++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index 91b3aa6..5a29d4f 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -100,4 +100,10 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { String getFilenamePrefix(); void setFilenamePrefix(String prefix); + + @Description("File compression type for writing and reading test files") + @Default.String("UNCOMPRESSED") + String getCompressionType(); + + void setCompressionType(String compressionType); } diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java index d741f95..e9aac80 100644 --- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java +++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java @@ -15,18 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.beam.sdk.io.text; +import static org.apache.beam.sdk.io.Compression.AUTO; + import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; + import java.io.IOException; import java.text.ParseException; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.Map; + +import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TextIO; @@ -50,7 +56,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * An integration test for {@link org.apache.beam.sdk.io.TextIO}. + * Integration tests for {@link org.apache.beam.sdk.io.TextIO}. * * <p>Run this test using the command below. Pass in connection information via PipelineOptions: * <pre> @@ -59,6 +65,7 @@ import org.junit.runners.JUnit4; * -DintegrationTestPipelineOptions='[ * "--numberOfRecords=100000", * "--filenamePrefix=TEXTIOIT" + * "--compressionType=GZIP" * ]' * </pre> * </p> @@ -70,6 +77,7 @@ public class TextIOIT { private static String filenamePrefix; private static Long numberOfTextLines; + private static Compression compressionType; @Rule public TestPipeline pipeline = TestPipeline.create(); @@ -82,6 +90,16 @@ public class TextIOIT { numberOfTextLines = options.getNumberOfRecords(); filenamePrefix = appendTimestamp(options.getFilenamePrefix()); + compressionType = parseCompressionType(options.getCompressionType()); + } + + private static Compression parseCompressionType(String compressionType) { + try { + return Compression.valueOf(compressionType.toUpperCase()); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException( + String.format("Unsupported compression type: %s", compressionType)); + } } private static String appendTimestamp(String filenamePrefix) { @@ -90,14 +108,20 @@ public 
class TextIOIT { @Test public void writeThenReadAll() { + TextIO.TypedWrite<String, Object> write = TextIO + .write() + .to(filenamePrefix) + .withOutputFilenames() + .withCompression(compressionType); + PCollection<String> testFilenames = pipeline .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines)) .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn())) - .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames()) + .apply("Write content to files", write) .getPerDestinationOutputFilenames().apply(Values.<String>create()); PCollection<String> consolidatedHashcode = testFilenames - .apply("Read all files", TextIO.readAll()) + .apply("Read all files", TextIO.readAll().withCompression(AUTO)) .apply("Calculate hashcode", Combine.globally(new HashingFn())); String expectedHash = getExpectedHashForLineCount(numberOfTextLines); @@ -125,6 +149,7 @@ public class TextIOIT { } private static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> { + @ProcessElement public void processElement(ProcessContext c) { c.output(String.format("IO IT Test line of text. Line seed: %s", c.element())); @@ -132,6 +157,7 @@ public class TextIOIT { } private static class DeleteFileFn extends DoFn<String, Void> { + @ProcessElement public void processElement(ProcessContext c) throws IOException { MatchResult match = Iterables @@ -142,6 +168,7 @@ public class TextIOIT { private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException { return FluentIterable.from(match.metadata()) .transform(new Function<MatchResult.Metadata, ResourceId>() { + @Override public ResourceId apply(MatchResult.Metadata metadata) { return metadata.resourceId(); -- To stop receiving notification emails like this one, please contact "commits@beam.apache.org" <commits@beam.apache.org>.