Didn't seem to work. I tried

output.inputConf("crunch.avro.mode", "SPECIFIC");
output.outputConf("crunch.avro.mode", "SPECIFIC");

as well, to no avail.
From: [email protected]
Date: Fri, 29 Aug 2014 13:52:35 -0700
Subject: Re: Trouble with Avro records
To: [email protected]
Hey Danny,
So I suspect the problem is that the AvroMode info isn't getting propagated to
the ParquetFileSourceTarget. The simplest way to verify the problem is not as
simple as it should be, but I'd like you to try something like this:
SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(new Path(path),
Avros.specifics(Log.class));
output.conf("crunch.avro.mode", "SPECIFIC");
...and let me know if that fixes the problem. If so, I can file a JIRA to fix
it properly.
J
On Fri, Aug 29, 2014 at 11:15 AM, Josh Wills <[email protected]> wrote:
Hey Danny,
I'll take a look at it later today, kind of a crazy AM for me.
J
On Fri, Aug 29, 2014 at 9:11 AM, Danny Morgan <[email protected]> wrote:
Okay looks like the AvroMode.setSpecificClassLoader() fix works for Avro files
but doesn't work for Parquet Avro files.
// This works great
Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
PCollection<Log> logs = ...;
SourceTarget<Log> output = At.avroFile(path, Avros.specifics(Log.class));
p1.write(logs, output, WriteMode.OVERWRITE);
p1.done();

Pipeline p2 = new MRPipeline(MyLogs.class, getConf());
AvroMode.setSpecificClassLoader(Log.class.getClassLoader());
PCollection<Log> logs2 = p2.read(output);
p2.done();
// This fails with java.lang.ClassCastException:
// org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Log
Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
PCollection<Log> logs = ...;
SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(new Path(path),
    Avros.specifics(Log.class));
p1.write(logs, output, WriteMode.OVERWRITE);
p1.done();

Pipeline p2 = new MRPipeline(MyLogs.class, getConf());
AvroMode.setSpecificClassLoader(Log.class.getClassLoader());
PCollection<Log> logs2 = p2.read(output);
p2.done();
Any idea?
Thanks!
From: [email protected]
Date: Thu, 21 Aug 2014 13:54:56 -0700
Subject: Re: Trouble with Avro records
To: [email protected]
On Thu, Aug 21, 2014 at 1:46 PM, Danny Morgan <[email protected]> wrote:
Thanks Josh, if it is any consolation Crunch has saved me countless hours and
cpu cycles over how we used to do things so I'm glad to be benefiting from your
ill-advised decisions :-)
That is always nice to hear-- thanks!
Danny
From: [email protected]
Date: Thu, 21 Aug 2014 13:33:40 -0700
Subject: Re: Trouble with Avro records
To: [email protected]
On Thu, Aug 21, 2014 at 1:28 PM, Danny Morgan <[email protected]> wrote:
Awesome, adding:
AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());
did the trick, thanks Josh!
Do you mind clarifying when to use Avros.records()
versus Avros.specifics() or Avros.reflects()?
Oops, yes-- sorry about that. The records() method is defined on both the
WritableTypeFamily and the AvroTypeFamily as a method that is supposed to
handle "arbitrary" data types that aren't supported by the built-in POJO and
Crunch Tuple methods. Fortunately/unfortunately, what counts as an arbitrary
data type depends on the backing implementation: for Writables, records() only
really supports classes that implement Writable, and for Avros, records()
checks whether the class implements SpecificRecord (in which case it handles it
using Avros.specifics) and, if it does not, passes it on to Avros.reflects.
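The dispatch described above can be sketched with a stand-in marker interface (the names below are hypothetical; Avro's real marker is org.apache.avro.specific.SpecificRecord, and this is my reading of the behavior, not Crunch's actual code):

```java
// Stand-in for org.apache.avro.specific.SpecificRecord.
interface SpecificRecordLike {}

// A generated-style class implements the marker; a plain POJO does not.
class Visit implements SpecificRecordLike {}
class PlainPojo {}

public class RecordsDispatch {
    // records(clazz) routes to specifics() when the class implements the
    // marker interface, and falls back to reflects() otherwise.
    static String records(Class<?> clazz) {
        if (SpecificRecordLike.class.isAssignableFrom(clazz)) {
            return "specifics";
        }
        return "reflects";
    }

    public static void main(String[] args) {
        System.out.println(records(Visit.class));     // specifics
        System.out.println(records(PlainPojo.class)); // reflects
    }
}
```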
I think that, like many things in Crunch, records() reflects an early and
somewhat ill-advised decision I made when I was first creating the API, one
that I would likely do away with if I had it to do over again. :)
J
Thanks Again!
From: [email protected]
Date: Thu, 21 Aug 2014 13:20:20 -0700
Subject: Re: Trouble with Avro records
To: [email protected]
Okay. I suspect this is the problem:
https://issues.apache.org/jira/browse/CRUNCH-442
Gabriel fixed this for the upcoming 0.11 release, but there are a couple of
workarounds in the comments. One is to put the avro jar into your job jar file,
instead of relying on the one that is in the hadoop lib. The other is to
configure AvroMode.setSpecificClassLoader w/the class loader for your Visit
class before kicking off the job.
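The class-loader mechanism behind that second workaround can be illustrated with plain JDK calls (this is only a sketch of why the loader matters; the actual fix is the Crunch AvroMode.setSpecificClassLoader call):

```java
public class LoaderDemo {
    public static void main(String[] args) throws Exception {
        // The loader that loaded this class can see application classes...
        ClassLoader appLoader = LoaderDemo.class.getClassLoader();
        Class<?> self = Class.forName("LoaderDemo", false, appLoader);
        System.out.println(self == LoaderDemo.class); // true

        // ...but a parent loader (analogous to the one that loaded the avro
        // jar in hadoop/lib, in the CRUNCH-442 scenario) cannot.
        try {
            Class.forName("LoaderDemo", false, appLoader.getParent());
        } catch (ClassNotFoundException expected) {
            System.out.println("parent loader cannot see LoaderDemo");
        }
    }
}
```

Pointing the deserializer at the loader that can actually see the generated class is exactly what setSpecificClassLoader accomplishes.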
Josh
On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <[email protected]> wrote:
Hi Josh,
crunch-0.10.0-hadoop2
Thanks.
From: [email protected]
Date: Thu, 21 Aug 2014 13:04:29 -0700
Subject: Re: Trouble with Avro records
To: [email protected]
That feels like an AvroMode-related exception; which version of Crunch are you
using?
J
On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <[email protected]> wrote:
Hi Guys,
Love crunch but having some trouble recently using Avro records. I think
someone needs to write a Crunch book.
I'm trying to aggregate hourly visits to a page by each user. I do one pass to
parse the records and then I try to "group by" unique users and hour and count
the number of times they visited as well as their first visit time in the hour.
Here's the Avro schema
{
  "namespace": "com.test",
  "type": "record",
  "name": "Visit",
  "fields": [
    {"name": "dateid", "type": "int"},  // Year-month-day-hour in PST as an integer, e.g. 2014081103
    {"name": "userid", "type": "string"},
    {"name": "vcount", "type": ["long", "null"]},
    {"name": "firsttimestamp", "type": ["long", "null"]}  // Unix timestamp of first visit
  ]
}
Here I do the parsing; at first vcount and firsttimestamp aren't set.
PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
new VisitsExtractor(),
Avros.tableOf(Avros.specifics(Visit.class),
Avros.pairs(Avros.longs(), Avros.longs())));
The relevant line from VisitsExtractor:
emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
Everything up to this point works fine; now I want to count up the unique
visitors and the minimum timestamp.
PTable<Visit, Pair<Long, Long>> agg =
visits.groupByKey().combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
Aggregators.MIN_LONGS()));
The above seems to work fine too; now I want to create new Visit objects and
fill in the count and minimum timestamp fields.
PCollection<Visit> count_visits = agg.parallelDo("visits-count",
    new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
      @Override
      public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
        Visit v = Visit.newBuilder(p.first()).build();
        v.setVcount(p.second().first());
        v.setFirsttimestamp(p.second().second());
        emitter.emit(v);
      }
    }, Avros.specifics(Visit.class));

count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);
Here's the error:
2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
    at com.test.Logs$1.process(Logs.java:49)
    at com.test.Logs$1.process(Logs.java:1)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
So I'm pretty sure the problem is this line:
Visit v = Visit.newBuilder(p.first()).build();
specifically p.first() should be a Visit type but I guess it isn't. I assume
the output of the groupBy operation in the reducers is serializing the key but
not using the correct Avro type to do it?
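In miniature, the failure looks like this plain-JDK analogy (not Avro itself): when deserialization falls back to a generic field container, the result simply isn't an instance of the generated class, so the downstream cast blows up at runtime.

```java
public class CastSketch {
    // Stand-in for the generated com.test.Visit class.
    static class Visit { int dateid; }

    public static void main(String[] args) {
        // A generic map stands in for GenericData$Record here.
        Object deserialized = new java.util.HashMap<String, Object>();

        try {
            // Like casting GenericData$Record to com.test.Visit in the DoFn.
            Visit v = (Visit) deserialized;
            System.out.println("cast succeeded");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the stack trace above");
        }
    }
}
```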
Also I don't think I understand when I should be using Avros.records() versus
Avros.specifics() when I have a generated avro file.
Thanks!
Danny
--
Director of Data Science, Cloudera
Twitter: @josh_wills