Hi,

Today I tried to create Bloom filters using Crunch; the test case is attached. I do not know whether there is a better way to accomplish this. I think APIs to create and load Bloom filters would be a good addition to Crunch's existing set. If people think it could be added, I can make a patch for it.

Regards,
Rahul

package org.apache.crunch.bloomfilter;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.crunch.CombineFn.Aggregator;
import org.apache.crunch.CombineFn.AggregatorCombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import org.junit.Rule;
import org.junit.Test;

import com.google.common.collect.ImmutableList;

/**
 * Tests that build a Hadoop {@link BloomFilter} from a text file with a Crunch MapReduce pipeline.
 *
 * <p>The class implements {@link Serializable} because the anonymous {@link BloomFilterFn}
 * subclasses created inside the test methods capture the enclosing test instance, and Crunch
 * serializes DoFns to ship them to tasks. {@code tmpDir} is transient so it is not serialized
 * along with them.
 */
public class BloomFiltersTest implements Serializable {
  @Rule
  public transient TemporaryPath tmpDir = TemporaryPaths.create();

  /** Uses each input line longer than four characters as a key. */
  @Test
  public void testFilterCreation() throws IOException {
    String inputPath = tmpDir.copyResourceFileName("sample1.txt");
    BloomFilterFn<String> filterFn = new BloomFilterFn<String>() {
      @Override
      List<String> getKeys(String input) {
        if (input.length() > 4) {
          return Arrays.asList(input);
        }
        // An empty list (rather than null) means "no keys for this line".
        return ImmutableList.<String>of();
      }
    };
    BloomFilter filter = createFilter(inputPath, filterFn);
    // Bloom filters may report false positives, so this negative check assumes the filter is
    // sparsely populated by sample1.txt (file contents not visible here).
    assertFalse(filter.membershipTest(key("Mcbeth")));
    assertTrue(filter.membershipTest(key("apple")));
  }

  /** Uses every space-separated token of every input line as a key. */
  @Test
  public void testFilterCreation2() throws IOException {
    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
    BloomFilterFn<String> filterFn = new BloomFilterFn<String>() {
      @Override
      List<String> getKeys(String input) {
        return Arrays.asList(StringUtils.split(input, " "));
      }
    };
    BloomFilter filter = createFilter(inputPath, filterFn);
    assertTrue(filter.membershipTest(key("Mcbeth")));
    assertTrue(filter.membershipTest(key("apples")));
  }

  /**
   * Runs a pipeline that builds one filter per {@code filterFn} instance and then ORs all of
   * them together under the single key {@code true}.
   *
   * @param inputPath text file to read, one record per line
   * @param filterFn function that extracts the keys to add from each line
   * @return the merged filter
   * @throws IllegalStateException if the pipeline produced no filter
   */
  private BloomFilter createFilter(String inputPath, BloomFilterFn<String> filterFn)
      throws IOException {
    MRPipeline pipeline = new MRPipeline(BloomFiltersTest.class);
    try {
      PCollection<String> lines = pipeline.read(At.textFile(inputPath));
      PTypeFamily tf = lines.getTypeFamily();
      PTable<Boolean, BloomFilter> partialFilters = lines.parallelDo(filterFn,
          tf.tableOf(tf.booleans(), Writables.writables(BloomFilter.class)));
      // A single reducer plus the aggregator folds every partial filter into one.
      PTable<Boolean, BloomFilter> merged = partialFilters.groupByKey(1).combineValues(
          new AggregatorCombineFn<Boolean, BloomFilter>(new BloomFilterAggregator()));
      Iterable<Pair<Boolean, BloomFilter>> materialized = merged.materialize();
      pipeline.run();

      Iterator<Pair<Boolean, BloomFilter>> results = materialized.iterator();
      if (!results.hasNext()) {
        throw new IllegalStateException("Pipeline produced no Bloom filter for " + inputPath);
      }
      return results.next().second();
    } finally {
      // Release the pipeline's temporary/materialized output once the result has been read.
      pipeline.done();
    }
  }

  /** Builds a Bloom filter key from the UTF-8 bytes of {@code s}. */
  private static Key key(String s) {
    return new Key(s.getBytes(StandardCharsets.UTF_8));
  }
}

/**
 * Crunch {@link Aggregator} that merges Bloom filters by OR-ing them into a single filter.
 *
 * <p>Every filter passed to {@link #update} must use the same vector size and number of hash
 * functions as this aggregator; Hadoop's {@link BloomFilter#or} rejects mismatched filters. The
 * hash type should also match, or membership tests on the merged filter will be wrong.
 */
class BloomFilterAggregator implements Aggregator<BloomFilter> {
  private static final long serialVersionUID = 1L;

  static final int DEFAULT_VECTOR_SIZE = 1000;
  static final int DEFAULT_NB_HASH = 5;
  static final int DEFAULT_HASH_TYPE = Hash.MURMUR_HASH;

  private final int vectorSize;
  private final int nbHash;
  private final int hashType;

