OK, so in Java (pardon the verbosity) I might write something like the code below, but I face the following issues:

1) I need to store all values in memory as I run combineByKey - if I could return an RDD which consumed the values lazily that would be great, but I don't know how to do that.
2) In my version of the functions I get a tuple, so I know the key, but none of the byKey functions make the key available - that may be fine for a trivial function like word count, but the code I want to port needs to know the key while processing its values.
3) It is important that I have control over partitioning - I can do that with mapPartitions - but it is also important that within a partition keys arrive in sorted order. That would be easy if every partition could be a separate RDD, combined later, but I am not sure how that works (see the sketch below).
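To make (3) concrete, here is a rough, untested sketch of the shape I am after. It assumes a Spark build whose JavaPairRDD has repartitionAndSortWithinPartitions (not every release does) and the 1.x Java API in which PairFlatMapFunction.call returns an Iterable; HadoopStyleReducer, its reduce() method and the keyComparator argument are names I made up for the sketch. It also still buffers one key's values, and the emitted pairs for a partition, in memory, so it does not really solve (1):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

// untested sketch of a Hadoop-style reducer driven from mapPartitionsToPair
public abstract class HadoopStyleReducer<K, V, OK, OV> implements Serializable {

    /** equivalent of Hadoop's reduce(key, values, context): emit any number of pairs */
    public abstract void reduce(K key, Iterator<V> values, List<Tuple2<OK, OV>> out);

    public JavaPairRDD<OK, OV> run(JavaPairRDD<K, V> pairs,
                                   Partitioner partitioner,
                                   Comparator<K> keyComparator) {   // comparator should be serializable
        // shuffle with the supplied partitioner and sort keys inside every partition
        JavaPairRDD<K, V> sorted =
                pairs.repartitionAndSortWithinPartitions(partitioner, keyComparator);
        return sorted.mapPartitionsToPair(
                new PairFlatMapFunction<Iterator<Tuple2<K, V>>, OK, OV>() {
                    public Iterable<Tuple2<OK, OV>> call(Iterator<Tuple2<K, V>> in) {
                        List<Tuple2<OK, OV>> out = new ArrayList<Tuple2<OK, OV>>();
                        K currentKey = null;
                        List<V> currentValues = new ArrayList<V>();
                        while (in.hasNext()) {
                            Tuple2<K, V> pair = in.next();
                            if (currentKey != null && !currentKey.equals(pair._1())) {
                                // key boundary: every value for currentKey has been seen
                                reduce(currentKey, currentValues.iterator(), out);
                                currentValues = new ArrayList<V>();
                            }
                            currentKey = pair._1();
                            currentValues.add(pair._2());
                        }
                        if (currentKey != null) {
                            reduce(currentKey, currentValues.iterator(), out);
                        }
                        return out;
                    }
                });
    }
}

Because keys arrive sorted within a partition, the key-change test is a reliable "no more values for this key" signal, which is the end-of-merge notification I mention below.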
In Hadoop, when I reduce the values for each key I get an iterator and do not need to keep all values in memory. Similarly, while Hadoop writes its output to disk as key/value pairs, in Spark the output could populate a JavaPairRDD if there were a way to do that lazily. One other issue - I don't see a good way to tell when a merge function is finished, i.e. that no further data is coming, which would be useful in later processing steps.

// uses org.apache.spark.api.java.function.Function / Function2 and scala.Tuple2

/**
 * a class to store a key and all its values using an ArrayList
 * @param <K> key type
 * @param <V> value type
 */
public static class KeyAndValues<K, V> {
    public final K key;
    private final ArrayList<V> values = new ArrayList<V>();

    public KeyAndValues(final K pKey) {
        key = pKey;
    }

    public void addValue(V added) {
        values.add(added);
    }

    public Iterable<V> getIterable() {
        return values;
    }

    public KeyAndValues<K, V> merge(KeyAndValues<K, V> merged) {
        values.addAll(merged.values);
        return this;
    }
}

// start function for combineByKey - builds a combiner from the first tuple, keeping its key
public static class CombineStartKeyAndValues<K, V>
        implements Function<Tuple2<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(Tuple2<K, V> x) {
        KeyAndValues<K, V> ret = new KeyAndValues<K, V>(x._1());
        ret.addValue(x._2());
        return ret;
    }
}

// continue function for combineByKey - adds one more value to the array
public static class CombineContinueKeyAndValues<K, V>
        implements Function2<KeyAndValues<K, V>, Tuple2<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(final KeyAndValues<K, V> kvs, final Tuple2<K, V> added) throws Exception {
        kvs.addValue(added._2());
        return kvs;
    }
}

// merge function - merges the two arrays - NOTE there is no signal to say the merge is done
public static class CombineMergeKeyAndValues<K, V>
        implements Function2<KeyAndValues<K, V>, KeyAndValues<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(final KeyAndValues<K, V> v1, final KeyAndValues<K, V> v2) throws Exception {
        return v1.merge(v2);
    }
}
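For completeness, here is an untested sketch of how I would wire those three classes up. The String/Integer types, the local[2] master and the tiny parallelizePairs data set are only there to make the example self-contained, and it assumes KeyAndValues and the Combine* classes above are in scope. The mapToPair step copies the key into the value, since combineByKey never hands the key to the combine functions - that is my workaround for (2):

import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// plus the KeyAndValues / Combine* classes shown above
public class CombineByKeySketch {
    public static void main(String[] args) {
        // local master only so the sketch can be run directly
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("combineByKey-sketch").setMaster("local[2]"));

        JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 3)));

        // copy the key into the value so the Tuple2-based combine functions can see it
        JavaPairRDD<String, Tuple2<String, Integer>> keyed = pairs.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Tuple2<String, Integer>>() {
                    public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<String, Integer> t) {
                        return new Tuple2<String, Tuple2<String, Integer>>(t._1(), t);
                    }
                });

        // the Partitioner argument gives control over partitioning; every value for a key
        // still ends up in one ArrayList, so the memory concern in (1) remains
        JavaPairRDD<String, KeyAndValues<String, Integer>> combined = keyed.combineByKey(
                new CombineStartKeyAndValues<String, Integer>(),
                new CombineContinueKeyAndValues<String, Integer>(),
                new CombineMergeKeyAndValues<String, Integer>(),
                new HashPartitioner(2));

        for (Tuple2<String, KeyAndValues<String, Integer>> t : combined.sortByKey().collect()) {
            System.out.println(t._1() + " -> " + t._2().getIterable());
        }
        sc.stop();
    }
}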
On Fri, Sep 19, 2014 at 11:19 PM, Victor Tso-Guillen <v...@paxata.com> wrote:

> So sorry about teasing you with the Scala. But the method is there in Java
> too, I just checked.
>
> On Fri, Sep 19, 2014 at 2:02 PM, Victor Tso-Guillen <v...@paxata.com> wrote:
>
>> It might not be the same as a real Hadoop reducer, but I think it would
>> accomplish the same. Take a look at:
>>
>> import org.apache.spark.SparkContext._
>> // val rdd: RDD[(K, V)]
>> // def zero(value: V): S
>> // def reduce(agg: S, value: V): S
>> // def merge(agg1: S, agg2: S): S
>> val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[S](zero, reduce, merge)
>> reducedUnsorted.sortByKey()
>>
>> On Fri, Sep 19, 2014 at 1:37 PM, Steve Lewis <lordjoe2...@gmail.com> wrote:
>>
>>> I am struggling to reproduce the functionality of a Hadoop reducer on
>>> Spark (in Java).
>>>
>>> In Hadoop I have a function
>>>     public void doReduce(K key, Iterator<V> values)
>>> and Hadoop also has a consumer (context.write) which can be seen as
>>>     consume(key, value)
>>>
>>> In my code:
>>> 1) knowing the key is important to the function
>>> 2) there is neither one output tuple2 per key nor one output tuple2 per value
>>> 3) the number of values per key might be large enough that storing them
>>> in memory is impractical
>>> 4) keys must appear in sorted order
>>>
>>> One good example would run through a large document, using a similarity
>>> function to compare each line against the previous 200 lines and output any
>>> of those with a similarity of more than 0.3 (do not suggest "output all and
>>> filter" - the real problem is more complex). The critical concern is the
>>> uncertain number of output tuples per key.
>>>
>>> My questions:
>>> 1) how can this be done - ideally the consumer would be a JavaPairRDD, but
>>> I don't see how to create one and add items to it later
>>> 2) how do I handle the whole partition / sort / process sequence (the
>>> process step involving calls to doReduce)?

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com