Repository: beam
Updated Branches:
  refs/heads/master f7d4583bd -> 80b9cf9c2


Moves Match into FileIO.match()/matchAll()

FileIO will later gain other methods, such as read()/write().

Also introduces FileIO.MatchConfiguration - a common type for file-based
IOs to use to reduce boilerplate - and uses it in AvroIO and TextIO.
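
For reference, a minimal usage sketch of the API introduced here (not part of
the commit; the bucket paths, the example class, and the Watch.Growth
termination condition are illustrative assumptions):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Watch;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    /** Sketch only: shows FileIO.match()/matchAll() and the shared MatchConfiguration knobs. */
    public class FileIOMatchSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Match one filepattern once; yields a bounded PCollection of metadata.
        PCollection<MatchResult.Metadata> once =
            p.apply(
                FileIO.match()
                    .filepattern("gs://my-bucket/staging/*.avro") // hypothetical pattern
                    .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));

        // Match each filepattern in a PCollection, polling for new files until no new
        // output appears for an hour; yields an unbounded PCollection (needs SDF support).
        PCollection<MatchResult.Metadata> continuously =
            p.apply(Create.of("gs://my-bucket/incoming/*.csv")) // hypothetical pattern
                .apply(
                    FileIO.matchAll()
                        .continuously(
                            Duration.standardSeconds(30),
                            Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardHours(1))));

        // File-based IOs share the same MatchConfiguration-backed options, e.g. TextIO:
        PCollection<String> lines =
            p.apply(
                TextIO.read()
                    .from("gs://my-bucket/logs/*.txt") // hypothetical pattern
                    .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD));

        p.run().waitUntilFinish();
      }
    }

Note that match() and matchAll() use different empty-match defaults (DISALLOW
vs. ALLOW_IF_WILDCARD), so the explicit withEmptyMatchTreatment calls above are
only needed when overriding those defaults.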


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58e8a01b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58e8a01b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58e8a01b

Branch: refs/heads/master
Commit: 58e8a01b351a80ca233983c8ccfd4b2699c86a3a
Parents: f7d4583
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Thu Aug 24 16:31:41 2017 -0700
Committer: Eugene Kirpichov <ekirpic...@gmail.com>
Committed: Sun Sep 3 16:14:54 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 232 +++++++---------
 .../java/org/apache/beam/sdk/io/FileIO.java     | 265 +++++++++++++++++++
 .../main/java/org/apache/beam/sdk/io/Match.java | 156 -----------
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 147 ++++------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |   4 +-
 .../org/apache/beam/sdk/io/TextIOReadTest.java  |   5 +-
 7 files changed, 418 insertions(+), 393 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 6eeeac9..c4711e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -40,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -266,7 +266,7 @@ public class AvroIO {
    */
   public static <T> Read<T> read(Class<T> recordClass) {
     return new AutoValue_AvroIO_Read.Builder<T>()
-        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
+        
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
         .setHintMatchesManyFiles(false)
@@ -276,7 +276,7 @@ public class AvroIO {
   /** Like {@link #read}, but reads each filepattern in the input {@link 
PCollection}. */
   public static <T> ReadAll<T> readAll(Class<T> recordClass) {
     return new AutoValue_AvroIO_ReadAll.Builder<T>()
-        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
+        
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
         // 64MB is a reasonable value that allows to amortize the cost of 
opening files,
@@ -289,7 +289,7 @@ public class AvroIO {
   /** Reads Avro file(s) containing records of the specified schema. */
   public static Read<GenericRecord> readGenericRecords(Schema schema) {
     return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
-        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
+        
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
         .setHintMatchesManyFiles(false)
@@ -302,7 +302,7 @@ public class AvroIO {
    */
   public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
     return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
-        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
+        
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
@@ -331,7 +331,7 @@ public class AvroIO {
    */
   public static <T> Parse<T> 
parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
     return new AutoValue_AvroIO_Parse.Builder<T>()
-        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
+        
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
         .setParseFn(parseFn)
         .setHintMatchesManyFiles(false)
         .build();
@@ -344,7 +344,7 @@ public class AvroIO {
   public static <T> ParseAll<T> parseAllGenericRecords(
       SerializableFunction<GenericRecord, T> parseFn) {
     return new AutoValue_AvroIO_ParseAll.Builder<T>()
-        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
+        
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
         .setParseFn(parseFn)
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
         .build();
@@ -425,9 +425,7 @@ public class AvroIO {
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
-    abstract EmptyMatchTreatment getEmptyMatchTreatment();
-    @Nullable abstract Duration getWatchForNewFilesInterval();
-    @Nullable abstract TerminationCondition<?, ?> 
getWatchForNewFilesTerminationCondition();
+    abstract MatchConfiguration getMatchConfiguration();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getHintMatchesManyFiles();
@@ -437,10 +435,7 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
-      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment 
emptyMatchTreatment);
-      abstract Builder<T> setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval);
-      abstract Builder<T> setWatchForNewFilesTerminationCondition(
-              TerminationCondition<?, ?> condition);
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration 
matchConfiguration);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
@@ -463,11 +458,15 @@ public class AvroIO {
       return from(StaticValueProvider.of(filepattern));
     }
 
-    /**
-     * Configures whether or not a filepattern matching no files is allowed.
-     */
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Read<T> withMatchConfiguration(MatchConfiguration 
matchConfiguration) {
+      return toBuilder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. 
*/
     public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
-      return toBuilder().setEmptyMatchTreatment(treatment).build();
+      return 
withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
     }
 
     /**
@@ -476,16 +475,12 @@ public class AvroIO {
      * is unbounded.
      *
      * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
-     *
-     * @see TerminationCondition
      */
     @Experimental(Kind.SPLITTABLE_DO_FN)
     public Read<T> watchForNewFiles(
-        Duration pollInterval, TerminationCondition<?, ?> 
terminationCondition) {
-      return toBuilder()
-          .setWatchForNewFilesInterval(pollInterval)
-          .setWatchForNewFilesTerminationCondition(terminationCondition)
-          .build();
+        Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
+      return withMatchConfiguration(
+              getMatchConfiguration().continuously(pollInterval, 
terminationCondition));
     }
 
     /**
@@ -506,12 +501,15 @@ public class AvroIO {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
 
-      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) 
{
+      if (getMatchConfiguration().getWatchInterval() == null && 
!getHintMatchesManyFiles()) {
         return input.apply(
             "Read",
             org.apache.beam.sdk.io.Read.from(
                 createSource(
-                    getFilepattern(), getEmptyMatchTreatment(), 
getRecordClass(), getSchema())));
+                    getFilepattern(),
+                    getMatchConfiguration().getEmptyMatchTreatment(),
+                    getRecordClass(),
+                    getSchema())));
       }
       // All other cases go through ReadAll.
 
@@ -519,12 +517,7 @@ public class AvroIO {
           (getRecordClass() == GenericRecord.class)
               ? (ReadAll<T>) readAllGenericRecords(getSchema())
               : readAll(getRecordClass());
-      readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment());
-      if (getWatchForNewFilesInterval() != null) {
-        TerminationCondition<String, ?> readAllCondition =
-            ignoreInput(getWatchForNewFilesTerminationCondition());
-        readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), 
readAllCondition);
-      }
+      readAll = readAll.withMatchConfiguration(getMatchConfiguration());
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
           .apply("Via ReadAll", readAll);
@@ -536,12 +529,7 @@ public class AvroIO {
       builder
           .addIfNotNull(
               DisplayData.item("filePattern", 
getFilepattern()).withLabel("Input File Pattern"))
-          .add(
-              DisplayData.item("emptyMatchTreatment", 
getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"))
-          .addIfNotNull(
-              DisplayData.item("watchForNewFilesInterval", 
getWatchForNewFilesInterval())
-                  .withLabel("Interval to watch for new files"));
+          .include("matchConfiguration", getMatchConfiguration());
     }
 
     @SuppressWarnings("unchecked")
@@ -563,9 +551,7 @@ public class AvroIO {
   /** Implementation of {@link #readAll}. */
   @AutoValue
   public abstract static class ReadAll<T> extends 
PTransform<PCollection<String>, PCollection<T>> {
-    abstract EmptyMatchTreatment getEmptyMatchTreatment();
-    @Nullable abstract Duration getWatchForNewFilesInterval();
-    @Nullable abstract TerminationCondition<String, ?> 
getWatchForNewFilesTerminationCondition();
+    abstract MatchConfiguration getMatchConfiguration();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract long getDesiredBundleSizeBytes();
@@ -574,10 +560,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment 
emptyMatchTreatment);
-      abstract Builder<T> setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval);
-      abstract Builder<T> setWatchForNewFilesTerminationCondition(
-              TerminationCondition<String, ?> condition);
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration 
matchConfiguration);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setDesiredBundleSizeBytes(long 
desiredBundleSizeBytes);
@@ -585,19 +568,23 @@ public class AvroIO {
       abstract ReadAll<T> build();
     }
 
+
+    /** Sets the {@link MatchConfiguration}. */
+    public ReadAll<T> withMatchConfiguration(MatchConfiguration configuration) 
{
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
     /** Like {@link Read#withEmptyMatchTreatment}. */
     public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
-      return toBuilder().setEmptyMatchTreatment(treatment).build();
+      return 
withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
     }
 
     /** Like {@link Read#watchForNewFiles}. */
     @Experimental(Kind.SPLITTABLE_DO_FN)
     public ReadAll<T> watchForNewFiles(
         Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
-      return toBuilder()
-              .setWatchForNewFilesInterval(pollInterval)
-              .setWatchForNewFilesTerminationCondition(terminationCondition)
-              .build();
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, 
terminationCondition));
     }
 
     @VisibleForTesting
@@ -608,48 +595,30 @@ public class AvroIO {
     @Override
     public PCollection<T> expand(PCollection<String> input) {
       checkNotNull(getSchema(), "schema");
-      Match.Filepatterns matchFilepatterns =
-          
Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
-      if (getWatchForNewFilesInterval() != null) {
-        matchFilepatterns =
-            matchFilepatterns.continuously(
-                getWatchForNewFilesInterval(), 
getWatchForNewFilesTerminationCondition());
-      }
-
       return input
-          .apply(matchFilepatterns)
+          .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
                   SerializableFunctions.<String, Boolean>constant(true) /* 
isSplittable */,
                   getDesiredBundleSizeBytes(),
-                  new CreateSourceFn<>(
-                      getEmptyMatchTreatment(), getRecordClass(), 
getSchema().toString())))
+                  new CreateSourceFn<>(getRecordClass(), 
getSchema().toString())))
           .setCoder(AvroCoder.of(getRecordClass(), getSchema()));
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder
-          .add(
-              DisplayData.item("emptyMatchTreatment", 
getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"))
-          .addIfNotNull(
-              DisplayData.item("watchForNewFilesInterval", 
getWatchForNewFilesInterval())
-                  .withLabel("Interval to watch for new files"));
+      builder.include("matchConfiguration", getMatchConfiguration());
     }
   }
 
   private static class CreateSourceFn<T>
       implements SerializableFunction<String, FileBasedSource<T>> {
-    private final EmptyMatchTreatment emptyMatchTreatment;
     private final Class<T> recordClass;
     private final Supplier<Schema> schemaSupplier;
 
-    public CreateSourceFn(
-        EmptyMatchTreatment emptyMatchTreatment, Class<T> recordClass, String 
jsonSchema) {
-      this.emptyMatchTreatment = emptyMatchTreatment;
+    public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
       this.recordClass = recordClass;
       this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
     }
@@ -657,7 +626,10 @@ public class AvroIO {
     @Override
     public FileBasedSource<T> apply(String input) {
       return Read.createSource(
-          StaticValueProvider.of(input), emptyMatchTreatment, recordClass, 
schemaSupplier.get());
+          StaticValueProvider.of(input),
+          EmptyMatchTreatment.DISALLOW,
+          recordClass,
+          schemaSupplier.get());
     }
   }
 
@@ -667,9 +639,7 @@ public class AvroIO {
   @AutoValue
   public abstract static class Parse<T> extends PTransform<PBegin, 
PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
-    abstract EmptyMatchTreatment getEmptyMatchTreatment();
-    @Nullable abstract Duration getWatchForNewFilesInterval();
-    @Nullable abstract TerminationCondition<?, ?> 
getWatchForNewFilesTerminationCondition();
+    abstract MatchConfiguration getMatchConfiguration();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract boolean getHintMatchesManyFiles();
@@ -679,10 +649,7 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
-      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment 
emptyMatchTreatment);
-      abstract Builder<T> setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval);
-      abstract Builder<T> setWatchForNewFilesTerminationCondition(
-              TerminationCondition<?, ?> condition);
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration 
matchConfiguration);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> 
parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setHintMatchesManyFiles(boolean 
hintMatchesManyFiles);
@@ -700,19 +667,22 @@ public class AvroIO {
       return toBuilder().setFilepattern(filepattern).build();
     }
 
+    /** Sets the {@link MatchConfiguration}. */
+    public Parse<T> withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
     /** Like {@link Read#withEmptyMatchTreatment}. */
     public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
-      return toBuilder().setEmptyMatchTreatment(treatment).build();
+      return 
withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
     }
 
     /** Like {@link Read#watchForNewFiles}. */
     @Experimental(Kind.SPLITTABLE_DO_FN)
     public Parse<T> watchForNewFiles(
-        Duration pollInterval, TerminationCondition<?, ?> 
terminationCondition) {
-      return toBuilder()
-          .setWatchForNewFilesInterval(pollInterval)
-          .setWatchForNewFilesTerminationCondition(terminationCondition)
-          .build();
+        Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, 
terminationCondition));
     }
 
     /** Sets a coder for the result of the parse function. */
@@ -730,24 +700,19 @@ public class AvroIO {
       checkNotNull(getFilepattern(), "filepattern");
       Coder<T> coder = inferCoder(getCoder(), getParseFn(), 
input.getPipeline().getCoderRegistry());
 
-      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) 
{
+      if (getMatchConfiguration().getWatchInterval() == null && 
!getHintMatchesManyFiles()) {
         return input.apply(
                 org.apache.beam.sdk.io.Read.from(
                         
AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
       }
       // All other cases go through ParseAllGenericRecords.
-      ParseAll<T> parseAll =
-          parseAllGenericRecords(getParseFn())
-              .withCoder(coder)
-              .withEmptyMatchTreatment(getEmptyMatchTreatment());
-      if (getWatchForNewFilesInterval() != null) {
-        TerminationCondition<String, ?> parseAllCondition =
-            ignoreInput(getWatchForNewFilesTerminationCondition());
-        parseAll = parseAll.watchForNewFiles(getWatchForNewFilesInterval(), 
parseAllCondition);
-      }
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
-          .apply("Via ParseAll", parseAll);
+          .apply(
+              "Via ParseAll",
+              parseAllGenericRecords(getParseFn())
+                  .withCoder(coder)
+                  .withMatchConfiguration(getMatchConfiguration()));
     }
 
     private static <T> Coder<T> inferCoder(
@@ -776,12 +741,7 @@ public class AvroIO {
           .addIfNotNull(
               DisplayData.item("filePattern", 
getFilepattern()).withLabel("Input File Pattern"))
           .add(DisplayData.item("parseFn", 
getParseFn().getClass()).withLabel("Parse function"))
-          .add(
-              DisplayData.item("emptyMatchTreatment", 
getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"))
-          .addIfNotNull(
-              DisplayData.item("watchForNewFilesInterval", 
getWatchForNewFilesInterval())
-                  .withLabel("Interval to watch for new files"));
+          .include("matchConfiguration", getMatchConfiguration());
     }
   }
 
@@ -790,9 +750,7 @@ public class AvroIO {
   /** Implementation of {@link #parseAllGenericRecords}. */
   @AutoValue
   public abstract static class ParseAll<T> extends 
PTransform<PCollection<String>, PCollection<T>> {
-    abstract EmptyMatchTreatment getEmptyMatchTreatment();
-    @Nullable abstract Duration getWatchForNewFilesInterval();
-    @Nullable abstract TerminationCondition<String, ?> 
getWatchForNewFilesTerminationCondition();
+    abstract MatchConfiguration getMatchConfiguration();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract long getDesiredBundleSizeBytes();
@@ -801,10 +759,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment 
emptyMatchTreatment);
-      abstract Builder<T> setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval);
-      abstract Builder<T> setWatchForNewFilesTerminationCondition(
-          TerminationCondition<String, ?> condition);
+      abstract Builder<T> setMatchConfiguration(MatchConfiguration 
matchConfiguration);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> 
parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setDesiredBundleSizeBytes(long 
desiredBundleSizeBytes);
@@ -812,19 +767,22 @@ public class AvroIO {
       abstract ParseAll<T> build();
     }
 
+    /** Sets the {@link MatchConfiguration}. */
+    public ParseAll<T> withMatchConfiguration(MatchConfiguration 
configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
     /** Like {@link Read#withEmptyMatchTreatment}. */
     public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
-      return toBuilder().setEmptyMatchTreatment(treatment).build();
+      return 
withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
     }
 
     /** Like {@link Read#watchForNewFiles}. */
     @Experimental(Kind.SPLITTABLE_DO_FN)
     public ParseAll<T> watchForNewFiles(
         Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
-      return toBuilder()
-          .setWatchForNewFilesInterval(pollInterval)
-          .setWatchForNewFilesTerminationCondition(terminationCondition)
-          .build();
+      return withMatchConfiguration(
+              getMatchConfiguration().continuously(pollInterval, 
terminationCondition));
     }
 
     /** Specifies the coder for the result of the {@code parseFn}. */
@@ -842,25 +800,10 @@ public class AvroIO {
       final Coder<T> coder =
           Parse.inferCoder(getCoder(), getParseFn(), 
input.getPipeline().getCoderRegistry());
       final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
-      final EmptyMatchTreatment emptyMatchTreatment = getEmptyMatchTreatment();
       final SerializableFunction<String, FileBasedSource<T>> createSource =
-          new SerializableFunction<String, FileBasedSource<T>>() {
-            @Override
-            public FileBasedSource<T> apply(String input) {
-              return AvroSource.from(input)
-                  .withParseFn(parseFn, coder)
-                  .withEmptyMatchTreatment(emptyMatchTreatment);
-            }
-          };
-      Match.Filepatterns matchFilepatterns =
-              
Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment);
-      if (getWatchForNewFilesInterval() != null) {
-        matchFilepatterns =
-                matchFilepatterns.continuously(
-                        getWatchForNewFilesInterval(), 
getWatchForNewFilesTerminationCondition());
-      }
+              new CreateParseSourceFn<>(parseFn, coder);
       return input
-          .apply(matchFilepatterns)
+          .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
           .apply(
               "Parse all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
@@ -875,12 +818,23 @@ public class AvroIO {
       super.populateDisplayData(builder);
       builder
           .add(DisplayData.item("parseFn", 
getParseFn().getClass()).withLabel("Parse function"))
-          .add(
-              DisplayData.item("emptyMatchTreatment", 
getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"))
-          .addIfNotNull(
-              DisplayData.item("watchForNewFilesInterval", 
getWatchForNewFilesInterval())
-                  .withLabel("Interval to watch for new files"));
+          .include("matchConfiguration", getMatchConfiguration());
+    }
+
+    private static class CreateParseSourceFn<T>
+        implements SerializableFunction<String, FileBasedSource<T>> {
+      private final SerializableFunction<GenericRecord, T> parseFn;
+      private final Coder<T> coder;
+
+      public CreateParseSourceFn(SerializableFunction<GenericRecord, T> 
parseFn, Coder<T> coder) {
+        this.parseFn = parseFn;
+        this.coder = coder;
+      }
+
+      @Override
+      public FileBasedSource<T> apply(String input) {
+        return AvroSource.from(input).withParseFn(parseFn, coder);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
new file mode 100644
index 0000000..1eb81df
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transforms for working with files. Currently includes matching of 
filepatterns via {@link #match}
+ * and {@link #matchAll}.
+ */
+public class FileIO {
+  private static final Logger LOG = LoggerFactory.getLogger(FileIO.class);
+
+  /**
+   * Matches a filepattern using {@link FileSystems#match} and produces a 
collection of matched
+   * resources (both files and directories) as {@link MatchResult.Metadata}.
+   *
+   * <p>By default, matches the filepattern once and produces a bounded {@link 
PCollection}. To
+   * continuously watch the filepattern for new matches, use {@link 
MatchAll#continuously(Duration,
+   * TerminationCondition)} - this will produce an unbounded {@link 
PCollection}.
+   *
+   * <p>By default, a filepattern matching no resources is treated according 
to {@link
+   * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
+   * Match#withEmptyMatchTreatment}.
+   */
+  public static Match match() {
+    return new AutoValue_FileIO_Match.Builder()
+        
.setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #match}, but matches each filepattern in a collection of 
filepatterns.
+   *
+   * <p>Resources are not deduplicated between filepatterns, i.e. if the same 
resource matches
+   * multiple filepatterns, it will be produced multiple times.
+   *
+   * <p>By default, a filepattern matching no resources is treated according 
to {@link
+   * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use 
{@link
+   * MatchAll#withEmptyMatchTreatment}.
+   */
+  public static MatchAll matchAll() {
+    return new AutoValue_FileIO_MatchAll.Builder()
+        
.setConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Describes configuration for matching filepatterns, such as {@link 
EmptyMatchTreatment}
+   * and continuous watching for matching files.
+   */
+  @AutoValue
+  public abstract static class MatchConfiguration implements HasDisplayData, 
Serializable {
+    /** Creates a {@link MatchConfiguration} with the given {@link 
EmptyMatchTreatment}. */
+    public static MatchConfiguration create(EmptyMatchTreatment 
emptyMatchTreatment) {
+      return new AutoValue_FileIO_MatchConfiguration.Builder()
+          .setEmptyMatchTreatment(emptyMatchTreatment)
+          .build();
+    }
+
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchInterval();
+    @Nullable abstract TerminationCondition<String, ?> 
getWatchTerminationCondition();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+      abstract Builder setWatchInterval(Duration watchInterval);
+      abstract Builder 
setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+      abstract MatchConfiguration build();
+    }
+
+    /** Sets the {@link EmptyMatchTreatment}. */
+    public MatchConfiguration withEmptyMatchTreatment(EmptyMatchTreatment 
treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    /**
+     * Continuously watches for new files at the given interval until the 
given termination
+     * condition is reached, where the input to the condition is the 
filepattern.
+     */
+    public MatchConfiguration continuously(
+        Duration interval, TerminationCondition<String, ?> condition) {
+      return 
toBuilder().setWatchInterval(interval).setWatchTerminationCondition(condition).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", 
getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchInterval())
+                  .withLabel("Interval to watch for new files"));
+    }
+  }
+
+  /** Implementation of {@link #match}. */
+  @AutoValue
+  public abstract static class Match extends PTransform<PBegin, 
PCollection<MatchResult.Metadata>> {
+    abstract ValueProvider<String> getFilepattern();
+    abstract MatchConfiguration getConfiguration();
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+      abstract Match build();
+    }
+
+    /** Matches the given filepattern. */
+    public Match filepattern(String filepattern) {
+      return 
this.filepattern(ValueProvider.StaticValueProvider.of(filepattern));
+    }
+
+    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
+    public Match filepattern(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    /** Sets the {@link MatchConfiguration}. */
+    public Match withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** See {@link 
MatchConfiguration#withEmptyMatchTreatment(EmptyMatchTreatment)}. */
+    public Match withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return 
withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * See {@link MatchConfiguration#continuously}. The returned {@link 
PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting {@link 
Experimental.Kind#SPLITTABLE_DO_FN}.
+     */
+    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+    public Match continuously(
+        Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, 
terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PBegin input) {
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
+          .apply("Via MatchAll", 
matchAll().withConfiguration(getConfiguration()));
+    }
+  }
+
+  /** Implementation of {@link #matchAll}. */
+  @AutoValue
+  public abstract static class MatchAll
+      extends PTransform<PCollection<String>, 
PCollection<MatchResult.Metadata>> {
+    abstract MatchConfiguration getConfiguration();
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfiguration(MatchConfiguration configuration);
+      abstract MatchAll build();
+    }
+
+    /** Like {@link Match#withConfiguration}. */
+    public MatchAll withConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** Like {@link Match#withEmptyMatchTreatment}. */
+    public MatchAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return 
withConfiguration(getConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /** Like {@link Match#continuously}. */
+    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+    public MatchAll continuously(
+        Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
+      return withConfiguration(getConfiguration().continuously(pollInterval, 
terminationCondition));
+    }
+
+    @Override
+    public PCollection<MatchResult.Metadata> expand(PCollection<String> input) 
{
+      if (getConfiguration().getWatchInterval() == null) {
+        return input.apply(
+            "Match filepatterns",
+            ParDo.of(new 
MatchFn(getConfiguration().getEmptyMatchTreatment())));
+      } else {
+        return input
+            .apply(
+                "Continuously match filepatterns",
+                Watch.growthOf(new MatchPollFn())
+                    .withPollInterval(getConfiguration().getWatchInterval())
+                    
.withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+            .apply(Values.<MatchResult.Metadata>create());
+      }
+    }
+
+    private static class MatchFn extends DoFn<String, MatchResult.Metadata> {
+      private final EmptyMatchTreatment emptyMatchTreatment;
+
+      public MatchFn(EmptyMatchTreatment emptyMatchTreatment) {
+        this.emptyMatchTreatment = emptyMatchTreatment;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception {
+        String filepattern = c.element();
+        MatchResult match = FileSystems.match(filepattern, 
emptyMatchTreatment);
+        LOG.info("Matched {} files for pattern {}", match.metadata().size(), 
filepattern);
+        for (MatchResult.Metadata metadata : match.metadata()) {
+          c.output(metadata);
+        }
+      }
+    }
+
+    private static class MatchPollFn implements Watch.Growth.PollFn<String, 
MatchResult.Metadata> {
+      @Override
+      public Watch.Growth.PollResult<MatchResult.Metadata> apply(String input, 
Instant timestamp)
+          throws Exception {
+        return Watch.Growth.PollResult.incomplete(
+            Instant.now(), FileSystems.match(input, 
EmptyMatchTreatment.ALLOW).metadata());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
deleted file mode 100644
index bb44fac..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import com.google.auto.value.AutoValue;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.Watch;
-import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
-import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Matches each filepattern in a collection of filepatterns using {@link 
FileSystems#match}, and
- * produces a collection of matched resources (both files and directories) as 
{@link Metadata}.
- * Resources are not deduplicated between filepatterns, i.e. if the same 
resource matches multiple
- * filepatterns, it will be produced multiple times.
- *
- * <p>By default, this transform matches each filepattern once and produces a 
bounded {@link
- * PCollection}. To continuously watch each filepattern for new matches, use 
{@link
- * Filepatterns#continuously(Duration, TerminationCondition)} - this will 
produce an unbounded
- * {@link PCollection}.
- *
- * <p>By default, filepatterns matching no resources are treated according to 
{@link
- * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use 
{@link
- * Filepatterns#withEmptyMatchTreatment}.
- */
-public class Match {
-  private static final Logger LOG = LoggerFactory.getLogger(Match.class);
-
-  /** See {@link Match}. */
-  public static Filepatterns filepatterns() {
-    return new AutoValue_Match_Filepatterns.Builder()
-        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
-        .build();
-  }
-
-  /** Implementation of {@link #filepatterns}. */
-  @AutoValue
-  public abstract static class Filepatterns
-      extends PTransform<PCollection<String>, PCollection<Metadata>> {
-    abstract EmptyMatchTreatment getEmptyMatchTreatment();
-
-    @Nullable
-    abstract Duration getWatchInterval();
-
-    @Nullable
-    abstract TerminationCondition<String, ?> getWatchTerminationCondition();
-
-    abstract Builder toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
-
-      abstract Builder setWatchInterval(Duration watchInterval);
-
-      abstract Builder 
setWatchTerminationCondition(TerminationCondition<String, ?> condition);
-
-      abstract Filepatterns build();
-    }
-
-    /**
-     * Sets whether or not filepatterns matching no files are allowed. When 
using {@link
-     * #continuously}, they are always allowed, and this parameter is ignored.
-     */
-    public Filepatterns withEmptyMatchTreatment(EmptyMatchTreatment treatment) 
{
-      return toBuilder().setEmptyMatchTreatment(treatment).build();
-    }
-
-    /**
-     * Continuously watches for new resources matching the filepattern, 
repeatedly matching it at
-     * the given interval, until the given termination condition is reached. 
The returned {@link
-     * PCollection} is unbounded.
-     *
-     * <p>This works only in runners supporting {@link 
Experimental.Kind#SPLITTABLE_DO_FN}.
-     *
-     * @see TerminationCondition
-     */
-    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
-    public Filepatterns continuously(
-        Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
-      return toBuilder()
-          .setWatchInterval(pollInterval)
-          .setWatchTerminationCondition(terminationCondition)
-          .build();
-    }
-
-    @Override
-    public PCollection<Metadata> expand(PCollection<String> input) {
-      if (getWatchInterval() == null) {
-        return input.apply("Match filepatterns", ParDo.of(new 
MatchFn(getEmptyMatchTreatment())));
-      } else {
-        return input
-            .apply(
-                "Continuously match filepatterns",
-                Watch.growthOf(new MatchPollFn())
-                    .withPollInterval(getWatchInterval())
-                    .withTerminationPerInput(getWatchTerminationCondition()))
-            .apply(Values.<Metadata>create());
-      }
-    }
-
-    private static class MatchFn extends DoFn<String, Metadata> {
-      private final EmptyMatchTreatment emptyMatchTreatment;
-
-      public MatchFn(EmptyMatchTreatment emptyMatchTreatment) {
-        this.emptyMatchTreatment = emptyMatchTreatment;
-      }
-
-      @ProcessElement
-      public void process(ProcessContext c) throws Exception {
-        String filepattern = c.element();
-        MatchResult match = FileSystems.match(filepattern, 
emptyMatchTreatment);
-        LOG.info("Matched {} files for pattern {}", match.metadata().size(), 
filepattern);
-        for (Metadata metadata : match.metadata()) {
-          c.output(metadata);
-        }
-      }
-    }
-
-    private static class MatchPollFn implements Watch.Growth.PollFn<String, 
Metadata> {
-      @Override
-      public PollResult<Metadata> apply(String input, Instant timestamp) 
throws Exception {
-        return PollResult.incomplete(
-            Instant.now(), FileSystems.match(input, 
EmptyMatchTreatment.ALLOW).metadata());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 990f508..0cd7105 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -39,7 +39,7 @@ import org.apache.beam.sdk.values.PCollection;
  * input {@link PCollection} must not contain {@link ResourceId#isDirectory 
directories}.
  *
  * <p>To obtain the collection of {@link Metadata} from a filepattern, use 
{@link
- * Match#filepatterns()}.
+ * FileIO#match} or {@link FileIO#matchAll}.
  */
 class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, 
PCollection<T>> {
   private final SerializableFunction<String, Boolean> isSplittable;

http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index eba5ab5..57bfaa9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -39,6 +38,7 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -193,7 +193,7 @@ public class TextIO {
     return new AutoValue_TextIO_Read.Builder()
         .setCompression(Compression.AUTO)
         .setHintMatchesManyFiles(false)
-        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
+        
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
         .build();
   }
 
@@ -214,7 +214,7 @@ public class TextIO {
         // but is not so large as to exhaust a typical runner's maximum amount 
of output per
         // ProcessElement call.
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
-        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
+        
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
         .build();
   }
 
@@ -259,32 +259,19 @@ public class TextIO {
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, 
PCollection<String>> {
-    @Nullable
-    abstract ValueProvider<String> getFilepattern();
-    abstract Compression getCompression();
-
-    @Nullable
-    abstract Duration getWatchForNewFilesInterval();
-
-    @Nullable
-    abstract TerminationCondition<?, ?> 
getWatchForNewFilesTerminationCondition();
-
+    @Nullable abstract ValueProvider<String> getFilepattern();
+    abstract MatchConfiguration getMatchConfiguration();
     abstract boolean getHintMatchesManyFiles();
-    abstract EmptyMatchTreatment getEmptyMatchTreatment();
-    @Nullable
-    abstract byte[] getDelimiter();
-
+    abstract Compression getCompression();
+    @Nullable abstract byte[] getDelimiter();
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
-      abstract Builder setCompression(Compression compression);
-      abstract Builder setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval);
-      abstract Builder setWatchForNewFilesTerminationCondition(
-              TerminationCondition<?, ?> condition);
+      abstract Builder setMatchConfiguration(MatchConfiguration 
matchConfiguration);
       abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
-      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+      abstract Builder setCompression(Compression compression);
       abstract Builder setDelimiter(byte[] delimiter);
 
       abstract Read build();
@@ -314,6 +301,11 @@ public class TextIO {
       return toBuilder().setFilepattern(filepattern).build();
     }
 
+    /** Sets the {@link MatchConfiguration}. */
+    public Read withMatchConfiguration(MatchConfiguration matchConfiguration) {
+      return toBuilder().setMatchConfiguration(matchConfiguration).build();
+    }
+
     /** @deprecated Use {@link #withCompression}. */
     @Deprecated
     public Read withCompressionType(TextIO.CompressionType compressionType) {
@@ -330,21 +322,15 @@ public class TextIO {
     }
 
     /**
-     * Continuously watches for new files matching the filepattern, polling it 
at the given
-     * interval, until the given termination condition is reached. The 
returned {@link PCollection}
-     * is unbounded.
+     * See {@link MatchConfiguration#continuously}.
      *
      * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
-     *
-     * @see TerminationCondition
      */
     @Experimental(Kind.SPLITTABLE_DO_FN)
     public Read watchForNewFiles(
-        Duration pollInterval, TerminationCondition<?, ?> 
terminationCondition) {
-      return toBuilder()
-          .setWatchForNewFilesInterval(pollInterval)
-          .setWatchForNewFilesTerminationCondition(terminationCondition)
-          .build();
+        Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, 
terminationCondition));
     }
 
     /**
@@ -360,12 +346,9 @@ public class TextIO {
       return toBuilder().setHintMatchesManyFiles(true).build();
     }
 
-    /**
-     * Configures whether or not a filepattern matching no files is allowed. 
When using {@link
-     * #watchForNewFiles}, it is always allowed and this parameter is ignored.
-     */
+    /** See {@link MatchConfiguration#withEmptyMatchTreatment}. */
     public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
-      return toBuilder().setEmptyMatchTreatment(treatment).build();
+      return 
withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
     }
 
     /**
@@ -390,29 +373,27 @@ public class TextIO {
     @Override
     public PCollection<String> expand(PBegin input) {
       checkNotNull(getFilepattern(), "need to set the filepattern of a 
TextIO.Read transform");
-      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) 
{
+      if (getMatchConfiguration().getWatchInterval() == null && 
!getHintMatchesManyFiles()) {
         return input.apply("Read", 
org.apache.beam.sdk.io.Read.from(getSource()));
       }
       // All other cases go through ReadAll.
-      ReadAll readAll =
-          readAll()
-              .withCompression(getCompression())
-              .withEmptyMatchTreatment(getEmptyMatchTreatment())
-              .withDelimiter(getDelimiter());
-      if (getWatchForNewFilesInterval() != null) {
-        TerminationCondition<String, ?> readAllCondition =
-            ignoreInput(getWatchForNewFilesTerminationCondition());
-        readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), 
readAllCondition);
-      }
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
-          .apply("Via ReadAll", readAll);
+          .apply(
+              "Via ReadAll",
+              readAll()
+                  .withCompression(getCompression())
+                  .withMatchConfiguration(getMatchConfiguration())
+                  .withDelimiter(getDelimiter()));
     }
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<String> getSource() {
-      return CompressedSource
-          .from(new TextSource(getFilepattern(), getEmptyMatchTreatment(), 
getDelimiter()))
+      return CompressedSource.from(
+              new TextSource(
+                  getFilepattern(),
+                  getMatchConfiguration().getEmptyMatchTreatment(),
+                  getDelimiter()))
           .withCompression(getCompression());
     }
 
@@ -425,16 +406,10 @@ public class TextIO {
                   .withLabel("Compression Type"))
           .addIfNotNull(
               DisplayData.item("filePattern", 
getFilepattern()).withLabel("File Pattern"))
-          .add(
-              DisplayData.item("emptyMatchTreatment", 
getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"))
-          .addIfNotNull(
-              DisplayData.item("watchForNewFilesInterval", 
getWatchForNewFilesInterval())
-                  .withLabel("Interval to watch for new files"))
+          .include("matchConfiguration", getMatchConfiguration())
           .addIfNotNull(
               DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
               .withLabel("Custom delimiter to split records"));
-
     }
   }
 
@@ -444,15 +419,8 @@ public class TextIO {
   @AutoValue
   public abstract static class ReadAll
       extends PTransform<PCollection<String>, PCollection<String>> {
+    abstract MatchConfiguration getMatchConfiguration();
     abstract Compression getCompression();
-
-    @Nullable
-    abstract Duration getWatchForNewFilesInterval();
-
-    @Nullable
-    abstract TerminationCondition<String, ?> 
getWatchForNewFilesTerminationCondition();
-
-    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     abstract long getDesiredBundleSizeBytes();
     @Nullable
     abstract byte[] getDelimiter();
@@ -461,16 +429,18 @@ public class TextIO {
 
     @AutoValue.Builder
     abstract static class Builder {
+      abstract Builder setMatchConfiguration(MatchConfiguration 
matchConfiguration);
       abstract Builder setCompression(Compression compression);
-      abstract Builder setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval);
-      abstract Builder setWatchForNewFilesTerminationCondition(
-          TerminationCondition<String, ?> condition);
-      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
       abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
       abstract Builder setDelimiter(byte[] delimiter);
       abstract ReadAll build();
     }
 
+    /** Sets the {@link MatchConfiguration}. */
+    public ReadAll withMatchConfiguration(MatchConfiguration configuration) {
+      return toBuilder().setMatchConfiguration(configuration).build();
+    }
+
     /** @deprecated Use {@link #withCompression}. */
     @Deprecated
     public ReadAll withCompressionType(TextIO.CompressionType compressionType) {
@@ -488,17 +458,15 @@ public class TextIO {
 
     /** Same as {@link Read#withEmptyMatchTreatment}. */
     public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
-      return toBuilder().setEmptyMatchTreatment(treatment).build();
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
     }
 
     /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */
     @Experimental(Kind.SPLITTABLE_DO_FN)
     public ReadAll watchForNewFiles(
         Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
-      return toBuilder()
-          .setWatchForNewFilesInterval(pollInterval)
-          .setWatchForNewFilesTerminationCondition(terminationCondition)
-          .build();
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
     }
 
     @VisibleForTesting
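
The two setters above are now thin wrappers that rewrite the transform's single MatchConfiguration. A rough sketch of the equivalence (assuming MatchConfiguration.create() and continuously() are public, as their uses elsewhere in this patch suggest; the intervals are placeholders):

    // Two ways to configure matching on readAll(); the fluent setters simply fold
    // their arguments into the transform's MatchConfiguration.
    TextIO.ReadAll viaSetters =
        TextIO.readAll()
            .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
            .watchForNewFiles(
                Duration.standardSeconds(30),
                Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardHours(1)));

    TextIO.ReadAll viaConfiguration =
        TextIO.readAll()
            .withMatchConfiguration(
                FileIO.MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
                    .continuously(
                        Duration.standardSeconds(30),
                        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardHours(1))));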
@@ -512,22 +480,15 @@ public class TextIO {
 
     @Override
     public PCollection<String> expand(PCollection<String> input) {
-      Match.Filepatterns matchFilepatterns =
-          Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
-      if (getWatchForNewFilesInterval() != null) {
-        matchFilepatterns =
-            matchFilepatterns.continuously(
-                getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
-      }
       return input
-          .apply(matchFilepatterns)
+          .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
                   new IsSplittableFn(getCompression()),
                   getDesiredBundleSizeBytes(),
-                  new CreateTextSourceFn(getCompression(), getEmptyMatchTreatment(),
-                      getDelimiter()))).setCoder(StringUtf8Coder.of());
+                  new CreateTextSourceFn(getCompression(), getDelimiter())))
+          .setCoder(StringUtf8Coder.of());
     }
 
     @Override
@@ -536,30 +497,30 @@ public class TextIO {
 
       builder
           .add(
-          DisplayData.item("compressionType", getCompression().toString())
-              .withLabel("Compression Type"))
+              DisplayData.item("compressionType", getCompression().toString())
+                  .withLabel("Compression Type"))
           .addIfNotNull(
-          DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
-              .withLabel("Custom delimiter to split records"));
+              DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
+                  .withLabel("Custom delimiter to split records"))
+          .include("matchConfiguration", getMatchConfiguration());
     }
 
     private static class CreateTextSourceFn
         implements SerializableFunction<String, FileBasedSource<String>> {
       private final Compression compression;
-      private final EmptyMatchTreatment emptyMatchTreatment;
       private byte[] delimiter;
 
       private CreateTextSourceFn(
-          Compression compression, EmptyMatchTreatment emptyMatchTreatment, byte[] delimiter) {
+          Compression compression, byte[] delimiter) {
         this.compression = compression;
-        this.emptyMatchTreatment = emptyMatchTreatment;
         this.delimiter = delimiter;
       }
 
       @Override
       public FileBasedSource<String> apply(String input) {
         return CompressedSource.from(
-                new TextSource(StaticValueProvider.of(input), emptyMatchTreatment, delimiter))
+                new TextSource(
+                    StaticValueProvider.of(input), EmptyMatchTreatment.DISALLOW, delimiter))
             .withCompression(compression);
       }
     }
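
For orientation on the rewritten expand() above: the old Match.filepatterns() step is replaced by the new FileIO.matchAll() entry point, and empty-match handling now happens at the match step, which is presumably why CreateTextSourceFn can hard-code EmptyMatchTreatment.DISALLOW for the already-matched files it receives. A standalone sketch of the new transform (assuming, like the Match transform it replaces, it emits MatchResult.Metadata; the filepattern is a placeholder and org.apache.beam.sdk.io.fs.MatchResult is assumed imported):

    // Sketch: expand filepatterns into file metadata with the new FileIO API.
    PCollection<MatchResult.Metadata> files =
        pipeline
            .apply(Create.of("gs://my-bucket/logs/*.txt"))
            .apply(
                FileIO.matchAll()
                    .withConfiguration(
                        FileIO.MatchConfiguration.create(EmptyMatchTreatment.ALLOW)));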

http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 8c68f57..695e196 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -335,7 +335,7 @@ public class AvroIOTest {
                     .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
                     .watchForNewFiles(
                         Duration.millis(100),
-                        Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
+                        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
         .containsInAnyOrder(firstValues);
     PAssert.that(
             readPipeline.apply(
@@ -344,7 +344,7 @@ public class AvroIOTest {
                     .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
                     .watchForNewFiles(
                         Duration.millis(100),
-                        Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
+                        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
         .containsInAnyOrder(firstValues);
 
     PCollection<String> paths =
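
The only change in this test is the termination condition's type parameter: with ignoreInput() dropped from the read expansion (see the removed TextIO code earlier in this diff), the condition is parameterized by the filepattern strings being watched rather than Void. A one-line illustration of the types now in play:

    // The condition observes the filepattern strings themselves.
    Watch.Growth.TerminationCondition<String, ?> condition =
        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3));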

http://git-wip-us.apache.org/repos/asf/beam/blob/58e8a01b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index e55a820..e708b46 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -26,7 +26,6 @@ import static org.apache.beam.sdk.io.Compression.DEFLATE;
 import static org.apache.beam.sdk.io.Compression.GZIP;
 import static org.apache.beam.sdk.io.Compression.UNCOMPRESSED;
 import static org.apache.beam.sdk.io.Compression.ZIP;
-import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -78,6 +77,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -951,7 +951,8 @@ public class TextIOReadTest {
                 // Make sure that compression type propagates into readAll()
                 .withCompression(ZIP)
                 .watchForNewFiles(
-                    Duration.millis(100), afterTimeSinceNewOutput(Duration.standardSeconds(3))));
+                    Duration.millis(100),
+                    Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3))));
 
     Thread writer =
         new Thread() {
