Hey Jacob,

The challenge with consuming multiple sources of the same type is that their Configuration settings conflict: for example, the table name has to be written to a specific key in the Configuration object, so writing it a second time overwrites the first value. This was a limitation in earlier revisions of Crunch, but we came up with the InputBundle abstraction (o.a.c.io.impl.InputBundle) to work around it; I just haven't gotten around to updating HBaseSourceTarget to take advantage of it.
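
To make the conflict concrete, here is a minimal sketch in plain Java. It does not use any Crunch or Hadoop classes: a Map stands in for the Configuration object, "table.name" is a made-up key, and bundledConfig only illustrates the general idea of keeping each input's settings separate. It is not how InputBundle is actually implemented.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-ins only: a plain Map plays the role of a Hadoop
// Configuration, and "table.name" is a hypothetical key, not the real one.
class ConfigOverwriteDemo {

    static final String TABLE_KEY = "table.name"; // hypothetical key

    // Naive approach: every source writes its table name to the same key
    // of one shared configuration.
    static Map<String, String> sharedConfig(String... tables) {
        Map<String, String> conf = new HashMap<>();
        for (String table : tables) {
            conf.put(TABLE_KEY, table); // a later write replaces the earlier one
        }
        return conf;
    }

    // Bundled approach: each source keeps its own key/value pairs, indexed
    // by input position, so no source can clobber another's settings.
    static Map<Integer, Map<String, String>> bundledConfig(String... tables) {
        Map<Integer, Map<String, String>> bundles = new LinkedHashMap<>();
        for (int i = 0; i < tables.length; i++) {
            Map<String, String> bundle = new HashMap<>();
            bundle.put(TABLE_KEY, tables[i]);
            bundles.put(i, bundle);
        }
        return bundles;
    }

    public static void main(String[] args) {
        System.out.println(sharedConfig("table_a", "table_b"));
        System.out.println(bundledConfig("table_a", "table_b"));
    }
}
```

With two inputs, the shared map ends up holding only "table_b", while the bundled version still has one entry per input.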

We would want to move the InputBundle from o.a.c.io.impl to o.a.c.io and make it user-facing, and then update crunch-hbase to take advantage of it. If you file a JIRA for it, I'll make sure that it gets done.

J

On Wed, Dec 5, 2012 at 8:28 AM, Williams,Jacob <[email protected]> wrote:

> When attempting to join or cogroup collections that originate from two
> different HBaseSourceTargets, I get the error below. HBaseSourceTarget does
> not call CrunchInputs.addInputPath, so when CrunchInputs.getFormatNodeMap
> is called, RuntimeParameters.MULTI_INPUTS is null in the configuration. Any
> thoughts on how to fix or work around this?
>
> Thanks,
> Jacob Williams
>
>
> Error:
>
> 12/12/05 09:56:02 INFO exec.CrunchJob: java.lang.NullPointerException
>     at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:187)
>     at com.google.common.base.Splitter.split(Splitter.java:371)
>     at org.apache.crunch.impl.mr.run.CrunchInputs.getFormatNodeMap(CrunchInputs.java:51)
>     at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:44)
>     at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1014)
>     at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1031)
>     at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:172)
>     at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:943)
>     at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:896)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
>     at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:896)
>     at org.apache.hadoop.mapreduce.Job.submit(Job.java:531)
>     at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:331)
>     at org.apache.crunch.impl.mr.exec.CrunchJob.submit(CrunchJob.java:126)
>     at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:247)
>     at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.run(CrunchJobControl.java:275)
>     at java.lang.Thread.run(Thread.java:662)
>
>
> Example which produces this error:
>
> import java.io.IOException;
>
> import org.apache.crunch.MapFn;
> import org.apache.crunch.PCollection;
> import org.apache.crunch.PTable;
> import org.apache.crunch.Pair;
> import org.apache.crunch.Pipeline;
> import org.apache.crunch.impl.mr.MRPipeline;
> import org.apache.crunch.io.hbase.HBaseSourceTarget;
> import org.apache.crunch.types.writable.Writables;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.client.Result;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>
> public class Main {
>   public static void main(String[] args) throws IOException {
>     Configuration config = HBaseConfiguration.create();
>
>     Pipeline pipeline = new MRPipeline(Main.class, "join-problem", config);
>
>     PTable<ImmutableBytesWritable, Result> results1 = pipeline.read(new HBaseSourceTarget("some_table", new Scan()));
>     PTable<ImmutableBytesWritable, Result> results2 = pipeline.read(new HBaseSourceTarget("some_table", new Scan()));
>     pipeline.writeTextFile(results1.join(results2), "join-test");
>     pipeline.run();
>   }
> }

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
