Hi,
Today I tried to create Bloom filters using Crunch; the test case I used is
attached. I don't know whether there is a better way to accomplish this.
I think APIs for creating and loading Bloom filters would be a good addition
to Crunch's existing set. If people agree that they should be added, I can
make a patch for them.
Regards,
Rahul
package org.apache.crunch.bloomfilter;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.crunch.CombineFn.Aggregator;
import org.apache.crunch.CombineFn.AggregatorCombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.At;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
/**
 * Tests that build a Hadoop {@link BloomFilter} from a text file with a Crunch pipeline.
 *
 * <p>The anonymous {@link BloomFilterFn} subclasses created in the test methods hold a reference
 * to this test instance, so the instance must be {@link Serializable} along with them. For the
 * same reason, the JUnit rule field is {@code transient}.
 */
public class BloomFiltersTest implements Serializable {
  @Rule
  public transient TemporaryPath tmpDir = TemporaryPaths.create();

  /** Only lines longer than four characters are added, each line used whole as one key. */
  @Test
  public void testFilterCreation() throws IOException {
    String inputPath = tmpDir.copyResourceFileName("sample1.txt");
    BloomFilterFn<String> filterFn = new BloomFilterFn<String>() {
      @Override
      List<String> getKeys(String input) {
        if (input.length() > 4) {
          return Arrays.asList(input);
        }
        // Return an empty list rather than null: no keys for short lines.
        return ImmutableList.<String>of();
      }
    };
    BloomFilter filter = createFilter(inputPath, filterFn);
    assertFalse(filter.membershipTest(new Key("Mcbeth".getBytes())));
    assertTrue(filter.membershipTest(new Key("apple".getBytes())));
  }

  /** Every space-separated token of every line is added as a key. */
  @Test
  public void testFilterCreation2() throws IOException {
    String inputPath = tmpDir.copyResourceFileName("shakes.txt");
    BloomFilterFn<String> filterFn = new BloomFilterFn<String>() {
      @Override
      List<String> getKeys(String input) {
        return Arrays.asList(StringUtils.split(input, " "));
      }
    };
    BloomFilter filter = createFilter(inputPath, filterFn);
    assertTrue(filter.membershipTest(new Key("Mcbeth".getBytes())));
    assertTrue(filter.membershipTest(new Key("apples".getBytes())));
  }

  /**
   * Runs a pipeline that builds partial filters with {@code filterFn} and ORs them together.
   *
   * @param inputPath text file to read, one record per line
   * @param filterFn function that adds each line's keys to a partial filter
   * @return the single merged filter produced by the pipeline
   * @throws IOException declared for the pipeline I/O
   */
  private BloomFilter createFilter(String inputPath, BloomFilterFn<String> filterFn)
      throws IOException {
    MRPipeline pipeline = new MRPipeline(BloomFiltersTest.class);
    try {
      PCollection<String> lines = pipeline.read(At.textFile(inputPath));
      PTypeFamily tf = lines.getTypeFamily();
      PTable<Boolean, BloomFilter> partialFilters = lines.parallelDo(filterFn,
          tf.tableOf(tf.booleans(), Writables.writables(BloomFilter.class)));
      // Every partial filter is emitted under the same key and sent to one reducer,
      // so the combine step yields exactly one merged filter.
      PTable<Boolean, BloomFilter> merged = partialFilters.groupByKey(1).combineValues(
          new AggregatorCombineFn<Boolean, BloomFilter>(new BloomFilterAggregator()));
      Iterable<Pair<Boolean, BloomFilter>> materialized = merged.materialize();
      pipeline.run();
      Iterator<Pair<Boolean, BloomFilter>> iterator = materialized.iterator();
      assertTrue("pipeline produced no Bloom filter", iterator.hasNext());
      return iterator.next().second();
    } finally {
      // Clean up temporary/intermediate data created by the pipeline run.
      pipeline.done();
    }
  }
}
/**
 * Crunch {@link Aggregator} that merges Bloom filters with a bitwise OR.
 *
 * <p>Filters passed to {@link #update} must be built with the same parameters this aggregator
 * uses in {@link #reset}. NOTE(review): Hadoop's {@code BloomFilter.or} is expected to reject
 * filters whose vector size or hash count differ; verify against the Hadoop version in use.
 */
