Thanks Gabriel! Where can I look in the Crunch code for this? My understanding is that Crunch has some wrapper MapReduce functions, and once the complete pipeline graph is built, it decides which functions to run when, passing the parameters from the Mapper to the DoFn instances. Is this understanding correct?
On Wed, Dec 26, 2012 at 10:44 PM, Gabriel Reid <[email protected]> wrote:

> Hi Ashish,
>
> Your solution looks good -- indeed, any non-serializable members are
> typically initialized in the initialize method.
>
> The way Crunch works is that DoFn instances are serialized at the client,
> and then deserialized, initialized, and run within map and reduce tasks. A
> single map or reduce task will make use of one or more DoFn instances (i.e.
> they can be chained together within a single task).
>
> - Gabriel
>
>
> On 26 Dec 2012, at 15:26, Ashish <[email protected]> wrote:
>
> Hi Gabriel,
>
> Bull's eye :) My code was holding a reference to a non-transient Text
> instance.
>
> Here is the culprit code:
>
>     PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new DoFn<String, Pair<TextPair, Long>>() {
>         TextPair textPair = new TextPair();
>
>         @Override
>         public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
>             String[] words = input.split("\\s+");
>
>             for (int i = 0; i < words.length; i++) {
>                 String word = words[i];
>                 if (Strings.isNullOrEmpty(word)) {
>                     continue;
>                 }
>
>                 // let's look for neighbours now
>                 int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>                 int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>                 for (int j = start; j < end; j++) {
>                     if (i == j) continue;
>                     textPair.set(new Text(words[i]), new Text(words[j]));
>                     emitter.emit(Pair.of(textPair, 1L));
>                 }
>             }
>         }
>     }, textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
>
> And this is how I fixed it:
>
>     PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(new DoFn<String, Pair<TextPair, Long>>() {
>         transient TextPair textPair;
>
>         @Override
>         public void initialize() {
>             super.initialize();
>             textPair = new TextPair();
>         }
>
>         @Override
>         public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
>             String[] words = input.split("\\s+");
>
>             for (int i = 0; i < words.length; i++) {
>                 String word = words[i];
>                 if (Strings.isNullOrEmpty(word)) {
>                     continue;
>                 }
>
>                 // let's look for neighbours now
>                 int start = (i - DEFAULT_NEIGHBOUR_WINDOW < 0) ? 0 : i - DEFAULT_NEIGHBOUR_WINDOW;
>                 int end = (i + DEFAULT_NEIGHBOUR_WINDOW >= words.length) ? words.length - 1 : i + DEFAULT_NEIGHBOUR_WINDOW;
>                 for (int j = start; j < end; j++) {
>                     if (i == j) continue;
>                     textPair.set(new Text(words[i]), new Text(words[j]));
>                     emitter.emit(Pair.of(textPair, 1L));
>                 }
>             }
>         }
>     }, textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
>
> Would you please share how this part is converted to a Hadoop map function?
> Does Crunch convert these functions to normal MapReduce jobs, or is the
> process more involved? I have to admit I coded this the way I used to code
> Mapper functions.
>
> Appreciate your help.
>
>
> On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <[email protected]> wrote:
>
>> Hi Ashish,
>>
>> Are you holding on to a non-transient Text instance in a DoFn, perhaps?
>> DoFns need to remain serializable.
>>
>> Otherwise, could you post your (non-working) code? (I'm assuming it's
>> pretty short.)
>>
>> - Gabriel
>>
>>
>> On 26 Dec 2012, at 13:54, Ashish <[email protected]> wrote:
>>
>> Folks,
>>
>> I was trying to port the word co-occurrence example (using Pairs) to
>> Crunch, and had used the famous TextPair class from Hadoop: The Definitive
>> Guide. While running, I'm getting this error:
>>
>> ERROR mr.MRPipeline:
>> org.apache.crunch.impl.mr.run.CrunchRuntimeException:
>> java.io.NotSerializableException: org.apache.hadoop.io.Text
>>
>> As an alternative, I created a WordPair class that uses String instead of
>> Text and implements Serializable and WritableComparable. That worked.
>>
>> Is this behavior expected, or am I missing something?
>>
>> --
>> thanks
>> ashish
>>
>> Blog: http://www.ashishpaliwal.com/blog
>> My Photo Galleries: http://www.pbase.com/ashishpaliwal

--
thanks
ashish

Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal
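[Editor's note] The serialize-at-client / initialize-in-task lifecycle Gabriel describes can be illustrated with plain `java.io` serialization. This is a simplified sketch, not Crunch's actual internals: `NonSerializableText` is a hypothetical stand-in for `org.apache.hadoop.io.Text`, and `roundTrip` stands in for the client-to-task hand-off. It reproduces both the `NotSerializableException` from the thread and the transient-plus-`initialize()` fix:

```java
import java.io.*;

public class DoFnLifecycleSketch {

    // Hypothetical stand-in for org.apache.hadoop.io.Text: a mutable
    // holder that does NOT implement Serializable.
    static class NonSerializableText {
        String value = "";
    }

    // The culprit pattern: a non-transient, non-serializable member.
    // Serializing this object fails, like the DoFn holding a Text.
    static class BrokenFn implements Serializable {
        NonSerializableText textPair = new NonSerializableText();
    }

    // The fixed pattern: the member is transient, so serialization skips
    // it, and it is re-created task-side in initialize().
    static class CoOccurrenceFn implements Serializable {
        transient NonSerializableText textPair;

        void initialize() {
            textPair = new NonSerializableText();  // re-created after deserialization
        }

        String process(String input) {
            textPair.value = input;
            return textPair.value;
        }
    }

    // Simulates the framework: serialize at the "client", deserialize in
    // the "task"; the framework then calls initialize() before process().
    static <T> T roundTrip(T fn) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(fn);
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
    }

    public static void main(String[] args) throws Exception {
        // The broken version fails at client-side serialization.
        try {
            roundTrip(new BrokenFn());
        } catch (NotSerializableException e) {
            System.out.println("BrokenFn: " + e);  // NotSerializableException
        }

        // The fixed version survives the round trip; the transient field
        // comes back null and initialize() re-creates it in the "task".
        CoOccurrenceFn taskSide = roundTrip(new CoOccurrenceFn());
        taskSide.initialize();
        System.out.println(taskSide.process("hello"));  // prints "hello"
    }
}
```

This mirrors why the fix in the thread works: the transient `TextPair` never travels over the wire, so nothing non-serializable is written at the client, and `initialize()` rebuilds it inside each map or reduce task before `process()` is called.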
