Repository: beam
Updated Branches:
  refs/heads/master f303b9899 -> cad84c880


Refactor Hadoop/HDFS IO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da6d7e16
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da6d7e16
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da6d7e16

Branch: refs/heads/master
Commit: da6d7e16464cf8ddb1a798872e79f3ff55580c9c
Parents: 2b1c084
Author: Rafal Wojdyla <r...@spotify.com>
Authored: Thu Feb 16 00:56:51 2017 -0500
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed Feb 22 16:20:14 2017 -0800

----------------------------------------------------------------------
 sdks/java/io/hdfs/pom.xml                       |  11 +
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   | 300 ++++++++---
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 531 ++++++++++++-------
 .../sdk/io/hdfs/SerializableConfiguration.java  |  28 +-
 .../org/apache/beam/sdk/io/hdfs/UGIHelper.java  |  38 ++
 .../apache/beam/sdk/io/hdfs/WritableCoder.java  |   2 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      | 173 ++++++
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    |  39 +-
 8 files changed, 839 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index cd6cf4c..f857a22 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -105,6 +105,17 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index 6d30307..168bac7 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -17,25 +17,36 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.util.Map;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.Random;
 import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -46,67 +57,203 @@ import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 /**
- * A {@code Sink} for writing records to a Hadoop filesystem using a Hadoop 
file-based output
+ * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop 
file-based output
  * format.
  *
- * @param <K> The type of keys to be written to the sink.
- * @param <V> The type of values to be written to the sink.
+ * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of 
type T to Hadoop
+ * filesystem use {@link HDFSFileSink#to}, specify the path (this can be any 
Hadoop supported
+ * filesystem: HDFS, S3, GCS etc), the Hadoop {@link FileOutputFormat}, the 
key class K and the
+ * value class V and finally the {@link SerializableFunction} to map from T to 
{@link KV} of K
+ * and V.
+ *
+ * <p>{@code HDFSFileSink} can be used by {@link org.apache.beam.sdk.io.Write} 
to create write
+ * transform. See example below.
+ *
+ * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache 
Avro. For example:
+ *
+ * <pre>
+ * {@code
+ * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, 
NullWritable> sink =
+ *   HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class));
+ * avroRecordsPCollection.apply(Write.to(sink));
+ * }
+ * </pre>
+ *
+ * @param <T> the type of elements of the input {@link 
org.apache.beam.sdk.values.PCollection}.
+ * @param <K> the type of keys to be written to the sink via {@link 
FileOutputFormat}.
+ * @param <V> the type of values to be written to the sink via {@link 
FileOutputFormat}.
  */
-public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
+@AutoValue
+public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
 
   private static final JobID jobId = new JobID(
       Long.toString(System.currentTimeMillis()),
       new Random().nextInt(Integer.MAX_VALUE));
 
-  protected final String path;
-  protected final Class<? extends FileOutputFormat<K, V>> formatClass;
+  public abstract String path();
+  public abstract Class<? extends FileOutputFormat<K, V>> formatClass();
+  public abstract Class<K> keyClass();
+  public abstract Class<V> valueClass();
+  public abstract SerializableFunction<T, KV<K, V>> outputConverter();
+  public abstract SerializableConfiguration serializableConfiguration();
+  public @Nullable abstract String username();
+  public abstract boolean validate();
+
+  // =======================================================================
+  // Factory methods
+  // =======================================================================
+
+  public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, 
V>
+  to(String path,
+     Class<W> formatClass,
+     Class<K> keyClass,
+     Class<V> valueClass,
+     SerializableFunction<T, KV<K, V>> outputConverter) {
+    return HDFSFileSink.<T, K, V>builder()
+        .setPath(path)
+        .setFormatClass(formatClass)
+        .setKeyClass(keyClass)
+        .setValueClass(valueClass)
+        .setOutputConverter(outputConverter)
+        .setConfiguration(null)
+        .setUsername(null)
+        .setValidate(true)
+        .build();
+  }
+
+  public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) {
+    SerializableFunction<T, KV<NullWritable, Text>> outputConverter =
+        new SerializableFunction<T, KV<NullWritable, Text>>() {
+          @Override
+          public KV<NullWritable, Text> apply(T input) {
+            return KV.of(NullWritable.get(), new Text(input.toString()));
+          }
+        };
+    return to(path, TextOutputFormat.class, NullWritable.class, Text.class, 
outputConverter);
+  }
+
+  /**
+   * Helper to create Avro sink given {@link AvroCoder}. Keep in mind that 
configuration
+   * object is altered to enable Avro output.
+   */
+  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String 
path,
+                                                                     final 
AvroCoder<T> coder,
+                                                                     
Configuration conf) {
+    SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter =
+        new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() {
+          @Override
+          public KV<AvroKey<T>, NullWritable> apply(T input) {
+            return KV.of(new AvroKey<>(input), NullWritable.get());
+          }
+        };
+    conf.set("avro.schema.output.key", coder.getSchema().toString());
+    return to(
+        path,
+        AvroKeyOutputFormat.class,
+        (Class<AvroKey<T>>) (Class<?>) AvroKey.class,
+        NullWritable.class,
+        outputConverter).withConfiguration(conf);
+  }
+
+  /**
+   * Helper to create Avro sink given {@link Schema}. Keep in mind that 
configuration
+   * object is altered to enable Avro output.
+   */
+  public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, 
NullWritable>
+  toAvro(String path, Schema schema, Configuration conf) {
+    return toAvro(path, AvroCoder.of(schema), conf);
+  }
 
-  // workaround to make Configuration serializable
-  private final Map<String, String> map;
+  /**
+   * Helper to create Avro sink given {@link Class}. Keep in mind that 
configuration
+   * object is altered to enable Avro output.
+   */
+  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String 
path,
+                                                                     Class<T> 
cls,
+                                                                     
Configuration conf) {
+    return toAvro(path, AvroCoder.of(cls), conf);
+  }
 
