This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 381f434  [BEAM-3060] Add Compressed TextIOIT
     new 3b79b62  This closes 4149
381f434 is described below

commit 381f4348c1f97fe5d3c31469fe856e575150ad2d
Author: Łukasz Gajowy <lukasz.gaj...@polidea.com>
AuthorDate: Mon Nov 20 17:00:54 2017 +0100

    [BEAM-3060] Add Compressed TextIOIT
---
 .../beam/sdk/io/common/IOTestPipelineOptions.java  |  6 ++++
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java | 33 ++++++++++++++++++++--
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 91b3aa6..5a29d4f 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -100,4 +100,10 @@ public interface IOTestPipelineOptions extends TestPipelineOptions {
   String getFilenamePrefix();
 
   void setFilenamePrefix(String prefix);
+
+  @Description("File compression type for writing and reading test files")
+  @Default.String("UNCOMPRESSED")
+  String getCompressionType();
+
+  void setCompressionType(String compressionType);
 }
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index d741f95..e9aac80 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -15,18 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.beam.sdk.io.text;
 
+import static org.apache.beam.sdk.io.Compression.AUTO;
+
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Map;
+
+import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
@@ -50,7 +56,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * An integration test for {@link org.apache.beam.sdk.io.TextIO}.
+ * Integration tests for {@link org.apache.beam.sdk.io.TextIO}.
  *
  * <p>Run this test using the command below. Pass in connection information via PipelineOptions:
  * <pre>
@@ -59,6 +65,7 @@ import org.junit.runners.JUnit4;
  *  -DintegrationTestPipelineOptions='[
  *  "--numberOfRecords=100000",
  *  "--filenamePrefix=TEXTIOIT"
+ *  "--compressionType=GZIP"
  *  ]'
  * </pre>
  * </p>
@@ -70,6 +77,7 @@ public class TextIOIT {
 
   private static String filenamePrefix;
   private static Long numberOfTextLines;
+  private static Compression compressionType;
 
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
@@ -82,6 +90,16 @@ public class TextIOIT {
 
     numberOfTextLines = options.getNumberOfRecords();
     filenamePrefix = appendTimestamp(options.getFilenamePrefix());
+    compressionType = parseCompressionType(options.getCompressionType());
+  }
+
+  private static Compression parseCompressionType(String compressionType) {
+    try {
+      return Compression.valueOf(compressionType.toUpperCase());
+    } catch (IllegalArgumentException ex) {
+      throw new IllegalArgumentException(
+          String.format("Unsupported compression type: %s", compressionType));
+    }
   }
 
   private static String appendTimestamp(String filenamePrefix) {
@@ -90,14 +108,20 @@ public class TextIOIT {
 
   @Test
   public void writeThenReadAll() {
+    TextIO.TypedWrite<String, Object> write = TextIO
+        .write()
+        .to(filenamePrefix)
+        .withOutputFilenames()
+        .withCompression(compressionType);
+
     PCollection<String> testFilenames = pipeline
         .apply("Generate sequence", GenerateSequence.from(0).to(numberOfTextLines))
         .apply("Produce text lines", ParDo.of(new DeterministicallyConstructTestTextLineFn()))
-        .apply("Write content to files", TextIO.write().to(filenamePrefix).withOutputFilenames())
+        .apply("Write content to files", write)
         .getPerDestinationOutputFilenames().apply(Values.<String>create());
 
     PCollection<String> consolidatedHashcode = testFilenames
-        .apply("Read all files", TextIO.readAll())
+        .apply("Read all files", TextIO.readAll().withCompression(AUTO))
         .apply("Calculate hashcode", Combine.globally(new HashingFn()));
 
     String expectedHash = getExpectedHashForLineCount(numberOfTextLines);
@@ -125,6 +149,7 @@ public class TextIOIT {
   }
 
   private static class DeterministicallyConstructTestTextLineFn extends DoFn<Long, String> {
+
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(String.format("IO IT Test line of text. Line seed: %s", c.element()));
@@ -132,6 +157,7 @@ public class TextIOIT {
   }
 
   private static class DeleteFileFn extends DoFn<String, Void> {
+
     @ProcessElement
     public void processElement(ProcessContext c) throws IOException {
       MatchResult match = Iterables
@@ -142,6 +168,7 @@ public class TextIOIT {
     private Collection<ResourceId> toResourceIds(MatchResult match) throws IOException {
       return FluentIterable.from(match.metadata())
           .transform(new Function<MatchResult.Metadata, ResourceId>() {
+
             @Override
             public ResourceId apply(MatchResult.Metadata metadata) {
               return metadata.resourceId();

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <commits@beam.apache.org>'].

Reply via email to