  /** Filter for the current group; rebuilt by {@link #reset()}. */
  private transient BloomFilter bloomFilter = null;

  /** Creates an aggregator for 1000-bit filters with 5 Murmur hash functions. */
  BloomFilterAggregator() {
    this(DEFAULT_VECTOR_SIZE, DEFAULT_NB_HASH, DEFAULT_HASH_TYPE);
  }

  /**
   * @param vectorSize number of bits in the merged filter; must be positive
   * @param nbHash number of hash functions; must be positive
   * @param hashType a Hadoop {@link Hash} type constant, e.g. {@link Hash#MURMUR_HASH}
   * @throws IllegalArgumentException if {@code vectorSize} or {@code nbHash} is not positive
   */
  BloomFilterAggregator(int vectorSize, int nbHash, int hashType) {
    if (vectorSize <= 0) {
      throw new IllegalArgumentException("vectorSize must be positive: " + vectorSize);
    }
    if (nbHash <= 0) {
      throw new IllegalArgumentException("nbHash must be positive: " + nbHash);
    }
    this.vectorSize = vectorSize;
    this.nbHash = nbHash;
    this.hashType = hashType;
  }

  @Override
  public void reset() {
    // Always start from an empty filter: reusing the previous instance would carry bits over
    // from the previous group and mutate a filter already handed out by results().
    bloomFilter = newFilter();
  }

  @Override
  public void update(BloomFilter value) {
    if (bloomFilter == null) {
      bloomFilter = newFilter(); // tolerate update() before the first reset()
    }
    bloomFilter.or(value);
  }

  @Override
  public Iterable<BloomFilter> results() {
    if (bloomFilter == null) {
      bloomFilter = newFilter(); // nothing aggregated yet: report an empty filter
    }
    return ImmutableList.of(bloomFilter);
  }

  /** Creates an empty filter with this aggregator's parameters. */
  private BloomFilter newFilter() {
    return new BloomFilter(vectorSize, nbHash, hashType);
  }
}

/**
 * Base {@link DoFn} that adds the keys extracted from every input record to one Bloom filter and
 * emits that filter, under the constant key {@code true}, from {@link #cleanup}.
 *
 * <p>Subclasses implement {@link #getKeys(Object)}. Non-blank keys are added as their UTF-8
 * bytes; blank keys are skipped. Filters emitted by different instances can only be OR-ed
 * together (e.g. by an aggregator) when they share the same vector size and hash count.
 *
 * @param <S> the input record type
 */
abstract class BloomFilterFn<S> extends DoFn<S, Pair<Boolean, BloomFilter>> {
  private static final long serialVersionUID = -4170907490047335387L;

  private final int vectorSize;
  private final int nbHash;
  private final int hashType;

  /** Filter being built; created in {@link #initialize()} and filled by {@link #process}. */
  private transient BloomFilter bloomFilter = null;

  /** Uses a 1000-bit vector, 5 hash functions and {@link Hash#MURMUR_HASH}. */
  protected BloomFilterFn() {
    this(1000, 5, Hash.MURMUR_HASH);
  }

  /**
   * @param vectorSize number of bits in the filter; must be positive
   * @param nbHash number of hash functions; must be positive
   * @param hashType a Hadoop {@link Hash} type constant, e.g. {@link Hash#MURMUR_HASH}
   * @throws IllegalArgumentException if {@code vectorSize} or {@code nbHash} is not positive
   */
  protected BloomFilterFn(int vectorSize, int nbHash, int hashType) {
    if (vectorSize <= 0) {
      throw new IllegalArgumentException("vectorSize must be positive: " + vectorSize);
    }
    if (nbHash <= 0) {
      throw new IllegalArgumentException("nbHash must be positive: " + nbHash);
    }
    this.vectorSize = vectorSize;
    this.nbHash = nbHash;
    this.hashType = hashType;
  }

  @Override
  public void initialize() {
    super.initialize();
    bloomFilter = new BloomFilter(vectorSize, nbHash, hashType);
  }

  /** Adds the record's keys to the filter; nothing is emitted until {@link #cleanup}. */
  @Override
  public void process(S input, Emitter<Pair<Boolean, BloomFilter>> emitter) {
    List<String> keys = getKeys(input);
    if (keys == null) {
      return; // null is accepted as "no keys for this record"
    }
    for (String key : keys) {
      if (StringUtils.isNotBlank(key)) {
        // Explicit UTF-8 so the bytes do not depend on the task JVM's default charset.
        bloomFilter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
      }
    }
  }

  /**
   * Extracts the keys to add to the filter from one input record.
   *
   * @param input the record
   * @return the keys; may be {@code null} or empty when the record contributes none
   */
  abstract List<String> getKeys(S input);

  @Override
  public void cleanup(Emitter<Pair<Boolean, BloomFilter>> emitter) {
    emitter.emit(Pair.of(true, bloomFilter));
  }
}

Reply via email to