http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java new file mode 100644 index 0000000..fe2db5f --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import java.io.Serializable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; + +/** + * This class is deprecated, and only exists for HDFSFileSink. + */ +@Deprecated +public abstract class Sink<T> implements Serializable, HasDisplayData { + /** + * Ensures that the sink is valid and can be written to before the write operation begins. One + * should use {@link com.google.common.base.Preconditions} to implement this method. + */ + public abstract void validate(PipelineOptions options); + + /** + * Returns an instance of a {@link WriteOperation} that can write to this Sink. + */ + public abstract WriteOperation<T, ?> createWriteOperation(); + + /** + * {@inheritDoc} + * + * <p>By default, does not register any display data. Implementors may override this method + * to provide their own display data. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) {} + + /** + * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink. + * + * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a + * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write + * a bundle to the sink. + * + * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance, + * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>. + * + * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the + * call to the {@code initialize} method and deserialized before calls to + * {@code createWriter} and {@code finalize}. However, it is not + * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the + * state of the {@code WriteOperation}. 
+ * + * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink. + * + * @param <T> The type of objects to write + * @param <WriteT> The result of a per-bundle write + */ + public abstract static class WriteOperation<T, WriteT> implements Serializable { + /** + * Performs initialization before writing to the sink. Called before writing begins. + */ + public abstract void initialize(PipelineOptions options) throws Exception; + + /** + * Indicates that the operation will be performing windowed writes. + */ + public abstract void setWindowedWrites(boolean windowedWrites); + + /** + * Given an Iterable of results from bundle writes, performs finalization after writing and + * closes the sink. Called after all bundle writes are complete. + * + * <p>The results that are passed to finalize are those returned by bundles that completed + * successfully. Although bundles may have been run multiple times (for fault-tolerance), only + * one writer result will be passed to finalize for each bundle. An implementation of finalize + * should perform clean up of any failed and successfully retried bundles. Note that these + * failed bundles will not have their writer result passed to finalize, so finalize should be + * capable of locating any temporary/partial output written by failed bundles. + * + * <p>A best practice is to make finalize atomic. If this is impossible given the semantics + * of the sink, finalize should be idempotent, as it may be called multiple times in the case of + * failure/retry or for redundancy. + * + * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if + * finalize is called multiple times. + * + * @param writerResults an Iterable of results from successful bundle writes. + */ + public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options) + throws Exception; + + /** + * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink. + * + * <p>The bundle id that the writer will use to uniquely identify its output will be passed to + * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}. + * + * <p>Must not mutate the state of the WriteOperation. + */ + public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception; + + /** + * Returns the Sink that this write operation writes to. + */ + public abstract Sink<T> getSink(); + + /** + * Returns a coder for the writer result type. + */ + public abstract Coder<WriteT> getWriterResultCoder(); + } + + /** + * A Writer writes a bundle of elements from a PCollection to a sink. + * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins + * and {@link Writer#close} is called after all elements in the bundle have been written. + * {@link Writer#write} writes an element to the sink. + * + * <p>Note that any access to static members or methods of a Writer must be thread-safe, as + * multiple instances of a Writer may be instantiated in different threads on the same worker. + * + * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink. + * + * @param <T> The type of object to write + * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String) + */ + public abstract static class Writer<T, WriteT> { + /** + * Performs bundle initialization. For example, creates a temporary file for writing or + * initializes any state that will be used across calls to {@link Writer#write}. 
+ * + * <p>The unique id that is given to open should be used to ensure that the writer's output does + * not interfere with the output of other Writers, as a bundle may be executed many times for + * fault tolerance. See {@link Sink} for more information about bundle ids. + * + * <p>The window and paneInfo arguments are populated when windowed writes are requested. + * shard and numShards are populated for the case of static sharding. In cases where the + * runner is dynamically picking sharding, shard and numShards might both be set to -1. + */ + public abstract void openWindowed(String uId, + BoundedWindow window, + PaneInfo paneInfo, + int shard, + int numShards) throws Exception; + + /** + * Performs bundle initialization for the case where the file is written unwindowed. + */ + public abstract void openUnwindowed(String uId, + int shard, + int numShards) throws Exception; + + public abstract void cleanup() throws Exception; + + /** + * Called for each value in the bundle. + */ + public abstract void write(T value) throws Exception; + + /** + * Finishes writing the bundle. Closes any resources used for writing the bundle. + * + * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s + * finalization. The result should contain some way to identify the output of this bundle (using + * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify + * successful writes. See {@link Sink} for more information about bundle ids. + * + * @return the writer result + */ + public abstract WriteT close() throws Exception; + + /** + * Returns the write operation this writer belongs to. + */ + public abstract WriteOperation<T, WriteT> getWriteOperation(); + + + } +}
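[Editorial aside, not part of this patch: the deprecated Sink/WriteOperation/Writer lifecycle above is easiest to see end to end in a small example. The sketch below assumes a hypothetical SimpleTextSink that writes each bundle of strings to a uniquely named temporary file under a base directory and promotes it to its final name in finalize; the package name, class names, and file-naming scheme are illustrative assumptions only, and only the unwindowed path is implemented.]

package org.apache.beam.examples.hdfs;                 // hypothetical package, not in this patch

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.hdfs.Sink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

/** Toy unwindowed sink: each bundle is written to baseDir/<uId>.tmp and renamed in finalize. */
public class SimpleTextSink extends Sink<String> {
  private final String baseDir;

  public SimpleTextSink(String baseDir) { this.baseDir = baseDir; }

  @Override
  public void validate(PipelineOptions options) {
    // Fail fast, before any bundle is written, if the output location is unusable.
    if (!Files.isDirectory(Paths.get(baseDir))) {
      throw new IllegalArgumentException("Not a directory: " + baseDir);
    }
  }

  @Override
  public Sink.WriteOperation<String, String> createWriteOperation() {
    return new TextWriteOperation(this);
  }

  private static class TextWriteOperation extends Sink.WriteOperation<String, String> {
    private final SimpleTextSink sink;

    TextWriteOperation(SimpleTextSink sink) { this.sink = sink; }

    @Override public void initialize(PipelineOptions options) {}        // nothing to set up

    @Override public void setWindowedWrites(boolean windowedWrites) {}  // unwindowed only

    @Override
    public void finalize(Iterable<String> writerResults, PipelineOptions options)
        throws IOException {
      // Idempotently promote each successful bundle's temporary file to its final name.
      for (String tmp : writerResults) {
        if (Files.exists(Paths.get(tmp))) {
          Files.move(Paths.get(tmp), Paths.get(tmp.replace(".tmp", ".txt")),
              StandardCopyOption.REPLACE_EXISTING);
        }
      }
    }

    @Override
    public Sink.Writer<String, String> createWriter(PipelineOptions options) {
      return new TextWriter(this);                    // must not mutate this WriteOperation
    }

    @Override public Sink<String> getSink() { return sink; }

    // The per-bundle writer result is just the temporary file path.
    @Override public Coder<String> getWriterResultCoder() { return StringUtf8Coder.of(); }
  }

  private static class TextWriter extends Sink.Writer<String, String> {
    private final TextWriteOperation writeOperation;
    private BufferedWriter out;
    private String tmpPath;

    TextWriter(TextWriteOperation writeOperation) { this.writeOperation = writeOperation; }

    @Override
    public void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo,
        int shard, int numShards) {
      throw new UnsupportedOperationException("windowed writes are not supported in this sketch");
    }

    @Override
    public void openUnwindowed(String uId, int shard, int numShards) throws IOException {
      // The unique bundle id keeps retried bundles from clobbering each other's output.
      tmpPath = writeOperation.sink.baseDir + "/" + uId + ".tmp";
      out = Files.newBufferedWriter(Paths.get(tmpPath), StandardCharsets.UTF_8);
    }

    @Override public void write(String value) throws IOException { out.write(value); out.newLine(); }

    @Override
    public String close() throws IOException {
      out.close();
      return tmpPath;      // handed to WriteOperation.finalize as this bundle's writer result
    }

    @Override public void cleanup() throws IOException {
      if (tmpPath != null) { Files.deleteIfExists(Paths.get(tmpPath)); }
    }

    @Override public Sink.WriteOperation<String, String> getWriteOperation() { return writeOperation; }
  }
}

[The temporary-file-then-rename pattern is what makes retried bundles safe in this sketch: a failed bundle leaves only an orphaned .tmp file that finalize never promotes, which is the behaviour the Javadoc above asks finalize and cleanup to handle.]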
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java new file mode 100644 index 0000000..fd05a19 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * {@link UserGroupInformation} helper methods. + */ +public class UGIHelper { + + /** + * Find the most appropriate UserGroupInformation to use. + * @param username the user name, or NULL if none is specified. + * @return the most appropriate UserGroupInformation + */ + public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException { + return UserGroupInformation.getBestUGI(null, username); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java new file mode 100644 index 0000000..86a9246 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation; +import org.apache.beam.sdk.io.hdfs.Sink.Writer; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PDone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is deprecated, and only exists currently for HDFSFileSink. + */ +@Deprecated +public class Write<T> extends PTransform<PCollection<T>, PDone> { + private static final Logger LOG = LoggerFactory.getLogger(Write.class); + + private static final int UNKNOWN_SHARDNUM = -1; + private static final int UNKNOWN_NUMSHARDS = -1; + + private final Sink<T> sink; + // This allows the number of shards to be dynamically computed based on the input + // PCollection. + @Nullable + private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards; + // We don't use a side input for static sharding, as we want this value to be updatable + // when a pipeline is updated. + @Nullable + private final ValueProvider<Integer> numShardsProvider; + private boolean windowedWrites; + + /** + * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner + * control how many different shards are produced. 
+ */ + public static <T> Write<T> to(Sink<T> sink) { + checkNotNull(sink, "sink"); + return new Write<>(sink, null /* runner-determined sharding */, null, false); + } + + private Write( + Sink<T> sink, + @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards, + @Nullable ValueProvider<Integer> numShardsProvider, + boolean windowedWrites) { + this.sink = sink; + this.computeNumShards = computeNumShards; + this.numShardsProvider = numShardsProvider; + this.windowedWrites = windowedWrites; + } + + @Override + public PDone expand(PCollection<T> input) { + checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites, + "%s can only be applied to an unbounded PCollection if doing windowed writes", + Write.class.getSimpleName()); + return createWrite(input, sink.createWriteOperation()); + } + + @Override + public void validate(PipelineOptions options) { + sink.validate(options); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink")) + .include("sink", sink); + if (getSharding() != null) { + builder.include("sharding", getSharding()); + } else if (getNumShards() != null) { + String numShards = getNumShards().isAccessible() + ? getNumShards().get().toString() : getNumShards().toString(); + builder.add(DisplayData.item("numShards", numShards) + .withLabel("Fixed Number of Shards")); + } + } + + /** + * Returns the {@link Sink} associated with this PTransform. + */ + public Sink<T> getSink() { + return sink; + } + + /** + * Gets the {@link PTransform} that will be used to determine sharding. This can be either a + * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by + * {@link #withSharding(PTransform)}), or runner-determined (by {@link + * #withRunnerDeterminedSharding()}. + */ + @Nullable + public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() { + return computeNumShards; + } + + public ValueProvider<Integer> getNumShards() { + return numShardsProvider; + } + + /** + * Returns a new {@link Write} that will write to the current {@link Sink} using the + * specified number of shards. + * + * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for + * more information. + * + * <p>A value less than or equal to 0 will be equivalent to the default behavior of + * runner-determined sharding. + */ + public Write<T> withNumShards(int numShards) { + if (numShards > 0) { + return withNumShards(StaticValueProvider.of(numShards)); + } + return withRunnerDeterminedSharding(); + } + + /** + * Returns a new {@link Write} that will write to the current {@link Sink} using the + * {@link ValueProvider} specified number of shards. + * + * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for + * more information. + */ + public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) { + return new Write<>(sink, null, numShardsProvider, windowedWrites); + } + + /** + * Returns a new {@link Write} that will write to the current {@link Sink} using the + * specified {@link PTransform} to compute the number of shards. + * + * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for + * more information. + */ + public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) { + checkNotNull( + sharding, "Cannot provide null sharding. 
Use withRunnerDeterminedSharding() instead"); + return new Write<>(sink, sharding, null, windowedWrites); + } + + /** + * Returns a new {@link Write} that will write to the current {@link Sink} with + * runner-determined sharding. + */ + public Write<T> withRunnerDeterminedSharding() { + return new Write<>(sink, null, null, windowedWrites); + } + + /** + * Returns a new {@link Write} that preserves windowing on its input. + * + * <p>If this option is not specified, windowing and triggering are replaced by + * {@link GlobalWindows} and {@link DefaultTrigger}. + * + * <p>If there is no data for a window, no output shards will be generated for that window. + * If a window triggers multiple times, then more than a single output shard might be + * generated for that window; it's up to the sink implementation to keep these output shards + * unique. + * + * <p>This option can only be used if {@link #withNumShards(int)} is also set to a + * positive value. + */ + public Write<T> withWindowedWrites() { + return new Write<>(sink, computeNumShards, numShardsProvider, true); + } + + /** + * Writes all the elements in a bundle using a {@link Writer} produced by the + * {@link WriteOperation} associated with the {@link Sink}. + */ + private class WriteBundles<WriteT> extends DoFn<T, WriteT> { + // Writer that will write the records in this bundle. Lazily + // initialized in processElement. + private Writer<T, WriteT> writer = null; + private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; + + WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) { + this.writeOperationView = writeOperationView; + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + // Lazily initialize the Writer + if (writer == null) { + WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); + LOG.info("Opening writer for write operation {}", writeOperation); + writer = writeOperation.createWriter(c.getPipelineOptions()); + + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM, + UNKNOWN_NUMSHARDS); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + } + LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); + } + try { + writer.write(c.element()); + } catch (Exception e) { + // Discard write result and close the writer. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + if (writer != null) { + WriteT result = writer.close(); + c.output(result); + // Reset state in case of reuse. + writer = null; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(Write.this); + } + } + + /** + * Like {@link WriteBundles}, but where the elements for each shard have been collected into + * a single iterable. 
+ * + * @see WriteBundles + */ + private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> { + private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; + private final PCollectionView<Integer> numShardsView; + + WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView, + PCollectionView<Integer> numShardsView) { + this.writeOperationView = writeOperationView; + this.numShardsView = numShardsView; + } + + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get(); + // In a sharded write, a single input element represents one shard. We can open and close + // the writer in each call to processElement. + WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); + LOG.info("Opening writer for write operation {}", writeOperation); + Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); + if (windowedWrites) { + writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(), + numShards); + } else { + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); + } + LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); + + try { + try { + for (T t : c.element().getValue()) { + writer.write(t); + } + } catch (Exception e) { + try { + writer.close(); + } catch (Exception closeException) { + if (closeException instanceof InterruptedException) { + // Do not silently ignore interrupted state. + Thread.currentThread().interrupt(); + } + // Do not mask the exception that caused the write to fail. + e.addSuppressed(closeException); + } + throw e; + } + + // Close the writer; if this throws let the error propagate. + WriteT result = writer.close(); + c.output(result); + } catch (Exception e) { + // If anything goes wrong, make sure to delete the temporary file. + writer.cleanup(); + throw e; + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(Write.this); + } + } + + private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> { + private final PCollectionView<Integer> numShardsView; + private final ValueProvider<Integer> numShardsProvider; + private int shardNumber; + + ApplyShardingKey(PCollectionView<Integer> numShardsView, + ValueProvider<Integer> numShardsProvider) { + this.numShardsView = numShardsView; + this.numShardsProvider = numShardsProvider; + shardNumber = UNKNOWN_SHARDNUM; + } + + @ProcessElement + public void processElement(ProcessContext context) { + int shardCount = 0; + if (numShardsView != null) { + shardCount = context.sideInput(numShardsView); + } else { + checkNotNull(numShardsProvider); + shardCount = numShardsProvider.get(); + } + checkArgument( + shardCount > 0, + "Must have a positive number of shards specified for non-runner-determined sharding." + + " Got %s", + shardCount); + if (shardNumber == UNKNOWN_SHARDNUM) { + // We want to desynchronize the first record sharding key for each instance of + // ApplyShardingKey, so records in a small PCollection will be statistically balanced. + shardNumber = ThreadLocalRandom.current().nextInt(shardCount); + } else { + shardNumber = (shardNumber + 1) % shardCount; + } + context.output(KV.of(shardNumber, context.element())); + } + } + + /** + * A write is performed as a sequence of three {@link ParDo}s. 
+ * + * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's + * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is + * called. The output of this ParDo is a singleton PCollection + * containing the WriteOperation. + * + * <p>This singleton collection containing the WriteOperation is then used as a side input to a + * ParDo over the PCollection of elements to write. In this bundle-writing phase, + * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. + * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before the first + * element of a bundle is written, {@link Writer#close} is called in {@link DoFn#finishBundle}, + * and the {@link Writer#write} method is called for + * every element in the bundle. The output of this ParDo is a PCollection of + * <i>writer result</i> objects (see {@link Sink} for a description of writer results), one for + * each bundle. + * + * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and + * the collection of writer results as a side-input. In this ParDo, + * {@link WriteOperation#finalize} is called to finalize the write. + * + * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called + * before the exception that caused the write to fail is propagated and the write result will be + * discarded. + * + * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and + * deserialized in the bundle-writing and finalization phases, any state change to the + * WriteOperation object that occurs during initialization is visible in the latter phases. + * However, the WriteOperation is not serialized after the bundle-writing phase. This is why + * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate + * the WriteOperation. + */ + private <WriteT> PDone createWrite( + PCollection<T> input, WriteOperation<T, WriteT> writeOperation) { + Pipeline p = input.getPipeline(); + writeOperation.setWindowedWrites(windowedWrites); + + // A coder to use for the WriteOperation. + @SuppressWarnings("unchecked") + Coder<WriteOperation<T, WriteT>> operationCoder = + (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass()); + + // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize + // the sink. + PCollection<WriteOperation<T, WriteT>> operationCollection = + p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder)); + + // Initialize the resource in a do-once ParDo on the WriteOperation. + operationCollection = operationCollection + .apply("Initialize", ParDo.of( + new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + WriteOperation<T, WriteT> writeOperation = c.element(); + LOG.info("Initializing write operation {}", writeOperation); + writeOperation.initialize(c.getPipelineOptions()); + writeOperation.setWindowedWrites(windowedWrites); + LOG.debug("Done initializing write operation {}", writeOperation); + // The WriteOperation is also the output of this ParDo, so it can have mutable + // state. + c.output(writeOperation); + } + })) + .setCoder(operationCoder); + + // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase. 
final PCollectionView<WriteOperation<T, WriteT>> writeOperationView = + operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton()); + + if (!windowedWrites) { + // Re-window the data into the global window and remove any existing triggers. + input = + input.apply( + Window.<T>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()); + } + + + // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation + // as a side input) and collect the results of the writes in a PCollection. + // There is a dependency between this ParDo and the first (the WriteOperation PCollection + // as a side input), so this will happen after the initial ParDo. + PCollection<WriteT> results; + final PCollectionView<Integer> numShardsView; + if (computeNumShards == null && numShardsProvider == null) { + if (windowedWrites) { + throw new IllegalStateException("When doing windowed writes, numShards must be set" + + " explicitly to a positive value"); + } + numShardsView = null; + results = input + .apply("WriteBundles", + ParDo.of(new WriteBundles<>(writeOperationView)) + .withSideInputs(writeOperationView)); + } else { + if (computeNumShards != null) { + numShardsView = input.apply(computeNumShards); + results = input + .apply("ApplyShardLabel", ParDo.of( + new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView)) + .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView)) + .withSideInputs(numShardsView, writeOperationView)); + } else { + numShardsView = null; + results = input + .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider))) + .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) + .apply("WriteShardedBundles", + ParDo.of(new WriteShardedBundles<>(writeOperationView, null)) + .withSideInputs(writeOperationView)); + } + } + results.setCoder(writeOperation.getWriterResultCoder()); + + if (windowedWrites) { + // When processing streaming windowed writes, results will arrive multiple times. This + // means we can't share the below implementation that turns the results into a side input, + // as new data arriving into a side input does not trigger the listening DoFn. Instead + // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered + // whenever new data arrives. + PCollection<KV<Void, WriteT>> keyedResults = + results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null)); + keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation + .getWriterResultCoder())); + + // Is the continuation trigger sufficient? 
+ keyedResults + .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create()) + .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); + LOG.info("Finalizing write operation {}.", writeOperation); + List<WriteT> results = Lists.newArrayList(c.element().getValue()); + writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); + } + }).withSideInputs(writeOperationView)); + } else { + final PCollectionView<Iterable<WriteT>> resultsView = + results.apply(View.<WriteT>asIterable()); + ImmutableList.Builder<PCollectionView<?>> sideInputs = + ImmutableList.<PCollectionView<?>>builder().add(resultsView); + if (numShardsView != null) { + sideInputs.add(numShardsView); + } + + // Finalize the write in another do-once ParDo on the singleton collection containing the + // Writer. The results from the per-bundle writes are given as an Iterable side input. + // The WriteOperation's state is the same as after its initialization in the first do-once + // ParDo. There is a dependency between this ParDo and the parallel write (the writer + // results collection as a side input), so it will happen after the parallel write. + // For the non-windowed case, we guarantee that if no data is written but the user has + // set numShards, then all shards will be written out as empty files. For this reason we + // use a side input here. + operationCollection + .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + WriteOperation<T, WriteT> writeOperation = c.element(); + LOG.info("Finalizing write operation {}.", writeOperation); + List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView)); + LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); + + // We must always output at least 1 shard, and honor user-specified numShards if + // set. 
+ int minShardsNeeded; + if (numShardsView != null) { + minShardsNeeded = c.sideInput(numShardsView); + } else if (numShardsProvider != null) { + minShardsNeeded = numShardsProvider.get(); + } else { + minShardsNeeded = 1; + } + int extraShardsNeeded = minShardsNeeded - results.size(); + if (extraShardsNeeded > 0) { + LOG.info( + "Creating {} empty output shards in addition to {} written for a total of " + + " {}.", extraShardsNeeded, results.size(), minShardsNeeded); + for (int i = 0; i < extraShardsNeeded; ++i) { + Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); + writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, + UNKNOWN_NUMSHARDS); + WriteT emptyWrite = writer.close(); + results.add(emptyWrite); + } + LOG.debug("Done creating extra shards."); + } + writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); + } + }).withSideInputs(sideInputs.build())); + } + return PDone.in(input.getPipeline()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java new file mode 100644 index 0000000..763b30a --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Transforms used to read from the Hadoop file system (HDFS). + */ +package org.apache.beam.sdk.io.hdfs; http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java new file mode 100644 index 0000000..9fa6606 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import static org.junit.Assert.assertEquals; + +import com.google.common.base.MoreObjects; +import java.io.File; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.mapred.AvroKey; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Tests for HDFSFileSink. 
+ */ +public class HDFSFileSinkTest { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + private final String part0 = "part-r-00000"; + private final String foobar = "foobar"; + + private <T> void doWrite(Sink<T> sink, + PipelineOptions options, + Iterable<T> toWrite) throws Exception { + Sink.WriteOperation<T, String> writeOperation = + (Sink.WriteOperation<T, String>) sink.createWriteOperation(); + Sink.Writer<T, String> writer = writeOperation.createWriter(options); + writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1); + for (T t: toWrite) { + writer.write(t); + } + String writeResult = writer.close(); + writeOperation.finalize(Collections.singletonList(writeResult), options); + } + + @Test + public void testWriteSingleRecord() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tmpFolder.newFolder(); + + HDFSFileSink<String, NullWritable, Text> sink = + HDFSFileSink.to( + file.toString(), + SequenceFileOutputFormat.class, + NullWritable.class, + Text.class, + new SerializableFunction<String, KV<NullWritable, Text>>() { + @Override + public KV<NullWritable, Text> apply(String input) { + return KV.of(NullWritable.get(), new Text(input)); + } + }); + + doWrite(sink, options, Collections.singletonList(foobar)); + + SequenceFile.Reader.Option opts = + SequenceFile.Reader.file(new Path(file.toString(), part0)); + SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), opts); + assertEquals(NullWritable.class.getName(), reader.getKeyClassName()); + assertEquals(Text.class.getName(), reader.getValueClassName()); + NullWritable k = NullWritable.get(); + Text v = new Text(); + assertEquals(true, reader.next(k, v)); + assertEquals(NullWritable.get(), k); + assertEquals(new Text(foobar), v); + } + + @Test + public void testToText() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tmpFolder.newFolder(); + + HDFSFileSink<String, NullWritable, Text> sink = HDFSFileSink.toText(file.toString()); + + doWrite(sink, options, Collections.singletonList(foobar)); + + List<String> strings = Files.readAllLines(new File(file.toString(), part0).toPath(), + Charset.forName("UTF-8")); + assertEquals(Collections.singletonList(foobar), strings); + } + + @DefaultCoder(AvroCoder.class) + static class GenericClass { + int intField; + String stringField; + public GenericClass() {} + public GenericClass(int intValue, String stringValue) { + this.intField = intValue; + this.stringField = stringValue; + } + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("intField", intField) + .add("stringField", stringField) + .toString(); + } + @Override + public int hashCode() { + return Objects.hash(intField, stringField); + } + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof GenericClass)) { + return false; + } + GenericClass o = (GenericClass) other; + return Objects.equals(intField, o.intField) && Objects.equals(stringField, o.stringField); + } + } + + @Test + public void testToAvro() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + File file = tmpFolder.newFolder(); + + HDFSFileSink<GenericClass, AvroKey<GenericClass>, NullWritable> sink = HDFSFileSink.toAvro( + file.toString(), + AvroCoder.of(GenericClass.class), + new Configuration(false)); + + doWrite(sink, options, Collections.singletonList(new GenericClass(3, "foobar"))); + + GenericDatumReader datumReader = new 
GenericDatumReader(); + FileReader<GenericData.Record> reader = + DataFileReader.openReader(new File(file.getAbsolutePath(), part0 + ".avro"), datumReader); + GenericData.Record next = reader.next(null); + assertEquals("foobar", next.get("stringField").toString()); + assertEquals(3, next.get("intField")); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java new file mode 100644 index 0000000..a964239 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +/** + * Tests for HDFSFileSource. 
+ */ +public class HDFSFileSourceTest { + + private Random random = new Random(0L); + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testFullyReadSingleFile() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0); + File file = createFileWithData("tmp.seq", expectedResults); + + HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = + HDFSFileSource.from( + file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); + + assertEquals(file.length(), source.getEstimatedSizeBytes(null)); + + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); + } + + @Test + public void testFullyReadSingleFileWithSpaces() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0); + File file = createFileWithData("tmp data.seq", expectedResults); + + HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = + HDFSFileSource.from( + file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); + + assertEquals(file.length(), source.getEstimatedSizeBytes(null)); + + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); + } + + @Test + public void testFullyReadFilePattern() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); + File file1 = createFileWithData("file1", data1); + + List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); + createFileWithData("file2", data2); + + List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); + createFileWithData("file3", data3); + + List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); + createFileWithData("otherfile", data4); + + List<KV<IntWritable, Text>> expectedResults = new ArrayList<>(); + expectedResults.addAll(data1); + expectedResults.addAll(data2); + expectedResults.addAll(data3); + + HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = + HDFSFileSource.from( + new File(file1.getParent(), "file*").toString(), SequenceFileInputFormat.class, + IntWritable.class, Text.class); + + assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray())); + } + + @Test + public void testCloseUnstartedFilePatternReader() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0); + File file1 = createFileWithData("file1", data1); + + List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10); + createFileWithData("file2", data2); + + List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20); + createFileWithData("file3", data3); + + List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30); + createFileWithData("otherfile", data4); + + HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = + HDFSFileSource.from( + new File(file1.getParent(), "file*").toString(), + SequenceFileInputFormat.class, IntWritable.class, Text.class); + Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options); + + // Closing an unstarted FilePatternReader should not throw an exception. 
+ try { + reader.close(); + } catch (Exception e) { + fail("Closing an unstarted FilePatternReader should not throw an exception"); + } + } + + @Test + public void testSplits() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + + List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0); + File file = createFileWithData("tmp.seq", expectedResults); + + HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = + HDFSFileSource.from( + file.toString(), SequenceFileInputFormat.class, IntWritable.class, Text.class); + + // Assert that the source produces the expected records + assertEquals(expectedResults, readFromSource(source, options)); + + // Split with a small bundle size (has to be at least size of sync interval) + List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source + .split(SequenceFile.SYNC_INTERVAL, options); + assertTrue(splits.size() > 2); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + int nonEmptySplits = 0; + for (BoundedSource<KV<IntWritable, Text>> subSource : splits) { + if (readFromSource(subSource, options).size() > 0) { + nonEmptySplits += 1; + } + } + assertTrue(nonEmptySplits > 2); + } + + @Test + public void testSplitEstimatedSize() throws Exception { + PipelineOptions options = PipelineOptionsFactory.create(); + + List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0); + File file = createFileWithData("tmp.avro", expectedResults); + + HDFSFileSource<KV<IntWritable, Text>, IntWritable, Text> source = + HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class, + IntWritable.class, Text.class); + + long originalSize = source.getEstimatedSizeBytes(options); + long splitTotalSize = 0; + List<? 
extends BoundedSource<KV<IntWritable, Text>>> splits = source.split( + SequenceFile.SYNC_INTERVAL, options + ); + for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) { + splitTotalSize += splitSource.getEstimatedSizeBytes(options); + } + // Assert that the estimated size of the whole is the sum of its parts + assertEquals(originalSize, splitTotalSize); + } + + private File createFileWithData(String filename, List<KV<IntWritable, Text>> records) + throws IOException { + File tmpFile = tmpFolder.newFile(filename); + try (Writer writer = SequenceFile.createWriter(new Configuration(), + Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class), + Writer.file(new Path(tmpFile.toURI())))) { + + for (KV<IntWritable, Text> record : records) { + writer.append(record.getKey(), record.getValue()); + } + } + return tmpFile; + } + + private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength, + int numItems, int offset) { + List<KV<IntWritable, Text>> records = new ArrayList<>(); + for (int i = 0; i < numItems; i++) { + IntWritable key = new IntWritable(i + offset); + Text value = new Text(createRandomString(dataItemLength)); + records.add(KV.of(key, value)); + } + return records; + } + + private String createRandomString(int length) { + char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < length; i++) { + builder.append(chars[random.nextInt(chars.length)]); + } + return builder.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java new file mode 100644 index 0000000..6963116 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModuleTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.hadoop.conf.Configuration; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test for {@link HadoopFileSystemModule}. + */ +@RunWith(JUnit4.class) +public class HadoopFileSystemModuleTest { + @Test + public void testObjectMapperIsAbleToFindModule() throws Exception { + List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader()); + assertThat(modules, hasItem(Matchers.<Module>instanceOf(HadoopFileSystemModule.class))); + } + + @Test + public void testConfigurationSerializationDeserialization() throws Exception { + Configuration baseConfiguration = new Configuration(false); + baseConfiguration.set("testPropertyA", "baseA"); + baseConfiguration.set("testPropertyC", "baseC"); + Configuration configuration = new Configuration(false); + configuration.addResource(baseConfiguration); + configuration.set("testPropertyA", "A"); + configuration.set("testPropertyB", "B"); + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(new HadoopFileSystemModule()); + String serializedConfiguration = objectMapper.writeValueAsString(configuration); + Configuration deserializedConfiguration = + objectMapper.readValue(serializedConfiguration, Configuration.class); + assertThat(deserializedConfiguration, Matchers.<Map.Entry<String, String>>contains( + new AbstractMap.SimpleEntry("testPropertyA", "A"), + new AbstractMap.SimpleEntry("testPropertyB", "B"), + new AbstractMap.SimpleEntry("testPropertyC", "baseC"))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java new file mode 100644 index 0000000..2be3d93 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrarTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import java.util.ServiceLoader; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link HadoopFileSystemOptionsRegistrar}. + */ +@RunWith(JUnit4.class) +public class HadoopFileSystemOptionsRegistrarTest { + + @Test + public void testServiceLoader() { + for (PipelineOptionsRegistrar registrar + : Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { + if (registrar instanceof HadoopFileSystemOptionsRegistrar) { + assertThat(registrar.getPipelineOptions(), + Matchers.<Class<?>>contains(HadoopFileSystemOptions.class)); + return; + } + } + fail("Expected to find " + HadoopFileSystemOptionsRegistrar.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java new file mode 100644 index 0000000..634528b --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.util.AbstractMap; +import java.util.Map; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link HadoopFileSystemOptions}. 
+ */ +@RunWith(JUnit4.class) +public class HadoopFileSystemOptionsTest { + @Test + public void testParsingHdfsConfiguration() { + HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs( + "--hdfsConfiguration=[" + + "{\"propertyA\": \"A\"}," + + "{\"propertyB\": \"B\"}]").as(HadoopFileSystemOptions.class); + assertEquals(2, options.getHdfsConfiguration().size()); + assertThat(options.getHdfsConfiguration().get(0), Matchers.<Map.Entry<String, String>>contains( + new AbstractMap.SimpleEntry("propertyA", "A"))); + assertThat(options.getHdfsConfiguration().get(1), Matchers.<Map.Entry<String, String>>contains( + new AbstractMap.SimpleEntry("propertyB", "B"))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java new file mode 100644 index 0000000..96f7102 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.net.URI; +import java.util.ServiceLoader; +import org.apache.beam.sdk.io.FileSystem; +import org.apache.beam.sdk.io.FileSystemRegistrar; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link HadoopFileSystemRegistrar}. 
+ */ +@RunWith(JUnit4.class) +public class HadoopFileSystemRegistrarTest { + + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + private Configuration configuration; + private MiniDFSCluster hdfsCluster; + private URI hdfsClusterBaseUri; + + @Before + public void setUp() throws Exception { + configuration = new Configuration(); + configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); + hdfsCluster = builder.build(); + hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); + } + + @After + public void tearDown() throws Exception { + hdfsCluster.shutdown(); + } + + @Test + public void testServiceLoader() { + HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class); + options.setHdfsConfiguration(ImmutableList.of(configuration)); + for (FileSystemRegistrar registrar + : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) { + if (registrar instanceof HadoopFileSystemRegistrar) { + Iterable<FileSystem> fileSystems = registrar.fromOptions(options); + assertEquals(hdfsClusterBaseUri.getScheme(), + ((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme()); + return; + } + } + fail("Expected to find " + HadoopFileSystemRegistrar.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java new file mode 100644 index 0000000..cf86c36 --- /dev/null +++ b/sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.hdfs; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.io.ByteStreams; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.List; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.MimeTypes; +import org.apache.beam.sdk.values.PCollection; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link HadoopFileSystem}. + */ +@RunWith(JUnit4.class) +public class HadoopFileSystemTest { + + @Rule public TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public ExpectedException thrown = ExpectedException.none(); + private Configuration configuration; + private MiniDFSCluster hdfsCluster; + private URI hdfsClusterBaseUri; + private HadoopFileSystem fileSystem; + + @Before + public void setUp() throws Exception { + configuration = new Configuration(); + configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration); + hdfsCluster = builder.build(); + hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/"); + fileSystem = new HadoopFileSystem(configuration); + } + + @After + public void tearDown() throws Exception { + hdfsCluster.shutdown(); + } + + @Test + public void testCreateAndReadFile() throws Exception { + create("testFile", "testData".getBytes()); + assertArrayEquals("testData".getBytes(), read("testFile")); + } + + @Test + public void testCopy() throws Exception { + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + fileSystem.copy( + ImmutableList.of( + testPath("testFileA"), + testPath("testFileB")), + ImmutableList.of( + testPath("copyTestFileA"), + testPath("copyTestFileB"))); + assertArrayEquals("testDataA".getBytes(), read("testFileA")); + assertArrayEquals("testDataB".getBytes(), read("testFileB")); + assertArrayEquals("testDataA".getBytes(), read("copyTestFileA")); + assertArrayEquals("testDataB".getBytes(), read("copyTestFileB")); + } + + @Test + public void testDelete() throws Exception { + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + create("testFileC", "testDataC".getBytes()); + + // ensure files exist + assertArrayEquals("testDataA".getBytes(), read("testFileA")); + 
assertArrayEquals("testDataB".getBytes(), read("testFileB")); + assertArrayEquals("testDataC".getBytes(), read("testFileC")); + + fileSystem.delete(ImmutableList.of( + testPath("testFileA"), + testPath("testFileC"))); + + List<MatchResult> results = + fileSystem.match(ImmutableList.of(testPath("testFile*").toString())); + assertThat(results, contains(MatchResult.create(Status.OK, ImmutableList.of( + Metadata.builder() + .setResourceId(testPath("testFileB")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataB".getBytes().length) + .build())))); + } + + @Test + public void testMatch() throws Exception { + create("testFileAA", "testDataAA".getBytes()); + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + + // ensure files exist + assertArrayEquals("testDataAA".getBytes(), read("testFileAA")); + assertArrayEquals("testDataA".getBytes(), read("testFileA")); + assertArrayEquals("testDataB".getBytes(), read("testFileB")); + + List<MatchResult> results = + fileSystem.match(ImmutableList.of(testPath("testFileA*").toString())); + assertEquals(Status.OK, Iterables.getOnlyElement(results).status()); + assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder( + Metadata.builder() + .setResourceId(testPath("testFileAA")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataAA".getBytes().length) + .build(), + Metadata.builder() + .setResourceId(testPath("testFileA")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataA".getBytes().length) + .build())); + } + + @Test + public void testRename() throws Exception { + create("testFileA", "testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + + // ensure files exist + assertArrayEquals("testDataA".getBytes(), read("testFileA")); + assertArrayEquals("testDataB".getBytes(), read("testFileB")); + + fileSystem.rename( + ImmutableList.of( + testPath("testFileA"), testPath("testFileB")), + ImmutableList.of( + testPath("renameFileA"), testPath("renameFileB"))); + + List<MatchResult> results = + fileSystem.match(ImmutableList.of(testPath("*").toString())); + assertEquals(Status.OK, Iterables.getOnlyElement(results).status()); + assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder( + Metadata.builder() + .setResourceId(testPath("renameFileA")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataA".getBytes().length) + .build(), + Metadata.builder() + .setResourceId(testPath("renameFileB")) + .setIsReadSeekEfficient(true) + .setSizeBytes("testDataB".getBytes().length) + .build())); + + // ensure files exist + assertArrayEquals("testDataA".getBytes(), read("renameFileA")); + assertArrayEquals("testDataB".getBytes(), read("renameFileB")); + } + + @Test + public void testMatchNewResource() throws Exception { + // match file spec + assertEquals(testPath("file"), + fileSystem.matchNewResource(testPath("file").toString(), false)); + // match dir spec missing '/' + assertEquals(testPath("dir/"), + fileSystem.matchNewResource(testPath("dir").toString(), true)); + // match dir spec with '/' + assertEquals(testPath("dir/"), + fileSystem.matchNewResource(testPath("dir/").toString(), true)); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Expected file path but received directory path"); + fileSystem.matchNewResource(testPath("dir/").toString(), false); + } + + @Test + @Ignore("TestPipeline needs a way to take in HadoopFileSystemOptions") + public void testReadPipeline() throws Exception { + create("testFileA", 
"testDataA".getBytes()); + create("testFileB", "testDataB".getBytes()); + create("testFileC", "testDataC".getBytes()); + + HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions() + .as(HadoopFileSystemOptions.class); + options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf())); + FileSystems.setDefaultConfigInWorkers(options); + PCollection<String> pc = p.apply( + TextIO.read().from(testPath("testFile*").toString())); + PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC"); + p.run(); + } + + private void create(String relativePath, byte[] contents) throws Exception { + try (WritableByteChannel channel = fileSystem.create( + testPath(relativePath), + StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build())) { + channel.write(ByteBuffer.wrap(contents)); + } + } + + private byte[] read(String relativePath) throws Exception { + try (ReadableByteChannel channel = fileSystem.open(testPath(relativePath))) { + return ByteStreams.toByteArray(Channels.newInputStream(channel)); + } + } + + private HadoopResourceId testPath(String relativePath) { + return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/README.md ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/README.md b/sdks/java/io/hdfs/README.md deleted file mode 100644 index 3a734f2..0000000 --- a/sdks/java/io/hdfs/README.md +++ /dev/null @@ -1,43 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> - -# HDFS IO - -This library provides HDFS sources and sinks to make it possible to read and -write Apache Hadoop file formats from Apache Beam pipelines. - -Currently, only the read path is implemented. A `HDFSFileSource` allows any -Hadoop `FileInputFormat` to be read as a `PCollection`. - -A `HDFSFileSource` can be read from using the -`org.apache.beam.sdk.io.Read` transform. For example: - -```java -HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class, - MyKey.class, MyValue.class); -PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(mySource)); -``` - -Alternatively, the `readFrom` method is a convenience method that returns a read -transform. 
For example: - -```java -PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path, - MyInputFormat.class, MyKey.class, MyValue.class)); -``` http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml deleted file mode 100644 index daa3b26..0000000 --- a/sdks/java/io/hdfs/pom.xml +++ /dev/null @@ -1,195 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-parent</artifactId> - <version>0.7.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>beam-sdks-java-io-hdfs</artifactId> - <name>Apache Beam :: SDKs :: Java :: IO :: HDFS</name> - <description>Library to read and write Hadoop/HDFS file formats from Beam.</description> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <systemPropertyVariables> - <beamUseDummyRunner>false</beamUseDummyRunner> - </systemPropertyVariables> - </configuration> - </plugin> - </plugins> - </build> - - <properties> - <!-- - This is the version of Hadoop used to compile the hadoop-common module. - This dependency is defined with a provided scope. - Users must supply their own Hadoop version at runtime. - --> - <hadoop.version>2.7.3</hadoop.version> - </properties> - - <dependencyManagement> - <!-- - We define dependencies here instead of sdks/java/io because - of a version mimatch between this Hadoop version and other - Hadoop versions declared in other io submodules. 
- --> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <classifier>tests</classifier> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - </dependency> - </dependencies> - </dependencyManagement> - - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-core</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-hadoop-common</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> - <groupId>com.google.auto.service</groupId> - <artifactId>auto-service</artifactId> - <optional>true</optional> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>com.google.auto.value</groupId> - <artifactId>auto-value</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <version>${avro.version}</version> - <classifier>hadoop2</classifier> - <exclusions> - <!-- exclude old Jetty version of servlet API --> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <scope>provided</scope> - </dependency> - - <!-- test dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-direct-java</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project>
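For orientation beyond the patch itself: the deleted README above described the old `HDFSFileSource` read path, while the new tests exercise `hdfs://` resolution through `HadoopFileSystem`, which `HadoopFileSystemRegistrar` exposes via `ServiceLoader` and which is configured with the `--hdfsConfiguration` JSON pipeline option. A minimal sketch of how a pipeline might use that path is shown below; it is not part of this commit, the class name `HdfsReadExample`, the `hdfs://namenode:8020` endpoint, and the `/input/*` glob are illustrative assumptions, and the API calls are taken from the tests in this patch (notably `HadoopFileSystemOptionsTest` and the ignored `testReadPipeline`).

```java
// Hedged sketch, not part of this commit: reading text files from HDFS once the
// Hadoop FileSystem is registered. The namenode address, input glob, and class
// name are placeholders; the option and IO APIs mirror the tests in this patch.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class HdfsReadExample {
  public static void main(String[] args) {
    // --hdfsConfiguration takes a JSON list of maps, as exercised by
    // HadoopFileSystemOptionsTest, e.g.:
    //   --hdfsConfiguration=[{"fs.defaultFS": "hdfs://namenode:8020"}]
    HadoopFileSystemOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(HadoopFileSystemOptions.class);

    Pipeline p = Pipeline.create(options);

    // HadoopFileSystemRegistrar is discovered via ServiceLoader, so hdfs:// paths
    // resolve through the new HadoopFileSystem without further wiring.
    PCollection<String> lines =
        p.apply(TextIO.read().from("hdfs://namenode:8020/input/*"));

    p.run().waitUntilFinish();
  }
}
```

The ignored `testReadPipeline` above runs the same flow against a `MiniDFSCluster`; per its `@Ignore` message, it stays disabled until `TestPipeline` can accept `HadoopFileSystemOptions`.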