So my thought would be that if the DoFn in this step:

beforeWrite.parallelDo(DoFn, U, ParallelDoOptions.builder().sources(target).build());

signaled, via a large scaleFactor, that it was going to write a lot of data,
then the planner would use the output from beforeWrite as a checkpoint and
defer the DoFn processing to the map phase.
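
For reference, a rough sketch of a DoFn that signals a large output size by
overriding scaleFactor() (the class name, the types, and the 10.0f value are
made-up illustrations, not something from this thread):

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

// Hypothetical DoFn whose output is roughly 10x the size of its input.
public class ExpandingFn extends DoFn<String, String> {
  @Override
  public void process(String input, Emitter<String> emitter) {
    // Emit each input record ten times, blowing up the output size.
    for (int i = 0; i < 10; i++) {
      emitter.emit(input + ":" + i);
    }
  }

  @Override
  public float scaleFactor() {
    // Hint to the planner that this DoFn emits much more data than it
    // reads, so checkpointing its input may be the cheaper plan.
    return 10.0f;
  }
}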


On Tue, Feb 25, 2014 at 5:08 PM, Jinal Shah <[email protected]> wrote:

> Yup, this is to avoid .run() ;-) . But I want the beforeWrite output to be
> stored. So how do I apply the scaleFactor method, and how will it help make
> the DoFn for afterWrite run on the map side?
>
>
> On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <[email protected]> wrote:
>
> > Okay. Out of curiosity, if you override the float scaleFactor() method
> > of the DoFn that you apply here:
> >
> > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> > ParallelDoOptions.builder().sources(target).build());
> >
> > and apply it to beforeWrite, does it still insist on writing out
> > beforeWrite on the reduce side?
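> >
> > E.g., something like this (just a sketch; the anonymous DoFn, the type
> > argument, and the 10.0f value are placeholders):
> >
> > PCollection<U> checkpointed = beforeWrite.parallelDo(new DoFn<U, U>() {
> >   @Override
> >   public void process(U input, Emitter<U> emitter) {
> >     emitter.emit(input);
> >   }
> >
> >   @Override
> >   public float scaleFactor() {
> >     // A large value tells the planner the output will be much bigger
> >     // than the input.
> >     return 10.0f;
> >   }
> > }, type);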
> >
> > BTW, I'm assuming there is (again) some reason not to force a run() here.
> > ;-)
> >
> >
> >
> > On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah <[email protected]>
> > wrote:
> >
> > > I wanted to run that in the map phase instead of reduce. If I don't do
> > > that, it will run in the reduce phase.
> > >
> > >
> > > On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <[email protected]> wrote:
> > >
> > > > On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <[email protected]> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'm trying to do a union of 3 PTables, but I'm getting this error:
> > > > > http://pastebin.com/TkMPunJu
> > > > >
> > > > > This is where it is thrown:
> > > > >
> > > > > https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
> > > > >
> > > > > This is what I'm trying to do:
> > > > >
> > > > > PCollection<U> beforeWrite = someOperation();
> > > > >
> > > > > SourceTarget<U> target = new AvroFileTarget().asSourceTarget(U);
> > > > >
> > > > > pipeline.write(beforeWrite, target);
> > > > >
> > > > > PCollection<U> afterWrite = pipeline.read(target);
> > > > >
> > > >
> > > > Why are you creating afterWrite here, instead of doing the processing
> > > > in the next step (the one that yields afterParallelDo) against
> > > > beforeWrite?
> > > >
> > > >
> > > > > PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
> > > > > ParallelDoOptions.builder().sources(target).build());
> > > > >
> > > > > PTable<K,U> afterSomeOperation = someOperations();
> > > > >
> > > > > PTable<K,U> thatNeedsToBeAdded = comingFromHbase();
> > > > >
> > > > > PTable<K,U> unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded);
> > > > > // this is where it fails for some reason, since it is looking for
> > > > > // the target, which is not generated yet.
> > > > >
> > > > >
> > > > > Can anyone help me in understanding why this is happening?
> > > > >
> > > > > Thanks
> > > > > Jinal
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Director of Data Science
> > > > Cloudera <http://www.cloudera.com>
> > > > Twitter: @josh_wills <http://twitter.com/josh_wills>
> > > >
> > >
> >
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