-  public HDFSFileSink(String path, Class<? extends FileOutputFormat<K, V>> 
formatClass) {
-    this.path = path;
-    this.formatClass = formatClass;
-    this.map = Maps.newHashMap();
+  // =======================================================================
+  // Builder methods
+  // =======================================================================
+
+  public abstract Builder<T, K, V> toBuilder();
+  public static <T, K, V> Builder builder() {
+    return new AutoValue_HDFSFileSink.Builder<>();
   }
 
-  public HDFSFileSink(String path, Class<? extends FileOutputFormat<K, V>> 
formatClass,
-                      Configuration conf) {
-    this(path, formatClass);
-    // serialize conf to map
-    for (Map.Entry<String, String> entry : conf) {
-      map.put(entry.getKey(), entry.getValue());
+  /**
+   * AutoValue builder for {@link HDFSFileSink}.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder<T, K, V> {
+    public abstract Builder<T, K, V> setPath(String path);
+    public abstract Builder<T, K, V> setFormatClass(
+        Class<? extends FileOutputFormat<K, V>> formatClass);
+    public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass);
+    public abstract Builder<T, K, V> setValueClass(Class<V> valueClass);
+    public abstract Builder<T, K, V> setOutputConverter(
+        SerializableFunction<T, KV<K, V>> outputConverter);
+    public abstract Builder<T, K, V> setSerializableConfiguration(
+        SerializableConfiguration serializableConfiguration);
+    public Builder<T, K, V> setConfiguration(@Nullable Configuration 
configuration) {
+      if (configuration == null) {
+        configuration = new Configuration(false);
+      }
+      return this.setSerializableConfiguration(new 
SerializableConfiguration(configuration));
     }
+    public abstract Builder<T, K, V> setUsername(String username);
+    public abstract Builder<T, K, V> setValidate(boolean validate);
+    public abstract HDFSFileSink<T, K, V> build();
   }
 
+  public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration 
configuration) {
+    return this.toBuilder().setConfiguration(configuration).build();
+  }
+
+  public HDFSFileSink<T, K, V> withUsername(@Nullable String username) {
+    return this.toBuilder().setUsername(username).build();
+  }
+
+  // =======================================================================
+  // Sink
+  // =======================================================================
+
   @Override
   public void validate(PipelineOptions options) {
-    try {
-      Job job = jobInstance();
-      FileSystem fs = FileSystem.get(job.getConfiguration());
-      checkState(!fs.exists(new Path(path)), "Output path " + path + " already 
exists");
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    if (validate()) {
+      try {
+        UGIHelper.getBestUGI(username()).doAs(new 
PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            FileSystem fs = FileSystem.get(new URI(path()),
+                
SerializableConfiguration.newConfiguration(serializableConfiguration()));
+            checkState(!fs.exists(new Path(path())), "Output path %s already 
exists", path());
+            return null;
+          }
+        });
+      } catch (IOException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
   @Override
-  public Sink.WriteOperation<KV<K, V>, ?> createWriteOperation(PipelineOptions 
options) {
-    return new HDFSWriteOperation<>(this, path, formatClass);
+  public Sink.WriteOperation<T, String> createWriteOperation(PipelineOptions 
options) {
+    return new HDFSWriteOperation<>(this, path(), formatClass());
   }
 
-  private Job jobInstance() throws IOException {
-    Job job = Job.getInstance();
-    // deserialize map to conf
-    Configuration conf = job.getConfiguration();
-    for (Map.Entry<String, String> entry : map.entrySet()) {
-      conf.set(entry.getKey(), entry.getValue());
-    }
+  private Job newJob() throws IOException {
+    Job job = SerializableConfiguration.newJob(serializableConfiguration());
     job.setJobID(jobId);
+    job.setOutputKeyClass(keyClass());
+    job.setOutputValueClass(valueClass());
     return job;
   }
 
@@ -115,15 +262,15 @@ public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
   // =======================================================================
 
   /** {{@link WriteOperation}} for HDFS. */
-  public static class HDFSWriteOperation<K, V> extends WriteOperation<KV<K, 
V>, String> {
+  private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, 
String> {
 
-    private final Sink<KV<K, V>> sink;
-    protected final String path;
-    protected final Class<? extends FileOutputFormat<K, V>> formatClass;
+    private final HDFSFileSink<T, K, V> sink;
+    private final String path;
+    private final Class<? extends FileOutputFormat<K, V>> formatClass;
 
-    public HDFSWriteOperation(Sink<KV<K, V>> sink,
-                              String path,
-                              Class<? extends FileOutputFormat<K, V>> 
formatClass) {
+    HDFSWriteOperation(HDFSFileSink<T, K, V> sink,
+                       String path,
+                       Class<? extends FileOutputFormat<K, V>> formatClass) {
       this.sink = sink;
       this.path = path;
       this.formatClass = formatClass;
@@ -131,14 +278,25 @@ public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
 
     @Override
     public void initialize(PipelineOptions options) throws Exception {
-      Job job = ((HDFSFileSink<K, V>) getSink()).jobInstance();
+      Job job = sink.newJob();
       FileOutputFormat.setOutputPath(job, new Path(path));
     }
 
     @Override
-    public void finalize(Iterable<String> writerResults, PipelineOptions 
options) throws Exception {
-      Job job = ((HDFSFileSink<K, V>) getSink()).jobInstance();
-      FileSystem fs = FileSystem.get(job.getConfiguration());
+    public void finalize(final Iterable<String> writerResults, PipelineOptions 
options)
+        throws Exception {
+      UGIHelper.getBestUGI(sink.username()).doAs(new 
PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          doFinalize(writerResults);
+          return null;
+        }
+      });
+    }
+
+    private void doFinalize(Iterable<String> writerResults) throws Exception {
+      Job job = sink.newJob();
+      FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
 
       // If there are 0 output shards, just create output folder.
       if (!writerResults.iterator().hasNext()) {
@@ -188,12 +346,12 @@ public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
     }
 
     @Override
-    public Writer<KV<K, V>, String> createWriter(PipelineOptions options) 
throws Exception {
+    public Writer<T, String> createWriter(PipelineOptions options) throws 
Exception {
       return new HDFSWriter<>(this, path, formatClass);
     }
 
     @Override
-    public Sink<KV<K, V>> getSink() {
+    public Sink<T> getSink() {
       return sink;
     }
 
@@ -208,10 +366,9 @@ public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
   // Writer
   // =======================================================================
 
-  /** {{@link Writer}} for HDFS files. */
-  public static class HDFSWriter<K, V> extends Writer<KV<K, V>, String> {
+  private static class HDFSWriter<T, K, V> extends Writer<T, String> {
 
-    private final HDFSWriteOperation<K, V> writeOperation;
+    private final HDFSWriteOperation<T, K, V> writeOperation;
     private final String path;
     private final Class<? extends FileOutputFormat<K, V>> formatClass;
 
@@ -222,19 +379,31 @@ public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
     private RecordWriter<K, V> recordWriter;
     private FileOutputCommitter outputCommitter;
 
-    public HDFSWriter(HDFSWriteOperation<K, V> writeOperation,
-                      String path,
-                      Class<? extends FileOutputFormat<K, V>> formatClass) {
+    HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation,
+               String path,
+               Class<? extends FileOutputFormat<K, V>> formatClass) {
       this.writeOperation = writeOperation;
       this.path = path;
       this.formatClass = formatClass;
     }
 
     @Override
-    public void open(String uId) throws Exception {
+    public void open(final String uId) throws Exception {
+      UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
+          new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              doOpen(uId);
+              return null;
+            }
+          }
+      );
+    }
+
+    private void doOpen(String uId) throws Exception {
       this.hash = uId.hashCode();
 
-      Job job = ((HDFSFileSink<K, V>) 
getWriteOperation().getSink()).jobInstance();
+      Job job = writeOperation.sink.newJob();
       FileOutputFormat.setOutputPath(job, new Path(path));
 
       // Each Writer is responsible for writing one bundle of elements and is 
represented by one
@@ -250,12 +419,25 @@ public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
     }
 
     @Override
-    public void write(KV<K, V> value) throws Exception {
-      recordWriter.write(value.getKey(), value.getValue());
+    public void write(T value) throws Exception {
+      checkNotNull(recordWriter,
+          "Record writer can't be null. Make sure to open Writer first!");
+      KV<K, V> kv = writeOperation.sink.outputConverter().apply(value);
+      recordWriter.write(kv.getKey(), kv.getValue());
     }
 
     @Override
     public String close() throws Exception {
+      return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
+          new PrivilegedExceptionAction<String>() {
+            @Override
+            public String run() throws Exception {
+              return doClose();
+            }
+          });
+    }
+
+    private String doClose() throws Exception {
       // task/attempt successful
       recordWriter.close(context);
       outputCommitter.commitTask(context);
@@ -265,7 +447,7 @@ public class HDFSFileSink<K, V> extends Sink<KV<K, V>> {
     }
 
     @Override
-    public WriteOperation<KV<K, V>, String> getWriteOperation() {
+    public WriteOperation<T, String> getWriteOperation() {
       return writeOperation;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index 1affb4a..8e12561 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -18,8 +18,9 @@
 package org.apache.beam.sdk.io.hdfs;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -29,20 +30,34 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyInputFormat;
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -52,10 +67,13 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * A {@code BoundedSource} for reading files resident in a Hadoop filesystem 
(HDFS) using a
+ * A {@code BoundedSource} for reading files resident in a Hadoop filesystem 
using a
  * Hadoop file-based input format.
  *
  * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
@@ -75,153 +93,301 @@ import 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  * }
  * </pre>
  *
- * <p>The {@link HDFSFileSource#readFrom} method is a convenience method
- * that returns a read transform. For example:
- *
- * <pre>
- * {@code
- * PCollection<KV<MyKey, MyValue>> records = HDFSFileSource.readFrom(path,
- *   MyInputFormat.class, MyKey.class, MyValue.class);
- * }
- * </pre>
- *
  * <p>Implementation note: Since Hadoop's
  * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
  * determines the input splits, this class extends {@link BoundedSource} 
rather than
  * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
  * dictates input splits.
-
- * @param <K> The type of keys to be read from the source.
- * @param <V> The type of values to be read from the source.
+ * @param <T> the type of elements of the result {@link 
org.apache.beam.sdk.values.PCollection}.
+ * @param <K> the type of keys to be read from the source via {@link 
FileInputFormat}.
+ * @param <V> the type of values to be read from the source via {@link 
FileInputFormat}.
  */
-public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
+@AutoValue
+public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
   private static final long serialVersionUID = 0L;
 
-  protected final String filepattern;
-  protected final Class<? extends FileInputFormat<?, ?>> formatClass;
-  protected final Class<K> keyClass;
-  protected final Class<V> valueClass;
-  protected final SerializableSplit serializableSplit;
+  private static final Logger LOG = 
LoggerFactory.getLogger(HDFSFileSource.class);
+
+  public abstract String filepattern();
+  public abstract Class<? extends FileInputFormat<K, V>> formatClass();
+  public abstract Coder<T> coder();
+  public abstract SerializableFunction<KV<K, V>, T> inputConverter();
+  public abstract SerializableConfiguration serializableConfiguration();
+  public @Nullable abstract SerializableSplit serializableSplit();
+  public @Nullable abstract String username();
+  public abstract boolean validateSource();
+
+  // =======================================================================
+  // Factory methods
+  // =======================================================================
+
+  public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, 
K, V>
+  from(String filepattern,
+       Class<W> formatClass,
+       Coder<T> coder,
+       SerializableFunction<KV<K, V>, T> inputConverter) {
+    return HDFSFileSource.<T, K, V>builder()
+        .setFilepattern(filepattern)
+        .setFormatClass(formatClass)
+        .setCoder(coder)
+        .setInputConverter(inputConverter)
+        .setConfiguration(null)
+        .setUsername(null)
+        .setValidateSource(true)
+        .setSerializableSplit(null)
+        .build();
+  }
 
-  /**
-   * Creates a {@code Read} transform that will read from an {@code 
HDFSFileSource}
-   * with the given file name or pattern ("glob") using the given Hadoop
-   * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
-   * with key-value types specified by the given key class and value class.
-   */
-  public static <K, V, T extends FileInputFormat<K, V>> Read.Bounded<KV<K, V>> 
readFrom(
-      String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> 
valueClass) {
-    return Read.from(from(filepattern, formatClass, keyClass, valueClass));
+  public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, 
V>, K, V>
+  from(String filepattern,
+       Class<W> formatClass,
+       Class<K> keyClass,
+       Class<V> valueClass) {
+    KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), 
getDefaultCoder(valueClass));
+    SerializableFunction<KV<K, V>, KV<K, V>> inputConverter =
+        new SerializableFunction<KV<K, V>, KV<K, V>>() {
+          @Override
+          public KV<K, V> apply(KV<K, V> input) {
+            return input;
+          }
+        };
+    return HDFSFileSource.<KV<K, V>, K, V>builder()
+        .setFilepattern(filepattern)
+        .setFormatClass(formatClass)
+        .setCoder(coder)
+        .setInputConverter(inputConverter)
+        .setConfiguration(null)
+        .setUsername(null)
+        .setValidateSource(true)
+        .setSerializableSplit(null)
+        .build();
+  }
+
+  public static HDFSFileSource<String, LongWritable, Text>
+  fromText(String filepattern) {
+    SerializableFunction<KV<LongWritable, Text>, String> inputConverter =
+        new SerializableFunction<KV<LongWritable, Text>, String>() {
+      @Override
+      public String apply(KV<LongWritable, Text> input) {
+        return input.getValue().toString();
+      }
+    };
+    return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), 
inputConverter);
   }
 
   /**
-   * Creates a {@code HDFSFileSource} that reads from the given file name or 
pattern ("glob")
-   * using the given Hadoop {@link 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat},
-   * with key-value types specified by the given key class and value class.
+   * Helper to read from Avro source given {@link AvroCoder}. Keep in mind 
that configuration
+   * object is altered to enable Avro input.
    */
-  public static <K, V, T extends FileInputFormat<K, V>> HDFSFileSource<K, V> 
from(
-      String filepattern, Class<T> formatClass, Class<K> keyClass, Class<V> 
valueClass) {
-    return new HDFSFileSource<>(filepattern, formatClass, keyClass, 
valueClass);
+  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
+  fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) {
+    Class<AvroKeyInputFormat<T>> formatClass = 
castClass(AvroKeyInputFormat.class);
+    SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter =
+        new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() {
+          @Override
+          public T apply(KV<AvroKey<T>, NullWritable> input) {
+            try {
+              return CoderUtils.clone(coder, input.getKey().datum());
+            } catch (CoderException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    conf.set("avro.schema.input.key", coder.getSchema().toString());
+    return from(filepattern, formatClass, coder, 
inputConverter).withConfiguration(conf);
   }
 
   /**
-   * Create a {@code HDFSFileSource} based on a file or a file pattern 
specification.
+   * Helper to read from Avro source given {@link Schema}. Keep in mind that 
configuration
+   * object is altered to enable Avro input.
    */
-  protected HDFSFileSource(String filepattern,
-                           Class<? extends FileInputFormat<?, ?>> formatClass, 
Class<K> keyClass,
-                           Class<V> valueClass) {
-    this(filepattern, formatClass, keyClass, valueClass, null);
+  public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, 
NullWritable>
+  fromAvro(String filepattern, Schema schema, Configuration conf) {
+    return fromAvro(filepattern, AvroCoder.of(schema), conf);
   }
 
   /**
-   * Create a {@code HDFSFileSource} based on a single Hadoop input split, 
which won't be
-   * split up further.
+   * Helper to read from Avro source given {@link Class}. Keep in mind that 
configuration
+   * object is altered to enable Avro input.
    */
-  protected HDFSFileSource(String filepattern,
-                           Class<? extends FileInputFormat<?, ?>> formatClass, 
Class<K> keyClass,
-                           Class<V> valueClass, SerializableSplit 
serializableSplit) {
-    this.filepattern = filepattern;
-    this.formatClass = formatClass;
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-    this.serializableSplit = serializableSplit;
+  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
+  fromAvro(String filepattern, Class<T> cls, Configuration conf) {
+    return fromAvro(filepattern, AvroCoder.of(cls), conf);
   }
 
-  public String getFilepattern() {
-    return filepattern;
+  // =======================================================================
+  // Builder methods
+  // =======================================================================
+
+  public abstract HDFSFileSource.Builder<T, K, V> toBuilder();
+  public static <T, K, V> HDFSFileSource.Builder builder() {
+    return new AutoValue_HDFSFileSource.Builder<>();
   }
 
-  public Class<? extends FileInputFormat<?, ?>> getFormatClass() {
-    return formatClass;
+  /**
+   * AutoValue builder for {@link HDFSFileSource}.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder<T, K, V> {
+    public abstract Builder<T, K, V> setFilepattern(String filepattern);
+    public abstract Builder<T, K, V> setFormatClass(
+        Class<? extends FileInputFormat<K, V>> formatClass);
+    public abstract Builder<T, K, V> setCoder(Coder<T> coder);
+    public abstract Builder<T, K, V> setInputConverter(
+        SerializableFunction<KV<K, V>, T> inputConverter);
+    public abstract Builder<T, K, V> setSerializableConfiguration(
+        SerializableConfiguration serializableConfiguration);
+    public Builder<T, K, V> setConfiguration(Configuration configuration) {
+      if (configuration == null) {
+        configuration = new Configuration(false);
+      }
+      return this.setSerializableConfiguration(new 
SerializableConfiguration(configuration));
+    }
+    public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit 
serializableSplit);
+    public abstract Builder<T, K, V> setUsername(@Nullable String username);
+    public abstract Builder<T, K, V> setValidateSource(boolean validate);
+    public abstract HDFSFileSource<T, K, V> build();
   }
 
-  public Class<K> getKeyClass() {
-    return keyClass;
+  public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration 
configuration) {
+    return this.toBuilder().setConfiguration(configuration).build();
   }
 
-  public Class<V> getValueClass() {
-    return valueClass;
+  public HDFSFileSource<T, K, V> withUsername(@Nullable String username) {
+    return this.toBuilder().setUsername(username).build();
   }
 
+  // =======================================================================
+  // BoundedSource
+  // =======================================================================
+
   @Override
-  public void validate() {
-    checkNotNull(filepattern, "need to set the filepattern of a 
HDFSFileSource");
-    checkNotNull(formatClass, "need to set the format class of a 
HDFSFileSource");
-    checkNotNull(keyClass, "need to set the key class of a HDFSFileSource");
-    checkNotNull(valueClass, "need to set the value class of a 
HDFSFileSource");
+  public List<? extends BoundedSource<T>> splitIntoBundles(
+      final long desiredBundleSizeBytes,
+      PipelineOptions options) throws Exception {
+    if (serializableSplit() == null) {
+      List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs(
+          new PrivilegedExceptionAction<List<InputSplit>>() {
+            @Override
+            public List<InputSplit> run() throws Exception {
+              return computeSplits(desiredBundleSizeBytes, 
serializableConfiguration());
+            }
+          });
+      return Lists.transform(inputSplits,
+          new Function<InputSplit, BoundedSource<T>>() {
+            @Override
+            public BoundedSource<T> apply(@Nullable InputSplit inputSplit) {
+              SerializableSplit serializableSplit = new 
SerializableSplit(inputSplit);
+              return HDFSFileSource.this.toBuilder()
+                  .setSerializableSplit(serializableSplit)
+                  .build();
+            }
+          });
+    } else {
+      return ImmutableList.of(this);
+    }
   }
 
   @Override
-  public List<? extends BoundedSource<KV<K, V>>> splitIntoBundles(long 
desiredBundleSizeBytes,
-      PipelineOptions options) throws Exception {
-    if (serializableSplit == null) {
-      return Lists.transform(computeSplits(desiredBundleSizeBytes),
-          new Function<InputSplit, BoundedSource<KV<K, V>>>() {
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    long size = 0;
+
+    try {
+      // If this source represents a split from splitIntoBundles, then return 
the size of the split,
+      // rather then the entire input
+      if (serializableSplit() != null) {
+        return serializableSplit().getSplit().getLength();
+      }
+
+      size += UGIHelper.getBestUGI(username()).doAs(new 
PrivilegedExceptionAction<Long>() {
         @Override
-        public BoundedSource<KV<K, V>> apply(@Nullable InputSplit inputSplit) {
-          return new HDFSFileSource<>(filepattern, formatClass, keyClass,
-              valueClass, new SerializableSplit(inputSplit));
+        public Long run() throws Exception {
+          long size = 0;
+          Job job = 
SerializableConfiguration.newJob(serializableConfiguration());
+          for (FileStatus st : listStatus(createFormat(job), job)) {
+            size += st.getLen();
+          }
+          return size;
         }
       });
-    } else {
-      return ImmutableList.of(this);
+    } catch (IOException e) {
+      LOG.warn(
+          "Will estimate size of input to be 0 bytes. Can't estimate size of 
the input due to:", e);
+      // ignore, and return 0
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn(
+          "Will estimate size of input to be 0 bytes. Can't estimate size of 
the input due to:", e);
+      // ignore, and return 0
     }
+    return size;
   }
 
-  private FileInputFormat<?, ?> createFormat(Job job) throws IOException, 
IllegalAccessException,
-      InstantiationException {
-    Path path = new Path(filepattern);
-    FileInputFormat.addInputPath(job, path);
-    return formatClass.newInstance();
+  @Override
+  public BoundedReader<T> createReader(PipelineOptions options) throws 
IOException {
+    this.validate();
+    return new HDFSFileReader<>(this, filepattern(), formatClass(), 
serializableSplit());
+  }
+
+  @Override
+  public void validate() {
+    if (validateSource()) {
+      try {
+        UGIHelper.getBestUGI(username()).doAs(new 
PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                FileSystem fs = FileSystem.get(new URI(filepattern()),
+                    
SerializableConfiguration.newConfiguration(serializableConfiguration()));
+                FileStatus[] fileStatuses = fs.globStatus(new 
Path(filepattern()));
+                checkState(
+                    fileStatuses != null && fileStatuses.length > 0,
+                    "Unable to find any files matching %s", filepattern());
+                  return null;
+                }
+              });
+      } catch (IOException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
-  protected List<InputSplit> computeSplits(long desiredBundleSizeBytes) throws 
IOException,
-      IllegalAccessException, InstantiationException {
-    Job job = Job.getInstance();
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return coder();
+  }
+
+  // =======================================================================
+  // Helpers
+  // =======================================================================
+
+  private List<InputSplit> computeSplits(long desiredBundleSizeBytes,
+                                         SerializableConfiguration 
serializableConfiguration)
+      throws IOException, IllegalAccessException, InstantiationException {
+    Job job = SerializableConfiguration.newJob(serializableConfiguration);
     FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
     FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
     return createFormat(job).getSplits(job);
   }
 
-  @Override
-  public BoundedReader<KV<K, V>> createReader(PipelineOptions options) throws 
IOException {
-    this.validate();
-
-    if (serializableSplit == null) {
-      return new HDFSFileReader<>(this, filepattern, formatClass);
-    } else {
-      return new HDFSFileReader<>(this, filepattern, formatClass,
-          serializableSplit.getSplit());
-    }
+  private FileInputFormat<K, V> createFormat(Job job)
+      throws IOException, IllegalAccessException, InstantiationException {
+    Path path = new Path(filepattern());
+    FileInputFormat.addInputPath(job, path);
+    return formatClass().newInstance();
   }
 
-  @Override
-  public Coder<KV<K, V>> getDefaultOutputCoder() {
-    return KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
+  private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job)
+      throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+    // FileInputFormat#listStatus is protected, so call using reflection
+    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", 
JobContext.class);
+    listStatus.setAccessible(true);
+    @SuppressWarnings("unchecked")
+    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job);
+    return stat;
   }
 
   @SuppressWarnings("unchecked")
-  private <T> Coder<T> getDefaultCoder(Class<T> c) {
+  private static <T> Coder<T> getDefaultCoder(Class<T> c) {
     if (Writable.class.isAssignableFrom(c)) {
       Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
       return (Coder<T>) WritableCoder.of(writableClass);
@@ -232,82 +398,46 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
     throw new IllegalStateException("Cannot find coder for " + c);
   }
 
-  // BoundedSource
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) {
-    long size = 0;
-
-    try {
-      // If this source represents a split from splitIntoBundles, then return 
the size of the split,
-      // rather then the entire input
-      if (serializableSplit != null) {
-        return serializableSplit.getSplit().getLength();
-      }
-
-      Job job = Job.getInstance(); // new instance
-      for (FileStatus st : listStatus(createFormat(job), job)) {
-        size += st.getLen();
-      }
-    } catch (IOException | NoSuchMethodException | InvocationTargetException
-        | IllegalAccessException | InstantiationException e) {
-      // ignore, and return 0
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      // ignore, and return 0
-    }
-    return size;
+  @SuppressWarnings("unchecked")
+  private static <T> Class<T> castClass(Class<?> aClass) {
+    return (Class<T>) aClass;
   }
 
-  private <K, V> List<FileStatus> listStatus(FileInputFormat<K, V> format,
-      JobContext jobContext) throws NoSuchMethodException, 
InvocationTargetException,
-      IllegalAccessException {
-    // FileInputFormat#listStatus is protected, so call using reflection
-    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", 
JobContext.class);
-    listStatus.setAccessible(true);
-    @SuppressWarnings("unchecked")
-    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, 
jobContext);
-    return stat;
-  }
+  // =======================================================================
+  // BoundedReader
+  // =======================================================================
 
-  static class HDFSFileReader<K, V> extends BoundedSource.BoundedReader<KV<K, 
V>> {
+  private static class HDFSFileReader<T, K, V> extends 
BoundedSource.BoundedReader<T> {
 
-    private final BoundedSource<KV<K, V>> source;
+    private final HDFSFileSource<T, K, V> source;
     private final String filepattern;
-    private final Class<? extends FileInputFormat<?, ?>> formatClass;
-    protected Job job;
+    private final Class<? extends FileInputFormat<K, V>> formatClass;
+    private final Job job;
 
-    private FileInputFormat<?, ?> format;
-    private TaskAttemptContext attemptContext;
     private List<InputSplit> splits;
     private ListIterator<InputSplit> splitsIterator;
+
     private Configuration conf;
-    protected RecordReader<K, V> currentReader;
+    private FileInputFormat<?, ?> format;
+    private TaskAttemptContext attemptContext;
+    private RecordReader<K, V> currentReader;
     private KV<K, V> currentPair;
-    private volatile boolean done = false;
-
-    /**
-     * Create a {@code HDFSFileReader} based on a file or a file pattern 
specification.
-     */
-    public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
-                          Class<? extends FileInputFormat<?, ?>> formatClass) 
throws IOException {
-      this(source, filepattern, formatClass, null);
-    }
 
-    /**
-     * Create a {@code HDFSFileReader} based on a single Hadoop input split.
-     */
-    public HDFSFileReader(BoundedSource<KV<K, V>> source, String filepattern,
-                          Class<? extends FileInputFormat<?, ?>> formatClass, 
InputSplit split)
-            throws IOException {
+    HDFSFileReader(
+        HDFSFileSource<T, K, V> source,
+        String filepattern,
+        Class<? extends FileInputFormat<K, V>> formatClass,
+        SerializableSplit serializableSplit)
+        throws IOException {
       this.source = source;
       this.filepattern = filepattern;
       this.formatClass = formatClass;
-      if (split != null) {
-        this.splits = ImmutableList.of(split);
+      this.job = 
SerializableConfiguration.newJob(source.serializableConfiguration());
+
+      if (serializableSplit != null) {
+        this.splits = ImmutableList.of(serializableSplit.getSplit());
         this.splitsIterator = splits.listIterator();
       }
-      this.job = Job.getInstance(); // new instance
     }
 
     @Override
@@ -315,21 +445,19 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
       Path path = new Path(filepattern);
       FileInputFormat.addInputPath(job, path);
 
+      conf = job.getConfiguration();
       try {
-        @SuppressWarnings("unchecked")
-        FileInputFormat<K, V> f = (FileInputFormat<K, V>) 
formatClass.newInstance();
-        this.format = f;
+        format = formatClass.newInstance();
       } catch (InstantiationException | IllegalAccessException e) {
         throw new IOException("Cannot instantiate file input format " + 
formatClass, e);
       }
-      this.attemptContext = new TaskAttemptContextImpl(job.getConfiguration(),
-          new TaskAttemptID());
+      attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
 
       if (splitsIterator == null) {
-        this.splits = format.getSplits(job);
-        this.splitsIterator = splits.listIterator();
+        splits = format.getSplits(job);
+        splitsIterator = splits.listIterator();
       }
-      this.conf = job.getConfiguration();
+
       return advance();
     }
 
@@ -342,7 +470,7 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
         } else {
           while (splitsIterator.hasNext()) {
             // advance the reader and see if it has records
-            InputSplit nextSplit = splitsIterator.next();
+            final InputSplit nextSplit = splitsIterator.next();
             @SuppressWarnings("unchecked")
             RecordReader<K, V> reader =
                 (RecordReader<K, V>) format.createRecordReader(nextSplit, 
attemptContext);
@@ -350,7 +478,13 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
               currentReader.close();
             }
             currentReader = reader;
-            currentReader.initialize(nextSplit, attemptContext);
+            UGIHelper.getBestUGI(source.username()).doAs(new 
PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                currentReader.initialize(nextSplit, attemptContext);
+                return null;
+              }
+            });
             if (currentReader.nextKeyValue()) {
               currentPair = nextPair();
               return true;
@@ -360,7 +494,6 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
           }
           // either no next split or all readers were empty
           currentPair = null;
-          done = true;
           return false;
         }
       } catch (InterruptedException e) {
@@ -369,26 +502,12 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
       }
     }
 
-    @SuppressWarnings("unchecked")
-    protected KV<K, V> nextPair() throws IOException, InterruptedException {
-      K key = currentReader.getCurrentKey();
-      V value = currentReader.getCurrentValue();
-      // clone Writable objects since they are reused between calls to 
RecordReader#nextKeyValue
-      if (key instanceof Writable) {
-        key = (K) WritableUtils.clone((Writable) key, conf);
-      }
-      if (value instanceof Writable) {
-        value = (V) WritableUtils.clone((Writable) value, conf);
-      }
-      return KV.of(key, value);
-    }
-
     @Override
-    public KV<K, V> getCurrent() throws NoSuchElementException {
+    public T getCurrent() throws NoSuchElementException {
       if (currentPair == null) {
         throw new NoSuchElementException();
       }
-      return currentPair;
+      return source.inputConverter().apply(currentPair);
     }
 
     @Override
@@ -401,11 +520,27 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
     }
 
     @Override
-    public BoundedSource<KV<K, V>> getCurrentSource() {
+    public BoundedSource<T> getCurrentSource() {
       return source;
     }
 
-    // BoundedReader
+    @SuppressWarnings("unchecked")
+    private KV<K, V> nextPair() throws IOException, InterruptedException {
+      K key = currentReader.getCurrentKey();
+      V value = currentReader.getCurrentValue();
+      // clone Writable objects since they are reused between calls to 
RecordReader#nextKeyValue
+      if (key instanceof Writable) {
+        key = (K) WritableUtils.clone((Writable) key, conf);
+      }
+      if (value instanceof Writable) {
+        value = (V) WritableUtils.clone((Writable) value, conf);
+      }
+      return KV.of(key, value);
+    }
+
+    // =======================================================================
+    // Optional overrides
+    // =======================================================================
 
     @Override
     public Double getFractionConsumed() {
@@ -437,31 +572,18 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
       }
     }
 
-    @Override
-    public final long getSplitPointsRemaining() {
-      if (done) {
-        return 0;
-      }
-      // This source does not currently support dynamic work rebalancing, so 
remaining
-      // parallelism is always 1.
-      return 1;
-    }
-
-    @Override
-    public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
-      // Not yet supported. To implement this, the sizes of the splits should 
be used to
-      // calculate the remaining splits that constitute the given fraction, 
then a
-      // new source backed by those splits should be returned.
-      return null;
-    }
   }
 
+  // =======================================================================
+  // SerializableSplit
+  // =======================================================================
+
   /**
    * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s 
to be
    * serialized using Java's standard serialization mechanisms. Note that the 
InputSplit
    * has to be Writable (which most are).
    */
-  public static class SerializableSplit implements Externalizable {
+  protected static class SerializableSplit implements Externalizable {
     private static final long serialVersionUID = 0L;
 
     private InputSplit split;
@@ -496,5 +618,4 @@ public class HDFSFileSource<K, V> extends 
BoundedSource<KV<K, V>> {
     }
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
index f7b4bff..0772e57 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/SerializableConfiguration.java
@@ -57,11 +57,37 @@ public class SerializableConfiguration implements 
Externalizable {
 
   @Override
   public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-    conf = new Configuration();
+    conf = new Configuration(false);
     int size = in.readInt();
     for (int i = 0; i < size; i++) {
       conf.set(in.readUTF(), in.readUTF());
     }
   }
 
+  /**
+   * Returns new configured {@link Job} object.
+   */
+  public static Job newJob(@Nullable SerializableConfiguration conf) throws 
IOException {
+    if (conf == null) {
+      return Job.getInstance();
+    } else {
+      Job job = Job.getInstance();
+      for (Map.Entry<String, String> entry : conf.get()) {
+        job.getConfiguration().set(entry.getKey(), entry.getValue());
+      }
+      return job;
+    }
+  }
+
+  /**
+   * Returns new populated {@link Configuration} object.
+   */
+  public static Configuration newConfiguration(@Nullable 
SerializableConfiguration conf) {
+    if (conf == null) {
+      return new Configuration();
+    } else {
+      return conf.get();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
new file mode 100644
index 0000000..fd05a19
--- /dev/null
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * {@link UserGroupInformation} helper methods.
+ */
+public class UGIHelper {
+
+  /**
+   * Find the most appropriate UserGroupInformation to use.
+   * @param username the user name, or NULL if none is specified.
+   * @return the most appropriate UserGroupInformation
+   */
+  public static UserGroupInformation getBestUGI(@Nullable String username) 
throws IOException {
+    return UserGroupInformation.getBestUGI(null, username);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
index 637e686..d958cda 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.io.Writable;
  * }
  * </pre>
  *
- * @param <T> the type of elements handled by this coder
+ * @param <T> the type of elements handled by this coder.
  */
 public class WritableCoder<T extends Writable> extends StandardCoder<T> {
   private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
 
b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
new file mode 100644
index 0000000..8b9a6d1
--- /dev/null
+++ 
b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.base.MoreObjects;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for HDFSFileSinkTest.
+ */
+public class HDFSFileSinkTest {
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  private final String part0 = "part-r-00000";
+  private final String foobar = "foobar";
+
+  private <T> void doWrite(Sink<T> sink,
+                           PipelineOptions options,
+                           Iterable<T> toWrite) throws Exception {
+    Sink.WriteOperation<T, String> writeOperation =
+        (Sink.WriteOperation<T, String>) sink.createWriteOperation(options);
+    Sink.Writer<T, String> writer = writeOperation.createWriter(options);
+    writer.open(UUID.randomUUID().toString());
+    for (T t: toWrite) {
+      writer.write(t);
+    }
+    String writeResult = writer.close();
+    writeOperation.finalize(Collections.singletonList(writeResult), options);
+  }
+
+  @Test
+  public void testWriteSingleRecord() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File file = tmpFolder.newFolder();
+
+    HDFSFileSink<String, NullWritable, Text> sink =
+        HDFSFileSink.to(
+            file.toString(),
+            SequenceFileOutputFormat.class,
+            NullWritable.class,
+            Text.class,
+            new SerializableFunction<String, KV<NullWritable, Text>>() {
+              @Override
+              public KV<NullWritable, Text> apply(String input) {
+                return KV.of(NullWritable.get(), new Text(input));
+              }
+            });
+
+    doWrite(sink, options, Collections.singletonList(foobar));
+
+    SequenceFile.Reader.Option opts =
+        SequenceFile.Reader.file(new Path(file.toString(), part0));
+    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), 
opts);
+    assertEquals(NullWritable.class.getName(), reader.getKeyClassName());
+    assertEquals(Text.class.getName(), reader.getValueClassName());
+    NullWritable k = NullWritable.get();
+    Text v = new Text();
+    assertEquals(true, reader.next(k, v));
+    assertEquals(NullWritable.get(), k);
+    assertEquals(new Text(foobar), v);
+  }
+
+  @Test
+  public void testToText() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File file = tmpFolder.newFolder();
+
+    HDFSFileSink<String, NullWritable, Text> sink = 
HDFSFileSink.toText(file.toString());
+
+    doWrite(sink, options, Collections.singletonList(foobar));
+
+    List<String> strings = Files.readAllLines(new File(file.toString(), 
part0).toPath(),
+        Charset.forName("UTF-8"));
+    assertEquals(Collections.singletonList(foobar), strings);
+  }
+
+  @DefaultCoder(AvroCoder.class)
+  static class GenericClass {
+    int intField;
+    String stringField;
+    public GenericClass() {}
+    public GenericClass(int intValue, String stringValue) {
+      this.intField = intValue;
+      this.stringField = stringValue;
+    }
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("intField", intField)
+          .add("stringField", stringField)
+          .toString();
+    }
+    @Override
+    public int hashCode() {
+      return Objects.hash(intField, stringField);
+    }
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || !(other instanceof GenericClass)) {
+        return false;
+      }
+      GenericClass o = (GenericClass) other;
+      return Objects.equals(intField, o.intField) && 
Objects.equals(stringField, o.stringField);
+    }
+  }
+
+  @Test
+  public void testToAvro() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File file = tmpFolder.newFolder();
+
+    HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = 
HDFSFileSink.toAvro(
+        file.toString(),
+        AvroCoder.of(GenericClass.class),
+        new Configuration(false));
+
+    doWrite(sink, options, Collections.singletonList(new GenericClass(3, 
"foobar")));
+
+    GenericDatumReader datumReader = new GenericDatumReader();
+    FileReader<GenericData.Record> reader =
+        DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + 
".avro"), datumReader);
+    GenericData.Record next = reader.next(null);
+    assertEquals("foobar", next.get("stringField").toString());
+    assertEquals(3, next.get("intField"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/da6d7e16/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
 
b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
index 4c3f1ce..ac6af40 100644
--- 
a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ 
b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -51,7 +51,7 @@ import org.junit.rules.TemporaryFolder;
  */
 public class HDFSFileSourceTest {
 
-  Random random = new Random(0L);
+  private Random random = new Random(0L);
 
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -62,9 +62,9 @@ public class HDFSFileSourceTest {
     List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 
0);
     File file = createFileWithData("tmp.seq", expectedResults);
 
-    HDFSFileSource<IntWritable, Text> source =
-        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+        HDFSFileSource.from(
+            file.toString(), SequenceFileInputFormat.class, IntWritable.class, 
Text.class);
 
     assertEquals(file.length(), source.getEstimatedSizeBytes(null));
 
@@ -86,13 +86,16 @@ public class HDFSFileSourceTest {
     List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
     createFileWithData("otherfile", data4);
 
-    HDFSFileSource<IntWritable, Text> source =
-        HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
-            SequenceFileInputFormat.class, IntWritable.class, Text.class);
     List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
     expectedResults.addAll(data1);
     expectedResults.addAll(data2);
     expectedResults.addAll(data3);
+
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+        HDFSFileSource.from(
+            new File(file1.getParent(), "file*").toString(), 
SequenceFileInputFormat.class,
+            IntWritable.class, Text.class);
+
     assertThat(expectedResults, containsInAnyOrder(readFromSource(source, 
options).toArray()));
   }
 
@@ -111,10 +114,12 @@ public class HDFSFileSourceTest {
     List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
     createFileWithData("otherfile", data4);
 
-    HDFSFileSource<IntWritable, Text> source =
-        HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+        HDFSFileSource.from(
+            new File(file1.getParent(), "file*").toString(),
             SequenceFileInputFormat.class, IntWritable.class, Text.class);
     Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+
     // Closing an unstarted FilePatternReader should not throw an exception.
     try {
       reader.close();
@@ -128,11 +133,11 @@ public class HDFSFileSourceTest {
     PipelineOptions options = PipelineOptionsFactory.create();
 
     List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 
10000, 0);
-    File file = createFileWithData("tmp.avro", expectedResults);
+    File file = createFileWithData("tmp.seq", expectedResults);
 
-    HDFSFileSource<IntWritable, Text> source =
-        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
+        HDFSFileSource.from(
+            file.toString(), SequenceFileInputFormat.class, IntWritable.class, 
Text.class);
 
     // Assert that the source produces the expected records
     assertEquals(expectedResults, readFromSource(source, options));
@@ -158,7 +163,7 @@ public class HDFSFileSourceTest {
     List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 
10000, 0);
     File file = createFileWithData("tmp.avro", expectedResults);
 
-    HDFSFileSource<IntWritable, Text> source =
+    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
         HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
             IntWritable.class, Text.class);
 
@@ -178,8 +183,8 @@ public class HDFSFileSourceTest {
       throws IOException {
     File tmpFile = tmpFolder.newFile(filename);
     try (Writer writer = SequenceFile.createWriter(new Configuration(),
-          Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-          Writer.file(new Path(tmpFile.toURI())))) {
+        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+        Writer.file(new Path(tmpFile.toURI())))) {
 
       for (KV<IntWritable, Text> record : records) {
         writer.append(record.getKey(), record.getValue());
@@ -189,7 +194,7 @@ public class HDFSFileSourceTest {
   }
 
   private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
-      int numItems, int offset) {
+                                                          int numItems, int 
offset) {
     List<KV<IntWritable, Text>> records = new ArrayList<>();
     for (int i = 0; i < numItems; i++) {
       IntWritable key = new IntWritable(i + offset);

Reply via email to