I wanted that processing to run in the map phase instead of the reduce
phase. If I don't write the collection out and read it back, it will run
in the reduce phase.


On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <[email protected]> wrote:

> On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <[email protected]>
> wrote:
>
> > Hi,
> >
> > I'm trying to do a union of 3 PTables, but I'm getting this error:
> > http://pastebin.com/TkMPunJu
> >
> > this is where it is throwing it
> >
> > https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
> >
> > this is what I'm trying to do
> >
> > PCollection<U> beforeWrite = someOperation();
> >
> > SourceTarget<U> target = new AvroFileTarget().asSourceTarget(U);
> >
> > pipeline.write(beforeWrite, target);
> >
> > PCollection<U> afterWrite = pipeline.read(target);
> >
>
> Why are you creating afterWrite here, instead of doing the processing in
> the next step (the one that yields afterParallelDo) against beforeWrite?
>
>
> > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> > ParallelDoOptions.builder().sourceTargets(target).build());
> >
> > PTable<K,U> afterSomeOperation = someOperations();
> >
> > PTable<K,U> thatNeedsToBeAdded = comingFromHbase();
> >
> > PTable<K,U> unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded);
> > // this is where it fails for some reason since it is looking for the
> > // target which is not generated yet.
> >
> >
> > Can anyone help me in understanding why this is happening?
> >
> > Thanks
> > Jinal
> >
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>
