Repository: beam
Updated Branches:
  refs/heads/master e01c78da7 -> 0c2211375


Many improvements to TikaIO

This addresses most of the comments in #3378.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ba93dd39
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ba93dd39
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ba93dd39

Branch: refs/heads/master
Commit: ba93dd39111ab2b13f811d0abeb76a49a4a4f035
Parents: e01c78d
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Mon Sep 11 16:11:10 2017 +0100
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Thu Oct 26 12:45:19 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/tika/pom.xml                       |  10 -
 .../apache/beam/sdk/io/tika/ParseResult.java    |  98 ++++
 .../org/apache/beam/sdk/io/tika/TikaIO.java     | 334 ++++++-------
 .../apache/beam/sdk/io/tika/TikaOptions.java    |  78 ----
 .../org/apache/beam/sdk/io/tika/TikaSource.java | 466 -------------------
 .../beam/sdk/io/tika/ParseResultTest.java       |  43 ++
 .../org/apache/beam/sdk/io/tika/TikaIOTest.java | 252 ++++------
 .../apache/beam/sdk/io/tika/TikaReaderTest.java |  82 ----
 .../apache/beam/sdk/io/tika/TikaSourceTest.java |  73 ---
 9 files changed, 392 insertions(+), 1044 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/pom.xml b/sdks/java/io/tika/pom.xml
index b8f7ece..d7f7e42 100644
--- a/sdks/java/io/tika/pom.xml
+++ b/sdks/java/io/tika/pom.xml
@@ -54,16 +54,6 @@
         </dependency>
 
         <dependency>
-            <groupId>joda-time</groupId>
-            <artifactId>joda-time</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.tika</groupId>
             <artifactId>tika-core</artifactId>
             <version>${tika.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java 
b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java
new file mode 100644
index 0000000..0a77491
--- /dev/null
+++ 
b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/ParseResult.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import org.apache.tika.metadata.Metadata;
+
+/**
+ * Tika parse result containing the file location, metadata
+ * and content converted to String.
+ */
+@SuppressWarnings("serial")
+public class ParseResult implements Serializable {
+  private final String fileLocation;
+  private final String content;
+  private final Metadata metadata;
+  private final String[] metadataNames;
+
+  public ParseResult(String fileLocation, String content) {
+    this(fileLocation, content, new Metadata());
+  }
+
+  public ParseResult(String fileLocation, String content, Metadata metadata) {
+    this.fileLocation = fileLocation;
+    this.content = content;
+    this.metadata = metadata;
+    this.metadataNames = metadata.names();
+  }
+
+  /**
+   * Gets the file content.
+   */
+  public String getContent() {
+    return content;
+  }
+
+  /**
+   * Gets the file metadata.
+   */
+  public Metadata getMetadata() {
+    return metadata;
+  }
+
+  /**
+   * Gets the file location.
+   */
+  public String getFileLocation() {
+    return fileLocation;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode = 1;
+    hashCode = 31 * hashCode + fileLocation.hashCode();
+    hashCode = 31 * hashCode + content.hashCode();
+    hashCode = 31 * hashCode + getMetadataHashCode();
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ParseResult)) {
+      return false;
+    }
+
+    ParseResult pr = (ParseResult) obj;
+    return this.fileLocation.equals(pr.fileLocation)
+      && this.content.equals(pr.content)
+      && this.metadata.equals(pr.metadata);
+  }
+
+  //TODO:
+  // Remove this function and use metadata.hashCode() once Apache Tika 1.17 
gets released.
+  private int getMetadataHashCode() {
+    int hashCode = 0;
+    for (String name : metadataNames) {
+      hashCode += name.hashCode() ^ Arrays.hashCode(metadata.getValues(name));
+    }
+    return hashCode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java 
b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
index 4876dcf..32353e1 100644
--- a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
+++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
@@ -18,40 +18,72 @@
 package org.apache.beam.sdk.io.tika;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.auto.value.AutoValue;
 
+import java.io.InputStream;
+import java.nio.channels.Channels;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.io.TikaInputStream;
 import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.sax.ToTextContentHandler;
+import org.xml.sax.ContentHandler;
+
+
 
 
 /**
- * {@link PTransform} for parsing arbitrary files using Apache Tika.
+ * A collection of {@link PTransform} transforms for parsing arbitrary files 
using Apache Tika.
  * Files in many well known text, binary or scientific formats can be 
processed.
  *
- * <p>To read a {@link PCollection} from one or more files
- * use {@link TikaIO.Read#from(String)}
- * to specify the path of the file(s) to be read.
+ * <p>{@link TikaIO.Parse} and {@link TikaIO.ParseAll} parse the files and 
return
+ * a {@link PCollection} containing one {@link ParseResult} per each file.
+ *
+ * <p>Combine {@link TikaIO.ParseAll} with {@link FileIO.Match}
+ * and {@link FileIO.ReadMatches} to match, read and parse the files.
+ *
+ * <p>Example:
  *
- * <p>{@link TikaIO.Read} returns a bounded {@link PCollection} of {@link 
String Strings},
- * each corresponding to a sequence of characters reported by Apache Tika SAX 
Parser.
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // A simple parse of a local PDF file (only runs locally):
+ * PCollection<ParseResult> results =
+ *   p.apply(FileIO.match().filepattern("/local/path/to/file.pdf"))
+ *    .apply(FileIO.readMatches())
+ *    .apply(TikaIO.parseAll());
+ * }</pre>
+ *
+ * <p>Use {@link TikaIO.Parse} to match, read and parse the files in simple 
cases.
  *
  * <p>Example:
  *
  * <pre>{@code
  * Pipeline p = ...;
  *
- * // A simple Read of a local PDF file (only runs locally):
- * PCollection<String> content = 
p.apply(TikaInput.from("/local/path/to/file.pdf"));
+ * // A simple parse of a local PDF file (only runs locally):
+ * PCollection<ParseResult> results =
+ *   p.apply(TikaIO.parse().filepattern("/local/path/to/file.pdf"));
  * }</pre>
  *
  * <b>Warning:</b> the API of this IO is likely to change in the next release.
@@ -60,85 +92,96 @@ import org.apache.tika.metadata.Metadata;
 public class TikaIO {
 
   /**
-   * A {@link PTransform} that parses one or more files and returns a bounded 
{@link PCollection}
-   * containing one element for each sequence of characters reported by Apache 
Tika SAX Parser.
+   * A {@link PTransform} that matches and parses the files
+   * and returns a bounded {@link PCollection} of {@link ParseResult}.
    */
-   public static Read read() {
-     return new AutoValue_TikaIO_Read.Builder()
-        .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME)
-        .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME)
+  public static Parse parse() {
+    return new AutoValue_TikaIO_Parse.Builder()
         .build();
-   }
+  }
 
