Repository: beam Updated Branches: refs/heads/master 022d5b657 -> 62f041e56
http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java new file mode 100644 index 0000000..9c5089a --- /dev/null +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms for reading and writing XML files. 
+ */ +package org.apache.beam.sdk.io.xml; http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java new file mode 100644 index 0000000..5f1330d --- /dev/null +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.beam.sdk.io.xml;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Unit tests for {@link JAXBCoder}. */
@RunWith(JUnit4.class)
public class JAXBCoderTest {

  /**
   * Minimal JAXB-annotated bean used as the element type in these tests.
   *
   * <p>It has a public no-arg constructor and getter/setter pairs for both properties, and
   * value-based {@link #equals(Object)} / {@link #hashCode()} so that a decoded instance can be
   * compared with the instance that was encoded.
   */
  @XmlRootElement
  static class TestType {
    private String testString = null;
    private int testInt;

    public TestType() {}

    public TestType(String testString, int testInt) {
      this.testString = testString;
      this.testInt = testInt;
    }

    public String getTestString() {
      return testString;
    }

    public void setTestString(String testString) {
      this.testString = testString;
    }

    public int getTestInt() {
      return testInt;
    }

    public void setTestInt(int testInt) {
      this.testInt = testInt;
    }

    @Override
    public int hashCode() {
      int hashCode = 1;
      hashCode = 31 * hashCode + (testString == null ? 0 : testString.hashCode());
      hashCode = 31 * hashCode + testInt;
      return hashCode;
    }

