Repository: beam
Updated Branches:
  refs/heads/master 3161904d9 -> 9f6377fcc


http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
deleted file mode 100644
index 86a9246..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation;
-import org.apache.beam.sdk.io.hdfs.Sink.Writer;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is deprecated, and only exists currently for HDFSFileSink.
- */
-@Deprecated
-public class Write<T> extends PTransform<PCollection<T>, PDone> {
-  private static final Logger LOG = LoggerFactory.getLogger(Write.class);
-
-  private static final int UNKNOWN_SHARDNUM = -1;
-  private static final int UNKNOWN_NUMSHARDS = -1;
-
-  private final Sink<T> sink;
-  // This allows the number of shards to be dynamically computed based on the input
-  // PCollection.
-  @Nullable
-  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
-  // We don't use a side input for static sharding, as we want this value to be updatable
-  // when a pipeline is updated.
-  @Nullable
-  private final ValueProvider<Integer> numShardsProvider;
-  private boolean windowedWrites;
-
-  /**
-   * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
-   * control how many different shards are produced.
-   */
-  public static <T> Write<T> to(Sink<T> sink) {
-    checkNotNull(sink, "sink");
-    return new Write<>(sink, null /* runner-determined sharding */, null, false);
-  }
-
-  private Write(
-      Sink<T> sink,
-      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
-      @Nullable ValueProvider<Integer> numShardsProvider,
-      boolean windowedWrites) {
-    this.sink = sink;
-    this.computeNumShards = computeNumShards;
-    this.numShardsProvider = numShardsProvider;
-    this.windowedWrites = windowedWrites;
-  }
-
-  @Override
-  public PDone expand(PCollection<T> input) {
-    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
-        "%s can only be applied to an unbounded PCollection if doing windowed 
writes",
-        Write.class.getSimpleName());
-    return createWrite(input, sink.createWriteOperation());
-  }
-
-  @Override
-  public void validate(PipelineOptions options) {
-    sink.validate(options);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder
-        .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
-        .include("sink", sink);
-    if (getSharding() != null) {
-      builder.include("sharding", getSharding());
-    } else if (getNumShards() != null) {
-      String numShards = getNumShards().isAccessible()
-          ? getNumShards().get().toString() : getNumShards().toString();
-      builder.add(DisplayData.item("numShards", numShards)
-          .withLabel("Fixed Number of Shards"));
-    }
-  }
-
-  /**
-   * Returns the {@link Sink} associated with this PTransform.
-   */
-  public Sink<T> getSink() {
-    return sink;
-  }
-
-  /**
-   * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
-   * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
-   * {@link #withSharding(PTransform)}), or runner-determined (by
-   * {@link #withRunnerDeterminedSharding()}).
-   */
-  @Nullable
-  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
-    return computeNumShards;
-  }
-
-  public ValueProvider<Integer> getNumShards() {
-    return numShardsProvider;
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * specified number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   *
-   * <p>A value less than or equal to 0 will be equivalent to the default behavior of
-   * runner-determined sharding.
-   */
-  public Write<T> withNumShards(int numShards) {
-    if (numShards > 0) {
-      return withNumShards(StaticValueProvider.of(numShards));
-    }
-    return withRunnerDeterminedSharding();
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * {@link ValueProvider} specified number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   */
-  public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
-    return new Write<>(sink, null, numShardsProvider, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * specified {@link PTransform} to compute the number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   */
-  public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
-    checkNotNull(
-        sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new Write<>(sink, sharding, null, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} with
-   * runner-determined sharding.
-   */
-  public Write<T> withRunnerDeterminedSharding() {
-    return new Write<>(sink, null, null, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that preserves windowing on its input.
-   *
-   * <p>If this option is not specified, windowing and triggering are replaced by
-   * {@link GlobalWindows} and {@link DefaultTrigger}.
-   *
-   * <p>If there is no data for a window, no output shards will be generated for that window.
-   * If a window triggers multiple times, more than a single output shard might be generated
-   * for that window; it is up to the sink implementation to keep these output shards unique.
-   *
-   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
-   * positive value.
-   */
-  public Write<T> withWindowedWrites() {
-    return new Write<>(sink, computeNumShards, numShardsProvider, true);
-  }
-
-  /**
-   * Writes all the elements in a bundle using a {@link Writer} produced by the
-   * {@link WriteOperation} associated with the {@link Sink}.
-   */
-  private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
-    // Writer that will write the records in this bundle. Lazily
-    // initialized in processElement.
-    private Writer<T, WriteT> writer = null;
-    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
-    WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
-      this.writeOperationView = writeOperationView;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      // Lazily initialize the Writer
-      if (writer == null) {
-        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-        LOG.info("Opening writer for write operation {}", writeOperation);
-        writer = writeOperation.createWriter(c.getPipelineOptions());
-
-        if (windowedWrites) {
-          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
-              UNKNOWN_NUMSHARDS);
-        } else {
-          writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
-        }
-        LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-      }
-      try {
-        writer.write(c.element());
-      } catch (Exception e) {
-        // Discard write result and close the write.
-        try {
-          writer.close();
-          // The writer does not need to be reset, as this DoFn cannot be reused.
-        } catch (Exception closeException) {
-          if (closeException instanceof InterruptedException) {
-            // Do not silently ignore interrupted state.
-            Thread.currentThread().interrupt();
-          }
-          // Do not mask the exception that caused the write to fail.
-          e.addSuppressed(closeException);
-        }
-        throw e;
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      if (writer != null) {
-        WriteT result = writer.close();
-        c.output(result);
-        // Reset state in case of reuse.
-        writer = null;
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(Write.this);
-    }
-  }
-
-  /**
-   * Like {@link WriteBundles}, but where the elements for each shard have been collected into
-   * a single iterable.
-   *
-   * @see WriteBundles
-   */
-  private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
-    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-    private final PCollectionView<Integer> numShardsView;
-
-    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
-                        PCollectionView<Integer> numShardsView) {
-      this.writeOperationView = writeOperationView;
-      this.numShardsView = numShardsView;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
-      // In a sharded write, a single input element represents one shard. We can open and close
-      // the writer in each call to processElement.
-      WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-      LOG.info("Opening writer for write operation {}", writeOperation);
-      Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-      if (windowedWrites) {
-        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
-            numShards);
-      } else {
-        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
-      }
-      LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-
-      try {
-        try {
-          for (T t : c.element().getValue()) {
-            writer.write(t);
-          }
-        } catch (Exception e) {
-          try {
-            writer.close();
-          } catch (Exception closeException) {
-            if (closeException instanceof InterruptedException) {
-              // Do not silently ignore interrupted state.
-              Thread.currentThread().interrupt();
-            }
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
-          }
-          throw e;
-        }
-
-        // Close the writer; if this throws let the error propagate.
-        WriteT result = writer.close();
-        c.output(result);
-      } catch (Exception e) {
-        // If anything goes wrong, make sure to delete the temporary file.
-        writer.cleanup();
-        throw e;
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(Write.this);
-    }
-  }
-
-  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-    private final PCollectionView<Integer> numShardsView;
-    private final ValueProvider<Integer> numShardsProvider;
-    private int shardNumber;
-
-    ApplyShardingKey(PCollectionView<Integer> numShardsView,
-                     ValueProvider<Integer> numShardsProvider) {
-      this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
-      shardNumber = UNKNOWN_SHARDNUM;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      int shardCount = 0;
-      if (numShardsView != null) {
-        shardCount = context.sideInput(numShardsView);
-      } else {
-        checkNotNull(numShardsProvider);
-        shardCount = numShardsProvider.get();
-      }
-      checkArgument(
-          shardCount > 0,
-          "Must have a positive number of shards specified for 
non-runner-determined sharding."
-              + " Got %s",
-          shardCount);
-      if (shardNumber == UNKNOWN_SHARDNUM) {
-        // We want to desynchronize the first record sharding key for each instance of
-        // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
-        shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
-      } else {
-        shardNumber = (shardNumber + 1) % shardCount;
-      }
-      context.output(KV.of(shardNumber, context.element()));
-    }
-  }
-
-  /**
-   * A write is performed as a sequence of three {@link ParDo}s.
-   *
-   * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
-   * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
-   * called. The output of this ParDo is a singleton PCollection
-   * containing the WriteOperation.
-   *
-   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-   * ParDo over the PCollection of elements to write. In this bundle-writing phase,
-   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
-   * {@link DoFn#finishBundle}, respectively, and the {@link Writer#write} method is called for
-   * every element in the bundle. The output of this ParDo is a PCollection of
-   * <i>writer result</i> objects (see {@link Sink} for a description of writer results), one for
-   * each bundle.
-   *
-   * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
-   * the collection of writer results as a side input. In this ParDo,
-   * {@link WriteOperation#finalize} is called to finalize the write.
-   *
-   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-   * before the exception that caused the write to fail is propagated, and the write result will be
-   * discarded.
-   *
-   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-   * deserialized in the bundle-writing and finalization phases, any state change to the
-   * WriteOperation object that occurs during initialization is visible in the latter phases.
-   * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
-   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-   * the WriteOperation.
-   */
-  private <WriteT> PDone createWrite(
-      PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-    Pipeline p = input.getPipeline();
-    writeOperation.setWindowedWrites(windowedWrites);
-
-    // A coder to use for the WriteOperation.
-    @SuppressWarnings("unchecked")
-    Coder<WriteOperation<T, WriteT>> operationCoder =
-        (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
-    // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
-    // the sink.
-    PCollection<WriteOperation<T, WriteT>> operationCollection =
-        p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
-
-    // Initialize the resource in a do-once ParDo on the WriteOperation.
-    operationCollection = operationCollection
-        .apply("Initialize", ParDo.of(
-            new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) throws Exception {
-                WriteOperation<T, WriteT> writeOperation = c.element();
-                LOG.info("Initializing write operation {}", writeOperation);
-                writeOperation.initialize(c.getPipelineOptions());
-                writeOperation.setWindowedWrites(windowedWrites);
-                LOG.debug("Done initializing write operation {}", 
writeOperation);
-                // The WriteOperation is also the output of this ParDo, so it 
can have mutable
-                // state.
-                c.output(writeOperation);
-              }
-            }))
-        .setCoder(operationCoder);
-
-    // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
-    final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-        operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
-    if (!windowedWrites) {
-      // Re-window the data into the global window and remove any existing triggers.
-      input =
-          input.apply(
-              Window.<T>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
-    }
-
-
-    // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
-    // as a side input) and collect the results of the writes in a PCollection.
-    // There is a dependency between this ParDo and the first (the WriteOperation PCollection
-    // as a side input), so this will happen after the initial ParDo.
-    PCollection<WriteT> results;
-    final PCollectionView<Integer> numShardsView;
-    if (computeNumShards == null && numShardsProvider == null) {
-      if (windowedWrites) {
-        throw new IllegalStateException("When doing windowed writes, numShards must be set"
-            + " explicitly to a positive value");
-      }
-      numShardsView = null;
-      results = input
-          .apply("WriteBundles",
-              ParDo.of(new WriteBundles<>(writeOperationView))
-                  .withSideInputs(writeOperationView));
-    } else {
-      if (computeNumShards != null) {
-        numShardsView = input.apply(computeNumShards);
-        results = input
-            .apply("ApplyShardLabel", ParDo.of(
-                new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
-                    .withSideInputs(numShardsView, writeOperationView));
-      } else {
-        numShardsView = null;
-        results = input
-            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, 
numShardsProvider)))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
-                    .withSideInputs(writeOperationView));
-      }
-    }
-    results.setCoder(writeOperation.getWriterResultCoder());
-
-    if (windowedWrites) {
-      // When processing streaming windowed writes, results will arrive multiple times. This
-      // means we can't share the below implementation that turns the results into a side input,
-      // as new data arriving into a side input does not trigger the listening DoFn. Instead
-      // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
-      // whenever new data arrives.
-      PCollection<KV<Void, WriteT>> keyedResults =
-          results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
-      keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
-          .getWriterResultCoder()));
-
-      // Is the continuation trigger sufficient?
-      keyedResults
-          .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
-          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, 
Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = Lists.newArrayList(c.element().getValue());
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(writeOperationView));
-    } else {
-      final PCollectionView<Iterable<WriteT>> resultsView =
-          results.apply(View.<WriteT>asIterable());
-      ImmutableList.Builder<PCollectionView<?>> sideInputs =
-          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-      if (numShardsView != null) {
-        sideInputs.add(numShardsView);
-      }
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first do-once
-      // ParDo. There is a dependency between this ParDo and the parallel write (the writer
-      // results collection as a side input), so it will happen after the parallel write.
-      // For the non-windowed case, we guarantee that if no data is written but the user has
-      // set numShards, then all shards will be written out as empty files.
-      // For this reason we use a side input here.
-      operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, 
Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
-              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
-              // We must always output at least 1 shard, and honor user-specified numShards if
-              // set.
-              int minShardsNeeded;
-              if (numShardsView != null) {
-                minShardsNeeded = c.sideInput(numShardsView);
-              } else if (numShardsProvider != null) {
-                minShardsNeeded = numShardsProvider.get();
-              } else {
-                minShardsNeeded = 1;
-              }
-              int extraShardsNeeded = minShardsNeeded - results.size();
-              if (extraShardsNeeded > 0) {
-                LOG.info(
-                    "Creating {} empty output shards in addition to {} written 
for a total of "
-                        + " {}.", extraShardsNeeded, results.size(), 
minShardsNeeded);
-                for (int i = 0; i < extraShardsNeeded; ++i) {
-                  Writer<T, WriteT> writer = 
writeOperation.createWriter(c.getPipelineOptions());
-                  writer.openUnwindowed(UUID.randomUUID().toString(), 
UNKNOWN_SHARDNUM,
-                      UNKNOWN_NUMSHARDS);
-                  WriteT emptyWrite = writer.close();
-                  results.add(emptyWrite);
-                }
-                LOG.debug("Done creating extra shards.");
-              }
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(sideInputs.build()));
-    }
-    return PDone.in(input.getPipeline());
-  }
-}
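
For reference, a minimal sketch of how the removed Write transform was typically combined with
an HDFSFileSink before this removal; the pipeline options, output path, and element values below
are placeholders, not part of this commit:

    Pipeline p = Pipeline.create(options);
    HDFSFileSink<String, NullWritable, Text> sink =
        HDFSFileSink.toText("hdfs://namenode/path/to/output");
    p.apply(Create.of("line1", "line2"))
        .apply("WriteToHDFS", Write.to(sink).withNumShards(3));
    p.run();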

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
deleted file mode 100644
index 763b30a..0000000
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms used to read from the Hadoop file system (HDFS).
- */
-package org.apache.beam.sdk.io.hdfs;

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
deleted file mode 100644
index 9fa6606..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.base.MoreObjects;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.UUID;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for HDFSFileSink.
- */
-public class HDFSFileSinkTest {
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  private final String part0 = "part-r-00000";
-  private final String foobar = "foobar";
-
-  private <T> void doWrite(Sink<T> sink,
-                           PipelineOptions options,
-                           Iterable<T> toWrite) throws Exception {
-    Sink.WriteOperation<T, String> writeOperation =
-        (Sink.WriteOperation<T, String>) sink.createWriteOperation();
-    Sink.Writer<T, String> writer = writeOperation.createWriter(options);
-    writer.openUnwindowed(UUID.randomUUID().toString(),  -1, -1);
-    for (T t: toWrite) {
-      writer.write(t);
-    }
-    String writeResult = writer.close();
-    writeOperation.finalize(Collections.singletonList(writeResult), options);
-  }
-
-  @Test
-  public void testWriteSingleRecord() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<String, NullWritable, Text> sink =
-        HDFSFileSink.to(
-            file.toString(),
-            SequenceFileOutputFormat.class,
-            NullWritable.class,
-            Text.class,
-            new SerializableFunction<String, KV<NullWritable, Text>>() {
-              @Override
-              public KV<NullWritable, Text> apply(String input) {
-                return KV.of(NullWritable.get(), new Text(input));
-              }
-            });
-
-    doWrite(sink, options, Collections.singletonList(foobar));
-
-    SequenceFile.Reader.Option opts =
-        SequenceFile.Reader.file(new Path(file.toString(), part0));
-    SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts);
-    assertEquals(NullWritable.class.getName(), reader.getKeyClassName());
-    assertEquals(Text.class.getName(), reader.getValueClassName());
-    NullWritable k = NullWritable.get();
-    Text v = new Text();
-    assertEquals(true, reader.next(k, v));
-    assertEquals(NullWritable.get(), k);
-    assertEquals(new Text(foobar), v);
-  }
-
-  @Test
-  public void testToText() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText(file.toString());
-
-    doWrite(sink, options, Collections.singletonList(foobar));
-
-    List<String> strings = Files.readAllLines(new File(file.toString(), part0).toPath(),
-        Charset.forName("UTF-8"));
-    assertEquals(Collections.singletonList(foobar), strings);
-  }
-
-  @DefaultCoder(AvroCoder.class)
-  static class GenericClass {
-    int intField;
-    String stringField;
-    public GenericClass() {}
-    public GenericClass(int intValue, String stringValue) {
-      this.intField = intValue;
-      this.stringField = stringValue;
-    }
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("intField", intField)
-          .add("stringField", stringField)
-          .toString();
-    }
-    @Override
-    public int hashCode() {
-      return Objects.hash(intField, stringField);
-    }
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || !(other instanceof GenericClass)) {
-        return false;
-      }
-      GenericClass o = (GenericClass) other;
-      return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField);
-    }
-  }
-
-  @Test
-  public void testToAvro() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    File file = tmpFolder.newFolder();
-
-    HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = HDFSFileSink.toAvro(
-        file.toString(),
-        AvroCoder.of(GenericClass.class),
-        new Configuration(false));
-
-    doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar")));
-
-    GenericDatumReader datumReader = new GenericDatumReader();
-    FileReader<GenericData.Record> reader =
-        DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader);
-    GenericData.Record next = reader.next(null);
-    assertEquals("foobar", next.get("stringField").toString());
-    assertEquals(3, next.get("intField"));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
deleted file mode 100644
index a964239..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Source;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Tests for HDFSFileSource.
- */
-public class HDFSFileSourceTest {
-
-  private Random random = new Random(0L);
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Test
-  public void testFullyReadSingleFile() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
-    File file = createFileWithData("tmp.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-            HDFSFileSource.from(
-                    file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testFullyReadSingleFileWithSpaces() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
-    File file = createFileWithData("tmp data.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-            HDFSFileSource.from(
-                    file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testFullyReadFilePattern() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
-    expectedResults.addAll(data1);
-    expectedResults.addAll(data2);
-    expectedResults.addAll(data3);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
-  }
-
-  @Test
-  public void testCloseUnstartedFilePatternReader() throws IOException {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
-    File file1 = createFileWithData("file1", data1);
-
-    List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
-    createFileWithData("file2", data2);
-
-    List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
-    createFileWithData("file3", data3);
-
-    List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
-    createFileWithData("otherfile", data4);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            new File(file1.getParent(), "file*").toString(),
-            SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
-
-    // Closing an unstarted FilePatternReader should not throw an exception.
-    try {
-      reader.close();
-    } catch (Exception e) {
-      fail("Closing an unstarted FilePatternReader should not throw an 
exception");
-    }
-  }
-
-  @Test
-  public void testSplits() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
-    File file = createFileWithData("tmp.seq", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(
-            file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    // Assert that the source produces the expected records
-    assertEquals(expectedResults, readFromSource(source, options));
-
-    // Split with a small bundle size (has to be at least the size of the sync interval)
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
-        .split(SequenceFile.SYNC_INTERVAL, options);
-    assertTrue(splits.size() > 2);
-    SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
-    int nonEmptySplits = 0;
-    for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
-      if (readFromSource(subSource, options).size() > 0) {
-        nonEmptySplits += 1;
-      }
-    }
-    assertTrue(nonEmptySplits > 2);
-  }
-
-  @Test
-  public void testSplitEstimatedSize() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
-    File file = createFileWithData("tmp.avro", expectedResults);
-
-    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
-        HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
-            IntWritable.class, Text.class);
-
-    long originalSize = source.getEstimatedSizeBytes(options);
-    long splitTotalSize = 0;
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split(
-        SequenceFile.SYNC_INTERVAL, options
-    );
-    for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
-      splitTotalSize += splitSource.getEstimatedSizeBytes(options);
-    }
-    // Assert that the estimated size of the whole is the sum of its parts
-    assertEquals(originalSize, splitTotalSize);
-  }
-
-  private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
-      throws IOException {
-    File tmpFile = tmpFolder.newFile(filename);
-    try (Writer writer = SequenceFile.createWriter(new Configuration(),
-        Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
-        Writer.file(new Path(tmpFile.toURI())))) {
-
-      for (KV<IntWritable, Text> record : records) {
-        writer.append(record.getKey(), record.getValue());
-      }
-    }
-    return tmpFile;
-  }
-
-  private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
-                                                          int numItems, int offset) {
-    List<KV<IntWritable, Text>> records = new ArrayList<>();
-    for (int i = 0; i < numItems; i++) {
-      IntWritable key = new IntWritable(i + offset);
-      Text value = new Text(createRandomString(dataItemLength));
-      records.add(KV.of(key, value));
-    }
-    return records;
-  }
-
-  private String createRandomString(int length) {
-    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      builder.append(chars[random.nextInt(chars.length)]);
-    }
-    return builder.toString();
-  }
-
-}
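
For reference, a minimal sketch of reading such a source inside a pipeline via Read.from, assuming
a Pipeline p; the path below is a placeholder (the deleted tests above exercise the source directly
through SourceTestUtils instead):

    HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source =
        HDFSFileSource.from("hdfs://namenode/data/part-*", SequenceFileInputFormat.class,
            IntWritable.class, Text.class);
    PCollection<KV<IntWritable, Text>> records = p.apply("ReadFromHDFS", Read.from(source));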

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
deleted file mode 100644
index 6963116..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertThat;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.AbstractMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-import org.apache.hadoop.conf.Configuration;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test for {@link HadoopFileSystemModule}.
- */
-@RunWith(JUnit4.class)
-public class HadoopFileSystemModuleTest {
-  @Test
-  public void testObjectMapperIsAbleToFindModule() throws Exception {
-    List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader());
-    assertThat(modules, hasItem(Matchers.<Module>instanceOf(HadoopFileSystemModule.class)));
-  }
-
-  @Test
-  public void testConfigurationSerializationDeserialization() throws Exception {
-    Configuration baseConfiguration = new Configuration(false);
-    baseConfiguration.set("testPropertyA", "baseA");
-    baseConfiguration.set("testPropertyC", "baseC");
-    Configuration configuration = new Configuration(false);
-    configuration.addResource(baseConfiguration);
-    configuration.set("testPropertyA", "A");
-    configuration.set("testPropertyB", "B");
-    ObjectMapper objectMapper = new ObjectMapper();
-    objectMapper.registerModule(new HadoopFileSystemModule());
-    String serializedConfiguration = objectMapper.writeValueAsString(configuration);
-    Configuration deserializedConfiguration =
-        objectMapper.readValue(serializedConfiguration, Configuration.class);
-    assertThat(deserializedConfiguration, Matchers.<Map.Entry<String, String>>contains(
-        new AbstractMap.SimpleEntry("testPropertyA", "A"),
-        new AbstractMap.SimpleEntry("testPropertyB", "B"),
-        new AbstractMap.SimpleEntry("testPropertyC", "baseC")));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
deleted file mode 100644
index 2be3d93..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link HadoopFileSystemOptionsRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class HadoopFileSystemOptionsRegistrarTest {
-
-  @Test
-  public void testServiceLoader() {
-    for (PipelineOptionsRegistrar registrar
-        : Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) {
-      if (registrar instanceof HadoopFileSystemOptionsRegistrar) {
-        assertThat(registrar.getPipelineOptions(),
-            Matchers.<Class<?>>contains(HadoopFileSystemOptions.class));
-        return;
-      }
-    }
-    fail("Expected to find " + HadoopFileSystemOptionsRegistrar.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
deleted file mode 100644
index 634528b..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import java.util.AbstractMap;
-import java.util.Map;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.hamcrest.Matchers;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link HadoopFileSystemOptions}.
- */
-@RunWith(JUnit4.class)
-public class HadoopFileSystemOptionsTest {
-  @Test
-  public void testParsingHdfsConfiguration() {
-    HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(
-        "--hdfsConfiguration=["
-            + "{\"propertyA\": \"A\"},"
-            + "{\"propertyB\": \"B\"}]").as(HadoopFileSystemOptions.class);
-    assertEquals(2, options.getHdfsConfiguration().size());
-    assertThat(options.getHdfsConfiguration().get(0), Matchers.<Map.Entry<String, String>>contains(
-        new AbstractMap.SimpleEntry("propertyA", "A")));
-    assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains(
-        new AbstractMap.SimpleEntry("propertyB", "B")));
-  }
-}
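
For reference, a minimal sketch of how --hdfsConfiguration was typically supplied so that hdfs://
paths resolve against a cluster; the namenode address is a placeholder, not part of this commit:

    HadoopFileSystemOptions options = PipelineOptionsFactory
        .fromArgs("--hdfsConfiguration=[{\"fs.defaultFS\": \"hdfs://namenode:8020\"}]")
        .as(HadoopFileSystemOptions.class);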

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
deleted file mode 100644
index 96f7102..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.net.URI;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link HadoopFileSystemRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class HadoopFileSystemRegistrarTest {
-
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-  private Configuration configuration;
-  private MiniDFSCluster hdfsCluster;
-  private URI hdfsClusterBaseUri;
-
-  @Before
-  public void setUp() throws Exception {
-    configuration = new Configuration();
-    configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
-    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
-    hdfsCluster = builder.build();
-    hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    hdfsCluster.shutdown();
-  }
-
-  @Test
-  public void testServiceLoader() {
-    HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
-    options.setHdfsConfiguration(ImmutableList.of(configuration));
-    for (FileSystemRegistrar registrar
-        : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
-      if (registrar instanceof HadoopFileSystemRegistrar) {
-        Iterable<FileSystem> fileSystems = registrar.fromOptions(options);
-        assertEquals(hdfsClusterBaseUri.getScheme(),
-            ((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme());
-        return;
-      }
-    }
-    fail("Expected to find " + HadoopFileSystemRegistrar.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
deleted file mode 100644
index cf86c36..0000000
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hdfs;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.io.ByteStreams;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.List;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link HadoopFileSystem}.
- */
-@RunWith(JUnit4.class)
-public class HadoopFileSystemTest {
-
-  @Rule public TestPipeline p = TestPipeline.create();
-  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private Configuration configuration;
-  private MiniDFSCluster hdfsCluster;
-  private URI hdfsClusterBaseUri;
-  private HadoopFileSystem fileSystem;
-
-  @Before
-  public void setUp() throws Exception {
-    configuration = new Configuration();
-    configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
-    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
-    hdfsCluster = builder.build();
-    hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
-    fileSystem = new HadoopFileSystem(configuration);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    hdfsCluster.shutdown();
-  }
-
-  @Test
-  public void testCreateAndReadFile() throws Exception {
-    create("testFile", "testData".getBytes());
-    assertArrayEquals("testData".getBytes(), read("testFile"));
-  }
-
-  @Test
-  public void testCopy() throws Exception {
-    create("testFileA", "testDataA".getBytes());
-    create("testFileB", "testDataB".getBytes());
-    fileSystem.copy(
-        ImmutableList.of(
-            testPath("testFileA"),
-            testPath("testFileB")),
-        ImmutableList.of(
-            testPath("copyTestFileA"),
-            testPath("copyTestFileB")));
-    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
-    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
-    assertArrayEquals("testDataA".getBytes(), read("copyTestFileA"));
-    assertArrayEquals("testDataB".getBytes(), read("copyTestFileB"));
-  }
-
-  @Test
-  public void testDelete() throws Exception {
-    create("testFileA", "testDataA".getBytes());
-    create("testFileB", "testDataB".getBytes());
-    create("testFileC", "testDataC".getBytes());
-
-    // ensure files exist
-    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
-    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
-    assertArrayEquals("testDataC".getBytes(), read("testFileC"));
-
-    fileSystem.delete(ImmutableList.of(
-        testPath("testFileA"),
-        testPath("testFileC")));
-
-    List<MatchResult> results =
-        fileSystem.match(ImmutableList.of(testPath("testFile*").toString()));
-    assertThat(results, contains(MatchResult.create(Status.OK, ImmutableList.of(
-        Metadata.builder()
-            .setResourceId(testPath("testFileB"))
-            .setIsReadSeekEfficient(true)
-            .setSizeBytes("testDataB".getBytes().length)
-            .build()))));
-  }
-
-  @Test
-  public void testMatch() throws Exception {
-    create("testFileAA", "testDataAA".getBytes());
-    create("testFileA", "testDataA".getBytes());
-    create("testFileB", "testDataB".getBytes());
-
-    // ensure files exist
-    assertArrayEquals("testDataAA".getBytes(), read("testFileAA"));
-    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
-    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
-
-    List<MatchResult> results =
-        fileSystem.match(ImmutableList.of(testPath("testFileA*").toString()));
-    assertEquals(Status.OK, Iterables.getOnlyElement(results).status());
-    assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder(
-        Metadata.builder()
-            .setResourceId(testPath("testFileAA"))
-            .setIsReadSeekEfficient(true)
-            .setSizeBytes("testDataAA".getBytes().length)
-            .build(),
-        Metadata.builder()
-            .setResourceId(testPath("testFileA"))
-            .setIsReadSeekEfficient(true)
-            .setSizeBytes("testDataA".getBytes().length)
-            .build()));
-  }
-
-  @Test
-  public void testRename() throws Exception {
-    create("testFileA", "testDataA".getBytes());
-    create("testFileB", "testDataB".getBytes());
-
-    // ensure files exist
-    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
-    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
-
-    fileSystem.rename(
-        ImmutableList.of(
-            testPath("testFileA"), testPath("testFileB")),
-        ImmutableList.of(
-            testPath("renameFileA"), testPath("renameFileB")));
-
-    List<MatchResult> results =
-        fileSystem.match(ImmutableList.of(testPath("*").toString()));
-    assertEquals(Status.OK, Iterables.getOnlyElement(results).status());
-    assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder(
-        Metadata.builder()
-            .setResourceId(testPath("renameFileA"))
-            .setIsReadSeekEfficient(true)
-            .setSizeBytes("testDataA".getBytes().length)
-            .build(),
-        Metadata.builder()
-            .setResourceId(testPath("renameFileB"))
-            .setIsReadSeekEfficient(true)
-            .setSizeBytes("testDataB".getBytes().length)
-            .build()));
-
-    // ensure files exist
-    assertArrayEquals("testDataA".getBytes(), read("renameFileA"));
-    assertArrayEquals("testDataB".getBytes(), read("renameFileB"));
-  }
-
-  @Test
-  public void testMatchNewResource() throws Exception {
-    // match file spec
-    assertEquals(testPath("file"),
-        fileSystem.matchNewResource(testPath("file").toString(), false));
-    // match dir spec missing '/'
-    assertEquals(testPath("dir/"),
-        fileSystem.matchNewResource(testPath("dir").toString(), true));
-    // match dir spec with '/'
-    assertEquals(testPath("dir/"),
-        fileSystem.matchNewResource(testPath("dir/").toString(), true));
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Expected file path but received directory path");
-    fileSystem.matchNewResource(testPath("dir/").toString(), false);
-  }
-
-  @Test
-  @Ignore("TestPipeline needs a way to take in HadoopFileSystemOptions")
-  public void testReadPipeline() throws Exception {
-    create("testFileA", "testDataA".getBytes());
-    create("testFileB", "testDataB".getBytes());
-    create("testFileC", "testDataC".getBytes());
-
-    HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions()
-        .as(HadoopFileSystemOptions.class);
-    options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
-    FileSystems.setDefaultConfigInWorkers(options);
-    PCollection<String> pc = p.apply(
-        TextIO.read().from(testPath("testFile*").toString()));
-    PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
-    p.run();
-  }
-
-  private void create(String relativePath, byte[] contents) throws Exception {
-    try (WritableByteChannel channel = fileSystem.create(
-        testPath(relativePath),
-        StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build())) {
-      channel.write(ByteBuffer.wrap(contents));
-    }
-  }
-
-  private byte[] read(String relativePath) throws Exception {
-    try (ReadableByteChannel channel = fileSystem.open(testPath(relativePath))) {
-      return ByteStreams.toByteArray(Channels.newInputStream(channel));
-    }
-  }
-
-  private HadoopResourceId testPath(String relativePath) {
-    return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 5b1e243..9480705 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -68,9 +68,9 @@
     <module>elasticsearch</module>
     <module>google-cloud-platform</module>
     <module>hadoop-common</module>
+    <module>hadoop-file-system</module>
     <module>hadoop</module>
     <module>hbase</module>
-    <module>hdfs</module>
     <module>jdbc</module>
     <module>jms</module>
     <module>kafka</module>