-   /** Implementation of {@link #read}. */
-  @AutoValue
-  public abstract static class Read extends PTransform<PBegin, 
PCollection<String>> {
-    private static final long serialVersionUID = 2198301984784351829L;
-    public static final long DEFAULT_QUEUE_POLL_TIME = 50L;
-    public static final long DEFAULT_QUEUE_MAX_POLL_TIME = 3000L;
+  /**
+   * A {@link PTransform} that accepts a {@link PCollection} of {@link 
ReadableFile}
+   * and returns a {@link PCollection} of {@link ParseResult}.
+   */
+  public static ParseAll parseAll() {
+    return new AutoValue_TikaIO_ParseAll.Builder()
+        .build();
+  }
 
-    @Nullable abstract ValueProvider<String> getFilepattern();
-    @Nullable abstract ValueProvider<String> getTikaConfigPath();
-    @Nullable abstract Metadata getInputMetadata();
-    @Nullable abstract Boolean getReadOutputMetadata();
-    @Nullable abstract Long getQueuePollTime();
-    @Nullable abstract Long getQueueMaxPollTime();
-    @Nullable abstract Integer getMinimumTextLength();
-    @Nullable abstract Boolean getParseSynchronously();
+  /** Implementation of {@link #parse}. */
+  @SuppressWarnings("serial")
+  @AutoValue
+  public abstract static class Parse extends PTransform<PBegin, 
PCollection<ParseResult>> {
+    @Nullable
+    abstract ValueProvider<String> getFilepattern();
 
     abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
-      abstract Builder setTikaConfigPath(ValueProvider<String> tikaConfigPath);
-      abstract Builder setInputMetadata(Metadata metadata);
-      abstract Builder setReadOutputMetadata(Boolean value);
-      abstract Builder setQueuePollTime(Long value);
-      abstract Builder setQueueMaxPollTime(Long value);
-      abstract Builder setMinimumTextLength(Integer value);
-      abstract Builder setParseSynchronously(Boolean value);
 
-      abstract Read build();
+      abstract Parse build();
     }
 
-    /**
-     * A {@link PTransform} that parses one or more files with the given 
filename
-     * or filename pattern and returns a bounded {@link PCollection} containing
-     * one element for each sequence of characters reported by Apache Tika SAX 
Parser.
-     *
-     * <p>Filepattern can be a local path (if running locally), or a Google 
Cloud Storage
-     * filename or filename pattern of the form {@code 
"gs://<bucket>/<filepath>"}
-     * (if running locally or using remote execution service).
-     *
-     * <p>Standard <a 
href="http://docs.oracle.com/javase/tutorial/essential/io/find.html"; >Java
-     * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
-     */
-    public Read from(String filepattern) {
-      checkNotNull(filepattern, "Filepattern cannot be empty.");
-      return from(StaticValueProvider.of(filepattern));
+    /** Matches the given filepattern. */
+    public Parse filepattern(String filepattern) {
+      return 
this.filepattern(ValueProvider.StaticValueProvider.of(filepattern));
     }
 
-    /** Same as {@code from(filepattern)}, but accepting a {@link 
ValueProvider}. */
-    public Read from(ValueProvider<String> filepattern) {
-      checkNotNull(filepattern, "Filepattern cannot be empty.");
-      return toBuilder()
-          .setFilepattern(filepattern)
-          .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME)
-          .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME)
-          .build();
+    /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
+    public Parse filepattern(ValueProvider<String> filepattern) {
+      return toBuilder().setFilepattern(filepattern).build();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder
+        .addIfNotNull(
+          DisplayData.item("filePattern", getFilepattern()).withLabel("File 
Pattern"));
+    }
+
+    @Override
+    public PCollection<ParseResult> expand(PBegin input) {
+      return input
+          .apply(FileIO.match().filepattern(getFilepattern()))
+          
.apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
+          .apply(parseAll());
+    }
+  }
+
+  /** Implementation of {@link #parseAll}. */
+  @SuppressWarnings("serial")
+  @AutoValue
+  public abstract static class ParseAll extends
+    PTransform<PCollection<ReadableFile>, PCollection<ParseResult>> {
+
+    @Nullable abstract ValueProvider<String> getTikaConfigPath();
+    @Nullable abstract Metadata getInputMetadata();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setTikaConfigPath(ValueProvider<String> tikaConfigPath);
+      abstract Builder setInputMetadata(Metadata metadata);
+
+      abstract ParseAll build();
     }
 
     /**
      * Returns a new transform which will use the custom TikaConfig.
      */
-    public Read withTikaConfigPath(String tikaConfigPath) {
+    public ParseAll withTikaConfigPath(String tikaConfigPath) {
       checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty.");
       return withTikaConfigPath(StaticValueProvider.of(tikaConfigPath));
     }
 
     /** Same as {@code with(tikaConfigPath)}, but accepting a {@link 
ValueProvider}. */
-    public Read withTikaConfigPath(ValueProvider<String> tikaConfigPath) {
+    public ParseAll withTikaConfigPath(ValueProvider<String> tikaConfigPath) {
       checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty.");
       return toBuilder()
           .setTikaConfigPath(tikaConfigPath)
@@ -149,7 +192,7 @@ public class TikaIO {
      * Returns a new transform which will use the provided content type hint
      * to make the file parser detection more efficient.
      */
-    public Read withContentTypeHint(String contentType) {
+    public ParseAll withContentTypeHint(String contentType) {
       checkNotNull(contentType, "ContentType cannot be empty.");
       Metadata metadata = new Metadata();
       metadata.add(Metadata.CONTENT_TYPE, contentType);
@@ -160,7 +203,7 @@ public class TikaIO {
      * Returns a new transform which will use the provided input metadata
      * for parsing the files.
      */
-    public Read withInputMetadata(Metadata metadata) {
+    public ParseAll withInputMetadata(Metadata metadata) {
       Metadata inputMetadata = this.getInputMetadata();
       if (inputMetadata != null) {
         for (String name : metadata.names()) {
@@ -172,88 +215,15 @@ public class TikaIO {
       return toBuilder().setInputMetadata(inputMetadata).build();
     }
 
-    /**
-     * Returns a new transform which will report the metadata.
-     */
-    public Read withReadOutputMetadata(Boolean value) {
-      return toBuilder().setReadOutputMetadata(value).build();
-    }
-
-    /**
-     * Returns a new transform which will use the specified queue poll time.
-     */
-    public Read withQueuePollTime(Long value) {
-      return toBuilder().setQueuePollTime(value).build();
-    }
-
-    /**
-     * Returns a new transform which will use the specified queue max poll 
time.
-     */
-    public Read withQueueMaxPollTime(Long value) {
-      return toBuilder().setQueueMaxPollTime(value).build();
-    }
-
-    /**
-     * Returns a new transform which will operate on the text blocks with the
-     * given minimum text length.
-     */
-    public Read withMinimumTextlength(Integer value) {
-      return toBuilder().setMinimumTextLength(value).build();
-    }
-
-    /**
-     * Returns a new transform which will use the synchronous reader.
-     */
-    public Read withParseSynchronously(Boolean value) {
-      return toBuilder().setParseSynchronously(value).build();
-    }
-
-    /**
-     * Path to Tika configuration resource.
-     */
-    public Read withOptions(TikaOptions options) {
-      checkNotNull(options, "TikaOptions cannot be empty.");
-      Builder builder = toBuilder();
-      builder.setFilepattern(StaticValueProvider.of(options.getInput()))
-             .setQueuePollTime(options.getQueuePollTime())
-             .setQueueMaxPollTime(options.getQueueMaxPollTime())
-             .setMinimumTextLength(options.getMinimumTextLength())
-             .setParseSynchronously(options.getParseSynchronously());
-      if (options.getContentTypeHint() != null) {
-        Metadata metadata = this.getInputMetadata();
-        if (metadata == null) {
-            metadata = new Metadata();
-        }
-        metadata.add(Metadata.CONTENT_TYPE, options.getContentTypeHint());
-        builder.setInputMetadata(metadata);
-      }
-      if (options.getTikaConfigPath() != null) {
-        
builder.setTikaConfigPath(StaticValueProvider.of(options.getTikaConfigPath()));
-      }
-      if (Boolean.TRUE.equals(options.getReadOutputMetadata())) {
-        builder.setReadOutputMetadata(options.getReadOutputMetadata());
-      }
-      return builder.build();
-    }
-
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(this.getFilepattern(), "Filepattern cannot be empty.");
-      final Bounded<String> read = org.apache.beam.sdk.io.Read.from(new 
TikaSource(this));
-      PCollection<String> pcol = input.getPipeline().apply(read);
-      pcol.setCoder(getDefaultOutputCoder());
-      return pcol;
+    public PCollection<ParseResult> expand(PCollection<ReadableFile> input) {
+      return input.apply(ParDo.of(new ParseToStringFn(this)));
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      String filepatternDisplay = getFilepattern().isAccessible()
-        ? getFilepattern().get() : getFilepattern().toString();
-      builder
-          .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
-            .withLabel("File Pattern"));
       if (getTikaConfigPath() != null) {
         String tikaConfigPathDisplay = getTikaConfigPath().isAccessible()
           ? getTikaConfigPath().get() : getTikaConfigPath().toString();
@@ -262,49 +232,55 @@ public class TikaIO {
       }
       Metadata metadata = getInputMetadata();
       if (metadata != null) {
-        StringBuilder sb = new StringBuilder();
-        sb.append('[');
-        for (String name : metadata.names()) {
-            if (sb.length() > 1) {
-              sb.append(',');
-            }
-            sb.append(name).append('=').append(metadata.get(name));
-        }
-        sb.append(']');
+        //TODO: use metadata.toString() only without a trim() once Apache Tika 
1.17 gets released
         builder
-            .add(DisplayData.item("inputMetadata", sb.toString())
+            .add(DisplayData.item("inputMetadata", metadata.toString().trim())
             .withLabel("Input Metadata"));
       }
-      if (Boolean.TRUE.equals(getParseSynchronously())) {
-        builder
-          .add(DisplayData.item("parseMode", "synchronous")
-            .withLabel("Parse Mode"));
-      } else {
-        builder
-          .add(DisplayData.item("parseMode", "asynchronous")
-            .withLabel("Parse Mode"));
-        builder
-          .add(DisplayData.item("queuePollTime", getQueuePollTime().toString())
-            .withLabel("Queue Poll Time"))
-        .add(DisplayData.item("queueMaxPollTime", 
getQueueMaxPollTime().toString())
-          .withLabel("Queue Max Poll Time"));
-      }
-      Integer minTextLen = getMinimumTextLength();
-      if (minTextLen != null && minTextLen > 0) {
-        builder
-        .add(DisplayData.item("minTextLen", getMinimumTextLength().toString())
-          .withLabel("Minimum Text Length"));
+    }
+
+    private static class ParseToStringFn extends DoFn<ReadableFile, 
ParseResult> {
+
+      private static final long serialVersionUID = 6837207505313720989L;
+      private final TikaIO.ParseAll spec;
+      private TikaConfig tikaConfig;
+
+      ParseToStringFn(TikaIO.ParseAll spec) {
+        this.spec = spec;
       }
-      if (Boolean.TRUE.equals(getReadOutputMetadata())) {
-        builder
-          .add(DisplayData.item("readOutputMetadata", "true")
-            .withLabel("Read Output Metadata"));
+
+      @Setup
+      public void setup() throws Exception {
+        if (spec.getTikaConfigPath() != null) {
+          ResourceId configResource =
+              
FileSystems.matchSingleFileSpec(spec.getTikaConfigPath().get()).resourceId();
+          tikaConfig = new TikaConfig(
+                           
Channels.newInputStream(FileSystems.open(configResource)));
+        }
       }
-    }
 
-    @Override
-    protected Coder<String> getDefaultOutputCoder() {
-      return StringUtf8Coder.of();
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        ReadableFile file = c.element();
+        InputStream stream = Channels.newInputStream(file.open());
+        try (InputStream tikaStream = TikaInputStream.get(stream)) {
+
+          final Parser parser = tikaConfig == null
+              ? new AutoDetectParser() : new AutoDetectParser(tikaConfig);
+
+          final ParseContext context = new ParseContext();
+          context.set(Parser.class, parser);
+          Metadata tikaMetadata = spec.getInputMetadata() != null
+            ? spec.getInputMetadata() : new 
org.apache.tika.metadata.Metadata();
+
+          ContentHandler tikaHandler = new ToTextContentHandler();
+          parser.parse(tikaStream, tikaHandler, tikaMetadata, context);
+
+          c.output(new ParseResult(file.getMetadata().resourceId().toString(),
+              tikaHandler.toString(),
+              tikaMetadata));
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java 
b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
deleted file mode 100644
index fb97678..0000000
--- 
a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.tika;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.Validation;
-
-/**
- * TikaInput Options to support the command-line applications.
- */
-public interface TikaOptions extends PipelineOptions {
-
-  @Description("Input path")
-  @Validation.Required
-  String getInput();
-  void setInput(String value);
-
-  @Description("Tika Config path")
-  String getTikaConfigPath();
-  void setTikaConfigPath(String value);
-
-  @Description("Tika Parser Content Type hint")
-  String getContentTypeHint();
-  void setContentTypeHint(String value);
-
-  @Description("Metadata report status")
-  @Default.Boolean(false)
-  Boolean getReadOutputMetadata();
-  void setReadOutputMetadata(Boolean value);
-
-  @Description("Optional use of the synchronous reader")
-  @Default.Boolean(false)
-  Boolean getParseSynchronously();
-  void setParseSynchronously(Boolean value);
-
-  @Description("Tika Parser queue poll time in milliseconds")
-  @Default.Long(TikaIO.Read.DEFAULT_QUEUE_POLL_TIME)
-  Long getQueuePollTime();
-  void setQueuePollTime(Long value);
-
-  @Description("Tika Parser queue maximum poll time in milliseconds")
-  @Default.Long(TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME)
-  Long getQueueMaxPollTime();
-  void setQueueMaxPollTime(Long value);
-
-  @Description("Minumin text fragment length for Tika Parser to report")
-  @Default.Integer(0)
-  Integer getMinimumTextLength();
-  void setMinimumTextLength(Integer value);
-
-  @Description("Pipeline name")
-  @Default.String("TikaRead")
-  String getPipelineName();
-  void setPipelineName(String value);
-
-  @Description("Output path")
-  @Default.String("/tmp/tika/out")
-  String getOutput();
-  void setOutput(String value);
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java 
b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
deleted file mode 100644
index 7c8852b..0000000
--- 
a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.tika;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.io.TikaInputStream;
-import org.apache.tika.parser.AutoDetectParser;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.parser.Parser;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xml.sax.SAXException;
-import org.xml.sax.helpers.DefaultHandler;
-
-/**
- * Implementation detail of {@link TikaIO.Read}.
- *
- * <p>A {@link Source} which can represent the content of the files parsed by 
Apache Tika.
- */
-class TikaSource extends BoundedSource<String> {
-  private static final long serialVersionUID = -509574062910491122L;
-  private static final Logger LOG = LoggerFactory.getLogger(TikaSource.class);
-
-  @Nullable
-  private MatchResult.Metadata singleFileMetadata;
-  private final Mode mode;
-  private final TikaIO.Read spec;
-
-  /**
-   * Source mode.
-   */
-  public enum Mode {
-    FILEPATTERN, SINGLE_FILE
-  }
-
-  TikaSource(TikaIO.Read spec) {
-    this.mode = Mode.FILEPATTERN;
-    this.spec = spec;
-  }
-
-  TikaSource(Metadata fileMetadata, TikaIO.Read spec) {
-    mode = Mode.SINGLE_FILE;
-    this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
-    this.spec = spec;
-  }
-
-  @Override
-  public BoundedReader<String> createReader(PipelineOptions options) throws 
IOException {
-    this.validate();
-    checkState(spec.getFilepattern().isAccessible(),
-        "Cannot create a Tika reader without access to the file"
-        + " or pattern specification: {}.", spec.getFilepattern());
-    if (spec.getTikaConfigPath() != null) {
-      checkState(spec.getTikaConfigPath().isAccessible(),
-        "Cannot create a Tika reader without access to its configuration",
-        spec.getTikaConfigPath());
-    }
-
-    String fileOrPattern = spec.getFilepattern().get();
-    if (mode == Mode.FILEPATTERN) {
-      List<Metadata> fileMetadata = expandFilePattern(fileOrPattern);
-      List<TikaReader> fileReaders = new ArrayList<>();
-      for (Metadata metadata : fileMetadata) {
-        fileReaders.add(new TikaReader(this, 
metadata.resourceId().toString()));
-      }
-      if (fileReaders.size() == 1) {
-        return fileReaders.get(0);
-      }
-      return new FilePatternTikaReader(this, fileReaders);
-    } else {
-      return new TikaReader(this, singleFileMetadata.resourceId().toString());
-    }
-
-  }
-
-  @Override
-  public List<? extends TikaSource> split(long desiredBundleSizeBytes, 
PipelineOptions options)
-    throws Exception {
-    if (mode == Mode.SINGLE_FILE) {
-      return ImmutableList.of(this);
-    } else {
-      List<Metadata> fileMetadata = 
expandFilePattern(spec.getFilepattern().get());
-
-      List<TikaSource> splitResults = new LinkedList<>();
-      for (Metadata metadata : fileMetadata) {
-        splitResults.add(new TikaSource(metadata, spec));
-      }
-      return splitResults;
-    }
-  }
-
-  public TikaIO.Read getTikaInputRead() {
-    return spec;
-  }
-
-  @Override
-  public Coder<String> getDefaultOutputCoder() {
-    return StringUtf8Coder.of();
-  }
-
-  @Override
-  public void validate() {
-    switch (mode) {
-    case FILEPATTERN:
-      checkArgument(this.singleFileMetadata == null,
-        "Unexpected initialized singleFileMetadata value");
-      break;
-    case SINGLE_FILE:
-      checkNotNull(this.singleFileMetadata,
-        "Unexpected null singleFileMetadata value");
-      break;
-    default:
-      throw new IllegalStateException("Unknown mode: " + mode);
-    }
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    long totalSize = 0;
-    List<Metadata> fileMetadata = 
expandFilePattern(spec.getFilepattern().get());
-    for (Metadata metadata : fileMetadata) {
-      totalSize += metadata.sizeBytes();
-    }
-    return totalSize;
-  }
-
-  Mode getMode() {
-    return this.mode;
-  }
-
-  Metadata getSingleFileMetadata() {
-    return this.singleFileMetadata;
-  }
-
-  private static List<Metadata> expandFilePattern(String fileOrPattern) throws 
IOException {
-    MatchResult matches = Iterables.getOnlyElement(
-      FileSystems.match(Collections.singletonList(fileOrPattern)));
-    LOG.info("Matched {} files for pattern {}", matches.metadata().size(), 
fileOrPattern);
-    List<Metadata> metadata = ImmutableList.copyOf(matches.metadata());
-    checkArgument(!metadata.isEmpty(),
-      "Unable to find any files matching %s", fileOrPattern);
-
-    return metadata;
-  }
-
-  /**
-   *  FilePatternTikaReader.
-   *  TODO: This is mostly a copy of FileBasedSource internal file-pattern 
reader
-   *        so that code would need to be generalized as part of the future 
contribution
-   */
-  static class FilePatternTikaReader extends BoundedReader<String> {
-    private final TikaSource source;
-    final ListIterator<TikaReader> fileReadersIterator;
-    TikaReader currentReader = null;
-
-    public FilePatternTikaReader(TikaSource source, List<TikaReader> 
fileReaders) {
-      this.source = source;
-      this.fileReadersIterator = fileReaders.listIterator();
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return startNextNonemptyReader();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      checkState(currentReader != null, "Call start() before advance()");
-      if (currentReader.advance()) {
-        return true;
-      }
-      return startNextNonemptyReader();
-    }
-
-    private boolean startNextNonemptyReader() throws IOException {
-      while (fileReadersIterator.hasNext()) {
-        currentReader = fileReadersIterator.next();
-        if (currentReader.start()) {
-          return true;
-        }
-        currentReader.close();
-      }
-      return false;
-    }
-
-    @Override
-    public String getCurrent() throws NoSuchElementException {
-      return currentReader.getCurrent();
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return currentReader.getCurrentTimestamp();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (currentReader != null) {
-        currentReader.close();
-      }
-      while (fileReadersIterator.hasNext()) {
-        fileReadersIterator.next().close();
-      }
-    }
-
-    @Override
-    public TikaSource getCurrentSource() {
-      return source;
-    }
-  }
-
-  static class TikaReader extends BoundedReader<String> {
-    private ExecutorService execService;
-    private final ContentHandlerImpl tikaHandler = new ContentHandlerImpl();
-    private String current;
-    private TikaSource source;
-    private String filePath;
-    private TikaIO.Read spec;
-    private org.apache.tika.metadata.Metadata tikaMetadata;
-    private Iterator<String> metadataIterator;
-
-    TikaReader(TikaSource source, String filePath) {
-      this.source = source;
-      this.filePath = filePath;
-      this.spec = source.getTikaInputRead();
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      final InputStream is = TikaInputStream.get(Paths.get(filePath));
-      TikaConfig tikaConfig = null;
-      if (spec.getTikaConfigPath() != null) {
-        try {
-          tikaConfig = new TikaConfig(spec.getTikaConfigPath().get());
-        } catch (TikaException | SAXException e) {
-          throw new IOException(e);
-        }
-      }
-      final Parser parser = tikaConfig == null ? new AutoDetectParser()
-          : new AutoDetectParser(tikaConfig);
-      final ParseContext context = new ParseContext();
-      context.set(Parser.class, parser);
-      tikaMetadata = spec.getInputMetadata() != null ? spec.getInputMetadata()
-          : new org.apache.tika.metadata.Metadata();
-
-      if (spec.getMinimumTextLength() != null) {
-        tikaHandler.setMinTextLength(spec.getMinimumTextLength());
-      }
-
-      if (!Boolean.TRUE.equals(spec.getParseSynchronously())) {
-        // Try to parse the file on the executor thread to make the best effort
-        // at letting the pipeline thread advancing over the file content
-        // without immediately parsing all of it
-        execService = Executors.newFixedThreadPool(1);
-        execService.submit(new Runnable() {
-          public void run() {
-            try {
-              parser.parse(is, tikaHandler, tikaMetadata, context);
-              is.close();
-            } catch (Exception ex) {
-              tikaHandler.setParseException(ex);
-            }
-          }
-        });
-      } else {
-        // Some parsers might not be able to report the content in chunks.
-        // It does not make sense to create extra threads in such cases
-        try {
-          parser.parse(is, tikaHandler, tikaMetadata, context);
-        } catch (Exception ex) {
-          throw new IOException(ex);
-        } finally {
-          is.close();
-        }
-      }
-      return advanceToNext();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      checkState(current != null, "Call start() before advance()");
-      return advanceToNext();
-    }
-
-    protected boolean advanceToNext() throws IOException {
-      current = null;
-      // The content is reported first
-      if (metadataIterator == null) {
-        // Check if some content is already available
-        current = tikaHandler.getCurrent();
-
-        if (current == null && 
!Boolean.TRUE.equals(spec.getParseSynchronously())) {
-          long maxPollTime = 0;
-          long configuredMaxPollTime = spec.getQueueMaxPollTime() == null
-              ? TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME : 
spec.getQueueMaxPollTime();
-          long configuredPollTime = spec.getQueuePollTime() == null
-              ? TikaIO.Read.DEFAULT_QUEUE_POLL_TIME : spec.getQueuePollTime();
-
-          // Poll the queue till the next piece of data is available
-          while (current == null && maxPollTime < configuredMaxPollTime) {
-            boolean docEnded = tikaHandler.waitForNext(configuredPollTime);
-            current = tikaHandler.getCurrent();
-            // End of Document ?
-            if (docEnded) {
-              break;
-            }
-            maxPollTime += spec.getQueuePollTime();
-          }
-        }
-        // No more content ?
-        if (current == null && 
Boolean.TRUE.equals(spec.getReadOutputMetadata())) {
-          // Time to report the metadata
-          metadataIterator = Arrays.asList(tikaMetadata.names()).iterator();
-        }
-      }
-
-      if (metadataIterator != null && metadataIterator.hasNext()) {
-          String key = metadataIterator.next();
-          // The metadata name/value separator can be configured if needed
-          current = key + "=" + tikaMetadata.get(key);
-      }
-      return current != null;
-    }
-
-    @Override
-    public String getCurrent() throws NoSuchElementException {
-      if (current == null) {
-        throw new NoSuchElementException();
-      }
-      return current;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (execService != null) {
-          execService.shutdown();
-      }
-    }
-
-    ExecutorService getExecutorService() {
-      return execService;
-    }
-
-    @Override
-    public BoundedSource<String> getCurrentSource() {
-      return source;
-    }
-  }
-
-  /**
-   * Tika Parser Content Handler.
-   */
-  static class ContentHandlerImpl extends DefaultHandler {
-    private Queue<String> queue = new ConcurrentLinkedQueue<>();
-    private volatile boolean documentEnded;
-    private volatile Exception parseException;
-    private volatile String current;
-    private int minTextLength;
-
-    @Override
-    public void characters(char ch[], int start, int length) throws 
SAXException {
-      String value = new String(ch, start, length).trim();
-      if (!value.isEmpty()) {
-        if (minTextLength <= 0) {
-          queue.add(value);
-        } else {
-          current = current == null ? value : current + " " + value;
-          if (current.length() >= minTextLength) {
-            queue.add(current);
-            current = null;
-          }
-        }
-      }
-    }
-
-    public void setParseException(Exception ex) {
-      this.parseException = ex;
-    }
-
-    public synchronized boolean waitForNext(long pollTime) throws IOException {
-      if (!documentEnded) {
-        try {
-          wait(pollTime);
-        } catch (InterruptedException ex) {
-          // continue;
-        }
-      }
-      return documentEnded;
-    }
-
-    @Override
-    public synchronized void endDocument() throws SAXException {
-      this.documentEnded = true;
-      notify();
-    }
-
-    public String getCurrent() throws IOException {
-      checkParseException();
-      String value = queue.poll();
-      if (value == null && documentEnded) {
-        return current;
-      } else {
-        return value;
-      }
-    }
-    public void checkParseException() throws IOException {
-      if (parseException != null) {
-        throw new IOException(parseException);
-      }
-    }
-
-    public void setMinTextLength(int minTextLength) {
-      this.minTextLength = minTextLength;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java
 
b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java
new file mode 100644
index 0000000..fd86152
--- /dev/null
+++ 
b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/ParseResultTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.tika.metadata.Metadata;
+import org.junit.Test;
+
+/**
+ * Tests ParseResult.
+ */
+public class ParseResultTest {
+  @Test
+  public void testEqualsAndHashCode() {
+    ParseResult p1 = new ParseResult("a.txt", "hello", getMetadata());
+    ParseResult p2 = new ParseResult("a.txt", "hello", getMetadata());
+    assertEquals(p1, p2);
+    assertEquals(p1.hashCode(), p2.hashCode());
+  }
+
+  static Metadata getMetadata() {
+    Metadata m = new Metadata();
+    m.add("Author", "BeamTikaUser");
+    m.add("Author", "BeamTikaUser2");
+    m.add("Date", "2017-09-01");
+    return m;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java 
b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java
index 40ff569..a985b0a 100644
--- 
a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java
+++ 
b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java
@@ -18,14 +18,14 @@
 package org.apache.beam.sdk.io.tika;
 
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
 
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileIO;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -33,231 +33,171 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.tika.exception.TikaException;
-import org.junit.Ignore;
+import org.apache.tika.metadata.Metadata;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /**
  * Tests TikaInput.
  */
 public class TikaIOTest {
-  private static final String[] PDF_FILE = new String[] {
-      "Combining", "can help to ingest", "Apache Beam", "in most known 
formats.",
-      "the content from the files", "and", "Apache Tika"
-  };
-  private static final String[] PDF_ZIP_FILE = new String[] {
-      "Combining", "can help to ingest", "Apache Beam", "in most known 
formats.",
-      "the content from the files", "and", "Apache Tika",
-      "apache-beam-tika.pdf"
-  };
-  private static final String[] ODT_FILE = new String[] {
-      "Combining", "can help to ingest", "Apache", "Beam", "in most known 
formats.",
-      "the content from the files", "and", "Apache Tika"
-  };
-  private static final String[] ODT_FILE_WITH_METADATA = new String[] {
-      "Combining", "can help to ingest", "Apache", "Beam", "in most known 
formats.",
-      "the content from the files", "and", "Apache Tika",
-      "Author=BeamTikaUser"
-  };
-  private static final String[] ODT_FILE_WITH_MIN_TEXT_LEN = new String[] {
-      "Combining Apache Beam", "and Apache Tika can help to ingest", "in most 
known formats.",
-      "the content from the files"
-  };
-  private static final String[] ODT_FILES = new String[] {
-      "Combining", "can help to ingest", "Apache", "Beam", "in most known 
formats.",
-      "the content from the files", "and", "Apache Tika",
-      "Open Office", "Text", "PDF", "Excel", "Scientific",
-      "and other formats", "are supported."
-  };
+  private static final String PDF_FILE =
+      "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n"
+      + "Combining\n\nApache Beam\n\nand\n\nApache Tika\n\ncan help to 
ingest\n\n"
+      + "the content from the files\n\nin most known formats.\n\n\n";
+
+  private static final String PDF_ZIP_FILE =
+      "\n\n\n\n\n\n\n\napache-beam-tika.pdf\n\n\nCombining\n\n\nApache 
Beam\n\n\n"
+      + "and\n\n\nApache Tika\n\n\ncan help to ingest\n\n\nthe content from 
the files\n\n\n"
+      + "in most known formats.\n\n\n\n\n\n\n";
+
+  private static final String ODT_FILE =
+      
"\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n"
+      + "Combining\nApache Beam\nand\nApache Tika\ncan help to ingest\nthe 
content from the"
+      + " files\nin most known formats.\n";
+
+  private static final String ODT_FILE2 =
+      
"\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n"
+      + "Open Office\nPDF\nExcel\nText\nScientific\nand other formats\nare 
supported.\n";
 
   @Rule
   public TestPipeline p = TestPipeline.create();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
 
-  @Ignore
   @Test
-  public void testReadPdfFile() throws IOException {
+  public void testParsePdfFile() throws IOException {
 
     String resourcePath = 
getClass().getResource("/apache-beam-tika.pdf").getPath();
 
-    doTestReadFiles(resourcePath, PDF_FILE);
+    doTestParse(resourcePath, new ParseResult(resourcePath, PDF_FILE));
   }
 
-  @Test
-  public void testReadZipPdfFile() throws IOException {
-
-    String resourcePath = 
getClass().getResource("/apache-beam-tika-pdf.zip").getPath();
-
-    doTestReadFiles(resourcePath, PDF_ZIP_FILE);
+  private void doTestParse(String resourcePath, ParseResult... expectedResults)
+      throws IOException {
+     PCollection<ParseResult> output =
+         p.apply("ParseAll", TikaIO.parse().filepattern(resourcePath))
+         .apply(ParDo.of(new FilterMetadataFn()));
+     PAssert.that(output).containsInAnyOrder(expectedResults);
+     p.run();
   }
 
   @Test
-  public void testReadOdtFile() throws IOException {
+  public void testParseAllPdfFile() throws IOException {
 
-    String resourcePath = 
getClass().getResource("/apache-beam-tika1.odt").getPath();
+    String resourcePath = 
getClass().getResource("/apache-beam-tika.pdf").getPath();
 
-    doTestReadFiles(resourcePath, ODT_FILE);
+    doTestParseAll(resourcePath, new ParseResult(resourcePath, PDF_FILE));
   }
 
   @Test
-  public void testReadOdtFiles() throws IOException {
-    String resourcePath = 
getClass().getResource("/apache-beam-tika1.odt").getPath();
-    resourcePath = resourcePath.replace("apache-beam-tika1", "*");
+  public void testParseAllZipPdfFile() throws IOException {
 
-    doTestReadFiles(resourcePath, ODT_FILES);
-  }
+    String resourcePath = 
getClass().getResource("/apache-beam-tika-pdf.zip").getPath();
 
-  private void doTestReadFiles(String resourcePath, String[] expected) throws 
IOException {
-    PCollection<String> output = p.apply("ParseFiles", 
TikaIO.read().from(resourcePath));
-    PAssert.that(output).containsInAnyOrder(expected);
-    p.run();
+    doTestParseAll(resourcePath, new ParseResult(resourcePath, PDF_ZIP_FILE));
   }
 
   @Test
-  public void testReadOdtFileWithMetadata() throws IOException {
+  public void testParseAllOdtFile() throws IOException {
 
     String resourcePath = 
getClass().getResource("/apache-beam-tika1.odt").getPath();
 
-    PCollection<String> output = p.apply("ParseOdtFile",
-        TikaIO.read().from(resourcePath).withReadOutputMetadata(true))
-        .apply(ParDo.of(new FilterMetadataFn()));
-    PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_METADATA);
-    p.run();
+    doTestParseAll(resourcePath, new ParseResult(resourcePath, ODT_FILE, 
getOdtMetadata()));
   }
 
   @Test
-  public void testReadOdtFileWithMinTextLength() throws IOException {
+  public void testParseAllOdtFiles() throws IOException {
+    String resourcePath1 = 
getClass().getResource("/apache-beam-tika1.odt").getPath();
+    String resourcePath2 = 
getClass().getResource("/apache-beam-tika2.odt").getPath();
+    String resourcePath = resourcePath1.replace("apache-beam-tika1", "*");
 
-    String resourcePath = 
getClass().getResource("/apache-beam-tika1.odt").getPath();
-
-    PCollection<String> output = p.apply("ParseOdtFile",
-        TikaIO.read().from(resourcePath).withMinimumTextlength(20));
-    PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_MIN_TEXT_LEN);
-    p.run();
+    doTestParseAll(resourcePath, new ParseResult(resourcePath1, ODT_FILE, 
getOdtMetadata()),
+        new ParseResult(resourcePath2, ODT_FILE2));
   }
 
-  @Test
-  public void testReadPdfFileSync() throws IOException {
-
-    String resourcePath = 
getClass().getResource("/apache-beam-tika.pdf").getPath();
-
-    PCollection<String> output = p.apply("ParsePdfFile",
-        TikaIO.read().from(resourcePath).withParseSynchronously(true));
-    PAssert.that(output).containsInAnyOrder(PDF_FILE);
+  private void doTestParseAll(String resourcePath, ParseResult... 
expectedResults)
+     throws IOException {
+    PCollection<ParseResult> output =
+        p.apply("ParseFiles", FileIO.match().filepattern(resourcePath))
+        .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
+        .apply(TikaIO.parseAll())
+        .apply(ParDo.of(new FilterMetadataFn()));
+    PAssert.that(output).containsInAnyOrder(expectedResults);
     p.run();
   }
 
   @Test
-  public void testReadDamagedPdfFile() throws IOException {
+  public void testParseAllDamagedPdfFile() throws IOException {
+    thrown.expectCause(isA(TikaException.class));
+    String resourcePath = getClass().getResource("/damaged.pdf").getPath();
 
-    doTestReadDamagedPdfFile(false);
+    p.apply("ParseInvalidPdfFile", FileIO.match().filepattern(resourcePath))
+      .apply(FileIO.readMatches())
+      .apply(TikaIO.parseAll());
+    p.run();
   }
 
   @Test
-  public void testReadDamagedPdfFileSync() throws IOException {
-    doTestReadDamagedPdfFile(true);
-  }
+  public void testParseDisplayData() {
+    TikaIO.Parse parse = TikaIO.parse().filepattern("file.pdf");
 
-  private void doTestReadDamagedPdfFile(boolean sync) throws IOException {
+    DisplayData displayData = DisplayData.from(parse);
 
-    String resourcePath = getClass().getResource("/damaged.pdf").getPath();
-
-    p.apply("ParseInvalidPdfFile",
-        TikaIO.read().from(resourcePath).withParseSynchronously(sync));
-    try {
-        p.run();
-        fail("Transform failure is expected");
-    } catch (RuntimeException ex) {
-      assertTrue(ex.getCause().getCause() instanceof TikaException);
-    }
+    assertThat(displayData, hasDisplayItem("filePattern", "file.pdf"));
+    assertEquals(1, displayData.items().size());
   }
 
   @Test
-  public void testReadDisplayData() {
-    TikaIO.Read read = TikaIO.read()
-        .from("foo.*")
+  public void testParseAllDisplayData() {
+    TikaIO.ParseAll parseAll = TikaIO.parseAll()
         .withTikaConfigPath("tikaconfigpath")
-        .withContentTypeHint("application/pdf")
-        .withMinimumTextlength(100)
-        .withReadOutputMetadata(true);
+        .withContentTypeHint("application/pdf");
 
-    DisplayData displayData = DisplayData.from(read);
+    DisplayData displayData = DisplayData.from(parseAll);
 
-    assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
     assertThat(displayData, hasDisplayItem("tikaConfigPath", 
"tikaconfigpath"));
     assertThat(displayData, hasDisplayItem("inputMetadata",
-        "[Content-Type=application/pdf]"));
-    assertThat(displayData, hasDisplayItem("readOutputMetadata", "true"));
-    assertThat(displayData, hasDisplayItem("parseMode", "asynchronous"));
-    assertThat(displayData, hasDisplayItem("queuePollTime", "50"));
-    assertThat(displayData, hasDisplayItem("queueMaxPollTime", "3000"));
-    assertThat(displayData, hasDisplayItem("minTextLen", "100"));
-    assertEquals(8, displayData.items().size());
-  }
-
-  @Test
-  public void testReadDisplayDataSyncMode() {
-    TikaIO.Read read = TikaIO.read()
-        .from("foo.*")
-        .withParseSynchronously(true);
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
-    assertThat(displayData, hasDisplayItem("parseMode", "synchronous"));
+        "Content-Type=application/pdf"));
     assertEquals(2, displayData.items().size());
   }
 
   @Test
-  public void testReadDisplayDataWithDefaultOptions() {
-    final String[] args = new String[]{"--input=/input/tika.pdf"};
-    TikaIO.Read read = TikaIO.read().withOptions(createOptions(args));
+  public void testParseAllDisplayDataWithCustomOptions() {
+    TikaIO.ParseAll parseAll = TikaIO.parseAll()
+        .withTikaConfigPath("/tikaConfigPath")
+        .withContentTypeHint("application/pdf");
 
-    DisplayData displayData = DisplayData.from(read);
+    DisplayData displayData = DisplayData.from(parseAll);
 
-    assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf"));
-    assertThat(displayData, hasDisplayItem("parseMode", "asynchronous"));
-    assertThat(displayData, hasDisplayItem("queuePollTime", "50"));
-    assertThat(displayData, hasDisplayItem("queueMaxPollTime", "3000"));
-    assertEquals(4, displayData.items().size());
-  }
-  @Test
-  public void testReadDisplayDataWithCustomOptions() {
-    final String[] args = new String[]{"--input=/input/tika.pdf",
-                                       "--tikaConfigPath=/tikaConfigPath",
-                                       "--queuePollTime=10",
-                                       "--queueMaxPollTime=1000",
-                                       "--contentTypeHint=application/pdf",
-                                       "--readOutputMetadata=true"};
-    TikaIO.Read read = TikaIO.read().withOptions(createOptions(args));
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("filePattern", "/input/tika.pdf"));
     assertThat(displayData, hasDisplayItem("tikaConfigPath", 
"/tikaConfigPath"));
-    assertThat(displayData, hasDisplayItem("parseMode", "asynchronous"));
-    assertThat(displayData, hasDisplayItem("queuePollTime", "10"));
-    assertThat(displayData, hasDisplayItem("queueMaxPollTime", "1000"));
     assertThat(displayData, hasDisplayItem("inputMetadata",
-        "[Content-Type=application/pdf]"));
-    assertThat(displayData, hasDisplayItem("readOutputMetadata", "true"));
-    assertEquals(7, displayData.items().size());
-  }
-
-  private static TikaOptions createOptions(String[] args) {
-    return PipelineOptionsFactory.fromArgs(args)
-        .withValidation().as(TikaOptions.class);
+        "Content-Type=application/pdf"));
+    assertEquals(2, displayData.items().size());
   }
 
-  static class FilterMetadataFn extends DoFn<String, String> {
+  static class FilterMetadataFn extends DoFn<ParseResult, ParseResult> {
     private static final long serialVersionUID = 6338014219600516621L;
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      String word = c.element();
-      if (word.contains("=") && !word.startsWith("Author")) {
-        return;
+      ParseResult result = c.element();
+      Metadata m = new Metadata();
+      // Files contain many metadata properties. This function drops all but 
the "Author"
+      // property manually added to "apache-beam-tika1.odt" resource only to 
make
+      // the tests simpler
+      if (result.getFileLocation().endsWith("apache-beam-tika1.odt")) {
+          m.set("Author", result.getMetadata().get("Author"));
       }
-      c.output(word);
+      ParseResult newResult = new ParseResult(result.getFileLocation(), 
result.getContent(), m);
+      c.output(newResult);
     }
   }
+
+  static Metadata getOdtMetadata() {
+    Metadata m = new Metadata();
+    m.set("Author", "BeamTikaUser");
+    return m;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java
 
b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java
deleted file mode 100644
index 5c4e754..0000000
--- 
a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.tika;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.beam.sdk.io.tika.TikaSource.TikaReader;
-import org.junit.Test;
-
-/**
- * Tests TikaReader.
- */
-public class TikaReaderTest {
-  private static final List<String> ODT_FILE = Arrays.asList(
-      "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.",
-      "the content from the files", "and", "Apache Tika");
-
-  @Test
-  public void testOdtFileAsyncReader() throws Exception {
-    doTestOdtFileReader(false);
-  }
-  @Test
-  public void testOdtFileSyncReader() throws Exception {
-    doTestOdtFileReader(true);
-  }
-  private void doTestOdtFileReader(boolean sync) throws Exception {
-    String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath();
-    TikaSource source = new TikaSource(TikaIO.read()
-                                                .withParseSynchronously(sync)
-                                                .from(resourcePath));
-    TikaReader reader = (TikaReader) source.createReader(null);
-
-    List<String> content = new LinkedList<String>();
-    for (boolean available = reader.start(); available; available = reader.advance()) {
-      content.add(reader.getCurrent());
-    }
-    assertTrue(content.containsAll(ODT_FILE));
-    if (!sync) {
-      assertNotNull(reader.getExecutorService());
-    } else {
-      assertNull(reader.getExecutorService());
-    }
-    reader.close();
-  }
-
-  @Test
-  public void testOdtFilesReader() throws Exception {
-    String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath();
-    String filePattern = resourcePath.replace("apache-beam-tika1", "*");
-
-    TikaSource source = new TikaSource(TikaIO.read().from(filePattern));
-    TikaSource.FilePatternTikaReader reader =
-        (TikaSource.FilePatternTikaReader) source.createReader(null);
-    List<String> content = new LinkedList<String>();
-    for (boolean available = reader.start(); available; available = reader.advance()) {
-      content.add(reader.getCurrent());
-    }
-    assertTrue(content.containsAll(ODT_FILE));
-    reader.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ba93dd39/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java
deleted file mode 100644
index 550f469..0000000
--- a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.tika;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.tika.TikaSource.TikaReader;
-import org.junit.Test;
-
-/**
- * Tests TikaSource.
- */
-public class TikaSourceTest {
-
-  @Test
-  public void testOdtFileSource() throws Exception {
-    String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath();
-    TikaSource source = new TikaSource(TikaIO.read().from(resourcePath));
-    assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder());
-
-    assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode());
-    assertTrue(source.createReader(null) instanceof TikaReader);
-
-    List<? extends TikaSource> sources = source.split(1, null);
-    assertEquals(1, sources.size());
-    TikaSource nextSource = sources.get(0);
-    assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode());
-    assertEquals(resourcePath, nextSource.getSingleFileMetadata().resourceId().toString());
-  }
-
-  @Test
-  public void testOdtFilesSource() throws Exception {
-    String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath();
-    String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath();
-
-    TikaSource source = new TikaSource(TikaIO.read().from(filePattern));
-    assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder());
-
-    assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode());
-    assertTrue(source.createReader(null) instanceof TikaSource.FilePatternTikaReader);
-
-    List<? extends TikaSource> sources = source.split(1, null);
-    assertEquals(2, sources.size());
-    TikaSource nextSource = sources.get(0);
-    assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode());
-    String nextSourceResource = nextSource.getSingleFileMetadata().resourceId().toString();
-    TikaSource nextSource2 = sources.get(1);
-    assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource2.getMode());
-    String nextSourceResource2 = nextSource2.getSingleFileMetadata().resourceId().toString();
-    assertTrue(nextSourceResource.equals(resourcePath) && nextSourceResource2.equals(resourcePath2)
-        || nextSourceResource.equals(resourcePath2) && nextSourceResource2.equals(resourcePath));
-  }
-}

Reply via email to