Thanks. I think the core issue is an assumption in MapReduce (and in
Crunch) that once the Iterable<V> has been processed in a reduce step, its
values are gone and cannot be processed again. That is why passing along
the PType for the grouped table type didn't make sense as part of the
processing pipeline: you couldn't serialize the Iterable<V> to disk, so the
output of a groupByKey operation could only be processed once. Changing
that so the Iterable<V> can be processed multiple times in a single job
would require adding spillable collections.
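
To make the constraint concrete, here is a rough sketch in plain Java, with no Crunch or Hadoop classes involved. SinglePassIterable, materialize, and sumAndCount are hypothetical names invented for this illustration, and the stand-in throws on a second traversal only to make the single-pass behavior visible. It is not how the real reducer Iterable is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class SinglePassDemo {

  /** Hypothetical stand-in for reducer values: can be traversed exactly once. */
  static final class SinglePassIterable<V> implements Iterable<V> {
    private final Iterator<V> source;
    private boolean consumed = false;

    SinglePassIterable(Iterator<V> source) {
      this.source = source;
    }

    @Override
    public Iterator<V> iterator() {
      if (consumed) {
        throw new IllegalStateException("values were already consumed");
      }
      consumed = true;
      return source;
    }
  }

  /** Copies the values into an in-memory list so they can be traversed again. */
  static <V> List<V> materialize(Iterable<V> values) {
    List<V> buffer = new ArrayList<>();
    for (V v : values) {
      buffer.add(v);
    }
    return buffer;
  }

  /** Computes the sum and the count in two separate passes over the buffer. */
  static int[] sumAndCount(Iterable<Integer> values) {
    // The only pass over the original values happens inside materialize().
    List<Integer> buffer = materialize(values);
    int sum = 0;
    for (int v : buffer) {
      sum += v;
    }
    int count = 0;
    for (int ignored : buffer) {
      count++;
    }
    return new int[] {sum, count};
  }

  public static void main(String[] args) {
    Iterable<Integer> values =
        new SinglePassIterable<>(Arrays.asList(1, 2, 3).iterator());
    int[] result = sumAndCount(values);
    System.out.println("sum=" + result[0] + " count=" + result[1]);
  }
}
```

The in-memory buffer only works while all the values for a key fit in memory. For larger groups the buffer would have to spill to disk, which is where the spillable collections come in.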

J


On Tue, Dec 11, 2012 at 1:26 PM, Jonathan Natkins <[email protected]> wrote:

> Cool, JIRA filed: https://issues.apache.org/jira/browse/CRUNCH-129
>
>
>
> On Tue, Dec 11, 2012 at 1:21 PM, Josh Wills <[email protected]> wrote:
>
>> No, you're not-- I think that's a bug. Switching to "parallelDo" instead
>> of "by" on the groupedData object will work fine, but we should make sure
>> the by() operation works on grouped tables.
>>
>>
>> On Tue, Dec 11, 2012 at 1:18 PM, Jonathan Natkins <[email protected]> wrote:
>>
>>> Alright, I'm back for more. This time, I'm trying to perform a group by
>>> with Avro data. What I've currently got is this:
>>>
>>>     PGroupedTable<String, MyAvroObject> processedData =
>>>         data.parallelDo(new DoFn<String, Pair<String, MyAvroObject>>() {
>>>           public void process(String line,
>>>               Emitter<Pair<String, MyAvroObject>> emitter) {
>>>             String key = getKey(line);
>>>             MyAvroObject value = convertToAvroObject(line);
>>>             emitter.emit(Pair.of(key, value));
>>>           }
>>>         }, Avros.tableOf(Avros.strings(),
>>>             Avros.specifics(MyAvroObject.class)))
>>>         .groupByKey(3);
>>>
>>>     PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
>>>         processedData.by(
>>>             new MapFn<Pair<String, Iterable<MyAvroObject>>, MyAvroGroup>() {
>>>               @Override
>>>               public MyAvroGroup map(
>>>                   Pair<String, Iterable<MyAvroObject>> input) {
>>>                 MyAvroGroup group = new MyAvroGroup();
>>>                 group.objects = Lists.<MyAvroObject>newArrayList();
>>>
>>>                 for (MyAvroObject obj : input.second()) {
>>>                   group.objects.add(obj);
>>>                 }
>>>
>>>                 return group;
>>>               }
>>>             },
>>>             Avros.specifics(MyAvroGroup.class));
>>>
>>> I think this is all pretty sane, but I'm getting an exception when the
>>> pipeline attempts to run the by():
>>>
>>> 12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>> Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to org.apache.crunch.types.avro.AvroType
>>>     at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
>>>     at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
>>>     at org.apache.crunch.impl.mem.collect.MemCollection.by(MemCollection.java:222)
>>>
>>> Am I doing something obviously wrong?
>>>
>>> Thanks,
>>> Natty
>>>
>>>
>>>
>>>
>>> On Fri, Dec 7, 2012 at 10:58 AM, Jonathan Natkins <[email protected]> wrote:
>>>
>>>> To bring things full circle, the core issue I was having was caused by
>>>> the fact that I was writing the data in the wrong way. Instead of
>>>>
>>>> pipeline.writeTextFile(words, args[1]);
>>>>
>>>> I should have been using
>>>>
>>>> pipeline.write(words, To.avroFile(args[1]));
>>>>
>>>> As Josh noted, writeTextFile was attempting to write my data out as a
>>>> String, but I wasn't giving it an object that was easy to turn into a
>>>> String, which resulted in an exception. Changing it to write to an avro
>>>> file solved those issues.
>>>>
>>>> Thanks, Josh!
>>>>
>>>>
>>>>
>>>> On Fri, Dec 7, 2012 at 10:26 AM, Josh Wills <[email protected]> wrote:
>>>>
>>>>> Hey Natty,
>>>>>
>>>>> Reply inlined.
>>>>>
>>>>>
>>>>> On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <[email protected]> wrote:
>>>>>
>>>>>> Hey Josh,
>>>>>>
>>>>>> That really doesn't solve the problem I'm facing. Avros.specifics
>>>>>> assumes that I've got a Java file that Avro generated for me, which
>>>>>> I don't have. I can certainly go through the trouble of getting that
>>>>>> file, but what I've got currently is a POJO that I'm associating with
>>>>>> a JSON Avro schema. It's a perfectly valid use case, and as far as I
>>>>>> can tell, from what's provided by the Avros utility class, it should
>>>>>> be supported. So here's my question:
>>>>>>
>>>>>
>>>>> Interesting-- I had not hit that use case for Avro before. For a POJO,
>>>>> I would just use the reflection APIs, which are available via
>>>>> Avros.reflects.
>>>>>
>>>>>
>>>>>>
>>>>>> Is the Avros.generics issue a bug? It seems to me that the T of
>>>>>> PType<T> has to implement Writable, and in the case of the return type of
>>>>>> Avros.generics, this is not the case.
>>>>>>
>>>>>
>>>>> There's no requirement for the PType<T> to be a Writable, or even an
>>>>> Avro instance. There's stuff like o.a.c.types.PTypes.derived that lets you
>>>>> create PType<T> that depend on other PTypes, which is how Crunch handles
>>>>> things like protocol buffers/thrift/jackson-style object serializations.
>>>>>
>>>>> I'm just taking a closer look at the Exception that was thrown, and it
>>>>> looks to me like the problem is occurring at the end of the pipeline,
>>>>> where you're calling pipeline.writeTextFile (not included in the code
>>>>> snippet posted). Crunch has to convert the PType to something that can
>>>>> be converted to a Writable impl-- if you try to write an Avro object to
>>>>> the TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks
>>>>> to me that in this case, Crunch can't figure out how to turn
>>>>> MyAvroObject into a Writable instance for writing to the
>>>>> TextOutputFormat.
>>>>>
>>>>>
>>>>>> If it's a bug, then fine, I'll file a JIRA and jump through whatever
>>>>>> necessary hoops exist.
>>>>>>
>>>>>
>>>>> One way to fix this would be to update writeTextFile to force
>>>>> conversion of any non-string that was passed into it into a String via an
>>>>> auxiliary MapFn-- I'm not sure why I didn't do that in the first place.
>>>>> What do you think?
>>>>>
>>>>>
>>>>>> Thanks,
>>>>>> Natty
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <[email protected]> wrote:
>>>>>>
>>>>>>> Did you look at Avros.specifics?
>>>>>>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ok, I'm still a little confused. Let's say I use Avros.generics(),
>>>>>>>> and then I modify my code to use GenericData.Records. Those Records
>>>>>>>> still don't implement the Writable interface, so I'm still getting a
>>>>>>>> class cast exception. Did I do something totally wrong?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Well, the problem with that is that I really want to work with my
>>>>>>>>> objects, rather than use Avros.generics, because then I'm forced to
>>>>>>>>> treat everything as a GenericData.Record. It's just a pain in the butt.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> You don't want to create an AvroType yourself, you want to call
>>>>>>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a
>>>>>>>>>> Class object.
>>>>>>>>>>
>>>>>>>>>> Interesting though, I would still want that case to work
>>>>>>>>>> correctly.
>>>>>>>>>>
>>>>>>>>>> Josh
>>>>>>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> So I've been futzing with Crunch a bit, and trying to understand
>>>>>>>>>>> how to build a pipeline that outputs Avro data files. Roughly, I'm
>>>>>>>>>>> doing something along these lines:
>>>>>>>>>>>
>>>>>>>>>>>     Schema.Parser schemaParser = new Schema.Parser();
>>>>>>>>>>>     final Schema avroObjSchema = schemaParser.parse(schemaJsonString);
>>>>>>>>>>>
>>>>>>>>>>>     AvroType avroType = new AvroType<MyAvroObject>(
>>>>>>>>>>>         MyAvroObject.class, avroObjSchema,
>>>>>>>>>>>         new AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>>>>>>             MyAvroObject.class, avroObjSchema));
>>>>>>>>>>>
>>>>>>>>>>>     PCollection<MyAvroObject> words = logs.parallelDo(
>>>>>>>>>>>         new DoFn<String, MyAvroObject>() {
>>>>>>>>>>>           public void process(String line,
>>>>>>>>>>>               Emitter<MyAvroObject> emitter) {
>>>>>>>>>>>             emitter.emit(convertStringToAvroObj(line));
>>>>>>>>>>>           }
>>>>>>>>>>>         }, avroType);
>>>>>>>>>>>
>>>>>>>>>>> However, this results in a class cast exception:
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "main" java.lang.ClassCastException: class com.company.MyAvroObject
>>>>>>>>>>>     at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>>>>>>     at org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>>>>>>     at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.as(WritableTypeFamily.java:135)
>>>>>>>>>>>     at org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>>>>>>
>>>>>>>>>>> Anybody have any thoughts? There's got to be a magical
>>>>>>>>>>> incantation that I have slightly off.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Director of Data Science
>>>>> Cloudera <http://www.cloudera.com>
>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
