Alright, I'm back for more. This time, I'm trying to perform a group by
with Avro data. What I've currently got is this:
PGroupedTable<String, MyAvroObject> processedData = data.parallelDo(new
DoFn<String, Pair<String, MyAvroObject>>() {
public void process(String line, Emitter<Pair<String, MyAvroObject>>
emitter) {
String key = getKey(line);
MyAvroObject value = convertToAvroObject(line);
emitter.emit(Pair.of(key, value));
}
}, Avros.tableOf(Avros.strings(), Avros.specifics(MyAvroObject.class)))
.groupByKey(3);
PTable<MyAvroGroup, Pair<String, Iterable<MyAvroObject>>> groupedData =
processedData.by(new MapFn<Pair<String, Iterable<MyAvroObject>>,
MyAvroGroup>() {
@Override
public MyAvroGroup map(Pair<String, Iterable<MyAvroObject>>
input) {
MyAvroGroup group = new MyAvroGroup();
group.objects = Lists.<MyAvroObject>newArrayList();
for (MyAvroObject obj : input.second()) {
group.objects.add(obj);
}
return group;
}
},
Avros.specifics(MyAvroGroup.class));
I think this is all pretty sane, but I'm getting an exception when the
pipeline attempts to run the by():
12/12/10 14:11:07 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ClassCastException:
org.apache.crunch.types.avro.AvroGroupedTableType cannot be cast to
org.apache.crunch.types.avro.AvroType
at org.apache.crunch.types.avro.Avros.tableOf(Avros.java:608)
at
org.apache.crunch.types.avro.AvroTypeFamily.tableOf(AvroTypeFamily.java:135)
at org.apache.crunch.impl.mem.collect.MemCollection.by
(MemCollection.java:222)
Am I doing something obviously wrong?
Thanks,
Natty
On Fri, Dec 7, 2012 at 10:58 AM, Jonathan Natkins <[email protected]>wrote:
> To bring things full circle, the core issue I was having was caused by the
> fact that I was writing the data in the wrong way. Instead of
>
> pipeline.writeTextFile(words, args[1]);
>
> I should have been using
>
> pipeline.write(words, To.avroFile(args[1]);
>
> As Josh noted, writeTextFile was attempting to write my data out as a
> String, but I wasn't giving it an object that was easy to turn into a
> String, which resulted in an exception. Changing it to write to an avro
> file solved those issues.
>
> Thanks, Josh!
>
>
>
> On Fri, Dec 7, 2012 at 10:26 AM, Josh Wills <[email protected]> wrote:
>
>> Hey Natty,
>>
>> Reply inlined.
>>
>>
>> On Fri, Dec 7, 2012 at 10:06 AM, Jonathan Natkins <[email protected]>wrote:
>>
>>> Hey Josh,
>>>
>>> That really doesn't solve the problem I'm facing. Avros.specifics
>>> assumes that I've got a Java file that Avro generated for me, which I don't
>>> have. I can certainly go through the trouble of getting that file, but what
>>> I've got currently is a POJO that I'm associating with a JSON Avro schema.
>>> It's a perfectly valid use case, and as far as I can tell, from what's
>>> provided by the Avros utility class, it should be supported. So here's my
>>> question:
>>>
>>
>> Interesting-- I had not hit that use case for Avro before. For a POJO, I
>> would just use the reflection APIs, which are available via Avros.reflects.
>>
>>
>>>
>>> Is the Avros.generics issue a bug? It seems to me that the T of PType<T>
>>> has to implement Writable, and in the case of the return type of
>>> Avros.generics, this is not the case.
>>>
>>
>> There's no requirement for the PType<T> to be a Writable, or even an Avro
>> instance. There's stuff like o.a.c.types.PTypes.derived that lets you
>> create PType<T> that depend on other PTypes, which is how Crunch handles
>> things like protocol buffers/thrift/jackson-style object serializations.
>>
>> I'm just taking a closer look at the Exception that was thrown, and it
>> looks to me like the problem is occurring at the end of the pipeline, where
>> you're calling pipeline.writeTextFile (not included in the code snippet
>> posted). Crunch has to convert the PType to something that can be converted
>> to a Writable impl-- if you try to write an Avro object to the
>> TextOutputFormat, it gets written as AvroWrapper@feedbeef. It looks to
>> me that in this case, Crunch can't figure out how to turn MyAvroObject into
>> a Writable instance for writing to the TextOuputFormat.
>>
>>
>>> If it's a bug, then fine, I'll file a JIRA and jump through whatever
>>> necessary hoops exist.
>>>
>>
>> One way to fix this would be to update writeTextFile to force conversion
>> of any non-string that was passed into it into a String via an auxiliary
>> MapFn-- I'm not sure why I didn't do that in the first place. What do you
>> think?
>>
>>
>>> Thanks,
>>> Natty
>>>
>>>
>>> On Thu, Dec 6, 2012 at 6:08 PM, Josh Wills <[email protected]> wrote:
>>>
>>>> Did you look at Avros.specifics?
>>>> On Dec 6, 2012 5:57 PM, "Jonathan Natkins" <[email protected]> wrote:
>>>>
>>>>> Ok, I'm still a little confused. Let's say I use Avros.generics(), and
>>>>> then I modify my code to use GenericData.Records. Those Records still
>>>>> don't
>>>>> implement the Writable interface, so I'm still getting a class cast
>>>>> exception. Did I do something totally wrong?
>>>>>
>>>>>
>>>>> On Thu, Dec 6, 2012 at 5:19 PM, Jonathan Natkins
>>>>> <[email protected]>wrote:
>>>>>
>>>>>> Well, the problem with that is that I really want to work with my
>>>>>> objects, rather than use Avros.generics, because then I'm forced to treat
>>>>>> everything as a GenericData.Record. It's just a pain in the butt.
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 6, 2012 at 5:17 PM, Josh Wills <[email protected]>wrote:
>>>>>>
>>>>>>> You don't want to create an AvroType yourself, you want to call
>>>>>>> o.a.c.types.avro.Avros.records or one of its friends and pass it a Class
>>>>>>> object.
>>>>>>>
>>>>>>> Interesting though, I would still want that case to work correctly.
>>>>>>>
>>>>>>> Josh
>>>>>>> On Dec 6, 2012 5:14 PM, "Jonathan Natkins" <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> So I've been futzing with Crunch a bit, and trying to understand
>>>>>>>> how to build a pipeline that outputs Avro data files. Roughly, I'm
>>>>>>>> doing
>>>>>>>> something along these lines:
>>>>>>>>
>>>>>>>> Schema.Parser schemaParser = new Schema.Parser();
>>>>>>>> final Schema avroObjSchema = schemaParser.parse(
>>>>>>>> schemaJsonString);
>>>>>>>>
>>>>>>>> AvroType avroType = new
>>>>>>>> AvroType<MyAvroObject>(MyAvroObject.class,
>>>>>>>> avroObjSchema, new
>>>>>>>> AvroDeepCopier.AvroReflectDeepCopier<MyAvroObject>(
>>>>>>>> MyAvroObject.class, avroObjSchema));
>>>>>>>>
>>>>>>>> PCollection<MyAvroObject> words = logs.parallelDo(new
>>>>>>>> DoFn<String, MyAvroObject>() {
>>>>>>>> public void process(String line, Emitter<MyAvroObject>
>>>>>>>> emitter) {
>>>>>>>> emitter.emit(convertStringToAvroObj(line));
>>>>>>>> }
>>>>>>>> }, avroType);
>>>>>>>>
>>>>>>>> However, this results in a class cast exception:
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.ClassCastException: class
>>>>>>>> com.company.MyAvroObject
>>>>>>>> at java.lang.Class.asSubclass(Class.java:3039)
>>>>>>>> at
>>>>>>>> org.apache.crunch.types.writable.Writables.records(Writables.java:250)
>>>>>>>> at
>>>>>>>> org.apache.crunch.types.writable.WritableTypeFamily.records(WritableTypeFamily.java:86)
>>>>>>>> at
>>>>>>>> org.apache.crunch.types.PTypeUtils.convert(PTypeUtils.java:61)
>>>>>>>> at org.apache.crunch.types.writable.WritableTypeFamily.as
>>>>>>>> (WritableTypeFamily.java:135)
>>>>>>>> at
>>>>>>>> org.apache.crunch.impl.mr.MRPipeline.writeTextFile(MRPipeline.java:319)
>>>>>>>>
>>>>>>>> Anybody have any thoughts? There's got to be a magical incantation
>>>>>>>> that I have slightly off.
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>>
>