Repository: beam
Updated Branches:
  refs/heads/master 80b9cf9c2 -> c3bcd4b42


Introduces XmlIO.readFiles


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/abda38dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/abda38dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/abda38dc

Branch: refs/heads/master
Commit: abda38dc70cdaf107b96e3c3f4322160fe9fa8f7
Parents: 513d26c
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Thu Aug 31 17:21:20 2017 -0700
Committer: Eugene Kirpichov <ekirpic...@gmail.com>
Committed: Sun Sep 3 16:32:25 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  |  22 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  | 280 +++++++++++++------
 .../org/apache/beam/sdk/io/xml/XmlSource.java   |  63 ++---
 .../apache/beam/sdk/io/xml/XmlSourceTest.java   |  74 ++---
 4 files changed, 253 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/abda38dc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 03b9b55..03cdbb1 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.FileIO.ReadableFile;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
@@ -34,21 +35,23 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Reads each file in the input {@link PCollection} of {@link Metadata} using 
given parameters for
- * splitting files into offset ranges and for creating a {@link 
FileBasedSource} for a file. The
+ * Reads each file in the input {@link PCollection} of {@link ReadableFile} 
using given parameters
+ * for splitting files into offset ranges and for creating a {@link 
FileBasedSource} for a file. The
  * input {@link PCollection} must not contain {@link ResourceId#isDirectory 
directories}.
  *
