[BEAM-2016] Delete HdfsFileSource & Sink

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7512a73c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7512a73c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7512a73c

Branch: refs/heads/master
Commit: 7512a73cf8aa2a527c89ecb054e92207411ed241
Parents: 3bffe0e
Author: Dan Halperin <dhalp...@google.com>
Authored: Thu May 4 18:33:16 2017 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Fri May 5 08:48:04 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hadoop-file-system/README.md       |  43 --
 sdks/java/io/hadoop-file-system/pom.xml         |  24 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   | 478 --------------
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 -------------------
 .../java/org/apache/beam/sdk/io/hdfs/Sink.java  | 195 ------
 .../org/apache/beam/sdk/io/hdfs/UGIHelper.java  |  38 --
 .../java/org/apache/beam/sdk/io/hdfs/Write.java | 588 -----------------
 .../apache/beam/sdk/io/hdfs/package-info.java   |   3 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      | 172 -----
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java    | 231 -------
 10 files changed, 2 insertions(+), 2395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/README.md b/sdks/java/io/hadoop-file-system/README.md
deleted file mode 100644
index 3a734f2..0000000
--- a/sdks/java/io/hadoop-file-system/README.md
+++ /dev/null
@@ -1,43 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-# HDFS IO
-
-This library provides HDFS sources and sinks to make it possible to read and
-write Apache Hadoop file formats from Apache Beam pipelines.
-
-Currently, only the read path is implemented. A `HDFSFileSource` allows any
-Hadoop `FileInputFormat` to be read as a `PCollection`.
-
-A `HDFSFileSource` can be read from using the
-`org.apache.beam.sdk.io.Read` transform. For example:
-
-```java
-HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
-  MyKey.class, MyValue.class);
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
-```
-
-Alternatively, the `readFrom` method is a convenience method that returns a read
-transform. For example:
-
-```java
-PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path,
-  MyInputFormat.class, MyKey.class, MyValue.class));
-```

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml
index 562277e..3b392c2 100644
--- a/sdks/java/io/hadoop-file-system/pom.xml
+++ b/sdks/java/io/hadoop-file-system/pom.xml
@@ -82,11 +82,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>
@@ -124,25 +119,6 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <version>${avro.version}</version>
-      <classifier>hadoop2</classifier>
-      <exclusions>
-        <!-- exclude old Jetty version of servlet API -->
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
deleted file mode 100644
index aee73c4..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import java.io.IOException;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.Random;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyOutputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
-/**
- * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based
- * output
- * format.
- *
- * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to Hadoop
- * filesystem use {@link HDFSFileSink#to}, specify the path (this can be any Hadoop supported
- * filesystem: HDFS, S3, GCS etc), the Hadoop {@link FileOutputFormat}, the key class K and the
- * value class V and finally the {@link SerializableFunction} to map from T to {@link KV} of K
- * and V.
- *
- * <p>{@code HDFSFileSink} can be used by {@link Write} to create write
- * transform. See example below.
- *
- * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink =
- *   HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class));
- * avroRecordsPCollection.apply(Write.to(sink));
- * }
- * </pre>
- *
- * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}.
- * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
-
-  private static final JobID jobId = new JobID(
-      Long.toString(System.currentTimeMillis()),
-      new Random().nextInt(Integer.MAX_VALUE));
-
-  public abstract String path();
-  public abstract Class<? extends FileOutputFormat<K, V>> formatClass();
-  public abstract Class<K> keyClass();
-  public abstract Class<V> valueClass();
-  public abstract SerializableFunction<T, KV<K, V>> outputConverter();
-  public abstract SerializableConfiguration serializableConfiguration();
-  public @Nullable abstract String username();
-  public abstract boolean validate();
-
-  // =======================================================================
-  // Factory methods
-  // =======================================================================
-
-  public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V>
-  to(String path,
-     Class<W> formatClass,
-     Class<K> keyClass,
-     Class<V> valueClass,
-     SerializableFunction<T, KV<K, V>> outputConverter) {
-    return HDFSFileSink.<T, K, V>builder()
-        .setPath(path)
-        .setFormatClass(formatClass)
-        .setKeyClass(keyClass)
-        .setValueClass(valueClass)
-        .setOutputConverter(outputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidate(true)
-        .build();
-  }
-
-  public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) {
-    SerializableFunction<T, KV<NullWritable, Text>> outputConverter =
-        new SerializableFunction<T, KV<NullWritable, Text>>() {
-          @Override
-          public KV<NullWritable, Text> apply(T input) {
-            return KV.of(NullWritable.get(), new Text(input.toString()));
-          }
-        };
-    return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter);
-  }
-
-  /**
-   * Helper to create Avro sink given {@link AvroCoder}. Keep in mind that configuration
-   * object is altered to enable Avro output.
-   */
-  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
-                                                                     final AvroCoder<T> coder,
-                                                                     Configuration conf) {
-    SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter =
-        new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() {
-          @Override
-          public KV<AvroKey<T>, NullWritable> apply(T input) {
-            return KV.of(new AvroKey<>(input), NullWritable.get());
-          }
-        };
-    conf.set("avro.schema.output.key", coder.getSchema().toString());
-    return to(
-        path,
-        AvroKeyOutputFormat.class,
-        (Class<AvroKey<T>>) (Class<?>) AvroKey.class,
-        NullWritable.class,
-        outputConverter).withConfiguration(conf);
-  }
-
-  /**
-   * Helper to create Avro sink given {@link Schema}. Keep in mind that configuration
-   * object is altered to enable Avro output.
-   */
-  public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable>
-  toAvro(String path, Schema schema, Configuration conf) {
-    return toAvro(path, AvroCoder.of(schema), conf);
-  }
-
-  /**
-   * Helper to create Avro sink given {@link Class}. Keep in mind that configuration
-   * object is altered to enable Avro output.
-   */
-  public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path,
-                                                                     Class<T> cls,
-                                                                     Configuration conf) {
-    return toAvro(path, AvroCoder.of(cls), conf);
-  }
-
-  // =======================================================================
-  // Builder methods
-  // =======================================================================
-
-  public abstract Builder<T, K, V> toBuilder();
-  public static <T, K, V> Builder builder() {
-    return new AutoValue_HDFSFileSink.Builder<>();
-  }
-
-  /**
-   * AutoValue builder for {@link HDFSFileSink}.
-   */
-  @AutoValue.Builder
-  public abstract static class Builder<T, K, V> {
-    public abstract Builder<T, K, V> setPath(String path);
-    public abstract Builder<T, K, V> setFormatClass(
-        Class<? extends FileOutputFormat<K, V>> formatClass);
-    public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass);
-    public abstract Builder<T, K, V> setValueClass(Class<V> valueClass);
-    public abstract Builder<T, K, V> setOutputConverter(
-        SerializableFunction<T, KV<K, V>> outputConverter);
-    public abstract Builder<T, K, V> setSerializableConfiguration(
-        SerializableConfiguration serializableConfiguration);
-    public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) {
-      if (configuration == null) {
-        configuration = new Configuration(false);
-      }
-      return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
-    }
-    public abstract Builder<T, K, V> setUsername(String username);
-    public abstract Builder<T, K, V> setValidate(boolean validate);
-    public abstract HDFSFileSink<T, K, V> build();
-  }
-
-  public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) {
-    return this.toBuilder().setConfiguration(configuration).build();
-  }
-
-  public HDFSFileSink<T, K, V> withUsername(@Nullable String username) {
-    return this.toBuilder().setUsername(username).build();
-  }
-
-  // =======================================================================
-  // Sink
-  // =======================================================================
-
-  @Override
-  public void validate(PipelineOptions options) {
-    if (validate()) {
-      try {
-        UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
-          @Override
-          public Void run() throws Exception {
-            FileSystem fs = FileSystem.get(new URI(path()),
-                SerializableConfiguration.newConfiguration(serializableConfiguration()));
-            checkState(!fs.exists(new Path(path())), "Output path %s already exists", path());
-            return null;
-          }
-        });
-      } catch (IOException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  @Override
-  public Sink.WriteOperation<T, String> createWriteOperation() {
-    return new HDFSWriteOperation<>(this, path(), formatClass());
-  }
-
-  private Job newJob() throws IOException {
-    Job job = SerializableConfiguration.newJob(serializableConfiguration());
-    job.setJobID(jobId);
-    job.setOutputKeyClass(keyClass());
-    job.setOutputValueClass(valueClass());
-    return job;
-  }
-
-  // =======================================================================
-  // WriteOperation
-  // =======================================================================
-
-  /** {@link WriteOperation} for HDFS. */
-  private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> {
-
-    private final HDFSFileSink<T, K, V> sink;
-    private final String path;
-    private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
-    HDFSWriteOperation(HDFSFileSink<T, K, V> sink,
-                       String path,
-                       Class<? extends FileOutputFormat<K, V>> formatClass) {
-      this.sink = sink;
-      this.path = path;
-      this.formatClass = formatClass;
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {
-      Job job = sink.newJob();
-      FileOutputFormat.setOutputPath(job, new Path(path));
-    }
-
-    @Override
-    public void setWindowedWrites(boolean windowedWrites) {
-    }
-
-    @Override
-    public void finalize(final Iterable<String> writerResults, PipelineOptions options)
-        throws Exception {
-      UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          doFinalize(writerResults);
-          return null;
-        }
-      });
-    }
-
-    private void doFinalize(Iterable<String> writerResults) throws Exception {
-      Job job = sink.newJob();
-      FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
-      // If there are 0 output shards, just create output folder.
-      if (!writerResults.iterator().hasNext()) {
-        fs.mkdirs(new Path(path));
-        return;
-      }
-
-      // job successful
-      JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-      FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context);
-      outputCommitter.commitJob(context);
-
-      // get actual output shards
-      Set<String> actual = Sets.newHashSet();
-      FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() {
-        @Override
-        public boolean accept(Path path) {
-          String name = path.getName();
-          return !name.startsWith("_") && !name.startsWith(".");
-        }
-      });
-
-      // get expected output shards
-      Set<String> expected = Sets.newHashSet(writerResults);
-      checkState(
-          expected.size() == Lists.newArrayList(writerResults).size(),
-          "Data loss due to writer results hash collision");
-      for (FileStatus s : statuses) {
-        String name = s.getPath().getName();
-        int pos = name.indexOf('.');
-        actual.add(pos > 0 ? name.substring(0, pos) : name);
-      }
-
-      checkState(actual.equals(expected), "Writer results and output files do not match");
-
-      // rename output shards to Hadoop style, i.e. part-r-00000.txt
-      int i = 0;
-      for (FileStatus s : statuses) {
-        String name = s.getPath().getName();
-        int pos = name.indexOf('.');
-        String ext = pos > 0 ? name.substring(pos) : "";
-        fs.rename(
-            s.getPath(),
-            new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext)));
-        i++;
-      }
-    }
-
-    @Override
-    public Writer<T, String> createWriter(PipelineOptions options) throws Exception {
-      return new HDFSWriter<>(this, path, formatClass);
-    }
-
-    @Override
-    public Sink<T> getSink() {
-      return sink;
-    }
-
-    @Override
-    public Coder<String> getWriterResultCoder() {
-      return StringUtf8Coder.of();
-    }
-
-  }
-
-  // =======================================================================
-  // Writer
-  // =======================================================================
-
-  private static class HDFSWriter<T, K, V> extends Writer<T, String> {
-
-    private final HDFSWriteOperation<T, K, V> writeOperation;
-    private final String path;
-    private final Class<? extends FileOutputFormat<K, V>> formatClass;
-
-    // unique hash for each task
-    private int hash;
-
-    private TaskAttemptContext context;
-    private RecordWriter<K, V> recordWriter;
-    private FileOutputCommitter outputCommitter;
-
-    HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation,
-               String path,
-               Class<? extends FileOutputFormat<K, V>> formatClass) {
-      this.writeOperation = writeOperation;
-      this.path = path;
-      this.formatClass = formatClass;
-    }
-
-    @Override
-    public void openWindowed(final String uId,
-                             BoundedWindow window,
-                             PaneInfo paneInfo,
-                             int shard,
-                             int numShards) throws Exception {
-      throw new UnsupportedOperationException("Windowing support not implemented yet for"
-          + "HDFS. Window " + window);
-    }
-
-    @Override
-    public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
-      UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
-          new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              doOpen(uId);
-              return null;
-            }
-          }
-      );
-    }
-
-    private void doOpen(String uId) throws Exception {
-      this.hash = uId.hashCode();
-
-      Job job = writeOperation.sink.newJob();
-      FileOutputFormat.setOutputPath(job, new Path(path));
-
-      // Each Writer is responsible for writing one bundle of elements and is represented by one
-      // unique Hadoop task based on uId/hash. All tasks share the same job ID.
-      JobID jobId = job.getJobID();
-      TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
-      context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
-
-      FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
-      recordWriter = outputFormat.getRecordWriter(context);
-      outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
-    }
-
-    @Override
-    public void write(T value) throws Exception {
-      checkNotNull(recordWriter,
-          "Record writer can't be null. Make sure to open Writer first!");
-      KV<K, V> kv = writeOperation.sink.outputConverter().apply(value);
-      recordWriter.write(kv.getKey(), kv.getValue());
-    }
-
-    @Override
-    public void cleanup() throws Exception {
-
-    }
-
-    @Override
-    public String close() throws Exception {
-      return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
-          new PrivilegedExceptionAction<String>() {
-            @Override
-            public String run() throws Exception {
-              return doClose();
-            }
-          });
-    }
-
-    private String doClose() throws Exception {
-      // task/attempt successful
-      recordWriter.close(context);
-      outputCommitter.commitTask(context);
-
-      // result is prefix of the output file name
-      return String.format("part-r-%d", hash);
-    }
-
-    @Override
-    public WriteOperation<T, String> getWriteOperation() {
-      return writeOperation;
-    }
-
-  }
-
-}
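
For context on what is being removed: the class Javadoc above shows the write path going through the (also deleted) `Write` transform. A minimal sketch of the plain-text variant of that path, with the pipeline setup, input collection, and output location as illustrative placeholders, might have looked like this:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hdfs.HDFSFileSink;
import org.apache.beam.sdk.io.hdfs.Write;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class HdfsTextWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder input; any PCollection<String> would do.
    PCollection<String> lines = p.apply(Create.of("alpha", "beta", "gamma"));

    // toText() maps each element via toString() to (NullWritable, Text) and
    // writes it with Hadoop's TextOutputFormat. The output path is a placeholder.
    HDFSFileSink<String, NullWritable, Text> sink =
        HDFSFileSink.toText("hdfs://namenode/path/to/output");

    // Write (deleted in this same commit) wraps the Sink in a PTransform.
    lines.apply(Write.to(sink));

    p.run().waitUntilFinish();
  }
}
```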

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
deleted file mode 100644
index 5cc2097..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapreduce.AvroKeyInputFormat;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a
- * Hadoop file-based input format.
- *
- * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of
- * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more
- * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to
- * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the
- * key class and the value class.
- *
- * <p>A {@code HDFSFileSource} can be read from using the
- * {@link org.apache.beam.sdk.io.Read} transform. For example:
- *
- * <pre>
- * {@code
- * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class,
- *   MyKey.class, MyValue.class);
- * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(source));
- * }
- * </pre>
- *
- * <p>Implementation note: Since Hadoop's
- * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
- * determines the input splits, this class extends {@link BoundedSource} rather than
- * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter
- * dictates input splits.
- * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}.
- * @param <K> the type of keys to be read from the source via {@link FileInputFormat}.
- * @param <V> the type of values to be read from the source via {@link FileInputFormat}.
- */
-@AutoValue
-@Experimental
-public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
-  private static final long serialVersionUID = 0L;
-
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class);
-
-  public abstract String filepattern();
-  public abstract Class<? extends FileInputFormat<K, V>> formatClass();
-  public abstract Coder<T> coder();
-  public abstract SerializableFunction<KV<K, V>, T> inputConverter();
-  public abstract SerializableConfiguration serializableConfiguration();
-  public @Nullable abstract SerializableSplit serializableSplit();
-  public @Nullable abstract String username();
-  public abstract boolean validateSource();
-
-  // =======================================================================
-  // Factory methods
-  // =======================================================================
-
-  public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V>
-  from(String filepattern,
-       Class<W> formatClass,
-       Coder<T> coder,
-       SerializableFunction<KV<K, V>, T> inputConverter) {
-    return HDFSFileSource.<T, K, V>builder()
-        .setFilepattern(filepattern)
-        .setFormatClass(formatClass)
-        .setCoder(coder)
-        .setInputConverter(inputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidateSource(true)
-        .setSerializableSplit(null)
-        .build();
-  }
-
-  public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V>
-  from(String filepattern,
-       Class<W> formatClass,
-       Class<K> keyClass,
-       Class<V> valueClass) {
-    KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass));
-    SerializableFunction<KV<K, V>, KV<K, V>> inputConverter =
-        new SerializableFunction<KV<K, V>, KV<K, V>>() {
-          @Override
-          public KV<K, V> apply(KV<K, V> input) {
-            return input;
-          }
-        };
-    return HDFSFileSource.<KV<K, V>, K, V>builder()
-        .setFilepattern(filepattern)
-        .setFormatClass(formatClass)
-        .setCoder(coder)
-        .setInputConverter(inputConverter)
-        .setConfiguration(null)
-        .setUsername(null)
-        .setValidateSource(true)
-        .setSerializableSplit(null)
-        .build();
-  }
-
-  public static HDFSFileSource<String, LongWritable, Text>
-  fromText(String filepattern) {
-    SerializableFunction<KV<LongWritable, Text>, String> inputConverter =
-        new SerializableFunction<KV<LongWritable, Text>, String>() {
-      @Override
-      public String apply(KV<LongWritable, Text> input) {
-        return input.getValue().toString();
-      }
-    };
-    return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter);
-  }
-
-  /**
-   * Helper to read from Avro source given {@link AvroCoder}. Keep in mind that configuration
-   * object is altered to enable Avro input.
-   */
-  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
-  fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) {
-    Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class);
-    SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter =
-        new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() {
-          @Override
-          public T apply(KV<AvroKey<T>, NullWritable> input) {
-            try {
-              return CoderUtils.clone(coder, input.getKey().datum());
-            } catch (CoderException e) {
-              throw new RuntimeException(e);
-            }
-          }
-        };
-    conf.set("avro.schema.input.key", coder.getSchema().toString());
-    return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf);
-  }
-
-  /**
-   * Helper to read from Avro source given {@link Schema}. Keep in mind that configuration
-   * object is altered to enable Avro input.
-   */
-  public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable>
-  fromAvro(String filepattern, Schema schema, Configuration conf) {
-    return fromAvro(filepattern, AvroCoder.of(schema), conf);
-  }
-
-  /**
-   * Helper to read from Avro source given {@link Class}. Keep in mind that configuration
-   * object is altered to enable Avro input.
-   */
-  public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable>
-  fromAvro(String filepattern, Class<T> cls, Configuration conf) {
-    return fromAvro(filepattern, AvroCoder.of(cls), conf);
-  }
-
-  // =======================================================================
-  // Builder methods
-  // =======================================================================
-
-  public abstract HDFSFileSource.Builder<T, K, V> toBuilder();
-  public static <T, K, V> HDFSFileSource.Builder builder() {
-    return new AutoValue_HDFSFileSource.Builder<>();
-  }
-
-  /**
-   * AutoValue builder for {@link HDFSFileSource}.
-   */
-  @AutoValue.Builder
-  public abstract static class Builder<T, K, V> {
-    public abstract Builder<T, K, V> setFilepattern(String filepattern);
-    public abstract Builder<T, K, V> setFormatClass(
-        Class<? extends FileInputFormat<K, V>> formatClass);
-    public abstract Builder<T, K, V> setCoder(Coder<T> coder);
-    public abstract Builder<T, K, V> setInputConverter(
-        SerializableFunction<KV<K, V>, T> inputConverter);
-    public abstract Builder<T, K, V> setSerializableConfiguration(
-        SerializableConfiguration serializableConfiguration);
-    public Builder<T, K, V> setConfiguration(Configuration configuration) {
-      if (configuration == null) {
-        configuration = new Configuration(false);
-      }
-      return this.setSerializableConfiguration(new SerializableConfiguration(configuration));
-    }
-    public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit);
-    public abstract Builder<T, K, V> setUsername(@Nullable String username);
-    public abstract Builder<T, K, V> setValidateSource(boolean validate);
-    public abstract HDFSFileSource<T, K, V> build();
-  }
-
-  public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) {
-    return this.toBuilder().setConfiguration(configuration).build();
-  }
-
-  public HDFSFileSource<T, K, V> withUsername(@Nullable String username) {
-    return this.toBuilder().setUsername(username).build();
-  }
-
-  // =======================================================================
-  // BoundedSource
-  // =======================================================================
-
-  @Override
-  public List<? extends BoundedSource<T>> split(
-      final long desiredBundleSizeBytes,
-      PipelineOptions options) throws Exception {
-    if (serializableSplit() == null) {
-      List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs(
-          new PrivilegedExceptionAction<List<InputSplit>>() {
-            @Override
-            public List<InputSplit> run() throws Exception {
-              return computeSplits(desiredBundleSizeBytes, serializableConfiguration());
-            }
-          });
-      return Lists.transform(inputSplits,
-          new Function<InputSplit, BoundedSource<T>>() {
-            @Override
-            public BoundedSource<T> apply(@Nullable InputSplit inputSplit) {
-              SerializableSplit serializableSplit = new SerializableSplit(inputSplit);
-              return HDFSFileSource.this.toBuilder()
-                  .setSerializableSplit(serializableSplit)
-                  .build();
-            }
-          });
-    } else {
-      return ImmutableList.of(this);
-    }
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) {
-    long size = 0;
-
-    try {
-      // If this source represents a split from split,
-      // then return the size of the split, rather than the entire input
-      if (serializableSplit() != null) {
-        return serializableSplit().getSplit().getLength();
-      }
-
-      size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() {
-        @Override
-        public Long run() throws Exception {
-          long size = 0;
-          Job job = SerializableConfiguration.newJob(serializableConfiguration());
-          for (FileStatus st : listStatus(createFormat(job), job)) {
-            size += st.getLen();
-          }
-          return size;
-        }
-      });
-    } catch (IOException e) {
-      LOG.warn(
-          "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
-      // ignore, and return 0
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn(
-          "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e);
-      // ignore, and return 0
-    }
-    return size;
-  }
-
-  @Override
-  public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-    this.validate();
-    return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit());
-  }
-
-  @Override
-  public void validate() {
-    if (validateSource()) {
-      try {
-        UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
-                final Path pathPattern = new Path(filepattern());
-                FileSystem fs = FileSystem.get(pathPattern.toUri(),
-                    SerializableConfiguration.newConfiguration(serializableConfiguration()));
-                FileStatus[] fileStatuses = fs.globStatus(pathPattern);
-                checkState(
-                    fileStatuses != null && fileStatuses.length > 0,
-                    "Unable to find any files matching %s", filepattern());
-                  return null;
-                }
-              });
-      } catch (IOException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return coder();
-  }
-
-  // =======================================================================
-  // Helpers
-  // =======================================================================
-
-  private List<InputSplit> computeSplits(long desiredBundleSizeBytes,
-                                         SerializableConfiguration serializableConfiguration)
-      throws IOException, IllegalAccessException, InstantiationException {
-    Job job = SerializableConfiguration.newJob(serializableConfiguration);
-    FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
-    FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes);
-    return createFormat(job).getSplits(job);
-  }
-
-  private FileInputFormat<K, V> createFormat(Job job)
-      throws IOException, IllegalAccessException, InstantiationException {
-    Path path = new Path(filepattern());
-    FileInputFormat.addInputPath(job, path);
-    return formatClass().newInstance();
-  }
-
-  private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job)
-      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
-    // FileInputFormat#listStatus is protected, so call using reflection
-    Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class);
-    listStatus.setAccessible(true);
-    @SuppressWarnings("unchecked")
-    List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job);
-    return stat;
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> Coder<T> getDefaultCoder(Class<T> c) {
-    if (Writable.class.isAssignableFrom(c)) {
-      Class<? extends Writable> writableClass = (Class<? extends Writable>) c;
-      return (Coder<T>) WritableCoder.of(writableClass);
-    } else if (Void.class.equals(c)) {
-      return (Coder<T>) VoidCoder.of();
-    }
-    // TODO: how to use registered coders here?
-    throw new IllegalStateException("Cannot find coder for " + c);
-  }
-
-  @SuppressWarnings("unchecked")
-  private static <T> Class<T> castClass(Class<?> aClass) {
-    return (Class<T>) aClass;
-  }
-
-  // =======================================================================
-  // BoundedReader
-  // =======================================================================
-
-  private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> {
-
-    private final HDFSFileSource<T, K, V> source;
-    private final String filepattern;
-    private final Class<? extends FileInputFormat<K, V>> formatClass;
-    private final Job job;
-
-    private List<InputSplit> splits;
-    private ListIterator<InputSplit> splitsIterator;
-
-    private Configuration conf;
-    private FileInputFormat<?, ?> format;
-    private TaskAttemptContext attemptContext;
-    private RecordReader<K, V> currentReader;
-    private KV<K, V> currentPair;
-
-    HDFSFileReader(
-        HDFSFileSource<T, K, V> source,
-        String filepattern,
-        Class<? extends FileInputFormat<K, V>> formatClass,
-        SerializableSplit serializableSplit)
-        throws IOException {
-      this.source = source;
-      this.filepattern = filepattern;
-      this.formatClass = formatClass;
-      this.job = SerializableConfiguration.newJob(source.serializableConfiguration());
-
-      if (serializableSplit != null) {
-        this.splits = ImmutableList.of(serializableSplit.getSplit());
-        this.splitsIterator = splits.listIterator();
-      }
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      Path path = new Path(filepattern);
-      FileInputFormat.addInputPath(job, path);
-
-      conf = job.getConfiguration();
-      try {
-        format = formatClass.newInstance();
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException("Cannot instantiate file input format " + formatClass, e);
-      }
-      attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID());
-
-      if (splitsIterator == null) {
-        splits = format.getSplits(job);
-        splitsIterator = splits.listIterator();
-      }
-
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      try {
-        if (currentReader != null && currentReader.nextKeyValue()) {
-          currentPair = nextPair();
-          return true;
-        } else {
-          while (splitsIterator.hasNext()) {
-            // advance the reader and see if it has records
-            final InputSplit nextSplit = splitsIterator.next();
-            @SuppressWarnings("unchecked")
-            RecordReader<K, V> reader =
-                (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext);
-            if (currentReader != null) {
-              currentReader.close();
-            }
-            currentReader = reader;
-            UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() {
-              @Override
-              public Void run() throws Exception {
-                currentReader.initialize(nextSplit, attemptContext);
-                return null;
-              }
-            });
-            if (currentReader.nextKeyValue()) {
-              currentPair = nextPair();
-              return true;
-            }
-            currentReader.close();
-            currentReader = null;
-          }
-          // either no next split or all readers were empty
-          currentPair = null;
-          return false;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      if (currentPair == null) {
-        throw new NoSuchElementException();
-      }
-      return source.inputConverter().apply(currentPair);
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (currentReader != null) {
-        currentReader.close();
-        currentReader = null;
-      }
-      currentPair = null;
-    }
-
-    @Override
-    public BoundedSource<T> getCurrentSource() {
-      return source;
-    }
-
-    @SuppressWarnings("unchecked")
-    private KV<K, V> nextPair() throws IOException, InterruptedException {
-      K key = currentReader.getCurrentKey();
-      V value = currentReader.getCurrentValue();
-      // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue
-      if (key instanceof Writable) {
-        key = (K) WritableUtils.clone((Writable) key, conf);
-      }
-      if (value instanceof Writable) {
-        value = (V) WritableUtils.clone((Writable) value, conf);
-      }
-      return KV.of(key, value);
-    }
-
-    // =======================================================================
-    // Optional overrides
-    // =======================================================================
-
-    @Override
-    public Double getFractionConsumed() {
-      if (currentReader == null) {
-        return 0.0;
-      }
-      if (splits.isEmpty()) {
-        return 1.0;
-      }
-      int index = splitsIterator.previousIndex();
-      int numReaders = splits.size();
-      if (index == numReaders) {
-        return 1.0;
-      }
-      double before = 1.0 * index / numReaders;
-      double after = 1.0 * (index + 1) / numReaders;
-      Double fractionOfCurrentReader = getProgress();
-      if (fractionOfCurrentReader == null) {
-        return before;
-      }
-      return before + fractionOfCurrentReader * (after - before);
-    }
-
-    private Double getProgress() {
-      try {
-        return (double) currentReader.getProgress();
-      } catch (IOException | InterruptedException e) {
-        return null;
-      }
-    }
-
-  }
-
-  // =======================================================================
-  // SerializableSplit
-  // =======================================================================
-
-  /**
-   * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be
-   * serialized using Java's standard serialization mechanisms. Note that the InputSplit
-   * has to be Writable (which most are).
-   */
-  protected static class SerializableSplit implements Externalizable {
-    private static final long serialVersionUID = 0L;
-
-    private InputSplit split;
-
-    public SerializableSplit() {
-    }
-
-    public SerializableSplit(InputSplit split) {
-      checkArgument(split instanceof Writable, "Split is not writable: %s", split);
-      this.split = split;
-    }
-
-    public InputSplit getSplit() {
-      return split;
-    }
-
-    @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
-      out.writeUTF(split.getClass().getCanonicalName());
-      ((Writable) split).write(out);
-    }
-
-    @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-      String className = in.readUTF();
-      try {
-        split = (InputSplit) Class.forName(className).newInstance();
-        ((Writable) split).readFields(in);
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-}
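
Similarly, the read path documented in the Javadoc above went through `org.apache.beam.sdk.io.Read`. A minimal sketch combining the `fromText` and `fromAvro` helpers, with placeholder paths and an inline placeholder Avro schema, might have looked like this:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.hdfs.HDFSFileSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;

public class HdfsReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // fromText() wraps TextInputFormat and yields one String per input line.
    PCollection<String> lines = p.apply("ReadText",
        Read.from(HDFSFileSource.fromText("hdfs://namenode/path/to/input/*.txt")));

    // fromAvro() wraps AvroKeyInputFormat; note from the code above that it
    // mutates the supplied Configuration to set avro.schema.input.key.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
    Configuration conf = new Configuration(false);
    PCollection<GenericRecord> events = p.apply("ReadAvro",
        Read.from(HDFSFileSource.fromAvro("hdfs://namenode/path/to/events/*.avro", schema, conf)));

    p.run().waitUntilFinish();
  }
}
```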

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
deleted file mode 100644
index fe2db5f..0000000
--- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-
-/**
- * This class is deprecated, and only exists for HDFSFileSink.
- */
-@Deprecated
-public abstract class Sink<T> implements Serializable, HasDisplayData {
-  /**
-   * Ensures that the sink is valid and can be written to before the write operation begins. One
-   * should use {@link com.google.common.base.Preconditions} to implement this method.
-   */
-  public abstract void validate(PipelineOptions options);
-
-  /**
-   * Returns an instance of a {@link WriteOperation} that can write to this Sink.
-   */
-  public abstract WriteOperation<T, ?> createWriteOperation();
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {}
-
-  /**
-   * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
-   *
-   * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
-   * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
-   * a bundle to the sink.
-   *
-   * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
-   * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
-   *
-   * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
-   * call to {@code initialize} method and deserialized before calls to
-   * {@code createWriter} and {@code finalize}. However, it is not
-   * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
-   * state of the {@code WriteOperation}.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of objects to write
-   * @param <WriteT> The result of a per-bundle write
-   */
-  public abstract static class WriteOperation<T, WriteT> implements Serializable {
-    /**
-     * Performs initialization before writing to the sink. Called before writing begins.
-     */
-    public abstract void initialize(PipelineOptions options) throws Exception;
-
-    /**
-     * Indicates that the operation will be performing windowed writes.
-     */
-    public abstract void setWindowedWrites(boolean windowedWrites);
-
-    /**
-     * Given an Iterable of results from bundle writes, performs finalization after writing and
-     * closes the sink. Called after all bundle writes are complete.
-     *
-     * <p>The results that are passed to finalize are those returned by bundles that completed
-     * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
-     * one writer result will be passed to finalize for each bundle. An implementation of finalize
-     * should perform clean up of any failed and successfully retried bundles.  Note that these
-     * failed bundles will not have their writer result passed to finalize, so finalize should be
-     * capable of locating any temporary/partial output written by failed bundles.
-     *
-     * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
-     * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
-     * failure/retry or for redundancy.
-     *
-     * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
-     * finalize is called multiple times.
-     *
-     * @param writerResults an Iterable of results from successful bundle writes.
-     */
-    public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
-        throws Exception;
-
-    /**
-     * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
-     *
-     * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
-     * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
-     *
-     * <p>Must not mutate the state of the WriteOperation.
-     */
-    public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
-    /**
-     * Returns the Sink that this write operation writes to.
-     */
-    public abstract Sink<T> getSink();
-
-    /**
-     * Returns a coder for the writer result type.
-     */
-    public abstract Coder<WriteT> getWriterResultCoder();
-  }
-
-  /**
-   * A Writer writes a bundle of elements from a PCollection to a sink.
-   * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
-   * and {@link Writer#close} is called after all elements in the bundle have been written.
-   * {@link Writer#write} writes an element to the sink.
-   *
-   * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
-   * multiple instances of a Writer may be instantiated in different threads on the same worker.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of object to write
-   * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
-   */
-  public abstract static class Writer<T, WriteT> {
-    /**
-     * Performs bundle initialization. For example, creates a temporary file for writing or
-     * initializes any state that will be used across calls to {@link Writer#write}.
-     *
-     * <p>The unique id that is given to open should be used to ensure that the writer's output does
-     * not interfere with the output of other Writers, as a bundle may be executed many times for
-     * fault tolerance. See {@link Sink} for more information about bundle ids.
-     *
-     * <p>The window and paneInfo arguments are populated when windowed writes are requested.
-     * shard and numShards are populated for the case of static sharding. In cases where the
-     * runner is dynamically picking sharding, shard and numShards might both be set to -1.
-     */
-    public abstract void openWindowed(String uId,
-                                      BoundedWindow window,
-                                      PaneInfo paneInfo,
-                                      int shard,
-                                      int numShards) throws Exception;
-
-    /**
-     * Performs bundle initialization for the case where the file is written unwindowed.
-     */
-    public abstract void openUnwindowed(String uId,
-                                        int shard,
-                                        int numShards) throws Exception;
-
-    public abstract void cleanup() throws Exception;
-
-    /**
-     * Called for each value in the bundle.
-     */
-    public abstract void write(T value) throws Exception;
-
-    /**
-     * Finishes writing the bundle. Closes any resources used for writing the 
bundle.
-     *
-     * <p>Returns a writer result that will be used in the {@link 
Sink.WriteOperation}'s
-     * finalization. The result should contain some way to identify the output 
of this bundle (using
-     * the bundle id). {@link WriteOperation#finalize} will use the writer 
result to identify
-     * successful writes. See {@link Sink} for more information about bundle 
ids.
-     *
-     * @return the writer result
-     */
-    public abstract WriteT close() throws Exception;
-
-    /**
-     * Returns the write operation this writer belongs to.
-     */
-    public abstract WriteOperation<T, WriteT> getWriteOperation();
-
-
-  }
-}
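
For readers tracing the removed abstraction, the Javadoc above describes a three-stage
lifecycle: initialize a WriteOperation, open one Writer per bundle, and finalize with the
results of the bundles that committed. The sketch below is not taken from the deleted
sources; it only illustrates, for an assumed concrete Sink implementation and a single
element, the order in which a runner would drive these hooks.

```java
import java.util.Collections;
import java.util.UUID;
import org.apache.beam.sdk.io.hdfs.Sink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Sketch only: drives the removed Sink lifecycle for a single element and bundle. */
class SinkLifecycleSketch {
  static <T> void runOnce(Sink<T> sink, T element) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    sink.validate(options);
    // The writer-result type is whatever the sink's WriteOperation declares.
    driveBundle(sink.createWriteOperation(), element, options);
  }

  private static <T, ResultT> void driveBundle(
      Sink.WriteOperation<T, ResultT> op, T element, PipelineOptions options) throws Exception {
    op.initialize(options);

    // One Writer per bundle; the unique id keeps retried bundles from colliding.
    Sink.Writer<T, ResultT> writer = op.createWriter(options);
    writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
    writer.write(element);
    ResultT bundleResult = writer.close();

    // Only results of bundles that committed successfully reach finalize.
    op.finalize(Collections.singletonList(bundleResult), options);
  }
}
```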

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
deleted file mode 100644
index fd05a19..0000000
--- 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * {@link UserGroupInformation} helper methods.
- */
-public class UGIHelper {
-
-  /**
-   * Find the most appropriate UserGroupInformation to use.
-   * @param username the user name, or NULL if none is specified.
-   * @return the most appropriate UserGroupInformation
-   */
-  public static UserGroupInformation getBestUGI(@Nullable String username) 
throws IOException {
-    return UserGroupInformation.getBestUGI(null, username);
-  }
-
-}
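
UGIHelper is a thin wrapper over Hadoop's UserGroupInformation.getBestUGI with no ticket
cache path. A typical use, sketched below with a placeholder user name and path (neither
appears in the deleted code), is to run filesystem calls inside doAs so they execute with
that user's credentials.

```java
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

/** Sketch only: "hdfsUser" and the checked path are placeholders. */
class UgiSketch {
  static boolean existsAsUser(final String path) throws Exception {
    UserGroupInformation ugi = UGIHelper.getBestUGI("hdfsUser");
    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
      @Override
      public Boolean run() throws Exception {
        // The FileSystem call runs with the credentials resolved above.
        FileSystem fs = FileSystem.get(new Configuration());
        return fs.exists(new Path(path));
      }
    });
  }
}
```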

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
deleted file mode 100644
index ef6556e..0000000
--- 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation;
-import org.apache.beam.sdk.io.hdfs.Sink.Writer;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is deprecated and currently exists only to support HDFSFileSink.
- */
-@Deprecated
-public class Write<T> extends PTransform<PCollection<T>, PDone> {
-  private static final Logger LOG = LoggerFactory.getLogger(Write.class);
-
-  private static final int UNKNOWN_SHARDNUM = -1;
-  private static final int UNKNOWN_NUMSHARDS = -1;
-
-  private final Sink<T> sink;
-  // This allows the number of shards to be dynamically computed based on the 
input
-  // PCollection.
-  @Nullable
-  private final PTransform<PCollection<T>, PCollectionView<Integer>> 
computeNumShards;
-  // We don't use a side input for static sharding, as we want this value to 
be updatable
-  // when a pipeline is updated.
-  @Nullable
-  private final ValueProvider<Integer> numShardsProvider;
-  private boolean windowedWrites;
-
-  /**
-   * Creates a {@link Write} transform that writes to the given {@link Sink}, 
letting the runner
-   * control how many different shards are produced.
-   */
-  public static <T> Write<T> to(Sink<T> sink) {
-    checkNotNull(sink, "sink");
-    return new Write<>(sink, null /* runner-determined sharding */, null, 
false);
-  }
-
-  private Write(
-      Sink<T> sink,
-      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> 
computeNumShards,
-      @Nullable ValueProvider<Integer> numShardsProvider,
-      boolean windowedWrites) {
-    this.sink = sink;
-    this.computeNumShards = computeNumShards;
-    this.numShardsProvider = numShardsProvider;
-    this.windowedWrites = windowedWrites;
-  }
-
-  @Override
-  public PDone expand(PCollection<T> input) {
-    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
-        "%s can only be applied to an unbounded PCollection if doing windowed 
writes",
-        Write.class.getSimpleName());
-    return createWrite(input, sink.createWriteOperation());
-  }
-
-  @Override
-  public void validate(PipelineOptions options) {
-    sink.validate(options);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder
-        .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
-        .include("sink", sink);
-    if (getSharding() != null) {
-      builder.include("sharding", getSharding());
-    } else if (getNumShards() != null) {
-      String numShards = getNumShards().isAccessible()
-          ? getNumShards().get().toString() : getNumShards().toString();
-      builder.add(DisplayData.item("numShards", numShards)
-          .withLabel("Fixed Number of Shards"));
-    }
-  }
-
-  /**
-   * Returns the {@link Sink} associated with this PTransform.
-   */
-  public Sink<T> getSink() {
-    return sink;
-  }
-
-  /**
-   * Gets the {@link PTransform} that will be used to determine sharding. This 
can be either a
-   * static number of shards (following a call to {@link #withNumShards(int)}),
-   * dynamic (by {@link #withSharding(PTransform)}), or runner-determined (by
-   * {@link #withRunnerDeterminedSharding()}).
-   */
-  @Nullable
-  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
-    return computeNumShards;
-  }
-
-  public ValueProvider<Integer> getNumShards() {
-    return numShardsProvider;
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} 
using the
-   * specified number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See 
{@link Write} for
-   * more information.
-   *
-   * <p>A value less than or equal to 0 will be equivalent to the default 
behavior of
-   * runner-determined sharding.
-   */
-  public Write<T> withNumShards(int numShards) {
-    if (numShards > 0) {
-      return withNumShards(StaticValueProvider.of(numShards));
-    }
-    return withRunnerDeterminedSharding();
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} 
using the
-   * number of shards specified by the given {@link ValueProvider}.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See 
{@link Write} for
-   * more information.
-   */
-  public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
-    return new Write<>(sink, null, numShardsProvider, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} 
using the
-   * specified {@link PTransform} to compute the number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See 
{@link Write} for
-   * more information.
-   */
-  public Write<T> withSharding(PTransform<PCollection<T>, 
PCollectionView<Integer>> sharding) {
-    checkNotNull(
-        sharding, "Cannot provide null sharding. Use 
withRunnerDeterminedSharding() instead");
-    return new Write<>(sink, sharding, null, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} 
with
-   * runner-determined sharding.
-   */
-  public Write<T> withRunnerDeterminedSharding() {
-    return new Write<>(sink, null, null, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that preserves windowing on its input.
-   *
-   * <p>If this option is not specified, windowing and triggering are replaced 
by
-   * {@link GlobalWindows} and {@link DefaultTrigger}.
-   *
-   * <p>If there is no data for a window, no output shards will be generated 
for that window.
-   * If a window triggers multiple times, more than one output shard might be
-   * generated for that window; it's up to the sink implementation to keep these
-   * output shards unique.
-   *
-   * <p>This option can only be used if {@link #withNumShards(int)} is also 
set to a
-   * positive value.
-   */
-  public Write<T> withWindowedWrites() {
-    return new Write<>(sink, computeNumShards, numShardsProvider, true);
-  }
-
-  /**
-   * Writes all the elements in a bundle using a {@link Writer} produced by the
-   * {@link WriteOperation} associated with the {@link Sink}.
-   */
-  private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
-    // Writer that will write the records in this bundle. Lazily
-    // initialized in processElement.
-    private Writer<T, WriteT> writer = null;
-    private BoundedWindow window;
-    private final PCollectionView<WriteOperation<T, WriteT>> 
writeOperationView;
-
-    WriteBundles(PCollectionView<WriteOperation<T, WriteT>> 
writeOperationView) {
-      this.writeOperationView = writeOperationView;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
-      // Lazily initialize the Writer
-      if (writer == null) {
-        WriteOperation<T, WriteT> writeOperation = 
c.sideInput(writeOperationView);
-        LOG.info("Opening writer for write operation {}", writeOperation);
-        writer = writeOperation.createWriter(c.getPipelineOptions());
-
-        if (windowedWrites) {
-          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), 
UNKNOWN_SHARDNUM,
-              UNKNOWN_NUMSHARDS);
-        } else {
-          writer.openUnwindowed(UUID.randomUUID().toString(), 
UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
-        }
-        this.window = window;
-        LOG.debug("Done opening writer {} for operation {}", writer, 
writeOperationView);
-      }
-      try {
-        writer.write(c.element());
-      } catch (Exception e) {
-        // Discard write result and close the write.
-        try {
-          writer.close();
-          // The writer does not need to be reset, as this DoFn cannot be 
reused.
-        } catch (Exception closeException) {
-          if (closeException instanceof InterruptedException) {
-            // Do not silently ignore interrupted state.
-            Thread.currentThread().interrupt();
-          }
-          // Do not mask the exception that caused the write to fail.
-          e.addSuppressed(closeException);
-        }
-        throw e;
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(FinishBundleContext c) throws Exception {
-      if (writer != null) {
-        WriteT result = writer.close();
-        c.output(result, window.maxTimestamp(), window);
-        // Reset state in case of reuse.
-        writer = null;
-        window = null;
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(Write.this);
-    }
-  }
-
-  /**
-   * Like {@link WriteBundles}, but where the elements for each shard have 
been collected into
-   * a single iterable.
-   *
-   * @see WriteBundles
-   */
-  private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, 
Iterable<T>>, WriteT> {
-    private final PCollectionView<WriteOperation<T, WriteT>> 
writeOperationView;
-    private final PCollectionView<Integer> numShardsView;
-
-    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> 
writeOperationView,
-                        PCollectionView<Integer> numShardsView) {
-      this.writeOperationView = writeOperationView;
-      this.numShardsView = numShardsView;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
-      int numShards = numShardsView != null ? c.sideInput(numShardsView) : 
getNumShards().get();
-      // In a sharded write, a single input element represents one shard. We can 
open and close
-      // the writer in each call to processElement.
-      WriteOperation<T, WriteT> writeOperation = 
c.sideInput(writeOperationView);
-      LOG.info("Opening writer for write operation {}", writeOperation);
-      Writer<T, WriteT> writer = 
writeOperation.createWriter(c.getPipelineOptions());
-      if (windowedWrites) {
-        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), 
c.element().getKey(),
-            numShards);
-      } else {
-        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, 
UNKNOWN_NUMSHARDS);
-      }
-      LOG.debug("Done opening writer {} for operation {}", writer, 
writeOperationView);
-
-      try {
-        try {
-          for (T t : c.element().getValue()) {
-            writer.write(t);
-          }
-        } catch (Exception e) {
-          try {
-            writer.close();
-          } catch (Exception closeException) {
-            if (closeException instanceof InterruptedException) {
-              // Do not silently ignore interrupted state.
-              Thread.currentThread().interrupt();
-            }
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
-          }
-          throw e;
-        }
-
-        // Close the writer; if this throws let the error propagate.
-        WriteT result = writer.close();
-        c.output(result);
-      } catch (Exception e) {
-        // If anything goes wrong, make sure to delete the temporary file.
-        writer.cleanup();
-        throw e;
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(Write.this);
-    }
-  }
-
-  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-    private final PCollectionView<Integer> numShardsView;
-    private final ValueProvider<Integer> numShardsProvider;
-    private int shardNumber;
-
-    ApplyShardingKey(PCollectionView<Integer> numShardsView,
-                     ValueProvider<Integer> numShardsProvider) {
-      this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
-      shardNumber = UNKNOWN_SHARDNUM;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      int shardCount = 0;
-      if (numShardsView != null) {
-        shardCount = context.sideInput(numShardsView);
-      } else {
-        checkNotNull(numShardsProvider);
-        shardCount = numShardsProvider.get();
-      }
-      checkArgument(
-          shardCount > 0,
-          "Must have a positive number of shards specified for 
non-runner-determined sharding."
-              + " Got %s",
-          shardCount);
-      if (shardNumber == UNKNOWN_SHARDNUM) {
-        // We want to desynchronize the first record sharding key for each 
instance of
-        // ApplyShardingKey, so records in a small PCollection will be 
statistically balanced.
-        shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
-      } else {
-        shardNumber = (shardNumber + 1) % shardCount;
-      }
-      context.output(KV.of(shardNumber, context.element()));
-    }
-  }
-
-  /**
-   * A write is performed as a sequence of three {@link ParDo}s.
-   *
-   * <p>In the first, a do-once ParDo is applied to a singleton PCollection 
containing the Sink's
-   * {@link WriteOperation}. In this initialization ParDo, {@link 
WriteOperation#initialize} is
-   * called. The output of this ParDo is a singleton PCollection
-   * containing the WriteOperation.
-   *
-   * <p>This singleton collection containing the WriteOperation is then used 
as a side input to a
-   * ParDo over the PCollection of elements to write. In this bundle-writing 
phase,
-   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-   * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before the
-   * first element of the bundle is written, {@link Writer#close} is called once the
-   * bundle's elements have been written, and the {@link Writer#write} method is called for
-   * every element in the bundle. The output of this ParDo is a PCollection of
-   * <i>writer result</i> objects (see {@link Sink} for a description of 
writer results), one for
-   * each bundle.
-   *
-   * <p>The final do-once ParDo uses the singleton collection of the 
WriteOperation as input and
-   * the collection of writer results as a side-input. In this ParDo,
-   * {@link WriteOperation#finalize} is called to finalize the write.
-   *
-   * <p>If the write of any element in the PCollection fails, {@link 
Writer#close} will be called
-   * before the exception that caused the write to fail is propagated and the 
write result will be
-   * discarded.
-   *
-   * <p>Since the {@link WriteOperation} is serialized after the 
initialization ParDo and
-   * deserialized in the bundle-writing and finalization phases, any state 
change to the
-   * WriteOperation object that occurs during initialization is visible in the 
latter phases.
-   * However, the WriteOperation is not serialized after the bundle-writing 
phase.  This is why
-   * implementations should guarantee that {@link WriteOperation#createWriter} 
does not mutate
-   * the WriteOperation.
-   */
-  private <WriteT> PDone createWrite(
-      PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-    Pipeline p = input.getPipeline();
-    writeOperation.setWindowedWrites(windowedWrites);
-
-    // A coder to use for the WriteOperation.
-    @SuppressWarnings("unchecked")
-    Coder<WriteOperation<T, WriteT>> operationCoder =
-        (Coder<WriteOperation<T, WriteT>>) 
SerializableCoder.of(writeOperation.getClass());
-
-    // A singleton collection of the WriteOperation, to be used as input to a 
ParDo to initialize
-    // the sink.
-    PCollection<WriteOperation<T, WriteT>> operationCollection =
-        p.apply("CreateOperationCollection", 
Create.of(writeOperation).withCoder(operationCoder));
-
-    // Initialize the resource in a do-once ParDo on the WriteOperation.
-    operationCollection = operationCollection
-        .apply("Initialize", ParDo.of(
-            new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) throws Exception {
-                WriteOperation<T, WriteT> writeOperation = c.element();
-                LOG.info("Initializing write operation {}", writeOperation);
-                writeOperation.initialize(c.getPipelineOptions());
-                writeOperation.setWindowedWrites(windowedWrites);
-                LOG.debug("Done initializing write operation {}", 
writeOperation);
-                // The WriteOperation is also the output of this ParDo, so it 
can have mutable
-                // state.
-                c.output(writeOperation);
-              }
-            }))
-        .setCoder(operationCoder);
-
-    // Create a view of the WriteOperation to be used as a sideInput to the 
parallel write phase.
-    final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-        operationCollection.apply(View.<WriteOperation<T, 
WriteT>>asSingleton());
-
-    if (!windowedWrites) {
-      // Re-window the data into the global window and remove any existing 
triggers.
-      input =
-          input.apply(
-              Window.<T>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
-    }
-
-
-    // Perform the per-bundle writes as a ParDo on the input PCollection (with 
the WriteOperation
-    // as a side input) and collect the results of the writes in a PCollection.
-    // There is a dependency between this ParDo and the first (the 
WriteOperation PCollection
-    // as a side input), so this will happen after the initial ParDo.
-    PCollection<WriteT> results;
-    final PCollectionView<Integer> numShardsView;
-    if (computeNumShards == null && numShardsProvider == null) {
-      if (windowedWrites) {
-        throw new IllegalStateException("When doing windowed writes, numShards 
must be set"
-            + "explicitly to a positive value");
-      }
-      numShardsView = null;
-      results = input
-          .apply("WriteBundles",
-              ParDo.of(new WriteBundles<>(writeOperationView))
-                  .withSideInputs(writeOperationView));
-    } else {
-      if (computeNumShards != null) {
-        numShardsView = input.apply(computeNumShards);
-        results  = input
-            .apply("ApplyShardLabel", ParDo.of(
-                new ApplyShardingKey<T>(numShardsView, 
null)).withSideInputs(numShardsView))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles<>(writeOperationView, 
numShardsView))
-                    .withSideInputs(numShardsView, writeOperationView));
-      } else {
-        numShardsView = null;
-        results = input
-            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, 
numShardsProvider)))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
-                    .withSideInputs(writeOperationView));
-      }
-    }
-    results.setCoder(writeOperation.getWriterResultCoder());
-
-    if (windowedWrites) {
-      // When processing streaming windowed writes, results will arrive 
multiple times. This
-      // means we can't share the below implementation that turns the results 
into a side input,
-      // as new data arriving into a side input does not trigger the listening 
DoFn. Instead
-      // we aggregate the result set using a singleton GroupByKey, so the DoFn 
will be triggered
-      // whenever new data arrives.
-      PCollection<KV<Void, WriteT>> keyedResults =
-          results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) 
null));
-      keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), 
writeOperation
-          .getWriterResultCoder()));
-
-      // Is the continuation trigger sufficient?
-      keyedResults
-          .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
-          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, 
Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = 
c.sideInput(writeOperationView);
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = 
Lists.newArrayList(c.element().getValue());
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(writeOperationView));
-    } else {
-      final PCollectionView<Iterable<WriteT>> resultsView =
-          results.apply(View.<WriteT>asIterable());
-      ImmutableList.Builder<PCollectionView<?>> sideInputs =
-          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-      if (numShardsView != null) {
-        sideInputs.add(numShardsView);
-      }
-
-      // Finalize the write in another do-once ParDo on the singleton 
collection containing the
-      // Writer. The results from the per-bundle writes are given as an 
Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in 
the first do-once
-      // ParDo. There is a dependency between this ParDo and the parallel 
write (the writer
-      // results collection as a side input), so it will happen after the 
parallel write.
-      // For the non-windowed case, we guarantee that if no data is written 
but the user has
-      // set numShards, then all shards will be written out as empty files. 
For this reason we
-      // use a side input here.
-      operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, 
Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = 
Lists.newArrayList(c.sideInput(resultsView));
-              LOG.debug("Side input initialized to finalize write operation 
{}.", writeOperation);
-
-              // We must always output at least 1 shard, and honor 
user-specified numShards if
-              // set.
-              int minShardsNeeded;
-              if (numShardsView != null) {
-                minShardsNeeded = c.sideInput(numShardsView);
-              } else if (numShardsProvider != null) {
-                minShardsNeeded = numShardsProvider.get();
-              } else {
-                minShardsNeeded = 1;
-              }
-              int extraShardsNeeded = minShardsNeeded - results.size();
-              if (extraShardsNeeded > 0) {
-                LOG.info(
-                    "Creating {} empty output shards in addition to {} written 
for a total of "
-                        + " {}.", extraShardsNeeded, results.size(), 
minShardsNeeded);
-                for (int i = 0; i < extraShardsNeeded; ++i) {
-                  Writer<T, WriteT> writer = 
writeOperation.createWriter(c.getPipelineOptions());
-                  writer.openUnwindowed(UUID.randomUUID().toString(), 
UNKNOWN_SHARDNUM,
-                      UNKNOWN_NUMSHARDS);
-                  WriteT emptyWrite = writer.close();
-                  results.add(emptyWrite);
-                }
-                LOG.debug("Done creating extra shards.");
-              }
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(sideInputs.build()));
-    }
-    return PDone.in(input.getPipeline());
-  }
-}
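
For context on how the removed transform was applied, the sketch below wires Write.to into
a pipeline with a fixed shard count. The input elements are invented, and sink stands in
for a concrete Sink<String> such as the removed HDFSFileSink.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hdfs.Sink;
import org.apache.beam.sdk.io.hdfs.Write;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

/** Sketch only: `sink` is a placeholder for a concrete Sink<String> implementation. */
class WriteUsageSketch {
  static void writeExample(Sink<String> sink) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<String> lines = p.apply(Create.of("a", "b", "c"));

    // Runner-determined sharding is the default; withNumShards(3) pins three shards.
    lines.apply("WriteToSink", Write.to(sink).withNumShards(3));

    p.run().waitUntilFinish();
  }
}
```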

http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
index 763b30a..32c36cc 100644
--- 
a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
+++ 
b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
@@ -17,6 +17,7 @@
  */
 
 /**
- * Transforms used to read from the Hadoop file system (HDFS).
+ * {@link org.apache.beam.sdk.io.FileSystem} implementation for any Hadoop
+ * {@link org.apache.hadoop.fs.FileSystem}.
  */
 package org.apache.beam.sdk.io.hdfs;
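
With the source and sink gone, the package's remaining role is the FileSystem registration
noted above, so HDFS paths flow through the standard file-based transforms. A minimal
sketch, assuming the Hadoop configuration has already been supplied through the pipeline
options (for example via this module's HadoopFileSystemOptions) and using a placeholder
hdfs:// path:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

/** Sketch only: the hdfs:// path is a placeholder. */
class HdfsReadSketch {
  static void readExample() {
    // Assumes the Hadoop configuration has been supplied through the pipeline
    // options so that the hdfs scheme resolves to this module's FileSystem.
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> lines =
        p.apply(TextIO.read().from("hdfs://namenode/path/to/input-*"));

    p.run().waitUntilFinish();
  }
}
```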
