Repository: beam Updated Branches: refs/heads/master 80b9cf9c2 -> c3bcd4b42
Introduces XmlIO.readFiles Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/abda38dc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/abda38dc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/abda38dc Branch: refs/heads/master Commit: abda38dc70cdaf107b96e3c3f4322160fe9fa8f7 Parents: 513d26c Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Aug 31 17:21:20 2017 -0700 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Sun Sep 3 16:32:25 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/ReadAllViaFileBasedSource.java | 22 +- .../java/org/apache/beam/sdk/io/xml/XmlIO.java | 280 +++++++++++++------ .../org/apache/beam/sdk/io/xml/XmlSource.java | 63 ++--- .../apache/beam/sdk/io/xml/XmlSourceTest.java | 74 ++--- 4 files changed, 253 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/abda38dc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java index 03b9b55..03cdbb1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; @@ -34,21 +35,23 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** - * Reads each file in the input {@link PCollection} of {@link Metadata} using given parameters for - * splitting files into offset ranges and for creating a {@link FileBasedSource} for a file. The + * Reads each file in the input {@link PCollection} of {@link ReadableFile} using given parameters + * for splitting files into offset ranges and for creating a {@link FileBasedSource} for a file. The * input {@link PCollection} must not contain {@link ResourceId#isDirectory directories}. * - * <p>To obtain the collection of {@link Metadata} from a filepattern, use {@link FileIO#match} or - * {@link FileIO#matchAll}. + * <p>To obtain the collection of {@link ReadableFile} from a filepattern, use {@link + * FileIO#readMatches()}. */ -class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<ReadableFile>, PCollection<T>> { +@Experimental(Experimental.Kind.SOURCE_SINK) +public class ReadAllViaFileBasedSource<T> + extends PTransform<PCollection<ReadableFile>, PCollection<T>> { private final long desiredBundleSizeBytes; - private final SerializableFunction<String, FileBasedSource<T>> createSource; + private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource; private final Coder<T> coder; public ReadAllViaFileBasedSource( long desiredBundleSizeBytes, - SerializableFunction<String, FileBasedSource<T>> createSource, + SerializableFunction<String, ? extends FileBasedSource<T>> createSource, Coder<T> coder) { this.desiredBundleSizeBytes = desiredBundleSizeBytes; this.createSource = createSource; @@ -111,9 +114,10 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<ReadableFile>, } private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, OffsetRange>, T> { - private final SerializableFunction<String, FileBasedSource<T>> createSource; + private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource; - private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) { + private ReadFileRangesFn( + SerializableFunction<String, ? extends FileBasedSource<T>> createSource) { this.createSource = createSource; } http://git-wip-us.apache.org/repos/asf/beam/blob/abda38dc/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java index 98559c2..749da51 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java @@ -21,23 +21,28 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; +import java.io.Serializable; import java.nio.charset.Charset; import javax.annotation.Nullable; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.ValidationEventHandler; -import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.CompressedSource; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileBasedSink; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.OffsetBasedSource; +import org.apache.beam.sdk.io.ReadAllViaFileBasedSource; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -46,10 +51,9 @@ import org.apache.beam.sdk.values.PDone; public class XmlIO { // CHECKSTYLE.OFF: JavadocStyle /** - * Reads XML files. This source reads one or more XML files and creates a {@link PCollection} of a - * given type. Please note the example given below. + * Reads XML files as a {@link PCollection} of a given type mapped via JAXB. * - * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML + * <p>The XML files must be of the following form, where {@code root} and {@code record} are XML * element names that are defined by the user: * * <pre>{@code @@ -74,7 +78,7 @@ public class XmlIO { * Reading the source will generate a {@code PCollection} of the given JAXB annotated Java type. * Optionally users may provide a minimum size of a bundle that should be created for the source. * - * <p>The following example shows how to use this method in a Beam pipeline: + * <p>Example: * * <pre>{@code * PCollection<String> output = p.apply(XmlIO.<Record>read() @@ -84,38 +88,48 @@ public class XmlIO { * .withRecordClass(Record.class)); * }</pre> * - * <p>By default, UTF-8 charset is used. If your file is using a different charset, you have to - * specify the following: - * - * <pre>{@code - * PCollection<String> output = p.apply(XmlIO.<Record>read() - * .from(file.toPath().toString()) - * .withRooElement("root") - * .withRecordElement("record") - * .withRecordClass(Record.class) - * .withCharset(StandardCharsets.ISO_8859_1)); - * }</pre> - * - * <p>{@link java.nio.charset.StandardCharsets} provides static references to common charsets. + * <p>By default, UTF-8 charset is used. To specify a different charset, use {@link + * Read#withCharset}. * * <p>Currently, only XML files that use single-byte characters are supported. Using a file that * contains multi-byte characters may result in data loss or duplication. * - * <h3>Permissions</h3> - * - * <p>Permission requirements depend on the {@link PipelineRunner - * PipelineRunner} that is used to execute the Beam pipeline. Please refer to the documentation of - * corresponding {@link PipelineRunner PipelineRunners} for more details. - * * @param <T> Type of the objects that represent the records of the XML file. The {@code * PCollection} generated by this source will be of this type. */ // CHECKSTYLE.ON: JavadocStyle public static <T> Read<T> read() { return new AutoValue_XmlIO_Read.Builder<T>() - .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE) + .setConfiguration( + new AutoValue_XmlIO_MappingConfiguration.Builder<T>().setCharset("UTF-8").build()) + .setMinBundleSize(1L) .setCompression(Compression.AUTO) - .setCharset("UTF-8") + .build(); + } + + /** + * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile}, which + * allows more flexible usage via different configuration options of {@link FileIO#match} and + * {@link FileIO#readMatches} that are not explicitly provided for {@link #read}. + * + * <p>For example: + * + * <pre>{@code + * PCollection<ReadableFile> files = p + * .apply(FileIO.match().filepattern(options.getInputFilepatternProvider()).continuously( + * Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardMinutes(5)))) + * .apply(FileIO.readMatches().withCompression(GZIP)); + * + * PCollection<String> output = files.apply(XmlIO.<Record>readFiles() + * .withRootElement("root") + * .withRecordElement("record") + * .withRecordClass(Record.class)); + * }</pre> + */ + public static <T> ReadFiles<T> readFiles() { + return new AutoValue_XmlIO_ReadFiles.Builder<T>() + .setConfiguration( + new AutoValue_XmlIO_MappingConfiguration.Builder<T>().setCharset("UTF-8").build()) .build(); } @@ -231,52 +245,92 @@ public class XmlIO { return new AutoValue_XmlIO_Write.Builder<T>().setCharset("UTF-8").build(); } - /** Implementation of {@link #read}. */ @AutoValue - public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { - private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024; - - @Nullable - abstract String getFileOrPatternSpec(); - - @Nullable - abstract String getRootElement(); + abstract static class MappingConfiguration<T> implements HasDisplayData, Serializable { + @Nullable abstract String getRootElement(); + @Nullable abstract String getRecordElement(); + @Nullable abstract Class<T> getRecordClass(); + @Nullable abstract String getCharset(); + @Nullable abstract ValidationEventHandler getValidationEventHandler(); - @Nullable - abstract String getRecordElement(); + abstract Builder<T> toBuilder(); - @Nullable - abstract Class<T> getRecordClass(); + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setRootElement(String rootElement); + abstract Builder<T> setRecordElement(String recordElement); + abstract Builder<T> setRecordClass(Class<T> recordClass); + abstract Builder<T> setCharset(String charset); + abstract Builder<T> setValidationEventHandler(ValidationEventHandler validationEventHandler); - abstract Compression getCompression(); + abstract MappingConfiguration<T> build(); + } - abstract long getMinBundleSize(); + private MappingConfiguration<T> withRootElement(String rootElement) { + return toBuilder().setRootElement(rootElement).build(); + } - @Nullable - abstract String getCharset(); + private MappingConfiguration<T> withRecordElement(String recordElement) { + return toBuilder().setRecordElement(recordElement).build(); + } - abstract Builder<T> toBuilder(); + private MappingConfiguration<T> withRecordClass(Class<T> recordClass) { + return toBuilder().setRecordClass(recordClass).build(); + } - @Nullable - abstract ValidationEventHandler getValidationEventHandler(); + private MappingConfiguration<T> withCharset(Charset charset) { + return toBuilder().setCharset(charset.name()).build(); + } - @AutoValue.Builder - abstract static class Builder<T> { - abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec); + private MappingConfiguration<T> withValidationEventHandler( + ValidationEventHandler validationEventHandler) { + return toBuilder().setValidationEventHandler(validationEventHandler).build(); + } - abstract Builder<T> setRootElement(String rootElement); + private void validate() { + checkNotNull( + getRootElement(), + "rootElement is null. Use builder method withRootElement() to set this."); + checkNotNull( + getRecordElement(), + "recordElement is null. Use builder method withRecordElement() to set this."); + checkNotNull( + getRecordClass(), + "recordClass is null. Use builder method withRecordClass() to set this."); + checkNotNull( + getCharset(), + "charset is null. Use builder method withCharset() to set this."); + } - abstract Builder<T> setRecordElement(String recordElement); + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotNull( + DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element")) + .addIfNotNull( + DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element")) + .addIfNotNull( + DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")) + .addIfNotNull(DisplayData.item("charset", getCharset()).withLabel("Charset")); + } + } - abstract Builder<T> setRecordClass(Class<T> recordClass); + /** Implementation of {@link #read}. */ + @AutoValue + public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { + abstract MappingConfiguration<T> getConfiguration(); + @Nullable abstract String getFileOrPatternSpec(); + abstract Compression getCompression(); + abstract long getMinBundleSize(); - abstract Builder<T> setMinBundleSize(long minBundleSize); + abstract Builder<T> toBuilder(); + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setConfiguration(MappingConfiguration<T> configuration); + abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec); abstract Builder<T> setCompression(Compression compression); - - abstract Builder<T> setCharset(String charset); - - abstract Builder<T> setValidationEventHandler(ValidationEventHandler validationEventHandler); + abstract Builder<T> setMinBundleSize(long minBundleSize); abstract Read<T> build(); } @@ -322,13 +376,17 @@ public class XmlIO { return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build(); } + private Read<T> withConfiguration(MappingConfiguration<T> configuration) { + return toBuilder().setConfiguration(configuration).build(); + } + /** * Sets name of the root element of the XML document. This will be used to create a valid * starting root element when initiating a bundle of records created from an XML document. This * is a required parameter. */ public Read<T> withRootElement(String rootElement) { - return toBuilder().setRootElement(rootElement).build(); + return withConfiguration(getConfiguration().withRootElement(rootElement)); } /** @@ -336,7 +394,7 @@ public class XmlIO { * the first record of a bundle created from the XML document. This is a required parameter. */ public Read<T> withRecordElement(String recordElement) { - return toBuilder().setRecordElement(recordElement).build(); + return withConfiguration(getConfiguration().withRecordElement(recordElement)); } /** @@ -345,7 +403,7 @@ public class XmlIO { * parameter. */ public Read<T> withRecordClass(Class<T> recordClass) { - return toBuilder().setRecordClass(recordClass).build(); + return withConfiguration(getConfiguration().withRecordClass(recordClass)); } /** @@ -372,7 +430,7 @@ public class XmlIO { * Sets the XML file charset. */ public Read<T> withCharset(Charset charset) { - return toBuilder().setCharset(charset.name()).build(); + return withConfiguration(getConfiguration().withCharset(charset)); } /** @@ -380,23 +438,8 @@ public class XmlIO { * parameter will cause the JAXB unmarshaller event handler to be unspecified. */ public Read<T> withValidationEventHandler(ValidationEventHandler validationEventHandler) { - return toBuilder().setValidationEventHandler(validationEventHandler).build(); - } - - @Override - public void validate(PipelineOptions options) { - checkNotNull( - getRootElement(), - "rootElement is null. Use builder method withRootElement() to set this."); - checkNotNull( - getRecordElement(), - "recordElement is null. Use builder method withRecordElement() to set this."); - checkNotNull( - getRecordClass(), - "recordClass is null. Use builder method withRecordClass() to set this."); - checkNotNull( - getCharset(), - "charset is null. Use builder method withCharset() to set this."); + return withConfiguration( + getConfiguration().withValidationEventHandler(validationEventHandler)); } @Override @@ -407,27 +450,90 @@ public class XmlIO { .withLabel("Minimum Bundle Size"), 1L) .add(DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern")) - .addIfNotNull( - DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element")) - .addIfNotNull( - DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element")) - .addIfNotNull( - DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")) - .addIfNotNull( - DisplayData.item("charset", getCharset()).withLabel("Charset")); + .include("configuration", getConfiguration()); } @VisibleForTesting BoundedSource<T> createSource() { - return CompressedSource.from(new XmlSource<>(this)).withCompression(getCompression()); + return CompressedSource.from( + new XmlSource<>( + StaticValueProvider.of(getFileOrPatternSpec()), getConfiguration(), 1L)) + .withCompression(getCompression()); } @Override public PCollection<T> expand(PBegin input) { + getConfiguration().validate(); return input.apply(org.apache.beam.sdk.io.Read.from(createSource())); } } + /** Implementation of {@link #readFiles}. */ + @AutoValue + public abstract static class ReadFiles<T> + extends PTransform<PCollection<ReadableFile>, PCollection<T>> { + abstract MappingConfiguration<T> getConfiguration(); + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setConfiguration(MappingConfiguration<T> configuration); + abstract ReadFiles<T> build(); + } + + private ReadFiles<T> withConfiguration(MappingConfiguration<T> configuration) { + return toBuilder().setConfiguration(configuration).build(); + } + + /** Like {@link Read#withRootElement}. */ + public ReadFiles<T> withRootElement(String rootElement) { + return withConfiguration(getConfiguration().withRootElement(rootElement)); + } + + /** Like {@link Read#withRecordElement}. */ + public ReadFiles<T> withRecordElement(String recordElement) { + return withConfiguration(getConfiguration().withRecordElement(recordElement)); + } + + /** Like {@link Read#withRecordClass}. */ + public ReadFiles<T> withRecordClass(Class<T> recordClass) { + return withConfiguration(getConfiguration().withRecordClass(recordClass)); + } + + /** Like {@link Read#withCharset}. */ + public ReadFiles<T> withCharset(Charset charset) { + return withConfiguration(getConfiguration().withCharset(charset)); + } + + /** Like {@link Read#withValidationEventHandler}. */ + public ReadFiles<T> withValidationEventHandler(ValidationEventHandler validationEventHandler) { + return withConfiguration( + getConfiguration().withValidationEventHandler(validationEventHandler)); + } + + @Override + public PCollection<T> expand(PCollection<ReadableFile> input) { + return input.apply( + new ReadAllViaFileBasedSource<T>( + 64 * 1024L * 1024L, + new CreateSourceFn<>(getConfiguration()), + JAXBCoder.of(getConfiguration().getRecordClass()))); + } + } + + private static class CreateSourceFn<T> implements SerializableFunction<String, XmlSource<T>> { + private final MappingConfiguration<T> configuration; + + public CreateSourceFn(MappingConfiguration<T> configuration) { + this.configuration = configuration; + } + + @Override + public XmlSource<T> apply(String input) { + return new XmlSource<>(StaticValueProvider.of(input), configuration, 1L); + } + } + /** Implementation of {@link #write}. */ @AutoValue public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { http://git-wip-us.apache.org/repos/asf/beam/blob/abda38dc/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java index b893d43..921cd7a 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSource.java @@ -42,8 +42,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.FileBasedSource; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.options.ValueProvider; import org.codehaus.stax2.XMLInputFactory2; /** Implementation of {@link XmlIO#read}. */ @@ -51,21 +50,29 @@ public class XmlSource<T> extends FileBasedSource<T> { private static final String XML_VERSION = "1.1"; - private final XmlIO.Read<T> spec; + private final XmlIO.MappingConfiguration<T> configuration; - XmlSource(XmlIO.Read<T> spec) { - super(StaticValueProvider.of(spec.getFileOrPatternSpec()), spec.getMinBundleSize()); - this.spec = spec; + XmlSource( + ValueProvider<String> spec, + XmlIO.MappingConfiguration<T> configuration, + long minBundleSizeBytes) { + super(spec, minBundleSizeBytes); + this.configuration = configuration; } - private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, long endOffset) { - super(metadata, spec.getMinBundleSize(), startOffset, endOffset); - this.spec = spec; + private XmlSource( + XmlIO.MappingConfiguration<T> configuration, + long minBundleSizeBytes, + Metadata metadata, + long startOffset, + long endOffset) { + super(metadata, minBundleSizeBytes, startOffset, endOffset); + this.configuration = configuration; } @Override protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) { - return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, end); + return new XmlSource<T>(configuration, getMinBundleSize(), metadata, start, end); } @Override @@ -74,19 +81,8 @@ public class XmlSource<T> extends FileBasedSource<T> { } @Override - public void validate() { - super.validate(); - spec.validate(null); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - spec.populateDisplayData(builder); - } - - @Override public Coder<T> getOutputCoder() { - return JAXBCoder.of(spec.getRecordClass()); + return JAXBCoder.of(configuration.getRecordClass()); } /** @@ -137,10 +133,12 @@ public class XmlSource<T> extends FileBasedSource<T> { // Set up a JAXB Unmarshaller that can be used to unmarshall record objects. try { - JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().spec.getRecordClass()); + JAXBContext jaxbContext = + JAXBContext.newInstance(getCurrentSource().configuration.getRecordClass()); jaxbUnmarshaller = jaxbContext.createUnmarshaller(); - if (getCurrentSource().spec.getValidationEventHandler() != null) { - jaxbUnmarshaller.setEventHandler(getCurrentSource().spec.getValidationEventHandler()); + if (getCurrentSource().configuration.getValidationEventHandler() != null) { + jaxbUnmarshaller.setEventHandler( + getCurrentSource().configuration.getValidationEventHandler()); } } catch (JAXBException e) { throw new RuntimeException(e); @@ -179,10 +177,10 @@ public class XmlSource<T> extends FileBasedSource<T> { byte[] dummyStartDocumentBytes = (String.format( "<?xml version=\"%s\" encoding=\"" - + getCurrentSource().spec.getCharset() + + getCurrentSource().configuration.getCharset() + "\"?><%s>", - XML_VERSION, getCurrentSource().spec.getRootElement())) - .getBytes(getCurrentSource().spec.getCharset()); + XML_VERSION, getCurrentSource().configuration.getRootElement())) + .getBytes(getCurrentSource().configuration.getCharset()); preambleByteBuffer.write(dummyStartDocumentBytes); // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This // method returns the offset and stores any bytes that should be used when creating the XML @@ -230,7 +228,8 @@ public class XmlSource<T> extends FileBasedSource<T> { ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); byte[] recordStartBytes = - ("<" + getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8); + ("<" + getCurrentSource().configuration.getRecordElement()) + .getBytes(StandardCharsets.UTF_8); outer: while (channel.read(buf) > 0) { buf.flip(); @@ -334,14 +333,14 @@ public class XmlSource<T> extends FileBasedSource<T> { this.parser = xmlInputFactory.createXMLStreamReader( new SequenceInputStream( new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)), - getCurrentSource().spec.getCharset()); + getCurrentSource().configuration.getCharset()); // Current offset should be the offset before reading the record element. while (true) { int event = parser.next(); if (event == XMLStreamConstants.START_ELEMENT) { String localName = parser.getLocalName(); - if (localName.equals(getCurrentSource().spec.getRecordElement())) { + if (localName.equals(getCurrentSource().configuration.getRecordElement())) { break; } } @@ -369,7 +368,7 @@ public class XmlSource<T> extends FileBasedSource<T> { } } JAXBElement<T> jb = - jaxbUnmarshaller.unmarshal(parser, getCurrentSource().spec.getRecordClass()); + jaxbUnmarshaller.unmarshal(parser, getCurrentSource().configuration.getRecordClass()); currentRecord = jb.getValue(); return true; } catch (JAXBException | XMLStreamException e) { http://git-wip-us.apache.org/repos/asf/beam/blob/abda38dc/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java index abddcf9..a6adac6 100644 --- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java @@ -41,6 +41,7 @@ import javax.xml.bind.ValidationEventHandler; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.Source.Reader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -459,60 +460,6 @@ public class XmlSourceTest { } @Test - public void testReadXMLNoRootElement() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRecordElement("train") - .withRecordClass(Train.class) - .createSource(); - - exception.expect(NullPointerException.class); - exception.expectMessage( - "rootElement is null. Use builder method withRootElement() to set this."); - readEverythingFromReader(source.createReader(null)); - } - - @Test - public void testReadXMLNoRecordElement() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordClass(Train.class) - .createSource(); - - exception.expect(NullPointerException.class); - exception.expectMessage( - "recordElement is null. Use builder method withRecordElement() to set this."); - readEverythingFromReader(source.createReader(null)); - } - - @Test - public void testReadXMLNoRecordClass() throws IOException { - File file = tempFolder.newFile("trainXMLSmall"); - Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - - BoundedSource<Train> source = - XmlIO.<Train>read() - .from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .createSource(); - - exception.expect(NullPointerException.class); - exception.expectMessage( - "recordClass is null. Use builder method withRecordClass() to set this."); - readEverythingFromReader(source.createReader(null)); - } - - @Test public void testReadXMLIncorrectRootElement() throws IOException { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); @@ -938,7 +885,7 @@ public class XmlSourceTest { @Test @Category(NeedsRunner.class) - public void testReadXMLFilePattern() throws IOException { + public void testReadXMLFilePatternUsingReadAndReadFiles() throws IOException { List<Train> trains1 = generateRandomTrainList(20); File file = createRandomTrainXML("temp1.xml", trains1); List<Train> trains2 = generateRandomTrainList(10); @@ -948,9 +895,9 @@ public class XmlSourceTest { generateRandomTrainList(8); createRandomTrainXML("otherfile.xml", trains1); - PCollection<Train> output = + PCollection<Train> read = p.apply( - "ReadFileData", + "Read", XmlIO.<Train>read() .from(file.getParent() + "/" + "temp*.xml") .withRootElement("trains") @@ -958,12 +905,23 @@ public class XmlSourceTest { .withRecordClass(Train.class) .withMinBundleSize(1024)); + PCollection<Train> readFiles = + p.apply(FileIO.match().filepattern(file.getParent() + "/" + "temp*.xml")) + .apply(FileIO.readMatches()) + .apply( + "ReadFiles", + XmlIO.<Train>readFiles() + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class)); + List<Train> expectedResults = new ArrayList<>(); expectedResults.addAll(trains1); expectedResults.addAll(trains2); expectedResults.addAll(trains3); - PAssert.that(output).containsInAnyOrder(expectedResults); + PAssert.that(read).containsInAnyOrder(expectedResults); + PAssert.that(readFiles).containsInAnyOrder(expectedResults); p.run(); }