[BEAM-2016] Delete HdfsFileSource & Sink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7512a73c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7512a73c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7512a73c Branch: refs/heads/master Commit: 7512a73cf8aa2a527c89ecb054e92207411ed241 Parents: 3bffe0e Author: Dan Halperin <dhalp...@google.com> Authored: Thu May 4 18:33:16 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Fri May 5 08:48:04 2017 -0700 ---------------------------------------------------------------------- sdks/java/io/hadoop-file-system/README.md | 43 -- sdks/java/io/hadoop-file-system/pom.xml | 24 - .../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 478 -------------- .../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 625 ------------------- .../java/org/apache/beam/sdk/io/hdfs/Sink.java | 195 ------ .../org/apache/beam/sdk/io/hdfs/UGIHelper.java | 38 -- .../java/org/apache/beam/sdk/io/hdfs/Write.java | 588 ----------------- .../apache/beam/sdk/io/hdfs/package-info.java | 3 +- .../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 172 ----- .../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 231 ------- 10 files changed, 2 insertions(+), 2395 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/README.md ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/README.md b/sdks/java/io/hadoop-file-system/README.md deleted file mode 100644 index 3a734f2..0000000 --- a/sdks/java/io/hadoop-file-system/README.md +++ /dev/null @@ -1,43 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> - -# HDFS IO - -This library provides HDFS sources and sinks to make it possible to read and -write Apache Hadoop file formats from Apache Beam pipelines. - -Currently, only the read path is implemented. A `HDFSFileSource` allows any -Hadoop `FileInputFormat` to be read as a `PCollection`. - -A `HDFSFileSource` can be read from using the -`org.apache.beam.sdk.io.Read` transform. For example: - -```java -HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class, - MyKey.class, MyValue.class); -PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(mySource)); -``` - -Alternatively, the `readFrom` method is a convenience method that returns a read -transform. 
For example: - -```java -PCollection<KV<MyKey, MyValue>> records = pipeline.apply(HDFSFileSource.readFrom(path, - MyInputFormat.class, MyKey.class, MyValue.class)); -``` http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml index 562277e..3b392c2 100644 --- a/sdks/java/io/hadoop-file-system/pom.xml +++ b/sdks/java/io/hadoop-file-system/pom.xml @@ -82,11 +82,6 @@ </dependency> <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-io-hadoop-common</artifactId> - </dependency> - - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> </dependency> @@ -124,25 +119,6 @@ </dependency> <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <version>${avro.version}</version> - <classifier>hadoop2</classifier> - <exclusions> - <!-- exclude old Jetty version of servlet API --> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java deleted file mode 100644 index aee73c4..0000000 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java +++ /dev/null @@ -1,478 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.hdfs; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.io.IOException; -import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.util.Random; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroKeyOutputFormat; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.KV; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -/** - * A {@link Sink} for writing records to a Hadoop filesystem using a Hadoop file-based - * output - * format. - * - * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to Hadoop - * filesystem use {@link HDFSFileSink#to}, specify the path (this can be any Hadoop supported - * filesystem: HDFS, S3, GCS etc), the Hadoop {@link FileOutputFormat}, the key class K and the - * value class V and finally the {@link SerializableFunction} to map from T to {@link KV} of K - * and V. - * - * <p>{@code HDFSFileSink} can be used by {@link Write} to create write - * transform. See example below. - * - * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example: - * - * <pre> - * {@code - * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink = - * HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class)); - * avroRecordsPCollection.apply(Write.to(sink)); - * } - * </pre> - * - * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}. - * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}. - * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}. 
- */ -@AutoValue -@Experimental -public abstract class HDFSFileSink<T, K, V> extends Sink<T> { - - private static final JobID jobId = new JobID( - Long.toString(System.currentTimeMillis()), - new Random().nextInt(Integer.MAX_VALUE)); - - public abstract String path(); - public abstract Class<? extends FileOutputFormat<K, V>> formatClass(); - public abstract Class<K> keyClass(); - public abstract Class<V> valueClass(); - public abstract SerializableFunction<T, KV<K, V>> outputConverter(); - public abstract SerializableConfiguration serializableConfiguration(); - public @Nullable abstract String username(); - public abstract boolean validate(); - - // ======================================================================= - // Factory methods - // ======================================================================= - - public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V> - to(String path, - Class<W> formatClass, - Class<K> keyClass, - Class<V> valueClass, - SerializableFunction<T, KV<K, V>> outputConverter) { - return HDFSFileSink.<T, K, V>builder() - .setPath(path) - .setFormatClass(formatClass) - .setKeyClass(keyClass) - .setValueClass(valueClass) - .setOutputConverter(outputConverter) - .setConfiguration(null) - .setUsername(null) - .setValidate(true) - .build(); - } - - public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) { - SerializableFunction<T, KV<NullWritable, Text>> outputConverter = - new SerializableFunction<T, KV<NullWritable, Text>>() { - @Override - public KV<NullWritable, Text> apply(T input) { - return KV.of(NullWritable.get(), new Text(input.toString())); - } - }; - return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter); - } - - /** - * Helper to create Avro sink given {@link AvroCoder}. Keep in mind that configuration - * object is altered to enable Avro output. - */ - public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path, - final AvroCoder<T> coder, - Configuration conf) { - SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter = - new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() { - @Override - public KV<AvroKey<T>, NullWritable> apply(T input) { - return KV.of(new AvroKey<>(input), NullWritable.get()); - } - }; - conf.set("avro.schema.output.key", coder.getSchema().toString()); - return to( - path, - AvroKeyOutputFormat.class, - (Class<AvroKey<T>>) (Class<?>) AvroKey.class, - NullWritable.class, - outputConverter).withConfiguration(conf); - } - - /** - * Helper to create Avro sink given {@link Schema}. Keep in mind that configuration - * object is altered to enable Avro output. - */ - public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable> - toAvro(String path, Schema schema, Configuration conf) { - return toAvro(path, AvroCoder.of(schema), conf); - } - - /** - * Helper to create Avro sink given {@link Class}. Keep in mind that configuration - * object is altered to enable Avro output. 
- */ - public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path, - Class<T> cls, - Configuration conf) { - return toAvro(path, AvroCoder.of(cls), conf); - } - - // ======================================================================= - // Builder methods - // ======================================================================= - - public abstract Builder<T, K, V> toBuilder(); - public static <T, K, V> Builder builder() { - return new AutoValue_HDFSFileSink.Builder<>(); - } - - /** - * AutoValue builder for {@link HDFSFileSink}. - */ - @AutoValue.Builder - public abstract static class Builder<T, K, V> { - public abstract Builder<T, K, V> setPath(String path); - public abstract Builder<T, K, V> setFormatClass( - Class<? extends FileOutputFormat<K, V>> formatClass); - public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass); - public abstract Builder<T, K, V> setValueClass(Class<V> valueClass); - public abstract Builder<T, K, V> setOutputConverter( - SerializableFunction<T, KV<K, V>> outputConverter); - public abstract Builder<T, K, V> setSerializableConfiguration( - SerializableConfiguration serializableConfiguration); - public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) { - if (configuration == null) { - configuration = new Configuration(false); - } - return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); - } - public abstract Builder<T, K, V> setUsername(String username); - public abstract Builder<T, K, V> setValidate(boolean validate); - public abstract HDFSFileSink<T, K, V> build(); - } - - public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) { - return this.toBuilder().setConfiguration(configuration).build(); - } - - public HDFSFileSink<T, K, V> withUsername(@Nullable String username) { - return this.toBuilder().setUsername(username).build(); - } - - // ======================================================================= - // Sink - // ======================================================================= - - @Override - public void validate(PipelineOptions options) { - if (validate()) { - try { - UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - FileSystem fs = FileSystem.get(new URI(path()), - SerializableConfiguration.newConfiguration(serializableConfiguration())); - checkState(!fs.exists(new Path(path())), "Output path %s already exists", path()); - return null; - } - }); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - @Override - public Sink.WriteOperation<T, String> createWriteOperation() { - return new HDFSWriteOperation<>(this, path(), formatClass()); - } - - private Job newJob() throws IOException { - Job job = SerializableConfiguration.newJob(serializableConfiguration()); - job.setJobID(jobId); - job.setOutputKeyClass(keyClass()); - job.setOutputValueClass(valueClass()); - return job; - } - - // ======================================================================= - // WriteOperation - // ======================================================================= - - /** {{@link WriteOperation}} for HDFS. */ - private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> { - - private final HDFSFileSink<T, K, V> sink; - private final String path; - private final Class<? extends FileOutputFormat<K, V>> formatClass; - - HDFSWriteOperation(HDFSFileSink<T, K, V> sink, - String path, - Class<? 
extends FileOutputFormat<K, V>> formatClass) { - this.sink = sink; - this.path = path; - this.formatClass = formatClass; - } - - @Override - public void initialize(PipelineOptions options) throws Exception { - Job job = sink.newJob(); - FileOutputFormat.setOutputPath(job, new Path(path)); - } - - @Override - public void setWindowedWrites(boolean windowedWrites) { - } - - @Override - public void finalize(final Iterable<String> writerResults, PipelineOptions options) - throws Exception { - UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - doFinalize(writerResults); - return null; - } - }); - } - - private void doFinalize(Iterable<String> writerResults) throws Exception { - Job job = sink.newJob(); - FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration()); - // If there are 0 output shards, just create output folder. - if (!writerResults.iterator().hasNext()) { - fs.mkdirs(new Path(path)); - return; - } - - // job successful - JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); - FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context); - outputCommitter.commitJob(context); - - // get actual output shards - Set<String> actual = Sets.newHashSet(); - FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() { - @Override - public boolean accept(Path path) { - String name = path.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }); - - // get expected output shards - Set<String> expected = Sets.newHashSet(writerResults); - checkState( - expected.size() == Lists.newArrayList(writerResults).size(), - "Data loss due to writer results hash collision"); - for (FileStatus s : statuses) { - String name = s.getPath().getName(); - int pos = name.indexOf('.'); - actual.add(pos > 0 ? name.substring(0, pos) : name); - } - - checkState(actual.equals(expected), "Writer results and output files do not match"); - - // rename output shards to Hadoop style, i.e. part-r-00000.txt - int i = 0; - for (FileStatus s : statuses) { - String name = s.getPath().getName(); - int pos = name.indexOf('.'); - String ext = pos > 0 ? name.substring(pos) : ""; - fs.rename( - s.getPath(), - new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext))); - i++; - } - } - - @Override - public Writer<T, String> createWriter(PipelineOptions options) throws Exception { - return new HDFSWriter<>(this, path, formatClass); - } - - @Override - public Sink<T> getSink() { - return sink; - } - - @Override - public Coder<String> getWriterResultCoder() { - return StringUtf8Coder.of(); - } - - } - - // ======================================================================= - // Writer - // ======================================================================= - - private static class HDFSWriter<T, K, V> extends Writer<T, String> { - - private final HDFSWriteOperation<T, K, V> writeOperation; - private final String path; - private final Class<? extends FileOutputFormat<K, V>> formatClass; - - // unique hash for each task - private int hash; - - private TaskAttemptContext context; - private RecordWriter<K, V> recordWriter; - private FileOutputCommitter outputCommitter; - - HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation, - String path, - Class<? 
extends FileOutputFormat<K, V>> formatClass) { - this.writeOperation = writeOperation; - this.path = path; - this.formatClass = formatClass; - } - - @Override - public void openWindowed(final String uId, - BoundedWindow window, - PaneInfo paneInfo, - int shard, - int numShards) throws Exception { - throw new UnsupportedOperationException("Windowing support not implemented yet for" - + "HDFS. Window " + window); - } - - @Override - public void openUnwindowed(final String uId, int shard, int numShards) throws Exception { - UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( - new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - doOpen(uId); - return null; - } - } - ); - } - - private void doOpen(String uId) throws Exception { - this.hash = uId.hashCode(); - - Job job = writeOperation.sink.newJob(); - FileOutputFormat.setOutputPath(job, new Path(path)); - - // Each Writer is responsible for writing one bundle of elements and is represented by one - // unique Hadoop task based on uId/hash. All tasks share the same job ID. - JobID jobId = job.getJobID(); - TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash); - context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0)); - - FileOutputFormat<K, V> outputFormat = formatClass.newInstance(); - recordWriter = outputFormat.getRecordWriter(context); - outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context); - } - - @Override - public void write(T value) throws Exception { - checkNotNull(recordWriter, - "Record writer can't be null. Make sure to open Writer first!"); - KV<K, V> kv = writeOperation.sink.outputConverter().apply(value); - recordWriter.write(kv.getKey(), kv.getValue()); - } - - @Override - public void cleanup() throws Exception { - - } - - @Override - public String close() throws Exception { - return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( - new PrivilegedExceptionAction<String>() { - @Override - public String run() throws Exception { - return doClose(); - } - }); - } - - private String doClose() throws Exception { - // task/attempt successful - recordWriter.close(context); - outputCommitter.commitTask(context); - - // result is prefix of the output file name - return String.format("part-r-%d", hash); - } - - @Override - public WriteOperation<T, String> getWriteOperation() { - return writeOperation; - } - - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java deleted file mode 100644 index 5cc2097..0000000 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ /dev/null @@ -1,625 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.security.PrivilegedExceptionAction; -import java.util.List; -import java.util.ListIterator; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroKeyInputFormat; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; -import org.apache.beam.sdk.io.hadoop.WritableCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.values.KV; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a - * Hadoop file-based input format. - * - * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of - * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more - * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to - * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the - * key class and the value class. - * - * <p>A {@code HDFSFileSource} can be read from using the - * {@link org.apache.beam.sdk.io.Read} transform. 
For example: - * - * <pre> - * {@code - * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class, - * MyKey.class, MyValue.class); - * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(mySource)); - * } - * </pre> - * - * <p>Implementation note: Since Hadoop's - * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} - * determines the input splits, this class extends {@link BoundedSource} rather than - * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter - * dictates input splits. - * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}. - * @param <K> the type of keys to be read from the source via {@link FileInputFormat}. - * @param <V> the type of values to be read from the source via {@link FileInputFormat}. - */ -@AutoValue -@Experimental -public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> { - private static final long serialVersionUID = 0L; - - private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class); - - public abstract String filepattern(); - public abstract Class<? extends FileInputFormat<K, V>> formatClass(); - public abstract Coder<T> coder(); - public abstract SerializableFunction<KV<K, V>, T> inputConverter(); - public abstract SerializableConfiguration serializableConfiguration(); - public @Nullable abstract SerializableSplit serializableSplit(); - public @Nullable abstract String username(); - public abstract boolean validateSource(); - - // ======================================================================= - // Factory methods - // ======================================================================= - - public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V> - from(String filepattern, - Class<W> formatClass, - Coder<T> coder, - SerializableFunction<KV<K, V>, T> inputConverter) { - return HDFSFileSource.<T, K, V>builder() - .setFilepattern(filepattern) - .setFormatClass(formatClass) - .setCoder(coder) - .setInputConverter(inputConverter) - .setConfiguration(null) - .setUsername(null) - .setValidateSource(true) - .setSerializableSplit(null) - .build(); - } - - public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V> - from(String filepattern, - Class<W> formatClass, - Class<K> keyClass, - Class<V> valueClass) { - KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass)); - SerializableFunction<KV<K, V>, KV<K, V>> inputConverter = - new SerializableFunction<KV<K, V>, KV<K, V>>() { - @Override - public KV<K, V> apply(KV<K, V> input) { - return input; - } - }; - return HDFSFileSource.<KV<K, V>, K, V>builder() - .setFilepattern(filepattern) - .setFormatClass(formatClass) - .setCoder(coder) - .setInputConverter(inputConverter) - .setConfiguration(null) - .setUsername(null) - .setValidateSource(true) - .setSerializableSplit(null) - .build(); - } - - public static HDFSFileSource<String, LongWritable, Text> - fromText(String filepattern) { - SerializableFunction<KV<LongWritable, Text>, String> inputConverter = - new SerializableFunction<KV<LongWritable, Text>, String>() { - @Override - public String apply(KV<LongWritable, Text> input) { - return input.getValue().toString(); - } - }; - return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter); - } - - /** - * Helper to read from Avro source given {@link AvroCoder}. Keep in mind that configuration - * object is altered to enable Avro input. 
- */ - public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable> - fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) { - Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class); - SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter = - new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() { - @Override - public T apply(KV<AvroKey<T>, NullWritable> input) { - try { - return CoderUtils.clone(coder, input.getKey().datum()); - } catch (CoderException e) { - throw new RuntimeException(e); - } - } - }; - conf.set("avro.schema.input.key", coder.getSchema().toString()); - return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf); - } - - /** - * Helper to read from Avro source given {@link Schema}. Keep in mind that configuration - * object is altered to enable Avro input. - */ - public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable> - fromAvro(String filepattern, Schema schema, Configuration conf) { - return fromAvro(filepattern, AvroCoder.of(schema), conf); - } - - /** - * Helper to read from Avro source given {@link Class}. Keep in mind that configuration - * object is altered to enable Avro input. - */ - public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable> - fromAvro(String filepattern, Class<T> cls, Configuration conf) { - return fromAvro(filepattern, AvroCoder.of(cls), conf); - } - - // ======================================================================= - // Builder methods - // ======================================================================= - - public abstract HDFSFileSource.Builder<T, K, V> toBuilder(); - public static <T, K, V> HDFSFileSource.Builder builder() { - return new AutoValue_HDFSFileSource.Builder<>(); - } - - /** - * AutoValue builder for {@link HDFSFileSource}. - */ - @AutoValue.Builder - public abstract static class Builder<T, K, V> { - public abstract Builder<T, K, V> setFilepattern(String filepattern); - public abstract Builder<T, K, V> setFormatClass( - Class<? extends FileInputFormat<K, V>> formatClass); - public abstract Builder<T, K, V> setCoder(Coder<T> coder); - public abstract Builder<T, K, V> setInputConverter( - SerializableFunction<KV<K, V>, T> inputConverter); - public abstract Builder<T, K, V> setSerializableConfiguration( - SerializableConfiguration serializableConfiguration); - public Builder<T, K, V> setConfiguration(Configuration configuration) { - if (configuration == null) { - configuration = new Configuration(false); - } - return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); - } - public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit); - public abstract Builder<T, K, V> setUsername(@Nullable String username); - public abstract Builder<T, K, V> setValidateSource(boolean validate); - public abstract HDFSFileSource<T, K, V> build(); - } - - public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) { - return this.toBuilder().setConfiguration(configuration).build(); - } - - public HDFSFileSource<T, K, V> withUsername(@Nullable String username) { - return this.toBuilder().setUsername(username).build(); - } - - // ======================================================================= - // BoundedSource - // ======================================================================= - - @Override - public List<? 
extends BoundedSource<T>> split( - final long desiredBundleSizeBytes, - PipelineOptions options) throws Exception { - if (serializableSplit() == null) { - List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs( - new PrivilegedExceptionAction<List<InputSplit>>() { - @Override - public List<InputSplit> run() throws Exception { - return computeSplits(desiredBundleSizeBytes, serializableConfiguration()); - } - }); - return Lists.transform(inputSplits, - new Function<InputSplit, BoundedSource<T>>() { - @Override - public BoundedSource<T> apply(@Nullable InputSplit inputSplit) { - SerializableSplit serializableSplit = new SerializableSplit(inputSplit); - return HDFSFileSource.this.toBuilder() - .setSerializableSplit(serializableSplit) - .build(); - } - }); - } else { - return ImmutableList.of(this); - } - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) { - long size = 0; - - try { - // If this source represents a split from split, - // then return the size of the split, rather then the entire input - if (serializableSplit() != null) { - return serializableSplit().getSplit().getLength(); - } - - size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() { - @Override - public Long run() throws Exception { - long size = 0; - Job job = SerializableConfiguration.newJob(serializableConfiguration()); - for (FileStatus st : listStatus(createFormat(job), job)) { - size += st.getLen(); - } - return size; - } - }); - } catch (IOException e) { - LOG.warn( - "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e); - // ignore, and return 0 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn( - "Will estimate size of input to be 0 bytes. 
Can't estimate size of the input due to:", e); - // ignore, and return 0 - } - return size; - } - - @Override - public BoundedReader<T> createReader(PipelineOptions options) throws IOException { - this.validate(); - return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit()); - } - - @Override - public void validate() { - if (validateSource()) { - try { - UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - final Path pathPattern = new Path(filepattern()); - FileSystem fs = FileSystem.get(pathPattern.toUri(), - SerializableConfiguration.newConfiguration(serializableConfiguration())); - FileStatus[] fileStatuses = fs.globStatus(pathPattern); - checkState( - fileStatuses != null && fileStatuses.length > 0, - "Unable to find any files matching %s", filepattern()); - return null; - } - }); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return coder(); - } - - // ======================================================================= - // Helpers - // ======================================================================= - - private List<InputSplit> computeSplits(long desiredBundleSizeBytes, - SerializableConfiguration serializableConfiguration) - throws IOException, IllegalAccessException, InstantiationException { - Job job = SerializableConfiguration.newJob(serializableConfiguration); - FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes); - FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes); - return createFormat(job).getSplits(job); - } - - private FileInputFormat<K, V> createFormat(Job job) - throws IOException, IllegalAccessException, InstantiationException { - Path path = new Path(filepattern()); - FileInputFormat.addInputPath(job, path); - return formatClass().newInstance(); - } - - private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job) - throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - // FileInputFormat#listStatus is protected, so call using reflection - Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class); - listStatus.setAccessible(true); - @SuppressWarnings("unchecked") - List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job); - return stat; - } - - @SuppressWarnings("unchecked") - private static <T> Coder<T> getDefaultCoder(Class<T> c) { - if (Writable.class.isAssignableFrom(c)) { - Class<? extends Writable> writableClass = (Class<? extends Writable>) c; - return (Coder<T>) WritableCoder.of(writableClass); - } else if (Void.class.equals(c)) { - return (Coder<T>) VoidCoder.of(); - } - // TODO: how to use registered coders here? - throw new IllegalStateException("Cannot find coder for " + c); - } - - @SuppressWarnings("unchecked") - private static <T> Class<T> castClass(Class<?> aClass) { - return (Class<T>) aClass; - } - - // ======================================================================= - // BoundedReader - // ======================================================================= - - private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> { - - private final HDFSFileSource<T, K, V> source; - private final String filepattern; - private final Class<? 
extends FileInputFormat<K, V>> formatClass; - private final Job job; - - private List<InputSplit> splits; - private ListIterator<InputSplit> splitsIterator; - - private Configuration conf; - private FileInputFormat<?, ?> format; - private TaskAttemptContext attemptContext; - private RecordReader<K, V> currentReader; - private KV<K, V> currentPair; - - HDFSFileReader( - HDFSFileSource<T, K, V> source, - String filepattern, - Class<? extends FileInputFormat<K, V>> formatClass, - SerializableSplit serializableSplit) - throws IOException { - this.source = source; - this.filepattern = filepattern; - this.formatClass = formatClass; - this.job = SerializableConfiguration.newJob(source.serializableConfiguration()); - - if (serializableSplit != null) { - this.splits = ImmutableList.of(serializableSplit.getSplit()); - this.splitsIterator = splits.listIterator(); - } - } - - @Override - public boolean start() throws IOException { - Path path = new Path(filepattern); - FileInputFormat.addInputPath(job, path); - - conf = job.getConfiguration(); - try { - format = formatClass.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new IOException("Cannot instantiate file input format " + formatClass, e); - } - attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID()); - - if (splitsIterator == null) { - splits = format.getSplits(job); - splitsIterator = splits.listIterator(); - } - - return advance(); - } - - @Override - public boolean advance() throws IOException { - try { - if (currentReader != null && currentReader.nextKeyValue()) { - currentPair = nextPair(); - return true; - } else { - while (splitsIterator.hasNext()) { - // advance the reader and see if it has records - final InputSplit nextSplit = splitsIterator.next(); - @SuppressWarnings("unchecked") - RecordReader<K, V> reader = - (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext); - if (currentReader != null) { - currentReader.close(); - } - currentReader = reader; - UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - currentReader.initialize(nextSplit, attemptContext); - return null; - } - }); - if (currentReader.nextKeyValue()) { - currentPair = nextPair(); - return true; - } - currentReader.close(); - currentReader = null; - } - // either no next split or all readers were empty - currentPair = null; - return false; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - - @Override - public T getCurrent() throws NoSuchElementException { - if (currentPair == null) { - throw new NoSuchElementException(); - } - return source.inputConverter().apply(currentPair); - } - - @Override - public void close() throws IOException { - if (currentReader != null) { - currentReader.close(); - currentReader = null; - } - currentPair = null; - } - - @Override - public BoundedSource<T> getCurrentSource() { - return source; - } - - @SuppressWarnings("unchecked") - private KV<K, V> nextPair() throws IOException, InterruptedException { - K key = currentReader.getCurrentKey(); - V value = currentReader.getCurrentValue(); - // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue - if (key instanceof Writable) { - key = (K) WritableUtils.clone((Writable) key, conf); - } - if (value instanceof Writable) { - value = (V) WritableUtils.clone((Writable) value, conf); - } - return KV.of(key, value); - } - - // 
======================================================================= - // Optional overrides - // ======================================================================= - - @Override - public Double getFractionConsumed() { - if (currentReader == null) { - return 0.0; - } - if (splits.isEmpty()) { - return 1.0; - } - int index = splitsIterator.previousIndex(); - int numReaders = splits.size(); - if (index == numReaders) { - return 1.0; - } - double before = 1.0 * index / numReaders; - double after = 1.0 * (index + 1) / numReaders; - Double fractionOfCurrentReader = getProgress(); - if (fractionOfCurrentReader == null) { - return before; - } - return before + fractionOfCurrentReader * (after - before); - } - - private Double getProgress() { - try { - return (double) currentReader.getProgress(); - } catch (IOException | InterruptedException e) { - return null; - } - } - - } - - // ======================================================================= - // SerializableSplit - // ======================================================================= - - /** - * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be - * serialized using Java's standard serialization mechanisms. Note that the InputSplit - * has to be Writable (which most are). - */ - protected static class SerializableSplit implements Externalizable { - private static final long serialVersionUID = 0L; - - private InputSplit split; - - public SerializableSplit() { - } - - public SerializableSplit(InputSplit split) { - checkArgument(split instanceof Writable, "Split is not writable: %s", split); - this.split = split; - } - - public InputSplit getSplit() { - return split; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(split.getClass().getCanonicalName()); - ((Writable) split).write(out); - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - String className = in.readUTF(); - try { - split = (InputSplit) Class.forName(className).newInstance(); - ((Writable) split).readFields(in); - } catch (InstantiationException | IllegalAccessException e) { - throw new IOException(e); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java deleted file mode 100644 index fe2db5f..0000000 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import java.io.Serializable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; - -/** - * This class is deprecated, and only exists for HDFSFileSink. - */ -@Deprecated -public abstract class Sink<T> implements Serializable, HasDisplayData { - /** - * Ensures that the sink is valid and can be written to before the write operation begins. One - * should use {@link com.google.common.base.Preconditions} to implement this method. - */ - public abstract void validate(PipelineOptions options); - - /** - * Returns an instance of a {@link WriteOperation} that can write to this Sink. - */ - public abstract WriteOperation<T, ?> createWriteOperation(); - - /** - * {@inheritDoc} - * - * <p>By default, does not register any display data. Implementors may override this method - * to provide their own display data. - */ - @Override - public void populateDisplayData(DisplayData.Builder builder) {} - - /** - * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink. - * - * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a - * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write - * a bundle to the sink. - * - * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance, - * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>. - * - * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the - * call to {@code initialize} method and deserialized before calls to - * {@code createWriter} and {@code finalized}. However, it is not - * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the - * state of the {@code WriteOperation}. - * - * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink. - * - * @param <T> The type of objects to write - * @param <WriteT> The result of a per-bundle write - */ - public abstract static class WriteOperation<T, WriteT> implements Serializable { - /** - * Performs initialization before writing to the sink. Called before writing begins. - */ - public abstract void initialize(PipelineOptions options) throws Exception; - - /** - * Indicates that the operation will be performing windowed writes. - */ - public abstract void setWindowedWrites(boolean windowedWrites); - - /** - * Given an Iterable of results from bundle writes, performs finalization after writing and - * closes the sink. Called after all bundle writes are complete. - * - * <p>The results that are passed to finalize are those returned by bundles that completed - * successfully. Although bundles may have been run multiple times (for fault-tolerance), only - * one writer result will be passed to finalize for each bundle. An implementation of finalize - * should perform clean up of any failed and successfully retried bundles. Note that these - * failed bundles will not have their writer result passed to finalize, so finalize should be - * capable of locating any temporary/partial output written by failed bundles. 
- * - * <p>A best practice is to make finalize atomic. If this is impossible given the semantics - * of the sink, finalize should be idempotent, as it may be called multiple times in the case of - * failure/retry or for redundancy. - * - * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if - * finalize is called multiple times. - * - * @param writerResults an Iterable of results from successful bundle writes. - */ - public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options) - throws Exception; - - /** - * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink. - * - * <p>The bundle id that the writer will use to uniquely identify its output will be passed to - * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}. - * - * <p>Must not mutate the state of the WriteOperation. - */ - public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception; - - /** - * Returns the Sink that this write operation writes to. - */ - public abstract Sink<T> getSink(); - - /** - * Returns a coder for the writer result type. - */ - public abstract Coder<WriteT> getWriterResultCoder(); - } - - /** - * A Writer writes a bundle of elements from a PCollection to a sink. - * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins - * and {@link Writer#close} is called after all elements in the bundle have been written. - * {@link Writer#write} writes an element to the sink. - * - * <p>Note that any access to static members or methods of a Writer must be thread-safe, as - * multiple instances of a Writer may be instantiated in different threads on the same worker. - * - * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink. - * - * @param <T> The type of object to write - * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String) - */ - public abstract static class Writer<T, WriteT> { - /** - * Performs bundle initialization. For example, creates a temporary file for writing or - * initializes any state that will be used across calls to {@link Writer#write}. - * - * <p>The unique id that is given to open should be used to ensure that the writer's output does - * not interfere with the output of other Writers, as a bundle may be executed many times for - * fault tolerance. See {@link Sink} for more information about bundle ids. - * - * <p>The window and paneInfo arguments are populated when windowed writes are requested. - * shard and numbShards are populated for the case of static sharding. In cases where the - * runner is dynamically picking sharding, shard and numShards might both be set to -1. - */ - public abstract void openWindowed(String uId, - BoundedWindow window, - PaneInfo paneInfo, - int shard, - int numShards) throws Exception; - - /** - * Perform bundle initialization for the case where the file is written unwindowed. - */ - public abstract void openUnwindowed(String uId, - int shard, - int numShards) throws Exception; - - public abstract void cleanup() throws Exception; - - /** - * Called for each value in the bundle. - */ - public abstract void write(T value) throws Exception; - - /** - * Finishes writing the bundle. Closes any resources used for writing the bundle. - * - * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s - * finalization. The result should contain some way to identify the output of this bundle (using - * the bundle id). 
{@link WriteOperation#finalize} will use the writer result to identify - * successful writes. See {@link Sink} for more information about bundle ids. - * - * @return the writer result - */ - public abstract WriteT close() throws Exception; - - /** - * Returns the write operation this writer belongs to. - */ - public abstract WriteOperation<T, WriteT> getWriteOperation(); - - - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java deleted file mode 100644 index fd05a19..0000000 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import java.io.IOException; -import javax.annotation.Nullable; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * {@link UserGroupInformation} helper methods. - */ -public class UGIHelper { - - /** - * Find the most appropriate UserGroupInformation to use. - * @param username the user name, or NULL if none is specified. - * @return the most appropriate UserGroupInformation - */ - public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException { - return UserGroupInformation.getBestUGI(null, username); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java deleted file mode 100644 index ef6556e..0000000 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/Write.java +++ /dev/null @@ -1,588 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.hdfs.Sink.WriteOperation; -import org.apache.beam.sdk.io.hdfs.Sink.Writer; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is deprecated, and only exists currently for HDFSFileSink. - */ -@Deprecated -public class Write<T> extends PTransform<PCollection<T>, PDone> { - private static final Logger LOG = LoggerFactory.getLogger(Write.class); - - private static final int UNKNOWN_SHARDNUM = -1; - private static final int UNKNOWN_NUMSHARDS = -1; - - private final Sink<T> sink; - // This allows the number of shards to be dynamically computed based on the input - // PCollection. - @Nullable - private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards; - // We don't use a side input for static sharding, as we want this value to be updatable - // when a pipeline is updated. - @Nullable - private final ValueProvider<Integer> numShardsProvider; - private boolean windowedWrites; - - /** - * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner - * control how many different shards are produced. 
- */ - public static <T> Write<T> to(Sink<T> sink) { - checkNotNull(sink, "sink"); - return new Write<>(sink, null /* runner-determined sharding */, null, false); - } - - private Write( - Sink<T> sink, - @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards, - @Nullable ValueProvider<Integer> numShardsProvider, - boolean windowedWrites) { - this.sink = sink; - this.computeNumShards = computeNumShards; - this.numShardsProvider = numShardsProvider; - this.windowedWrites = windowedWrites; - } - - @Override - public PDone expand(PCollection<T> input) { - checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites, - "%s can only be applied to an unbounded PCollection if doing windowed writes", - Write.class.getSimpleName()); - return createWrite(input, sink.createWriteOperation()); - } - - @Override - public void validate(PipelineOptions options) { - sink.validate(options); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder - .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink")) - .include("sink", sink); - if (getSharding() != null) { - builder.include("sharding", getSharding()); - } else if (getNumShards() != null) { - String numShards = getNumShards().isAccessible() - ? getNumShards().get().toString() : getNumShards().toString(); - builder.add(DisplayData.item("numShards", numShards) - .withLabel("Fixed Number of Shards")); - } - } - - /** - * Returns the {@link Sink} associated with this PTransform. - */ - public Sink<T> getSink() { - return sink; - } - - /** - * Gets the {@link PTransform} that will be used to determine sharding. This can be either a - * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by - * {@link #withSharding(PTransform)}), or runner-determined (by {@link - * #withRunnerDeterminedSharding()}. - */ - @Nullable - public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() { - return computeNumShards; - } - - public ValueProvider<Integer> getNumShards() { - return numShardsProvider; - } - - /** - * Returns a new {@link Write} that will write to the current {@link Sink} using the - * specified number of shards. - * - * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for - * more information. - * - * <p>A value less than or equal to 0 will be equivalent to the default behavior of - * runner-determined sharding. - */ - public Write<T> withNumShards(int numShards) { - if (numShards > 0) { - return withNumShards(StaticValueProvider.of(numShards)); - } - return withRunnerDeterminedSharding(); - } - - /** - * Returns a new {@link Write} that will write to the current {@link Sink} using the - * {@link ValueProvider} specified number of shards. - * - * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for - * more information. - */ - public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) { - return new Write<>(sink, null, numShardsProvider, windowedWrites); - } - - /** - * Returns a new {@link Write} that will write to the current {@link Sink} using the - * specified {@link PTransform} to compute the number of shards. - * - * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for - * more information. - */ - public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) { - checkNotNull( - sharding, "Cannot provide null sharding. 
Use withRunnerDeterminedSharding() instead"); - return new Write<>(sink, sharding, null, windowedWrites); - } - - /** - * Returns a new {@link Write} that will write to the current {@link Sink} with - * runner-determined sharding. - */ - public Write<T> withRunnerDeterminedSharding() { - return new Write<>(sink, null, null, windowedWrites); - } - - /** - * Returns a new {@link Write} that writes preserves windowing on it's input. - * - * <p>If this option is not specified, windowing and triggering are replaced by - * {@link GlobalWindows} and {@link DefaultTrigger}. - * - * <p>If there is no data for a window, no output shards will be generated for that window. - * If a window triggers multiple times, then more than a single output shard might be - * generated multiple times; it's up to the sink implementation to keep these output shards - * unique. - * - * <p>This option can only be used if {@link #withNumShards(int)} is also set to a - * positive value. - */ - public Write<T> withWindowedWrites() { - return new Write<>(sink, computeNumShards, numShardsProvider, true); - } - - /** - * Writes all the elements in a bundle using a {@link Writer} produced by the - * {@link WriteOperation} associated with the {@link Sink}. - */ - private class WriteBundles<WriteT> extends DoFn<T, WriteT> { - // Writer that will write the records in this bundle. Lazily - // initialized in processElement. - private Writer<T, WriteT> writer = null; - private BoundedWindow window; - private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; - - WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) { - this.writeOperationView = writeOperationView; - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - // Lazily initialize the Writer - if (writer == null) { - WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); - LOG.info("Opening writer for write operation {}", writeOperation); - writer = writeOperation.createWriter(c.getPipelineOptions()); - - if (windowedWrites) { - writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM, - UNKNOWN_NUMSHARDS); - } else { - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); - } - this.window = window; - LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); - } - try { - writer.write(c.element()); - } catch (Exception e) { - // Discard write result and close the write. - try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - if (closeException instanceof InterruptedException) { - // Do not silently ignore interrupted state. - Thread.currentThread().interrupt(); - } - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; - } - } - - @FinishBundle - public void finishBundle(FinishBundleContext c) throws Exception { - if (writer != null) { - WriteT result = writer.close(); - c.output(result, window.maxTimestamp(), window); - // Reset state in case of reuse. - writer = null; - window = null; - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(Write.this); - } - } - - /** - * Like {@link WriteBundles}, but where the elements for each shard have been collected into - * a single iterable. 
- * - * @see WriteBundles - */ - private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> { - private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; - private final PCollectionView<Integer> numShardsView; - - WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView, - PCollectionView<Integer> numShardsView) { - this.writeOperationView = writeOperationView; - this.numShardsView = numShardsView; - } - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { - int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get(); - // In a sharded write, single input element represents one shard. We can open and close - // the writer in each call to processElement. - WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); - LOG.info("Opening writer for write operation {}", writeOperation); - Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); - if (windowedWrites) { - writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(), - numShards); - } else { - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS); - } - LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); - - try { - try { - for (T t : c.element().getValue()) { - writer.write(t); - } - } catch (Exception e) { - try { - writer.close(); - } catch (Exception closeException) { - if (closeException instanceof InterruptedException) { - // Do not silently ignore interrupted state. - Thread.currentThread().interrupt(); - } - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); - } - throw e; - } - - // Close the writer; if this throws let the error propagate. - WriteT result = writer.close(); - c.output(result); - } catch (Exception e) { - // If anything goes wrong, make sure to delete the temporary file. - writer.cleanup(); - throw e; - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(Write.this); - } - } - - private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> { - private final PCollectionView<Integer> numShardsView; - private final ValueProvider<Integer> numShardsProvider; - private int shardNumber; - - ApplyShardingKey(PCollectionView<Integer> numShardsView, - ValueProvider<Integer> numShardsProvider) { - this.numShardsView = numShardsView; - this.numShardsProvider = numShardsProvider; - shardNumber = UNKNOWN_SHARDNUM; - } - - @ProcessElement - public void processElement(ProcessContext context) { - int shardCount = 0; - if (numShardsView != null) { - shardCount = context.sideInput(numShardsView); - } else { - checkNotNull(numShardsProvider); - shardCount = numShardsProvider.get(); - } - checkArgument( - shardCount > 0, - "Must have a positive number of shards specified for non-runner-determined sharding." - + " Got %s", - shardCount); - if (shardNumber == UNKNOWN_SHARDNUM) { - // We want to desynchronize the first record sharding key for each instance of - // ApplyShardingKey, so records in a small PCollection will be statistically balanced. - shardNumber = ThreadLocalRandom.current().nextInt(shardCount); - } else { - shardNumber = (shardNumber + 1) % shardCount; - } - context.output(KV.of(shardNumber, context.element())); - } - } - - /** - * A write is performed as sequence of three {@link ParDo}'s. 
- * - * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's - * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is - * called. The output of this ParDo is a singleton PCollection - * containing the WriteOperation. - * - * <p>This singleton collection containing the WriteOperation is then used as a side input to a - * ParDo over the PCollection of elements to write. In this bundle-writing phase, - * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. - * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and - * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for - * every element in the bundle. The output of this ParDo is a PCollection of - * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for - * each bundle. - * - * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and - * the collection of writer results as a side-input. In this ParDo, - * {@link WriteOperation#finalize} is called to finalize the write. - * - * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called - * before the exception that caused the write to fail is propagated and the write result will be - * discarded. - * - * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and - * deserialized in the bundle-writing and finalization phases, any state change to the - * WriteOperation object that occurs during initialization is visible in the latter phases. - * However, the WriteOperation is not serialized after the bundle-writing phase. This is why - * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate - * WriteOperation). - */ - private <WriteT> PDone createWrite( - PCollection<T> input, WriteOperation<T, WriteT> writeOperation) { - Pipeline p = input.getPipeline(); - writeOperation.setWindowedWrites(windowedWrites); - - // A coder to use for the WriteOperation. - @SuppressWarnings("unchecked") - Coder<WriteOperation<T, WriteT>> operationCoder = - (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass()); - - // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize - // the sink. - PCollection<WriteOperation<T, WriteT>> operationCollection = - p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder)); - - // Initialize the resource in a do-once ParDo on the WriteOperation. - operationCollection = operationCollection - .apply("Initialize", ParDo.of( - new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - WriteOperation<T, WriteT> writeOperation = c.element(); - LOG.info("Initializing write operation {}", writeOperation); - writeOperation.initialize(c.getPipelineOptions()); - writeOperation.setWindowedWrites(windowedWrites); - LOG.debug("Done initializing write operation {}", writeOperation); - // The WriteOperation is also the output of this ParDo, so it can have mutable - // state. - c.output(writeOperation); - } - })) - .setCoder(operationCoder); - - // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase. 
- final PCollectionView<WriteOperation<T, WriteT>> writeOperationView = - operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton()); - - if (!windowedWrites) { - // Re-window the data into the global window and remove any existing triggers. - input = - input.apply( - Window.<T>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()); - } - - - // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation - // as a side input) and collect the results of the writes in a PCollection. - // There is a dependency between this ParDo and the first (the WriteOperation PCollection - // as a side input), so this will happen after the initial ParDo. - PCollection<WriteT> results; - final PCollectionView<Integer> numShardsView; - if (computeNumShards == null && numShardsProvider == null) { - if (windowedWrites) { - throw new IllegalStateException("When doing windowed writes, numShards must be set" - + "explicitly to a positive value"); - } - numShardsView = null; - results = input - .apply("WriteBundles", - ParDo.of(new WriteBundles<>(writeOperationView)) - .withSideInputs(writeOperationView)); - } else { - if (computeNumShards != null) { - numShardsView = input.apply(computeNumShards); - results = input - .apply("ApplyShardLabel", ParDo.of( - new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView)) - .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) - .apply("WriteShardedBundles", - ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView)) - .withSideInputs(numShardsView, writeOperationView)); - } else { - numShardsView = null; - results = input - .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider))) - .apply("GroupIntoShards", GroupByKey.<Integer, T>create()) - .apply("WriteShardedBundles", - ParDo.of(new WriteShardedBundles<>(writeOperationView, null)) - .withSideInputs(writeOperationView)); - } - } - results.setCoder(writeOperation.getWriterResultCoder()); - - if (windowedWrites) { - // When processing streaming windowed writes, results will arrive multiple times. This - // means we can't share the below implementation that turns the results into a side input, - // as new data arriving into a side input does not trigger the listening DoFn. Instead - // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered - // whenever new data arrives. - PCollection<KV<Void, WriteT>> keyedResults = - results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null)); - keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation - .getWriterResultCoder())); - - // Is the continuation trigger sufficient? 
- keyedResults - .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create()) - .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); - LOG.info("Finalizing write operation {}.", writeOperation); - List<WriteT> results = Lists.newArrayList(c.element().getValue()); - writeOperation.finalize(results, c.getPipelineOptions()); - LOG.debug("Done finalizing write operation {}", writeOperation); - } - }).withSideInputs(writeOperationView)); - } else { - final PCollectionView<Iterable<WriteT>> resultsView = - results.apply(View.<WriteT>asIterable()); - ImmutableList.Builder<PCollectionView<?>> sideInputs = - ImmutableList.<PCollectionView<?>>builder().add(resultsView); - if (numShardsView != null) { - sideInputs.add(numShardsView); - } - - // Finalize the write in another do-once ParDo on the singleton collection containing the - // Writer. The results from the per-bundle writes are given as an Iterable side input. - // The WriteOperation's state is the same as after its initialization in the first do-once - // ParDo. There is a dependency between this ParDo and the parallel write (the writer - // results collection as a side input), so it will happen after the parallel write. - // For the non-windowed case, we guarantee that if no data is written but the user has - // set numShards, then all shards will be written out as empty files. For this reason we - // use a side input here. - operationCollection - .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - WriteOperation<T, WriteT> writeOperation = c.element(); - LOG.info("Finalizing write operation {}.", writeOperation); - List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView)); - LOG.debug("Side input initialized to finalize write operation {}.", writeOperation); - - // We must always output at least 1 shard, and honor user-specified numShards if - // set. 
- int minShardsNeeded; - if (numShardsView != null) { - minShardsNeeded = c.sideInput(numShardsView); - } else if (numShardsProvider != null) { - minShardsNeeded = numShardsProvider.get(); - } else { - minShardsNeeded = 1; - } - int extraShardsNeeded = minShardsNeeded - results.size(); - if (extraShardsNeeded > 0) { - LOG.info( - "Creating {} empty output shards in addition to {} written for a total of " - + " {}.", extraShardsNeeded, results.size(), minShardsNeeded); - for (int i = 0; i < extraShardsNeeded; ++i) { - Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions()); - writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, - UNKNOWN_NUMSHARDS); - WriteT emptyWrite = writer.close(); - results.add(emptyWrite); - } - LOG.debug("Done creating extra shards."); - } - writeOperation.finalize(results, c.getPipelineOptions()); - LOG.debug("Done finalizing write operation {}", writeOperation); - } - }).withSideInputs(sideInputs.build())); - } - return PDone.in(input.getPipeline()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7512a73c/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java index 763b30a..32c36cc 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/package-info.java @@ -17,6 +17,7 @@ */ /** - * Transforms used to read from the Hadoop file system (HDFS). + * {@link org.apache.beam.sdk.io.FileSystem} implementation for any Hadoop + * {@link org.apache.hadoop.fs.FileSystem}. */ package org.apache.beam.sdk.io.hdfs;
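
The revised package-info above points at the FileSystems-based replacement for the deleted `HDFSFileSource`/`HDFSFileSink`. As a rough illustration only (not part of this commit), reading and writing Hadoop-hosted files after this change could look roughly like the sketch below. It assumes the `HadoopFileSystemOptions` shipped in this module and a reachable HDFS cluster; the host, port, and paths are hypothetical.

```java
// Illustrative sketch only -- not taken from this commit. Assumes the
// HadoopFileSystemOptions provided by the hadoop-file-system module and a
// Hadoop configuration pointing at a real cluster (host/port below are made up).
import java.util.Collections;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;

public class HdfsReadWriteSketch {
  public static void main(String[] args) {
    HadoopFileSystemOptions options =
        PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

    // Point the Hadoop FileSystem at the target cluster (hypothetical address).
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020");
    options.setHdfsConfiguration(Collections.singletonList(conf));

    Pipeline p = Pipeline.create(options);

    // Ordinary file-based IO with an hdfs:// path takes the place of the
    // deleted HDFSFileSource/HDFSFileSink.
    PCollection<String> lines =
        p.apply(TextIO.read().from("hdfs://namenode:8020/path/to/input*"));
    lines.apply(TextIO.write().to("hdfs://namenode:8020/path/to/output"));

    p.run().waitUntilFinish();
  }
}
```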