Repository: beam Updated Branches: refs/heads/master 3161904d9 -> 9f6377fcc
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java deleted file mode 100644 index 86a9246..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java +++ /dev/null @@ -1,585 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation; -import org.apache.beam.sdk.io.hdfs.Sink.Writer; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is deprecated, and only exists currently for HDFSFileSink. 
- */ -@Deprecated -public class Write<T> extends PTransform<PCollection<T>, PDone> { - private static final Logger LOG = LoggerFactory.getLogger(Write.class); - - private static final int UNKNOWN_SHARDNUM = -1; - private static final int UNKNOWN_NUMSHARDS = -1; - - private final Sink<T> sink; - // This allows the number of shards to be dynamically computed based on the input - // PCollection. - @Nullable - private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards; - // We don't use a side input for static sharding, as we want this value to be updatable - // when a pipeline is updated. - @Nullable - private final ValueProvider<Integer> numShardsProvider; - private boolean windowedWrites; - - /** - * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner - * control how many different shards are produced. - */ - public static <T> Write<T> to(Sink<T> sink) { - checkNotNull(sink, "sink"); - return new Write<>(sink, null /* runner-determined sharding */, null, false); - } - - private Write( - Sink<T> sink, - @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards, - @Nullable ValueProvider<Integer> numShardsProvider, - boolean windowedWrites) { - this.sink = sink; - this.computeNumShards = computeNumShards; - this.numShardsProvider = numShardsProvider; - this.windowedWrites = windowedWrites; - } - - @Override - public PDone expand(PCollection<T> input) { - checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites, - "%s can only be applied to an unbounded PCollection if doing windowed writes", - Write.class.getSimpleName()); - return createWrite(input, sink.createWriteOperation()); - } - - @Override - public void validate(PipelineOptions options) { - sink.validate(options); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink")) - .include("sink", sink); - if (getSharding() != null) { - builder.include("sharding", getSharding()); - } else if (getNumShards() != null) { - String numShards = getNumShards().isAccessible() - ? getNumShards().get().toString() : getNumShards().toString(); - builder.add(DisplayData.item("numShards", numShards) - .withLabel("Fixed Number of Shards")); - } - } - - /** - * Returns the {@link Sink} associated with this PTransform. - */ - public Sink<T> getSink() { - return sink; - } - - /** - * Gets the {@link PTransform} that will be used to determine sharding. This can be either a - * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by - * {@link #withSharding(PTransform)}), or runner-determined (by {@link - * #withRunnerDeterminedSharding()}. - */ - @Nullable - public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() { - return computeNumShards; - } - - public ValueProvider<Integer> getNumShards() { - return numShardsProvider; - } - - /** - * Returns a new {@link Write} that will write to the current {@link Sink} using the - * specified number of shards. - * - * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for - * more information. - * - * <p>A value less than or equal to 0 will be equivalent to the default behavior of - * runner-determined sharding. 
- */ - public Write<T> withNumShards(int numShards) { - if (numShards > 0) { - return withNumShards(StaticValueProvider.of(numShards)); - } - return withRunnerDeterminedSharding(); - } - - /** - * Returns a new {@link Write} that will write to the current {@link Sink} using the - * {@link ValueProvider} specified number of shards. - * - * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for - * more information. - */ - public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) { - return new Write<>(sink, null, numShardsProvider, windowedWrites); - } - - /** - * Returns a new {@link Write} that will write to the current {@link Sink} using the - * specified {@link PTransform} to compute the number of shards. - * - * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for - * more information. - */ - public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) { - checkNotNull( - sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead"); - return new Write<>(sink, sharding, null, windowedWrites); - } - - /** - * Returns a new {@link Write} that will write to the current {@link Sink} with - * runner-determined sharding. - */ - public Write<T> withRunnerDeterminedSharding() { - return new Write<>(sink, null, null, windowedWrites); - } - - /** - * Returns a new {@link Write} that writes preserves windowing on it's input. - * - * <p>If this option is not specified, windowing and triggering are replaced by - * {@link GlobalWindows} and {@link DefaultTrigger}. - * - * <p>If there is no data for a window, no output shards will be generated for that window. - * If a window triggers multiple times, then more than a single output shard might be - * generated multiple times; it's up to the sink implementation to keep these output shards - * unique. - * - * <p>This option can only be used if {@link #withNumShards(int)} is also set to a - * positive value. - */ - public Write<T> withWindowedWrites() { - return new Write<>(sink, computeNumShards, numShardsProvider, true); - } - - /** - * Writes all the elements in a bundle using a {@link Writer} produced by the - * {@link WriteOperation} associated with the {@link Sink}. - */ - private class WriteBundles<WriteT> extends DoFn<T, WriteT> { - // Writer that will write the records in this bundle. Lazily - // initialized in processElement. - private Writer<T, WriteT> writer = null; - private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; - - WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) { - this.writeOperationView = writeOperationView; - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - // Lazily initialize the Writer - if (writer == null) { - WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); - LOG.info("Opening writer for write operation {}", writeOperation); - writer = writeOperation.createWriter(c.getPipelineOptions()); - - if (windowedWrites) { - writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM, - UNKNOWN_NUMSHARDS); - } else { - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); - } - LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); - } - try { - writer.write(c.element()); - } catch (Exception e) { - // Discard write result and close the write. 
- try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - if (closeException instanceof InterruptedException) { - // Do not silently ignore interrupted state. - Thread.currentThread().interrupt(); - } - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; - } - } - - @FinishBundle - public void finishBundle(Context c) throws Exception { - if (writer != null) { - WriteT result = writer.close(); - c.output(result); - // Reset state in case of reuse. - writer = null; - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(Write.this); - } - } - - /** - * Like {@link WriteBundles}, but where the elements for each shard have been collected into - * a single iterable. - * - * @see WriteBundles - */ - private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> { - private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; - private final PCollectionView<Integer> numShardsView; - - WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView, - PCollectionView<Integer> numShardsView) { - this.writeOperationView = writeOperationView; - this.numShardsView = numShardsView; - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get(); - // In a sharded write, single input element represents one shard. We can open and close - // the writer in each call to processElement. - WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); - LOG.info("Opening writer for write operation {}", writeOperation); - Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); - if (windowedWrites) { - writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(), - numShards); - } else { - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); - } - LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); - - try { - try { - for (T t : c.element().getValue()) { - writer.write(t); - } - } catch (Exception e) { - try { - writer.close(); - } catch (Exception closeException) { - if (closeException instanceof InterruptedException) { - // Do not silently ignore interrupted state. - Thread.currentThread().interrupt(); - } - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; - } - - // Close the writer; if this throws let the error propagate. - WriteT result = writer.close(); - c.output(result); - } catch (Exception e) { - // If anything goes wrong, make sure to delete the temporary file. 
- writer.cleanup(); - throw e; - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(Write.this); - } - } - - private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> { - private final PCollectionView<Integer> numShardsView; - private final ValueProvider<Integer> numShardsProvider; - private int shardNumber; - - ApplyShardingKey(PCollectionView<Integer> numShardsView, - ValueProvider<Integer> numShardsProvider) { - this.numShardsView = numShardsView; - this.numShardsProvider = numShardsProvider; - shardNumber = UNKNOWN_SHARDNUM; - } - - @ProcessElement - public void processElement(ProcessContext context) { - int shardCount = 0; - if (numShardsView != null) { - shardCount = context.sideInput(numShardsView); - } else { - checkNotNull(numShardsProvider); - shardCount = numShardsProvider.get(); - } - checkArgument( - shardCount > 0, - "Must have a positive number of shards specified for non-runner-determined sharding." - + " Got %s", - shardCount); - if (shardNumber == UNKNOWN_SHARDNUM) { - // We want to desynchronize the first record sharding key for each instance of - // ApplyShardingKey, so records in a small PCollection will be statistically balanced. - shardNumber = ThreadLocalRandom.current().nextInt(shardCount); - } else { - shardNumber = (shardNumber + 1) % shardCount; - } - context.output(KV.of(shardNumber, context.element())); - } - } - - /** - * A write is performed as sequence of three {@link ParDo}'s. - * - * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's - * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is - * called. The output of this ParDo is a singleton PCollection - * containing the WriteOperation. - * - * <p>This singleton collection containing the WriteOperation is then used as a side input to a - * ParDo over the PCollection of elements to write. In this bundle-writing phase, - * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. - * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and - * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for - * every element in the bundle. The output of this ParDo is a PCollection of - * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for - * each bundle. - * - * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and - * the collection of writer results as a side-input. In this ParDo, - * {@link WriteOperation#finalize} is called to finalize the write. - * - * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called - * before the exception that caused the write to fail is propagated and the write result will be - * discarded. - * - * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and - * deserialized in the bundle-writing and finalization phases, any state change to the - * WriteOperation object that occurs during initialization is visible in the latter phases. - * However, the WriteOperation is not serialized after the bundle-writing phase. This is why - * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate - * WriteOperation). 
- */ - private <WriteT> PDone createWrite( - PCollection<T> input, WriteOperation<T, WriteT> writeOperation) { - Pipeline p = input.getPipeline(); - writeOperation.setWindowedWrites(windowedWrites); - - // A coder to use for the WriteOperation. - @SuppressWarnings("unchecked") - Coder<WriteOperation<T, WriteT>> operationCoder = - (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass()); - - // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize - // the sink. - PCollection<WriteOperation<T, WriteT>> operationCollection = - p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder)); - - // Initialize the resource in a do-once ParDo on the WriteOperation. - operationCollection = operationCollection - .apply("Initialize", ParDo.of( - new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - WriteOperation<T, WriteT> writeOperation = c.element(); - LOG.info("Initializing write operation {}", writeOperation); - writeOperation.initialize(c.getPipelineOptions()); - writeOperation.setWindowedWrites(windowedWrites); - LOG.debug("Done initializing write operation {}", writeOperation); - // The WriteOperation is also the output of this ParDo, so it can have mutable - // state. - c.output(writeOperation); - } - })) - .setCoder(operationCoder); - - // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase. - final PCollectionView<WriteOperation<T, WriteT>> writeOperationView = - operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton()); - - if (!windowedWrites) { - // Re-window the data into the global window and remove any existing triggers. - input = - input.apply( - Window.<T>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()); - } - - - // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation - // as a side input) and collect the results of the writes in a PCollection. - // There is a dependency between this ParDo and the first (the WriteOperation PCollection - // as a side input), so this will happen after the initial ParDo. 
- PCollection<WriteT> results; - final PCollectionView<Integer> numShardsView; - if (computeNumShards == null && numShardsProvider == null) { - if (windowedWrites) { - throw new IllegalStateException("When doing windowed writes, numShards must be set" - + "explicitly to a positive value"); - } - numShardsView = null; - results = input - .apply("WriteBundles", - ParDo.of(new WriteBundles<>(writeOperationView)) - .withSideInputs(writeOperationView)); - } else { - if (computeNumShards != null) { - numShardsView = input.apply(computeNumShards); - results = input - .apply("ApplyShardLabel", ParDo.of( - new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView)) - .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) - .apply("WriteShardedBundles", - ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView)) - .withSideInputs(numShardsView, writeOperationView)); - } else { - numShardsView = null; - results = input - .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider))) - .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) - .apply("WriteShardedBundles", - ParDo.of(new WriteShardedBundles<>(writeOperationView, null)) - .withSideInputs(writeOperationView)); - } - } - results.setCoder(writeOperation.getWriterResultCoder()); - - if (windowedWrites) { - // When processing streaming windowed writes, results will arrive multiple times. This - // means we can't share the below implementation that turns the results into a side input, - // as new data arriving into a side input does not trigger the listening DoFn. Instead - // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered - // whenever new data arrives. - PCollection<KV<Void, WriteT>> keyedResults = - results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null)); - keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation - .getWriterResultCoder())); - - // Is the continuation trigger sufficient? - keyedResults - .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create()) - .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); - LOG.info("Finalizing write operation {}.", writeOperation); - List<WriteT> results = Lists.newArrayList(c.element().getValue()); - writeOperation.finalize(results, c.getPipelineOptions()); - LOG.debug("Done finalizing write operation {}", writeOperation); - } - }).withSideInputs(writeOperationView)); - } else { - final PCollectionView<Iterable<WriteT>> resultsView = - results.apply(View.<WriteT>asIterable()); - ImmutableList.Builder<PCollectionView<?>> sideInputs = - ImmutableList.<PCollectionView<?>>builder().add(resultsView); - if (numShardsView != null) { - sideInputs.add(numShardsView); - } - - // Finalize the write in another do-once ParDo on the singleton collection containing the - // Writer. The results from the per-bundle writes are given as an Iterable side input. - // The WriteOperation's state is the same as after its initialization in the first do-once - // ParDo. There is a dependency between this ParDo and the parallel write (the writer - // results collection as a side input), so it will happen after the parallel write. - // For the non-windowed case, we guarantee that if no data is written but the user has - // set numShards, then all shards will be written out as empty files. 
For this reason we - // use a side input here. - operationCollection - .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - WriteOperation<T, WriteT> writeOperation = c.element(); - LOG.info("Finalizing write operation {}.", writeOperation); - List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView)); - LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); - - // We must always output at least 1 shard, and honor user-specified numShards if - // set. - int minShardsNeeded; - if (numShardsView != null) { - minShardsNeeded = c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - minShardsNeeded = numShardsProvider.get(); - } else { - minShardsNeeded = 1; - } - int extraShardsNeeded = minShardsNeeded - results.size(); - if (extraShardsNeeded > 0) { - LOG.info( - "Creating {} empty output shards in addition to {} written for a total of " - + " {}.", extraShardsNeeded, results.size(), minShardsNeeded); - for (int i = 0; i < extraShardsNeeded; ++i) { - Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, - UNKNOWN_NUMSHARDS); - WriteT emptyWrite = writer.close(); - results.add(emptyWrite); - } - LOG.debug("Done creating extra shards."); - } - writeOperation.finalize(results, c.getPipelineOptions()); - LOG.debug("Done finalizing write operation {}", writeOperation); - } - }).withSideInputs(sideInputs.build())); - } - return PDone.in(input.getPipeline()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java deleted file mode 100644 index 763b30a..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Transforms used to read from the Hadoop file system (HDFS). 
- */ -package org.apache.beam.sdk.io.hdfs; http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java deleted file mode 100644 index 9fa6606..0000000 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static org.junit.Assert.assertEquals; - -import com.google.common.base.MoreObjects; -import java.io.File; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.UUID; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.FileReader; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.mapred.AvroKey; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -/** - * Tests for HDFSFileSinkTest. 
- */ -public class HDFSFileSinkTest { - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - private final String part0 = "part-r-00000"; - private final String foobar = "foobar"; - - private <T> void doWrite(Sink<T> sink, - PipelineOptions options, - Iterable<T> toWrite) throws Exception { - Sink.WriteOperation<T, String> writeOperation = - (Sink.WriteOperation<T, String>) sink.createWriteOperation(); - Sink.Writer<T, String> writer = writeOperation.createWriter(options); - writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1); - for (T t: toWrite) { - writer.write(t); - } - String writeResult = writer.close(); - writeOperation.finalize(Collections.singletonList(writeResult), options); - } - - @Test - public void testWriteSingleRecord() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - File file = tmpFolder.newFolder(); - - HDFSFileSink<String, NullWritable, Text> sink = - HDFSFileSink.to( - file.toString(), - SequenceFileOutputFormat.class, - NullWritable.class, - Text.class, - new SerializableFunction<String, KV<NullWritable, Text>>() { - @Override - public KV<NullWritable, Text> apply(String input) { - return KV.of(NullWritable.get(), new Text(input)); - } - }); - - doWrite(sink, options, Collections.singletonList(foobar)); - - SequenceFile.Reader.Option opts = - SequenceFile.Reader.file(new Path(file.toString(), part0)); - SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts); - assertEquals(NullWritable.class.getName(), reader.getKeyClassName()); - assertEquals(Text.class.getName(), reader.getValueClassName()); - NullWritable k = NullWritable.get(); - Text v = new Text(); - assertEquals(true, reader.next(k, v)); - assertEquals(NullWritable.get(), k); - assertEquals(new Text(foobar), v); - } - - @Test - public void testToText() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - File file = tmpFolder.newFolder(); - - HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText(file.toString()); - - doWrite(sink, options, Collections.singletonList(foobar)); - - List<String> strings = Files.readAllLines(new File(file.toString(), part0).toPath(), - Charset.forName("UTF-8")); - assertEquals(Collections.singletonList(foobar), strings); - } - - @DefaultCoder(AvroCoder.class) - static class GenericClass { - int intField; - String stringField; - public GenericClass() {} - public GenericClass(int intValue, String stringValue) { - this.intField = intValue; - this.stringField = stringValue; - } - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("intField", intField) - .add("stringField", stringField) - .toString(); - } - @Override - public int hashCode() { - return Objects.hash(intField, stringField); - } - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof GenericClass)) { - return false; - } - GenericClass o = (GenericClass) other; - return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField); - } - } - - @Test - public void testToAvro() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - File file = tmpFolder.newFolder(); - - HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = HDFSFileSink.toAvro( - file.toString(), - AvroCoder.of(GenericClass.class), - new Configuration(false)); - - doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar"))); - - GenericDatumReader datumReader = new 
GenericDatumReader(); - FileReader<GenericData.Record> reader = - DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader); - GenericData.Record next = reader.next(null); - assertEquals("foobar", next.get("stringField").toString()); - assertEquals(3, next.get("intField")); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java deleted file mode 100644 index a964239..0000000 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.SourceTestUtils; -import org.apache.beam.sdk.values.KV; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -/** - * Tests for HDFSFileSource. 
- */ -public class HDFSFileSourceTest { - - private Random random = new Random(0L); - - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testFullyReadSingleFile() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0); - File file = createFileWithData("tmp.seq", expectedResults); - - HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = - HDFSFileSource.from( - file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); - - assertEquals(file.length(), source.getEstimatedSizeBytes(null)); - - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testFullyReadSingleFileWithSpaces() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0); - File file = createFileWithData("tmp data.seq", expectedResults); - - HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = - HDFSFileSource.from( - file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); - - assertEquals(file.length(), source.getEstimatedSizeBytes(null)); - - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testFullyReadFilePattern() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); - File file1 = createFileWithData("file1", data1); - - List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); - createFileWithData("file2", data2); - - List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); - createFileWithData("file3", data3); - - List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); - createFileWithData("otherfile", data4); - - List<KV<IntWritable, Text>> expectedResults = new ArrayList<>(); - expectedResults.addAll(data1); - expectedResults.addAll(data2); - expectedResults.addAll(data3); - - HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = - HDFSFileSource.from( - new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); - - assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); - } - - @Test - public void testCloseUnstartedFilePatternReader() throws IOException { - PipelineOptions options = PipelineOptionsFactory.create(); - List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); - File file1 = createFileWithData("file1", data1); - - List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); - createFileWithData("file2", data2); - - List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); - createFileWithData("file3", data3); - - List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); - createFileWithData("otherfile", data4); - - HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = - HDFSFileSource.from( - new File(file1.getParent(), "file*").toString(), - SequenceFileInputFormat.class, IntWritable.class, Text.class); - Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options); - - // Closing an unstarted FilePatternReader should not throw an exception. 
- try { - reader.close(); - } catch (Exception e) { - fail("Closing an unstarted FilePatternReader should not throw an exception"); - } - } - - @Test - public void testSplits() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - - List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0); - File file = createFileWithData("tmp.seq", expectedResults); - - HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = - HDFSFileSource.from( - file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); - - // Assert that the source produces the expected records - assertEquals(expectedResults, readFromSource(source, options)); - - // Split with a small bundle size (has to be at least size of sync interval) - List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source - .split(SequenceFile.SYNC_INTERVAL, options); - assertTrue(splits.size() > 2); - SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); - int nonEmptySplits = 0; - for (BoundedSource<KV<IntWritable, Text>> subSource : splits) { - if (readFromSource(subSource, options).size() > 0) { - nonEmptySplits += 1; - } - } - assertTrue(nonEmptySplits > 2); - } - - @Test - public void testSplitEstimatedSize() throws Exception { - PipelineOptions options = PipelineOptionsFactory.create(); - - List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0); - File file = createFileWithData("tmp.avro", expectedResults); - - HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = - HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, - IntWritable.class, Text.class); - - long originalSize = source.getEstimatedSizeBytes(options); - long splitTotalSize = 0; - List<? 
extends BoundedSource<KV<IntWritable, Text>>> splits = source.split( - SequenceFile.SYNC_INTERVAL, options - ); - for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) { - splitTotalSize += splitSource.getEstimatedSizeBytes(options); - } - // Assert that the estimated size of the whole is the sum of its parts - assertEquals(originalSize, splitTotalSize); - } - - private File createFileWithData(String filename, List<KV<IntWritable, Text>> records) - throws IOException { - File tmpFile = tmpFolder.newFile(filename); - try (Writer writer = SequenceFile.createWriter(new Configuration(), - Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), - Writer.file(new Path(tmpFile.toURI())))) { - - for (KV<IntWritable, Text> record : records) { - writer.append(record.getKey(), record.getValue()); - } - } - return tmpFile; - } - - private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength, - int numItems, int offset) { - List<KV<IntWritable, Text>> records = new ArrayList<>(); - for (int i = 0; i < numItems; i++) { - IntWritable key = new IntWritable(i + offset); - Text value = new Text(createRandomString(dataItemLength)); - records.add(KV.of(key, value)); - } - return records; - } - - private String createRandomString(int length) { - char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < length; i++) { - builder.append(chars[random.nextInt(chars.length)]); - } - return builder.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java deleted file mode 100644 index 6963116..0000000 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.AbstractMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.util.common.ReflectHelpers; -import org.apache.hadoop.conf.Configuration; -import org.hamcrest.Matchers; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Test for {@link HadoopFileSystemModule}. 
- */ -@RunWith(JUnit4.class) -public class HadoopFileSystemModuleTest { - @Test - public void testObjectMapperIsAbleToFindModule() throws Exception { - List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader()); - assertThat(modules, hasItem(Matchers.<Module>instanceOf(HadoopFileSystemModule.class))); - } - - @Test - public void testConfigurationSerializationDeserialization() throws Exception { - Configuration baseConfiguration = new Configuration(false); - baseConfiguration.set("testPropertyA", "baseA"); - baseConfiguration.set("testPropertyC", "baseC"); - Configuration configuration = new Configuration(false); - configuration.addResource(baseConfiguration); - configuration.set("testPropertyA", "A"); - configuration.set("testPropertyB", "B"); - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(new HadoopFileSystemModule()); - String serializedConfiguration = objectMapper.writeValueAsString(configuration); - Configuration deserializedConfiguration = - objectMapper.readValue(serializedConfiguration, Configuration.class); - assertThat(deserializedConfiguration, Matchers.<Map.Entry<String, String>>contains( - new AbstractMap.SimpleEntry("testPropertyA", "A"), - new AbstractMap.SimpleEntry("testPropertyB", "B"), - new AbstractMap.SimpleEntry("testPropertyC", "baseC"))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java deleted file mode 100644 index 2be3d93..0000000 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -import com.google.common.collect.Lists; -import java.util.ServiceLoader; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; -import org.hamcrest.Matchers; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link HadoopFileSystemOptionsRegistrar}. 
- */ -@RunWith(JUnit4.class) -public class HadoopFileSystemOptionsRegistrarTest { - - @Test - public void testServiceLoader() { - for (PipelineOptionsRegistrar registrar - : Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { - if (registrar instanceof HadoopFileSystemOptionsRegistrar) { - assertThat(registrar.getPipelineOptions(), - Matchers.<Class<?>>contains(HadoopFileSystemOptions.class)); - return; - } - } - fail("Expected to find " + HadoopFileSystemOptionsRegistrar.class); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java deleted file mode 100644 index 634528b..0000000 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import java.util.AbstractMap; -import java.util.Map; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.hamcrest.Matchers; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link HadoopFileSystemOptions}. 
- */ -@RunWith(JUnit4.class) -public class HadoopFileSystemOptionsTest { - @Test - public void testParsingHdfsConfiguration() { - HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs( - "--hdfsConfiguration=[" - + "{\"propertyA\": \"A\"}," - + "{\"propertyB\": \"B\"}]").as(HadoopFileSystemOptions.class); - assertEquals(2, options.getHdfsConfiguration().size()); - assertThat(options.getHdfsConfiguration().get(0), Matchers.<Map.Entry<String, String>>contains( - new AbstractMap.SimpleEntry("propertyA", "A"))); - assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains( - new AbstractMap.SimpleEntry("propertyB", "B"))); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java deleted file mode 100644 index 96f7102..0000000 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.net.URI; -import java.util.ServiceLoader; -import org.apache.beam.sdk.io.FileSystem; -import org.apache.beam.sdk.io.FileSystemRegistrar; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link HadoopFileSystemRegistrar}. 
- */ -@RunWith(JUnit4.class) -public class HadoopFileSystemRegistrarTest { - - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - private Configuration configuration; - private MiniDFSCluster hdfsCluster; - private URI hdfsClusterBaseUri; - - @Before - public void setUp() throws Exception { - configuration = new Configuration(); - configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); - hdfsCluster = builder.build(); - hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); - } - - @After - public void tearDown() throws Exception { - hdfsCluster.shutdown(); - } - - @Test - public void testServiceLoader() { - HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class); - options.setHdfsConfiguration(ImmutableList.of(configuration)); - for (FileSystemRegistrar registrar - : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { - if (registrar instanceof HadoopFileSystemRegistrar) { - Iterable<FileSystem> fileSystems = registrar.fromOptions(options); - assertEquals(hdfsClusterBaseUri.getScheme(), - ((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme()); - return; - } - } - fail("Expected to find " + HadoopFileSystemRegistrar.class); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java deleted file mode 100644 index cf86c36..0000000 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.hdfs; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.io.ByteStreams; -import java.net.URI; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.util.List; -import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.MatchResult.Status; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.util.MimeTypes; -import org.apache.beam.sdk.values.PCollection; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link HadoopFileSystem}. - */ -@RunWith(JUnit4.class) -public class HadoopFileSystemTest { - - @Rule public TestPipeline p = TestPipeline.create(); - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException thrown = ExpectedException.none(); - private Configuration configuration; - private MiniDFSCluster hdfsCluster; - private URI hdfsClusterBaseUri; - private HadoopFileSystem fileSystem; - - @Before - public void setUp() throws Exception { - configuration = new Configuration(); - configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); - hdfsCluster = builder.build(); - hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); - fileSystem = new HadoopFileSystem(configuration); - } - - @After - public void tearDown() throws Exception { - hdfsCluster.shutdown(); - } - - @Test - public void testCreateAndReadFile() throws Exception { - create("testFile", "testData".getBytes()); - assertArrayEquals("testData".getBytes(), read("testFile")); - } - - @Test - public void testCopy() throws Exception { - create("testFileA", "testDataA".getBytes()); - create("testFileB", "testDataB".getBytes()); - fileSystem.copy( - ImmutableList.of( - testPath("testFileA"), - testPath("testFileB")), - ImmutableList.of( - testPath("copyTestFileA"), - testPath("copyTestFileB"))); - assertArrayEquals("testDataA".getBytes(), read("testFileA")); - assertArrayEquals("testDataB".getBytes(), read("testFileB")); - assertArrayEquals("testDataA".getBytes(), read("copyTestFileA")); - assertArrayEquals("testDataB".getBytes(), read("copyTestFileB")); - } - - @Test - public void testDelete() throws Exception { - create("testFileA", "testDataA".getBytes()); - create("testFileB", "testDataB".getBytes()); - create("testFileC", "testDataC".getBytes()); - - // ensure files exist - assertArrayEquals("testDataA".getBytes(), read("testFileA")); - 
assertArrayEquals("testDataB".getBytes(), read("testFileB")); - assertArrayEquals("testDataC".getBytes(), read("testFileC")); - - fileSystem.delete(ImmutableList.of( - testPath("testFileA"), - testPath("testFileC"))); - - List<MatchResult> results = - fileSystem.match(ImmutableList.of(testPath("testFile*").toString())); - assertThat(results, contains(MatchResult.create(Status.OK, ImmutableList.of( - Metadata.builder() - .setResourceId(testPath("testFileB")) - .setIsReadSeekEfficient(true) - .setSizeBytes("testDataB".getBytes().length) - .build())))); - } - - @Test - public void testMatch() throws Exception { - create("testFileAA", "testDataAA".getBytes()); - create("testFileA", "testDataA".getBytes()); - create("testFileB", "testDataB".getBytes()); - - // ensure files exist - assertArrayEquals("testDataAA".getBytes(), read("testFileAA")); - assertArrayEquals("testDataA".getBytes(), read("testFileA")); - assertArrayEquals("testDataB".getBytes(), read("testFileB")); - - List<MatchResult> results = - fileSystem.match(ImmutableList.of(testPath("testFileA*").toString())); - assertEquals(Status.OK, Iterables.getOnlyElement(results).status()); - assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder( - Metadata.builder() - .setResourceId(testPath("testFileAA")) - .setIsReadSeekEfficient(true) - .setSizeBytes("testDataAA".getBytes().length) - .build(), - Metadata.builder() - .setResourceId(testPath("testFileA")) - .setIsReadSeekEfficient(true) - .setSizeBytes("testDataA".getBytes().length) - .build())); - } - - @Test - public void testRename() throws Exception { - create("testFileA", "testDataA".getBytes()); - create("testFileB", "testDataB".getBytes()); - - // ensure files exist - assertArrayEquals("testDataA".getBytes(), read("testFileA")); - assertArrayEquals("testDataB".getBytes(), read("testFileB")); - - fileSystem.rename( - ImmutableList.of( - testPath("testFileA"), testPath("testFileB")), - ImmutableList.of( - testPath("renameFileA"), testPath("renameFileB"))); - - List<MatchResult> results = - fileSystem.match(ImmutableList.of(testPath("*").toString())); - assertEquals(Status.OK, Iterables.getOnlyElement(results).status()); - assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder( - Metadata.builder() - .setResourceId(testPath("renameFileA")) - .setIsReadSeekEfficient(true) - .setSizeBytes("testDataA".getBytes().length) - .build(), - Metadata.builder() - .setResourceId(testPath("renameFileB")) - .setIsReadSeekEfficient(true) - .setSizeBytes("testDataB".getBytes().length) - .build())); - - // ensure files exist - assertArrayEquals("testDataA".getBytes(), read("renameFileA")); - assertArrayEquals("testDataB".getBytes(), read("renameFileB")); - } - - @Test - public void testMatchNewResource() throws Exception { - // match file spec - assertEquals(testPath("file"), - fileSystem.matchNewResource(testPath("file").toString(), false)); - // match dir spec missing '/' - assertEquals(testPath("dir/"), - fileSystem.matchNewResource(testPath("dir").toString(), true)); - // match dir spec with '/' - assertEquals(testPath("dir/"), - fileSystem.matchNewResource(testPath("dir/").toString(), true)); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Expected file path but received directory path"); - fileSystem.matchNewResource(testPath("dir/").toString(), false); - } - - @Test - @Ignore("TestPipeline needs a way to take in HadoopFileSystemOptions") - public void testReadPipeline() throws Exception { - create("testFileA", 
"testDataA".getBytes()); - create("testFileB", "testDataB".getBytes()); - create("testFileC", "testDataC".getBytes()); - - HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions() - .as(HadoopFileSystemOptions.class); - options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf())); - FileSystems.setDefaultConfigInWorkers(options); - PCollection<String> pc = p.apply( - TextIO.read().from(testPath("testFile*").toString())); - PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC"); - p.run(); - } - - private void create(String relativePath, byte[] contents) throws Exception { - try (WritableByteChannel channel = fileSystem.create( - testPath(relativePath), - StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build())) { - channel.write(ByteBuffer.wrap(contents)); - } - } - - private byte[] read(String relativePath) throws Exception { - try (ReadableByteChannel channel = fileSystem.open(testPath(relativePath))) { - return ByteStreams.toByteArray(Channels.newInputStream(channel)); - } - } - - private HadoopResourceId testPath(String relativePath) { - return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 5b1e243..9480705 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -68,9 +68,9 @@ <module>elasticsearch</module> <module>google-cloud-platform</module> <module>hadoop-common</module> + <module>hadoop-file-system</module> <module>hadoop</module> <module>hbase</module> - <module>hdfs</module> <module>jdbc</module> <module>jms</module> <module>kafka</module>