http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java deleted file mode 100644 index aee73c4..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java +++ /dev/null @@ -1,478 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.io.IOException; -import java.net.URI; -import java.security.PrivilegedExceptionAction; -import java.util.Random; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroKeyOutputFormat; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.values.KV; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; - -/** - * A {@link Sink} 
for writing records to a Hadoop filesystem using a Hadoop file-based - * output - * format. - * - * <p>To write a {@link org.apache.beam.sdk.values.PCollection} of elements of type T to Hadoop - * filesystem use {@link HDFSFileSink#to}, specify the path (this can be any Hadoop supported - * filesystem: HDFS, S3, GCS etc), the Hadoop {@link FileOutputFormat}, the key class K and the - * value class V and finally the {@link SerializableFunction} to map from T to {@link KV} of K - * and V. - * - * <p>{@code HDFSFileSink} can be used by {@link Write} to create write - * transform. See example below. - * - * <p>{@code HDFSFileSink} comes with helper methods to write text and Apache Avro. For example: - * - * <pre> - * {@code - * HDFSFileSink<CustomSpecificAvroClass, AvroKey<CustomSpecificAvroClass>, NullWritable> sink = - * HDFSFileSink.toAvro(path, AvroCoder.of(CustomSpecificAvroClass.class)); - * avroRecordsPCollection.apply(Write.to(sink)); - * } - * </pre> - * - * @param <T> the type of elements of the input {@link org.apache.beam.sdk.values.PCollection}. - * @param <K> the type of keys to be written to the sink via {@link FileOutputFormat}. - * @param <V> the type of values to be written to the sink via {@link FileOutputFormat}. - */ -@AutoValue -@Experimental -public abstract class HDFSFileSink<T, K, V> extends Sink<T> { - - private static final JobID jobId = new JobID( - Long.toString(System.currentTimeMillis()), - new Random().nextInt(Integer.MAX_VALUE)); - - public abstract String path(); - public abstract Class<? extends FileOutputFormat<K, V>> formatClass(); - public abstract Class<K> keyClass(); - public abstract Class<V> valueClass(); - public abstract SerializableFunction<T, KV<K, V>> outputConverter(); - public abstract SerializableConfiguration serializableConfiguration(); - public @Nullable abstract String username(); - public abstract boolean validate(); - - // ======================================================================= - // Factory methods - // ======================================================================= - - public static <T, K, V, W extends FileOutputFormat<K, V>> HDFSFileSink<T, K, V> - to(String path, - Class<W> formatClass, - Class<K> keyClass, - Class<V> valueClass, - SerializableFunction<T, KV<K, V>> outputConverter) { - return HDFSFileSink.<T, K, V>builder() - .setPath(path) - .setFormatClass(formatClass) - .setKeyClass(keyClass) - .setValueClass(valueClass) - .setOutputConverter(outputConverter) - .setConfiguration(null) - .setUsername(null) - .setValidate(true) - .build(); - } - - public static <T> HDFSFileSink<T, NullWritable, Text> toText(String path) { - SerializableFunction<T, KV<NullWritable, Text>> outputConverter = - new SerializableFunction<T, KV<NullWritable, Text>>() { - @Override - public KV<NullWritable, Text> apply(T input) { - return KV.of(NullWritable.get(), new Text(input.toString())); - } - }; - return to(path, TextOutputFormat.class, NullWritable.class, Text.class, outputConverter); - } - - /** - * Helper to create Avro sink given {@link AvroCoder}. Keep in mind that configuration - * object is altered to enable Avro output. 
- */ - public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path, - final AvroCoder<T> coder, - Configuration conf) { - SerializableFunction<T, KV<AvroKey<T>, NullWritable>> outputConverter = - new SerializableFunction<T, KV<AvroKey<T>, NullWritable>>() { - @Override - public KV<AvroKey<T>, NullWritable> apply(T input) { - return KV.of(new AvroKey<>(input), NullWritable.get()); - } - }; - conf.set("avro.schema.output.key", coder.getSchema().toString()); - return to( - path, - AvroKeyOutputFormat.class, - (Class<AvroKey<T>>) (Class<?>) AvroKey.class, - NullWritable.class, - outputConverter).withConfiguration(conf); - } - - /** - * Helper to create Avro sink given {@link Schema}. Keep in mind that configuration - * object is altered to enable Avro output. - */ - public static HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable> - toAvro(String path, Schema schema, Configuration conf) { - return toAvro(path, AvroCoder.of(schema), conf); - } - - /** - * Helper to create Avro sink given {@link Class}. Keep in mind that configuration - * object is altered to enable Avro output. - */ - public static <T> HDFSFileSink<T, AvroKey<T>, NullWritable> toAvro(String path, - Class<T> cls, - Configuration conf) { - return toAvro(path, AvroCoder.of(cls), conf); - } - - // ======================================================================= - // Builder methods - // ======================================================================= - - public abstract Builder<T, K, V> toBuilder(); - public static <T, K, V> Builder builder() { - return new AutoValue_HDFSFileSink.Builder<>(); - } - - /** - * AutoValue builder for {@link HDFSFileSink}. - */ - @AutoValue.Builder - public abstract static class Builder<T, K, V> { - public abstract Builder<T, K, V> setPath(String path); - public abstract Builder<T, K, V> setFormatClass( - Class<? 
extends FileOutputFormat<K, V>> formatClass); - public abstract Builder<T, K, V> setKeyClass(Class<K> keyClass); - public abstract Builder<T, K, V> setValueClass(Class<V> valueClass); - public abstract Builder<T, K, V> setOutputConverter( - SerializableFunction<T, KV<K, V>> outputConverter); - public abstract Builder<T, K, V> setSerializableConfiguration( - SerializableConfiguration serializableConfiguration); - public Builder<T, K, V> setConfiguration(@Nullable Configuration configuration) { - if (configuration == null) { - configuration = new Configuration(false); - } - return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); - } - public abstract Builder<T, K, V> setUsername(String username); - public abstract Builder<T, K, V> setValidate(boolean validate); - public abstract HDFSFileSink<T, K, V> build(); - } - - public HDFSFileSink<T, K, V> withConfiguration(@Nullable Configuration configuration) { - return this.toBuilder().setConfiguration(configuration).build(); - } - - public HDFSFileSink<T, K, V> withUsername(@Nullable String username) { - return this.toBuilder().setUsername(username).build(); - } - - // ======================================================================= - // Sink - // ======================================================================= - - @Override - public void validate(PipelineOptions options) { - if (validate()) { - try { - UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - FileSystem fs = FileSystem.get(new URI(path()), - SerializableConfiguration.newConfiguration(serializableConfiguration())); - checkState(!fs.exists(new Path(path())), "Output path %s already exists", path()); - return null; - } - }); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - @Override - public Sink.WriteOperation<T, String> createWriteOperation() { - return new HDFSWriteOperation<>(this, path(), formatClass()); - } - - private Job newJob() throws IOException { - Job job = SerializableConfiguration.newJob(serializableConfiguration()); - job.setJobID(jobId); - job.setOutputKeyClass(keyClass()); - job.setOutputValueClass(valueClass()); - return job; - } - - // ======================================================================= - // WriteOperation - // ======================================================================= - - /** {{@link WriteOperation}} for HDFS. */ - private static class HDFSWriteOperation<T, K, V> extends WriteOperation<T, String> { - - private final HDFSFileSink<T, K, V> sink; - private final String path; - private final Class<? extends FileOutputFormat<K, V>> formatClass; - - HDFSWriteOperation(HDFSFileSink<T, K, V> sink, - String path, - Class<? 
extends FileOutputFormat<K, V>> formatClass) { - this.sink = sink; - this.path = path; - this.formatClass = formatClass; - } - - @Override - public void initialize(PipelineOptions options) throws Exception { - Job job = sink.newJob(); - FileOutputFormat.setOutputPath(job, new Path(path)); - } - - @Override - public void setWindowedWrites(boolean windowedWrites) { - } - - @Override - public void finalize(final Iterable<String> writerResults, PipelineOptions options) - throws Exception { - UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - doFinalize(writerResults); - return null; - } - }); - } - - private void doFinalize(Iterable<String> writerResults) throws Exception { - Job job = sink.newJob(); - FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration()); - // If there are 0 output shards, just create output folder. - if (!writerResults.iterator().hasNext()) { - fs.mkdirs(new Path(path)); - return; - } - - // job successful - JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID()); - FileOutputCommitter outputCommitter = new FileOutputCommitter(new Path(path), context); - outputCommitter.commitJob(context); - - // get actual output shards - Set<String> actual = Sets.newHashSet(); - FileStatus[] statuses = fs.listStatus(new Path(path), new PathFilter() { - @Override - public boolean accept(Path path) { - String name = path.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }); - - // get expected output shards - Set<String> expected = Sets.newHashSet(writerResults); - checkState( - expected.size() == Lists.newArrayList(writerResults).size(), - "Data loss due to writer results hash collision"); - for (FileStatus s : statuses) { - String name = s.getPath().getName(); - int pos = name.indexOf('.'); - actual.add(pos > 0 ? name.substring(0, pos) : name); - } - - checkState(actual.equals(expected), "Writer results and output files do not match"); - - // rename output shards to Hadoop style, i.e. part-r-00000.txt - int i = 0; - for (FileStatus s : statuses) { - String name = s.getPath().getName(); - int pos = name.indexOf('.'); - String ext = pos > 0 ? name.substring(pos) : ""; - fs.rename( - s.getPath(), - new Path(s.getPath().getParent(), String.format("part-r-%05d%s", i, ext))); - i++; - } - } - - @Override - public Writer<T, String> createWriter(PipelineOptions options) throws Exception { - return new HDFSWriter<>(this, path, formatClass); - } - - @Override - public Sink<T> getSink() { - return sink; - } - - @Override - public Coder<String> getWriterResultCoder() { - return StringUtf8Coder.of(); - } - - } - - // ======================================================================= - // Writer - // ======================================================================= - - private static class HDFSWriter<T, K, V> extends Writer<T, String> { - - private final HDFSWriteOperation<T, K, V> writeOperation; - private final String path; - private final Class<? extends FileOutputFormat<K, V>> formatClass; - - // unique hash for each task - private int hash; - - private TaskAttemptContext context; - private RecordWriter<K, V> recordWriter; - private FileOutputCommitter outputCommitter; - - HDFSWriter(HDFSWriteOperation<T, K, V> writeOperation, - String path, - Class<? 
extends FileOutputFormat<K, V>> formatClass) { - this.writeOperation = writeOperation; - this.path = path; - this.formatClass = formatClass; - } - - @Override - public void openWindowed(final String uId, - BoundedWindow window, - PaneInfo paneInfo, - int shard, - int numShards) throws Exception { - throw new UnsupportedOperationException("Windowing support not implemented yet for" - + "HDFS. Window " + window); - } - - @Override - public void openUnwindowed(final String uId, int shard, int numShards) throws Exception { - UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( - new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - doOpen(uId); - return null; - } - } - ); - } - - private void doOpen(String uId) throws Exception { - this.hash = uId.hashCode(); - - Job job = writeOperation.sink.newJob(); - FileOutputFormat.setOutputPath(job, new Path(path)); - - // Each Writer is responsible for writing one bundle of elements and is represented by one - // unique Hadoop task based on uId/hash. All tasks share the same job ID. - JobID jobId = job.getJobID(); - TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash); - context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0)); - - FileOutputFormat<K, V> outputFormat = formatClass.newInstance(); - recordWriter = outputFormat.getRecordWriter(context); - outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context); - } - - @Override - public void write(T value) throws Exception { - checkNotNull(recordWriter, - "Record writer can't be null. Make sure to open Writer first!"); - KV<K, V> kv = writeOperation.sink.outputConverter().apply(value); - recordWriter.write(kv.getKey(), kv.getValue()); - } - - @Override - public void cleanup() throws Exception { - - } - - @Override - public String close() throws Exception { - return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs( - new PrivilegedExceptionAction<String>() { - @Override - public String run() throws Exception { - return doClose(); - } - }); - } - - private String doClose() throws Exception { - // task/attempt successful - recordWriter.close(context); - outputCommitter.commitTask(context); - - // result is prefix of the output file name - return String.format("part-r-%d", hash); - } - - @Override - public WriteOperation<T, String> getWriteOperation() { - return writeOperation; - } - - } - -}
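
The Javadoc of the deleted class above already outlines how this sink was driven: build an HDFSFileSink with one of the to/toText/toAvro factories and hand it to the HDFS module's own Write transform. A minimal usage sketch reconstructed from that Javadoc and from the factory signatures in the removed file — the paths, pipeline setup, and example elements below are illustrative assumptions, not part of the commit, and imports are omitted — would have looked roughly like this:

    // Sketch only, based on the deleted Javadoc; names marked below are assumed.
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Plain-text output: toText() converts each element with toString() and
    // writes it through TextOutputFormat under the given (assumed) path.
    HDFSFileSink<String, NullWritable, Text> textSink =
        HDFSFileSink.toText("hdfs://namenode:8020/output/text");
    p.apply(Create.of("alpha", "beta", "gamma"))
     .apply(Write.to(textSink));

    // Avro output: per the deleted source, toAvro() mutates the supplied
    // Configuration (it sets "avro.schema.output.key") before the sink uses
    // AvroKeyOutputFormat. "schema" is an Avro Schema assumed to exist.
    Configuration conf = new Configuration(false);
    HDFSFileSink<GenericRecord, AvroKey<GenericRecord>, NullWritable> avroSink =
        HDFSFileSink.toAvro("hdfs://namenode:8020/output/avro", schema, conf);

    p.run();
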
http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java deleted file mode 100644 index 5cc2097..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ /dev/null @@ -1,625 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import com.google.auto.value.AutoValue; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.security.PrivilegedExceptionAction; -import java.util.List; -import java.util.ListIterator; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapreduce.AvroKeyInputFormat; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; -import org.apache.beam.sdk.io.hadoop.WritableCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.values.KV; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import 
org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@code BoundedSource} for reading files resident in a Hadoop filesystem using a - * Hadoop file-based input format. - * - * <p>To read a {@link org.apache.beam.sdk.values.PCollection} of - * {@link org.apache.beam.sdk.values.KV} key-value pairs from one or more - * Hadoop files, use {@link HDFSFileSource#from} to specify the path(s) of the files to - * read, the Hadoop {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}, the - * key class and the value class. - * - * <p>A {@code HDFSFileSource} can be read from using the - * {@link org.apache.beam.sdk.io.Read} transform. For example: - * - * <pre> - * {@code - * HDFSFileSource<K, V> source = HDFSFileSource.from(path, MyInputFormat.class, - * MyKey.class, MyValue.class); - * PCollection<KV<MyKey, MyValue>> records = pipeline.apply(Read.from(mySource)); - * } - * </pre> - * - * <p>Implementation note: Since Hadoop's - * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat} - * determines the input splits, this class extends {@link BoundedSource} rather than - * {@link org.apache.beam.sdk.io.OffsetBasedSource}, since the latter - * dictates input splits. - * @param <T> the type of elements of the result {@link org.apache.beam.sdk.values.PCollection}. - * @param <K> the type of keys to be read from the source via {@link FileInputFormat}. - * @param <V> the type of values to be read from the source via {@link FileInputFormat}. - */ -@AutoValue -@Experimental -public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> { - private static final long serialVersionUID = 0L; - - private static final Logger LOG = LoggerFactory.getLogger(HDFSFileSource.class); - - public abstract String filepattern(); - public abstract Class<? 
extends FileInputFormat<K, V>> formatClass(); - public abstract Coder<T> coder(); - public abstract SerializableFunction<KV<K, V>, T> inputConverter(); - public abstract SerializableConfiguration serializableConfiguration(); - public @Nullable abstract SerializableSplit serializableSplit(); - public @Nullable abstract String username(); - public abstract boolean validateSource(); - - // ======================================================================= - // Factory methods - // ======================================================================= - - public static <T, K, V, W extends FileInputFormat<K, V>> HDFSFileSource<T, K, V> - from(String filepattern, - Class<W> formatClass, - Coder<T> coder, - SerializableFunction<KV<K, V>, T> inputConverter) { - return HDFSFileSource.<T, K, V>builder() - .setFilepattern(filepattern) - .setFormatClass(formatClass) - .setCoder(coder) - .setInputConverter(inputConverter) - .setConfiguration(null) - .setUsername(null) - .setValidateSource(true) - .setSerializableSplit(null) - .build(); - } - - public static <K, V, W extends FileInputFormat<K, V>> HDFSFileSource<KV<K, V>, K, V> - from(String filepattern, - Class<W> formatClass, - Class<K> keyClass, - Class<V> valueClass) { - KvCoder<K, V> coder = KvCoder.of(getDefaultCoder(keyClass), getDefaultCoder(valueClass)); - SerializableFunction<KV<K, V>, KV<K, V>> inputConverter = - new SerializableFunction<KV<K, V>, KV<K, V>>() { - @Override - public KV<K, V> apply(KV<K, V> input) { - return input; - } - }; - return HDFSFileSource.<KV<K, V>, K, V>builder() - .setFilepattern(filepattern) - .setFormatClass(formatClass) - .setCoder(coder) - .setInputConverter(inputConverter) - .setConfiguration(null) - .setUsername(null) - .setValidateSource(true) - .setSerializableSplit(null) - .build(); - } - - public static HDFSFileSource<String, LongWritable, Text> - fromText(String filepattern) { - SerializableFunction<KV<LongWritable, Text>, String> inputConverter = - new SerializableFunction<KV<LongWritable, Text>, String>() { - @Override - public String apply(KV<LongWritable, Text> input) { - return input.getValue().toString(); - } - }; - return from(filepattern, TextInputFormat.class, StringUtf8Coder.of(), inputConverter); - } - - /** - * Helper to read from Avro source given {@link AvroCoder}. Keep in mind that configuration - * object is altered to enable Avro input. - */ - public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable> - fromAvro(String filepattern, final AvroCoder<T> coder, Configuration conf) { - Class<AvroKeyInputFormat<T>> formatClass = castClass(AvroKeyInputFormat.class); - SerializableFunction<KV<AvroKey<T>, NullWritable>, T> inputConverter = - new SerializableFunction<KV<AvroKey<T>, NullWritable>, T>() { - @Override - public T apply(KV<AvroKey<T>, NullWritable> input) { - try { - return CoderUtils.clone(coder, input.getKey().datum()); - } catch (CoderException e) { - throw new RuntimeException(e); - } - } - }; - conf.set("avro.schema.input.key", coder.getSchema().toString()); - return from(filepattern, formatClass, coder, inputConverter).withConfiguration(conf); - } - - /** - * Helper to read from Avro source given {@link Schema}. Keep in mind that configuration - * object is altered to enable Avro input. - */ - public static HDFSFileSource<GenericRecord, AvroKey<GenericRecord>, NullWritable> - fromAvro(String filepattern, Schema schema, Configuration conf) { - return fromAvro(filepattern, AvroCoder.of(schema), conf); - } - - /** - * Helper to read from Avro source given {@link Class}. 
Keep in mind that configuration - * object is altered to enable Avro input. - */ - public static <T> HDFSFileSource<T, AvroKey<T>, NullWritable> - fromAvro(String filepattern, Class<T> cls, Configuration conf) { - return fromAvro(filepattern, AvroCoder.of(cls), conf); - } - - // ======================================================================= - // Builder methods - // ======================================================================= - - public abstract HDFSFileSource.Builder<T, K, V> toBuilder(); - public static <T, K, V> HDFSFileSource.Builder builder() { - return new AutoValue_HDFSFileSource.Builder<>(); - } - - /** - * AutoValue builder for {@link HDFSFileSource}. - */ - @AutoValue.Builder - public abstract static class Builder<T, K, V> { - public abstract Builder<T, K, V> setFilepattern(String filepattern); - public abstract Builder<T, K, V> setFormatClass( - Class<? extends FileInputFormat<K, V>> formatClass); - public abstract Builder<T, K, V> setCoder(Coder<T> coder); - public abstract Builder<T, K, V> setInputConverter( - SerializableFunction<KV<K, V>, T> inputConverter); - public abstract Builder<T, K, V> setSerializableConfiguration( - SerializableConfiguration serializableConfiguration); - public Builder<T, K, V> setConfiguration(Configuration configuration) { - if (configuration == null) { - configuration = new Configuration(false); - } - return this.setSerializableConfiguration(new SerializableConfiguration(configuration)); - } - public abstract Builder<T, K, V> setSerializableSplit(SerializableSplit serializableSplit); - public abstract Builder<T, K, V> setUsername(@Nullable String username); - public abstract Builder<T, K, V> setValidateSource(boolean validate); - public abstract HDFSFileSource<T, K, V> build(); - } - - public HDFSFileSource<T, K, V> withConfiguration(@Nullable Configuration configuration) { - return this.toBuilder().setConfiguration(configuration).build(); - } - - public HDFSFileSource<T, K, V> withUsername(@Nullable String username) { - return this.toBuilder().setUsername(username).build(); - } - - // ======================================================================= - // BoundedSource - // ======================================================================= - - @Override - public List<? 
extends BoundedSource<T>> split( - final long desiredBundleSizeBytes, - PipelineOptions options) throws Exception { - if (serializableSplit() == null) { - List<InputSplit> inputSplits = UGIHelper.getBestUGI(username()).doAs( - new PrivilegedExceptionAction<List<InputSplit>>() { - @Override - public List<InputSplit> run() throws Exception { - return computeSplits(desiredBundleSizeBytes, serializableConfiguration()); - } - }); - return Lists.transform(inputSplits, - new Function<InputSplit, BoundedSource<T>>() { - @Override - public BoundedSource<T> apply(@Nullable InputSplit inputSplit) { - SerializableSplit serializableSplit = new SerializableSplit(inputSplit); - return HDFSFileSource.this.toBuilder() - .setSerializableSplit(serializableSplit) - .build(); - } - }); - } else { - return ImmutableList.of(this); - } - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) { - long size = 0; - - try { - // If this source represents a split from split, - // then return the size of the split, rather then the entire input - if (serializableSplit() != null) { - return serializableSplit().getSplit().getLength(); - } - - size += UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Long>() { - @Override - public Long run() throws Exception { - long size = 0; - Job job = SerializableConfiguration.newJob(serializableConfiguration()); - for (FileStatus st : listStatus(createFormat(job), job)) { - size += st.getLen(); - } - return size; - } - }); - } catch (IOException e) { - LOG.warn( - "Will estimate size of input to be 0 bytes. Can't estimate size of the input due to:", e); - // ignore, and return 0 - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn( - "Will estimate size of input to be 0 bytes. 
Can't estimate size of the input due to:", e); - // ignore, and return 0 - } - return size; - } - - @Override - public BoundedReader<T> createReader(PipelineOptions options) throws IOException { - this.validate(); - return new HDFSFileReader<>(this, filepattern(), formatClass(), serializableSplit()); - } - - @Override - public void validate() { - if (validateSource()) { - try { - UGIHelper.getBestUGI(username()).doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - final Path pathPattern = new Path(filepattern()); - FileSystem fs = FileSystem.get(pathPattern.toUri(), - SerializableConfiguration.newConfiguration(serializableConfiguration())); - FileStatus[] fileStatuses = fs.globStatus(pathPattern); - checkState( - fileStatuses != null && fileStatuses.length > 0, - "Unable to find any files matching %s", filepattern()); - return null; - } - }); - } catch (IOException | InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return coder(); - } - - // ======================================================================= - // Helpers - // ======================================================================= - - private List<InputSplit> computeSplits(long desiredBundleSizeBytes, - SerializableConfiguration serializableConfiguration) - throws IOException, IllegalAccessException, InstantiationException { - Job job = SerializableConfiguration.newJob(serializableConfiguration); - FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes); - FileInputFormat.setMaxInputSplitSize(job, desiredBundleSizeBytes); - return createFormat(job).getSplits(job); - } - - private FileInputFormat<K, V> createFormat(Job job) - throws IOException, IllegalAccessException, InstantiationException { - Path path = new Path(filepattern()); - FileInputFormat.addInputPath(job, path); - return formatClass().newInstance(); - } - - private List<FileStatus> listStatus(FileInputFormat<K, V> format, Job job) - throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - // FileInputFormat#listStatus is protected, so call using reflection - Method listStatus = FileInputFormat.class.getDeclaredMethod("listStatus", JobContext.class); - listStatus.setAccessible(true); - @SuppressWarnings("unchecked") - List<FileStatus> stat = (List<FileStatus>) listStatus.invoke(format, job); - return stat; - } - - @SuppressWarnings("unchecked") - private static <T> Coder<T> getDefaultCoder(Class<T> c) { - if (Writable.class.isAssignableFrom(c)) { - Class<? extends Writable> writableClass = (Class<? extends Writable>) c; - return (Coder<T>) WritableCoder.of(writableClass); - } else if (Void.class.equals(c)) { - return (Coder<T>) VoidCoder.of(); - } - // TODO: how to use registered coders here? - throw new IllegalStateException("Cannot find coder for " + c); - } - - @SuppressWarnings("unchecked") - private static <T> Class<T> castClass(Class<?> aClass) { - return (Class<T>) aClass; - } - - // ======================================================================= - // BoundedReader - // ======================================================================= - - private static class HDFSFileReader<T, K, V> extends BoundedSource.BoundedReader<T> { - - private final HDFSFileSource<T, K, V> source; - private final String filepattern; - private final Class<? 
extends FileInputFormat<K, V>> formatClass; - private final Job job; - - private List<InputSplit> splits; - private ListIterator<InputSplit> splitsIterator; - - private Configuration conf; - private FileInputFormat<?, ?> format; - private TaskAttemptContext attemptContext; - private RecordReader<K, V> currentReader; - private KV<K, V> currentPair; - - HDFSFileReader( - HDFSFileSource<T, K, V> source, - String filepattern, - Class<? extends FileInputFormat<K, V>> formatClass, - SerializableSplit serializableSplit) - throws IOException { - this.source = source; - this.filepattern = filepattern; - this.formatClass = formatClass; - this.job = SerializableConfiguration.newJob(source.serializableConfiguration()); - - if (serializableSplit != null) { - this.splits = ImmutableList.of(serializableSplit.getSplit()); - this.splitsIterator = splits.listIterator(); - } - } - - @Override - public boolean start() throws IOException { - Path path = new Path(filepattern); - FileInputFormat.addInputPath(job, path); - - conf = job.getConfiguration(); - try { - format = formatClass.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new IOException("Cannot instantiate file input format " + formatClass, e); - } - attemptContext = new TaskAttemptContextImpl(conf, new TaskAttemptID()); - - if (splitsIterator == null) { - splits = format.getSplits(job); - splitsIterator = splits.listIterator(); - } - - return advance(); - } - - @Override - public boolean advance() throws IOException { - try { - if (currentReader != null && currentReader.nextKeyValue()) { - currentPair = nextPair(); - return true; - } else { - while (splitsIterator.hasNext()) { - // advance the reader and see if it has records - final InputSplit nextSplit = splitsIterator.next(); - @SuppressWarnings("unchecked") - RecordReader<K, V> reader = - (RecordReader<K, V>) format.createRecordReader(nextSplit, attemptContext); - if (currentReader != null) { - currentReader.close(); - } - currentReader = reader; - UGIHelper.getBestUGI(source.username()).doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - currentReader.initialize(nextSplit, attemptContext); - return null; - } - }); - if (currentReader.nextKeyValue()) { - currentPair = nextPair(); - return true; - } - currentReader.close(); - currentReader = null; - } - // either no next split or all readers were empty - currentPair = null; - return false; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - - @Override - public T getCurrent() throws NoSuchElementException { - if (currentPair == null) { - throw new NoSuchElementException(); - } - return source.inputConverter().apply(currentPair); - } - - @Override - public void close() throws IOException { - if (currentReader != null) { - currentReader.close(); - currentReader = null; - } - currentPair = null; - } - - @Override - public BoundedSource<T> getCurrentSource() { - return source; - } - - @SuppressWarnings("unchecked") - private KV<K, V> nextPair() throws IOException, InterruptedException { - K key = currentReader.getCurrentKey(); - V value = currentReader.getCurrentValue(); - // clone Writable objects since they are reused between calls to RecordReader#nextKeyValue - if (key instanceof Writable) { - key = (K) WritableUtils.clone((Writable) key, conf); - } - if (value instanceof Writable) { - value = (V) WritableUtils.clone((Writable) value, conf); - } - return KV.of(key, value); - } - - // 
======================================================================= - // Optional overrides - // ======================================================================= - - @Override - public Double getFractionConsumed() { - if (currentReader == null) { - return 0.0; - } - if (splits.isEmpty()) { - return 1.0; - } - int index = splitsIterator.previousIndex(); - int numReaders = splits.size(); - if (index == numReaders) { - return 1.0; - } - double before = 1.0 * index / numReaders; - double after = 1.0 * (index + 1) / numReaders; - Double fractionOfCurrentReader = getProgress(); - if (fractionOfCurrentReader == null) { - return before; - } - return before + fractionOfCurrentReader * (after - before); - } - - private Double getProgress() { - try { - return (double) currentReader.getProgress(); - } catch (IOException | InterruptedException e) { - return null; - } - } - - } - - // ======================================================================= - // SerializableSplit - // ======================================================================= - - /** - * A wrapper to allow Hadoop {@link org.apache.hadoop.mapreduce.InputSplit}s to be - * serialized using Java's standard serialization mechanisms. Note that the InputSplit - * has to be Writable (which most are). - */ - protected static class SerializableSplit implements Externalizable { - private static final long serialVersionUID = 0L; - - private InputSplit split; - - public SerializableSplit() { - } - - public SerializableSplit(InputSplit split) { - checkArgument(split instanceof Writable, "Split is not writable: %s", split); - this.split = split; - } - - public InputSplit getSplit() { - return split; - } - - @Override - public void writeExternal(ObjectOutput out) throws IOException { - out.writeUTF(split.getClass().getCanonicalName()); - ((Writable) split).write(out); - } - - @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - String className = in.readUTF(); - try { - split = (InputSplit) Class.forName(className).newInstance(); - ((Writable) split).readFields(in); - } catch (InstantiationException | IllegalAccessException e) { - throw new IOException(e); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java deleted file mode 100644 index 154a818..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.SeekableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import org.apache.beam.sdk.io.FileSystem; -import org.apache.beam.sdk.io.fs.CreateOptions; -import org.apache.beam.sdk.io.fs.MatchResult; -import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.MatchResult.Status; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; - -/** - * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as - * Apache Beam {@link FileSystem FileSystems}. - * - * <p>The following HDFS FileSystem(s) are known to be unsupported: - * <ul> - * <li>FTPFileSystem: Missing seek support within FTPInputStream</li> - * </ul> - * - * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek - * efficient when reading. The source code for the following {@link FSInputStream} implementations - * (as of Hadoop 2.7.1) do provide seek implementations: - * <ul> - * <li>HarFsInputStream</li> - * <li>S3InputStream</li> - * <li>DFSInputStream</li> - * <li>SwiftNativeInputStream</li> - * <li>NativeS3FsInputStream</li> - * <li>LocalFSFileInputStream</li> - * <li>NativeAzureFsInputStream</li> - * <li>S3AInputStream</li> - * </ul> - */ -class HadoopFileSystem extends FileSystem<HadoopResourceId> { - @VisibleForTesting - final org.apache.hadoop.fs.FileSystem fileSystem; - - HadoopFileSystem(Configuration configuration) throws IOException { - this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration); - } - - @Override - protected List<MatchResult> match(List<String> specs) { - ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder(); - for (String spec : specs) { - try { - FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec)); - List<Metadata> metadata = new ArrayList<>(); - for (FileStatus fileStatus : fileStatuses) { - if (fileStatus.isFile()) { - metadata.add(Metadata.builder() - .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri())) - .setIsReadSeekEfficient(true) - .setSizeBytes(fileStatus.getLen()) - .build()); - } - } - resultsBuilder.add(MatchResult.create(Status.OK, metadata)); - } catch (IOException e) { - resultsBuilder.add(MatchResult.create(Status.ERROR, e)); - } - } - return resultsBuilder.build(); - } - - @Override - protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions) - throws IOException { - return Channels.newChannel(fileSystem.create(resourceId.toPath())); - } - - @Override - protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException { - FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath()); - return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath())); - } - - @Override - protected void copy( - 
List<HadoopResourceId> srcResourceIds, - List<HadoopResourceId> destResourceIds) throws IOException { - for (int i = 0; i < srcResourceIds.size(); ++i) { - // Unfortunately HDFS FileSystems don't support a native copy operation so we are forced - // to use the inefficient implementation found in FileUtil which copies all the bytes through - // the local machine. - // - // HDFS FileSystem does define a concat method but could only find the DFSFileSystem - // implementing it. The DFSFileSystem implemented concat by deleting the srcs after which - // is not what we want. Also, all the other FileSystem implementations I saw threw - // UnsupportedOperationException within concat. - FileUtil.copy( - fileSystem, srcResourceIds.get(i).toPath(), - fileSystem, destResourceIds.get(i).toPath(), - false, - true, - fileSystem.getConf()); - } - } - - @Override - protected void rename( - List<HadoopResourceId> srcResourceIds, - List<HadoopResourceId> destResourceIds) throws IOException { - for (int i = 0; i < srcResourceIds.size(); ++i) { - fileSystem.rename( - srcResourceIds.get(i).toPath(), - destResourceIds.get(i).toPath()); - } - } - - @Override - protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException { - for (HadoopResourceId resourceId : resourceIds) { - fileSystem.delete(resourceId.toPath(), false); - } - } - - @Override - protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) { - try { - if (singleResourceSpec.endsWith("/") && !isDirectory) { - throw new IllegalArgumentException(String.format( - "Expected file path but received directory path %s", singleResourceSpec)); - } - return !singleResourceSpec.endsWith("/") && isDirectory - ? new HadoopResourceId(new URI(singleResourceSpec + "/")) - : new HadoopResourceId(new URI(singleResourceSpec)); - } catch (URISyntaxException e) { - throw new IllegalArgumentException( - String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory), - e); - } - } - - @Override - protected String getScheme() { - return fileSystem.getScheme(); - } - - /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. 
*/ - private static class HadoopSeekableByteChannel implements SeekableByteChannel { - private final FileStatus fileStatus; - private final FSDataInputStream inputStream; - private boolean closed; - - private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) { - this.fileStatus = fileStatus; - this.inputStream = inputStream; - this.closed = false; - } - - @Override - public int read(ByteBuffer dst) throws IOException { - if (closed) { - throw new IOException("Channel is closed"); - } - return inputStream.read(dst); - } - - @Override - public int write(ByteBuffer src) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public long position() throws IOException { - if (closed) { - throw new IOException("Channel is closed"); - } - return inputStream.getPos(); - } - - @Override - public SeekableByteChannel position(long newPosition) throws IOException { - if (closed) { - throw new IOException("Channel is closed"); - } - inputStream.seek(newPosition); - return this; - } - - @Override - public long size() throws IOException { - if (closed) { - throw new IOException("Channel is closed"); - } - return fileStatus.getLen(); - } - - @Override - public SeekableByteChannel truncate(long size) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isOpen() { - return !closed; - } - - @Override - public void close() throws IOException { - closed = true; - inputStream.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java deleted file mode 100644 index 2cb9d8a..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemModule.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.hdfs; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.auto.service.AutoService; -import java.io.IOException; -import java.util.Map; -import java.util.TreeMap; -import org.apache.hadoop.conf.Configuration; - -/** - * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link JsonDeserializer} - * for a Hadoop {@link Configuration}. The serialized representation is that of a JSON map. - * - * <p>Note that the serialization of the Hadoop {@link Configuration} only keeps the keys and their - * values dropping any configuration hierarchy and source information. - */ -@AutoService(Module.class) -public class HadoopFileSystemModule extends SimpleModule { - public HadoopFileSystemModule() { - super("HadoopFileSystemModule"); - setMixInAnnotation(Configuration.class, ConfigurationMixin.class); - } - - /** A mixin class to add Jackson annotations to the Hadoop {@link Configuration} class. */ - @JsonDeserialize(using = ConfigurationDeserializer.class) - @JsonSerialize(using = ConfigurationSerializer.class) - private static class ConfigurationMixin {} - - /** A Jackson {@link JsonDeserializer} for Hadoop {@link Configuration} objects. */ - static class ConfigurationDeserializer extends JsonDeserializer<Configuration> { - @Override - public Configuration deserialize(JsonParser jsonParser, - DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - Map<String, String> rawConfiguration = - jsonParser.readValueAs(new TypeReference<Map<String, String>>() {}); - Configuration configuration = new Configuration(false); - for (Map.Entry<String, String> entry : rawConfiguration.entrySet()) { - configuration.set(entry.getKey(), entry.getValue()); - } - return configuration; - } - } - - /** A Jackson {@link JsonSerializer} for Hadoop {@link Configuration} objects. 
*/ - static class ConfigurationSerializer extends JsonSerializer<Configuration> { - @Override - public void serialize(Configuration configuration, JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) throws IOException, JsonProcessingException { - Map<String, String> map = new TreeMap<>(); - for (Map.Entry<String, String> entry : configuration) { - map.put(entry.getKey(), entry.getValue()); - } - jsonGenerator.writeObject(map); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java deleted file mode 100644 index 31250bc..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import java.util.List; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.hadoop.conf.Configuration; - -/** - * {@link PipelineOptions} which encapsulate {@link Configuration Hadoop Configuration} - * for the {@link HadoopFileSystem}. - */ -public interface HadoopFileSystemOptions extends PipelineOptions { - @Description("A list of Hadoop configurations used to configure zero or more Hadoop filesystems. " - + "To specify on the command-line, represent the value as a JSON list of JSON maps, where " - + "each map represents the entire configuration for a single Hadoop filesystem. For example " - + "--hdfsConfiguration='[{\"fs.default.name\": \"hdfs://localhost:9998\", ...}," - + "{\"fs.default.name\": \"s3a://\", ...},...]'") - @Default.InstanceFactory(ConfigurationLocator.class) - List<Configuration> getHdfsConfiguration(); - void setHdfsConfiguration(List<Configuration> value); - - /** A {@link DefaultValueFactory} which locates a Hadoop {@link Configuration}. 
*/ - class ConfigurationLocator implements DefaultValueFactory<Configuration> { - @Override - public Configuration create(PipelineOptions options) { - // TODO: Find default configuration to use - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java deleted file mode 100644 index 344623b..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptionsRegistrar.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; - -/** - * {@link AutoService} registrar for {@link HadoopFileSystemOptions}. - */ -@AutoService(PipelineOptionsRegistrar.class) -public class HadoopFileSystemOptionsRegistrar implements PipelineOptionsRegistrar { - - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>>of(HadoopFileSystemOptions.class); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java deleted file mode 100644 index 9159df3..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import javax.annotation.Nonnull; -import org.apache.beam.sdk.io.FileSystem; -import org.apache.beam.sdk.io.FileSystemRegistrar; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.hadoop.conf.Configuration; - -/** - * {@link AutoService} registrar for the {@link HadoopFileSystem}. - */ -@AutoService(FileSystemRegistrar.class) -public class HadoopFileSystemRegistrar implements FileSystemRegistrar { - - @Override - public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) { - List<Configuration> configurations = - options.as(HadoopFileSystemOptions.class).getHdfsConfiguration(); - if (configurations == null) { - configurations = Collections.emptyList(); - } - checkArgument(configurations.size() <= 1, - String.format( - "The %s currently only supports at most a single Hadoop configuration.", - HadoopFileSystemRegistrar.class.getSimpleName())); - - ImmutableList.Builder<FileSystem> builder = ImmutableList.builder(); - for (Configuration configuration : configurations) { - try { - builder.add(new HadoopFileSystem(configuration)); - } catch (IOException e) { - throw new IllegalArgumentException(String.format( - "Failed to construct Hadoop filesystem with configuration %s", configuration), e); - } - } - return builder.build(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java deleted file mode 100644 index e570864..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.hdfs; - -import java.net.URI; -import java.util.Objects; -import org.apache.beam.sdk.io.fs.ResolveOptions; -import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.hadoop.fs.Path; - -/** - * {@link ResourceId} implementation for the {@link HadoopFileSystem}. - */ -class HadoopResourceId implements ResourceId { - private final URI uri; - - HadoopResourceId(URI uri) { - this.uri = uri; - } - - @Override - public ResourceId resolve(String other, ResolveOptions resolveOptions) { - return new HadoopResourceId(uri.resolve(other)); - } - - @Override - public ResourceId getCurrentDirectory() { - return new HadoopResourceId(uri.getPath().endsWith("/") ? uri : uri.resolve(".")); - } - - public boolean isDirectory() { - return uri.getPath().endsWith("/"); - } - - @Override - public String getFilename() { - return new Path(uri).getName(); - } - - @Override - public String getScheme() { - return uri.getScheme(); - } - - @Override - public String toString() { - return uri.toString(); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof HadoopResourceId)) { - return false; - } - return Objects.equals(uri, ((HadoopResourceId) obj).uri); - } - - @Override - public int hashCode() { - return Objects.hashCode(uri); - } - - Path toPath() { - return new Path(uri); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java deleted file mode 100644 index fe2db5f..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/Sink.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import java.io.Serializable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; - -/** - * This class is deprecated, and only exists for HDFSFileSink. - */ -@Deprecated -public abstract class Sink<T> implements Serializable, HasDisplayData { - /** - * Ensures that the sink is valid and can be written to before the write operation begins. One - * should use {@link com.google.common.base.Preconditions} to implement this method. 
- */ - public abstract void validate(PipelineOptions options); - - /** - * Returns an instance of a {@link WriteOperation} that can write to this Sink. - */ - public abstract WriteOperation<T, ?> createWriteOperation(); - - /** - * {@inheritDoc} - * - * <p>By default, does not register any display data. Implementors may override this method - * to provide their own display data. - */ - @Override - public void populateDisplayData(DisplayData.Builder builder) {} - - /** - * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink. - * - * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a - * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write - * a bundle to the sink. - * - * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance, - * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>. - * - * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the - * call to the {@code initialize} method and deserialized before calls to - * {@code createWriter} and {@code finalize}. However, it is not - * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the - * state of the {@code WriteOperation}. - * - * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink. - * - * @param <T> The type of objects to write - * @param <WriteT> The result of a per-bundle write - */ - public abstract static class WriteOperation<T, WriteT> implements Serializable { - /** - * Performs initialization before writing to the sink. Called before writing begins. - */ - public abstract void initialize(PipelineOptions options) throws Exception; - - /** - * Indicates that the operation will be performing windowed writes. - */ - public abstract void setWindowedWrites(boolean windowedWrites); - - /** - * Given an Iterable of results from bundle writes, performs finalization after writing and - * closes the sink. Called after all bundle writes are complete. - * - * <p>The results that are passed to finalize are those returned by bundles that completed - * successfully. Although bundles may have been run multiple times (for fault-tolerance), only - * one writer result will be passed to finalize for each bundle. An implementation of finalize - * should clean up the output of any failed and successfully retried bundles. Note that these - * failed bundles will not have their writer result passed to finalize, so finalize should be - * capable of locating any temporary/partial output written by failed bundles. - * - * <p>A best practice is to make finalize atomic. If this is impossible given the semantics - * of the sink, finalize should be idempotent, as it may be called multiple times in the case of - * failure/retry or for redundancy. - * - * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if - * finalize is called multiple times. - * - * @param writerResults an Iterable of results from successful bundle writes. - */ - public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options) - throws Exception; - - /** - * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink. - * - * <p>The bundle id that the writer will use to uniquely identify its output will be passed to - * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
- * - * <p>Must not mutate the state of the WriteOperation. - */ - public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception; - - /** - * Returns the Sink that this write operation writes to. - */ - public abstract Sink<T> getSink(); - - /** - * Returns a coder for the writer result type. - */ - public abstract Coder<WriteT> getWriterResultCoder(); - } - - /** - * A Writer writes a bundle of elements from a PCollection to a sink. - * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins - * and {@link Writer#close} is called after all elements in the bundle have been written. - * {@link Writer#write} writes an element to the sink. - * - * <p>Note that any access to static members or methods of a Writer must be thread-safe, as - * multiple instances of a Writer may be instantiated in different threads on the same worker. - * - * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink. - * - * @param <T> The type of object to write - * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String) - */ - public abstract static class Writer<T, WriteT> { - /** - * Performs bundle initialization. For example, creates a temporary file for writing or - * initializes any state that will be used across calls to {@link Writer#write}. - * - * <p>The unique id that is given to open should be used to ensure that the writer's output does - * not interfere with the output of other Writers, as a bundle may be executed many times for - * fault tolerance. See {@link Sink} for more information about bundle ids. - * - * <p>The window and paneInfo arguments are populated when windowed writes are requested. - * shard and numShards are populated for the case of static sharding. In cases where the - * runner is dynamically picking sharding, shard and numShards might both be set to -1. - */ - public abstract void openWindowed(String uId, - BoundedWindow window, - PaneInfo paneInfo, - int shard, - int numShards) throws Exception; - - /** - * Performs bundle initialization for the case where the file is written unwindowed. - */ - public abstract void openUnwindowed(String uId, - int shard, - int numShards) throws Exception; - - public abstract void cleanup() throws Exception; - - /** - * Called for each value in the bundle. - */ - public abstract void write(T value) throws Exception; - - /** - * Finishes writing the bundle. Closes any resources used for writing the bundle. - * - * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s - * finalization. The result should contain some way to identify the output of this bundle (using - * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify - * successful writes. See {@link Sink} for more information about bundle ids. - * - * @return the writer result - */ - public abstract WriteT close() throws Exception; - - /** - * Returns the write operation this writer belongs to.
- */ - public abstract WriteOperation<T, WriteT> getWriteOperation(); - - - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bacd33c8/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java deleted file mode 100644 index fd05a19..0000000 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/UGIHelper.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hdfs; - -import java.io.IOException; -import javax.annotation.Nullable; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * {@link UserGroupInformation} helper methods. - */ -public class UGIHelper { - - /** - * Find the most appropriate UserGroupInformation to use. - * @param username the user name, or NULL if none is specified. - * @return the most appropriate UserGroupInformation - */ - public static UserGroupInformation getBestUGI(@Nullable String username) throws IOException { - return UserGroupInformation.getBestUGI(null, username); - } - -}
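For reference, a minimal sketch of how a pipeline might hand HadoopFileSystemRegistrar its Configuration through the HadoopFileSystemOptions shown above. The class name HadoopFsOptionsExample and the NameNode address are placeholders, and the sketch assumes the org.apache.beam.sdk.io.hdfs classes from these files are on the classpath.

    import java.util.Collections;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    public class HadoopFsOptionsExample {
      public static void main(String[] args) {
        // Placeholder NameNode URI; substitute the address of your cluster.
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9998");

        // HadoopFileSystemOptionsRegistrar makes this interface visible to
        // PipelineOptionsFactory, so as() works on any options object.
        HadoopFileSystemOptions options =
            PipelineOptionsFactory.fromArgs(args).as(HadoopFileSystemOptions.class);

        // HadoopFileSystemRegistrar accepts at most one Configuration.
        options.setHdfsConfiguration(Collections.singletonList(conf));

        Pipeline pipeline = Pipeline.create(options);
        // ... hdfs:// paths used by the pipeline now resolve through HadoopFileSystem.
      }
    }

The same effect can be had on the command line with the --hdfsConfiguration flag documented in the @Description above.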
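The contract spelled out in the Sink javadoc above (validate, then an idempotent initialize/finalize pair bracketing one Writer per bundle) can be summarised as the call sequence below. This is an illustrative sketch rather than runner code: the write operation, options, bundle contents, and bundle id are supplied by a hypothetical caller, and only the unwindowed, single-bundle case is shown.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.io.hdfs.Sink;
    import org.apache.beam.sdk.options.PipelineOptions;

    class SinkLifecycleSketch {
      static <T, WriteT> void writeBundle(
          Sink.WriteOperation<T, WriteT> op,
          PipelineOptions options,
          List<T> bundle,
          String bundleId) throws Exception {
        op.getSink().validate(options);           // fail fast before any work is done
        op.setWindowedWrites(false);
        op.initialize(options);                   // must be idempotent; may run more than once

        // The WriteOperation would be serialized here and shipped to workers.
        Sink.Writer<T, WriteT> writer = op.createWriter(options);
        writer.openUnwindowed(bundleId, -1, -1);  // -1, -1: the runner picks the sharding
        for (T element : bundle) {
          writer.write(element);
        }

        List<WriteT> results = new ArrayList<>();
        results.add(writer.close());              // one result per successfully written bundle
        op.finalize(results, options);            // must also be idempotent
      }
    }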
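UGIHelper.getBestUGI pairs naturally with UserGroupInformation.doAs for running filesystem calls as a specific user. A small sketch, assuming the helper class UgiUsageSketch and its parameters are hypothetical and that UGIHelper is importable from its package above:

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.beam.sdk.io.hdfs.UGIHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    class UgiUsageSketch {
      /** Checks whether a path exists, running the call as the resolved user. */
      static boolean existsAs(final Configuration conf, final String path, String username)
          throws IOException, InterruptedException {
        UserGroupInformation ugi = UGIHelper.getBestUGI(username); // username may be null
        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
          @Override
          public Boolean run() throws IOException {
            return FileSystem.get(conf).exists(new Path(path));
          }
        });
      }
    }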