[BEAM-1914] XmlIO now complies with PTransform style guide
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0c0a60c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0c0a60c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0c0a60c Branch: refs/heads/DSL_SQL Commit: d0c0a60c83a9d2a6caa29f91f89d8c0ee3b0eb93 Parents: 57929fb Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon Apr 17 16:25:42 2017 -0700 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Wed Apr 19 10:34:46 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/sdk/io/CompressedSource.java | 4 +- .../main/java/org/apache/beam/sdk/io/XmlIO.java | 477 +++++++++++++++++++ .../java/org/apache/beam/sdk/io/XmlSink.java | 226 ++------- .../java/org/apache/beam/sdk/io/XmlSource.java | 191 +------- .../sdk/transforms/display/DisplayData.java | 6 + .../org/apache/beam/sdk/io/XmlSinkTest.java | 89 ++-- .../org/apache/beam/sdk/io/XmlSourceTest.java | 248 ++++++---- .../sdk/transforms/display/DisplayDataTest.java | 17 + 8 files changed, 740 insertions(+), 518 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index ecd0fd9..1d940cb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -46,10 +46,10 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; * A Source that reads from compressed files. 
A {@code CompressedSources} wraps a delegate * {@link FileBasedSource} that is able to read the decompressed file format. * - * <p>For example, use the following to read from a gzip-compressed XML file: + * <p>For example, use the following to read from a gzip-compressed file-based source: * * <pre> {@code - * XmlSource mySource = XmlSource.from(...); + * FileBasedSource<T> mySource = ...; * PCollection<T> collection = p.apply(Read.from(CompressedSource * .from(mySource) * .withDecompression(CompressedSource.CompressionMode.GZIP))); http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java new file mode 100644 index 0000000..a53fb86 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import javax.annotation.Nullable; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** Transforms for reading and writing XML files using JAXB mappers. */ +public class XmlIO { + // CHECKSTYLE.OFF: JavadocStyle + /** + * Reads XML files. This source reads one or more XML files and + * creates a {@link PCollection} of a given type. Please note the example given below. + * + * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML + * element names that are defined by the user: + * + * <pre>{@code + * <root> + * <record> ... </record> + * <record> ... </record> + * <record> ... </record> + * ... + * <record> ... </record> + * </root> + * }</pre> + * + * <p>Basically, the XML document should contain a single root element with an inner list + * consisting entirely of record elements. The records may contain arbitrary XML content; however, + * that content <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. + * This restriction enables reading from large XML files in parallel from different offsets in the + * file. + * + * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes. + * Additionally users must provide a class of a JAXB annotated Java type that can be used convert + * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. 
+ * Reading the source will generate a {@code PCollection} of the given JAXB annotated Java type. + * Optionally users may provide a minimum size of a bundle that should be created for the source. + * + * <p>The following example shows how to use this method in a Beam pipeline: + * + * <pre>{@code + * PCollection<String> output = p.apply(XmlIO.<Record>read() + * .from(file.toPath().toString()) + * .withRootElement("root") + * .withRecordElement("record") + * .withRecordClass(Record.class)); + * }</pre> + * + * <p>Currently, only XML files that use single-byte characters are supported. Using a file that + * contains multi-byte characters may result in data loss or duplication. + * + * <p>To use this method: + * + * <ol> + * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api + * <li>Include a compatible implementation on the classpath at run-time, such as + * org.codehaus.woodstox:woodstox-core-asl + * </ol> + * + * <p>These dependencies have been declared as optional in the sdks/java/core/pom.xml file of + * Apache Beam. + * + * <h3>Permissions</h3> + * Permission requirements depend on the {@link org.apache.beam.sdk.runners.PipelineRunner + * PipelineRunner} that is used to execute the Beam pipeline. Please refer to the documentation of + * corresponding {@link PipelineRunner PipelineRunners} for more details. + * + * @param <T> Type of the objects that represent the records of the XML file. The {@code + * PCollection} generated by this source will be of this type. + */ + // CHECKSTYLE.ON: JavadocStyle + public static <T> Read<T> read() { + return new AutoValue_XmlIO_Read.Builder<T>() + .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE) + .setCompressionType(Read.CompressionType.AUTO) + .build(); + } + + // CHECKSTYLE.OFF: JavadocStyle + /** + * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of + * records from JAXB-annotated classes to a single file location. 
+ * + * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, + * this Sink will produce a single file consisting of a single root element that contains all of + * the elements in the PCollection. + * + * <p>XML Sinks are created with a base filename to write to, a root element name that will be + * used for the root element of the output files, and a class to bind to an XML element. This + * class will be used in the marshalling of records in an input PCollection to their XML + * representation and must be able to be bound using JAXB annotations (checked at pipeline + * construction time). + * + * <p>XML Sinks can be written to using the {@link Write} transform: + * + * <pre>{@code + * p.apply(XmlIO.<Type>write() + * .withRecordClass(Type.class) + * .withRootElement(root_element) + * .toFilenamePrefix(output_filename)); + * }</pre> + * + * <p>For example, consider the following class with JAXB annotations: + * + * <pre> + * {@literal @}XmlRootElement(name = "word_count_result") + * {@literal @}XmlType(propOrder = {"word", "frequency"}) + * public class WordFrequency { + * private String word; + * private long frequency; + * + * public WordFrequency() { } + * + * public WordFrequency(String word, long frequency) { + * this.word = word; + * this.frequency = frequency; + * } + * + * public void setWord(String word) { + * this.word = word; + * } + * + * public void setFrequency(long frequency) { + * this.frequency = frequency; + * } + * + * public long getFrequency() { + * return frequency; + * } + * + * public String getWord() { + * return word; + * } + * } + * </pre> + * + * <p>The following will produce XML output with a root element named "words" from a PCollection + * of WordFrequency objects: + * + * <pre>{@code + * p.apply(XmlIO.<WordFrequency>write() + * .withRecordClass(WordFrequency.class) + * .withRootElement("words") + * .toFilenamePrefix(output_file)); + * }</pre> + * + * <p>The output of which will look like: + * + * 
<pre>{@code + * <words> + * + * <word_count_result> + * <word>decreased</word> + * <frequency>1</frequency> + * </word_count_result> + * + * <word_count_result> + * <word>War</word> + * <frequency>4</frequency> + * </word_count_result> + * + * <word_count_result> + * <word>empress'</word> + * <frequency>14</frequency> + * </word_count_result> + * + * <word_count_result> + * <word>stoops</word> + * <frequency>6</frequency> + * </word_count_result> + * + * ... + * </words> + * }</pre> + */ + // CHECKSTYLE.ON: JavadocStyle + public static <T> Write<T> write() { + return new AutoValue_XmlIO_Write.Builder<T>().build(); + } + + /** Implementation of {@link #read}. */ + @AutoValue + public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> { + private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024; + + @Nullable + abstract String getFileOrPatternSpec(); + + @Nullable + abstract String getRootElement(); + + @Nullable + abstract String getRecordElement(); + + @Nullable + abstract Class<T> getRecordClass(); + + abstract CompressionType getCompressionType(); + + abstract long getMinBundleSize(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec); + + abstract Builder<T> setRootElement(String rootElement); + + abstract Builder<T> setRecordElement(String recordElement); + + abstract Builder<T> setRecordClass(Class<T> recordClass); + + abstract Builder<T> setMinBundleSize(long minBundleSize); + + abstract Builder<T> setCompressionType(CompressionType compressionType); + + abstract Read<T> build(); + } + + /** Strategy for determining the compression type of XML files being read. */ + public enum CompressionType { + /** Automatically determine the compression type based on filename extension. */ + AUTO(""), + /** Uncompressed (i.e., may be split). */ + UNCOMPRESSED(""), + /** GZipped. */ + GZIP(".gz"), + /** BZipped. 
*/ + BZIP2(".bz2"), + /** Zipped. */ + ZIP(".zip"), + /** Deflate compressed. */ + DEFLATE(".deflate"); + + private String filenameSuffix; + + CompressionType(String suffix) { + this.filenameSuffix = suffix; + } + + /** + * Determine if a given filename matches a compression type based on its extension. + * @param filename the filename to match + * @return true iff the filename ends with the compression type's known extension. + */ + public boolean matches(String filename) { + return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase()); + } + } + + /** + * Reads a single XML file or a set of XML files defined by a Java "glob" + * file pattern. Each XML file should be of the form defined in {@link #read}. + */ + public Read<T> from(String fileOrPatternSpec) { + return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build(); + } + + /** + * Sets name of the root element of the XML document. This will be used to create a valid + * starting root element when initiating a bundle of records created from an XML document. This + * is a required parameter. + */ + public Read<T> withRootElement(String rootElement) { + return toBuilder().setRootElement(rootElement).build(); + } + + /** + * Sets name of the record element of the XML document. This will be used to determine offset of + * the first record of a bundle created from the XML document. This is a required parameter. + */ + public Read<T> withRecordElement(String recordElement) { + return toBuilder().setRecordElement(recordElement).build(); + } + + /** + * Sets a JAXB annotated class that can be populated using a record of the provided XML file. + * This will be used when unmarshalling record objects from the XML file. This is a required + * parameter. + */ + public Read<T> withRecordClass(Class<T> recordClass) { + return toBuilder().setRecordClass(recordClass).build(); + } + + /** + * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. 
Please + * refer to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional + * parameter. + */ + public Read<T> withMinBundleSize(long minBundleSize) { + return toBuilder().setMinBundleSize(minBundleSize).build(); + } + + /** + * Decompresses all input files using the specified compression type. + * + * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}. + * In this mode, the compression type of the file is determined by its extension. + * Supports .gz, .bz2, .zip and .deflate compression. + */ + public Read<T> withCompressionType(CompressionType compressionType) { + return toBuilder().setCompressionType(compressionType).build(); + } + + @Override + public void validate(PBegin input) { + checkNotNull( + getRootElement(), + "rootElement is null. Use builder method withRootElement() to set this."); + checkNotNull( + getRecordElement(), + "recordElement is null. Use builder method withRecordElement() to set this."); + checkNotNull( + getRecordClass(), + "recordClass is null. 
Use builder method withRecordClass() to set this."); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotDefault( + DisplayData.item("minBundleSize", getMinBundleSize()) + .withLabel("Minimum Bundle Size"), + 1L) + .add(DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern")) + .addIfNotNull( + DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element")) + .addIfNotNull( + DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element")) + .addIfNotNull( + DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class")); + } + + @VisibleForTesting + BoundedSource<T> createSource() { + XmlSource<T> source = new XmlSource<>(this); + switch (getCompressionType()) { + case UNCOMPRESSED: + return source; + case AUTO: + return CompressedSource.from(source); + case BZIP2: + return CompressedSource.from(source) + .withDecompression(CompressedSource.CompressionMode.BZIP2); + case GZIP: + return CompressedSource.from(source) + .withDecompression(CompressedSource.CompressionMode.GZIP); + case ZIP: + return CompressedSource.from(source) + .withDecompression(CompressedSource.CompressionMode.ZIP); + case DEFLATE: + return CompressedSource.from(source) + .withDecompression(CompressedSource.CompressionMode.DEFLATE); + default: + throw new IllegalArgumentException("Unknown compression type: " + getCompressionType()); + } + } + + @Override + public PCollection<T> expand(PBegin input) { + return input.apply(org.apache.beam.sdk.io.Read.from(createSource())); + } + } + + /** Implementation of {@link #write}. 
*/ + @AutoValue + public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> { + @Nullable + abstract String getFilenamePrefix(); + + @Nullable + abstract Class<T> getRecordClass(); + + @Nullable + abstract String getRootElement(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setFilenamePrefix(String baseOutputFilename); + + abstract Builder<T> setRecordClass(Class<T> recordClass); + + abstract Builder<T> setRootElement(String rootElement); + + abstract Write<T> build(); + } + + + /** + * Writes to files with the given path prefix. + * + * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is + * the number of output bundles. + */ + public Write<T> toFilenamePrefix(String filenamePrefix) { + return toBuilder().setFilenamePrefix(filenamePrefix).build(); + } + + /** + * Writes objects of the given class mapped to XML elements using JAXB. + * + * <p>The specified class must be able to be used to create a JAXB context. + */ + public Write<T> withRecordClass(Class<T> recordClass) { + return toBuilder().setRecordClass(recordClass).build(); + } + + /** + * Sets the enclosing root element for the generated XML files. 
+ */ + public Write<T> withRootElement(String rootElement) { + return toBuilder().setRootElement(rootElement).build(); + } + + @Override + public void validate(PCollection<T> input) { + checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context."); + checkNotNull(getRootElement(), "Missing a root element name."); + checkNotNull(getFilenamePrefix(), "Missing a filename to write to."); + try { + JAXBContext.newInstance(getRecordClass()); + } catch (JAXBException e) { + throw new RuntimeException("Error binding classes to a JAXB Context.", e); + } + } + + @Override + public PDone expand(PCollection<T> input) { + return input.apply(org.apache.beam.sdk.io.Write.to(createSink())); + } + + @VisibleForTesting + XmlSink<T> createSink() { + return new XmlSink<>(this); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + createSink().populateFileBasedDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("rootElement", getRootElement()) + .withLabel("XML Root Element")) + .addIfNotNull(DisplayData.item("recordClass", getRecordClass()) + .withLabel("XML Record Class")); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java index 2159c8f..7700329 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java @@ -17,226 +17,58 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.Preconditions.checkNotNull; - import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; 
import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation; -import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.values.PCollection; -// CHECKSTYLE.OFF: JavadocStyle -/** - * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of - * records from JAXB-annotated classes to a single file location. - * - * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, this - * Sink will produce a single file consisting of a single root element that contains all of the - * elements in the PCollection. - * - * <p>XML Sinks are created with a base filename to write to, a root element name that will be used - * for the root element of the output files, and a class to bind to an XML element. This class - * will be used in the marshalling of records in an input PCollection to their XML representation - * and must be able to be bound using JAXB annotations (checked at pipeline construction time). 
- * - * <p>XML Sinks can be written to using the {@link Write} transform: - * - * <pre> - * p.apply(Write.to( - * XmlSink.ofRecordClass(Type.class) - * .withRootElementName(root_element) - * .toFilenamePrefix(output_filename))); - * </pre> - * - * <p>For example, consider the following class with JAXB annotations: - * - * <pre> - * {@literal @}XmlRootElement(name = "word_count_result") - * {@literal @}XmlType(propOrder = {"word", "frequency"}) - * public class WordFrequency { - * private String word; - * private long frequency; - * - * public WordFrequency() { } - * - * public WordFrequency(String word, long frequency) { - * this.word = word; - * this.frequency = frequency; - * } - * - * public void setWord(String word) { - * this.word = word; - * } - * - * public void setFrequency(long frequency) { - * this.frequency = frequency; - * } - * - * public long getFrequency() { - * return frequency; - * } - * - * public String getWord() { - * return word; - * } - * } - * </pre> - * - * <p>The following will produce XML output with a root element named "words" from a PCollection of - * WordFrequency objects: - * <pre> - * p.apply(Write.to( - * XmlSink.ofRecordClass(WordFrequency.class) - * .withRootElement("words") - * .toFilenamePrefix(output_file))); - * </pre> - * - * <p>The output of which will look like: - * <pre> - * {@code - * <words> - * - * <word_count_result> - * <word>decreased</word> - * <frequency>1</frequency> - * </word_count_result> - * - * <word_count_result> - * <word>War</word> - * <frequency>4</frequency> - * </word_count_result> - * - * <word_count_result> - * <word>empress'</word> - * <frequency>14</frequency> - * </word_count_result> - * - * <word_count_result> - * <word>stoops</word> - * <frequency>6</frequency> - * </word_count_result> - * - * ... - * </words> - * }</pre> - */ -// CHECKSTYLE.ON: JavadocStyle -@SuppressWarnings("checkstyle:javadocstyle") -public class XmlSink { +/** Implementation of {@link XmlIO#write}. 
*/ +class XmlSink<T> extends FileBasedSink<T> { protected static final String XML_EXTENSION = "xml"; - /** - * Returns a builder for an XmlSink. You'll need to configure the class to bind, the root - * element name, and the output file prefix with {@link Bound#ofRecordClass}, {@link - * Bound#withRootElement}, and {@link Bound#toFilenamePrefix}, respectively. - */ - public static Bound<?> write() { - return new Bound<>(null, null, null); + private final XmlIO.Write<T> spec; + + XmlSink(XmlIO.Write<T> spec) { + super(spec.getFilenamePrefix(), XML_EXTENSION); + this.spec = spec; } /** - * Returns an XmlSink that writes objects as XML entities. - * - * <p>Output files will have the name {@literal {baseOutputFilename}-0000i-of-0000n.xml} where n - * is the number of output bundles. - * - * @param klass the class of the elements to write. - * @param rootElementName the enclosing root element. - * @param baseOutputFilename the output filename prefix. + * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have + * been set and that the class can be bound in a JAXB context. */ - public static <T> Bound<T> writeOf( - Class<T> klass, String rootElementName, String baseOutputFilename) { - return new Bound<>(klass, rootElementName, baseOutputFilename); + @Override + public void validate(PipelineOptions options) { + spec.validate(null); } /** - * A {@link FileBasedSink} that writes objects as XML elements. + * Creates an {@link XmlWriteOperation}. */ - public static class Bound<T> extends FileBasedSink<T> { - final Class<T> classToBind; - final String rootElementName; - - private Bound(Class<T> classToBind, String rootElementName, String baseOutputFilename) { - super(baseOutputFilename, XML_EXTENSION); - this.classToBind = classToBind; - this.rootElementName = rootElementName; - } - - /** - * Returns an XmlSink that writes objects of the class specified as XML elements. 
- * - * <p>The specified class must be able to be used to create a JAXB context. - */ - public <T> Bound<T> ofRecordClass(Class<T> classToBind) { - return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get()); - } - - /** - * Returns an XmlSink that writes to files with the given prefix. - * - * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is - * the number of output bundles. - */ - public Bound<T> toFilenamePrefix(String baseOutputFilename) { - return new Bound<>(classToBind, rootElementName, baseOutputFilename); - } - - /** - * Returns an XmlSink that writes XML files with an enclosing root element of the - * supplied name. - */ - public Bound<T> withRootElement(String rootElementName) { - return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get()); - } - - /** - * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have - * been set and that the class can be bound in a JAXB context. - */ - @Override - public void validate(PipelineOptions options) { - checkNotNull(classToBind, "Missing a class to bind to a JAXB context."); - checkNotNull(rootElementName, "Missing a root element name."); - checkNotNull(getBaseOutputFilenameProvider().get(), "Missing a filename to write to."); - try { - JAXBContext.newInstance(classToBind); - } catch (JAXBException e) { - throw new RuntimeException("Error binding classes to a JAXB Context.", e); - } - } + @Override + public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) { + return new XmlWriteOperation<>(this); + } - /** - * Creates an {@link XmlWriteOperation}. 
- */ - @Override - public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) { - return new XmlWriteOperation<>(this); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + spec.populateDisplayData(builder); + } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("rootElement", rootElementName) - .withLabel("XML Root Element")) - .addIfNotNull(DisplayData.item("recordClass", classToBind) - .withLabel("XML Record Class")); - } + void populateFileBasedDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); } /** * {@link Sink.WriteOperation} for XML {@link Sink}s. */ protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> { - public XmlWriteOperation(XmlSink.Bound<T> sink) { + public XmlWriteOperation(XmlSink<T> sink) { super(sink); } @@ -247,7 +79,7 @@ public class XmlSink { public XmlWriter<T> createWriter(PipelineOptions options) throws Exception { JAXBContext context; Marshaller marshaller; - context = JAXBContext.newInstance(getSink().classToBind); + context = JAXBContext.newInstance(getSink().spec.getRecordClass()); marshaller = context.createMarshaller(); marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE); marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE); @@ -259,8 +91,8 @@ public class XmlSink { * Return the XmlSink.Bound for this write operation. 
*/ @Override - public XmlSink.Bound<T> getSink() { - return (XmlSink.Bound<T>) super.getSink(); + public XmlSink<T> getSink() { + return (XmlSink<T>) super.getSink(); } } @@ -289,7 +121,7 @@ public class XmlSink { */ @Override protected void writeHeader() throws Exception { - String rootElementName = getWriteOperation().getSink().rootElementName; + String rootElementName = getWriteOperation().getSink().spec.getRootElement(); os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n")); } @@ -298,7 +130,7 @@ public class XmlSink { */ @Override protected void writeFooter() throws Exception { - String rootElementName = getWriteOperation().getSink().rootElementName; + String rootElementName = getWriteOperation().getSink().spec.getRootElement(); os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">")); } http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java index 6bf2015..7416c85 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.Preconditions.checkNotNull; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -45,154 +43,29 @@ import javax.xml.stream.XMLStreamReader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.JAXBCoder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PCollection; import 
org.codehaus.stax2.XMLInputFactory2; -// CHECKSTYLE.OFF: JavadocStyle -/** - * A source that can be used to read XML files. This source reads one or more - * XML files and creates a {@link PCollection} of a given type. A {@link Read} transform can be - * created by passing an {@link XmlSource} object to {@link Read#from}. Please note the - * example given below. - * - * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML - * element names that are defined by the user: - * - * <pre> - * {@code - * <root> - * <record> ... </record> - * <record> ... </record> - * <record> ... </record> - * ... - * <record> ... </record> - * </root> - * } - * </pre> - * - * <p>Basically, the XML document should contain a single root element with an inner list consisting - * entirely of record elements. The records may contain arbitrary XML content; however, that content - * <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. This - * restriction enables reading from large XML files in parallel from different offsets in the file. - * - * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes. - * Additionally users must provide a class of a JAXB annotated Java type that can be used convert - * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. Reading - * the source will generate a {@code PCollection} of the given JAXB annotated Java type. - * Optionally users may provide a minimum size of a bundle that should be created for the source. 
- * - * <p>The following example shows how to read from {@link XmlSource} in a Beam pipeline: - * - * <pre> - * {@code - * XmlSource<String> source = XmlSource.<String>from(file.toPath().toString()) - * .withRootElement("root") - * .withRecordElement("record") - * .withRecordClass(Record.class); - * PCollection<String> output = p.apply(Read.from(source)); - * } - * </pre> - * - * <p>Currently, only XML files that use single-byte characters are supported. Using a file that - * contains multi-byte characters may result in data loss or duplication. - * - * <p>To use {@link XmlSource}: - * <ol> - * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api</li> - * <li>Include a compatible implementation on the classpath at run-time, - * such as org.codehaus.woodstox:woodstox-core-asl</li> - * </ol> - * - * <p>These dependencies have been declared as optional in the sdks/java/core/pom.xml file of - * Apache Beam. - * - * <h3>Permissions</h3> - * Permission requirements depend on the - * {@link org.apache.beam.sdk.runners.PipelineRunner PipelineRunner} that is - * used to execute the Beam pipeline. Please refer to the documentation of corresponding - * {@link PipelineRunner PipelineRunners} for more details. - * - * @param <T> Type of the objects that represent the records of the XML file. The - * {@code PCollection} generated by this source will be of this type. - */ -// CHECKSTYLE.ON: JavadocStyle +/** Implementation of {@link XmlIO#read}. */ public class XmlSource<T> extends FileBasedSource<T> { private static final String XML_VERSION = "1.1"; - private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024; - private final String rootElement; - private final String recordElement; - private final Class<T> recordClass; - - /** - * Creates an XmlSource for a single XML file or a set of XML files defined by a Java "glob" file - * pattern. Each XML file should be of the form defined in {@link XmlSource}. 
- */ - public static <T> XmlSource<T> from(String fileOrPatternSpec) { - return new XmlSource<>(fileOrPatternSpec, DEFAULT_MIN_BUNDLE_SIZE, null, null, null); - } - - /** - * Sets name of the root element of the XML document. This will be used to create a valid starting - * root element when initiating a bundle of records created from an XML document. This is a - * required parameter. - */ - public XmlSource<T> withRootElement(String rootElement) { - return new XmlSource<>( - getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass); - } - /** - * Sets name of the record element of the XML document. This will be used to determine offset of - * the first record of a bundle created from the XML document. This is a required parameter. - */ - public XmlSource<T> withRecordElement(String recordElement) { - return new XmlSource<>( - getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass); - } + private final XmlIO.Read<T> spec; - /** - * Sets a JAXB annotated class that can be populated using a record of the provided XML file. This - * will be used when unmarshalling record objects from the XML file. This is a required - * parameter. - */ - public XmlSource<T> withRecordClass(Class<T> recordClass) { - return new XmlSource<>( - getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass); + XmlSource(XmlIO.Read<T> spec) { + super(spec.getFileOrPatternSpec(), spec.getMinBundleSize()); + this.spec = spec; } - /** - * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please refer - * to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional - * parameter. 
- */ - public XmlSource<T> withMinBundleSize(long minBundleSize) { - return new XmlSource<>( - getFileOrPatternSpec(), minBundleSize, rootElement, recordElement, recordClass); - } - - private XmlSource(String fileOrPattern, long minBundleSize, String rootElement, - String recordElement, Class<T> recordClass) { - super(fileOrPattern, minBundleSize); - this.rootElement = rootElement; - this.recordElement = recordElement; - this.recordClass = recordClass; - } - - private XmlSource(String fileOrPattern, long minBundleSize, long startOffset, long endOffset, - String rootElement, String recordElement, Class<T> recordClass) { - super(fileOrPattern, minBundleSize, startOffset, endOffset); - this.rootElement = rootElement; - this.recordElement = recordElement; - this.recordClass = recordClass; + private XmlSource(XmlIO.Read<T> spec, long startOffset, long endOffset) { + super(spec.getFileOrPatternSpec(), spec.getMinBundleSize(), startOffset, endOffset); + this.spec = spec; } @Override protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) { - return new XmlSource<T>( - fileName, getMinBundleSize(), start, end, rootElement, recordElement, recordClass); + return new XmlSource<T>(spec.from(fileName), start, end); } @Override @@ -203,42 +76,17 @@ public class XmlSource<T> extends FileBasedSource<T> { @Override public void validate() { super.validate(); - checkNotNull( - rootElement, "rootElement is null. Use builder method withRootElement() to set this."); - checkNotNull( - recordElement, - "recordElement is null. Use builder method withRecordElement() to set this."); - checkNotNull( - recordClass, "recordClass is null. 
Use builder method withRecordClass() to set this."); + spec.validate(null); } @Override public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("rootElement", rootElement) - .withLabel("XML Root Element")) - .addIfNotNull(DisplayData.item("recordElement", recordElement) - .withLabel("XML Record Element")) - .addIfNotNull(DisplayData.item("recordClass", recordClass) - .withLabel("XML Record Class")); + spec.populateDisplayData(builder); } @Override public Coder<T> getDefaultOutputCoder() { - return JAXBCoder.of(recordClass); - } - - public String getRootElement() { - return rootElement; - } - - public String getRecordElement() { - return recordElement; - } - - public Class<T> getRecordClass() { - return recordClass; + return JAXBCoder.of(spec.getRecordClass()); } /** @@ -289,7 +137,7 @@ public class XmlSource<T> extends FileBasedSource<T> { // Set up a JAXB Unmarshaller that can be used to unmarshall record objects. try { - JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().recordClass); + JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().spec.getRecordClass()); jaxbUnmarshaller = jaxbContext.createUnmarshaller(); // Throw errors if validation fails. JAXB by default ignores validation errors. @@ -334,8 +182,10 @@ public class XmlSource<T> extends FileBasedSource<T> { // this XML parsing may fail or may produce incorrect results. byte[] dummyStartDocumentBytes = - ("<?xml version=\"" + XML_VERSION + "\" encoding=\"UTF-8\" ?>" - + "<" + getCurrentSource().rootElement + ">").getBytes(StandardCharsets.UTF_8); + (String.format( + "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>", + XML_VERSION, getCurrentSource().spec.getRootElement())) + .getBytes(StandardCharsets.UTF_8); preambleByteBuffer.write(dummyStartDocumentBytes); // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. 
This // method returns the offset and stores any bytes that should be used when creating the XML @@ -383,7 +233,7 @@ public class XmlSource<T> extends FileBasedSource<T> { ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); byte[] recordStartBytes = - ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8); + ("<" + getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8); outer: while (channel.read(buf) > 0) { buf.flip(); @@ -494,7 +344,7 @@ public class XmlSource<T> extends FileBasedSource<T> { int event = parser.next(); if (event == XMLStreamConstants.START_ELEMENT) { String localName = parser.getLocalName(); - if (localName.equals(getCurrentSource().recordElement)) { + if (localName.equals(getCurrentSource().spec.getRecordElement())) { break; } } @@ -521,7 +371,8 @@ public class XmlSource<T> extends FileBasedSource<T> { return false; } } - JAXBElement<T> jb = jaxbUnmarshaller.unmarshal(parser, getCurrentSource().recordClass); + JAXBElement<T> jb = + jaxbUnmarshaller.unmarshal(parser, getCurrentSource().spec.getRecordClass()); currentRecord = jb.getValue(); return true; } catch (JAXBException | XMLStreamException e) { http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 669dc6d..3c4337b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -778,6 +778,12 @@ public class DisplayData implements Serializable { visitedComponents.add(subComponent); visitedPathMap.put(path, subComponent); Class<?> namespace = subComponent.getClass(); + // Common case: 
AutoValue classes such as AutoValue_FooIO_Read. It's more useful + // to show the user the FooIO.Read class, which is the direct superclass of the AutoValue + // generated class. + if (namespace.getSimpleName().startsWith("AutoValue_")) { + namespace = namespace.getSuperclass(); + } Path prevPath = latestPath; Class<?> prevNs = latestNs; http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java index 63b5d11..7f559d1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java @@ -59,7 +59,6 @@ public class XmlSinkTest { @Rule public ExpectedException thrown = ExpectedException.none(); - private Class<Bird> testClass = Bird.class; private String testRootElement = "testElement"; private String testFilePrefix = "/path/to/testPrefix"; @@ -70,7 +69,12 @@ public class XmlSinkTest { public void testXmlWriter() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); XmlWriteOperation<Bird> writeOp = - XmlSink.writeOf(Bird.class, "birds", testFilePrefix).createWriteOperation(options); + XmlIO.<Bird>write() + .toFilenamePrefix(testFilePrefix) + .withRecordClass(Bird.class) + .withRootElement("birds") + .createSink() + .createWriteOperation(options); XmlWriter<Bird> writer = writeOp.createWriter(options); List<Bird> bundle = @@ -85,51 +89,37 @@ public class XmlSinkTest { * Builder methods correctly initialize an XML Sink. 
*/ @Test - public void testBuildXmlSink() { - XmlSink.Bound<Bird> sink = - XmlSink.write() + public void testBuildXmlWriteTransform() { + XmlIO.Write<Bird> write = + XmlIO.<Bird>write() .toFilenamePrefix(testFilePrefix) - .ofRecordClass(testClass) + .withRecordClass(Bird.class) .withRootElement(testRootElement); - assertEquals(testClass, sink.classToBind); - assertEquals(testRootElement, sink.rootElementName); - assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get()); + assertEquals(Bird.class, write.getRecordClass()); + assertEquals(testRootElement, write.getRootElement()); + assertEquals(testFilePrefix, write.getFilenamePrefix()); } - /** - * Alternate builder method correctly initializes an XML Sink. - */ + /** Validation ensures no fields are missing. */ @Test - public void testBuildXmlSinkDirect() { - XmlSink.Bound<Bird> sink = - XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix); - assertEquals(testClass, sink.classToBind); - assertEquals(testRootElement, sink.rootElementName); - assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get()); + public void testValidateXmlSinkMissingRecordClass() { + thrown.expect(NullPointerException.class); + XmlIO.<Bird>write() + .withRootElement(testRootElement) + .toFilenamePrefix(testFilePrefix) + .validate(null); } - /** - * Validation ensures no fields are missing. 
- */ @Test - public void testValidateXmlSinkMissingFields() { - XmlSink.Bound<Bird> sink; - sink = XmlSink.writeOf(null, testRootElement, testFilePrefix); - validateAndFailIfSucceeds(sink, NullPointerException.class); - sink = XmlSink.writeOf(testClass, null, testFilePrefix); - validateAndFailIfSucceeds(sink, NullPointerException.class); - sink = XmlSink.writeOf(testClass, testRootElement, null); - validateAndFailIfSucceeds(sink, NullPointerException.class); + public void testValidateXmlSinkMissingRootElement() { + thrown.expect(NullPointerException.class); + XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null); } - /** - * Call validate and fail if validation does not throw the expected exception. - */ - private <T> void validateAndFailIfSucceeds( - XmlSink.Bound<T> sink, Class<? extends Exception> expected) { - thrown.expect(expected); - PipelineOptions options = PipelineOptionsFactory.create(); - sink.validate(options); + @Test + public void testValidateXmlSinkMissingFilePrefix() { + thrown.expect(NullPointerException.class); + XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null); } /** @@ -138,13 +128,13 @@ public class XmlSinkTest { @Test public void testCreateWriteOperations() { PipelineOptions options = PipelineOptionsFactory.create(); - XmlSink.Bound<Bird> sink = - XmlSink.writeOf(testClass, testRootElement, testFilePrefix); + XmlSink<Bird> sink = + XmlIO.<Bird>write() + .withRecordClass(Bird.class) + .withRootElement(testRootElement) + .toFilenamePrefix(testFilePrefix) + .createSink(); XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options); - assertEquals(testClass, writeOp.getSink().classToBind); - assertEquals(testFilePrefix, writeOp.getSink().getBaseOutputFilenameProvider().get()); - assertEquals(testRootElement, writeOp.getSink().rootElementName); - // assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().getFilenamePolicy().extension); Path outputPath = 
new File(testFilePrefix).toPath(); Path tempPath = new File(writeOp.tempDirectory.get()).toPath(); assertEquals(outputPath.getParent(), tempPath.getParent()); @@ -159,7 +149,11 @@ public class XmlSinkTest { public void testCreateWriter() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); XmlWriteOperation<Bird> writeOp = - XmlSink.writeOf(testClass, testRootElement, testFilePrefix) + XmlIO.<Bird>write() + .withRecordClass(Bird.class) + .withRootElement(testRootElement) + .toFilenamePrefix(testFilePrefix) + .createSink() .createWriteOperation(options); XmlWriter<Bird> writer = writeOp.createWriter(options); Path outputPath = new File(testFilePrefix).toPath(); @@ -167,18 +161,17 @@ public class XmlSinkTest { assertEquals(outputPath.getParent(), tempPath.getParent()); assertThat( tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName())); - assertEquals(testRootElement, writer.getWriteOperation().getSink().rootElementName); assertNotNull(writer.marshaller); } @Test public void testDisplayData() { - XmlSink.Bound<Integer> sink = XmlSink.write() + XmlIO.Write<Integer> write = XmlIO.<Integer>write() .toFilenamePrefix("foobar") .withRootElement("bird") - .ofRecordClass(Integer.class); + .withRecordClass(Integer.class); - DisplayData displayData = DisplayData.from(sink); + DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml")); assertThat(displayData, hasDisplayItem("rootElement", "bird")); http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java index 5f71f30..0120b8b 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java @@ -285,12 +285,14 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLTiny"); Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(1024); + .withMinBundleSize(1024) + .createSource(); List<Train> expectedResults = ImmutableList.of( new Train("Thomas", Train.TRAIN_NUMBER_UNDEFINED, null, null), @@ -308,12 +310,14 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLTiny"); Files.write(file.toPath(), xmlWithMultiByteChars.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(1024); + .withMinBundleSize(1024) + .createSource(); List<Train> expectedResults = ImmutableList.of( new Train("Thomas¥", Train.TRAIN_NUMBER_UNDEFINED, null, null), @@ -334,12 +338,14 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLTiny"); Files.write(file.toPath(), xmlWithMultiByteElementName.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("දුම්රියන්") .withRecordElement("දුම්රිය") .withRecordClass(Train.class) - .withMinBundleSize(1024); + .withMinBundleSize(1024) + .createSource(); List<Train> expectedResults = ImmutableList.of( new Train("Thomas",
Train.TRAIN_NUMBER_UNDEFINED, null, null), @@ -357,18 +363,20 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLTiny"); Files.write(file.toPath(), tinyXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(10); - List<? extends FileBasedSource<Train>> splits = source.split(50, null); + .withMinBundleSize(10) + .createSource(); + List<? extends BoundedSource<Train>> splits = source.split(50, null); assertTrue(splits.size() > 2); List<Train> results = new ArrayList<>(); - for (FileBasedSource<Train> split : splits) { + for (BoundedSource<Train> split : splits) { results.addAll(readEverythingFromReader(split.createReader(null))); } @@ -394,12 +402,14 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(1024); + .withMinBundleSize(1024) + .createSource(); List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), @@ -417,10 +427,12 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRecordElement("train") - .withRecordClass(Train.class); + .withRecordClass(Train.class) + .createSource(); 
exception.expect(NullPointerException.class); exception.expectMessage( @@ -433,10 +445,12 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") - .withRecordClass(Train.class); + .withRecordClass(Train.class) + .createSource(); exception.expect(NullPointerException.class); exception.expectMessage( @@ -449,10 +463,12 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") - .withRecordElement("train"); + .withRecordElement("train") + .createSource(); exception.expect(NullPointerException.class); exception.expectMessage( @@ -465,11 +481,13 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("something") .withRecordElement("train") - .withRecordClass(Train.class); + .withRecordClass(Train.class) + .createSource(); exception.expectMessage("Unexpected close tag </trains>; expected </something>."); readEverythingFromReader(source.createReader(null)); @@ -480,11 +498,13 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source 
= + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("something") - .withRecordClass(Train.class); + .withRecordClass(Train.class) + .createSource(); assertEquals(readEverythingFromReader(source.createReader(null)), new ArrayList<Train>()); } @@ -500,11 +520,13 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<WrongTrainType> source = - XmlSource.<WrongTrainType>from(file.toPath().toString()) + BoundedSource<WrongTrainType> source = + XmlIO.<WrongTrainType>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") - .withRecordClass(WrongTrainType.class); + .withRecordClass(WrongTrainType.class) + .createSource(); exception.expect(RuntimeException.class); @@ -525,11 +547,13 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") - .withRecordClass(Train.class); + .withRecordClass(Train.class) + .createSource(); List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), @@ -548,12 +572,14 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXMLWithEmptyTags.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(1024); + .withMinBundleSize(1024) + .createSource(); List<Train> 
expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), new Train("Toby", 7, "brown", null), @@ -572,14 +598,15 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024); - - PCollection<Train> output = p.apply("ReadFileData", Read.from(source)); + PCollection<Train> output = + p.apply( + "ReadFileData", + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024)); List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), @@ -595,12 +622,14 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXMLWithAttributes.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(1024); + .withMinBundleSize(1024) + .createSource(); List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", "small"), new Train("Henry", 3, "green", "big"), new Train("Toby", 7, "brown", "small"), @@ -618,12 +647,14 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXMLWithSpaces.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) 
.withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(1024); + .withMinBundleSize(1024) + .createSource(); List<Train> expectedResults = ImmutableList.of(new Train("Thomas ", 1, "blue", null), new Train("Henry", 3, "green", null), new Train("Toby", 7, " brown ", null), @@ -642,12 +673,14 @@ public class XmlSourceTest { List<Train> trains = generateRandomTrainList(100); File file = createRandomTrainXML(fileName, trains); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(1024); + .withMinBundleSize(1024) + .createSource(); assertThat( trainsToStrings(trains), @@ -662,13 +695,15 @@ public class XmlSourceTest { List<Train> trains = generateRandomTrainList(100); File file = createRandomTrainXML(fileName, trains); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024); - PCollection<Train> output = p.apply("ReadFileData", Read.from(source)); + PCollection<Train> output = + p.apply( + "ReadFileData", + XmlIO.<Train>read() + .from(file.toPath().toString()) + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024)); PAssert.that(output).containsInAnyOrder(trains); p.run(); @@ -680,18 +715,20 @@ public class XmlSourceTest { List<Train> trains = generateRandomTrainList(10); File file = createRandomTrainXML(fileName, trains); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(10); - List<? 
extends FileBasedSource<Train>> splits = source.split(100, null); + .withMinBundleSize(10) + .createSource(); + List<? extends BoundedSource<Train>> splits = source.split(100, null); assertTrue(splits.size() > 2); List<Train> results = new ArrayList<>(); - for (FileBasedSource<Train> split : splits) { + for (BoundedSource<Train> split : splits) { results.addAll(readEverythingFromReader(split.createReader(null))); } @@ -704,19 +741,21 @@ public class XmlSourceTest { List<Train> trains = generateRandomTrainList(100); File file = createRandomTrainXML(fileName, trains); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(10); - List<? extends FileBasedSource<Train>> splits = source.split(256, null); + .withMinBundleSize(10) + .createSource(); + List<? extends BoundedSource<Train>> splits = source.split(256, null); // Not a trivial split assertTrue(splits.size() > 2); List<Train> results = new ArrayList<>(); - for (FileBasedSource<Train> split : splits) { + for (BoundedSource<Train> split : splits) { results.addAll(readEverythingFromReader(split.createReader(null))); } assertThat(trainsToStrings(trains), containsInAnyOrder(trainsToStrings(results).toArray())); @@ -729,14 +768,16 @@ public class XmlSourceTest { List<Train> trains = generateRandomTrainList(100); File file = createRandomTrainXML(fileName, trains); - XmlSource<Train> fileSource = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> fileSource = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") .withRecordClass(Train.class) - .withMinBundleSize(10); + .withMinBundleSize(10) + .createSource(); - List<? extends FileBasedSource<Train>> splits = + List<? 
extends BoundedSource<Train>> splits = fileSource.split(file.length() / 3, null); for (BoundedSource<Train> splitSource : splits) { int numItems = readEverythingFromReader(splitSource.createReader(null)).size(); @@ -771,11 +812,13 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXMLWithAllFeaturesSingleByte.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("trains") .withRecordElement("train") - .withRecordClass(Train.class); + .withRecordClass(Train.class) + .createSource(); assertSplitAtFractionExhaustive(source, options); } @@ -788,11 +831,13 @@ public class XmlSourceTest { File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXMLWithAllFeaturesMultiByte.getBytes(StandardCharsets.UTF_8)); - XmlSource<Train> source = - XmlSource.<Train>from(file.toPath().toString()) + BoundedSource<Train> source = + XmlIO.<Train>read() + .from(file.toPath().toString()) .withRootElement("දුම්රියන්") .withRecordElement("දුම්රිය") - .withRecordClass(Train.class); + .withRecordClass(Train.class) + .createSource(); assertSplitAtFractionExhaustive(source, options); } @@ -808,13 +853,15 @@ public class XmlSourceTest { generateRandomTrainList(8); createRandomTrainXML("otherfile.xml", trains1); - XmlSource<Train> source = - XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml") - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024); - PCollection<Train> output = p.apply("ReadFileData", Read.from(source)); + PCollection<Train> output = + p.apply( + "ReadFileData", + XmlIO.<Train>read() + .from(file.getParent() + "/" + "temp*.xml") + .withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024));
List<Train> expectedResults = new ArrayList<>(); expectedResults.addAll(trains1); @@ -827,15 +874,14 @@ public class XmlSourceTest { @Test public void testDisplayData() { - - - XmlSource<?> source = XmlSource - .<Integer>from("foo.xml") - .withRootElement("bird") - .withRecordElement("cat") - .withMinBundleSize(1234) - .withRecordClass(Integer.class); - DisplayData displayData = DisplayData.from(source); + DisplayData displayData = + DisplayData.from( + XmlIO.<Integer>read() + .from("foo.xml") + .withRootElement("bird") + .withRecordElement("cat") + .withMinBundleSize(1234) + .withRecordClass(Integer.class)); assertThat(displayData, hasDisplayItem("filePattern", "foo.xml")); assertThat(displayData, hasDisplayItem("rootElement", "bird")); http://git-wip-us.apache.org/repos/asf/beam/blob/d0c0a60c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index c617f06..9b24b69 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -44,8 +44,10 @@ import static org.junit.Assert.fail; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.testing.EqualsTester; import java.io.IOException; @@ -1299,6 +1301,21 @@ public class DisplayDataTest implements Serializable { DisplayData.from(component); } + @AutoValue + abstract static class Foo implements 
HasDisplayData { + @Override + public void populateDisplayData(Builder builder) { + builder.add(DisplayData.item("someKey", "someValue")); + } + } + + @Test + public void testAutoValue() { + DisplayData data = DisplayData.from(new AutoValue_DisplayDataTest_Foo()); + Item item = Iterables.getOnlyElement(data.asMap().values()); + assertEquals(Foo.class, item.getNamespace()); + } + private String quoted(Object obj) { return String.format("\"%s\"", obj); }