It is also trying to run it in parallel, so now it is failing on that as well.
On Wed, Feb 26, 2014 at 10:30 AM, Jinal Shah <[email protected]> wrote:

> I did as you said, but now it is running the DoFn twice: since after that
> parallelDo I'm writing the output to HDFS, it split the work in two. While
> storing the output it runs in the reduce phase, and then while doing the
> union it runs in the map phase.
>
> On Tue, Feb 25, 2014 at 7:41 PM, Josh Wills <[email protected]> wrote:
>
>> So my thought would be that if the DoFn in this step:
>>
>> beforeWrite.parallelDo(DoFn, U, ParallelDoOptions.builder()
>>     .sources(target).build());
>>
>> signaled that it was going to write a lot of data with a large
>> scaleFactor, then the planner would use the output from beforeWrite as a
>> checkpoint, and save the DoFn processing for the map phase.
>>
>> On Tue, Feb 25, 2014 at 5:08 PM, Jinal Shah <[email protected]> wrote:
>>
>>> Yup, this is to avoid .run() ;-) . But I want the beforeWrite output to
>>> be stored. So how do I apply the scaleFactor method, and how will it
>>> help make the DoFn for afterWrite run on the map side?
>>>
>>> On Tue, Feb 25, 2014 at 6:58 PM, Josh Wills <[email protected]> wrote:
>>>
>>>> Okay. Out of curiosity, if you override the float scaleFactor() method
>>>> that you apply here:
>>>>
>>>> PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
>>>>     ParallelDoOptions.builder().sources(target).build());
>>>>
>>>> and apply it to beforeWrite, does it still insist on writing out
>>>> beforeWrite on the reduce side?
>>>>
>>>> BTW, I'm assuming there is (again) some reason not to force a run()
>>>> here. ;-)
>>>>
>>>> On Tue, Feb 25, 2014 at 4:51 PM, Jinal Shah <[email protected]> wrote:
>>>>
>>>>> I wanted to run that in the map phase instead of reduce. If I don't
>>>>> do that, it will run in the reduce phase.
>>>>>
>>>>> On Tue, Feb 25, 2014 at 5:38 PM, Josh Wills <[email protected]> wrote:
>>>>>
>>>>>> On Tue, Feb 25, 2014 at 3:04 PM, Jinal Shah <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to do a union of 3 PTables, but I'm getting this error:
>>>>>>> http://pastebin.com/TkMPunJu
>>>>>>>
>>>>>>> This is where it is thrown:
>>>>>>> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseInputCollection.java#L66
>>>>>>>
>>>>>>> This is what I'm trying to do:
>>>>>>>
>>>>>>> PCollection<U> beforeWrite = someOperation();
>>>>>>>
>>>>>>> SourceTarget<U> target = new AvroFileTarget().asSourceTarget(U);
>>>>>>>
>>>>>>> pipeline.write(beforeWrite, target);
>>>>>>>
>>>>>>> PCollection<U> afterWrite = pipeline.read(target);
>>>>>>
>>>>>> Why are you creating afterWrite here, instead of doing the processing
>>>>>> in the next step (the one that yields afterParallelDo) against
>>>>>> beforeWrite?
>>>>>>
>>>>>>> PCollection<U> afterParallelDo = afterWrite.parallelDo(DoFn, U,
>>>>>>>     ParallelDoOptions.builder().sources(target).build());
>>>>>>>
>>>>>>> PTable<K,U> afterSomeOperation = someOperations();
>>>>>>>
>>>>>>> PTable<K,U> thatNeedsToBeAdded = comingFromHbase();
>>>>>>>
>>>>>>> // This is where it fails for some reason, since it is looking for
>>>>>>> // the target, which is not generated yet.
>>>>>>> PTable<K,U> unionNeeded = afterSomeOperation.union(thatNeedsToBeAdded);
>>>>>>>
>>>>>>> Can anyone help me in understanding why this is happening?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jinal
>>>>>>
>>>>>> --
>>>>>> Director of Data Science
>>>>>> Cloudera <http://www.cloudera.com>
>>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
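The planner behavior Josh describes (a DoFn that reports a large scaleFactor should cause the planner to checkpoint the upstream output and run the DoFn on the map side of the following job) can be sketched in plain Java. This is a hedged illustration only: `SizedFn`, `checkpointBefore`, and the 2.0 threshold are made up for this sketch and are not Crunch APIs; Crunch's actual cost model and its `DoFn.scaleFactor()` semantics are more involved.

```java
// Minimal, self-contained sketch of the heuristic discussed above:
// if a function reports that its output is much larger than its input
// (a large scale factor), prefer to checkpoint the upstream output and
// schedule the function on the map side rather than the reduce side.
public class PlannerSketch {

  // Stand-in for a DoFn that can report its output/input size ratio,
  // analogous to overriding scaleFactor() on a Crunch DoFn.
  interface SizedFn {
    float scaleFactor();
  }

  // Hypothetical decision rule; the threshold of 2.0 is illustrative,
  // not Crunch's real planner logic.
  static boolean checkpointBefore(SizedFn fn) {
    return fn.scaleFactor() > 2.0f;
  }

  public static void main(String[] args) {
    SizedFn expanding = () -> 10.0f; // claims output ~10x the input
    SizedFn neutral = () -> 1.0f;    // claims output ~same size as input

    System.out.println(checkpointBefore(expanding)); // prints "true"
    System.out.println(checkpointBefore(neutral));   // prints "false"
  }
}
```

Under this reading, overriding scaleFactor() on the DoFn applied to beforeWrite is a hint to the planner, not a command: it shifts the cost estimate so that materializing beforeWrite and deferring the DoFn to the map phase looks cheaper.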