class BloomFilterAggregator implements Aggregator<BloomFilter> {
  private static final long serialVersionUID = 1L;

  private final int vectorSize;
  private final int nbHash;
  private final int hashType;

  /** Running merge result for the current key; rebuilt by reset() and not serialized. */
  private transient BloomFilter bloomFilter = null;

  /** Creates an aggregator for filters of 1000 bits, 5 hash functions and murmur hashing. */
  public BloomFilterAggregator() {
    this(1000, 5, Hash.MURMUR_HASH);
  }

  /**
   * Creates an aggregator for filters built with the given parameters.
   *
   * @param vectorSize number of bits in the filter vector
   * @param nbHash number of hash functions
   * @param hashType Hadoop hash type constant, e.g. {@code Hash.MURMUR_HASH}
   */
  public BloomFilterAggregator(int vectorSize, int nbHash, int hashType) {
    this.vectorSize = vectorSize;
    this.nbHash = nbHash;
    this.hashType = hashType;
  }

  /**
   * Starts a new, empty merge result.
   *
   * <p>The filter is always replaced. Reusing an existing instance would leak bits merged for a
   * previous key into the next key's result.
   */
  @Override
  public void reset() {
    bloomFilter = new BloomFilter(vectorSize, nbHash, hashType);
  }

  /** ORs {@code value} into the current merge result. */
  @Override
  public void update(BloomFilter value) {
    if (bloomFilter == null) {
      // Defensive: tolerate a caller that skips reset() before the first update.
      reset();
    }
    bloomFilter.or(value);
  }

  /** Returns the merged filter as a single-element iterable. */
  @Override
  public Iterable<BloomFilter> results() {
    return ImmutableList.of(bloomFilter);
  }
}
/**
 * {@link DoFn} that accumulates keys into a Bloom filter and emits the filter once, at cleanup.
 *
 * <p>The filter is created in {@link #initialize()} with 1000 bits, 5 hash functions and murmur
 * hashing. It is filled by {@link #process}, and emitted under the constant key {@code true} from
 * {@link #cleanup}. Nothing is emitted per record.
 *
 * <p>Keys are encoded with {@code String.getBytes()}, i.e. the platform default charset, so
 * membership lookups must encode their keys the same way.
 *
 * @param <S> type of the input records
 */
abstract class BloomFilterFn<S> extends DoFn<S, Pair<Boolean, BloomFilter>> {
  private static final long serialVersionUID = -4170907490047335387L;

  /** Filter being filled; created in initialize() and not serialized. */
  private transient BloomFilter bloomFilter = null;

  @Override
  public void initialize() {
    super.initialize();
    bloomFilter = new BloomFilter(1000, 5, Hash.MURMUR_HASH);
  }

  /** Adds every non-blank key extracted from {@code input} to the filter. */
  @Override
  public void process(S input, Emitter<Pair<Boolean, BloomFilter>> emitter) {
    List<String> keys = getKeys(input);
    if (keys == null) {
      return;
    }
    for (String candidate : keys) {
      if (StringUtils.isBlank(candidate)) {
        continue;
      }
      bloomFilter.add(new Key(candidate.getBytes()));
    }
  }

  /**
   * Extracts the keys to add to the filter for one input record.
   *
   * @param input the record
   * @return keys to add; {@code null} means no keys, and null/blank entries are skipped
   */
  abstract List<String> getKeys(S input);

  /** Emits the accumulated filter under the key {@code true}. */
  @Override
  public void cleanup(Emitter<Pair<Boolean, BloomFilter>> emitter) {
    emitter.emit(Pair.of(true, bloomFilter));
  }
}