Updated Branches: refs/heads/master 63050d0d4 -> 374bf3de6
CRUNCH-123: Add support for Deletes to crunch-hbase. Contributed by Micah Whitacre. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/374bf3de Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/374bf3de Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/374bf3de Branch: refs/heads/master Commit: 374bf3de658f8e519da0fc7306f31793e4c075b2 Parents: 63050d0 Author: Josh Wills <[email protected]> Authored: Tue Dec 4 16:36:57 2012 -0800 Committer: Josh Wills <[email protected]> Committed: Tue Dec 4 16:36:57 2012 -0800 ---------------------------------------------------------------------- .../apache/crunch/io/hbase/WordCountHBaseIT.java | 35 +++++++++++++++ .../org/apache/crunch/io/hbase/HBaseTarget.java | 3 +- 2 files changed, 37 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/374bf3de/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java index f13edeb..51abdaa 100644 --- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java @@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; @@ -97,6 +98,18 @@ public class WordCountHBaseIT { }, Writables.writables(Put.class)); } + + @SuppressWarnings("serial") + public static PCollection<Delete> clearCounts(PTable<ImmutableBytesWritable, Result> counts) { + return counts.parallelDo("convert to delete", new DoFn<Pair<ImmutableBytesWritable, Result>, Delete>() { + @Override + public void process(Pair<ImmutableBytesWritable, Result> input, Emitter<Delete> emitter) { + Delete delete = new Delete(input.first().get()); + emitter.emit(delete); + } + + }, Writables.writables(Delete.class)); + } @Before public void setUp() throws Exception { @@ -151,6 +164,7 @@ public class WordCountHBaseIT { jarUp(jos, baseDir, prefix + "WordCountHBaseIT.class"); jarUp(jos, baseDir, prefix + "WordCountHBaseIT$1.class"); jarUp(jos, baseDir, prefix + "WordCountHBaseIT$2.class"); + jarUp(jos, baseDir, prefix + "WordCountHBaseIT$3.class"); jos.close(); Path target = new Path(tmpPath, "WordCountHBaseIT.jar"); @@ -205,6 +219,20 @@ public class WordCountHBaseIT { assertIsLong(outputTable, "cat", 2); assertIsLong(outputTable, "dog", 1); + + //verify HBaseTarget supports deletes. + Scan clearScan = new Scan(); + clearScan.addColumn(COUNTS_COLFAM, null); + pipeline = new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration()); + HBaseSourceTarget clearSource = new HBaseSourceTarget(outputTableName, clearScan); + PTable<ImmutableBytesWritable, Result> counts = pipeline.read(clearSource); + pipeline.write(clearCounts(counts), new HBaseTarget(outputTableName)); + pipeline.done(); + + assertDeleted(outputTable, "cat"); + assertDeleted(outputTable, "dog"); + + } finally { // not quite sure... } @@ -226,4 +254,11 @@ public class WordCountHBaseIT { assertTrue(rawCount != null); assertEquals(new Long(i), new Long(Bytes.toLong(rawCount))); } + + protected void assertDeleted(HTable table, String key) throws IOException { + Get get = new Get(Bytes.toBytes(key)); + get.addColumn(COUNTS_COLFAM, null); + Result result = table.get(get); + assertTrue(result.isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/374bf3de/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java index c659c86..48593b8 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -29,6 +29,7 @@ import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; @@ -69,7 +70,7 @@ public class HBaseTarget implements MapReduceTarget { @Override public boolean accept(OutputHandler handler, PType<?> ptype) { - if (Put.class.equals(ptype.getTypeClass())) { + if (Put.class.equals(ptype.getTypeClass()) || Delete.class.equals(ptype.getTypeClass())) { handler.configure(this, ptype); return true; }
