+1. Thanks.

On Jun 29, 2012 2:48 AM, "Gabriel Reid" <[email protected]> wrote:
> Yep, that's an interesting one as well. I'll fix the value re-use
> issue with the change to PType if that sounds ok to you (or anyone
> else), and I'll take a look at what can be done about this as well if
> possible.
>
> - Gabriel
>
> On Thu, Jun 28, 2012 at 11:41 PM, Josh Wills <[email protected]> wrote:
> > Yeah, that's no good. I had a similar case w/a project my intern was
> > working on, where he created a PType that was:
> >
> > PType<Pair<K, V>> kv = pairs(strings(), strings());
> > ...
> > tableOf(kv, kv);
> >
> > ... which also fails because the output mapfn returned by kv is
> > stateful. Easy to remedy it in the code, but confusing for the user in
> > the same way. It would be nice for the PType to know if their MapFns
> > were stateful so that a new instance of them could be returned each
> > time PType.getInputMapFn or getOutputMapFn was called.
> >
> > On Thu, Jun 28, 2012 at 2:32 PM, Gabriel Reid <[email protected]> wrote:
> >> Hmm, strange...according to my mail client the attachment was in
> >> there.
> >> Anyhow, I've pasted it inline below:
> >>
> >> package com.cloudera.crunch;
> >>
> >> import static org.junit.Assert.assertEquals;
> >>
> >> import java.io.DataInput;
> >> import java.io.DataOutput;
> >> import java.io.IOException;
> >> import java.io.Serializable;
> >> import java.util.Collection;
> >> import java.util.Collections;
> >> import java.util.List;
> >> import java.util.Map;
> >>
> >> import org.apache.hadoop.io.Writable;
> >> import org.junit.Test;
> >>
> >> import com.cloudera.crunch.impl.mr.MRPipeline;
> >> import com.cloudera.crunch.test.FileHelper;
> >> import com.cloudera.crunch.types.writable.Writables;
> >> import com.google.common.collect.Lists;
> >>
> >> public class SerializationReducerTest implements Serializable {
> >>
> >>   public static class SimpleStringWritable implements Writable {
> >>
> >>     private String value;
> >>
> >>     public void setValue(String value) {
> >>       this.value = value;
> >>     }
> >>
> >>     public String getValue() {
> >>       return value;
> >>     }
> >>
> >>     @Override
> >>     public String toString() {
> >>       return String.format("SimpleStringWritable(%s)", value);
> >>     }
> >>
> >>     @Override
> >>     public void write(DataOutput out) throws IOException {
> >>       out.writeUTF(value);
> >>     }
> >>
> >>     @Override
> >>     public void readFields(DataInput in) throws IOException {
> >>       this.value = in.readUTF();
> >>     }
> >>
> >>   }
> >>
> >>   static SimpleStringWritable asSimple(String value) {
> >>     SimpleStringWritable simpleStringWritable = new SimpleStringWritable();
> >>     simpleStringWritable.setValue(value);
> >>     return simpleStringWritable;
> >>   }
> >>
> >>   static List<String> simplesToList(Collection<SimpleStringWritable> simpleCollection) {
> >>     List<String> stringList = Lists.newArrayList();
> >>     for (SimpleStringWritable writable : simpleCollection) {
> >>       stringList.add(writable.getValue());
> >>     }
> >>     Collections.sort(stringList);
> >>     return stringList;
> >>   }
> >>
> >>   @Test
> >>   public void testWritables() throws IOException {
> >>     Pipeline pipeline = new MRPipeline(SerializationReducerTest.class);
> >>     Map<Integer, Collection<SimpleStringWritable>> collectionMap = pipeline
> >>         .readTextFile(FileHelper.createTempCopyOf("set1.txt"))
> >>         .parallelDo(new MapFn<String, Pair<Integer, SimpleStringWritable>>() {
> >>
> >>           @Override
> >>           public Pair<Integer, SimpleStringWritable> map(String input) {
> >>             return Pair.of(1, asSimple(input));
> >>           }
> >>
> >>         }, Writables.tableOf(Writables.ints(),
> >>             Writables.writables(SimpleStringWritable.class))
> >>
> >>         ).collectValues().materializeToMap();
> >>
> >>     assertEquals(1, collectionMap.size());
> >>
> >>     // The actual content will just be ["e", "e", "e", "e"]
> >>     assertEquals(Lists.newArrayList("a", "b", "c", "e"),
> >>         simplesToList(collectionMap.get(1)));
> >>   }
> >>
> >> }
> >>
> >>
> >> On Thu, Jun 28, 2012 at 11:28 PM, Josh Wills <[email protected]> wrote:
> >>> Gabriel,
> >>>
> >>> Generally agree with your line of thought-- where is the attached
> >>> test case?
> >>>
> >>> J
> >>>
> >>> On Thu, Jun 28, 2012 at 2:11 PM, Gabriel Reid <[email protected]> wrote:
> >>>> Hi guys,
> >>>>
> >>>> As you may have seen, the topic of the PTable#collectValues method
> >>>> came up today in the user mailing list. I hadn't been aware of this
> >>>> method before, and when I took a closer look I saw that it just
> >>>> creates a Collection of values based on the incoming Iterable,
> >>>> without doing any kind of a deep copy of the contents of the
> >>>> Iterable. As far as I can see, something similar (i.e. holding on
> >>>> to values from an Iterable from a reducer) is also done in the Join
> >>>> methods.
> >>>>
> >>>> As Christian also pointed out (and added to the documentation for
> >>>> DoFn), this can be an issue, as values made available as an
> >>>> Iterable in a reducer are re-used within Hadoop.
> >>>>
> >>>> This object re-use isn't a problem in Crunch wherever a
> >>>> non-identity mapping is used between the serialization type and the
> >>>> PCollection type within the PType (for example, with primitives and
> >>>> String). However, using Writable types or non-mapped Avro types
> >>>> won't work (as shown in the attached test case).
> >>>>
> >>>> I think it's definitely a problem that PTable#collectValues (and
> >>>> probably some other methods) doesn't work for Writables, or in a
> >>>> broader sense, that the semantics can change for the Iterable that
> >>>> is passed in when processing a grouped table.
> >>>>
> >>>> One really easy (but also inefficient) way we could solve this
> >>>> would be to not use an IdentityFn as the default mapping function
> >>>> in Writables and AvroType, and instead use a MapFn that does a deep
> >>>> copy of the object (i.e. by serializing and deserializing itself in
> >>>> memory). This is of course a pretty big overhead for something that
> >>>> isn't necessary in a lot of cases.
> >>>>
> >>>> Another option I was considering was to do something like making
> >>>> the input and output PTypes of a DoFn available to the DoFn, and
> >>>> adding a createDetachedValue method (or something similar) to
> >>>> PType, which would then serialize and deserialize objects in order
> >>>> to make a clone if necessary. With this approach, the clone method
> >>>> would have to be called within the collectValues method (or any
> >>>> other method that is holding on to values outside of the iterator).
> >>>>
> >>>> I prefer the second approach, as it avoids the waste of extra
> >>>> cloning/serialization while still making it possible to get
> >>>> detached values out of an Iterable.
> >>>>
> >>>> Does anyone else have any thoughts on this?
> >>>>
> >>>> - Gabriel
> >>>>
> >>>
> >>> --
> >>> Director of Data Science
> >>> Cloudera
> >>> Twitter: @josh_wills
>
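
For anyone following the thread without a Hadoop setup: the re-use problem and the serialize/deserialize "detach" idea discussed above can be reproduced with plain java.io. The sketch below is only an illustration under stated assumptions. SimpleValue, reusingIterable, detach and collect are hypothetical names made up for this example; none of them are Crunch or Hadoop API, and this is not how createDetachedValue was (or would be) implemented in PType. SimpleValue stands in for a Writable, and reusingIterable imitates a reducer that refills one shared instance on every next().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class DetachedValueSketch {

  /** Hypothetical stand-in for a Writable that holds a single string. */
  static class SimpleValue {
    String value;

    void write(DataOutputStream out) throws IOException {
      out.writeUTF(value);
    }

    void readFields(DataInputStream in) throws IOException {
      value = in.readUTF();
    }
  }

  /** Imitates a reducer's value Iterable: next() refills one shared instance. */
  static Iterable<SimpleValue> reusingIterable(final List<String> raw) {
    return new Iterable<SimpleValue>() {
      @Override
      public Iterator<SimpleValue> iterator() {
        final SimpleValue shared = new SimpleValue();
        final Iterator<String> source = raw.iterator();
        return new Iterator<SimpleValue>() {
          @Override
          public boolean hasNext() {
            return source.hasNext();
          }

          @Override
          public SimpleValue next() {
            shared.value = source.next();
            return shared; // same object every time
          }

          @Override
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    };
  }

  /** Deep copy via an in-memory serialize/deserialize round trip. */
  static SimpleValue detach(SimpleValue original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      original.write(out);
      out.flush();
      SimpleValue copy = new SimpleValue();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Holds on to the values past the iteration, optionally detaching each one first. */
  static List<String> collect(Iterable<SimpleValue> values, boolean detachValues) {
    List<SimpleValue> held = new ArrayList<SimpleValue>();
    for (SimpleValue v : values) {
      held.add(detachValues ? detach(v) : v);
    }
    List<String> result = new ArrayList<String>();
    for (SimpleValue v : held) {
      result.add(v.value);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> raw = Arrays.asList("a", "b", "c", "e");
    System.out.println(collect(reusingIterable(raw), false)); // prints [e, e, e, e]
    System.out.println(collect(reusingIterable(raw), true));  // prints [a, b, c, e]
  }
}
```

Without detaching, every held reference points at the one shared object, so only the last value survives, which matches the ["e", "e", "e", "e"] result noted in the test case. Copying only where values are held past the iterator is the trade-off the second proposal in the thread argues for.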
