Updated Branches: refs/heads/master eb3de8851 -> 2d25d5dfd
Cleanup a bunch of compiler warnings, mostly by adding serialVersionUIDs to DoFn and friends Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2d25d5df Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2d25d5df Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2d25d5df Branch: refs/heads/master Commit: 2d25d5dfd8f4b78681162c2dfaf4a7342c7678e1 Parents: 4b73249 Author: Josh Wills <[email protected]> Authored: Mon Jun 18 19:44:23 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Mon Jun 18 19:44:23 2012 -0700 ---------------------------------------------------------------------- src/main/java/com/cloudera/crunch/CombineFn.java | 55 +- src/main/java/com/cloudera/crunch/DoFn.java | 3 +- src/main/java/com/cloudera/crunch/FilterFn.java | 7 + src/main/java/com/cloudera/crunch/MapFn.java | 1 + src/main/java/com/cloudera/crunch/Tuple3.java | 2 +- src/main/java/com/cloudera/crunch/Tuple4.java | 2 +- .../com/cloudera/crunch/fn/CompositeMapFn.java | 3 +- .../java/com/cloudera/crunch/fn/ExtractKeyFn.java | 3 +- .../java/com/cloudera/crunch/fn/IdentityFn.java | 3 +- .../java/com/cloudera/crunch/fn/MapKeysFn.java | 2 + .../java/com/cloudera/crunch/fn/MapValuesFn.java | 2 + .../java/com/cloudera/crunch/fn/PairMapFn.java | 2 + .../crunch/io/text/BZip2TextInputFormat.java | 358 ++-- .../cloudera/crunch/io/text/CBZip2InputStream.java | 1720 +++++++-------- .../cloudera/crunch/io/text/TextFileSource.java | 2 +- .../java/com/cloudera/crunch/lib/Aggregate.java | 17 +- src/main/java/com/cloudera/crunch/lib/Cogroup.java | 6 + src/main/java/com/cloudera/crunch/lib/Join.java | 22 +- src/main/java/com/cloudera/crunch/lib/PTables.java | 2 + src/main/java/com/cloudera/crunch/lib/Sample.java | 3 +- src/main/java/com/cloudera/crunch/lib/Set.java | 74 +- src/main/java/com/cloudera/crunch/lib/Sort.java | 36 +- .../cloudera/crunch/lib/join/FullOuterJoinFn.java | 2 + .../com/cloudera/crunch/lib/join/InnerJoinFn.java | 2 + .../java/com/cloudera/crunch/lib/join/JoinFn.java | 2 + .../com/cloudera/crunch/lib/join/JoinUtils.java | 4 +- .../cloudera/crunch/lib/join/LeftOuterJoinFn.java | 2 + .../cloudera/crunch/lib/join/RightOuterJoinFn.java | 3 +- .../cloudera/crunch/types/PGroupedTableType.java | 5 +- .../com/cloudera/crunch/types/PTypeFamily.java | 4 +- .../com/cloudera/crunch/types/TupleFactory.java | 1 + .../crunch/types/avro/AvroGroupedTableType.java | 2 - .../crunch/types/avro/AvroKeyConverter.java | 2 + .../crunch/types/avro/AvroPairConverter.java | 4 +- .../cloudera/crunch/types/avro/AvroTypeFamily.java | 10 +- .../java/com/cloudera/crunch/types/avro/Avros.java | 29 +- .../types/writable/WritableGroupedTableType.java | 1 - .../types/writable/WritablePairConverter.java | 4 +- .../crunch/types/writable/WritableTypeFamily.java | 10 +- .../types/writable/WritableValueConverter.java | 3 +- .../cloudera/crunch/types/writable/Writables.java | 43 +- src/main/java/com/cloudera/crunch/util/PTypes.java | 17 +- src/main/java/com/cloudera/crunch/util/Protos.java | 4 + src/main/java/com/cloudera/crunch/util/Tuples.java | 4 +- 44 files changed, 1313 insertions(+), 1170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/CombineFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/CombineFn.java b/src/main/java/com/cloudera/crunch/CombineFn.java index f1b17dd..e450140 100644 --- a/src/main/java/com/cloudera/crunch/CombineFn.java +++ b/src/main/java/com/cloudera/crunch/CombineFn.java @@ -35,7 +35,8 @@ import com.google.common.collect.Sets; * */ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> { - + private static final long serialVersionUID = 1L; + public static interface Aggregator<T> extends Serializable { /** * Clears the internal state of this Aggregator and prepares it for the values associated @@ -66,6 +67,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, * instance. */ public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> { + private static final long serialVersionUID = 1L; + private final Aggregator<V> aggregator; public AggregatorCombineFn(Aggregator<V> aggregator) { @@ -85,6 +88,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } private static abstract class TupleAggregator<T> implements Aggregator<T> { + private static final long serialVersionUID = 1L; + private final List<Aggregator<Object>> aggregators; public TupleAggregator(Aggregator<?>...aggregators) { @@ -113,6 +118,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> { + private static final long serialVersionUID = 1L; + public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) { super(a1, a2); } @@ -129,6 +136,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> { + private static final long serialVersionUID = 1L; + public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) { super(a1, a2, a3); } @@ -146,6 +155,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> { + private static final long serialVersionUID = 1L; + public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) { super(a1, a2, a3, a4); } @@ -163,6 +174,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static class TupleNAggregator extends TupleAggregator<TupleN> { + private static final long serialVersionUID = 1L; + private final int size; public TupleNAggregator(Aggregator<?>... aggregators) { @@ -212,7 +225,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) { - Aggregator[] aggs = new Aggregator[factories.length]; + Aggregator<?>[] aggs = new Aggregator[factories.length]; for (int i = 0; i < aggs.length; i++) { aggs[i] = factories[i].create(); } @@ -328,6 +341,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static class SumLongs implements Aggregator<Long> { + private static final long serialVersionUID = 1L; + private long sum = 0; @Override @@ -350,6 +365,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class SumInts implements Aggregator<Integer> { + private static final long serialVersionUID = 1L; + private int sum = 0; @Override @@ -372,6 +389,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class SumFloats implements Aggregator<Float> { + private static final long serialVersionUID = 1L; + private float sum = 0; @Override @@ -394,6 +413,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class SumDoubles implements Aggregator<Double> { + private static final long serialVersionUID = 1L; + private double sum = 0; @Override @@ -416,6 +437,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class SumBigInts implements Aggregator<BigInteger> { + private static final long serialVersionUID = 1L; + private BigInteger sum = BigInteger.ZERO; @Override @@ -438,6 +461,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MaxLongs implements Aggregator<Long> { + private static final long serialVersionUID = 1L; + private Long max = null; @Override @@ -462,6 +487,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MaxInts implements Aggregator<Integer> { + private static final long serialVersionUID = 1L; + private Integer max = null; @Override @@ -486,6 +513,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MaxFloats implements Aggregator<Float> { + private static final long serialVersionUID = 1L; + private Float max = null; @Override @@ -510,6 +539,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MaxDoubles implements Aggregator<Double> { + private static final long serialVersionUID = 1L; + private Double max = null; @Override @@ -534,6 +565,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MaxBigInts implements Aggregator<BigInteger> { + private static final long serialVersionUID = 1L; + private BigInteger max = null; @Override @@ -558,6 +591,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MinLongs implements Aggregator<Long> { + private static final long serialVersionUID = 1L; + private Long min = null; @Override @@ -582,6 +617,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MinInts implements Aggregator<Integer> { + private static final long serialVersionUID = 1L; + private Integer min = null; @Override @@ -606,6 +643,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MinFloats implements Aggregator<Float> { + private static final long serialVersionUID = 1L; + private Float min = null; @Override @@ -630,6 +669,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MinDoubles implements Aggregator<Double> { + private static final long serialVersionUID = 1L; + private Double min = null; @Override @@ -654,6 +695,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MinBigInts implements Aggregator<BigInteger> { + private static final long serialVersionUID = 1L; + private BigInteger min = null; @Override @@ -678,6 +721,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, }; public static class MaxNAggregator<V extends Comparable<V>> implements Aggregator<V> { + private static final long serialVersionUID = 1L; + private final int arity; private transient SortedSet<V> elements; @@ -711,6 +756,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static class MinNAggregator<V extends Comparable<V>> implements Aggregator<V> { + private static final long serialVersionUID = 1L; + private final int arity; private transient SortedSet<V> elements; @@ -744,6 +791,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static class FirstNAggregator<V> implements Aggregator<V> { + private static final long serialVersionUID = 1L; + private final int arity; private final List<V> elements; @@ -771,6 +820,8 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } public static class LastNAggregator<V> implements Aggregator<V> { + private static final long serialVersionUID = 1L; + private final int arity; private final LinkedList<V> elements; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/DoFn.java b/src/main/java/com/cloudera/crunch/DoFn.java index 31c9f68..bbd4e8e 100644 --- a/src/main/java/com/cloudera/crunch/DoFn.java +++ b/src/main/java/com/cloudera/crunch/DoFn.java @@ -35,7 +35,8 @@ import com.cloudera.crunch.test.TestCounters; * */ public abstract class DoFn<S, T> implements Serializable { - + private static final long serialVersionUID = 1L; + private transient TaskInputOutputContext<?, ?, ?, ?> context; private transient Configuration testConf; private transient String internalStatus; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/FilterFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/FilterFn.java b/src/main/java/com/cloudera/crunch/FilterFn.java index a5d88b4..fee4879 100644 --- a/src/main/java/com/cloudera/crunch/FilterFn.java +++ b/src/main/java/com/cloudera/crunch/FilterFn.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList; * */ public abstract class FilterFn<T> extends DoFn<T, T> { + private static final long serialVersionUID = 1L; /** * If true, emit the given record. @@ -48,6 +49,8 @@ public abstract class FilterFn<T> extends DoFn<T, T> { } public static class AndFn<S> extends FilterFn<S> { + private static final long serialVersionUID = 1L; + private final List<FilterFn<S>> fns; public AndFn(FilterFn<S>... fns) { @@ -79,6 +82,8 @@ public abstract class FilterFn<T> extends DoFn<T, T> { } public static class OrFn<S> extends FilterFn<S> { + private static final long serialVersionUID = 1L; + private final List<FilterFn<S>> fns; public OrFn(FilterFn<S>... fns) { @@ -110,6 +115,8 @@ public abstract class FilterFn<T> extends DoFn<T, T> { } public static class NotFn<S> extends FilterFn<S> { + private static final long serialVersionUID = 1L; + private final FilterFn<S> base; public NotFn(FilterFn<S> base) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/MapFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/MapFn.java b/src/main/java/com/cloudera/crunch/MapFn.java index 8c11fa6..32ab9e6 100644 --- a/src/main/java/com/cloudera/crunch/MapFn.java +++ b/src/main/java/com/cloudera/crunch/MapFn.java @@ -21,6 +21,7 @@ package com.cloudera.crunch; * */ public abstract class MapFn<S, T> extends DoFn<S, T> { + private static final long serialVersionUID = 1L; /** * Maps the given input into an instance of the output type. http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/Tuple3.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/Tuple3.java b/src/main/java/com/cloudera/crunch/Tuple3.java index 92f8504..57e192b 100644 --- a/src/main/java/com/cloudera/crunch/Tuple3.java +++ b/src/main/java/com/cloudera/crunch/Tuple3.java @@ -79,7 +79,7 @@ public class Tuple3<V1, V2, V3> implements Tuple { return false; if (getClass() != obj.getClass()) return false; - Tuple3 other = (Tuple3) obj; + Tuple3<?, ?, ?> other = (Tuple3<?, ?, ?>) obj; return (first == other.first || (first != null && first.equals(other.first))) && (second == other.second || (second != null && second.equals(other.second))) && (third == other.third || (third != null && third.equals(other.third))); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/Tuple4.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/Tuple4.java b/src/main/java/com/cloudera/crunch/Tuple4.java index 390a20f..d1edf9d 100644 --- a/src/main/java/com/cloudera/crunch/Tuple4.java +++ b/src/main/java/com/cloudera/crunch/Tuple4.java @@ -88,7 +88,7 @@ public class Tuple4<V1, V2, V3, V4> implements Tuple { return false; if (getClass() != obj.getClass()) return false; - Tuple4 other = (Tuple4) obj; + Tuple4<?, ?, ?, ?> other = (Tuple4<?, ?, ?, ?>) obj; return (first == other.first || (first != null && first.equals(other.first))) && (second == other.second || (second != null && second.equals(other.second))) && (third == other.third || (third != null && third.equals(other.third))) && http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java b/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java index f809b25..d0ba32a 100644 --- a/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java +++ b/src/main/java/com/cloudera/crunch/fn/CompositeMapFn.java @@ -20,7 +20,8 @@ import com.cloudera.crunch.Emitter; import com.cloudera.crunch.MapFn; public class CompositeMapFn<R, S, T> extends MapFn<R, T> { - + private static final long serialVersionUID = 1L; + private final MapFn<R, S> first; private final MapFn<S, T> second; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java b/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java index ac78721..d3942d2 100644 --- a/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java +++ b/src/main/java/com/cloudera/crunch/fn/ExtractKeyFn.java @@ -22,7 +22,8 @@ import com.cloudera.crunch.Pair; * is used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}. */ public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> { - + private static final long serialVersionUID = 1L; + private final MapFn<V, K> mapFn; public ExtractKeyFn(MapFn<V, K> mapFn) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/IdentityFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/fn/IdentityFn.java b/src/main/java/com/cloudera/crunch/fn/IdentityFn.java index 1008d9d..d10e608 100644 --- a/src/main/java/com/cloudera/crunch/fn/IdentityFn.java +++ b/src/main/java/com/cloudera/crunch/fn/IdentityFn.java @@ -17,7 +17,8 @@ package com.cloudera.crunch.fn; import com.cloudera.crunch.MapFn; public class IdentityFn<T> extends MapFn<T, T> { - + private static final long serialVersionUID = 1L; + private static final IdentityFn<Object> INSTANCE = new IdentityFn<Object>(); @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java b/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java index 6ce374f..eaf0ed0 100644 --- a/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java +++ b/src/main/java/com/cloudera/crunch/fn/MapKeysFn.java @@ -19,6 +19,8 @@ import com.cloudera.crunch.Emitter; import com.cloudera.crunch.Pair; public abstract class MapKeysFn<K1, K2, V> extends DoFn<Pair<K1, V>, Pair<K2, V>> { + private static final long serialVersionUID = 1L; + @Override public void process(Pair<K1, V> input, Emitter<Pair<K2, V>> emitter) { emitter.emit(Pair.of(map(input.first()), input.second())); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java b/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java index 7c1a8b6..df75579 100644 --- a/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java +++ b/src/main/java/com/cloudera/crunch/fn/MapValuesFn.java @@ -19,6 +19,8 @@ import com.cloudera.crunch.Emitter; import com.cloudera.crunch.Pair; public abstract class MapValuesFn<K, V1, V2> extends DoFn<Pair<K, V1>, Pair<K, V2>> { + private static final long serialVersionUID = 1L; + @Override public void process(Pair<K, V1> input, Emitter<Pair<K, V2>> emitter) { emitter.emit(Pair.of(input.first(), map(input.second()))); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/fn/PairMapFn.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/fn/PairMapFn.java b/src/main/java/com/cloudera/crunch/fn/PairMapFn.java index 0e64da1..93491dc 100644 --- a/src/main/java/com/cloudera/crunch/fn/PairMapFn.java +++ b/src/main/java/com/cloudera/crunch/fn/PairMapFn.java @@ -21,6 +21,8 @@ import com.cloudera.crunch.MapFn; import com.cloudera.crunch.Pair; public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> { + private static final long serialVersionUID = 1L; + private MapFn<K, S> keys; private MapFn<V, T> values; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java b/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java index 55250dc..3c8ecb7 100644 --- a/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java +++ b/src/main/java/com/cloudera/crunch/io/text/BZip2TextInputFormat.java @@ -35,210 +35,208 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; -@SuppressWarnings("unchecked") public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> { + /** + * Treats keys as offset in file and value as line. Since the input file is + * compressed, the offset for a particular line is not well-defined. This + * implementation returns the starting position of a compressed block as the + * key for every line in that block. + */ + + private static class BZip2LineRecordReader extends RecordReader<LongWritable, Text> { + + private long start; + + private long end; + + private long pos; + + private CBZip2InputStream in; + + private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256); + + // flag to indicate if previous character read was Carriage Return ('\r') + // and the next character was not Line Feed ('\n') + private boolean CRFollowedByNonLF = false; + + // in the case where a Carriage Return ('\r') was not followed by a + // Line Feed ('\n'), this variable will hold that non Line Feed character + // that was read from the underlying stream. + private byte nonLFChar; + /** - * Treats keys as offset in file and value as line. Since the input file is - * compressed, the offset for a particular line is not well-defined. This - * implementation returns the starting position of a compressed block as the - * key for every line in that block. + * Provide a bridge to get the bytes from the ByteArrayOutputStream without + * creating a new byte array. */ + private static class TextStuffer extends OutputStream { + public Text target; + + @Override + public void write(int b) { + throw new UnsupportedOperationException("write(byte) not supported"); + } + + @Override + public void write(byte[] data, int offset, int len) throws IOException { + target.clear(); + target.set(data, offset, len); + } + } - private static class BZip2LineRecordReader extends RecordReader { - - private long start; - - private long end; - - private long pos; - - private CBZip2InputStream in; - - private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256); - - // flag to indicate if previous character read was Carriage Return ('\r') - // and the next character was not Line Feed ('\n') - private boolean CRFollowedByNonLF = false; - - // in the case where a Carriage Return ('\r') was not followed by a - // Line Feed ('\n'), this variable will hold that non Line Feed character - // that was read from the underlying stream. - private byte nonLFChar; - - - /** - * Provide a bridge to get the bytes from the ByteArrayOutputStream without - * creating a new byte array. - */ - private static class TextStuffer extends OutputStream { - public Text target; - - @Override - public void write(int b) { - throw new UnsupportedOperationException("write(byte) not supported"); - } - - @Override - public void write(byte[] data, int offset, int len) throws IOException { - target.clear(); - target.set(data, offset, len); - } - } + private TextStuffer bridge = new TextStuffer(); + + private LongWritable key = new LongWritable(); + private Text value = new Text(); + + public BZip2LineRecordReader(Configuration job, FileSplit split) throws IOException { + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + + // open the file and seek to the start of the split + FileSystem fs = file.getFileSystem(job); + FSDataInputStream fileIn = fs.open(split.getPath()); + fileIn.seek(start); + + in = new CBZip2InputStream(fileIn, 9, end); + if (start != 0) { + // skip first line and re-establish "start". + // LineRecordReader.readLine(this.in, null); + readLine(this.in, null); + start = in.getPos(); + } + pos = in.getPos(); + } - private TextStuffer bridge = new TextStuffer(); - - private LongWritable key = new LongWritable(); - private Text value = new Text(); - - public BZip2LineRecordReader(Configuration job, FileSplit split) throws IOException { - start = split.getStart(); - end = start + split.getLength(); - final Path file = split.getPath(); - - // open the file and seek to the start of the split - FileSystem fs = file.getFileSystem(job); - FSDataInputStream fileIn = fs.open(split.getPath()); - fileIn.seek(start); - - in = new CBZip2InputStream(fileIn, 9, end); - if (start != 0) { - // skip first line and re-establish "start". - // LineRecordReader.readLine(this.in, null); - readLine(this.in, null); - start = in.getPos(); - } - pos = in.getPos(); + /* + * LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added here + * locally. + */ + private long readLine(InputStream in, + OutputStream out) throws IOException { + long bytes = 0; + while (true) { + int b = -1; + if(CRFollowedByNonLF) { + // In the previous call, a Carriage Return ('\r') was followed + // by a non Line Feed ('\n') character - in that call we would + // have not returned the non Line Feed character but would have + // read it from the stream - lets use that already read character + // now + b = nonLFChar; + CRFollowedByNonLF = false; + } else { + b = in.read(); } - - /* - * LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added here - * locally. - */ - private long readLine(InputStream in, - OutputStream out) throws IOException { - long bytes = 0; - while (true) { - int b = -1; - if(CRFollowedByNonLF) { - // In the previous call, a Carriage Return ('\r') was followed - // by a non Line Feed ('\n') character - in that call we would - // have not returned the non Line Feed character but would have - // read it from the stream - lets use that already read character - // now - b = nonLFChar; - CRFollowedByNonLF = false; - } else { - b = in.read(); - } - if (b == -1) { - break; - } - bytes += 1; - - byte c = (byte)b; - if (c == '\n') { - break; - } - - if (c == '\r') { - byte nextC = (byte)in.read(); - if (nextC != '\n') { - CRFollowedByNonLF = true; - nonLFChar = nextC; - } else { - bytes += 1; - } - break; - } - - if (out != null) { - out.write(c); - } - } - return bytes; + if (b == -1) { + break; } + bytes += 1; - /** Read a line. */ - public boolean next(LongWritable key, Text value) - throws IOException { - if (pos > end) - return false; - - key.set(pos); // key is position - buffer.reset(); - // long bytesRead = LineRecordReader.readLine(in, buffer); - long bytesRead = readLine(in, buffer); - if (bytesRead == 0) { - return false; - } - pos = in.getPos(); - // if we have read ahead because we encountered a carriage return - // char followed by a non line feed char, decrement the pos - if(CRFollowedByNonLF) { - pos--; - } - - bridge.target = value; - buffer.writeTo(bridge); - return true; + byte c = (byte)b; + if (c == '\n') { + break; } - /** - * Get the progress within the split - */ - @Override - public float getProgress() { - if (start == end) { - return 0.0f; - } else { - return Math.min(1.0f, (pos - start) / (float) (end - start)); - } + if (c == '\r') { + byte nextC = (byte)in.read(); + if (nextC != '\n') { + CRFollowedByNonLF = true; + nonLFChar = nextC; + } else { + bytes += 1; + } + break; } - @Override - public void close() throws IOException { - in.close(); + if (out != null) { + out.write(c); } + } + return bytes; + } - @Override - public LongWritable getCurrentKey() throws IOException, - InterruptedException { - return key; - } + /** Read a line. */ + public boolean next(LongWritable key, Text value) + throws IOException { + if (pos > end) + return false; + + key.set(pos); // key is position + buffer.reset(); + // long bytesRead = LineRecordReader.readLine(in, buffer); + long bytesRead = readLine(in, buffer); + if (bytesRead == 0) { + return false; + } + pos = in.getPos(); + // if we have read ahead because we encountered a carriage return + // char followed by a non line feed char, decrement the pos + if(CRFollowedByNonLF) { + pos--; + } + + bridge.target = value; + buffer.writeTo(bridge); + return true; + } - @Override - public Text getCurrentValue() throws IOException, InterruptedException { - return value; - } + /** + * Get the progress within the split + */ + @Override + public float getProgress() { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (pos - start) / (float) (end - start)); + } + } - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - // no op - } + @Override + public void close() throws IOException { + in.close(); + } - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return next(key, value); - } + @Override + public LongWritable getCurrentKey() throws IOException, + InterruptedException { + return key; + } + @Override + public Text getCurrentValue() throws IOException, InterruptedException { + return value; } @Override - protected boolean isSplitable(JobContext context, Path file) { - return true; + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + // no op } - + @Override - public RecordReader createRecordReader(InputSplit split, - TaskAttemptContext context) { - try { - return new BZip2LineRecordReader(context.getConfiguration(), - (FileSplit) split); - } catch (IOException e) { - throw new RuntimeException(e); - } + public boolean nextKeyValue() throws IOException, InterruptedException { + return next(key, value); + } + + } + + @Override + protected boolean isSplitable(JobContext context, Path file) { + return true; + } + + @Override + public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, + TaskAttemptContext context) { + try { + return new BZip2LineRecordReader(context.getConfiguration(), + (FileSplit) split); + } catch (IOException e) { + throw new RuntimeException(e); } + } } \ No newline at end of file
