Hi Gabriel,
Bull's eye :) My code was holding a reference to a non-transient Text
instance. Here is the culprit code:
PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(
        new DoFn<String, Pair<TextPair, Long>>() {
            TextPair textPair = new TextPair();

            @Override
            public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
                String[] words = input.split("\\s+");
                for (int i = 0; i < words.length; i++) {
                    String word = words[i];
                    if (Strings.isNullOrEmpty(word)) {
                        continue;
                    }
                    // Look at the neighbours within the window around position i.
                    int start = Math.max(0, i - DEFAULT_NEIGHBOUR_WINDOW);
                    int end = Math.min(words.length - 1, i + DEFAULT_NEIGHBOUR_WINDOW);
                    for (int j = start; j <= end; j++) { // end is inclusive
                        if (i == j) {
                            continue;
                        }
                        textPair.set(new Text(word), new Text(words[j]));
                        emitter.emit(Pair.of(textPair, 1L));
                    }
                }
            }
        },
        textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
And this is how I fixed it:
PTable<TextPair, Long> wordCoOccurrence = textFile.parallelDo(
        new DoFn<String, Pair<TextPair, Long>>() {
            // transient, so the non-serializable Text instances inside TextPair
            // are skipped during serialization and re-created in initialize()
            transient TextPair textPair;

            @Override
            public void initialize() {
                super.initialize();
                textPair = new TextPair();
            }

            @Override
            public void process(String input, Emitter<Pair<TextPair, Long>> emitter) {
                String[] words = input.split("\\s+");
                for (int i = 0; i < words.length; i++) {
                    String word = words[i];
                    if (Strings.isNullOrEmpty(word)) {
                        continue;
                    }
                    // Look at the neighbours within the window around position i.
                    int start = Math.max(0, i - DEFAULT_NEIGHBOUR_WINDOW);
                    int end = Math.min(words.length - 1, i + DEFAULT_NEIGHBOUR_WINDOW);
                    for (int j = start; j <= end; j++) { // end is inclusive
                        if (i == j) {
                            continue;
                        }
                        textPair.set(new Text(word), new Text(words[j]));
                        emitter.emit(Pair.of(textPair, 1L));
                    }
                }
            }
        },
        textFile.getTypeFamily().tableOf(Writables.writables(TextPair.class), Writables.longs()));
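As an aside, the NotSerializableException at the root of this thread can be reproduced with plain Java serialization, with no Hadoop or Crunch on the classpath. Here is a minimal sketch: FakeText stands in for org.apache.hadoop.io.Text (which does not implement Serializable), and BrokenFn, FixedFn, and serializes are illustrative names, not real Crunch API.

```java
import java.io.*;

// Stand-in for org.apache.hadoop.io.Text: a type that is NOT Serializable.
class FakeText {
    String value = "";
}

// Mirrors the broken DoFn: a Serializable object holding a non-transient,
// non-serializable field, which makes writeObject() fail.
class BrokenFn implements Serializable {
    FakeText buffer = new FakeText();
}

// Mirrors the fix: the field is transient, so it is skipped during
// serialization and must be re-created afterwards (as in DoFn.initialize()).
class FixedFn implements Serializable {
    transient FakeText buffer;

    void initialize() {
        buffer = new FakeText();
    }
}

class SerializationDemo {
    // Returns true if o survives Java serialization, false on
    // NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new BrokenFn())); // false
        System.out.println(serializes(new FixedFn()));  // true
    }
}
```

The same mechanism applies inside Crunch: the DoFn instance is serialized so it can be shipped to the cluster, so every non-transient field must itself be serializable.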
Would you please share how this part is converted into a Hadoop map
function? Does Crunch convert these functions into normal MapReduce jobs,
or is the process more involved? I have to admit I coded this the way I
used to code Mapper functions.
Appreciate your help.
On Wed, Dec 26, 2012 at 7:04 PM, Gabriel Reid <[email protected]> wrote:
> Hi Ashish,
>
> Are you holding on to a non-transient Text instance in a DoFn perhaps?
> DoFns need to remain serializable.
>
> Otherwise, could you post your (non-working) code? (I'm assuming it's
> pretty short.)
>
> - Gabriel
>
>
> On 26 Dec 2012, at 13:54, Ashish <[email protected]> wrote:
>
> Folks,
>
> I was trying to port the word co-occurrence example (using Pairs) to
> Crunch, and had used the famous TextPair class from Hadoop: The Definitive
> Guide. While running it, I got this error:
>
> ERROR mr.MRPipeline: org.apache.crunch.impl.mr.run.CrunchRuntimeException:
> java.io.NotSerializableException: org.apache.hadoop.io.Text
>
> As an alternative, I created a WordPair class that uses String instead of
> Text and implements Serializable and WritableComparable. This worked.
>
> Is this behavior expected, or am I missing something?
>
>
> --
> thanks
> ashish
>
> Blog: http://www.ashishpaliwal.com/blog
> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>
>
--
thanks
ashish
Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal