No, you're not doing anything wrong; I think that's a bug. Switching from
"by" to "parallelDo" on the grouped table (processedData) will work fine, but
we should make sure the by() operation works on grouped tables.
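For reference, here is a stdlib-only Java sketch of how the two calls differ. It uses hypothetical in-memory stand-ins, not the Crunch API. by() keeps every element and pairs it with the key the MapFn computes, so it has to build a table type over the grouped table's PType. Judging from the stack trace, that is where Avros.tableOf fails its cast to AvroType. A parallelDo() with the same MapFn emits only the MapFn's result, so if all you need is the MyAvroGroup values, no table type is required. The `Pair` record and the `by`, `parallelDo`, `sampleGroups`, and `toGroup` helpers are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory stand-ins for Crunch's by() and parallelDo(); NOT the Crunch API.
final class ByVsParallelDo {

  // Minimal pair type standing in for org.apache.crunch.Pair.
  record Pair<K, V>(K first, V second) {}

  // by(): key each element with fn and keep the original element as the value.
  static <S, K> List<Pair<K, S>> by(List<S> input, Function<S, K> fn) {
    List<Pair<K, S>> out = new ArrayList<>();
    for (S s : input) {
      out.add(new Pair<>(fn.apply(s), s));
    }
    return out;
  }

  // parallelDo() with a MapFn: emit only fn's result for each element.
  static <S, T> List<T> parallelDo(List<S> input, Function<S, T> fn) {
    List<T> out = new ArrayList<>();
    for (S s : input) {
      out.add(fn.apply(s));
    }
    return out;
  }

  // Grouped data shaped like groupByKey() output: (key, values) pairs.
  static List<Pair<String, List<String>>> sampleGroups() {
    return List.of(
        new Pair<String, List<String>>("a", List.of("x", "y")),
        new Pair<String, List<String>>("b", List.of("z")));
  }

  // Collapse one group into a single object, like filling MyAvroGroup.objects.
  static List<String> toGroup(Pair<String, List<String>> group) {
    return new ArrayList<>(group.second());
  }

  public static void main(String[] args) {
    List<Pair<String, List<String>>> groups = sampleGroups();
    System.out.println(parallelDo(groups, ByVsParallelDo::toGroup)); // [[x, y], [z]]
    System.out.println(by(groups, ByVsParallelDo::toGroup).size());  // 2
  }
}
```

In Crunch terms, the analogous change would presumably be processedData.parallelDo(yourMapFn, Avros.specifics(MyAvroGroup.class)), yielding a PCollection<MyAvroGroup>. That is an untested assumption based on the advice above.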


On Tue, Dec 11, 2012 at 1:18 PM, Jonathan Natkins <[email protected]> wrote:

> Alright, I'm back for more. This time, I'm trying to perform a group by
> with Avro data. What I've currently got is this:
>
>     PGroupedTable<String, MyAvroObject> processedData = data.parallelDo(
>         new DoFn<String, Pair<String, MyAvroObject>>() {
>           @Override
>           public void process(String line,
>               Emitter<Pair<String, MyAvroObject>> emitter) {
>             String key = getKey(line);
>             MyAvroObject value = convertToAvroObject(line);
>             emitter.emit(Pair.of(key, value));
>           }
>         }, Avros.tableOf(Avros.strings(), Avros.specifics(MyAvroObject.class)))
>         .groupByKey(3);
>
>     PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
>         processedData.by(
>             new MapFn<Pair<String, Iterable<MyAvroObject>>, MyAvroGroup>() {
>               @Override
>               public MyAvroGroup map(Pair<String, Iterable<MyAvroObject>> input) {
>                 MyAvroGroup group = new MyAvroGroup();
>                 group.objects = Lists.<MyAvroObject>newArrayList();
>
>                 for (MyAvroObject obj : input.second()) {
>                   group.objects.add(obj);
>                 }
>
>                 return group;
>               }
>             },
>             Avros.specifics(MyAvroGroup.class));
>
> I think this is all pretty sane, but I'm getting an exception when the
> pipeline attempts to run the by():
>
> 12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Exception in thread "main" java.lang.ClassCastException: org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to org.apache.crunch.types.avro.AvroType
>     at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
>     at org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
>     at org.apache.crunch.impl.mem.collect.MemCollection.by(MemCollection.java:222)
>
> Am I doing something obviously wrong?
>
> Thanks,
> Natty
>
>
>
>
> On Fri, Dec 7, 2012 at 10:58 AM, Jonathan Natkins <[email protected]> wrote:
>
>> To bring things full circle, the core issue I was having was caused by
>> the fact that I was writing the data in the wrong way. Instead of
>>
>> pipeline.writeTextFile(words, args[1]);
>>
>> I should have been using
>>
>> pipeline.write(words, To.avroFile(args[1]));
>>
>> As Josh noted, writeTextFile was attempting to write my data out as a
>> String, but I wasn't giving it an object that was easy to turn into a
>> String, which resulted in an exception. Changing it to write to an avro
>> file solved those issues.
>>
>> Thanks, Josh!
>>
>>
>>
>> On Fri, Dec 7, 2012 at 10:26 AM, Josh Wills <[email protected]> wrote:
>>
>>> Hey Natty,
>>>
>>> Reply inlined.
>>>
>>>
>>> On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <[email protected]> wrote:
>>>
>>>> Hey Josh,
>>>>
>>>> That really doesn't solve the problem I'm facing. Avros.specifics
>>>> assumes that I've got a Java file that Avro generated for me, which I don't
>>>> have. I can certainly go through the trouble of getting that file, but what
>>>> I've got currently is a POJO that I'm associating with a JSON Avro schema.
>>>> It's a perfectly valid use case, and as far as I can tell, from what's
>>>> provided by the Avros utility class, it should be supported. So here's my
>>>> question:
>>>>
>>>
>>> Interesting-- I had not hit that use case for Avro before. For a POJO, I
>>> would just use the reflection APIs, which are available via Avros.reflects.
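Avro's reflect support, which Avros.reflects presumably builds on, derives a record schema from a POJO's own fields rather than requiring a generated class. The toy below only illustrates that idea with java.lang.reflect. The type mapping, the `MyPojo` class, and the output format are simplified assumptions. It is not what Avro's ReflectData actually emits, since ReflectData also handles nullability, nested records, collections, and more.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy illustration of a reflection-derived record schema; NOT Avro's ReflectData.
final class ReflectSchemaSketch {

  // Hypothetical POJO standing in for MyAvroObject.
  static class MyPojo {
    String name;
    int count;
    long timestamp;
  }

  // Simplified Java-to-Avro primitive type mapping (assumption for illustration).
  private static final Map<Class<?>, String> AVRO_TYPES = Map.of(
      String.class, "string",
      int.class, "int",
      long.class, "long",
      double.class, "double",
      boolean.class, "boolean");

  // Build a minimal record-schema JSON string from the class's instance fields.
  static String schemaOf(Class<?> clazz) {
    List<String> fields = new ArrayList<>();
    for (Field f : clazz.getDeclaredFields()) {
      if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
        continue; // skip static and compiler-generated fields
      }
      String type = AVRO_TYPES.get(f.getType());
      if (type == null) {
        throw new IllegalArgumentException("unsupported field type: " + f.getType());
      }
      fields.add("{\"name\":\"" + f.getName() + "\",\"type\":\"" + type + "\"}");
    }
    // Note: getDeclaredFields() does not guarantee declaration order.
    return "{\"type\":\"record\",\"name\":\"" + clazz.getSimpleName()
        + "\",\"fields\":[" + String.join(",", fields) + "]}";
  }

  public static void main(String[] args) {
    System.out.println(schemaOf(MyPojo.class));
  }
}
```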
>>>
>>>
>>>>
>>>> Is the Avros.generics issue a bug? It seems to me that the T of
>>>> PType<T> has to implement Writable, and in the case of the return type of
>>>> Avros.generics, this is not the case.
>>>>
>>>
>>> There's no requirement for the T in PType<T> to be a Writable, or even an
>>> Avro instance. There's stuff like o.a.c.types.PTypes.derived that lets you
>>> create PType<T> instances that depend on other PTypes, which is how Crunch
>>> handles things like protocol buffers/thrift/jackson-style object serializations.
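The derived-type idea works like this. A type T with no serialization support of its own can ride on a base type S that the framework already handles, given one function that maps S to T when reading and another that maps T back to S when writing. The stdlib-only sketch below models that composition. `SerType`, `derived`, and `Point` are hypothetical stand-ins for Crunch's PType, PTypes.derived, and a user object. It makes no claim about the real method's exact signature.

```java
import java.util.function.Function;

// Hypothetical model of a derived serialization type; NOT Crunch's PType API.
final class DerivedTypeSketch {

  // A "type" that knows how to turn T into a wire string and back.
  interface SerType<T> {
    String write(T value);
    T read(String wire);
  }

  // Base type the framework already supports: strings pass through unchanged.
  static final SerType<String> STRINGS = new SerType<>() {
    public String write(String value) { return value; }
    public String read(String wire) { return wire; }
  };

  // Derive SerType<T> from SerType<S> plus conversions in both directions.
  static <S, T> SerType<T> derived(Function<S, T> inputFn, Function<T, S> outputFn,
      SerType<S> base) {
    return new SerType<>() {
      public String write(T value) { return base.write(outputFn.apply(value)); }
      public T read(String wire) { return inputFn.apply(base.read(wire)); }
    };
  }

  // Example user type with no serialization support of its own.
  record Point(int x, int y) {}

  // Point is carried as a "x,y" string on top of the STRINGS base type.
  static final SerType<Point> POINTS = derived(
      s -> {
        String[] parts = s.split(",");
        return new Point(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
      },
      p -> p.x() + "," + p.y(),
      STRINGS);

  public static void main(String[] args) {
    String wire = POINTS.write(new Point(3, 4));
    System.out.println(wire);              // 3,4
    System.out.println(POINTS.read(wire)); // Point[x=3, y=4]
  }
}
```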
>>>
>>> I'm just taking a closer look at the Exception that was thrown, and it
>>> looks to me like the problem is occurring at the end of the pipeline, where
>>> you're calling pipeline.writeTextFile (not included in the code snippet
>>> posted). Crunch has to convert the PType to something that can be converted
>>> to a Writable impl: if you try to write an Avro object to the
>>> TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks to
>>> me like, in this case, Crunch can't figure out how to turn MyAvroObject into
>>> a Writable instance for writing to the TextOutputFormat.
>>>
>>>
>>>> If it's a bug, then fine, I'll file a JIRA and jump through whatever
>>>> necessary hoops exist.
>>>>
>>>
>>> One way to fix this would be to update writeTextFile to convert any
>>> non-String value passed to it into a String via an auxiliary MapFn; I'm
>>> not sure why I didn't do that in the first place. What do you
>>> think?
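Here is a stdlib-only sketch of that proposal. It assumes that writing a text file boils down to emitting one line per element. Every element first goes through a String-conversion function, which plays the role of the auxiliary MapFn, so the text writer only ever sees Strings. `TextOutputSketch`, `toTextLines`, and `Event` are hypothetical names, not Crunch classes. The default conversion, String.valueOf, falls back to Object.toString. For a class that doesn't override it, that yields ClassName@hexHash, the same kind of output as the AvroWrapper@feedbeef case mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of "convert to String before writing text"; NOT Crunch's writeTextFile.
final class TextOutputSketch {

  // A value class that does not override toString().
  static final class Event {
    final String user;
    Event(String user) { this.user = user; }
  }

  // Auxiliary conversion step: every element becomes a String before it is "written".
  static <T> List<String> toTextLines(List<T> values, Function<? super T, String> toText) {
    List<String> lines = new ArrayList<>();
    for (T v : values) {
      lines.add(toText.apply(v));
    }
    return lines;
  }

  public static void main(String[] args) {
    List<Event> events = List.of(new Event("natty"));
    // Default conversion via Object.toString(): prints something like [TextOutputSketch$Event@1b6d3586].
    System.out.println(toTextLines(events, v -> String.valueOf(v)));
    // Explicit conversion chosen by the caller.
    System.out.println(toTextLines(events, e -> "user=" + e.user)); // [user=natty]
  }
}
```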
>>>
>>>
>>>> Thanks,
>>>> Natty
>>>>
>>>>
>>>> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <[email protected]> wrote:
>>>>
>>>>> Did you look at Avros.specifics?
>>>>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <[email protected]> wrote:
>>>>>
>>>>>> Ok, I'm still a little confused. Let's say I use Avros.generics(),
>>>>>> and then I modify my code to use GenericData.Records. Those Records still
>>>>>> don't implement the Writable interface, so I'm still getting a class cast
>>>>>> exception. Did I do something totally wrong?
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins <[email protected]> wrote:
>>>>>>
>>>>>>> Well, the problem with that is that I really want to work with my
>>>>>>> objects rather than use Avros.generics, because then I'm forced to treat
>>>>>>> everything as a GenericData.Record. It's just a pain in the butt.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <[email protected]>wrote:
>>>>>>>
>>>>>>>> You don't want to create an AvroType yourself; you want to call
>>>>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a Class
>>>>>>>> object.
>>>>>>>>
>>>>>>>> Interesting, though: I would still want that case to work correctly.
>>>>>>>>
>>>>>>>> Josh
>>>>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> So I've been futzing with Crunch a bit, and trying to understand
>>>>>>>>> how to build a pipeline that outputs Avro data files. Roughly, I'm doing
>>>>>>>>> something along these lines:
>>>>>>>>>
>>>>>>>>>     Schema.Parser schemaParser = new Schema.Parser();
>>>>>>>>>     final Schema avroObjSchema = schemaParser.parse(schemaJsonString);
>>>>>>>>>
>>>>>>>>>     AvroType<MyAvroObject> avroType = new AvroType<MyAvroObject>(
>>>>>>>>>         MyAvroObject.class,
>>>>>>>>>         avroObjSchema,
>>>>>>>>>         new AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>>>>             MyAvroObject.class, avroObjSchema));
>>>>>>>>>
>>>>>>>>>     PCollection<MyAvroObject> words = logs.parallelDo(
>>>>>>>>>         new DoFn<String, MyAvroObject>() {
>>>>>>>>>           public void process(String line, Emitter<MyAvroObject> emitter) {
>>>>>>>>>             emitter.emit(convertStringToAvroObj(line));
>>>>>>>>>           }
>>>>>>>>>         }, avroType);
>>>>>>>>>
>>>>>>>>> However, this results in a class cast exception:
>>>>>>>>>
>>>>>>>>> Exception in thread "main" java.lang.ClassCastException: class com.company.MyAvroObject
>>>>>>>>>     at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>>>>     at org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>>>>     at org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>>>>     at org.apache.crunch.types.writable.WritableTypeFamily.as(WritableTypeFamily.java:135)
>>>>>>>>>     at org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>>>>
>>>>>>>>> Anybody have any thoughts? There's got to be a magical incantation
>>>>>>>>> that I have slightly off.
>>>>>>>>>
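The top frame of this trace is java.lang.Class.asSubclass, which throws ClassCastException when the class neither extends nor implements the requested type. Judging from the trace, Writables.records (reached via writeTextFile) performs that check against a Writable type. A class like MyAvroObject, or GenericData.Record, presumably fails it. The sketch below reproduces only that JDK behaviour. `WritableLike`, `WritableRecord`, and `MyPojo` are hypothetical stand-ins, not Hadoop or Avro classes.

```java
// Reproduces the Class.asSubclass failure mode with hypothetical stand-in types.
final class AsSubclassSketch {

  // Stand-in for org.apache.hadoop.io.Writable (hypothetical, for illustration only).
  interface WritableLike {}

  static final class WritableRecord implements WritableLike {}

  static final class MyPojo {} // does not implement WritableLike

  // Returns true if clazz can be viewed as a subclass of WritableLike.
  static boolean isWritableLike(Class<?> clazz) {
    try {
      clazz.asSubclass(WritableLike.class);
      return true;
    } catch (ClassCastException e) {
      // Same exception type as in the trace above.
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isWritableLike(WritableRecord.class)); // true
    System.out.println(isWritableLike(MyPojo.class));         // false
  }
}
```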
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>>
>>
>


