Yep, that's an interesting one as well. I'll fix the value re-use
issue with the change to PType if that sounds ok to you (or anyone
else), and I'll take a look at what can be done about this as well if
possible.

- Gabriel

On Thu, Jun 28, 2012 at 11:41 PM, Josh Wills <[email protected]> wrote:
> Yeah, that's no good. I had a similar case w/a project my intern was
> working on, where he created a PType that was:
>
> PType<Pair<K, V>> kv = pairs(strings(), strings());
> ...
> tableOf(kv, kv);
>
> ... which also fails because the output mapfn returned by kv is
> stateful. Easy to remedy it in the code, but confusing for the user in
> the same way. It would be nice for the PType to know whether its MapFns
> are stateful, so that a new instance of them could be returned each
> time PType.getInputMapFn or getOutputMapFn is called.
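As a rough illustration of the "new instance each time" idea, here is a minimal sketch in plain Java. It does not use the Crunch API: ReusingFn, FreshFnType and StatefulFnDemo are hypothetical names, and the buffer-reusing function is only an assumed stand-in for a stateful output MapFn. The actual failure mode inside Crunch is not shown in this thread.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful MapFn: every call writes into,
// and returns, the same internal buffer.
final class ReusingFn implements Function<String, StringBuilder> {
  private final StringBuilder buffer = new StringBuilder();

  @Override
  public StringBuilder apply(String input) {
    buffer.setLength(0);
    buffer.append(input);
    return buffer;
  }
}

// Hypothetical stand-in for a PType that builds a new function per request
// instead of handing out one shared instance.
final class FreshFnType {
  private final Supplier<Function<String, StringBuilder>> factory;

  FreshFnType(Supplier<Function<String, StringBuilder>> factory) {
    this.factory = factory;
  }

  Function<String, StringBuilder> getOutputMapFn() {
    return factory.get();
  }
}

final class StatefulFnDemo {
  // One shared function serves both slots: the second call overwrites the first result.
  static String sharedInstance() {
    Function<String, StringBuilder> fn = new ReusingFn();
    StringBuilder key = fn.apply("key");
    StringBuilder value = fn.apply("value");
    return key + "=" + value;
  }

  // Each slot asks the type for its own function, so the results stay independent.
  static String freshInstances() {
    FreshFnType type = new FreshFnType(ReusingFn::new);
    StringBuilder key = type.getOutputMapFn().apply("key");
    StringBuilder value = type.getOutputMapFn().apply("value");
    return key + "=" + value;
  }

  public static void main(String[] args) {
    System.out.println(sharedInstance()); // value=value
    System.out.println(freshInstances()); // key=value
  }
}
```

The trade-off is an extra allocation per request for a function, in exchange for types that can safely appear more than once in a composite type.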
>
> On Thu, Jun 28, 2012 at 2:32 PM, Gabriel Reid <[email protected]> wrote:
>> Hmm, strange...according to my mail client the attachment was in
>> there. Anyhow, I've pasted it inline below:
>>
>> package com.cloudera.crunch;
>>
>> import static org.junit.Assert.assertEquals;
>>
>> import java.io.DataInput;
>> import java.io.DataOutput;
>> import java.io.IOException;
>> import java.io.Serializable;
>> import java.util.Collection;
>> import java.util.Collections;
>> import java.util.List;
>> import java.util.Map;
>>
>> import org.apache.hadoop.io.Writable;
>> import org.junit.Test;
>>
>> import com.cloudera.crunch.impl.mr.MRPipeline;
>> import com.cloudera.crunch.test.FileHelper;
>> import com.cloudera.crunch.types.writable.Writables;
>> import com.google.common.collect.Lists;
>>
>> public class SerializationReducerTest implements Serializable {
>>
>>  public static class SimpleStringWritable implements Writable {
>>
>>    private String value;
>>
>>    public void setValue(String value) {
>>      this.value = value;
>>    }
>>
>>    public String getValue() {
>>      return value;
>>    }
>>
>>    @Override
>>    public String toString() {
>>      return String.format("SimpleStringWritable(%s)", value);
>>    }
>>
>>    @Override
>>    public void write(DataOutput out) throws IOException {
>>      out.writeUTF(value);
>>    }
>>
>>    @Override
>>    public void readFields(DataInput in) throws IOException {
>>      this.value = in.readUTF();
>>    }
>>
>>  }
>>
>>  static SimpleStringWritable asSimple(String value) {
>>    SimpleStringWritable simpleStringWritable = new SimpleStringWritable();
>>    simpleStringWritable.setValue(value);
>>    return simpleStringWritable;
>>  }
>>
>>  static List<String> simplesToList(Collection<SimpleStringWritable> simpleCollection) {
>>    List<String> stringList = Lists.newArrayList();
>>    for (SimpleStringWritable writable : simpleCollection) {
>>      stringList.add(writable.getValue());
>>    }
>>    Collections.sort(stringList);
>>    return stringList;
>>  }
>>
>>  @Test
>>  public void testWritables() throws IOException {
>>    Pipeline pipeline = new MRPipeline(SerializationReducerTest.class);
>>    Map<Integer, Collection<SimpleStringWritable>> collectionMap = pipeline
>>        .readTextFile(FileHelper.createTempCopyOf("set1.txt"))
>>        .parallelDo(new MapFn<String, Pair<Integer, SimpleStringWritable>>() {
>>
>>          @Override
>>          public Pair<Integer, SimpleStringWritable> map(String input) {
>>            return Pair.of(1, asSimple(input));
>>          }
>>
>>        }, Writables.tableOf(Writables.ints(),
>>            Writables.writables(SimpleStringWritable.class)))
>>        .collectValues().materializeToMap();
>>
>>    assertEquals(1, collectionMap.size());
>>
>>    // The actual content will just be ["e", "e", "e", "e"]
>>    assertEquals(Lists.newArrayList("a", "b", "c", "e"),
>>        simplesToList(collectionMap.get(1)));
>>  }
>>
>>
>> }
>>
>>
>> On Thu, Jun 28, 2012 at 11:28 PM, Josh Wills <[email protected]> wrote:
>>> Gabriel,
>>>
>>> Generally agree with your line of thought-- where is the attached test case?
>>>
>>> J
>>>
>>> On Thu, Jun 28, 2012 at 2:11 PM, Gabriel Reid <[email protected]> wrote:
>>>> Hi guys,
>>>>
>>>> As you may have seen, the topic of the PTable#collectValues method came up 
>>>> today in the user mailing list. I hadn't been aware of this method before, 
>>>> and when I took a closer look I saw that it just creates a Collection of 
>>>> values based on the incoming Iterable, without doing any kind of a deep 
>>>> copy of the contents of the Iterable. As far as I can see, something 
>>>> similar (i.e. holding on to values from an Iterable from a reducer) is 
>>>> also done in the Join methods.
>>>>
>>>> As Christian also pointed out (and added to the documentation for DoFn), 
>>>> this can be an issue, as values made available as an Iterable in a reducer 
>>>> are re-used within Hadoop.
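The effect of this kind of object re-use can be reproduced without Hadoop. The sketch below is a plain-Java illustration only: MutableText, ReusingIterable and ReuseDemo are hypothetical names, and the Iterable is an assumed imitation of a reducer's value iterator that refills one shared object on every call to next().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical mutable value holder, playing the role of a Writable.
final class MutableText {
  String value;
}

// Imitates an iterator that hands out the same object on every step,
// overwriting its contents each time.
final class ReusingIterable implements Iterable<MutableText> {
  private final List<String> raw;
  private final MutableText shared = new MutableText();

  ReusingIterable(List<String> raw) {
    this.raw = raw;
  }

  @Override
  public Iterator<MutableText> iterator() {
    final Iterator<String> it = raw.iterator();
    return new Iterator<MutableText>() {
      @Override
      public boolean hasNext() {
        return it.hasNext();
      }

      @Override
      public MutableText next() {
        shared.value = it.next();
        return shared;
      }
    };
  }
}

final class ReuseDemo {
  // Holds on to the references: every element ends up being the same object.
  static List<String> collectByReference(List<String> raw) {
    List<MutableText> held = new ArrayList<>();
    for (MutableText text : new ReusingIterable(raw)) {
      held.add(text);
    }
    return values(held);
  }

  // Copies each value before the iterator advances.
  static List<String> collectByCopy(List<String> raw) {
    List<MutableText> held = new ArrayList<>();
    for (MutableText text : new ReusingIterable(raw)) {
      MutableText copy = new MutableText();
      copy.value = text.value;
      held.add(copy);
    }
    return values(held);
  }

  private static List<String> values(List<MutableText> texts) {
    List<String> out = new ArrayList<>();
    for (MutableText text : texts) {
      out.add(text.value);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> raw = Arrays.asList("a", "b", "c", "e");
    System.out.println(collectByReference(raw)); // [e, e, e, e]
    System.out.println(collectByCopy(raw));      // [a, b, c, e]
  }
}
```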
>>>>
>>>> This object re-use isn't a problem in Crunch wherever a non-identity 
>>>> mapping is used between the serialization type and the PCollection type 
>>>> within the PType (for example, with primitives and String). However, using 
>>>> Writable types or non-mapped Avro types won't work (as shown in the 
>>>> attached test case).
>>>>
>>>> I think it's definitely a problem that PTable#collectValues (and probably 
>>>> some other methods) doesn't work for Writables, or in a broader sense, 
>>>> that the semantics can change for the Iterable that is passed in when 
>>>> processing a grouped table.
>>>>
>>>> One really easy (but also inefficient) way we could solve this would be to 
>>>> not use an IdentityFn as the default mapping function in Writables and 
>>>> AvroType, and instead use a MapFn that does a deep copy of the object 
>>>> (i.e. by serializing and deserializing itself in memory). This is of
>>>> course a pretty big overhead for something that isn't necessary in a lot
>>>> of cases.
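A deep copy through an in-memory serialization round trip could look roughly like the sketch below. To keep it compilable without Hadoop on the classpath, MiniWritable is a hypothetical interface that only mirrors the write/readFields pair of org.apache.hadoop.io.Writable; MiniString and DeepCopy are likewise made-up names, and none of this is the MapFn Crunch would actually use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical stand-in for Hadoop's Writable (same two methods).
interface MiniWritable {
  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;
}

// Hypothetical value type, comparable to SimpleStringWritable in the test case.
final class MiniString implements MiniWritable {
  String value = "";

  MiniString() {
  }

  MiniString(String value) {
    this.value = value;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(value);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    value = in.readUTF();
  }
}

final class DeepCopy {
  // Serializes the source into a byte array and reads it back into a new instance.
  static <T extends MiniWritable> T copy(T source, Supplier<T> factory) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      source.write(out);
      out.flush();

      T target = factory.get();
      target.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return target;
    } catch (IOException e) {
      // In-memory streams should not fail; rethrow unchecked to keep call sites simple.
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    MiniString original = new MiniString("a");
    MiniString copy = copy(original, MiniString::new);
    original.value = "overwritten";
    System.out.println(copy.value); // a
  }
}
```

Every copied value costs one full encode and decode plus two buffer allocations, which is where the overhead comes from.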
>>>>
>>>> Another option I was considering was to do something like making the input 
>>>> and output PTypes of a DoFn available to the DoFn, and adding a 
>>>> createDetachedValue method (or something similar) to PType, which would 
>>>> then serialize and deserialize objects in order to make a clone if 
>>>> necessary. With this approach, the clone method would have to be called 
>>>> within the collectValues method (or any other method that is holding on to 
>>>> values outside of the iterator).
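To make the second option more concrete, here is a minimal sketch in plain Java. DetachingType is a hypothetical interface standing in for a PType with the proposed createDetachedValue method, and DetachDemo.collectValues is only an assumed shape for a method that holds on to values. Nothing here is existing Crunch API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical type descriptor: it decides whether a value needs copying
// before it can safely outlive the iterator that produced it.
interface DetachingType<T> {
  T createDetachedValue(T value);
}

final class DetachDemo {
  // Immutable values (e.g. String, boxed primitives) can be returned as-is.
  static <T> DetachingType<T> immutableType() {
    return value -> value;
  }

  // Mutable values get a real copy; StringBuilder stands in for a reused Writable.
  static DetachingType<StringBuilder> mutableType() {
    return value -> new StringBuilder(value);
  }

  // A collectValues-style method that detaches every value it keeps.
  static <T> Collection<T> collectValues(Iterable<T> values, DetachingType<T> type) {
    List<T> held = new ArrayList<>();
    for (T value : values) {
      held.add(type.createDetachedValue(value));
    }
    return held;
  }
}
```

With this split, the cost of copying is paid only by callers that retain values, and only for types that actually need it.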
>>>>
>>>> I prefer the second approach, as it avoids the waste of extra
>>>> cloning/serialization while still making it possible to get detached 
>>>> values out of an Iterable.
>>>>
>>>> Does anyone else have any thoughts on this?
>>>>
>>>> - Gabriel
>>>>
>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera
>>> Twitter: @josh_wills
