Hmm, strange... according to my mail client the attachment was in
there. Anyhow, I've pasted it inline below:
package com.cloudera.crunch;

import static org.junit.Assert.assertEquals;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.Writable;
import org.junit.Test;

import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.writable.Writables;
import com.google.common.collect.Lists;

public class SerializationReducerTest implements Serializable {

  public static class SimpleStringWritable implements Writable {
    private String value;

    public void setValue(String value) {
      this.value = value;
    }

    public String getValue() {
      return value;
    }

    @Override
    public String toString() {
      return String.format("SimpleStringWritable(%s)", value);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.value = in.readUTF();
    }
  }

  static SimpleStringWritable asSimple(String value) {
    SimpleStringWritable simpleStringWritable = new SimpleStringWritable();
    simpleStringWritable.setValue(value);
    return simpleStringWritable;
  }

  static List<String> simplesToList(Collection<SimpleStringWritable> simpleCollection) {
    List<String> stringList = Lists.newArrayList();
    for (SimpleStringWritable writable : simpleCollection) {
      stringList.add(writable.getValue());
    }
    Collections.sort(stringList);
    return stringList;
  }

  @Test
  public void testWritables() throws IOException {
    Pipeline pipeline = new MRPipeline(SerializationReducerTest.class);
    Map<Integer, Collection<SimpleStringWritable>> collectionMap = pipeline
        .readTextFile(FileHelper.createTempCopyOf("set1.txt"))
        .parallelDo(new MapFn<String, Pair<Integer, SimpleStringWritable>>() {
          @Override
          public Pair<Integer, SimpleStringWritable> map(String input) {
            return Pair.of(1, asSimple(input));
          }
        }, Writables.tableOf(Writables.ints(),
            Writables.writables(SimpleStringWritable.class)))
        .collectValues().materializeToMap();

    assertEquals(1, collectionMap.size());
    // This assertion fails: because Hadoop re-uses the Writable instance,
    // the actual content will just be ["e", "e", "e", "e"]
    assertEquals(Lists.newArrayList("a", "b", "c", "e"),
        simplesToList(collectionMap.get(1)));
  }
}
On Thu, Jun 28, 2012 at 11:28 PM, Josh Wills wrote:
> Gabriel,
>
> Generally agree with your line of thought-- where is the attached test case?
>
> J
>
> On Thu, Jun 28, 2012 at 2:11 PM, Gabriel Reid wrote:
>> Hi guys,
>>
>> As you may have seen, the topic of the PTable#collectValues method came up
>> today on the user mailing list. I hadn't been aware of this method before,
>> and when I took a closer look I saw that it just creates a Collection of
>> values based on the incoming Iterable, without making any kind of deep copy
>> of the contents of the Iterable. As far as I can see, something similar
>> (i.e. holding on to values from an Iterable from a reducer) is also done in
>> the Join methods.
>>
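>> As Christian also pointed out (and added to the documentation for DoFn),
>> this can be an issue, because Hadoop re-uses the value objects that it makes
>> available to a reducer through the Iterable: the same instance can be
>> refilled on each iteration, so any reference held onto afterwards sees its
>> contents change.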
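A minimal, Hadoop-free sketch of the re-use problem, assuming a simplified model of the reducer's Iterable. `MutableText`, `ReuseDemo.reusingIterable` and `ReuseDemo.naiveCollect` are hypothetical names (not Hadoop or Crunch APIs): the iterable refills one shared instance per element, and `naiveCollect` keeps references without copying, the way collectValues currently does.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a mutable Writable such as SimpleStringWritable.
final class MutableText {
    private String value;
    void set(String value) { this.value = value; }
    String get() { return value; }
}

final class ReuseDemo {
    // Models a reducer's values Iterable: every call to next() overwrites and
    // returns the SAME MutableText instance (a simplified model of Hadoop's
    // object re-use, not its actual implementation).
    static Iterable<MutableText> reusingIterable(List<String> raw) {
        return () -> new Iterator<MutableText>() {
            private final MutableText shared = new MutableText();
            private int index = 0;

            @Override public boolean hasNext() { return index < raw.size(); }

            @Override public MutableText next() {
                shared.set(raw.get(index++));
                return shared;
            }
        };
    }

    // Keeps references without copying, so every element aliases one object.
    static List<MutableText> naiveCollect(Iterable<MutableText> values) {
        List<MutableText> out = new ArrayList<>();
        for (MutableText v : values) {
            out.add(v);
        }
        return out;
    }

    // Reads the current contents of each collected reference.
    static List<String> contents(List<MutableText> values) {
        List<String> out = new ArrayList<>();
        for (MutableText v : values) {
            out.add(v.get());
        }
        return out;
    }
}
```

Collecting the values `a`, `b`, `c`, `e` this way and reading them back gives `[e, e, e, e]`, the same symptom as in the test case.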
>>
>> This object re-use isn't a problem in Crunch wherever a non-identity mapping
>> is used between the serialization type and the PCollection type within the
>> PType (for example, with primitives and String), because the mapping function
>> produces an independent, immutable object for each value. However, using
>> Writable types or non-mapped Avro types won't work (as shown in the attached
>> test case).
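A hedged sketch of why a non-identity mapping sidesteps the re-use problem. Here `java.util.function.Function` stands in for Crunch's MapFn, and `ReusedText` / `MappedCollectDemo` are hypothetical: mapping each re-used value to a String before storing it keeps the results independent, while an identity mapping stores the same instance over and over.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical mutable value standing in for a Writable such as Text.
final class ReusedText {
    String value;
}

final class MappedCollectDemo {
    // Simulates object re-use: a single ReusedText is refilled for each element.
    static Iterable<ReusedText> reusing(List<String> raw) {
        return () -> new Iterator<ReusedText>() {
            private final ReusedText shared = new ReusedText();
            private int index = 0;

            @Override public boolean hasNext() { return index < raw.size(); }

            @Override public ReusedText next() {
                shared.value = raw.get(index++);
                return shared;
            }
        };
    }

    // Applies a PType-style input mapping to every value before keeping it.
    // A mapping that returns a fresh immutable object (e.g. a String) makes
    // re-use of the underlying instance harmless; identity does not.
    static <T> List<T> collectMapped(Iterable<ReusedText> values,
                                     Function<ReusedText, T> mapFn) {
        List<T> out = new ArrayList<>();
        for (ReusedText v : values) {
            out.add(mapFn.apply(v));
        }
        return out;
    }
}
```

With `t -> t.value` the collected list is `[a, b, c, e]`; with `Function.identity()` every element is the same `ReusedText`, holding only the last value.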
>>
>> I think it's definitely a problem that PTable#collectValues (and probably
>> some other methods) doesn't work for Writables, or, more broadly, that the
>> semantics of the Iterable passed in when processing a grouped table change
>> depending on the PType.
>>
>> One really easy (but also inefficient) way we could solve this would be to
>> not use an IdentityFn as the default mapping function in Writables and
>> AvroType, and instead use a MapFn that does a deep copy of the object (i.e.
>> by serializing and deserializing it in memory). This is of course a pretty
>> big overhead for something that isn't necessary in a lot of cases.
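The first option could look roughly like the following, assuming a Writable-style contract. `WritableLike` is a stand-in for `org.apache.hadoop.io.Writable` (same two methods) so the sketch runs without Hadoop, and `DeepCopy.copy` is a hypothetical helper: it serializes the value into an in-memory buffer and reads it back into a fresh instance, which is the per-value overhead mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Stand-in for org.apache.hadoop.io.Writable, so the sketch has no Hadoop dependency.
interface WritableLike {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Mirrors SimpleStringWritable from the test case above.
final class StringWritableLike implements WritableLike {
    String value;

    @Override public void write(DataOutput out) throws IOException { out.writeUTF(value); }

    @Override public void readFields(DataInput in) throws IOException { value = in.readUTF(); }
}

final class DeepCopy {
    // Deep copy via an in-memory serialize/deserialize round trip. The factory
    // supplies an empty instance to read into (hypothetical; a real
    // implementation might instantiate the Writable class reflectively).
    static <T extends WritableLike> T copy(T value, Supplier<T> factory) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            value.write(out);
            out.flush();
            T detached = factory.get();
            detached.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return detached;
        } catch (IOException e) {
            // Byte-array streams themselves do not fail; this only surfaces
            // errors thrown by a write/readFields implementation.
            throw new UncheckedIOException(e);
        }
    }
}
```

Mutating the original after `DeepCopy.copy(original, StringWritableLike::new)` leaves the copy untouched, but every value pays for the round trip whether or not anyone holds on to it.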
>>
>> Another option I was considering was to do something like making the input
>> and output PTypes of a DoFn available to the DoFn, and adding a
>> createDetachedValue method (or something similar) to PType, which would then
>> serialize and deserialize objects in order to make a clone if necessary.
>> With this approach, createDetachedValue would have to be called within the
>> collectValues method (or any other method that holds on to values outside
>> of the iterator).
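A rough sketch of the second option. `SketchPType`, `Box` and `DetachDemo` are hypothetical names used for illustration only, not the Crunch API: each type decides how to detach a value (immutable types return it unchanged, mutable ones copy it), and a collectValues-style helper calls `createDetachedValue` before holding on to anything.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical PType-like interface; only the detaching hook is sketched.
interface SketchPType<T> {
    // Returns a value that stays valid after the framework re-uses the
    // original instance; immutable types can simply return the argument.
    T createDetachedValue(T value);
}

// Mutable stand-in for a Writable value that a reducer might re-use.
final class Box {
    String value;
    Box(String value) { this.value = value; }
}

final class DetachDemo {
    // Immutable type: detaching is free.
    static final SketchPType<String> STRINGS = value -> value;

    // Mutable type: detaching makes a copy (a real PType would serialize and
    // deserialize; a field copy is enough for this sketch).
    static final SketchPType<Box> BOXES = value -> new Box(value.value);

    // collectValues-style helper that detaches each value before keeping it.
    static <V> List<V> collectValues(Iterable<V> values, SketchPType<V> ptype) {
        List<V> out = new ArrayList<>();
        for (V v : values) {
            out.add(ptype.createDetachedValue(v));
        }
        return out;
    }

    // Simulates a reducer Iterable that refills one shared Box per element.
    static Iterable<Box> reusingBoxes(List<String> raw) {
        return () -> new Iterator<Box>() {
            private final Box shared = new Box(null);
            private int index = 0;

            @Override public boolean hasNext() { return index < raw.size(); }

            @Override public Box next() {
                shared.value = raw.get(index++);
                return shared;
            }
        };
    }
}
```

The design point is that the copying cost is paid only where a value is actually retained, and only for types that need it; for `String` the call is a no-op.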
>>
>> I prefer the second approach, as it avoids the waste of extra
>> cloning/serialization while still making it possible to get detached values
>> out of an Iterable.
>>
>> Does anyone else have any thoughts on this?
>>
>> - Gabriel
>>
>
>
>
> --
> Director of Data Science
> Cloudera
> Twitter: @josh_wills