Repository: beam Updated Branches: refs/heads/master e01c78da7 -> 0c2211375
Many improvements to TikaIO This addresses most of the comments in #3378. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ba93dd39 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ba93dd39 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ba93dd39 Branch: refs/heads/master Commit: ba93dd39111ab2b13f811d0abeb76a49a4a4f035 Parents: e01c78d Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Mon Sep 11 16:11:10 2017 +0100 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Oct 26 12:45:19 2017 -0700 ---------------------------------------------------------------------- sdks/java/io/tika/pom.xml | 10 - .../apache/beam/sdk/io/tika/ParseResult.java | 98 ++++ .../org/apache/beam/sdk/io/tika/TikaIO.java | 334 ++++++------- .../apache/beam/sdk/io/tika/TikaOptions.java | 78 ---- .../org/apache/beam/sdk/io/tika/TikaSource.java | 466 ------------------- .../beam/sdk/io/tika/ParseResultTest.java | 43 ++ .../org/apache/beam/sdk/io/tika/TikaIOTest.java | 252 ++++------ .../apache/beam/sdk/io/tika/TikaReaderTest.java | 82 ---- .../apache/beam/sdk/io/tika/TikaSourceTest.java | 73 --- 9 files changed, 392 insertions(+), 1044 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/pom.xml b/sdks/java/io/tika/pom.xml index b8f7ece..d7f7e42 100644 --- a/sdks/java/io/tika/pom.xml +++ b/sdks/java/io/tika/pom.xml @@ -54,16 +54,6 @@ </dependency> <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-core</artifactId> <version>${tika.version}</version> 
http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java new file mode 100644 index 0000000..0a77491 --- /dev/null +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.tika; + +import java.io.Serializable; +import java.util.Arrays; + +import org.apache.tika.metadata.Metadata; + +/** + * Tika parse result containing the file location, metadata + * and content converted to String. 
+ */ +@SuppressWarnings("serial") +public class ParseResult implements Serializable { + private final String fileLocation; + private final String content; + private final Metadata metadata; + private final String[] metadataNames; + + public ParseResult(String fileLocation, String content) { + this(fileLocation, content, new Metadata()); + } + + public ParseResult(String fileLocation, String content, Metadata metadata) { + this.fileLocation = fileLocation; + this.content = content; + this.metadata = metadata; + this.metadataNames = metadata.names(); + } + + /** + * Gets a file content. + */ + public String getContent() { + return content; + } + + /** + * Gets a file metadata. + */ + public Metadata getMetadata() { + return metadata; + } + + /** + * Gets a file location. + */ + public String getFileLocation() { + return fileLocation; + } + + @Override + public int hashCode() { + int hashCode = 1; + hashCode = 31 * hashCode + fileLocation.hashCode(); + hashCode = 31 * hashCode + content.hashCode(); + hashCode = 31 * hashCode + getMetadataHashCode(); + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ParseResult)) { + return false; + } + + ParseResult pr = (ParseResult) obj; + return this.fileLocation.equals(pr.fileLocation) + && this.content.equals(pr.content) + && this.metadata.equals(pr.metadata); + } + + //TODO: + // Remove this function and use metadata.hashCode() once Apache Tika 1.17 gets released. 
+ private int getMetadataHashCode() { + int hashCode = 0; + for (String name : metadataNames) { + hashCode += name.hashCode() ^ Arrays.hashCode(metadata.getValues(name)); + } + return hashCode; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java index 4876dcf..32353e1 100644 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java +++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java @@ -18,40 +18,72 @@ package org.apache.beam.sdk.io.tika; import static com.google.common.base.Preconditions.checkNotNull; + import com.google.auto.value.AutoValue; +import java.io.InputStream; +import java.nio.channels.Channels; + import javax.annotation.Nullable; + import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.tika.config.TikaConfig; +import org.apache.tika.io.TikaInputStream; import org.apache.tika.metadata.Metadata; +import 
org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.parser.ParseContext; +import org.apache.tika.parser.Parser; +import org.apache.tika.sax.ToTextContentHandler; +import org.xml.sax.ContentHandler; + + /** - * {@link PTransform} for parsing arbitrary files using Apache Tika. + * A collection of {@link PTransform} transforms for parsing arbitrary files using Apache Tika. * Files in many well known text, binary or scientific formats can be processed. * - * <p>To read a {@link PCollection} from one or more files - * use {@link TikaIO.Read#from(String)} - * to specify the path of the file(s) to be read. + * <p>{@link TikaIO.Parse} and {@link TikaIO.ParseAll} parse the files and return + * a {@link PCollection} containing one {@link ParseResult} per file. + * + * <p>Combine {@link TikaIO.ParseAll} with {@link FileIO.Match} + * and {@link FileIO.ReadMatches} to match, read and parse the files. + * + * <p>Example: * - * <p>{@link TikaIO.Read} returns a bounded {@link PCollection} of {@link String Strings}, - * each corresponding to a sequence of characters reported by Apache Tika SAX Parser. + * <pre>{@code + * Pipeline p = ...; + * + * // A simple parse of a local PDF file (only runs locally): + * PCollection<ParseResult> results = + * p.apply(FileIO.match().filepattern("/local/path/to/file.pdf")) + * .apply(FileIO.readMatches()) + * .apply(TikaIO.parseAll()); + * }</pre> + * + * <p>Use {@link TikaIO.Parse} to match, read and parse the files in simple cases. * * <p>Example: * * <pre>{@code * Pipeline p = ...; * - * // A simple Read of a local PDF file (only runs locally): - * PCollection<String> content = p.apply(TikaInput.from("/local/path/to/file.pdf")); + * // A simple parse of a local PDF file (only runs locally): + * PCollection<ParseResult> results = + * p.apply(TikaIO.parse().filepattern("/local/path/to/file.pdf")); + * }</pre> * * <b>Warning:</b> the API of this IO is likely to change in the next release. 
@@ -60,85 +92,96 @@ import org.apache.tika.metadata.Metadata; public class TikaIO { /** - * A {@link PTransform} that parses one or more files and returns a bounded {@link PCollection} - * containing one element for each sequence of characters reported by Apache Tika SAX Parser. + * A {@link PTransform} that matches and parses the files + * and returns a bounded {@link PCollection} of {@link ParseResult}. */ - public static Read read() { - return new AutoValue_TikaIO_Read.Builder() - .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME) - .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME) + public static Parse parse() { + return new AutoValue_TikaIO_Parse.Builder() .build(); - } + } - /** Implementation of {@link #read}. */ - @AutoValue - public abstract static class Read extends PTransform<PBegin, PCollection<String>> { - private static final long serialVersionUID = 2198301984784351829L; - public static final long DEFAULT_QUEUE_POLL_TIME = 50L; - public static final long DEFAULT_QUEUE_MAX_POLL_TIME = 3000L; + /** + * A {@link PTransform} that accepts a {@link PCollection} of {@link ReadableFile} + * and returns a {@link PCollection} of {@link ParseResult}. + */ + public static ParseAll parseAll() { + return new AutoValue_TikaIO_ParseAll.Builder() + .build(); + } - @Nullable abstract ValueProvider<String> getFilepattern(); - @Nullable abstract ValueProvider<String> getTikaConfigPath(); - @Nullable abstract Metadata getInputMetadata(); - @Nullable abstract Boolean getReadOutputMetadata(); - @Nullable abstract Long getQueuePollTime(); - @Nullable abstract Long getQueueMaxPollTime(); - @Nullable abstract Integer getMinimumTextLength(); - @Nullable abstract Boolean getParseSynchronously(); + /** Implementation of {@link #parse}. 
*/ + @SuppressWarnings("serial") + @AutoValue + public abstract static class Parse extends PTransform<PBegin, PCollection<ParseResult>> { + @Nullable + abstract ValueProvider<String> getFilepattern(); abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { abstract Builder setFilepattern(ValueProvider<String> filepattern); - abstract Builder setTikaConfigPath(ValueProvider<String> tikaConfigPath); - abstract Builder setInputMetadata(Metadata metadata); - abstract Builder setReadOutputMetadata(Boolean value); - abstract Builder setQueuePollTime(Long value); - abstract Builder setQueueMaxPollTime(Long value); - abstract Builder setMinimumTextLength(Integer value); - abstract Builder setParseSynchronously(Boolean value); - abstract Read build(); + abstract Parse build(); } - /** - * A {@link PTransform} that parses one or more files with the given filename - * or filename pattern and returns a bounded {@link PCollection} containing - * one element for each sequence of characters reported by Apache Tika SAX Parser. - * - * <p>Filepattern can be a local path (if running locally), or a Google Cloud Storage - * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"} - * (if running locally or using remote execution service). - * - * <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java - * Filesystem glob patterns</a> ("*", "?", "[..]") are supported. - */ - public Read from(String filepattern) { - checkNotNull(filepattern, "Filepattern cannot be empty."); - return from(StaticValueProvider.of(filepattern)); + /** Matches the given filepattern. */ + public Parse filepattern(String filepattern) { + return this.filepattern(ValueProvider.StaticValueProvider.of(filepattern)); } - /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. 
*/ - public Read from(ValueProvider<String> filepattern) { - checkNotNull(filepattern, "Filepattern cannot be empty."); - return toBuilder() - .setFilepattern(filepattern) - .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME) - .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME) - .build(); + /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */ + public Parse filepattern(ValueProvider<String> filepattern) { + return toBuilder().setFilepattern(filepattern).build(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + + builder + .addIfNotNull( + DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern")); + } + + @Override + public PCollection<ParseResult> expand(PBegin input) { + return input + .apply(FileIO.match().filepattern(getFilepattern())) + .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) + .apply(parseAll()); + } + } + + /** Implementation of {@link #parseAll}. */ + @SuppressWarnings("serial") + @AutoValue + public abstract static class ParseAll extends + PTransform<PCollection<ReadableFile>, PCollection<ParseResult>> { + + @Nullable abstract ValueProvider<String> getTikaConfigPath(); + @Nullable abstract Metadata getInputMetadata(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setTikaConfigPath(ValueProvider<String> tikaConfigPath); + abstract Builder setInputMetadata(Metadata metadata); + + abstract ParseAll build(); } /** * Returns a new transform which will use the custom TikaConfig. */ - public Read withTikaConfigPath(String tikaConfigPath) { + public ParseAll withTikaConfigPath(String tikaConfigPath) { checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty."); return withTikaConfigPath(StaticValueProvider.of(tikaConfigPath)); } /** Same as {@code with(tikaConfigPath)}, but accepting a {@link ValueProvider}. 
*/ - public Read withTikaConfigPath(ValueProvider<String> tikaConfigPath) { + public ParseAll withTikaConfigPath(ValueProvider<String> tikaConfigPath) { checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty."); return toBuilder() .setTikaConfigPath(tikaConfigPath) @@ -149,7 +192,7 @@ public class TikaIO { * Returns a new transform which will use the provided content type hint * to make the file parser detection more efficient. */ - public Read withContentTypeHint(String contentType) { + public ParseAll withContentTypeHint(String contentType) { checkNotNull(contentType, "ContentType cannot be empty."); Metadata metadata = new Metadata(); metadata.add(Metadata.CONTENT_TYPE, contentType); @@ -160,7 +203,7 @@ public class TikaIO { * Returns a new transform which will use the provided input metadata * for parsing the files. */ - public Read withInputMetadata(Metadata metadata) { + public ParseAll withInputMetadata(Metadata metadata) { Metadata inputMetadata = this.getInputMetadata(); if (inputMetadata != null) { for (String name : metadata.names()) { @@ -172,88 +215,15 @@ public class TikaIO { return toBuilder().setInputMetadata(inputMetadata).build(); } - /** - * Returns a new transform which will report the metadata. - */ - public Read withReadOutputMetadata(Boolean value) { - return toBuilder().setReadOutputMetadata(value).build(); - } - - /** - * Returns a new transform which will use the specified queue poll time. - */ - public Read withQueuePollTime(Long value) { - return toBuilder().setQueuePollTime(value).build(); - } - - /** - * Returns a new transform which will use the specified queue max poll time. - */ - public Read withQueueMaxPollTime(Long value) { - return toBuilder().setQueueMaxPollTime(value).build(); - } - - /** - * Returns a new transform which will operate on the text blocks with the - * given minimum text length. 
- */ - public Read withMinimumTextlength(Integer value) { - return toBuilder().setMinimumTextLength(value).build(); - } - - /** - * Returns a new transform which will use the synchronous reader. - */ - public Read withParseSynchronously(Boolean value) { - return toBuilder().setParseSynchronously(value).build(); - } - - /** - * Path to Tika configuration resource. - */ - public Read withOptions(TikaOptions options) { - checkNotNull(options, "TikaOptions cannot be empty."); - Builder builder = toBuilder(); - builder.setFilepattern(StaticValueProvider.of(options.getInput())) - .setQueuePollTime(options.getQueuePollTime()) - .setQueueMaxPollTime(options.getQueueMaxPollTime()) - .setMinimumTextLength(options.getMinimumTextLength()) - .setParseSynchronously(options.getParseSynchronously()); - if (options.getContentTypeHint() != null) { - Metadata metadata = this.getInputMetadata(); - if (metadata == null) { - metadata = new Metadata(); - } - metadata.add(Metadata.CONTENT_TYPE, options.getContentTypeHint()); - builder.setInputMetadata(metadata); - } - if (options.getTikaConfigPath() != null) { - builder.setTikaConfigPath(StaticValueProvider.of(options.getTikaConfigPath())); - } - if (Boolean.TRUE.equals(options.getReadOutputMetadata())) { - builder.setReadOutputMetadata(options.getReadOutputMetadata()); - } - return builder.build(); - } - @Override - public PCollection<String> expand(PBegin input) { - checkNotNull(this.getFilepattern(), "Filepattern cannot be empty."); - final Bounded<String> read = org.apache.beam.sdk.io.Read.from(new TikaSource(this)); - PCollection<String> pcol = input.getPipeline().apply(read); - pcol.setCoder(getDefaultOutputCoder()); - return pcol; + public PCollection<ParseResult> expand(PCollection<ReadableFile> input) { + return input.apply(ParDo.of(new ParseToStringFn(this))); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - String filepatternDisplay = 
getFilepattern().isAccessible() - ? getFilepattern().get() : getFilepattern().toString(); - builder - .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) - .withLabel("File Pattern")); if (getTikaConfigPath() != null) { String tikaConfigPathDisplay = getTikaConfigPath().isAccessible() ? getTikaConfigPath().get() : getTikaConfigPath().toString(); @@ -262,49 +232,55 @@ public class TikaIO { } Metadata metadata = getInputMetadata(); if (metadata != null) { - StringBuilder sb = new StringBuilder(); - sb.append('['); - for (String name : metadata.names()) { - if (sb.length() > 1) { - sb.append(','); - } - sb.append(name).append('=').append(metadata.get(name)); - } - sb.append(']'); + //TODO: use metadata.toString() only without a trim() once Apache Tika 1.17 gets released builder - .add(DisplayData.item("inputMetadata", sb.toString()) + .add(DisplayData.item("inputMetadata", metadata.toString().trim()) .withLabel("Input Metadata")); } - if (Boolean.TRUE.equals(getParseSynchronously())) { - builder - .add(DisplayData.item("parseMode", "synchronous") - .withLabel("Parse Mode")); - } else { - builder - .add(DisplayData.item("parseMode", "asynchronous") - .withLabel("Parse Mode")); - builder - .add(DisplayData.item("queuePollTime", getQueuePollTime().toString()) - .withLabel("Queue Poll Time")) - .add(DisplayData.item("queueMaxPollTime", getQueueMaxPollTime().toString()) - .withLabel("Queue Max Poll Time")); - } - Integer minTextLen = getMinimumTextLength(); - if (minTextLen != null && minTextLen > 0) { - builder - .add(DisplayData.item("minTextLen", getMinimumTextLength().toString()) - .withLabel("Minimum Text Length")); + } + + private static class ParseToStringFn extends DoFn<ReadableFile, ParseResult> { + + private static final long serialVersionUID = 6837207505313720989L; + private final TikaIO.ParseAll spec; + private TikaConfig tikaConfig; + + ParseToStringFn(TikaIO.ParseAll spec) { + this.spec = spec; } - if 
(Boolean.TRUE.equals(getReadOutputMetadata())) { - builder - .add(DisplayData.item("readOutputMetadata", "true") - .withLabel("Read Output Metadata")); + + @Setup + public void setup() throws Exception { + if (spec.getTikaConfigPath() != null) { + ResourceId configResource = + FileSystems.matchSingleFileSpec(spec.getTikaConfigPath().get()).resourceId(); + tikaConfig = new TikaConfig( + Channels.newInputStream(FileSystems.open(configResource))); + } } - } - @Override - protected Coder<String> getDefaultOutputCoder() { - return StringUtf8Coder.of(); + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + ReadableFile file = c.element(); + InputStream stream = Channels.newInputStream(file.open()); + try (InputStream tikaStream = TikaInputStream.get(stream)) { + + final Parser parser = tikaConfig == null + ? new AutoDetectParser() : new AutoDetectParser(tikaConfig); + + final ParseContext context = new ParseContext(); + context.set(Parser.class, parser); + Metadata tikaMetadata = spec.getInputMetadata() != null + ? 
spec.getInputMetadata() : new org.apache.tika.metadata.Metadata(); + + ContentHandler tikaHandler = new ToTextContentHandler(); + parser.parse(tikaStream, tikaHandler, tikaMetadata, context); + + c.output(new ParseResult(file.getMetadata().resourceId().toString(), + tikaHandler.toString(), + tikaMetadata)); + } + } } } } http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java deleted file mode 100644 index fb97678..0000000 --- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.tika; - -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.Validation; - -/** - * TikaInput Options to support the command-line applications. 
- */ -public interface TikaOptions extends PipelineOptions { - - @Description("Input path") - @Validation.Required - String getInput(); - void setInput(String value); - - @Description("Tika Config path") - String getTikaConfigPath(); - void setTikaConfigPath(String value); - - @Description("Tika Parser Content Type hint") - String getContentTypeHint(); - void setContentTypeHint(String value); - - @Description("Metadata report status") - @Default.Boolean(false) - Boolean getReadOutputMetadata(); - void setReadOutputMetadata(Boolean value); - - @Description("Optional use of the synchronous reader") - @Default.Boolean(false) - Boolean getParseSynchronously(); - void setParseSynchronously(Boolean value); - - @Description("Tika Parser queue poll time in milliseconds") - @Default.Long(TikaIO.Read.DEFAULT_QUEUE_POLL_TIME) - Long getQueuePollTime(); - void setQueuePollTime(Long value); - - @Description("Tika Parser queue maximum poll time in milliseconds") - @Default.Long(TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME) - Long getQueueMaxPollTime(); - void setQueueMaxPollTime(Long value); - - @Description("Minumin text fragment length for Tika Parser to report") - @Default.Integer(0) - Integer getMinimumTextLength(); - void setMinimumTextLength(Integer value); - - @Description("Pipeline name") - @Default.String("TikaRead") - String getPipelineName(); - void setPipelineName(String value); - - @Description("Output path") - @Default.String("/tmp/tika/out") - String getOutput(); - void setOutput(String value); - -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java deleted file mode 100644 index 7c8852b..0000000 --- 
a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java +++ /dev/null @@ -1,466 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.tika; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import javax.annotation.Nullable; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.FileSystems; -import 
org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.tika.config.TikaConfig; -import org.apache.tika.exception.TikaException; -import org.apache.tika.io.TikaInputStream; -import org.apache.tika.parser.AutoDetectParser; -import org.apache.tika.parser.ParseContext; -import org.apache.tika.parser.Parser; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.xml.sax.SAXException; -import org.xml.sax.helpers.DefaultHandler; - -/** - * Implementation detail of {@link TikaIO.Read}. - * - * <p>A {@link Source} which can represent the content of the files parsed by Apache Tika. - */ -class TikaSource extends BoundedSource<String> { - private static final long serialVersionUID = -509574062910491122L; - private static final Logger LOG = LoggerFactory.getLogger(TikaSource.class); - - @Nullable - private MatchResult.Metadata singleFileMetadata; - private final Mode mode; - private final TikaIO.Read spec; - - /** - * Source mode. 
- */ - public enum Mode { - FILEPATTERN, SINGLE_FILE - } - - TikaSource(TikaIO.Read spec) { - this.mode = Mode.FILEPATTERN; - this.spec = spec; - } - - TikaSource(Metadata fileMetadata, TikaIO.Read spec) { - mode = Mode.SINGLE_FILE; - this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata"); - this.spec = spec; - } - - @Override - public BoundedReader<String> createReader(PipelineOptions options) throws IOException { - this.validate(); - checkState(spec.getFilepattern().isAccessible(), - "Cannot create a Tika reader without access to the file" - + " or pattern specification: {}.", spec.getFilepattern()); - if (spec.getTikaConfigPath() != null) { - checkState(spec.getTikaConfigPath().isAccessible(), - "Cannot create a Tika reader without access to its configuration", - spec.getTikaConfigPath()); - } - - String fileOrPattern = spec.getFilepattern().get(); - if (mode == Mode.FILEPATTERN) { - List<Metadata> fileMetadata = expandFilePattern(fileOrPattern); - List<TikaReader> fileReaders = new ArrayList<>(); - for (Metadata metadata : fileMetadata) { - fileReaders.add(new TikaReader(this, metadata.resourceId().toString())); - } - if (fileReaders.size() == 1) { - return fileReaders.get(0); - } - return new FilePatternTikaReader(this, fileReaders); - } else { - return new TikaReader(this, singleFileMetadata.resourceId().toString()); - } - - } - - @Override - public List<? 
extends TikaSource> split(long desiredBundleSizeBytes, PipelineOptions options) - throws Exception { - if (mode == Mode.SINGLE_FILE) { - return ImmutableList.of(this); - } else { - List<Metadata> fileMetadata = expandFilePattern(spec.getFilepattern().get()); - - List<TikaSource> splitResults = new LinkedList<>(); - for (Metadata metadata : fileMetadata) { - splitResults.add(new TikaSource(metadata, spec)); - } - return splitResults; - } - } - - public TikaIO.Read getTikaInputRead() { - return spec; - } - - @Override - public Coder<String> getDefaultOutputCoder() { - return StringUtf8Coder.of(); - } - - @Override - public void validate() { - switch (mode) { - case FILEPATTERN: - checkArgument(this.singleFileMetadata == null, - "Unexpected initialized singleFileMetadata value"); - break; - case SINGLE_FILE: - checkNotNull(this.singleFileMetadata, - "Unexpected null singleFileMetadata value"); - break; - default: - throw new IllegalStateException("Unknown mode: " + mode); - } - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - long totalSize = 0; - List<Metadata> fileMetadata = expandFilePattern(spec.getFilepattern().get()); - for (Metadata metadata : fileMetadata) { - totalSize += metadata.sizeBytes(); - } - return totalSize; - } - - Mode getMode() { - return this.mode; - } - - Metadata getSingleFileMetadata() { - return this.singleFileMetadata; - } - - private static List<Metadata> expandFilePattern(String fileOrPattern) throws IOException { - MatchResult matches = Iterables.getOnlyElement( - FileSystems.match(Collections.singletonList(fileOrPattern))); - LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPattern); - List<Metadata> metadata = ImmutableList.copyOf(matches.metadata()); - checkArgument(!metadata.isEmpty(), - "Unable to find any files matching %s", fileOrPattern); - - return metadata; - } - - /** - * FilePatternTikaReader. 
- * TODO: This is mostly a copy of FileBasedSource internal file-pattern reader - * so that code would need to be generalized as part of the future contribution - */ - static class FilePatternTikaReader extends BoundedReader<String> { - private final TikaSource source; - final ListIterator<TikaReader> fileReadersIterator; - TikaReader currentReader = null; - - public FilePatternTikaReader(TikaSource source, List<TikaReader> fileReaders) { - this.source = source; - this.fileReadersIterator = fileReaders.listIterator(); - } - - @Override - public boolean start() throws IOException { - return startNextNonemptyReader(); - } - - @Override - public boolean advance() throws IOException { - checkState(currentReader != null, "Call start() before advance()"); - if (currentReader.advance()) { - return true; - } - return startNextNonemptyReader(); - } - - private boolean startNextNonemptyReader() throws IOException { - while (fileReadersIterator.hasNext()) { - currentReader = fileReadersIterator.next(); - if (currentReader.start()) { - return true; - } - currentReader.close(); - } - return false; - } - - @Override - public String getCurrent() throws NoSuchElementException { - return currentReader.getCurrent(); - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - return currentReader.getCurrentTimestamp(); - } - - @Override - public void close() throws IOException { - if (currentReader != null) { - currentReader.close(); - } - while (fileReadersIterator.hasNext()) { - fileReadersIterator.next().close(); - } - } - - @Override - public TikaSource getCurrentSource() { - return source; - } - } - - static class TikaReader extends BoundedReader<String> { - private ExecutorService execService; - private final ContentHandlerImpl tikaHandler = new ContentHandlerImpl(); - private String current; - private TikaSource source; - private String filePath; - private TikaIO.Read spec; - private org.apache.tika.metadata.Metadata tikaMetadata; - private 
Iterator<String> metadataIterator; - - TikaReader(TikaSource source, String filePath) { - this.source = source; - this.filePath = filePath; - this.spec = source.getTikaInputRead(); - } - - @Override - public boolean start() throws IOException { - final InputStream is = TikaInputStream.get(Paths.get(filePath)); - TikaConfig tikaConfig = null; - if (spec.getTikaConfigPath() != null) { - try { - tikaConfig = new TikaConfig(spec.getTikaConfigPath().get()); - } catch (TikaException | SAXException e) { - throw new IOException(e); - } - } - final Parser parser = tikaConfig == null ? new AutoDetectParser() - : new AutoDetectParser(tikaConfig); - final ParseContext context = new ParseContext(); - context.set(Parser.class, parser); - tikaMetadata = spec.getInputMetadata() != null ? spec.getInputMetadata() - : new org.apache.tika.metadata.Metadata(); - - if (spec.getMinimumTextLength() != null) { - tikaHandler.setMinTextLength(spec.getMinimumTextLength()); - } - - if (!Boolean.TRUE.equals(spec.getParseSynchronously())) { - // Try to parse the file on the executor thread to make the best effort - // at letting the pipeline thread advancing over the file content - // without immediately parsing all of it - execService = Executors.newFixedThreadPool(1); - execService.submit(new Runnable() { - public void run() { - try { - parser.parse(is, tikaHandler, tikaMetadata, context); - is.close(); - } catch (Exception ex) { - tikaHandler.setParseException(ex); - } - } - }); - } else { - // Some parsers might not be able to report the content in chunks. 
- // It does not make sense to create extra threads in such cases - try { - parser.parse(is, tikaHandler, tikaMetadata, context); - } catch (Exception ex) { - throw new IOException(ex); - } finally { - is.close(); - } - } - return advanceToNext(); - } - - @Override - public boolean advance() throws IOException { - checkState(current != null, "Call start() before advance()"); - return advanceToNext(); - } - - protected boolean advanceToNext() throws IOException { - current = null; - // The content is reported first - if (metadataIterator == null) { - // Check if some content is already available - current = tikaHandler.getCurrent(); - - if (current == null && !Boolean.TRUE.equals(spec.getParseSynchronously())) { - long maxPollTime = 0; - long configuredMaxPollTime = spec.getQueueMaxPollTime() == null - ? TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME : spec.getQueueMaxPollTime(); - long configuredPollTime = spec.getQueuePollTime() == null - ? TikaIO.Read.DEFAULT_QUEUE_POLL_TIME : spec.getQueuePollTime(); - - // Poll the queue till the next piece of data is available - while (current == null && maxPollTime < configuredMaxPollTime) { - boolean docEnded = tikaHandler.waitForNext(configuredPollTime); - current = tikaHandler.getCurrent(); - // End of Document ? - if (docEnded) { - break; - } - maxPollTime += spec.getQueuePollTime(); - } - } - // No more content ? 
- if (current == null && Boolean.TRUE.equals(spec.getReadOutputMetadata())) { - // Time to report the metadata - metadataIterator = Arrays.asList(tikaMetadata.names()).iterator(); - } - } - - if (metadataIterator != null && metadataIterator.hasNext()) { - String key = metadataIterator.next(); - // The metadata name/value separator can be configured if needed - current = key + "=" + tikaMetadata.get(key); - } - return current != null; - } - - @Override - public String getCurrent() throws NoSuchElementException { - if (current == null) { - throw new NoSuchElementException(); - } - return current; - } - - @Override - public void close() throws IOException { - if (execService != null) { - execService.shutdown(); - } - } - - ExecutorService getExecutorService() { - return execService; - } - - @Override - public BoundedSource<String> getCurrentSource() { - return source; - } - } - - /** - * Tika Parser Content Handler. - */ - static class ContentHandlerImpl extends DefaultHandler { - private Queue<String> queue = new ConcurrentLinkedQueue<>(); - private volatile boolean documentEnded; - private volatile Exception parseException; - private volatile String current; - private int minTextLength; - - @Override - public void characters(char ch[], int start, int length) throws SAXException { - String value = new String(ch, start, length).trim(); - if (!value.isEmpty()) { - if (minTextLength <= 0) { - queue.add(value); - } else { - current = current == null ? 
value : current + " " + value; - if (current.length() >= minTextLength) { - queue.add(current); - current = null; - } - } - } - } - - public void setParseException(Exception ex) { - this.parseException = ex; - } - - public synchronized boolean waitForNext(long pollTime) throws IOException { - if (!documentEnded) { - try { - wait(pollTime); - } catch (InterruptedException ex) { - // continue; - } - } - return documentEnded; - } - - @Override - public synchronized void endDocument() throws SAXException { - this.documentEnded = true; - notify(); - } - - public String getCurrent() throws IOException { - checkParseException(); - String value = queue.poll(); - if (value == null && documentEnded) { - return current; - } else { - return value; - } - } - public void checkParseException() throws IOException { - if (parseException != null) { - throw new IOException(parseException); - } - } - - public void setMinTextLength(int minTextLength) { - this.minTextLength = minTextLength; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java new file mode 100644 index 0000000..fd86152 --- /dev/null +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.tika; + +import static org.junit.Assert.assertEquals; +import org.apache.tika.metadata.Metadata; +import org.junit.Test; + +/** + * Tests ParseResult. + */ +public class ParseResultTest { + @Test + public void testEqualsAndHashCode() { + ParseResult p1 = new ParseResult("a.txt", "hello", getMetadata()); + ParseResult p2 = new ParseResult("a.txt", "hello", getMetadata()); + assertEquals(p1, p2); + assertEquals(p1.hashCode(), p2.hashCode()); + } + + static Metadata getMetadata() { + Metadata m = new Metadata(); + m.add("Author", "BeamTikaUser"); + m.add("Author", "BeamTikaUser2"); + m.add("Date", "2017-09-01"); + return m; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java index 40ff569..a985b0a 100644 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java +++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java @@ -18,14 +18,14 @@ package org.apache.beam.sdk.io.tika; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static 
org.junit.Assert.fail; import java.io.IOException; -import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; @@ -33,231 +33,171 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.tika.exception.TikaException; -import org.junit.Ignore; +import org.apache.tika.metadata.Metadata; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; /** * Tests TikaInput. */ public class TikaIOTest { - private static final String[] PDF_FILE = new String[] { - "Combining", "can help to ingest", "Apache Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika" - }; - private static final String[] PDF_ZIP_FILE = new String[] { - "Combining", "can help to ingest", "Apache Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika", - "apache-beam-tika.pdf" - }; - private static final String[] ODT_FILE = new String[] { - "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika" - }; - private static final String[] ODT_FILE_WITH_METADATA = new String[] { - "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika", - "Author=BeamTikaUser" - }; - private static final String[] ODT_FILE_WITH_MIN_TEXT_LEN = new String[] { - "Combining Apache Beam", "and Apache Tika can help to ingest", "in most known formats.", - "the content from the files" - }; - private static final String[] ODT_FILES = new String[] { - "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", - "the content from the files", "and", 
"Apache Tika", - "Open Office", "Text", "PDF", "Excel", "Scientific", - "and other formats", "are supported." - }; + private static final String PDF_FILE = + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + + "Combining\n\nApache Beam\n\nand\n\nApache Tika\n\ncan help to ingest\n\n" + + "the content from the files\n\nin most known formats.\n\n\n"; + + private static final String PDF_ZIP_FILE = + "\n\n\n\n\n\n\n\napache-beam-tika.pdf\n\n\nCombining\n\n\nApache Beam\n\n\n" + + "and\n\n\nApache Tika\n\n\ncan help to ingest\n\n\nthe content from the files\n\n\n" + + "in most known formats.\n\n\n\n\n\n\n"; + + private static final String ODT_FILE = + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + + "Combining\nApache Beam\nand\nApache Tika\ncan help to ingest\nthe content from the" + + " files\nin most known formats.\n"; + + private static final String ODT_FILE2 = + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + + "Open Office\nPDF\nExcel\nText\nScientific\nand other formats\nare supported.\n"; @Rule public TestPipeline p = TestPipeline.create(); + @Rule + public ExpectedException thrown = ExpectedException.none(); - @Ignore @Test - public void testReadPdfFile() throws IOException { + public void testParsePdfFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - doTestReadFiles(resourcePath, PDF_FILE); + doTestParse(resourcePath, new ParseResult(resourcePath, PDF_FILE)); } - @Test - public void testReadZipPdfFile() throws IOException { - - String resourcePath = getClass().getResource("/apache-beam-tika-pdf.zip").getPath(); - - doTestReadFiles(resourcePath, PDF_ZIP_FILE); + private void doTestParse(String resourcePath, ParseResult... 
expectedResults) + throws IOException { + PCollection<ParseResult> output = + p.apply("ParseAll", TikaIO.parse().filepattern(resourcePath)) + .apply(ParDo.of(new FilterMetadataFn())); + PAssert.that(output).containsInAnyOrder(expectedResults); + p.run(); } @Test - public void testReadOdtFile() throws IOException { + public void testParseAllPdfFile() throws IOException { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); + String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - doTestReadFiles(resourcePath, ODT_FILE); + doTestParseAll(resourcePath, new ParseResult(resourcePath, PDF_FILE)); } @Test - public void testReadOdtFiles() throws IOException { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - resourcePath = resourcePath.replace("apache-beam-tika1", "*"); + public void testParseAllZipPdfFile() throws IOException { - doTestReadFiles(resourcePath, ODT_FILES); - } + String resourcePath = getClass().getResource("/apache-beam-tika-pdf.zip").getPath(); - private void doTestReadFiles(String resourcePath, String[] expected) throws IOException { - PCollection<String> output = p.apply("ParseFiles", TikaIO.read().from(resourcePath)); - PAssert.that(output).containsInAnyOrder(expected); - p.run(); + doTestParseAll(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE)); } @Test - public void testReadOdtFileWithMetadata() throws IOException { + public void testParseAllOdtFile() throws IOException { String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - PCollection<String> output = p.apply("ParseOdtFile", - TikaIO.read().from(resourcePath).withReadOutputMetadata(true)) - .apply(ParDo.of(new FilterMetadataFn())); - PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_METADATA); - p.run(); + doTestParseAll(resourcePath, new ParseResult(resourcePath, ODT_FILE, getOdtMetadata())); } @Test - public void testReadOdtFileWithMinTextLength() throws 
IOException { + public void testParseAllOdtFiles() throws IOException { + String resourcePath1 = getClass().getResource("/apache-beam-tika1.odt").getPath(); + String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath(); + String resourcePath = resourcePath1.replace("apache-beam-tika1", "*"); - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - - PCollection<String> output = p.apply("ParseOdtFile", - TikaIO.read().from(resourcePath).withMinimumTextlength(20)); - PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_MIN_TEXT_LEN); - p.run(); + doTestParseAll(resourcePath, new ParseResult(resourcePath1, ODT_FILE, getOdtMetadata()), + new ParseResult(resourcePath2, ODT_FILE2)); } - @Test - public void testReadPdfFileSync() throws IOException { - - String resourcePath = getClass().getResource("/apache-beam-tika.pdf").getPath(); - - PCollection<String> output = p.apply("ParsePdfFile", - TikaIO.read().from(resourcePath).withParseSynchronously(true)); - PAssert.that(output).containsInAnyOrder(PDF_FILE); + private void doTestParseAll(String resourcePath, ParseResult... 
expectedResults) + throws IOException { + PCollection<ParseResult> output = + p.apply("ParseFiles", FileIO.match().filepattern(resourcePath)) + .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) + .apply(TikaIO.parseAll()) + .apply(ParDo.of(new FilterMetadataFn())); + PAssert.that(output).containsInAnyOrder(expectedResults); p.run(); } @Test - public void testReadDamagedPdfFile() throws IOException { + public void testParseAllDamagedPdfFile() throws IOException { + thrown.expectCause(isA(TikaException.class)); + String resourcePath = getClass().getResource("/damaged.pdf").getPath(); - doTestReadDamagedPdfFile(false); + p.apply("ParseInvalidPdfFile", FileIO.match().filepattern(resourcePath)) + .apply(FileIO.readMatches()) + .apply(TikaIO.parseAll()); + p.run(); } @Test - public void testReadDamagedPdfFileSync() throws IOException { - doTestReadDamagedPdfFile(true); - } + public void testParseDisplayData() { + TikaIO.Parse parse = TikaIO.parse().filepattern("file.pdf"); - private void doTestReadDamagedPdfFile(boolean sync) throws IOException { + DisplayData displayData = DisplayData.from(parse); - String resourcePath = getClass().getResource("/damaged.pdf").getPath(); - - p.apply("ParseInvalidPdfFile", - TikaIO.read().from(resourcePath).withParseSynchronously(sync)); - try { - p.run(); - fail("Transform failure is expected"); - } catch (RuntimeException ex) { - assertTrue(ex.getCause().getCause() instanceof TikaException); - } + assertThat(displayData, hasDisplayItem("filePattern", "file.pdf")); + assertEquals(1, displayData.items().size()); } @Test - public void testReadDisplayData() { - TikaIO.Read read = TikaIO.read() - .from("foo.*") + public void testParseAllDisplayData() { + TikaIO.ParseAll parseAll = TikaIO.parseAll() .withTikaConfigPath("tikaconfigpath") - .withContentTypeHint("application/pdf") - .withMinimumTextlength(100) - .withReadOutputMetadata(true); + .withContentTypeHint("application/pdf"); - DisplayData displayData = 
DisplayData.from(read); + DisplayData displayData = DisplayData.from(parseAll); - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); assertThat(displayData, hasDisplayItem("tikaConfigPath", "tikaconfigpath")); assertThat(displayData, hasDisplayItem("inputMetadata", - "[Content-Type=application/pdf]")); - assertThat(displayData, hasDisplayItem("readOutputMetadata", "true")); - assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); - assertThat(displayData, hasDisplayItem("queuePollTime", "50")); - assertThat(displayData, hasDisplayItem("queueMaxPollTime", "3000")); - assertThat(displayData, hasDisplayItem("minTextLen", "100")); - assertEquals(8, displayData.items().size()); - } - - @Test - public void testReadDisplayDataSyncMode() { - TikaIO.Read read = TikaIO.read() - .from("foo.*") - .withParseSynchronously(true); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); - assertThat(displayData, hasDisplayItem("parseMode", "synchronous")); + "Content-Type=application/pdf")); assertEquals(2, displayData.items().size()); } @Test - public void testReadDisplayDataWithDefaultOptions() { - final String[] args = new String[]{"--input=/input/tika.pdf"}; - TikaIO.Read read = TikaIO.read().withOptions(createOptions(args)); + public void testParseAllDisplayDataWithCustomOptions() { + TikaIO.ParseAll parseAll = TikaIO.parseAll() + .withTikaConfigPath("/tikaConfigPath") + .withContentTypeHint("application/pdf"); - DisplayData displayData = DisplayData.from(read); + DisplayData displayData = DisplayData.from(parseAll); - assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf")); - assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); - assertThat(displayData, hasDisplayItem("queuePollTime", "50")); - assertThat(displayData, hasDisplayItem("queueMaxPollTime", "3000")); - assertEquals(4, displayData.items().size()); - } - @Test - public void 
testReadDisplayDataWithCustomOptions() { - final String[] args = new String[]{"--input=/input/tika.pdf", - "--tikaConfigPath=/tikaConfigPath", - "--queuePollTime=10", - "--queueMaxPollTime=1000", - "--contentTypeHint=application/pdf", - "--readOutputMetadata=true"}; - TikaIO.Read read = TikaIO.read().withOptions(createOptions(args)); - - DisplayData displayData = DisplayData.from(read); - - assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf")); assertThat(displayData, hasDisplayItem("tikaConfigPath", "/tikaConfigPath")); - assertThat(displayData, hasDisplayItem("parseMode", "asynchronous")); - assertThat(displayData, hasDisplayItem("queuePollTime", "10")); - assertThat(displayData, hasDisplayItem("queueMaxPollTime", "1000")); assertThat(displayData, hasDisplayItem("inputMetadata", - "[Content-Type=application/pdf]")); - assertThat(displayData, hasDisplayItem("readOutputMetadata", "true")); - assertEquals(7, displayData.items().size()); - } - - private static TikaOptions createOptions(String[] args) { - return PipelineOptionsFactory.fromArgs(args) - .withValidation().as(TikaOptions.class); + "Content-Type=application/pdf")); + assertEquals(2, displayData.items().size()); } - static class FilterMetadataFn extends DoFn<String, String> { + static class FilterMetadataFn extends DoFn<ParseResult, ParseResult> { private static final long serialVersionUID = 6338014219600516621L; @ProcessElement public void processElement(ProcessContext c) { - String word = c.element(); - if (word.contains("=") && !word.startsWith("Author")) { - return; + ParseResult result = c.element(); + Metadata m = new Metadata(); + // Files contain many metadata properties. 
This function drops all but the "Author" + // property manually added to "apache-beam-tika1.odt" resource only to make + // the tests simpler + if (result.getFileLocation().endsWith("apache-beam-tika1.odt")) { + m.set("Author", result.getMetadata().get("Author")); } - c.output(word); + ParseResult newResult = new ParseResult(result.getFileLocation(), result.getContent(), m); + c.output(newResult); } } + + static Metadata getOdtMetadata() { + Metadata m = new Metadata(); + m.set("Author", "BeamTikaUser"); + return m; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java deleted file mode 100644 index 5c4e754..0000000 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.tika; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - -import org.apache.beam.sdk.io.tika.TikaSource.TikaReader; -import org.junit.Test; - -/** - * Tests TikaReader. - */ -public class TikaReaderTest { - private static final List<String> ODT_FILE = Arrays.asList( - "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.", - "the content from the files", "and", "Apache Tika"); - - @Test - public void testOdtFileAsyncReader() throws Exception { - doTestOdtFileReader(false); - } - @Test - public void testOdtFileSyncReader() throws Exception { - doTestOdtFileReader(true); - } - private void doTestOdtFileReader(boolean sync) throws Exception { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - TikaSource source = new TikaSource(TikaIO.read() - .withParseSynchronously(sync) - .from(resourcePath)); - TikaReader reader = (TikaReader) source.createReader(null); - - List<String> content = new LinkedList<String>(); - for (boolean available = reader.start(); available; available = reader.advance()) { - content.add(reader.getCurrent()); - } - assertTrue(content.containsAll(ODT_FILE)); - if (!sync) { - assertNotNull(reader.getExecutorService()); - } else { - assertNull(reader.getExecutorService()); - } - reader.close(); - } - - @Test - public void testOdtFilesReader() throws Exception { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - String filePattern = resourcePath.replace("apache-beam-tika1", "*"); - - TikaSource source = new TikaSource(TikaIO.read().from(filePattern)); - TikaSource.FilePatternTikaReader reader = - (TikaSource.FilePatternTikaReader) source.createReader(null); - List<String> content = new LinkedList<String>(); - for (boolean available = reader.start(); 
available; available = reader.advance()) { - content.add(reader.getCurrent()); - } - assertTrue(content.containsAll(ODT_FILE)); - reader.close(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java deleted file mode 100644 index 550f469..0000000 --- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.tika; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.List; - -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.tika.TikaSource.TikaReader; -import org.junit.Test; - -/** - * Tests TikaSource. 
- */ -public class TikaSourceTest { - - @Test - public void testOdtFileSource() throws Exception { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - TikaSource source = new TikaSource(TikaIO.read().from(resourcePath)); - assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder()); - - assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode()); - assertTrue(source.createReader(null) instanceof TikaReader); - - List<? extends TikaSource> sources = source.split(1, null); - assertEquals(1, sources.size()); - TikaSource nextSource = sources.get(0); - assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode()); - assertEquals(resourcePath, nextSource.getSingleFileMetadata().resourceId().toString()); - } - - @Test - public void testOdtFilesSource() throws Exception { - String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath(); - String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath(); - String filePattern = resourcePath.replace("apache-beam-tika1", "*"); - - TikaSource source = new TikaSource(TikaIO.read().from(filePattern)); - assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder()); - - assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode()); - assertTrue(source.createReader(null) instanceof TikaSource.FilePatternTikaReader); - - List<? 
extends TikaSource> sources = source.split(1, null); - assertEquals(2, sources.size()); - TikaSource nextSource = sources.get(0); - assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode()); - String nextSourceResource = nextSource.getSingleFileMetadata().resourceId().toString(); - TikaSource nextSource2 = sources.get(1); - assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource2.getMode()); - String nextSourceResource2 = nextSource2.getSingleFileMetadata().resourceId().toString(); - assertTrue(nextSourceResource.equals(resourcePath) && nextSourceResource2.equals(resourcePath2) - || nextSourceResource.equals(resourcePath2) && nextSourceResource2.equals(resourcePath)); - } -}