Hi,
I am trying to create multiple iterators in a DoFn process method.
public void process(Pair<Integer, Iterable<TupleN>> input,
    Emitter<Pair<String, Integer>> emitter) {}

Every time I ask for an iterator it gives back the same one, so I cannot
traverse the list more than once, and I hit the following stack trace:
1575 [Thread-23] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local_0001
java.util.NoSuchElementException: iterate past last value
    at org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:159)
    at com.cloudera.crunch.types.PGroupedTableType$PTypeIterable$1.next(PGroupedTableType.java:56)
    at com.mylearning.crunch.TuplesTest$ScoreCalculator.process(TuplesTest.java:52)
    at com.mylearning.crunch.TuplesTest$ScoreCalculator.process(TuplesTest.java:1)
    at com.cloudera.crunch.impl.mr.run.RTNode.process(RTNode.java:85)
    at com.cloudera.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:39)
    ...
What I see is that PGroupedTableType hands back the same iterator it gets
from the ReduceContext, and that is why I see the exception.
Shouldn't it give back a new iterator every time I ask for one?
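For contrast, a normal java.util collection does return an independent
iterator on every iterator() call, which is the behaviour I was expecting
(a small demo, names are mine):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class FreshIteratorDemo {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("a", "b");
        // Each call to iterator() on a normal collection returns an
        // independent cursor, so nested traversals do not interfere.
        Iterator<String> first = names.iterator();
        Iterator<String> second = names.iterator();
        first.next(); // advances only the first cursor
        System.out.println(second.next()); // prints a
    }
}
```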
If not, what is the recommended way to do this? (I could iterate once and
add the elements to a collection first, but I don't know whether that is
the right way forward.)
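The workaround I have in mind would look roughly like this, in plain
java.util with no Crunch dependencies. SinglePassIterable stands in for
what the ReduceContext hands back, and both class and method names here
are mine, just for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CacheValuesSketch {
    // Simulates the single-pass Iterable the reducer context hands back:
    // every call to iterator() returns the SAME underlying iterator.
    static class SinglePassIterable<T> implements Iterable<T> {
        private final Iterator<T> only;
        SinglePassIterable(List<T> data) { this.only = data.iterator(); }
        public Iterator<T> iterator() { return only; }
    }

    // Workaround: drain the iterable once into a List, then iterate freely.
    static <T> List<T> materialize(Iterable<T> values) {
        List<T> copy = new ArrayList<T>();
        for (T value : values) {
            copy.add(value);
        }
        return copy;
    }

    public static void main(String[] args) {
        Iterable<String> values =
            new SinglePassIterable<String>(Arrays.asList("a", "b", "c"));
        List<String> cached = materialize(values);
        int pairs = 0;
        // Nested passes over the cached copy do not interfere.
        for (String first : cached) {
            for (String second : cached) {
                pairs++;
            }
        }
        System.out.println(pairs); // prints 9
    }
}
```

The obvious downside is that all values for a key must fit in memory, which
is why I am unsure whether this is the way to go.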
I have attached my test case for your reference.
regards,
Rahul
package com.mylearning.crunch;

import java.util.Iterator;

import org.junit.Test;

import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.PGroupedTable;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pair;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.TupleN;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.writable.TupleWritable;
import com.cloudera.crunch.types.writable.WritableType;
import com.cloudera.crunch.types.writable.Writables;

public class TuplesTest {

  // Splits each CSV line into a TupleN keyed by the line's hash code.
  static class LineSpliter extends DoFn<String, Pair<Integer, TupleN>> {
    private static final long serialVersionUID = 1L;

    public void process(String line, Emitter<Pair<Integer, TupleN>> emitter) {
      String[] splitData = line.split(",");
      Object[] objects = new Object[splitData.length];
      for (int pos = 0; pos < objects.length; pos++) {
        objects[pos] = splitData[pos];
      }
      emitter.emit(new Pair<Integer, TupleN>(line.hashCode(), new TupleN(objects)));
    }
  }

  // Pairs every tuple with the tuples that follow it, which requires
  // iterating the grouped values more than once -- this is where the
  // NoSuchElementException is thrown.
  static class ScoreCalculator
      extends DoFn<Pair<Integer, Iterable<TupleN>>, Pair<String, Integer>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void process(Pair<Integer, Iterable<TupleN>> input,
        Emitter<Pair<String, Integer>> emitter) {
      Iterator<TupleN> primary = input.second().iterator();
      int pos = 1;
      while (primary.hasNext()) {
        Iterator<TupleN> secondary = getSecondary(input, pos++);
        if (secondary == null) {
          return;
        }
        TupleN first = primary.next();
        while (secondary.hasNext()) {
          TupleN second = secondary.next();
          emitter.emit(new Pair<String, Integer>(
              first.toString() + " : " + second.toString(), 100));
        }
      }
    }

    // Asks for a second iterator and skips the first `pos` elements.
    private Iterator<TupleN> getSecondary(Pair<Integer, Iterable<TupleN>> input,
        int pos) {
      Iterator<TupleN> secondary = input.second().iterator();
      for (int i = 0; i < pos; i++) {
        if (!secondary.hasNext()) {
          return null;
        }
        secondary.next();
      }
      return secondary;
    }
  }

  @Test
  public void shouldGiveSomeResults() throws Exception {
    String fileLoc = FileHelper.createTempCopyOf("person-data.txt");
    Pipeline pipeline = new MRPipeline(TuplesTest.class);
    PCollection<String> readLines = pipeline.readTextFile(fileLoc);
    WritableType<TupleN, TupleWritable> tuples = Writables.tuples(
        Writables.strings(), Writables.strings(), Writables.strings(),
        Writables.strings(), Writables.strings());
    PTable<Integer, TupleN> classifiedData = readLines.parallelDo("reading lines",
        new LineSpliter(), Writables.tableOf(Writables.ints(), tuples));
    PGroupedTable<Integer, TupleN> groupedData = classifiedData.groupByKey();
    PTable<String, Integer> scores = groupedData.parallelDo("compute scores",
        new ScoreCalculator(), Writables.tableOf(Writables.strings(), Writables.ints()));
    pipeline.writeTextFile(scores, "/home/rahul/crunchOut");
    pipeline.done();
  }
}
person-data.txt:
name,Fname,LName,Country,DoB
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999