Yep, that's an interesting one as well. I'll fix the value re-use issue with the change to PType if that sounds ok to you (or anyone else), and I'll take a look at what can be done about this as well if possible.
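Josh's suggestion quoted below is that a PType should hand out a new MapFn instance each time getInputMapFn or getOutputMapFn is called. A small plain-Java sketch can show the idea. It is not Crunch code. `SketchPType`, `ReusingPairFn`, `getSharedOutputMapFn` and `keyAndValue` are hypothetical names. `java.util.function.Function` stands in for Crunch's MapFn, and the reused StringBuilder stands in for whatever mutable state a real output MapFn might keep. The sketch shows why `tableOf(kv, kv)` goes wrong when one stateful instance converts both the key and the value, and how a per-call factory avoids that.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class FreshMapFnSketch {

  // Hypothetical stateful output MapFn: it reuses one mutable buffer across
  // calls, so each call overwrites the result of the previous one.
  static final class ReusingPairFn implements Function<String[], StringBuilder> {
    private final StringBuilder buffer = new StringBuilder();

    @Override
    public StringBuilder apply(String[] pair) {
      buffer.setLength(0); // discard whatever the previous call produced
      return buffer.append(pair[0]).append(',').append(pair[1]);
    }
  }

  // Hypothetical PType-like holder. getSharedOutputMapFn() always returns one
  // cached instance; getOutputMapFn() asks the factory for a new instance on
  // every call, as suggested in the thread.
  static final class SketchPType {
    private final Supplier<Function<String[], StringBuilder>> factory;
    private final Function<String[], StringBuilder> shared;

    SketchPType(Supplier<Function<String[], StringBuilder>> factory) {
      this.factory = factory;
      this.shared = factory.get();
    }

    Function<String[], StringBuilder> getSharedOutputMapFn() {
      return shared;
    }

    Function<String[], StringBuilder> getOutputMapFn() {
      return factory.get();
    }
  }

  // Simulates tableOf(kv, kv): the key and the value are both converted by
  // kv's output MapFn, and both results are held before being written out.
  static String keyAndValue(boolean fresh) {
    SketchPType kv = new SketchPType(ReusingPairFn::new);
    Function<String[], StringBuilder> keyFn =
        fresh ? kv.getOutputMapFn() : kv.getSharedOutputMapFn();
    Function<String[], StringBuilder> valueFn =
        fresh ? kv.getOutputMapFn() : kv.getSharedOutputMapFn();
    StringBuilder key = keyFn.apply(new String[] {"k1", "k2"});
    StringBuilder value = valueFn.apply(new String[] {"v1", "v2"});
    return key + " / " + value;
  }

  public static void main(String[] args) {
    System.out.println(keyAndValue(false)); // v1,v2 / v1,v2  (key clobbered)
    System.out.println(keyAndValue(true));  // k1,k2 / v1,v2
  }
}
```

The trade-off is an extra object per call. That cost only matters for MapFns that actually hold state, which is why the thread suggests the PType should know whether its MapFns are stateful.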
- Gabriel

On Thu, Jun 28, 2012 at 11:41 PM, Josh Wills wrote:
> Yeah, that's no good. I had a similar case w/a project my intern was
> working on, where he created a PType that was:
>
> PType<Pair<K, V>> kv = pairs(strings(), strings());
> ...
> tableOf(kv, kv);
>
> ... which also fails because the output mapfn returned by kv is
> stateful. Easy to remedy it in the code, but confusing for the user in
> the same way. It would be nice for the PType to know if their MapFns
> were stateful so that a new instance of them could be returned each
> time PType.getInputMapFn or getOutputMapFn was called.
>
> On Thu, Jun 28, 2012 at 2:32 PM, Gabriel Reid wrote:
>> Hmm, strange...according to my mail client the attachment was in
>> there. Anyhow, I've pasted it inline below:
>>
>> package com.cloudera.crunch;
>>
>> import static org.junit.Assert.assertEquals;
>>
>> import java.io.DataInput;
>> import java.io.DataOutput;
>> import java.io.IOException;
>> import java.io.Serializable;
>> import java.util.Collection;
>> import java.util.Collections;
>> import java.util.List;
>> import java.util.Map;
>>
>> import org.apache.hadoop.io.Writable;
>> import org.junit.Test;
>>
>> import com.cloudera.crunch.impl.mr.MRPipeline;
>> import com.cloudera.crunch.test.FileHelper;
>> import com.cloudera.crunch.types.writable.Writables;
>> import com.google.common.collect.Lists;
>>
>> public class SerializationReducerTest implements Serializable {
>>
>>   public static class SimpleStringWritable implements Writable {
>>
>>     private String value;
>>
>>     public void setValue(String value) {
>>       this.value = value;
>>     }
>>
>>     public String getValue() {
>>       return value;
>>     }
>>
>>     @Override
>>     public String toString() {
>>       return String.format("SimpleStringWritable(%s)", value);
>>     }
>>
>>     @Override
>>     public void write(DataOutput out) throws IOException {
>>       out.writeUTF(value);
>>     }
>>
>>     @Override
>>     public void readFields(DataInput in) throws IOException {
>>       this.value = in.readUTF();
>>     }
>>
>>   }
>>
>>   static SimpleStringWritable asSimple(String value) {
>>     SimpleStringWritable simpleStringWritable = new SimpleStringWritable();
>>     simpleStringWritable.setValue(value);
>>     return simpleStringWritable;
>>   }
>>
>>   static List<String> simplesToList(Collection<SimpleStringWritable>
>>       simpleCollection) {
>>     List<String> stringList = Lists.newArrayList();
>>     for (SimpleStringWritable writable : simpleCollection) {
>>       stringList.add(writable.getValue());
>>     }
>>     Collections.sort(stringList);
>>     return stringList;
>>   }
>>
>>   @Test
>>   public void testWritables() throws IOException {
>>     Pipeline pipeline = new MRPipeline(SerializationReducerTest.class);
>>     Map<Integer, Collection<SimpleStringWritable>> collectionMap = pipeline
>>         .readTextFile(FileHelper.createTempCopyOf("set1.txt"))
>>         .parallelDo(new MapFn<String, Pair<Integer, SimpleStringWritable>>() {
>>
>>           @Override
>>           public Pair<Integer, SimpleStringWritable> map(String input) {
>>             return Pair.of(1, asSimple(input));
>>           }
>>
>>         }, Writables.tableOf(Writables.ints(),
>>             Writables.writables(SimpleStringWritable.class))
>>
>>         ).collectValues().materializeToMap();
>>
>>     assertEquals(1, collectionMap.size());
>>
>>     // The actual content will just be ["e", "e", "e", "e"]
>>     assertEquals(Lists.newArrayList("a", "b", "c", "e"),
>>         simplesToList(collectionMap.get(1)));
>>   }
>>
>> }
>>
>>
>> On Thu, Jun 28, 2012 at 11:28 PM, Josh Wills wrote:
>>> Gabriel,
>>>
>>> Generally agree with your line of thought-- where is the attached test case?
>>>
>>> J
>>>
>>> On Thu, Jun 28, 2012 at 2:11 PM, Gabriel Reid wrote:
>>>> Hi guys,
>>>>
>>>> As you may have seen, the topic of the PTable#collectValues method came up
>>>> today in the user mailing list.
>>>> I hadn't been aware of this method before, and when I took a closer look
>>>> I saw that it just creates a Collection of values based on the incoming
>>>> Iterable, without doing any kind of deep copy of the contents of the
>>>> Iterable. As far as I can see, something similar (i.e. holding on to
>>>> values from an Iterable from a reducer) is also done in the Join methods.
>>>>
>>>> As Christian also pointed out (and added to the documentation for DoFn),
>>>> this can be an issue, as values made available as an Iterable in a reducer
>>>> are re-used within Hadoop.
>>>>
>>>> This object re-use isn't a problem in Crunch wherever a non-identity
>>>> mapping is used between the serialization type and the PCollection type
>>>> within the PType (for example, with primitives and String). However, using
>>>> Writable types or non-mapped Avro types won't work (as shown in the
>>>> attached test case).
>>>>
>>>> I think it's definitely a problem that PTable#collectValues (and probably
>>>> some other methods) doesn't work for Writables, or in a broader sense,
>>>> that the semantics can change for the Iterable that is passed in when
>>>> processing a grouped table.
>>>>
>>>> One really easy (but also inefficient) way we could solve this would be to
>>>> not use an IdentityFn as the default mapping function in Writables and
>>>> AvroType, and instead use a MapFn that does a deep copy of the object
>>>> (i.e. by serializing and deserializing itself in memory). This is of
>>>> course a pretty big overhead for something that isn't necessary in a lot
>>>> of cases.
>>>>
>>>> Another option I was considering was to do something like making the input
>>>> and output PTypes of a DoFn available to the DoFn, and adding a
>>>> createDetachedValue method (or something similar) to PType, which would
>>>> then serialize and deserialize objects in order to make a clone if
>>>> necessary.
>>>> With this approach, the clone method would have to be called
>>>> within the collectValues method (or any other method that is holding on to
>>>> values outside of the iterator).
>>>>
>>>> I prefer the second approach, as it avoids the waste of extra
>>>> cloning/serialization while still making it possible to get detached
>>>> values out of an Iterable.
>>>>
>>>> Does anyone else have any thoughts on this?
>>>>
>>>> - Gabriel
>>>
>>> --
>>> Director of Data Science
>>> Cloudera
>>> Twitter: @josh_wills
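The second option in Gabriel's original message can be sketched in plain Java for a Writable-style value. That option is a createDetachedValue method that clones a value by serializing and deserializing it. The sketch declares its own `Writable` interface with the same two methods as org.apache.hadoop.io.Writable, so it compiles without Hadoop. The `createDetachedValue` signature, its `Supplier` factory argument, and the `collect` helper are hypothetical illustrations, not Crunch's API. `collect` mimics a reducer Iterable that refills one shared instance for every element. Holding plain references reproduces the ["e", "e", "e", "e"] result from the quoted test case, while detaching each value keeps all four.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

class DetachedValueSketch {

  // Stand-in with the same two methods as org.apache.hadoop.io.Writable,
  // declared here only so the sketch compiles without Hadoop.
  interface Writable {
    void write(DataOutput out) throws IOException;

    void readFields(DataInput in) throws IOException;
  }

  // Same shape as the SimpleStringWritable in the quoted test case.
  static final class SimpleStringWritable implements Writable {
    private String value;

    void setValue(String value) { this.value = value; }

    String getValue() { return value; }

    @Override
    public void write(DataOutput out) throws IOException { out.writeUTF(value); }

    @Override
    public void readFields(DataInput in) throws IOException { value = in.readUTF(); }
  }

  // Hypothetical createDetachedValue: clones a value by writing it to an
  // in-memory buffer and reading the bytes back into a fresh instance.
  static <T extends Writable> T createDetachedValue(T value, Supplier<T> factory) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      value.write(out);
      out.flush();
      T copy = factory.get();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Mimics collectValues over a reducer Iterable in which the framework
  // refills one shared instance for every element. With detach == false the
  // list holds N references to that instance; with detach == true it holds
  // independent copies. Returns the held values, sorted.
  static List<String> collect(List<String> inputs, boolean detach) {
    SimpleStringWritable reused = new SimpleStringWritable();
    List<SimpleStringWritable> held = new ArrayList<>();
    for (String input : inputs) {
      reused.setValue(input); // stands in for Hadoop deserializing into the reused object
      SimpleStringWritable kept =
          detach ? createDetachedValue(reused, SimpleStringWritable::new) : reused;
      held.add(kept);
    }
    List<String> result = new ArrayList<>();
    for (SimpleStringWritable w : held) {
      result.add(w.getValue());
    }
    Collections.sort(result);
    return result;
  }

  public static void main(String[] args) {
    List<String> inputs = List.of("a", "b", "c", "e");
    System.out.println(collect(inputs, false)); // [e, e, e, e]
    System.out.println(collect(inputs, true));  // [a, b, c, e]
  }
}
```

Each detached value costs one in-memory serialization round trip. That cost is why the thread prefers calling such a method only where values are held past the iterator, rather than deep-copying in every default MapFn.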
