Hi Gabriel,
I have found a way in which Crunch supports the use case of repeated
iteration, but I am not completely sure of the ins and outs of it.
Basically, rather than doing a groupByKey on a PTable to get back a
PGroupedTable, I used the collectValues API to get back a
PTable<key, Collection<values>>.
    PTable<Integer, Collection<TupleN>> collectValues = classifiedData.collectValues();
    PTable<String, Integer> scores = collectValues.parallelDo("compute pairs",
        new PTableScoreCalculator(),
        Writables.tableOf(Writables.strings(), Writables.ints()));
Now when I do a parallelDo on the new collection, I get back a Pair of the
key type and a Collection of the value type, over which I can do things as I wish.
    class PTableScoreCalculator extends DoFn<Pair<Integer, Collection<TupleN>>,
            Pair<String, Integer>> {
        public void process(Pair<Integer, Collection<TupleN>> input,
                Emitter<Pair<String, Integer>> emitter) {
            Iterator<TupleN> primary = input.second().iterator();
            ...
        }
    }
This way I can iterate over the values again and again. Any comments on
this approach? I am attaching my test case for reference.
BTW, why are there two methods that can do similar things, groupByKey and
collectValues? I see that an Aggregate gets invoked for the collectValues
API, while in the other case a lazy grouping gets created. Any idea on the
different applications of the two?
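For context, the single-pass behavior I am trying to work around looks like
this in plain Java (no Crunch involved; the class and method names are just
for illustration):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class OneShotIterableDemo {

    // An Iterable backed by a single Iterator, similar in spirit to the
    // grouped values handed to a reducer: it can only be consumed once.
    static <T> Iterable<T> oneShot(Iterator<T> it) {
        return () -> it;
    }

    static List<Integer> drain(Iterable<Integer> values) {
        List<Integer> out = new ArrayList<>();
        for (Integer v : values) {
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        Iterable<Integer> values = oneShot(List.of(1, 2, 3).iterator());
        System.out.println(drain(values)); // first pass sees all elements
        System.out.println(drain(values)); // second pass sees nothing: []
    }
}
```

Getting a materialized Collection back from collectValues avoids this, since
a Collection can hand out a fresh Iterator every time.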
regards,
Rahul
On 28-06-2012 14:17, Gabriel Reid wrote:
On Thu, Jun 28, 2012 at 9:29 AM, Rahul<[email protected]> wrote:
Yes, indeed this is a small PoC to get familiar with Crunch in relation to my
problem. Basically I have the following algorithm at play:
1. Read data rows
2. Create custom keys for each of them, built using various attributes of the
data (this time it is just a simple hash code, but I would like to emit
multiple key-value pairs)
3. Group similar data based on the created keys
4. Iterate over individual items in the group and do extensive comparisons
between all of them
I just built an outline in the test case to see what/how can be done; can
you advise something better?
Thanks for the outline. In this case, your approach (putting the contents of
the incoming Iterable into a collection) should work fine, as long as the
number of elements per group is relatively small (i.e. easily able to fit in
the memory available to each reducer in your Hadoop cluster).
- Gabriel
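The approach Gabriel describes above, copying the incoming Iterable into a
collection, can be sketched with plain Java collections (a sketch only, not
actual Crunch code; the names are mine):

```java
import java.util.ArrayList;
import java.util.List;

public class MaterializeDemo {

    // Copies a possibly single-pass Iterable into a list so it can be
    // traversed repeatedly. Caveat from the thread: the whole group must
    // fit in the memory available to the reducer.
    static <T> List<T> materialize(Iterable<T> values) {
        List<T> copy = new ArrayList<>();
        for (T value : values) {
            copy.add(value);
        }
        return copy;
    }

    public static void main(String[] args) {
        List<String> copy = materialize(List.of("a", "b", "c"));
        // The copy supports as many passes as needed, e.g. nested loops
        // for pairwise comparison within a group.
        for (String first : copy) {
            for (String second : copy) {
                if (!first.equals(second)) {
                    System.out.println(first + " : " + second);
                }
            }
        }
    }
}
```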
package com.mylearning.crunch;
import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.lang.StringUtils;
import org.junit.Test;
import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.FilterFn;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pair;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.TupleN;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.writable.TupleWritable;
import com.cloudera.crunch.types.writable.WritableType;
import com.cloudera.crunch.types.writable.Writables;
public class TuplesTest {

    @Test
    public void shouldGiveSomeResultsV2() throws Exception {
        String fileLoc = FileHelper.createTempCopyOf("person-data.txt");
        Pipeline pipeline = new MRPipeline(TuplesTest.class);
        PCollection<String> readLines = pipeline.readTextFile(fileLoc);
        WritableType<TupleN, TupleWritable> tuples = Writables.tuples(Writables.strings(),
            Writables.strings(), Writables.strings(), Writables.strings(), Writables.strings());
        PTable<Integer, TupleN> classifiedData = readLines.parallelDo("reading lines",
            new LineSpliter(), Writables.tableOf(Writables.ints(), tuples));
        PTable<Integer, Collection<TupleN>> collectValues = classifiedData.collectValues();
        PTable<String, Integer> scores = collectValues.parallelDo("compute pairs",
            new PTableScoreCalculator(), Writables.tableOf(Writables.strings(), Writables.ints()));
        PTable<String, Integer> selectedData = scores.parallelDo("filtering data",
            new ThresholdFilter(4), Writables.tableOf(Writables.strings(), Writables.ints()));
        pipeline.writeTextFile(selectedData, "/home/rahul/crunchOut");
        pipeline.done();
    }
}
class PTableScoreCalculator extends DoFn<Pair<Integer, Collection<TupleN>>, Pair<String, Integer>> {

    private static final long serialVersionUID = -4325522952289479933L;

    @Override
    public void process(Pair<Integer, Collection<TupleN>> input,
            Emitter<Pair<String, Integer>> emitter) {
        // Compare every tuple in the group against every tuple that follows it.
        Iterator<TupleN> primary = input.second().iterator();
        int pos = 1;
        while (primary.hasNext()) {
            Iterator<TupleN> secondary = getSecondary(input.second(), pos++);
            if (secondary == null) {
                return;
            }
            TupleN first = primary.next();
            while (secondary.hasNext()) {
                TupleN second = secondary.next();
                emitter.emit(new Pair<String, Integer>(first.toString() + " : " + second.toString(),
                    computeDistance(first, second)));
            }
        }
    }

    // Returns a fresh iterator advanced past the first 'pos' elements,
    // or null when fewer than 'pos' elements remain.
    private Iterator<TupleN> getSecondary(Collection<TupleN> data, int pos) {
        Iterator<TupleN> secondary = data.iterator();
        for (int i = 0; i < pos; i++) {
            if (!secondary.hasNext()) {
                return null;
            }
            secondary.next();
        }
        return secondary;
    }

    // Sum of the per-field Levenshtein distances between two tuples.
    private Integer computeDistance(TupleN first, TupleN second) {
        Integer distance = 0;
        for (int pos = 0; pos < first.size(); pos++) {
            distance += StringUtils.getLevenshteinDistance(
                first.get(pos).toString(), second.get(pos).toString());
        }
        return distance;
    }
}
class LineSpliter extends MapFn<String, Pair<Integer, TupleN>> {

    private static final long serialVersionUID = 1L;

    @Override
    public Pair<Integer, TupleN> map(String line) {
        // Every record currently gets the same key (1); a real classifier
        // would derive the key from the line's attributes.
        String[] splitData = line.split(",");
        Object[] objects = new Object[splitData.length];
        for (int pos = 0; pos < objects.length; pos++) {
            objects[pos] = splitData[pos];
        }
        return new Pair<Integer, TupleN>(1, new TupleN(objects));
    }
}
class ThresholdFilter extends FilterFn<Pair<String, Integer>> {

    private static final long serialVersionUID = 1L;

    private final Integer threshold;

    public ThresholdFilter(Integer threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean accept(Pair<String, Integer> input) {
        // Keep only pairs whose distance is at or below the threshold.
        return input.second() <= threshold;
    }
}
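The computeDistance method in the attached test depends on commons-lang's
StringUtils.getLevenshteinDistance. For reference, here is a self-contained
equivalent using the standard dynamic-programming algorithm (a sketch, not
the commons-lang implementation; field arrays stand in for TupleN):

```java
public class DistanceSketch {

    // Classic dynamic-programming Levenshtein edit distance.
    static int levenshtein(String a, String b) {
        int[][] d = new int[a.length() + 1][b.length() + 1];
        for (int i = 0; i <= a.length(); i++) {
            d[i][0] = i;
        }
        for (int j = 0; j <= b.length(); j++) {
            d[0][j] = j;
        }
        for (int i = 1; i <= a.length(); i++) {
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                d[i][j] = Math.min(Math.min(d[i - 1][j] + 1, d[i][j - 1] + 1),
                    d[i - 1][j - 1] + cost);
            }
        }
        return d[a.length()][b.length()];
    }

    // Same shape as computeDistance: sum the per-field distances.
    static int fieldDistance(String[] first, String[] second) {
        int distance = 0;
        for (int pos = 0; pos < first.length; pos++) {
            distance += levenshtein(first[pos], second[pos]);
        }
        return distance;
    }

    public static void main(String[] args) {
        System.out.println(levenshtein("kitten", "sitting")); // 3
        System.out.println(fieldDistance(
            new String[] {"rahul", "pune"}, new String[] {"rahil", "puna"})); // 2
    }
}
```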