    /**
     * Two instances are equal when both properties match.
     *
     * <p>A {@code null} string only equals another {@code null} string. Treating {@code null} as
     * a wildcard would make the relation asymmetric and inconsistent with {@link #hashCode()}.
     */
    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof TestType)) {
        return false;
      }

      TestType other = (TestType) obj;
      boolean sameString =
          testString == null ? other.testString == null : testString.equals(other.testString);
      return sameString && testInt == other.testInt;
    }
  }

  /** A value survives an encode/decode round trip through the byte-array helpers. */
  @Test
  public void testEncodeDecodeOuter() throws Exception {
    JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class);

    byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999));
    assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded));
  }

  /** A coder obtained through {@link SerializableUtils#clone} still round-trips a value. */
  @Test
  public void testEncodeDecodeAfterClone() throws Exception {
    JAXBCoder<TestType> coder = SerializableUtils.clone(JAXBCoder.of(TestType.class));

    byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999));
    assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded));
  }

  /** The coder round-trips a value when other encoded values surround it in the stream. */
  @Test
  public void testEncodeDecodeNested() throws Exception {
    JAXBCoder<TestType> jaxbCoder = JAXBCoder.of(TestType.class);
    TestCoder nesting = new TestCoder(jaxbCoder);

    byte[] encoded = CoderUtils.encodeToByteArray(nesting, new TestType("abc", 9999));
    assertEquals(
        new TestType("abc", 9999), CoderUtils.decodeFromByteArray(nesting, encoded));
  }

  /**
   * One shared coder instance is used from many threads at once.
   *
   * <p>Every worker signals {@code ready}, blocks on {@code start} so that all of them begin
   * together, round-trips its own element, and finally signals {@code done}. The first failure
   * seen by any worker is rethrown from the test thread.
   */
  @Test
  public void testEncodeDecodeMultithreaded() throws Throwable {
    final JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class);
    int numThreads = 100;

    final CountDownLatch ready = new CountDownLatch(numThreads);
    final CountDownLatch start = new CountDownLatch(1);
    final CountDownLatch done = new CountDownLatch(numThreads);

    final AtomicReference<Throwable> thrown = new AtomicReference<>();

    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      for (int i = 0; i < numThreads; i++) {
        final TestType elem = new TestType("abc", i);
        final int index = i;
        executor.execute(
            new Runnable() {
              @Override
              public void run() {
                ready.countDown();
                try {
                  start.await();
                  byte[] encoded = CoderUtils.encodeToByteArray(coder, elem);
                  assertEquals(
                      new TestType("abc", index), CoderUtils.decodeFromByteArray(coder, encoded));
                } catch (InterruptedException e) {
                  // An interrupted worker never took part in the synchronized start, so
                  // report it instead of silently carrying on; keep the interrupt flag set.
                  Thread.currentThread().interrupt();
                  thrown.compareAndSet(null, e);
                } catch (Throwable e) {
                  thrown.compareAndSet(null, e);
                } finally {
                  // Always release the test thread, whatever happened above.
                  done.countDown();
                }
              }
            });
      }
      ready.await();
      start.countDown();

      done.await();
    } finally {
      // On the normal path every task has already finished. On a failure path this
      // interrupts workers still parked on the start latch so the pool threads can exit.
      executor.shutdownNow();
    }

    Throwable actuallyThrown = thrown.get();
    if (actuallyThrown != null) {
      throw actuallyThrown;
    }
  }

  /**
   * A coder that surrounds the value with two values, to demonstrate nesting.
   *
   * <p>The leading int and the JAXB payload are written with the nested context; the trailing
   * long is written with the caller's context.
   */
  private static class TestCoder extends StandardCoder<TestType> {
    private final JAXBCoder<TestType> jaxbCoder;

    public TestCoder(JAXBCoder<TestType> jaxbCoder) {
      this.jaxbCoder = jaxbCoder;
    }

    @Override
    public void encode(TestType value, OutputStream outStream, Context context)
        throws CoderException, IOException {
      Context nestedContext = context.nested();
      VarIntCoder.of().encode(3, outStream, nestedContext);
      jaxbCoder.encode(value, outStream, nestedContext);
      VarLongCoder.of().encode(22L, outStream, context);
    }

    @Override
    public TestType decode(InputStream inStream, Context context)
        throws CoderException, IOException {
      Context nestedContext = context.nested();
      // The surrounding values are read only to advance the stream; their content is ignored.
      VarIntCoder.of().decode(inStream, nestedContext);
      TestType result = jaxbCoder.decode(inStream, nestedContext);
      VarLongCoder.of().decode(inStream, context);
      return result;
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
      return ImmutableList.of(jaxbCoder);
    }

    @Override
    public void verifyDeterministic() throws NonDeterministicException {
      jaxbCoder.verifyDeterministic();
    }
  }

  /** The coder passes the {@link CoderProperties#coderSerializable} check. */
  @Test
  public void testEncodable() throws Exception {
    CoderProperties.coderSerializable(JAXBCoder.of(TestType.class));
  }

  /** The encoding id is the fully qualified name of the record class. */
  @Test
  public void testEncodingId() throws Exception {
    Coder<TestType> coder = JAXBCoder.of(TestType.class);
    CoderProperties.coderHasEncodingId(
        coder, TestType.class.getName());
  }

  /** The encoded type descriptor is the descriptor of the record class. */
  @Test
  public void testEncodedTypeDescriptor() throws Exception {
    assertThat(
        JAXBCoder.of(TestType.class).getEncodedTypeDescriptor(),
        equalTo(TypeDescriptor.of(TestType.class)));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java new file mode 100644 index 0000000..a6e1b87 --- /dev/null +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.beam.sdk.io.xml;

import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;
import org.apache.beam.sdk.io.xml.XmlSink.XmlWriteOperation;
import org.apache.beam.sdk.io.xml.XmlSink.XmlWriter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests for {@link XmlSink} and the {@code XmlIO.write()} builder that creates it.
 */
@RunWith(JUnit4.class)
public class XmlSinkTest {
  @Rule
  public TemporaryFolder tmpFolder = new TemporaryFolder();

  @Rule
  public ExpectedException thrown = ExpectedException.none();

  private String testRootElement = "testElement";
  private String testFilePrefix = "/path/to/testPrefix";

  /**
   * Records written through an XmlWriter appear as XML elements wrapped in the root element.
   */
  @Test
  public void testXmlWriter() throws Exception {
    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    XmlIO.Write<Bird> birdWrite =
        XmlIO.<Bird>write()
            .toFilenamePrefix(testFilePrefix)
            .withRecordClass(Bird.class)
            .withRootElement("birds");
    XmlWriteOperation<Bird> operation =
        birdWrite.createSink().createWriteOperation(pipelineOptions);
    XmlWriter<Bird> birdWriter = operation.createWriter(pipelineOptions);

    List<Bird> birds =
        Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose"));
    List<String> expectedLines =
        Arrays.asList(
            "<birds>",
            "<bird>",
            "<species>robin</species>",
            "<adjective>bemused</adjective>",
            "</bird>",
            "<bird>",
            "<species>goose</species>",
            "<adjective>evasive</adjective>",
            "</bird>",
            "</birds>");
    runTestWrite(birdWriter, birds, expectedLines);
  }

  /**
   * Each builder method stores its argument where the matching getter can read it back.
   */
  @Test
  public void testBuildXmlWriteTransform() {
    XmlIO.Write<Bird> configured =
        XmlIO.<Bird>write()
            .toFilenamePrefix(testFilePrefix)
            .withRecordClass(Bird.class)
            .withRootElement(testRootElement);
    assertEquals(Bird.class, configured.getRecordClass());
    assertEquals(testRootElement, configured.getRootElement());
    assertEquals(testFilePrefix, configured.getFilenamePrefix());
  }

  /** Validation rejects a transform that has no record class. */
  @Test
  public void testValidateXmlSinkMissingRecordClass() {
    thrown.expect(NullPointerException.class);
    XmlIO.<Bird>write()
        .withRootElement(testRootElement)
        .toFilenamePrefix(testFilePrefix)
        .validate(null);
  }

  /** Validation rejects a transform that has no root element. */
  @Test
  public void testValidateXmlSinkMissingRootElement() {
    thrown.expect(NullPointerException.class);
    XmlIO.<Bird>write()
        .withRecordClass(Bird.class)
        .toFilenamePrefix(testFilePrefix)
        .validate(null);
  }

  /** Validation rejects a transform that has no filename prefix. */
  @Test
  public void testValidateXmlSinkMissingFilePrefix() {
    thrown.expect(NullPointerException.class);
    XmlIO.<Bird>write()
        .withRecordClass(Bird.class)
        .withRootElement(testRootElement)
        .validate(null);
  }

  /**
   * The write operation created by the sink uses a temporary directory that sits beside the
   * output prefix and whose name contains {@code temp-beam-} plus the prefix's file name.
   */
  @Test
  public void testCreateWriteOperations() {
    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    XmlSink<Bird> birdSink =
        XmlIO.<Bird>write()
            .withRecordClass(Bird.class)
            .withRootElement(testRootElement)
            .toFilenamePrefix(testFilePrefix)
            .createSink();
    XmlWriteOperation<Bird> operation = birdSink.createWriteOperation(pipelineOptions);

    Path expectedOutput = new File(testFilePrefix).toPath();
    Path actualTemp = new File(operation.getTemporaryDirectory()).toPath();
    assertEquals(expectedOutput.getParent(), actualTemp.getParent());

    String tempName = actualTemp.getFileName().toString();
    assertThat(tempName, containsString("temp-beam-" + expectedOutput.getFileName()));
  }

  /**
   * A writer created by the write operation reports the same temporary-directory layout through
   * its write operation and exposes a non-null marshaller.
   */
  @Test
  public void testCreateWriter() throws Exception {
    PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    XmlSink<Bird> birdSink =
        XmlIO.<Bird>write()
            .withRecordClass(Bird.class)
            .withRootElement(testRootElement)
            .toFilenamePrefix(testFilePrefix)
            .createSink();
    XmlWriteOperation<Bird> operation = birdSink.createWriteOperation(pipelineOptions);
    XmlWriter<Bird> birdWriter = operation.createWriter(pipelineOptions);

    Path expectedOutput = new File(testFilePrefix).toPath();
    Path actualTemp = new File(birdWriter.getWriteOperation().getTemporaryDirectory()).toPath();
    assertEquals(expectedOutput.getParent(), actualTemp.getParent());

    String tempName = actualTemp.getFileName().toString();
    assertThat(tempName, containsString("temp-beam-" + expectedOutput.getFileName()));
    assertNotNull(birdWriter.marshaller);
  }

  /** Display data lists the file name pattern, the root element and the record class. */
  @Test
  public void testDisplayData() {
    XmlIO.Write<Integer> integerWrite =
        XmlIO.<Integer>write()
            .toFilenamePrefix("foobar")
            .withRootElement("bird")
            .withRecordClass(Integer.class);

    DisplayData data = DisplayData.from(integerWrite);

    assertThat(data, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml"));
    assertThat(data, hasDisplayItem("rootElement", "bird"));
    assertThat(data, hasDisplayItem("recordClass", Integer.class));
  }

  /**
   * Writes {@code bundle} to a temporary file with {@code writer}, then compares the file's
   * trimmed, non-empty lines with {@code expected}.
   */
  private <T> void runTestWrite(XmlWriter<T> writer, List<T> bundle, List<String> expected)
      throws Exception {
    File outputFile = tmpFolder.newFile("foo.txt");
    try (FileOutputStream out = new FileOutputStream(outputFile)) {
      writeBundle(writer, bundle, out.getChannel());
    }

    List<String> nonBlankLines = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(new FileReader(outputFile))) {
      String raw;
      while ((raw = reader.readLine()) != null) {
        String trimmed = raw.trim();
        if (trimmed.length() > 0) {
          nonBlankLines.add(trimmed);
        }
      }
      assertEquals(expected, nonBlankLines);
    }
  }

  /**
   * Drives one writer through prepare, header, every element in order, and footer.
   */
  private <T> void writeBundle(XmlWriter<T> writer, List<T> elements, WritableByteChannel channel)
      throws Exception {
    writer.prepareWrite(channel);
    writer.writeHeader();
    for (T record : elements) {
      writer.write(record);
    }
    writer.writeFooter();
  }

  /**
   * JAXB-annotated record used by the tests; {@code name} is mapped to a {@code species} element.
   */
  @SuppressWarnings("unused")
  @XmlRootElement(name = "bird")
  @XmlType(propOrder = {"name", "adjective"})
  private static final class Bird {
    private String name;
    private String adjective;

    public Bird() {}

    public Bird(String adjective, String name) {
      this.adjective = adjective;
      this.name = name;
    }

    @XmlElement(name = "species")
    public String getName() {
      return name;
    }

    public void setName(String name) {
      this.name = name;
    }

    public String getAdjective() {
      return adjective;
    }

    public void setAdjective(String adjective) {
      this.adjective = adjective;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java new file mode 100644 index 0000000..5b33be3 --- /dev/null +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java @@ -0,0 +1,893 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.xml; + +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive; +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.ImmutableList; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source.Reader; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests XmlSource. 
+ */ +@RunWith(JUnit4.class) +public class XmlSourceTest { + + @Rule + public TestPipeline p = TestPipeline.create(); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + String tinyXML = + "<trains><train><name>Thomas</name></train><train><name>Henry</name></train>" + + "<train><name>James</name></train></trains>"; + + String xmlWithMultiByteElementName = + "<දà·à¶¸à·à¶»à·à¶ºà¶±à·><දà·à¶¸à·à¶»à·à¶º><name>Thomas</name></දà·à¶¸à·à¶»à·à¶º><දà·à¶¸à·à¶»à·à¶º><name>Henry</name></දà·à¶¸à·à¶»à·à¶º>" + + "<දà·à¶¸à·à¶»à·à¶º><name>James</name></දà·à¶¸à·à¶»à·à¶º></දà·à¶¸à·à¶»à·à¶ºà¶±à·>"; + + String xmlWithMultiByteChars = + "<trains><train><name>ThomasÂ¥</name></train><train><name>Hen¶ry</name></train>" + + "<train><name>JamÃes</name></train></trains>"; + + String trainXML = + "<trains>" + + "<train><name>Thomas</name><number>1</number><color>blue</color></train>" + + "<train><name>Henry</name><number>3</number><color>green</color></train>" + + "<train><name>Toby</name><number>7</number><color>brown</color></train>" + + "<train><name>Gordon</name><number>4</number><color>blue</color></train>" + + "<train><name>Emily</name><number>-1</number><color>red</color></train>" + + "<train><name>Percy</name><number>6</number><color>green</color></train>" + + "</trains>"; + + String trainXMLWithEmptyTags = + "<trains>" + + "<train/>" + + "<train><name>Thomas</name><number>1</number><color>blue</color></train>" + + "<train><name>Henry</name><number>3</number><color>green</color></train>" + + "<train/>" + + "<train><name>Toby</name><number>7</number><color>brown</color></train>" + + "<train><name>Gordon</name><number>4</number><color>blue</color></train>" + + "<train><name>Emily</name><number>-1</number><color>red</color></train>" + + "<train><name>Percy</name><number>6</number><color>green</color></train>" + + "</trains>"; + + String trainXMLWithAttributes = + "<trains>" + + "<train 
size=\"small\"><name>Thomas</name><number>1</number><color>blue</color></train>" + + "<train size=\"big\"><name>Henry</name><number>3</number><color>green</color></train>" + + "<train size=\"small\"><name>Toby</name><number>7</number><color>brown</color></train>" + + "<train size=\"big\"><name>Gordon</name><number>4</number><color>blue</color></train>" + + "<train size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>" + + "<train size=\"small\"><name>Percy</name><number>6</number><color>green</color></train>" + + "</trains>"; + + String trainXMLWithSpaces = + "<trains>" + + "<train><name>Thomas </name> <number>1</number><color>blue</color></train>" + + "<train><name>Henry</name><number>3</number><color>green</color></train>\n" + + "<train><name>Toby</name><number>7</number><color> brown </color></train> " + + "<train><name>Gordon</name> <number>4</number><color>blue</color>\n</train>\t" + + "<train><name>Emily</name><number>-1</number>\t<color>red</color></train>" + + "<train>\n<name>Percy</name> <number>6 </number> <color>green</color></train>" + + "</trains>"; + + String trainXMLWithAllFeaturesMultiByte = + "<දà·à¶¸à·à¶»à·à¶ºà¶±à·>" + + "<දà·à¶¸à·à¶»à·à¶º/>" + + "<දà·à¶¸à·à¶»à·à¶º size=\"small\"><name> ThomasÂ¥</name><number>1</number><color>blue</color>" + + "</දà·à¶¸à·à¶»à·à¶º>" + + "<දà·à¶¸à·à¶»à·à¶º size=\"big\"><name>He nry</name><number>3</number><color>green</color></දà·à¶¸à·à¶»à·à¶º>" + + "<දà·à¶¸à·à¶»à·à¶º size=\"small\"><name>Toby </name><number>7</number><color>br¶own</color>" + + "</දà·à¶¸à·à¶»à·à¶º>" + + "<දà·à¶¸à·à¶»à·à¶º/>" + + "<දà·à¶¸à·à¶»à·à¶º size=\"big\"><name>Gordon</name><number>4</number><color> blue</color></දà·à¶¸à·à¶»à·à¶º>" + + "<දà·à¶¸à·à¶»à·à¶º size=\"small\"><name>Emily</name><number>-1</number><color>red</color></දà·à¶¸à·à¶»à·à¶º>" + + "<දà·à¶¸à·à¶»à·à¶º size=\"small\"><name>Percy</name><number>6</number><color>green</color>" + + "</දà·à¶¸à·à¶»à·à¶º>" + + "</දà·à¶¸à·à¶»à·à¶ºà¶±à·>"; + + String 
trainXMLWithAllFeaturesSingleByte = + "<trains>" + + "<train/>" + + "<train size=\"small\"><name> Thomas</name><number>1</number><color>blue</color>" + + "</train>" + + "<train size=\"big\"><name>He nry</name><number>3</number><color>green</color></train>" + + "<train size=\"small\"><name>Toby </name><number>7</number><color>brown</color>" + + "</train>" + + "<train/>" + + "<train size=\"big\"><name>Gordon</name><number>4</number><color> blue</color></train>" + + "<train size=\"small\"><name>Emily</name><number>-1</number><color>red</color></train>" + + "<train size=\"small\"><name>Percy</name><number>6</number><color>green</color>" + + "</train>" + + "</trains>"; + + @XmlRootElement + static class Train { + public static final int TRAIN_NUMBER_UNDEFINED = -1; + public String name = null; + public String color = null; + public int number = TRAIN_NUMBER_UNDEFINED; + + @XmlAttribute(name = "size") + public String size = null; + + public Train() {} + + public Train(String name, int number, String color, String size) { + this.name = name; + this.number = number; + this.color = color; + this.size = size; + } + + @Override + public int hashCode() { + int hashCode = 1; + hashCode = 31 * hashCode + (name == null ? 0 : name.hashCode()); + hashCode = 31 * hashCode + number; + hashCode = 31 * hashCode + (color == null ? 0 : name.hashCode()); + hashCode = 31 * hashCode + (size == null ? 
0 : name.hashCode()); + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof Train)) { + return false; + } + + Train other = (Train) obj; + return (name == null || name.equals(other.name)) && (number == other.number) + && (color == null || color.equals(other.color)) + && (size == null || size.equals(other.size)); + } + + @Override + public String toString() { + String str = "Train["; + boolean first = true; + if (name != null) { + str = str + "name=" + name; + first = false; + } + if (number != Integer.MIN_VALUE) { + if (!first) { + str = str + ","; + } + str = str + "number=" + number; + first = false; + } + if (color != null) { + if (!first) { + str = str + ","; + } + str = str + "color=" + color; + first = false; + } + if (size != null) { + if (!first) { + str = str + ","; + } + str = str + "size=" + size; + } + str = str + "]"; + return str; + } + } + + private List<Train> generateRandomTrainList(int size) { + String[] names = {"Thomas", "Henry", "Gordon", "Emily", "Toby", "Percy", "Mavis", "Edward", + "Bertie", "Harold", "Hiro", "Terence", "Salty", "Trevor"}; + int[] numbers = {-1, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + String[] colors = {"red", "blue", "green", "orange", "brown", "black", "white"}; + String[] sizes = {"small", "medium", "big"}; + + Random random = new Random(System.currentTimeMillis()); + + List<Train> trains = new ArrayList<>(); + for (int i = 0; i < size; i++) { + trains.add(new Train(names[random.nextInt(names.length - 1)], + numbers[random.nextInt(numbers.length - 1)], colors[random.nextInt(colors.length - 1)], + sizes[random.nextInt(sizes.length - 1)])); + } + + return trains; + } + + private String trainToXMLElement(Train train) { + return "<train size=\"" + train.size + "\"><name>" + train.name + "</name><number>" + + train.number + "</number><color>" + train.color + "</color></train>"; + } + + private File createRandomTrainXML(String fileName, List<Train> trains) throws IOException { + File file = 
tempFolder.newFile(fileName); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { + writer.write("<trains>"); + writer.newLine(); + for (Train train : trains) { + String str = trainToXMLElement(train); + writer.write(str); + writer.newLine(); + } + writer.write("</trains>"); + writer.newLine(); + } + return file; + } + + private List<Train> readEverythingFromReader(Reader<Train> reader) throws IOException { + List<Train> results = new ArrayList<>(); + for (boolean available = reader.start(); available; available = reader.advance()) { + Train train = reader.getCurrent(); + results.add(train); + } + return results; + } + + @Test + public void testReadXMLTiny() throws IOException { + File file = tempFolder.newFile("trainXMLTiny"); + Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List<Train> expectedResults = ImmutableList.of( + new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testReadXMLWithMultiByteChars() throws IOException { + File file = tempFolder.newFile("trainXMLTiny"); + Files.write(file.toPath(), xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List<Train> expectedResults = ImmutableList.of( + new Train("ThomasÂ¥", 
Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Hen¶ry", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("JamÃes", Train.TRAIN_NUMBER_UNDEFINED, null, null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + @Ignore( + "Multi-byte characters in XML are not supported because the parser " + + "currently does not correctly report byte offsets") + public void testReadXMLWithMultiByteElementName() throws IOException { + File file = tempFolder.newFile("trainXMLTiny"); + Files.write(file.toPath(), xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("දà·à¶¸à·à¶»à·à¶ºà¶±à·") + .withRecordElement("දà·à¶¸à·à¶»à·à¶º") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List<Train> expectedResults = ImmutableList.of( + new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testSplitWithEmptyBundleAtEnd() throws Exception { + File file = tempFolder.newFile("trainXMLTiny"); + Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(10) + .createSource(); + List<? 
extends BoundedSource<Train>> splits = source.split(50, null); + + assertTrue(splits.size() > 2); + + List<Train> results = new ArrayList<>(); + for (BoundedSource<Train> split : splits) { + results.addAll(readEverythingFromReader(split.createReader(null))); + } + + List<Train> expectedResults = ImmutableList.of( + new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("Henry", Train.TRAIN_NUMBER_UNDEFINED, null, null), + new Train("James", Train.TRAIN_NUMBER_UNDEFINED, null, null)); + + assertThat( + trainsToStrings(expectedResults), containsInAnyOrder(trainsToStrings(results).toArray())); + } + + List<String> trainsToStrings(List<Train> input) { + List<String> strings = new ArrayList<>(); + for (Object data : input) { + strings.add(data.toString()); + } + return strings; + } + + @Test + public void testReadXMLSmall() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List<Train> expectedResults = + ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), + new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), + new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testReadXMLNoRootElement() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRecordElement("train") + 
.withRecordClass(Train.class) + .createSource(); + + exception.expect(NullPointerException.class); + exception.expectMessage( + "rootElement is null. Use builder method withRootElement() to set this."); + readEverythingFromReader(source.createReader(null)); + } + + @Test + public void testReadXMLNoRecordElement() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordClass(Train.class) + .createSource(); + + exception.expect(NullPointerException.class); + exception.expectMessage( + "recordElement is null. Use builder method withRecordElement() to set this."); + readEverythingFromReader(source.createReader(null)); + } + + @Test + public void testReadXMLNoRecordClass() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .createSource(); + + exception.expect(NullPointerException.class); + exception.expectMessage( + "recordClass is null. 
Use builder method withRecordClass() to set this."); + readEverythingFromReader(source.createReader(null)); + } + + @Test + public void testReadXMLIncorrectRootElement() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("something") + .withRecordElement("train") + .withRecordClass(Train.class) + .createSource(); + + exception.expectMessage("Unexpected close tag </trains>; expected </something>."); + readEverythingFromReader(source.createReader(null)); + } + + @Test + public void testReadXMLIncorrectRecordElement() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("something") + .withRecordClass(Train.class) + .createSource(); + + assertEquals(readEverythingFromReader(source.createReader(null)), new ArrayList<Train>()); + } + + @XmlRootElement + private static class WrongTrainType { + @SuppressWarnings("unused") + public String something; + } + + @Test + public void testReadXMLInvalidRecordClass() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<WrongTrainType> source = + XmlIO.<WrongTrainType>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(WrongTrainType.class) + .createSource(); + + exception.expect(RuntimeException.class); + + // JAXB internationalizes the error message. So this is all we can match for. 
+ exception.expectMessage(both(containsString("name")).and(Matchers.containsString("something"))); + try (Reader<WrongTrainType> reader = source.createReader(null)) { + + List<WrongTrainType> results = new ArrayList<>(); + for (boolean available = reader.start(); available; available = reader.advance()) { + WrongTrainType train = reader.getCurrent(); + results.add(train); + } + } + } + + @Test + public void testReadXMLNoBundleSize() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .createSource(); + + List<Train> expectedResults = + ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), + new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), + new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + + @Test + public void testReadXMLWithEmptyTags() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), + new Train("Henry", 3, "green", null), new Train("Toby", 7, "brown", null), + new Train("Gordon", 4, "blue", null), new Train("Emily", -1, "red", null), + new Train("Percy", 6, "green", null), new Train(), new Train()); + + assertThat( + 
trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + @Category(NeedsRunner.class) + public void testReadXMLSmallPipeline() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); + + PCollection<Train> output = + p.apply( + "ReadFileData", + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024)); + + List<Train> expectedResults = + ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), + new Train("Toby", 7, "brown", null), new Train("Gordon", 4, "blue", null), + new Train("Emily", -1, "red", null), new Train("Percy", 6, "green", null)); + + PAssert.that(output).containsInAnyOrder(expectedResults); + p.run(); + } + + @Test + public void testReadXMLWithAttributes() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", "small"), + new Train("Henry", 3, "green", "big"), new Train("Toby", 7, "brown", "small"), + new Train("Gordon", 4, "blue", "big"), new Train("Emily", -1, "red", "small"), + new Train("Percy", 6, "green", "small")); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testReadXMLWithWhitespaces() throws IOException { + File file = tempFolder.newFile("trainXMLSmall"); + 
Files.write(file.toPath(), trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + List<Train> expectedResults = ImmutableList.of(new Train("Thomas ", 1, "blue", null), + new Train("Henry", 3, "green", null), new Train("Toby", 7, " brown ", null), + new Train("Gordon", 4, "blue", null), new Train("Emily", -1, "red", null), + new Train("Percy", 6, "green", null)); + + assertThat( + trainsToStrings(expectedResults), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + public void testReadXMLLarge() throws IOException { + String fileName = "temp.xml"; + List<Train> trains = generateRandomTrainList(100); + File file = createRandomTrainXML(fileName, trains); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024) + .createSource(); + + assertThat( + trainsToStrings(trains), + containsInAnyOrder( + trainsToStrings(readEverythingFromReader(source.createReader(null))).toArray())); + } + + @Test + @Category(NeedsRunner.class) + public void testReadXMLLargePipeline() throws IOException { + String fileName = "temp.xml"; + List<Train> trains = generateRandomTrainList(100); + File file = createRandomTrainXML(fileName, trains); + + PCollection<Train> output = + p.apply( + "ReadFileData", + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024)); + + PAssert.that(output).containsInAnyOrder(trains); + p.run(); + } + + @Test + public void testSplitWithEmptyBundles() throws Exception { + String fileName = "temp.xml"; + 
List<Train> trains = generateRandomTrainList(10); + File file = createRandomTrainXML(fileName, trains); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(10) + .createSource(); + List<? extends BoundedSource<Train>> splits = source.split(100, null); + + assertTrue(splits.size() > 2); + + List<Train> results = new ArrayList<>(); + for (BoundedSource<Train> split : splits) { + results.addAll(readEverythingFromReader(split.createReader(null))); + } + + assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray())); + } + + @Test + public void testXMLWithSplits() throws Exception { + String fileName = "temp.xml"; + List<Train> trains = generateRandomTrainList(100); + File file = createRandomTrainXML(fileName, trains); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(10) + .createSource(); + List<? 
extends BoundedSource<Train>> splits = source.split(256, null); + + // Not a trivial split + assertTrue(splits.size() > 2); + + List<Train> results = new ArrayList<>(); + for (BoundedSource<Train> split : splits) { + results.addAll(readEverythingFromReader(split.createReader(null))); + } + assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray())); + } + + @Test + public void testSplitAtFraction() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + String fileName = "temp.xml"; + List<Train> trains = generateRandomTrainList(100); + File file = createRandomTrainXML(fileName, trains); + + BoundedSource<Train> fileSource = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(10) + .createSource(); + + List<? extends BoundedSource<Train>> splits = + fileSource.split(file.length() / 3, null); + for (BoundedSource<Train> splitSource : splits) { + int numItems = readEverythingFromReader(splitSource.createReader(null)).size(); + // Should not split while unstarted. + assertSplitAtFractionFails(splitSource, 0, 0.7, options); + assertSplitAtFractionSucceedsAndConsistent(splitSource, 1, 0.7, options); + assertSplitAtFractionSucceedsAndConsistent(splitSource, 15, 0.7, options); + assertSplitAtFractionFails(splitSource, 0, 0.0, options); + assertSplitAtFractionFails(splitSource, 20, 0.3, options); + assertSplitAtFractionFails(splitSource, numItems, 1.0, options); + + // After reading 100 elements we will be approximately at position + // 0.99 * (endOffset - startOffset) hence trying to split at fraction 0.9 will be + // unsuccessful. 
+ assertSplitAtFractionFails(splitSource, numItems, 0.9, options); + + // Following passes since we can always find a fraction that is extremely close to 1 such that + // the position suggested by the fraction will be larger than the position the reader is at + // after reading "items - 1" elements. + // This also passes for "numItemsToReadBeforeSplit = items" if the position at suggested + // fraction is larger than the position the reader is at after reading all "items" elements + // (i.e., the start position of the last element). This is true for most cases but will not + // be true if reader position is only one less than the end position. (i.e., the last element + // of the bundle start at the last byte that belongs to the bundle). + assertSplitAtFractionSucceedsAndConsistent(splitSource, numItems - 1, 0.999, options); + } + } + + @Test + public void testSplitAtFractionExhaustiveSingleByte() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .createSource(); + assertSplitAtFractionExhaustive(source, options); + } + + @Test + @Ignore( + "Multi-byte characters in XML are not supported because the parser " + + "currently does not correctly report byte offsets") + public void testSplitAtFractionExhaustiveMultiByte() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tempFolder.newFile("trainXMLSmall"); + Files.write(file.toPath(), trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8)); + + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("දà·à¶¸à·à¶»à·à¶ºà¶±à·") + 
.withRecordElement("දà·à¶¸à·à¶»à·à¶º") + .withRecordClass(Train.class) + .createSource(); + assertSplitAtFractionExhaustive(source, options); + } + + @Test + @Category(NeedsRunner.class) + public void testReadXMLFilePattern() throws IOException { + List<Train> trains1 = generateRandomTrainList(20); + File file = createRandomTrainXML("temp1.xml", trains1); + List<Train> trains2 = generateRandomTrainList(10); + createRandomTrainXML("temp2.xml", trains2); + List<Train> trains3 = generateRandomTrainList(15); + createRandomTrainXML("temp3.xml", trains3); + generateRandomTrainList(8); + createRandomTrainXML("otherfile.xml", trains1); + + PCollection<Train> output = + p.apply( + "ReadFileData", + XmlIO.<Train>read() + .from(file.getParent() + "/" + "temp*.xml") + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024)); + + List<Train> expectedResults = new ArrayList<>(); + expectedResults.addAll(trains1); + expectedResults.addAll(trains2); + expectedResults.addAll(trains3); + + PAssert.that(output).containsInAnyOrder(expectedResults); + p.run(); + } + + @Test + public void testDisplayData() { + DisplayData displayData = + DisplayData.from( + XmlIO.<Integer>read() + .from("foo.xml") + .withRootElement("bird") + .withRecordElement("cat") + .withMinBundleSize(1234) + .withRecordClass(Integer.class)); + + assertThat(displayData, hasDisplayItem("filePattern", "foo.xml")); + assertThat(displayData, hasDisplayItem("rootElement", "bird")); + assertThat(displayData, hasDisplayItem("recordElement", "cat")); + assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); + assertThat(displayData, hasDisplayItem("minBundleSize", 1234)); + } +}