Hi Gabriel,

I have found a way in which Crunch supports the use case of repeated iteration, but I am not completely sure of the ins and outs of it. Basically, rather than doing a groupByKey on the PTable to get back a PGroupedTable, I used the collectValues API to get back a PTable<Key, Collection<Value>>.

  PTable<Integer, Collection<TupleN>> collectValues = classifiedData.collectValues();
  PTable<String, Integer> scores = collectValues.parallelDo("compute pairs",
      new PTableScoreCalculator(), Writables.tableOf(Writables.strings(), Writables.ints()));


Now when I do a parallelDo on the new table, I get back a Pair of the key type and a Collection of the value type, over which I can do things as I wish.

  class PTableScoreCalculator extends DoFn<Pair<Integer, Collection<TupleN>>, Pair<String, Integer>> {
    public void process(Pair<Integer, Collection<TupleN>> input,
        Emitter<Pair<String, Integer>> emitter) {
      Iterator<TupleN> primary = input.second().iterator();
      ...
    }
  }

This way I can iterate over the values again and again. Any comments on this approach? I am attaching my test case for reference.
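To make the difference concrete, here is a minimal plain-Java sketch (Crunch types left out; this only illustrates the standard java.util iteration contract, which is my assumption about why collectValues helps):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;

public class ReiterationDemo {
    public static void main(String[] args) {
        Collection<String> values = Arrays.asList("a", "b", "c");

        // An Iterator is single-pass: once exhausted it stays exhausted.
        Iterator<String> once = values.iterator();
        while (once.hasNext()) {
            once.next();
        }
        System.out.println(once.hasNext()); // false: no way to rewind

        // A materialized Collection hands out a fresh Iterator for every
        // pass, which is what makes repeated iteration possible.
        Iterator<String> again = values.iterator();
        System.out.println(again.hasNext()); // true
    }
}
```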

BTW, why are there two methods that can do similar things, the /groupByKey/ method and the /collectValues/ method? I see that an aggregation gets invoked for the collectValues API, while in the other case a lazy collection gets created. Any idea on the different applications of the two?

regards,
Rahul


On 28-06-2012 14:17, Gabriel Reid wrote:
On Thu, Jun 28, 2012 at 9:29 AM, Rahul<[email protected]>  wrote:
Yes indeed this is a small PoC to get familiar with Crunch in relation to my
problem. Basically I have the following algo at play:
1. Read data rows
2. Create custom keys for each of them, built using various attributes of
data (this time it is just a simple hash code, but I would like to emit
multiple key-value pairs)
3. Group similar data based on created Keys
4. Iterate over individual items in the group and do extensive comparison
between all of them

I just built an outline in the test case to see what/how can be done, can
you advise something better ?

Thanks for the outline. In this case, your approach (putting the contents of
the incoming Iterable into a collection) should work fine, as long as the
number of elements per group is relatively small (i.e. easily able to fit in
the memory available to each reducer in your Hadoop cluster).
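The buffering described above can be sketched in plain Java (Crunch's DoFn and PGroupedTable types are omitted, and `materialize` is a hypothetical helper name, not a Crunch API):

```java
import java.util.ArrayList;
import java.util.List;

public class BufferedGroup {
    // Copy a single-pass Iterable into a List so it can be traversed
    // repeatedly. The whole group is held in memory, so this is only
    // safe when each group fits in a reducer's available heap.
    static <T> List<T> materialize(Iterable<T> values) {
        List<T> buffered = new ArrayList<T>();
        for (T value : values) {
            buffered.add(value);
        }
        return buffered;
    }

    public static void main(String[] args) {
        List<String> group = materialize(java.util.Arrays.asList("a", "b", "c"));
        // The buffered copy supports the all-pairs comparison from the
        // attached test case: each element against every later element.
        for (int i = 0; i < group.size(); i++) {
            for (int j = i + 1; j < group.size(); j++) {
                System.out.println(group.get(i) + " : " + group.get(j));
            }
        }
    }
}
```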

- Gabriel

package com.mylearning.crunch;

import java.util.Collection;
import java.util.Iterator;

import org.apache.commons.lang.StringUtils;
import org.junit.Test;

import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.FilterFn;
import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.PGroupedTable;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pair;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.TupleN;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.writable.TupleWritable;
import com.cloudera.crunch.types.writable.WritableType;
import com.cloudera.crunch.types.writable.Writables;

public class TuplesTest {

  @Test
  public void shouldGiveSomeResultsV2() throws Exception {
    String fileLoc = FileHelper.createTempCopyOf("person-data.txt");
    Pipeline pipeline = new MRPipeline(TuplesTest.class);
    PCollection<String> readLines = pipeline.readTextFile(fileLoc);

    WritableType<TupleN, TupleWritable> tuples = Writables.tuples(Writables.strings(),
        Writables.strings(), Writables.strings(), Writables.strings(), Writables.strings());

    PTable<Integer, TupleN> classifiedData = readLines.parallelDo("reading lines",
        new LineSpliter(), Writables.tableOf(Writables.ints(), tuples));

    PTable<Integer, Collection<TupleN>> collectValues = classifiedData.collectValues();
    PTable<String, Integer> scores = collectValues.parallelDo("compute pairs",
        new PTableScoreCalculator(), Writables.tableOf(Writables.strings(), Writables.ints()));

    PTable<String, Integer> selectedData = scores.parallelDo("filtering data",
        new ThresholdFilter(4), Writables.tableOf(Writables.strings(), Writables.ints()));
    pipeline.writeTextFile(selectedData, "/home/rahul/crunchOut");
    pipeline.done();
  }
}

class PTableScoreCalculator extends DoFn<Pair<Integer, Collection<TupleN>>, Pair<String, Integer>> {

  private static final long serialVersionUID = -4325522952289479933L;

  @Override
  public void process(Pair<Integer, Collection<TupleN>> input,
      Emitter<Pair<String, Integer>> emitter) {
    Iterator<TupleN> primary = input.second().iterator();
    int pos = 1;
    while (primary.hasNext()) {
      Iterator<TupleN> secondary = getSecondary(input.second(), pos++);
      if (secondary == null) {
        return;
      }
      TupleN first = primary.next();
      while (secondary.hasNext()) {
        TupleN second = secondary.next();
        emitter.emit(new Pair<String, Integer>(first.toString() + " : " + second.toString(),
            computeDistance(first, second)));
      }
    }

  }

  private Iterator<TupleN> getSecondary(Collection<TupleN> data, int pos) {
    Iterator<TupleN> secondary = data.iterator();
    for (int i = 0; i < pos; i++) {
      if (!secondary.hasNext()) {
        return null;
      }
      secondary.next();
    }
    return secondary;
  }

  private Integer computeDistance(TupleN first, TupleN second) {
    Integer distance = 0;
    for (int pos = 0; pos < first.size(); pos++) {
      String data = first.get(pos).toString();
      String data2 = second.get(pos).toString();
      distance += StringUtils.getLevenshteinDistance(data, data2);
    }
    return distance;
  }

}

class LineSpliter extends MapFn<String, Pair<Integer, TupleN>> {

  private static final long serialVersionUID = 1L;

  @Override
  public Pair<Integer, TupleN> map(String line) {
    String[] splitData = line.split(",");
    Object[] objects = new Object[splitData.length];
    for (int pos = 0; pos < objects.length; pos++) {
      objects[pos] = splitData[pos];
    }
    return new Pair<Integer, TupleN>(1, new TupleN(objects));
  }

}

class ThresholdFilter extends FilterFn<Pair<String, Integer>> {

  private static final long serialVersionUID = 1L;
  private Integer threshold;

  public ThresholdFilter(Integer threshold) {
    this.threshold = threshold;
  }

  @Override
  public boolean accept(Pair<String, Integer> input) {
    return input.second() <= threshold;
  }

}
