Repository: beam
Updated Branches:
  refs/heads/master 022d5b657 -> 62f041e56


http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java
new file mode 100644
index 0000000..9c5089a
--- /dev/null
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing XML files.
+ *
+ * <p>See {@link org.apache.beam.sdk.io.xml.XmlIO} for the entry points, {@code XmlIO.read()} and
+ * {@code XmlIO.write()}.
+ */
+package org.apache.beam.sdk.io.xml;

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
new file mode 100644
index 0000000..5f1330d
--- /dev/null
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link JAXBCoder}. */
+@RunWith(JUnit4.class)
+public class JAXBCoderTest {
+
+  @XmlRootElement
+  static class TestType {
+    private String testString = null;
+    private int testInt;
+
+    public TestType() {}
+
+    public TestType(String testString, int testInt) {
+      this.testString = testString;
+      this.testInt = testInt;
+    }
+
+    public String getTestString() {
+      return testString;
+    }
+
+    public void setTestString(String testString) {
+      this.testString = testString;
+    }
+
+    public int getTestInt() {
+      return testInt;
+    }
+
+    public void setTestInt(int testInt) {
+      this.testInt = testInt;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+      hashCode = 31 * hashCode + (testString == null ? 0 : 
testString.hashCode());
+      hashCode = 31 * hashCode + testInt;
+      return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof TestType)) {
+        return false;
+      }
+
+      TestType other = (TestType) obj;
+      return (testString == null || testString.equals(other.testString))
+          && (testInt == other.testInt);
+    }
+  }
+
+  @Test
+  public void testEncodeDecodeOuter() throws Exception {
+    JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class);
+
+    byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 
9999));
+    assertEquals(new TestType("abc", 9999), 
CoderUtils.decodeFromByteArray(coder, encoded));
+  }
+
+  @Test
+  public void testEncodeDecodeAfterClone() throws Exception {
+    JAXBCoder<TestType> coder = 
SerializableUtils.clone(JAXBCoder.of(TestType.class));
+
+    byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 
9999));
+    assertEquals(new TestType("abc", 9999), 
CoderUtils.decodeFromByteArray(coder, encoded));
+  }
+
+  @Test
+  public void testEncodeDecodeNested() throws Exception {
+    JAXBCoder<TestType> jaxbCoder = JAXBCoder.of(TestType.class);
+    TestCoder nesting = new TestCoder(jaxbCoder);
+
+    byte[] encoded = CoderUtils.encodeToByteArray(nesting, new TestType("abc", 
9999));
+    assertEquals(
+        new TestType("abc", 9999), CoderUtils.decodeFromByteArray(nesting, 
encoded));
+  }
+
+  @Test
+  public void testEncodeDecodeMultithreaded() throws Throwable {
+    final JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class);
+    int numThreads = 100;
+
+    final CountDownLatch ready = new CountDownLatch(numThreads);
+    final CountDownLatch start = new CountDownLatch(1);
+    final CountDownLatch done = new CountDownLatch(numThreads);
+
+    final AtomicReference<Throwable> thrown = new AtomicReference<>();
+
+    Executor executor = Executors.newCachedThreadPool();
+    for (int i = 0; i < numThreads; i++) {
+      final TestType elem = new TestType("abc", i);
+      final int index = i;
+      executor.execute(
+          new Runnable() {
+            @Override
+            public void run() {
+              ready.countDown();
+              try {
+                start.await();
+              } catch (InterruptedException e) {
+              }
+
+              try {
+                byte[] encoded = CoderUtils.encodeToByteArray(coder, elem);
+                assertEquals(
+                    new TestType("abc", index), 
CoderUtils.decodeFromByteArray(coder, encoded));
+              } catch (Throwable e) {
+                thrown.compareAndSet(null, e);
+              }
+              done.countDown();
+            }
+          });
+    }
+    ready.await();
+    start.countDown();
+
+    done.await();
+    Throwable actuallyThrown = thrown.get();
+    if (actuallyThrown != null) {
+      throw actuallyThrown;
+    }
+  }
+
+  /**
+   * A coder that surrounds the value with two values, to demonstrate nesting.
+   */
+  private static class TestCoder extends StandardCoder<TestType> {
+    private final JAXBCoder<TestType> jaxbCoder;
+    public TestCoder(JAXBCoder<TestType> jaxbCoder) {
+      this.jaxbCoder = jaxbCoder;
+    }
+
+    @Override
+    public void encode(TestType value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      Context nestedContext = context.nested();
+      VarIntCoder.of().encode(3, outStream, nestedContext);
+      jaxbCoder.encode(value, outStream, nestedContext);
+      VarLongCoder.of().encode(22L, outStream, context);
+    }
+
+    @Override
+    public TestType decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      Context nestedContext = context.nested();
+      VarIntCoder.of().decode(inStream, nestedContext);
+      TestType result = jaxbCoder.decode(inStream, nestedContext);
+      VarLongCoder.of().decode(inStream, context);
+      return result;
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return ImmutableList.of(jaxbCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      jaxbCoder.verifyDeterministic();
+    }
+  }
+
+  @Test
+  public void testEncodable() throws Exception {
+    CoderProperties.coderSerializable(JAXBCoder.of(TestType.class));
+  }
+
+  @Test
+  public void testEncodingId() throws Exception {
+    Coder<TestType> coder = JAXBCoder.of(TestType.class);
+    CoderProperties.coderHasEncodingId(
+        coder, TestType.class.getName());
+  }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(
+        JAXBCoder.of(TestType.class).getEncodedTypeDescriptor(),
+        equalTo(TypeDescriptor.of(TestType.class)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
new file mode 100644
index 0000000..a6e1b87
--- /dev/null
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.collect.Lists;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlType;
+import org.apache.beam.sdk.io.xml.XmlSink.XmlWriteOperation;
+import org.apache.beam.sdk.io.xml.XmlSink.XmlWriter;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link XmlSink} and the write side of {@link XmlIO}.
+ */
+@RunWith(JUnit4.class)
+public class XmlSinkTest {
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /** Root element name for sinks configured by these tests. */
+  private final String testRootElement = "testElement";
+  /** Filename prefix for sinks configured by these tests. */
+  private final String testFilePrefix = "/path/to/testPrefix";
+
+  /**
+   * An XmlWriter correctly writes objects as XML elements with an enclosing root element.
+   */
+  @Test
+  public void testXmlWriter() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    XmlWriteOperation<Bird> writeOp =
+        XmlIO.<Bird>write()
+            .toFilenamePrefix(testFilePrefix)
+            .withRecordClass(Bird.class)
+            .withRootElement("birds")
+            .createSink()
+            .createWriteOperation(options);
+    XmlWriter<Bird> writer = writeOp.createWriter(options);
+
+    List<Bird> bundle =
+        Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose"));
+    List<String> lines = Arrays.asList("<birds>", "<bird>", "<species>robin</species>",
+        "<adjective>bemused</adjective>", "</bird>", "<bird>", "<species>goose</species>",
+        "<adjective>evasive</adjective>", "</bird>", "</birds>");
+    runTestWrite(writer, bundle, lines);
+  }
+
+  /**
+   * Builder methods correctly initialize an XML Sink.
+   */
+  @Test
+  public void testBuildXmlWriteTransform() {
+    XmlIO.Write<Bird> write =
+        XmlIO.<Bird>write()
+            .toFilenamePrefix(testFilePrefix)
+            .withRecordClass(Bird.class)
+            .withRootElement(testRootElement);
+    assertEquals(Bird.class, write.getRecordClass());
+    assertEquals(testRootElement, write.getRootElement());
+    assertEquals(testFilePrefix, write.getFilenamePrefix());
+  }
+
+  /** Validation fails with a NullPointerException when the record class is missing. */
+  @Test
+  public void testValidateXmlSinkMissingRecordClass() {
+    thrown.expect(NullPointerException.class);
+    XmlIO.<Bird>write()
+        .withRootElement(testRootElement)
+        .toFilenamePrefix(testFilePrefix)
+        .validate(null);
+  }
+
+  /** Validation fails with a NullPointerException when the root element is missing. */
+  @Test
+  public void testValidateXmlSinkMissingRootElement() {
+    thrown.expect(NullPointerException.class);
+    XmlIO.<Bird>write()
+        .withRecordClass(Bird.class)
+        .toFilenamePrefix(testFilePrefix)
+        .validate(null);
+  }
+
+  /** Validation fails with a NullPointerException when the filename prefix is missing. */
+  @Test
+  public void testValidateXmlSinkMissingFilePrefix() {
+    thrown.expect(NullPointerException.class);
+    XmlIO.<Bird>write()
+        .withRecordClass(Bird.class)
+        .withRootElement(testRootElement)
+        .validate(null);
+  }
+
+  /**
+   * An XML Sink correctly creates an XmlWriteOperation whose temporary directory sits next to
+   * the output location and is named after the output file.
+   */
+  @Test
+  public void testCreateWriteOperations() {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    XmlSink<Bird> sink =
+        XmlIO.<Bird>write()
+            .withRecordClass(Bird.class)
+            .withRootElement(testRootElement)
+            .toFilenamePrefix(testFilePrefix)
+            .createSink();
+    XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
+    Path outputPath = new File(testFilePrefix).toPath();
+    Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath();
+    assertEquals(outputPath.getParent(), tempPath.getParent());
+    assertThat(
+        tempPath.getFileName().toString(),
+        containsString("temp-beam-" + outputPath.getFileName()));
+  }
+
+  /**
+   * An XmlWriteOperation correctly creates an XmlWriter that shares the operation's temporary
+   * directory and has a marshaller.
+   */
+  @Test
+  public void testCreateWriter() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    XmlWriteOperation<Bird> writeOp =
+        XmlIO.<Bird>write()
+            .withRecordClass(Bird.class)
+            .withRootElement(testRootElement)
+            .toFilenamePrefix(testFilePrefix)
+            .createSink()
+            .createWriteOperation(options);
+    XmlWriter<Bird> writer = writeOp.createWriter(options);
+    Path outputPath = new File(testFilePrefix).toPath();
+    Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath();
+    assertEquals(outputPath.getParent(), tempPath.getParent());
+    assertThat(
+        tempPath.getFileName().toString(),
+        containsString("temp-beam-" + outputPath.getFileName()));
+    assertNotNull(writer.marshaller);
+  }
+
+  /** Display data reports the file name pattern, root element and record class. */
+  @Test
+  public void testDisplayData() {
+    XmlIO.Write<Integer> write = XmlIO.<Integer>write()
+        .toFilenamePrefix("foobar")
+        .withRootElement("bird")
+        .withRecordClass(Integer.class);
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml"));
+    assertThat(displayData, hasDisplayItem("rootElement", "bird"));
+    assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
+  }
+
+  /**
+   * Writes {@code bundle} with {@code writer} into a fresh temporary file and asserts that the
+   * file's non-blank lines, each trimmed of surrounding whitespace, equal {@code expected}.
+   */
+  private <T> void runTestWrite(XmlWriter<T> writer, List<T> bundle, List<String> expected)
+      throws Exception {
+    File tmpFile = tmpFolder.newFile("foo.txt");
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile)) {
+      writeBundle(writer, bundle, fileOutputStream.getChannel());
+    }
+    List<String> lines = new ArrayList<>();
+    // NOTE(review): FileReader decodes with the platform default charset; the expected output
+    // here is ASCII, so that does not matter for these fixtures.
+    try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String trimmed = line.trim();
+        // Indentation and blank lines are ignored: only the element lines are compared.
+        if (!trimmed.isEmpty()) {
+          lines.add(trimmed);
+        }
+      }
+    }
+    assertEquals(expected, lines);
+  }
+
+  /**
+   * Writes {@code elements} to {@code channel} with {@code writer}: the header, one record per
+   * element, then the footer. The channel is left open for the caller to close.
+   */
+  private <T> void writeBundle(XmlWriter<T> writer, List<T> elements, WritableByteChannel channel)
+      throws Exception {
+    writer.prepareWrite(channel);
+    writer.writeHeader();
+    for (T elem : elements) {
+      writer.write(elem);
+    }
+    writer.writeFooter();
+  }
+
+  /**
+   * Test JAXB annotated class: {@code name} is written as {@code <species>}, followed by
+   * {@code <adjective>}.
+   */
+  @SuppressWarnings("unused")
+  @XmlRootElement(name = "bird")
+  @XmlType(propOrder = {"name", "adjective"})
+  private static final class Bird {
+    private String name;
+    private String adjective;
+
+    @XmlElement(name = "species")
+    public String getName() {
+      return name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    public String getAdjective() {
+      return adjective;
+    }
+
+    public void setAdjective(String adjective) {
+      this.adjective = adjective;
+    }
+
+    public Bird() {}
+
+    /** Note the argument order: adjective first, then name (the species). */
+    public Bird(String adjective, String name) {
+      this.adjective = adjective;
+      this.name = name;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
new file mode 100644
index 0000000..5b33be3
--- /dev/null
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java
@@ -0,0 +1,893 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.xml;
+
+import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
+import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
+import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests XmlSource.
+ */
+@RunWith(JUnit4.class)
+public class XmlSourceTest {
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /** Three trains that have only a {@code <name>}. */
+  String tinyXML =
+      
"<trains><train><name>Thomas</name></train><train><name>Henry</name></train>"
+      + "<train><name>James</name></train></trains>";
+
+  /** The same three name-only trains, with Sinhala root and record element names. */
+  String xmlWithMultiByteElementName =
+      
"<දුම්රියන්><දුම්රිය><name>Thomas</name></දුම්රිය><දුම්රිය><name>Henry</name></දුම්රිය>"
+      + 
"<දුම්රිය><name>James</name></දුම්රිය></දුම්රියන්>";
+
+  /** Three name-only trains whose names contain non-ASCII characters. */
+  String xmlWithMultiByteChars =
+      
"<trains><train><name>Thomas¥</name></train><train><name>Hen¶ry</name></train>"
+      + "<train><name>Jamßes</name></train></trains>";
+
+  /** Six trains with name, number and color, and no size attribute. */
+  String trainXML =
+      "<trains>"
+      + 
"<train><name>Thomas</name><number>1</number><color>blue</color></train>"
+      + 
"<train><name>Henry</name><number>3</number><color>green</color></train>"
+      + 
"<train><name>Toby</name><number>7</number><color>brown</color></train>"
+      + 
"<train><name>Gordon</name><number>4</number><color>blue</color></train>"
+      + 
"<train><name>Emily</name><number>-1</number><color>red</color></train>"
+      + 
"<train><name>Percy</name><number>6</number><color>green</color></train>"
+      + "</trains>";
+
+  /** The six trains of {@link #trainXML} plus two empty {@code <train/>} records. */
+  String trainXMLWithEmptyTags =
+      "<trains>"
+      + "<train/>"
+      + 
"<train><name>Thomas</name><number>1</number><color>blue</color></train>"
+      + 
"<train><name>Henry</name><number>3</number><color>green</color></train>"
+      + "<train/>"
+      + 
"<train><name>Toby</name><number>7</number><color>brown</color></train>"
+      + 
"<train><name>Gordon</name><number>4</number><color>blue</color></train>"
+      + 
"<train><name>Emily</name><number>-1</number><color>red</color></train>"
+      + 
"<train><name>Percy</name><number>6</number><color>green</color></train>"
+      + "</trains>";
+
+  /** The six trains of {@link #trainXML}, each with a {@code size} attribute. */
+  String trainXMLWithAttributes =
+      "<trains>"
+      + "<train 
size=\"small\"><name>Thomas</name><number>1</number><color>blue</color></train>"
+      + "<train 
size=\"big\"><name>Henry</name><number>3</number><color>green</color></train>"
+      + "<train 
size=\"small\"><name>Toby</name><number>7</number><color>brown</color></train>"
+      + "<train 
size=\"big\"><name>Gordon</name><number>4</number><color>blue</color></train>"
+      + "<train 
size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>"
+      + "<train 
size=\"small\"><name>Percy</name><number>6</number><color>green</color></train>"
+      + "</trains>";
+
+  /**
+   * The six trains of {@link #trainXML} with extra spaces, tabs and newlines inside and between
+   * elements.
+   */
+  String trainXMLWithSpaces =
+      "<trains>"
+      + "<train><name>Thomas   </name>   
<number>1</number><color>blue</color></train>"
+      + 
"<train><name>Henry</name><number>3</number><color>green</color></train>\n"
+      + "<train><name>Toby</name><number>7</number><color>  brown  
</color></train>  "
+      + "<train><name>Gordon</name>   
<number>4</number><color>blue</color>\n</train>\t"
+      + 
"<train><name>Emily</name><number>-1</number>\t<color>red</color></train>"
+      + "<train>\n<name>Percy</name>   <number>6  </number>   
<color>green</color></train>"
+      + "</trains>";
+
+  /**
+   * Combines empty tags, size attributes, whitespace inside values and non-ASCII characters,
+   * with Sinhala root and record element names.
+   */
+  String trainXMLWithAllFeaturesMultiByte =
+      "<දුම්රියන්>"
+      + "<දුම්රිය/>"
+      + "<දුම්රිය size=\"small\"><name> 
Thomas¥</name><number>1</number><color>blue</color>"
+      + "</දුම්රිය>"
+      + "<දුම්රිය size=\"big\"><name>He 
nry</name><number>3</number><color>green</color></දුම්රිය>"
+      + "<දුම්රිය size=\"small\"><name>Toby  
</name><number>7</number><color>br¶own</color>"
+      + "</දුම්රිය>"
+      + "<දුම්රිය/>"
+      + "<දුම්රිය 
size=\"big\"><name>Gordon</name><number>4</number><color> 
blue</color></දුම්රිය>"
+      + "<දුම්රිය 
size=\"small\"><name>Emily</name><number>-1</number><color>red</color></දුම්රිය>"
+      + "<දුම්රිය 
size=\"small\"><name>Percy</name><number>6</number><color>green</color>"
+      + "</දුම්රිය>"
+      + "</දුම්රියන්>";
+
+  /**
+   * Same structure as {@link #trainXMLWithAllFeaturesMultiByte}, but with ASCII element names
+   * and ASCII values.
+   */
+  String trainXMLWithAllFeaturesSingleByte =
+      "<trains>"
+      + "<train/>"
+      + "<train size=\"small\"><name> 
Thomas</name><number>1</number><color>blue</color>"
+      + "</train>"
+      + "<train size=\"big\"><name>He 
nry</name><number>3</number><color>green</color></train>"
+      + "<train size=\"small\"><name>Toby  
</name><number>7</number><color>brown</color>"
+      + "</train>"
+      + "<train/>"
+      + "<train size=\"big\"><name>Gordon</name><number>4</number><color> 
blue</color></train>"
+      + "<train 
size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>"
+      + "<train 
size=\"small\"><name>Percy</name><number>6</number><color>green</color>"
+      + "</train>"
+      + "</trains>";
+
+  /**
+   * Record type the tests unmarshal {@code <train>} elements into. {@code name}, {@code color}
+   * and {@code number} are public fields; {@code size} is bound to the element's {@code size}
+   * attribute.
+   */
+  @XmlRootElement
+  static class Train {
+    /**
+     * Default for {@code number} when no value is set. It is also a real value in some fixtures
+     * (Emily is {@code -1}), so it cannot distinguish "absent" from "-1".
+     */
+    public static final int TRAIN_NUMBER_UNDEFINED = -1;
+    public String name = null;
+    public String color = null;
+    public int number = TRAIN_NUMBER_UNDEFINED;
+
+    @XmlAttribute(name = "size")
+    public String size = null;
+
+    public Train() {}
+
+    public Train(String name, int number, String color, String size) {
+      this.name = name;
+      this.number = number;
+      this.color = color;
+      this.size = size;
+    }
+
+    @Override
+    public int hashCode() {
+      int hashCode = 1;
+      hashCode = 31 * hashCode + (name == null ? 0 : name.hashCode());
+      hashCode = 31 * hashCode + number;
+      // Hash each field by its own value; hashing name here instead threw NPE for a null name.
+      hashCode = 31 * hashCode + (color == null ? 0 : color.hashCode());
+      hashCode = 31 * hashCode + (size == null ? 0 : size.hashCode());
+      return hashCode;
+    }
+
+    /**
+     * Lenient equality: a {@code null} string field on this instance matches any value in
+     * {@code other}, while {@code number} must match exactly.
+     *
+     * <p>NOTE(review): this is not symmetric and is not consistent with {@link #hashCode()}; it
+     * is kept unchanged because assertions elsewhere in this test class may rely on it.
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof Train)) {
+        return false;
+      }
+
+      Train other = (Train) obj;
+      return (name == null || name.equals(other.name)) && (number == other.number)
+          && (color == null || color.equals(other.color))
+          && (size == null || size.equals(other.size));
+    }
+
+    /**
+     * Renders as {@code Train[name=..,number=..,color=..,size=..]}, omitting null string fields.
+     */
+    @Override
+    public String toString() {
+      StringBuilder str = new StringBuilder("Train[");
+      String separator = "";
+      if (name != null) {
+        str.append(separator).append("name=").append(name);
+        separator = ",";
+      }
+      // NOTE(review): this compares against Integer.MIN_VALUE rather than
+      // TRAIN_NUMBER_UNDEFINED, so number is printed even when it holds the -1 default.
+      if (number != Integer.MIN_VALUE) {
+        str.append(separator).append("number=").append(number);
+        separator = ",";
+      }
+      if (color != null) {
+        str.append(separator).append("color=").append(color);
+        separator = ",";
+      }
+      if (size != null) {
+        str.append(separator).append("size=").append(size);
+      }
+      return str.append("]").toString();
+    }
+  }
+
+  /**
+   * Builds {@code size} trains whose fields are picked at random from small fixed pools.
+   *
+   * <p>The generator is seeded from the wall clock, so each run produces a different list.
+   */
+  private List<Train> generateRandomTrainList(int size) {
+    String[] names = {"Thomas", "Henry", "Gordon", "Emily", "Toby", "Percy", "Mavis", "Edward",
+        "Bertie", "Harold", "Hiro", "Terence", "Salty", "Trevor"};
+    int[] numbers = {-1, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+    String[] colors = {"red", "blue", "green", "orange", "brown", "black", "white"};
+    String[] sizes = {"small", "medium", "big"};
+
+    Random random = new Random(System.currentTimeMillis());
+
+    List<Train> trains = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      // Random.nextInt(bound) already excludes bound, so passing the full array length makes
+      // every entry, including the last one, selectable.
+      trains.add(
+          new Train(
+              names[random.nextInt(names.length)],
+              numbers[random.nextInt(numbers.length)],
+              colors[random.nextInt(colors.length)],
+              sizes[random.nextInt(sizes.length)]));
+    }
+
+    return trains;
+  }
+
+  /**
+   * Serializes {@code train} as one {@code <train>} element carrying a {@code size} attribute and
+   * {@code name}, {@code number} and {@code color} children; null fields are rendered as "null".
+   */
+  private String trainToXMLElement(Train train) {
+    StringBuilder xml = new StringBuilder();
+    xml.append("<train size=\"").append(train.size).append("\">");
+    xml.append("<name>").append(train.name).append("</name>");
+    xml.append("<number>").append(train.number).append("</number>");
+    xml.append("<color>").append(train.color).append("</color>");
+    xml.append("</train>");
+    return xml.toString();
+  }
+
+  /**
+   * Creates {@code fileName} in the temporary folder with {@code trains} wrapped in a
+   * {@code <trains>} root element, writing one element per line.
+   */
+  private File createRandomTrainXML(String fileName, List<Train> trains) throws IOException {
+    List<String> xmlLines = new ArrayList<>();
+    xmlLines.add("<trains>");
+    for (Train train : trains) {
+      xmlLines.add(trainToXMLElement(train));
+    }
+    xmlLines.add("</trains>");
+
+    File xmlFile = tempFolder.newFile(fileName);
+    // NOTE(review): FileWriter encodes with the platform default charset.
+    try (BufferedWriter out = new BufferedWriter(new FileWriter(xmlFile))) {
+      for (String xmlLine : xmlLines) {
+        out.write(xmlLine);
+        out.newLine();
+      }
+    }
+    return xmlFile;
+  }
+
+  /**
+   * Drains {@code reader} from its first record, returning every record in read order. The
+   * reader is not closed here.
+   */
+  private List<Train> readEverythingFromReader(Reader<Train> reader) throws IOException {
+    List<Train> results = new ArrayList<>();
+    boolean hasRecord = reader.start();
+    while (hasRecord) {
+      results.add(reader.getCurrent());
+      hasRecord = reader.advance();
+    }
+    return results;
+  }
+
+  @Test
+  public void testReadXMLTiny() throws IOException {
+    File file = tempFolder.newFile("trainXMLTiny");
+    Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024)
+            .createSource();
+
+    List<Train> expectedResults = ImmutableList.of(
+        new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
+        new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
+        new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
+
+    assertThat(
+        trainsToStrings(expectedResults),
+        containsInAnyOrder(
+            
trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray()));
+  }
+
+  /** Reads records whose text content contains multi-byte UTF-8 characters. */
+  @Test
+  public void testReadXMLWithMultiByteChars() throws IOException {
+    File file = tempFolder.newFile("trainXMLTiny");
+    Files.write(file.toPath(), xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024)
+            .createSource();
+
+    List<Train> expectedResults = ImmutableList.of(
+        new Train("Thomas¥", Train.TRAIN_NUMBER_UNDEFINED, null, null),
+        new Train("Hen¶ry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
+        new Train("Jamßes", Train.TRAIN_NUMBER_UNDEFINED, null, null));
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    List<Train> results = readEverythingFromReader(source.createReader(null));
+    assertThat(
+        trainsToStrings(results), containsInAnyOrder(trainsToStrings(expectedResults).toArray()));
+  }
+
+  /** Reads a file whose root and record element names are multi-byte (currently ignored). */
+  @Test
+  @Ignore(
+      "Multi-byte characters in XML are not supported because the parser "
+          + "currently does not correctly report byte offsets")
+  public void testReadXMLWithMultiByteElementName() throws IOException {
+    File file = tempFolder.newFile("trainXMLTiny");
+    Files.write(file.toPath(), xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("දුම්රියන්")
+            .withRecordElement("දුම්රිය")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024)
+            .createSource();
+
+    List<Train> expectedResults = ImmutableList.of(
+        new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
+        new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
+        new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    List<Train> results = readEverythingFromReader(source.createReader(null));
+    assertThat(
+        trainsToStrings(results), containsInAnyOrder(trainsToStrings(expectedResults).toArray()));
+  }
+
+  /** Splits a tiny file into more than two bundles and checks that no record is lost or doubled. */
+  @Test
+  public void testSplitWithEmptyBundleAtEnd() throws Exception {
+    File file = tempFolder.newFile("trainXMLTiny");
+    Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(10)
+            .createSource();
+    List<? extends BoundedSource<Train>> splits = source.split(50, null);
+
+    assertTrue(splits.size() > 2);
+
+    List<Train> results = new ArrayList<>();
+    for (BoundedSource<Train> split : splits) {
+      results.addAll(readEverythingFromReader(split.createReader(null)));
+    }
+
+    List<Train> expectedResults = ImmutableList.of(
+        new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null),
+        new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null),
+        new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null));
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    assertThat(
+        trainsToStrings(results), containsInAnyOrder(trainsToStrings(expectedResults).toArray()));
+  }
+
+  /** Returns the {@code toString()} rendering of each train, in input order. */
+  List<String> trainsToStrings(List<Train> input) {
+    List<String> rendered = new ArrayList<>(input.size());
+    for (Train train : input) {
+      rendered.add(train.toString());
+    }
+    return rendered;
+  }
+
+  /** Reads the six-train fixture with a single reader and checks every field of every record. */
+  @Test
+  public void testReadXMLSmall() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024)
+            .createSource();
+
+    List<Train> expectedResults =
+        ImmutableList.of(
+            new Train("Thomas", 1, "blue", null),
+            new Train("Henry", 3, "green", null),
+            new Train("Toby", 7, "brown", null),
+            new Train("Gordon", 4, "blue", null),
+            new Train("Emily", -1, "red", null),
+            new Train("Percy", 6, "green", null));
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    List<Train> results = readEverythingFromReader(source.createReader(null));
+    assertThat(
+        trainsToStrings(results), containsInAnyOrder(trainsToStrings(expectedResults).toArray()));
+  }
+
+  @Test
+  public void testReadXMLNoRootElement() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .createSource();
+
+    exception.expect(NullPointerException.class);
+    exception.expectMessage(
+        "rootElement is null. Use builder method withRootElement() to set 
this.");
+    readEverythingFromReader(source.createReader(null));
+  }
+
+  @Test
+  public void testReadXMLNoRecordElement() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordClass(Train.class)
+            .createSource();
+
+    exception.expect(NullPointerException.class);
+    exception.expectMessage(
+        "recordElement is null. Use builder method withRecordElement() to set 
this.");
+    readEverythingFromReader(source.createReader(null));
+  }
+
+  @Test
+  public void testReadXMLNoRecordClass() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .createSource();
+
+    exception.expect(NullPointerException.class);
+    exception.expectMessage(
+        "recordClass is null. Use builder method withRecordClass() to set 
this.");
+    readEverythingFromReader(source.createReader(null));
+  }
+
+  @Test
+  public void testReadXMLIncorrectRootElement() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("something")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .createSource();
+
+    exception.expectMessage("Unexpected close tag </trains>; expected 
</something>.");
+    readEverythingFromReader(source.createReader(null));
+  }
+
+  @Test
+  public void testReadXMLIncorrectRecordElement() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("something")
+            .withRecordClass(Train.class)
+            .createSource();
+
+    assertEquals(readEverythingFromReader(source.createReader(null)), new 
ArrayList<Train>());
+  }
+
+  /**
+   * Deliberately mismatched JAXB record type whose only property is {@code something};
+   * testReadXMLInvalidRecordClass expects reading the train fixture with it to fail.
+   */
+  @XmlRootElement
+  private static class WrongTrainType {
+    @SuppressWarnings("unused") public String something;
+  }
+
+  /** Unmarshalling records into an incompatible JAXB class surfaces a RuntimeException. */
+  @Test
+  public void testReadXMLInvalidRecordClass() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<WrongTrainType> source =
+        XmlIO.<WrongTrainType>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(WrongTrainType.class)
+            .createSource();
+
+    exception.expect(RuntimeException.class);
+
+    // JAXB internationalizes the error message. So this is all we can match for.
+    exception.expectMessage(both(containsString("name")).and(Matchers.containsString("something")));
+    try (Reader<WrongTrainType> reader = source.createReader(null)) {
+      // Drain the reader; the records themselves are irrelevant, only the expected failure is.
+      for (boolean available = reader.start(); available; available = reader.advance()) {
+        reader.getCurrent();
+      }
+    }
+  }
+
+  /** Reads the six-train fixture without configuring a minimum bundle size. */
+  @Test
+  public void testReadXMLNoBundleSize() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .createSource();
+
+    List<Train> expectedResults =
+        ImmutableList.of(
+            new Train("Thomas", 1, "blue", null),
+            new Train("Henry", 3, "green", null),
+            new Train("Toby", 7, "brown", null),
+            new Train("Gordon", 4, "blue", null),
+            new Train("Emily", -1, "red", null),
+            new Train("Percy", 6, "green", null));
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    List<Train> results = readEverythingFromReader(source.createReader(null));
+    assertThat(
+        trainsToStrings(results), containsInAnyOrder(trainsToStrings(expectedResults).toArray()));
+  }
+
+
+  /** Records with empty tags are read as default-constructed trains. */
+  @Test
+  public void testReadXMLWithEmptyTags() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024)
+            .createSource();
+
+    List<Train> expectedResults =
+        ImmutableList.of(
+            new Train("Thomas", 1, "blue", null),
+            new Train("Henry", 3, "green", null),
+            new Train("Toby", 7, "brown", null),
+            new Train("Gordon", 4, "blue", null),
+            new Train("Emily", -1, "red", null),
+            new Train("Percy", 6, "green", null),
+            new Train(),
+            new Train());
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    List<Train> results = readEverythingFromReader(source.createReader(null));
+    assertThat(
+        trainsToStrings(results), containsInAnyOrder(trainsToStrings(expectedResults).toArray()));
+  }
+
+  /** Reads the six-train fixture through a pipeline and checks the resulting PCollection. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadXMLSmallPipeline() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
+
+    PCollection<Train> output =
+        p.apply(
+            "ReadFileData",
+            XmlIO.<Train>read()
+                .from(file.toPath().toString())
+                .withRootElement("trains")
+                .withRecordElement("train")
+                .withRecordClass(Train.class)
+                .withMinBundleSize(1024));
+
+    List<Train> expectedResults =
+        ImmutableList.of(
+            new Train("Thomas", 1, "blue", null),
+            new Train("Henry", 3, "green", null),
+            new Train("Toby", 7, "brown", null),
+            new Train("Gordon", 4, "blue", null),
+            new Train("Emily", -1, "red", null),
+            new Train("Percy", 6, "green", null));
+
+    PAssert.that(output).containsInAnyOrder(expectedResults);
+    p.run();
+  }
+
+  /** Reads records that carry XML attributes and checks the attribute-backed field is populated. */
+  @Test
+  public void testReadXMLWithAttributes() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024)
+            .createSource();
+
+    List<Train> expectedResults =
+        ImmutableList.of(
+            new Train("Thomas", 1, "blue", "small"),
+            new Train("Henry", 3, "green", "big"),
+            new Train("Toby", 7, "brown", "small"),
+            new Train("Gordon", 4, "blue", "big"),
+            new Train("Emily", -1, "red", "small"),
+            new Train("Percy", 6, "green", "small"));
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    List<Train> results = readEverythingFromReader(source.createReader(null));
+    assertThat(
+        trainsToStrings(results), containsInAnyOrder(trainsToStrings(expectedResults).toArray()));
+  }
+
+  /** Whitespace inside element text is preserved verbatim in the read records. */
+  @Test
+  public void testReadXMLWithWhitespaces() throws IOException {
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024)
+            .createSource();
+
+    List<Train> expectedResults =
+        ImmutableList.of(
+            new Train("Thomas   ", 1, "blue", null),
+            new Train("Henry", 3, "green", null),
+            new Train("Toby", 7, "  brown  ", null),
+            new Train("Gordon", 4, "blue", null),
+            new Train("Emily", -1, "red", null),
+            new Train("Percy", 6, "green", null));
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    List<Train> results = readEverythingFromReader(source.createReader(null));
+    assertThat(
+        trainsToStrings(results), containsInAnyOrder(trainsToStrings(expectedResults).toArray()));
+  }
+
+  /** Reads 100 generated trains back with a single reader. */
+  @Test
+  public void testReadXMLLarge() throws IOException {
+    String fileName = "temp.xml";
+    List<Train> trains = generateRandomTrainList(100);
+    File file = createRandomTrainXML(fileName, trains);
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024)
+            .createSource();
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    List<Train> results = readEverythingFromReader(source.createReader(null));
+    assertThat(trainsToStrings(results), containsInAnyOrder(trainsToStrings(trains).toArray()));
+  }
+
+  /** Writes 100 generated trains to one file and reads them back through a pipeline. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadXMLLargePipeline() throws IOException {
+    List<Train> written = generateRandomTrainList(100);
+    File xmlFile = createRandomTrainXML("temp.xml", written);
+
+    PCollection<Train> read =
+        p.apply(
+            "ReadFileData",
+            XmlIO.<Train>read()
+                .from(xmlFile.toPath().toString())
+                .withRootElement("trains")
+                .withRecordElement("train")
+                .withRecordClass(Train.class)
+                .withMinBundleSize(1024));
+
+    PAssert.that(read).containsInAnyOrder(written);
+    p.run();
+  }
+
+  /** Splitting a small file finely (some bundles may hold no record) loses and duplicates none. */
+  @Test
+  public void testSplitWithEmptyBundles() throws Exception {
+    String fileName = "temp.xml";
+    List<Train> trains = generateRandomTrainList(10);
+    File file = createRandomTrainXML(fileName, trains);
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(10)
+            .createSource();
+    List<? extends BoundedSource<Train>> splits = source.split(100, null);
+
+    assertTrue(splits.size() > 2);
+
+    List<Train> results = new ArrayList<>();
+    for (BoundedSource<Train> split : splits) {
+      results.addAll(readEverythingFromReader(split.createReader(null)));
+    }
+
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    assertThat(trainsToStrings(results), containsInAnyOrder(trainsToStrings(trains).toArray()));
+  }
+
+  /** Splits a 100-train file into several bundles and checks their union equals the input. */
+  @Test
+  public void testXMLWithSplits() throws Exception {
+    String fileName = "temp.xml";
+    List<Train> trains = generateRandomTrainList(100);
+    File file = createRandomTrainXML(fileName, trains);
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(10)
+            .createSource();
+    List<? extends BoundedSource<Train>> splits = source.split(256, null);
+
+    // Not a trivial split
+    assertTrue(splits.size() > 2);
+
+    List<Train> results = new ArrayList<>();
+    for (BoundedSource<Train> split : splits) {
+      results.addAll(readEverythingFromReader(split.createReader(null)));
+    }
+    // assertThat takes (actual, matcher-over-expected) so failure messages read correctly.
+    assertThat(trainsToStrings(results), containsInAnyOrder(trainsToStrings(trains).toArray()));
+  }
+
+  /**
+   * Splits the file into bundles of about a third of its size and exercises dynamic splitting of
+   * each bundle at several read positions and fractions.
+   */
+  @Test
+  public void testSplitAtFraction() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    String fileName = "temp.xml";
+    List<Train> trains = generateRandomTrainList(100);
+    File file = createRandomTrainXML(fileName, trains);
+
+    BoundedSource<Train> fileSource =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(10)
+            .createSource();
+
+    List<? extends BoundedSource<Train>> splits = fileSource.split(file.length() / 3, null);
+    for (BoundedSource<Train> splitSource : splits) {
+      int numItems = readEverythingFromReader(splitSource.createReader(null)).size();
+      // Should not split while unstarted.
+      assertSplitAtFractionFails(splitSource, 0, 0.7, options);
+      assertSplitAtFractionSucceedsAndConsistent(splitSource, 1, 0.7, options);
+      assertSplitAtFractionSucceedsAndConsistent(splitSource, 15, 0.7, options);
+      assertSplitAtFractionFails(splitSource, 0, 0.0, options);
+      assertSplitAtFractionFails(splitSource, 20, 0.3, options);
+      assertSplitAtFractionFails(splitSource, numItems, 1.0, options);
+
+      // After reading all numItems elements of this split the reader sits at the start of the
+      // split's last element, close to its end offset, so splitting at fraction 0.9 fails.
+      assertSplitAtFractionFails(splitSource, numItems, 0.9, options);
+
+      // The following passes since we can always find a fraction extremely close to 1 such
+      // that the position suggested by the fraction is larger than the position the reader is
+      // at after reading "numItems - 1" elements.
+      // This also passes for "numItemsToReadBeforeSplit = numItems" if the position at the
+      // suggested fraction is larger than the position the reader is at after reading all
+      // "numItems" elements (i.e., the start position of the last element). This is true for
+      // most cases but not when the reader position is only one less than the end position
+      // (i.e., the last element of the bundle starts at the last byte that belongs to it).
+      assertSplitAtFractionSucceedsAndConsistent(splitSource, numItems - 1, 0.999, options);
+    }
+  }
+
+  /** Exhaustively checks splitAtFraction on a fixture containing only single-byte characters. */
+  @Test
+  public void testSplitAtFractionExhaustiveSingleByte() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .createSource();
+    assertSplitAtFractionExhaustive(source, options);
+  }
+
+  /** Exhaustively checks splitAtFraction with multi-byte element names (currently ignored). */
+  @Test
+  @Ignore(
+      "Multi-byte characters in XML are not supported because the parser "
+          + "currently does not correctly report byte offsets")
+  public void testSplitAtFractionExhaustiveMultiByte() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File file = tempFolder.newFile("trainXMLSmall");
+    Files.write(file.toPath(), trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8));
+
+    BoundedSource<Train> source =
+        XmlIO.<Train>read()
+            .from(file.toPath().toString())
+            .withRootElement("දුම්රියන්")
+            .withRecordElement("දුම්රිය")
+            .withRecordClass(Train.class)
+            .createSource();
+    assertSplitAtFractionExhaustive(source, options);
+  }
+
+  /**
+   * Reads every file matching {@code temp*.xml}; a non-matching file in the same directory must
+   * not contribute any records.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadXMLFilePattern() throws IOException {
+    List<Train> trains1 = generateRandomTrainList(20);
+    File file = createRandomTrainXML("temp1.xml", trains1);
+    List<Train> trains2 = generateRandomTrainList(10);
+    createRandomTrainXML("temp2.xml", trains2);
+    List<Train> trains3 = generateRandomTrainList(15);
+    createRandomTrainXML("temp3.xml", trains3);
+    // Give the non-matching file its own records, so that if the pattern matched it by mistake
+    // the extra elements would be records that are absent from the expected output.
+    List<Train> trains4 = generateRandomTrainList(8);
+    createRandomTrainXML("otherfile.xml", trains4);
+
+    PCollection<Train> output =
+        p.apply(
+            "ReadFileData",
+            XmlIO.<Train>read()
+                .from(file.getParent() + "/" + "temp*.xml")
+                .withRootElement("trains")
+                .withRecordElement("train")
+                .withRecordClass(Train.class)
+                .withMinBundleSize(1024));
+
+    List<Train> expectedResults = new ArrayList<>();
+    expectedResults.addAll(trains1);
+    expectedResults.addAll(trains2);
+    expectedResults.addAll(trains3);
+
+    PAssert.that(output).containsInAnyOrder(expectedResults);
+    p.run();
+  }
+
+  /** Every configured read parameter is exposed as a display-data item. */
+  @Test
+  public void testDisplayData() {
+    DisplayData items =
+        DisplayData.from(
+            XmlIO.<Integer>read()
+                .from("foo.xml")
+                .withRootElement("bird")
+                .withRecordElement("cat")
+                .withMinBundleSize(1234)
+                .withRecordClass(Integer.class));
+
+    assertThat(items, hasDisplayItem("minBundleSize", 1234));
+    assertThat(items, hasDisplayItem("recordClass", Integer.class));
+    assertThat(items, hasDisplayItem("recordElement", "cat"));
+    assertThat(items, hasDisplayItem("rootElement", "bird"));
+    assertThat(items, hasDisplayItem("filePattern", "foo.xml"));
+  }
+}

Reply via email to