Hmm, strange...according to my mail client the attachment was in
there. Anyhow, I've pasted it inline below:

package com.cloudera.crunch;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.Writable;
import org.junit.Test;

import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.writable.Writables;
import com.google.common.collect.Lists;

public class SerializationReducerTest implements Serializable {

  public static class SimpleStringWritable implements Writable {

    private String value;

    public void setValue(String value) {
      this.value = value;
    }

    public String getValue() {
      return value;
    }

    @Override
    public String toString() {
      return String.format("SimpleStringWritable(%s)", value);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.value = in.readUTF();
    }

  }

  static SimpleStringWritable asSimple(String value) {
    SimpleStringWritable simpleStringWritable = new SimpleStringWritable();
    simpleStringWritable.setValue(value);
    return simpleStringWritable;
  }

  static List<String> simplesToList(
      Collection<SimpleStringWritable> simpleCollection) {
    List<String> stringList = Lists.newArrayList();
    for (SimpleStringWritable writable : simpleCollection) {
      stringList.add(writable.getValue());
    }
    Collections.sort(stringList);
    return stringList;
  }

  @Test
  public void testWritables() throws IOException {
    Pipeline pipeline = new MRPipeline(SerializationReducerTest.class);
    Map<Integer, Collection<SimpleStringWritable>> collectionMap = pipeline
        .readTextFile(FileHelper.createTempCopyOf("set1.txt"))
        .parallelDo(new MapFn<String, Pair<Integer, SimpleStringWritable>>() {

          @Override
          public Pair<Integer, SimpleStringWritable> map(String input) {
            return Pair.of(1, asSimple(input));
          }

        }, Writables.tableOf(Writables.ints(),
            Writables.writables(SimpleStringWritable.class)))
        .collectValues().materializeToMap();

    assertEquals(1, collectionMap.size());

    // This assertion fails: because the same Writable instance is re-used
    // for every value, the actual content is just ["e", "e", "e", "e"]
    assertEquals(Lists.newArrayList("a", "b", "c", "e"),
        simplesToList(collectionMap.get(1)));
  }


}
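
For anyone who wants to see the effect without a Hadoop setup, here is a
minimal plain-Java sketch of what I think is going on. It is not Crunch or
Hadoop code: Holder, reusingIterable and detach are made-up names for
illustration, and the iterator only imitates the way a reducer hands back the
same object on every call to next(). The detach method shows the kind of
in-memory serialize/deserialize copy that a createDetachedValue-style method
could do, assuming the value can write and read itself like a Writable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class DetachSketch {

  // Hypothetical stand-in for a Writable: mutable, serializes itself.
  static class Holder {
    String value;

    void write(DataOutput out) throws IOException {
      out.writeUTF(value);
    }

    void readFields(DataInput in) throws IOException {
      value = in.readUTF();
    }
  }

  // Imitates reducer-side iteration: one shared Holder is refilled on next().
  static Iterable<Holder> reusingIterable(final List<String> data) {
    return new Iterable<Holder>() {
      @Override
      public Iterator<Holder> iterator() {
        final Holder shared = new Holder();
        final Iterator<String> source = data.iterator();
        return new Iterator<Holder>() {
          @Override
          public boolean hasNext() {
            return source.hasNext();
          }

          @Override
          public Holder next() {
            shared.value = source.next();
            return shared;
          }

          @Override
          public void remove() {
            throw new UnsupportedOperationException();
          }
        };
      }
    };
  }

  // Deep copy via an in-memory serialization round trip.
  static Holder detach(Holder original) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      original.write(new DataOutputStream(bytes));
      Holder copy = new Holder();
      copy.readFields(
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new IllegalStateException("in-memory copy failed", e);
    }
  }

  // Holds on to every value first, then reads them back after iteration.
  static List<String> collect(Iterable<Holder> values, boolean detachValues) {
    List<Holder> kept = new ArrayList<Holder>();
    for (Holder holder : values) {
      kept.add(detachValues ? detach(holder) : holder);
    }
    List<String> result = new ArrayList<String>();
    for (Holder holder : kept) {
      result.add(holder.value);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> data = Arrays.asList("b", "c", "a", "e");
    System.out.println(collect(reusingIterable(data), false)); // [e, e, e, e]
    System.out.println(collect(reusingIterable(data), true));  // [b, c, a, e]
  }
}
```

Holding on to the references directly gives four copies of the last value,
which matches what the test above runs into, while holding on to detached
copies keeps each value intact at the cost of one extra serialization round
trip per value.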


On Thu, Jun 28, 2012 at 11:28 PM, Josh Wills <[email protected]> wrote:
> Gabriel,
>
> Generally agree with your line of thought-- where is the attached test case?
>
> J
>
> On Thu, Jun 28, 2012 at 2:11 PM, Gabriel Reid <[email protected]> wrote:
>> Hi guys,
>>
>> As you may have seen, the topic of the PTable#collectValues method came up 
>> today in the user mailing list. I hadn't been aware of this method before, 
>> and when I took a closer look I saw that it just creates a Collection of 
>> values based on the incoming Iterable, without doing any kind of a deep copy 
>> of the contents of the Iterable. As far as I can see, something similar 
>> (i.e. holding on to values from an Iterable from a reducer) is also done in 
>> the Join methods.
>>
>> As Christian also pointed out (and added to the documentation for DoFn), 
>> this can be an issue, as values made available as an Iterable in a reducer 
>> are re-used within Hadoop.
>>
>> This object re-use isn't a problem in Crunch wherever a non-identity mapping 
>> is used between the serialization type and the PCollection type within the 
>> PType (for example, with primitives and String). However, using Writable 
>> types or non-mapped Avro types won't work (as shown in the attached test 
>> case).
>>
>> I think it's definitely a problem that PTable#collectValues (and probably 
>> some other methods) doesn't work for Writables, or in a broader sense, that 
>> the semantics can change for the Iterable that is passed in when processing 
>> a grouped table.
>>
>> One really easy (but also inefficient) way we could solve this would be to 
>> not use an IdentityFn as the default mapping function in Writables and 
>> AvroType, and instead use a MapFn that does a deep copy of the object (i.e. 
>> by serializing and deserializing itself in memory). This is of course a 
>> pretty big overhead for something that isn't necessary in a lot of cases.
>>
>> Another option I was considering was to do something like making the input 
>> and output PTypes of a DoFn available to the DoFn, and adding a 
>> createDetachedValue method (or something similar) to PType, which would then 
>> serialize and deserialize objects in order to make a clone if necessary. 
>> With this approach, the clone method would have to be called within the 
>> collectValues method (or any other method that is holding on to values 
>> outside of the iterator).
>>
>> I prefer the second approach, as it avoids the waste of extra 
>> cloning/serialization while still making it possible to get detached values 
>> out of an Iterable.
>>
>> Does anyone else have any thoughts on this?
>>
>> - Gabriel
>>
>
>
>
> --
> Director of Data Science
> Cloudera
> Twitter: @josh_wills