- * <p>To obtain the collection of {@link Metadata} from a filepattern, use 
{@link FileIO#match} or
- * {@link FileIO#matchAll}.
+ * <p>To obtain the collection of {@link ReadableFile} from a filepattern, use 
{@link
+ * FileIO#readMatches()}.
  */
-class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<ReadableFile>, PCollection<T>> {
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class ReadAllViaFileBasedSource<T>
+    extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
   private final long desiredBundleSizeBytes;
-  private final SerializableFunction<String, FileBasedSource<T>> createSource;
+  private final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource;
   private final Coder<T> coder;
 
   public ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
-      SerializableFunction<String, FileBasedSource<T>> createSource,
+      SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
       Coder<T> coder) {
     this.desiredBundleSizeBytes = desiredBundleSizeBytes;
     this.createSource = createSource;
@@ -111,9 +114,10 @@ class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<ReadableFile>,
   }
 
   private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, 
OffsetRange>, T> {
-    private final SerializableFunction<String, FileBasedSource<T>> 
createSource;
+    private final SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource;
 
-    private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> 
createSource) {
+    private ReadFileRangesFn(
+        SerializableFunction<String, ? extends FileBasedSource<T>> 
createSource) {
       this.createSource = createSource;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/abda38dc/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java 
b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 98559c2..749da51 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -21,23 +21,28 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import java.io.Serializable;
 import java.nio.charset.Charset;
 import javax.annotation.Nullable;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.ValidationEventHandler;
-import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CompressedSource;
 import org.apache.beam.sdk.io.Compression;
 import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
 import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -46,10 +51,9 @@ import org.apache.beam.sdk.values.PDone;
 public class XmlIO {
   // CHECKSTYLE.OFF: JavadocStyle
   /**
-   * Reads XML files. This source reads one or more XML files and creates a 
{@link PCollection} of a
-   * given type. Please note the example given below.
+   * Reads XML files as a {@link PCollection} of a given type mapped via JAXB.
    *
-   * <p>The XML file must be of the following form, where {@code root} and 
{@code record} are XML
+   * <p>The XML files must be of the following form, where {@code root} and 
{@code record} are XML
    * element names that are defined by the user:
    *
    * <pre>{@code
@@ -74,7 +78,7 @@ public class XmlIO {
    * Reading the source will generate a {@code PCollection} of the given JAXB 
annotated Java type.
    * Optionally users may provide a minimum size of a bundle that should be 
created for the source.
    *
-   * <p>The following example shows how to use this method in a Beam pipeline:
+   * <p>Example:
    *
    * <pre>{@code
    * PCollection<String> output = p.apply(XmlIO.<Record>read()
@@ -84,38 +88,48 @@ public class XmlIO {
    *     .withRecordClass(Record.class));
    * }</pre>
    *
-   * <p>By default, UTF-8 charset is used. If your file is using a different 
charset, you have to
-   * specify the following:
-   *
-   * <pre>{@code
-   * PCollection<String> output = p.apply(XmlIO.<Record>read()
-   *      .from(file.toPath().toString())
-   *      .withRooElement("root")
-   *      .withRecordElement("record")
-   *      .withRecordClass(Record.class)
-   *      .withCharset(StandardCharsets.ISO_8859_1));
-   * }</pre>
-   *
-   * <p>{@link java.nio.charset.StandardCharsets} provides static references 
to common charsets.
+   * <p>By default, UTF-8 charset is used. To specify a different charset, use 
{@link
+   * Read#withCharset}.
    *
    * <p>Currently, only XML files that use single-byte characters are 
supported. Using a file that
    * contains multi-byte characters may result in data loss or duplication.
    *
-   * <h3>Permissions</h3>
-   *
-   * <p>Permission requirements depend on the {@link PipelineRunner
-   * PipelineRunner} that is used to execute the Beam pipeline. Please refer 
to the documentation of
-   * corresponding {@link PipelineRunner PipelineRunners} for more details.
-   *
    * @param <T> Type of the objects that represent the records of the XML 
file. The {@code
    *     PCollection} generated by this source will be of this type.
    */
   // CHECKSTYLE.ON: JavadocStyle
   public static <T> Read<T> read() {
     return new AutoValue_XmlIO_Read.Builder<T>()
-        .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE)
+        .setConfiguration(
+            new 
AutoValue_XmlIO_MappingConfiguration.Builder<T>().setCharset("UTF-8").build())
+        .setMinBundleSize(1L)
         .setCompression(Compression.AUTO)
-        .setCharset("UTF-8")
+        .build();
+  }
+
+  /**
+   * Like {@link #read}, but reads each file in a {@link PCollection} of 
{@link ReadableFile}, which
+   * allows more flexible usage via different configuration options of {@link 
FileIO#match} and
+   * {@link FileIO#readMatches} that are not explicitly provided for {@link 
#read}.
+   *
+   * <p>For example:
+   *
+   * <pre>{@code
+   * PCollection<ReadableFile> files = p
+   *     
.apply(FileIO.match().filepattern(options.getInputFilepatternProvider()).continuously(
+   *       Duration.standardSeconds(30), 
afterTimeSinceNewOutput(Duration.standardMinutes(5))))
+   *     .apply(FileIO.readMatches().withCompression(GZIP));
+   *
+   * PCollection<String> output = files.apply(XmlIO.<Record>readFiles()
+   *     .withRootElement("root")
+   *     .withRecordElement("record")
+   *     .withRecordClass(Record.class));
+   * }</pre>
+   */
+  public static <T> ReadFiles<T> readFiles() {
+    return new AutoValue_XmlIO_ReadFiles.Builder<T>()
+        .setConfiguration(
+            new 
AutoValue_XmlIO_MappingConfiguration.Builder<T>().setCharset("UTF-8").build())
         .build();
   }
 
@@ -231,52 +245,92 @@ public class XmlIO {
     return new AutoValue_XmlIO_Write.Builder<T>().setCharset("UTF-8").build();
   }
 
-  /** Implementation of {@link #read}. */
   @AutoValue
-  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
-    private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
-
-    @Nullable
-    abstract String getFileOrPatternSpec();
-
-    @Nullable
-    abstract String getRootElement();
+  abstract static class MappingConfiguration<T> implements HasDisplayData, 
Serializable {
+    @Nullable abstract String getRootElement();
+    @Nullable abstract String getRecordElement();
+    @Nullable abstract Class<T> getRecordClass();
+    @Nullable abstract String getCharset();
+    @Nullable abstract ValidationEventHandler getValidationEventHandler();
 
-    @Nullable
-    abstract String getRecordElement();
+    abstract Builder<T> toBuilder();
 
-    @Nullable
-    abstract Class<T> getRecordClass();
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setRootElement(String rootElement);
+      abstract Builder<T> setRecordElement(String recordElement);
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+      abstract Builder<T> setCharset(String charset);
+      abstract Builder<T> setValidationEventHandler(ValidationEventHandler 
validationEventHandler);
 
-    abstract Compression getCompression();
+      abstract MappingConfiguration<T> build();
+    }
 
-    abstract long getMinBundleSize();
+    private MappingConfiguration<T> withRootElement(String rootElement) {
+      return toBuilder().setRootElement(rootElement).build();
+    }
 
-    @Nullable
-    abstract String getCharset();
+    private MappingConfiguration<T> withRecordElement(String recordElement) {
+      return toBuilder().setRecordElement(recordElement).build();
+    }
 
-    abstract Builder<T> toBuilder();
+    private MappingConfiguration<T> withRecordClass(Class<T> recordClass) {
+      return toBuilder().setRecordClass(recordClass).build();
+    }
 
-    @Nullable
-    abstract ValidationEventHandler getValidationEventHandler();
+    private MappingConfiguration<T> withCharset(Charset charset) {
+      return toBuilder().setCharset(charset.name()).build();
+    }
 
-    @AutoValue.Builder
-    abstract static class Builder<T> {
-      abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec);
+    private MappingConfiguration<T> withValidationEventHandler(
+        ValidationEventHandler validationEventHandler) {
+      return 
toBuilder().setValidationEventHandler(validationEventHandler).build();
+    }
 
-      abstract Builder<T> setRootElement(String rootElement);
+    private void validate() {
+      checkNotNull(
+          getRootElement(),
+          "rootElement is null. Use builder method withRootElement() to set 
this.");
+      checkNotNull(
+          getRecordElement(),
+          "recordElement is null. Use builder method withRecordElement() to 
set this.");
+      checkNotNull(
+          getRecordClass(),
+          "recordClass is null. Use builder method withRecordClass() to set 
this.");
+      checkNotNull(
+          getCharset(),
+          "charset is null. Use builder method withCharset() to set this.");
+    }
 
-      abstract Builder<T> setRecordElement(String recordElement);
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder
+          .addIfNotNull(
+              DisplayData.item("rootElement", getRootElement()).withLabel("XML 
Root Element"))
+          .addIfNotNull(
+              DisplayData.item("recordElement", 
getRecordElement()).withLabel("XML Record Element"))
+          .addIfNotNull(
+              DisplayData.item("recordClass", getRecordClass()).withLabel("XML 
Record Class"))
+          .addIfNotNull(DisplayData.item("charset", 
getCharset()).withLabel("Charset"));
+    }
+  }
 
-      abstract Builder<T> setRecordClass(Class<T> recordClass);
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
+    abstract MappingConfiguration<T> getConfiguration();
+    @Nullable abstract String getFileOrPatternSpec();
+    abstract Compression getCompression();
+    abstract long getMinBundleSize();
 
-      abstract Builder<T> setMinBundleSize(long minBundleSize);
+    abstract Builder<T> toBuilder();
 
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setConfiguration(MappingConfiguration<T> 
configuration);
+      abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec);
       abstract Builder<T> setCompression(Compression compression);
-
-      abstract Builder<T> setCharset(String charset);
-
-      abstract Builder<T> setValidationEventHandler(ValidationEventHandler 
validationEventHandler);
+      abstract Builder<T> setMinBundleSize(long minBundleSize);
 
       abstract Read<T> build();
     }
@@ -322,13 +376,17 @@ public class XmlIO {
       return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build();
     }
 
+    private Read<T> withConfiguration(MappingConfiguration<T> configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
     /**
      * Sets name of the root element of the XML document. This will be used to 
create a valid
      * starting root element when initiating a bundle of records created from 
an XML document. This
      * is a required parameter.
      */
     public Read<T> withRootElement(String rootElement) {
-      return toBuilder().setRootElement(rootElement).build();
+      return 
withConfiguration(getConfiguration().withRootElement(rootElement));
     }
 
     /**
@@ -336,7 +394,7 @@ public class XmlIO {
      * the first record of a bundle created from the XML document. This is a 
required parameter.
      */
     public Read<T> withRecordElement(String recordElement) {
-      return toBuilder().setRecordElement(recordElement).build();
+      return 
withConfiguration(getConfiguration().withRecordElement(recordElement));
     }
 
     /**
@@ -345,7 +403,7 @@ public class XmlIO {
      * parameter.
      */
     public Read<T> withRecordClass(Class<T> recordClass) {
-      return toBuilder().setRecordClass(recordClass).build();
+      return 
withConfiguration(getConfiguration().withRecordClass(recordClass));
     }
 
     /**
@@ -372,7 +430,7 @@ public class XmlIO {
      * Sets the XML file charset.
      */
     public Read<T> withCharset(Charset charset) {
-      return toBuilder().setCharset(charset.name()).build();
+      return withConfiguration(getConfiguration().withCharset(charset));
     }
 
     /**
@@ -380,23 +438,8 @@ public class XmlIO {
      * parameter will cause the JAXB unmarshaller event handler to be 
unspecified.
      */
     public Read<T> withValidationEventHandler(ValidationEventHandler 
validationEventHandler) {
-      return 
toBuilder().setValidationEventHandler(validationEventHandler).build();
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(
-          getRootElement(),
-          "rootElement is null. Use builder method withRootElement() to set 
this.");
-      checkNotNull(
-          getRecordElement(),
-          "recordElement is null. Use builder method withRecordElement() to 
set this.");
-      checkNotNull(
-          getRecordClass(),
-          "recordClass is null. Use builder method withRecordClass() to set 
this.");
-      checkNotNull(
-          getCharset(),
-          "charset is null. Use builder method withCharset() to set this.");
+      return withConfiguration(
+          
getConfiguration().withValidationEventHandler(validationEventHandler));
     }
 
     @Override
@@ -407,27 +450,90 @@ public class XmlIO {
                   .withLabel("Minimum Bundle Size"),
               1L)
           .add(DisplayData.item("filePattern", 
getFileOrPatternSpec()).withLabel("File Pattern"))
-          .addIfNotNull(
-              DisplayData.item("rootElement", getRootElement()).withLabel("XML 
Root Element"))
-          .addIfNotNull(
-              DisplayData.item("recordElement", 
getRecordElement()).withLabel("XML Record Element"))
-          .addIfNotNull(
-              DisplayData.item("recordClass", getRecordClass()).withLabel("XML 
Record Class"))
-          .addIfNotNull(
-              DisplayData.item("charset", getCharset()).withLabel("Charset"));
+          .include("configuration", getConfiguration());
     }
 
     @VisibleForTesting
     BoundedSource<T> createSource() {
-        return CompressedSource.from(new 
XmlSource<>(this)).withCompression(getCompression());
+      return CompressedSource.from(
+              new XmlSource<>(
+                  StaticValueProvider.of(getFileOrPatternSpec()), 
getConfiguration(), 1L))
+          .withCompression(getCompression());
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
+      getConfiguration().validate();
       return input.apply(org.apache.beam.sdk.io.Read.from(createSource()));
     }
   }
 
+  /** Implementation of {@link #readFiles}. */
+  @AutoValue
+  public abstract static class ReadFiles<T>
+      extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+    abstract MappingConfiguration<T> getConfiguration();
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setConfiguration(MappingConfiguration<T> 
configuration);
+      abstract ReadFiles<T> build();
+    }
+
+    private ReadFiles<T> withConfiguration(MappingConfiguration<T> 
configuration) {
+      return toBuilder().setConfiguration(configuration).build();
+    }
+
+    /** Like {@link Read#withRootElement}. */
+    public ReadFiles<T> withRootElement(String rootElement) {
+      return 
withConfiguration(getConfiguration().withRootElement(rootElement));
+    }
+
+    /** Like {@link Read#withRecordElement}. */
+    public ReadFiles<T> withRecordElement(String recordElement) {
+      return 
withConfiguration(getConfiguration().withRecordElement(recordElement));
+    }
+
+    /** Like {@link Read#withRecordClass}. */
+    public ReadFiles<T> withRecordClass(Class<T> recordClass) {
+      return 
withConfiguration(getConfiguration().withRecordClass(recordClass));
+    }
+
+    /** Like {@link Read#withCharset}. */
+    public ReadFiles<T> withCharset(Charset charset) {
+      return withConfiguration(getConfiguration().withCharset(charset));
+    }
+
+    /** Like {@link Read#withValidationEventHandler}. */
+    public ReadFiles<T> withValidationEventHandler(ValidationEventHandler 
validationEventHandler) {
+      return withConfiguration(
+          
getConfiguration().withValidationEventHandler(validationEventHandler));
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<ReadableFile> input) {
+      return input.apply(
+          new ReadAllViaFileBasedSource<T>(
+              64 * 1024L * 1024L,
+              new CreateSourceFn<>(getConfiguration()),
+              JAXBCoder.of(getConfiguration().getRecordClass())));
+    }
+  }
+
+  private static class CreateSourceFn<T> implements 
SerializableFunction<String, XmlSource<T>> {
+    private final MappingConfiguration<T> configuration;
+
+    public CreateSourceFn(MappingConfiguration<T> configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public XmlSource<T> apply(String input) {
+      return new XmlSource<>(StaticValueProvider.of(input), configuration, 1L);
+    }
+  }
+
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {

http://git-wip-us.apache.org/repos/asf/beam/blob/abda38dc/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java 
b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
index b893d43..921cd7a 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java
@@ -42,8 +42,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.codehaus.stax2.XMLInputFactory2;
 
 /** Implementation of {@link XmlIO#read}. */
@@ -51,21 +50,29 @@ public class XmlSource<T> extends FileBasedSource<T> {
 
   private static final String XML_VERSION = "1.1";
 
-  private final XmlIO.Read<T> spec;
+  private final XmlIO.MappingConfiguration<T> configuration;
 
-  XmlSource(XmlIO.Read<T> spec) {
-    super(StaticValueProvider.of(spec.getFileOrPatternSpec()), 
spec.getMinBundleSize());
-    this.spec = spec;
+  XmlSource(
+      ValueProvider<String> spec,
+      XmlIO.MappingConfiguration<T> configuration,
+      long minBundleSizeBytes) {
+    super(spec, minBundleSizeBytes);
+    this.configuration = configuration;
   }
 
-  private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, 
long endOffset) {
-    super(metadata, spec.getMinBundleSize(), startOffset, endOffset);
-    this.spec = spec;
+  private XmlSource(
+      XmlIO.MappingConfiguration<T> configuration,
+      long minBundleSizeBytes,
+      Metadata metadata,
+      long startOffset,
+      long endOffset) {
+    super(metadata, minBundleSizeBytes, startOffset, endOffset);
+    this.configuration = configuration;
   }
 
   @Override
   protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long 
start, long end) {
-    return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, 
end);
+    return new XmlSource<T>(configuration, getMinBundleSize(), metadata, 
start, end);
   }
 
   @Override
@@ -74,19 +81,8 @@ public class XmlSource<T> extends FileBasedSource<T> {
   }
 
   @Override
-  public void validate() {
-    super.validate();
-    spec.validate(null);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    spec.populateDisplayData(builder);
-  }
-
-  @Override
   public Coder<T> getOutputCoder() {
-    return JAXBCoder.of(spec.getRecordClass());
+    return JAXBCoder.of(configuration.getRecordClass());
   }
 
   /**
@@ -137,10 +133,12 @@ public class XmlSource<T> extends FileBasedSource<T> {
 
       // Set up a JAXB Unmarshaller that can be used to unmarshall record 
objects.
       try {
-        JAXBContext jaxbContext = 
JAXBContext.newInstance(getCurrentSource().spec.getRecordClass());
+        JAXBContext jaxbContext =
+            
JAXBContext.newInstance(getCurrentSource().configuration.getRecordClass());
         jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-        if (getCurrentSource().spec.getValidationEventHandler() != null) {
-          
jaxbUnmarshaller.setEventHandler(getCurrentSource().spec.getValidationEventHandler());
+        if (getCurrentSource().configuration.getValidationEventHandler() != 
null) {
+          jaxbUnmarshaller.setEventHandler(
+              getCurrentSource().configuration.getValidationEventHandler());
         }
       } catch (JAXBException e) {
         throw new RuntimeException(e);
@@ -179,10 +177,10 @@ public class XmlSource<T> extends FileBasedSource<T> {
       byte[] dummyStartDocumentBytes =
           (String.format(
                   "<?xml version=\"%s\" encoding=\""
-                      + getCurrentSource().spec.getCharset()
+                      + getCurrentSource().configuration.getCharset()
                       + "\"?><%s>",
-                  XML_VERSION, getCurrentSource().spec.getRootElement()))
-              .getBytes(getCurrentSource().spec.getCharset());
+                  XML_VERSION, 
getCurrentSource().configuration.getRootElement()))
+              .getBytes(getCurrentSource().configuration.getCharset());
       preambleByteBuffer.write(dummyStartDocumentBytes);
       // Gets the byte offset (in the input file) of the first record in 
ReadableByteChannel. This
       // method returns the offset and stores any bytes that should be used 
when creating the XML
@@ -230,7 +228,8 @@ public class XmlSource<T> extends FileBasedSource<T> {
 
       ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
       byte[] recordStartBytes =
-          ("<" + 
getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8);
+          ("<" + getCurrentSource().configuration.getRecordElement())
+              .getBytes(StandardCharsets.UTF_8);
 
       outer: while (channel.read(buf) > 0) {
         buf.flip();
@@ -334,14 +333,14 @@ public class XmlSource<T> extends FileBasedSource<T> {
         this.parser = xmlInputFactory.createXMLStreamReader(
             new SequenceInputStream(
                 new ByteArrayInputStream(lookAhead), 
Channels.newInputStream(channel)),
-            getCurrentSource().spec.getCharset());
+            getCurrentSource().configuration.getCharset());
 
         // Current offset should be the offset before reading the record 
element.
         while (true) {
           int event = parser.next();
           if (event == XMLStreamConstants.START_ELEMENT) {
             String localName = parser.getLocalName();
-            if (localName.equals(getCurrentSource().spec.getRecordElement())) {
+            if 
(localName.equals(getCurrentSource().configuration.getRecordElement())) {
               break;
             }
           }
@@ -369,7 +368,7 @@ public class XmlSource<T> extends FileBasedSource<T> {
           }
         }
         JAXBElement<T> jb =
-            jaxbUnmarshaller.unmarshal(parser, 
getCurrentSource().spec.getRecordClass());
+            jaxbUnmarshaller.unmarshal(parser, 
getCurrentSource().configuration.getRecordClass());
         currentRecord = jb.getValue();
         return true;
       } catch (JAXBException | XMLStreamException e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/abda38dc/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java 
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
index abddcf9..a6adac6 100644
--- 
a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
+++ 
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
@@ -41,6 +41,7 @@ import javax.xml.bind.ValidationEventHandler;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.io.Source.Reader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -459,60 +460,6 @@ public class XmlSourceTest {
   }
 
   @Test
-  public void testReadXMLNoRootElement() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRecordElement("train")
-            .withRecordClass(Train.class)
-            .createSource();
-
-    exception.expect(NullPointerException.class);
-    exception.expectMessage(
-        "rootElement is null. Use builder method withRootElement() to set 
this.");
-    readEverythingFromReader(source.createReader(null));
-  }
-
-  @Test
-  public void testReadXMLNoRecordElement() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordClass(Train.class)
-            .createSource();
-
-    exception.expect(NullPointerException.class);
-    exception.expectMessage(
-        "recordElement is null. Use builder method withRecordElement() to set 
this.");
-    readEverythingFromReader(source.createReader(null));
-  }
-
-  @Test
-  public void testReadXMLNoRecordClass() throws IOException {
-    File file = tempFolder.newFile("trainXMLSmall");
-    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
-
-    BoundedSource<Train> source =
-        XmlIO.<Train>read()
-            .from(file.toPath().toString())
-            .withRootElement("trains")
-            .withRecordElement("train")
-            .createSource();
-
-    exception.expect(NullPointerException.class);
-    exception.expectMessage(
-        "recordClass is null. Use builder method withRecordClass() to set 
this.");
-    readEverythingFromReader(source.createReader(null));
-  }
-
-  @Test
   public void testReadXMLIncorrectRootElement() throws IOException {
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
@@ -938,7 +885,7 @@ public class XmlSourceTest {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testReadXMLFilePattern() throws IOException {
+  public void testReadXMLFilePatternUsingReadAndReadFiles() throws IOException 
{
     List<Train> trains1 = generateRandomTrainList(20);
     File file = createRandomTrainXML("temp1.xml", trains1);
     List<Train> trains2 = generateRandomTrainList(10);
@@ -948,9 +895,9 @@ public class XmlSourceTest {
     generateRandomTrainList(8);
     createRandomTrainXML("otherfile.xml", trains1);
 
-    PCollection<Train> output =
+    PCollection<Train> read =
         p.apply(
-            "ReadFileData",
+            "Read",
             XmlIO.<Train>read()
                 .from(file.getParent() + "/" + "temp*.xml")
                 .withRootElement("trains")
@@ -958,12 +905,23 @@ public class XmlSourceTest {
                 .withRecordClass(Train.class)
                 .withMinBundleSize(1024));
 
+    PCollection<Train> readFiles =
+        p.apply(FileIO.match().filepattern(file.getParent() + "/" + 
"temp*.xml"))
+            .apply(FileIO.readMatches())
+            .apply(
+                "ReadFiles",
+                XmlIO.<Train>readFiles()
+                    .withRootElement("trains")
+                    .withRecordElement("train")
+                    .withRecordClass(Train.class));
+
     List<Train> expectedResults = new ArrayList<>();
     expectedResults.addAll(trains1);
     expectedResults.addAll(trains2);
     expectedResults.addAll(trains3);
 
-    PAssert.that(output).containsInAnyOrder(expectedResults);
+    PAssert.that(read).containsInAnyOrder(expectedResults);
+    PAssert.that(readFiles).containsInAnyOrder(expectedResults);
     p.run();
   }
 

Reply via email to