FLUME-2338. Support coalescing increments in HBaseSink. (Mike Percy via Hari Shreedharan)
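With coalesceIncrements enabled, HBaseSink buffers the Increment objects produced by the serializer for a batch and merges those addressed to the same {row, family, qualifier} cell into a single operation, so HBase sees one increment RPC per unique cell rather than one per event. As a minimal sketch of the configuration (not part of this commit; the agent, channel, and sink names a1, c1, and k1 are illustrative), the new property sits alongside the existing HBaseSink settings:

  a1.sinks.k1.type = hbase
  a1.sinks.k1.table = foo_table
  a1.sinks.k1.columnFamily = bar_cf
  a1.sinks.k1.channel = c1
  a1.sinks.k1.batchSize = 100
  # Added by this patch; defaults to false
  a1.sinks.k1.coalesceIncrements = true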
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5a06871c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5a06871c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5a06871c

Branch: refs/heads/flume-1.5
Commit: 5a06871c78102b87ca5b53993cc7b95e66292bad
Parents: ae022cf
Author: Hari Shreedharan <hshreedha...@apache.org>
Authored: Sun Mar 2 23:49:17 2014 -0800
Committer: Hari Shreedharan <hshreedha...@apache.org>
Committed: Mon Mar 3 01:07:28 2014 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   2 +
 .../org/apache/flume/sink/hbase/BatchAware.java |  28 ++
 .../org/apache/flume/sink/hbase/HBaseSink.java  | 197 +++++++++++-
 .../sink/hbase/IncrementHBaseSerializer.java    |  80 +++++
 .../apache/flume/sink/hbase/TestHBaseSink.java  | 309 ++++++++++++++++---
 5 files changed, 563 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5a06871c/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 96bf73e..cedb283 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1839,6 +1839,8 @@ Property Name       Default  Desc
 zookeeperQuorum     --       The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
 znodeParent         /hbase   The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
 batchSize           100      Number of events to be written per txn.
+coalesceIncrements  false    Should the sink coalesce multiple increments to a cell per batch. This might give
+                             better performance if there are multiple increments to a limited number of cells.
 serializer          org.apache.flume.sink.hbase.SimpleHbaseEventSerializer  Default increment column = "iCol", payload column = "pCol".
 serializer.*        --       Properties to be passed to the serializer.
 kerberosPrincipal   --       Kerberos user principal for accessing secure HBase

http://git-wip-us.apache.org/repos/asf/flume/blob/5a06871c/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
new file mode 100644
index 0000000..0974241
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+/**
+ * This interface allows for implementing HBase serializers that are aware of
+ * batching. {@link #onBatchStart()} is called at the beginning of each batch
+ * by the sink.
+ */
+public interface BatchAware {
+  public void onBatchStart();
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5a06871c/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index c4a666c..9996a4e 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -19,15 +19,23 @@
 package org.apache.flume.sink.hbase;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
+import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
@@ -52,7 +60,7 @@ import org.apache.hadoop.hbase.security.User;
 /**
  *
  * A simple sink which reads events from a channel and writes them to HBase.
- * The Hbase configution is picked up from the first <tt>hbase-site.xml</tt>
+ * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
  * encountered in the classpath. This sink supports batch reading of
 * events from the channel, and writing them to Hbase, to minimize the number
 * of flushes on the hbase tables. To use this sink, it has to be configured
@@ -97,8 +105,13 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private String kerberosKeytab;
   private User hbaseUser;
   private boolean enableWal = true;
+  private boolean batchIncrements = false;
+  private Method refGetFamilyMap = null;
   private SinkCounter sinkCounter;
 
+  // Internal hooks used for unit testing.
+  private DebugIncrementsCallback debugIncrCallback = null;
+
   public HBaseSink(){
     this(HBaseConfiguration.create());
   }
@@ -107,6 +120,13 @@ public class HBaseSink extends AbstractSink implements Configurable {
     this.config = conf;
   }
 
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  HBaseSink(Configuration conf, DebugIncrementsCallback cb) {
+    this(conf);
+    this.debugIncrCallback = cb;
+  }
+
   @Override
   public void start(){
     Preconditions.checkArgument(table == null, "Please call stop " +
@@ -222,6 +242,17 @@ public class HBaseSink extends AbstractSink implements Configurable {
           "writes to HBase will have WAL disabled, and any data in the " +
           "memstore of this region in the Region Server could be lost!");
     }
+
+    batchIncrements = context.getBoolean(
+      HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+      HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+
+    if (batchIncrements) {
+      logger.info("Increment coalescing is enabled. Increments will be " +
+          "buffered.");
+      refGetFamilyMap = reflectLookupGetFamilyMap();
+    }
+
     String zkQuorum = context.getString(HBaseSinkConfigurationConstants
         .ZK_QUORUM);
     Integer port = null;
@@ -281,6 +312,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
     List<Increment> incs = new LinkedList<Increment>();
     try {
       txn.begin();
+
+      if (serializer instanceof BatchAware) {
+        ((BatchAware)serializer).onBatchStart();
+      }
+
       long i = 0;
       for (; i < batchSize; i++) {
         Event event = channel.take();
@@ -309,7 +345,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
     try{
       txn.rollback();
     } catch (Exception e2) {
-      logger.error("Exception in rollback. Rollback might not have been" +
+      logger.error("Exception in rollback. Rollback might not have been " +
          "successful." , e2);
     }
     logger.error("Failed to commit transaction." +
@@ -353,7 +389,20 @@ public class HBaseSink extends AbstractSink implements Configurable {
     runPrivileged(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-        for (final Increment i : incs) {
+
+        List<Increment> processedIncrements;
+        if (batchIncrements) {
+          processedIncrements = coalesceIncrements(incs);
+        } else {
+          processedIncrements = incs;
+        }
+
+        // Only used for unit testing.
+        if (debugIncrCallback != null) {
+          debugIncrCallback.onAfterCoalesce(processedIncrements);
+        }
+
+        for (final Increment i : processedIncrements) {
           i.setWriteToWAL(enableWal);
           table.increment(i);
         }
@@ -364,6 +413,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
     txn.commit();
     sinkCounter.addToEventDrainSuccessCount(actions.size());
   }
+
   private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
       throws Exception {
     if(hbaseUser != null) {
@@ -375,4 +425,145 @@ public class HBaseSink extends AbstractSink implements Configurable {
       return action.run();
     }
   }
+
+  /**
+   * The method getFamilyMap() is no longer available in Hbase 0.96.
+   * We must use reflection to determine which version we may use.
+   */
+  @VisibleForTesting
+  static Method reflectLookupGetFamilyMap() {
+    Method m = null;
+    String[] methodNames = { "getFamilyMapOfLongs", "getFamilyMap" };
+    for (String methodName : methodNames) {
+      try {
+        m = Increment.class.getMethod(methodName);
+        if (m != null && m.getReturnType().equals(Map.class)) {
+          logger.debug("Using Increment.{} for coalesce", methodName);
+          break;
+        }
+      } catch (NoSuchMethodException e) {
+        logger.debug("Increment.{} does not exist. Exception follows.",
+            methodName, e);
+      } catch (SecurityException e) {
+        logger.debug("No access to Increment.{}; Exception follows.",
+            methodName, e);
+      }
+    }
+    if (m == null) {
+      throw new UnsupportedOperationException(
+          "Cannot find Increment.getFamilyMap()");
+    }
+    return m;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) {
+    Preconditions.checkNotNull(refGetFamilyMap,
+        "Increment.getFamilymap() not found");
+    Preconditions.checkNotNull(inc, "Increment required");
+    Map<byte[], NavigableMap<byte[], Long>> familyMap = null;
+    try {
+      Object familyObj = refGetFamilyMap.invoke(inc);
+      familyMap = (Map<byte[], NavigableMap<byte[], Long>>) familyObj;
+    } catch (IllegalAccessException e) {
+      logger.warn("Unexpected error calling getFamilyMap()", e);
+      Throwables.propagate(e);
+    } catch (InvocationTargetException e) {
+      logger.warn("Unexpected error calling getFamilyMap()", e);
+      Throwables.propagate(e);
+    }
+    return familyMap;
+  }
+
+  /**
+   * Perform "compression" on the given set of increments so that Flume sends
+   * the minimum possible number of RPC operations to HBase per batch.
+   * @param incs Input: Increment objects to coalesce.
+   * @return List of new Increment objects after coalescing the unique counts.
+   */
+  private List<Increment> coalesceIncrements(Iterable<Increment> incs) {
+    Preconditions.checkNotNull(incs, "List of Increments must not be null");
+    // Aggregate all of the increment row/family/column counts.
+    // The nested map is keyed like this: {row, family, qualifier} => count.
+    Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters =
+        Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+    for (Increment inc : incs) {
+      byte[] row = inc.getRow();
+      Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc);
+      for (Map.Entry<byte[], NavigableMap<byte[],Long>> familyEntry : families.entrySet()) {
+        byte[] family = familyEntry.getKey();
+        NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
+        for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
+          byte[] qualifier = qualifierEntry.getKey();
+          Long count = qualifierEntry.getValue();
+          incrementCounter(counters, row, family, qualifier, count);
+        }
+      }
+    }
+
+    // Reconstruct list of Increments per unique row/family/qualifier.
+    List<Increment> coalesced = Lists.newLinkedList();
+    for (Map.Entry<byte[], Map<byte[],NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) {
+      byte[] row = rowEntry.getKey();
+      Map <byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
+      Increment inc = new Increment(row);
+      for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
+        byte[] family = familyEntry.getKey();
+        NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
+        for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
+          byte[] qualifier = qualifierEntry.getKey();
+          long count = qualifierEntry.getValue();
+          inc.addColumn(family, qualifier, count);
+        }
+      }
+      coalesced.add(inc);
+    }
+
+    return coalesced;
+  }
+
+  /**
+   * Helper function for {@link #coalesceIncrements} to increment a counter
+   * value in the passed data structure.
+   * @param counters Nested data structure containing the counters.
+   * @param row Row key to increment.
+   * @param family Column family to increment.
+   * @param qualifier Column qualifier to increment.
+   * @param count Amount to increment by.
+   */
+  private void incrementCounter(
+      Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters,
+      byte[] row, byte[] family, byte[] qualifier, Long count) {
+
+    Map<byte[], NavigableMap<byte[], Long>> families = counters.get(row);
+    if (families == null) {
+      families = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+      counters.put(row, families);
+    }
+
+    NavigableMap<byte[], Long> qualifiers = families.get(family);
+    if (qualifiers == null) {
+      qualifiers = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+      families.put(family, qualifiers);
+    }
+
+    Long existingValue = qualifiers.get(qualifier);
+    if (existingValue == null) {
+      qualifiers.put(qualifier, count);
+    } else {
+      qualifiers.put(qualifier, existingValue + count);
+    }
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  HbaseEventSerializer getSerializer() {
+    return serializer;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  interface DebugIncrementsCallback {
+    public void onAfterCoalesce(Iterable<Increment> increments);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/5a06871c/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
new file mode 100644
index 0000000..b4343eb
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Row;
+
+import java.util.List;
+
+/**
+ * For Increment-related unit tests.
+ */
+class IncrementHBaseSerializer implements HbaseEventSerializer, BatchAware {
+  private Event event;
+  private byte[] family;
+  private int numBatchesStarted = 0;
+
+  @Override public void configure(Context context) { }
+  @Override public void configure(ComponentConfiguration conf) { }
+  @Override public void close() { }
+
+  @Override
+  public void initialize(Event event, byte[] columnFamily) {
+    this.event = event;
+    this.family = columnFamily;
+  }
+
+  // This class only creates Increments.
+  @Override
+  public List<Row> getActions() {
+    return Collections.emptyList();
+  }
+
+  // Treat each Event as a String, i.e. "row:qualifier".
+  @Override
+  public List<Increment> getIncrements() {
+    List<Increment> increments = Lists.newArrayList();
+    String body = new String(event.getBody(), Charsets.UTF_8);
+    String[] pieces = body.split(":");
+    String row = pieces[0];
+    String qualifier = pieces[1];
+    Increment inc = new Increment(row.getBytes(Charsets.UTF_8));
+    inc.addColumn(family, qualifier.getBytes(Charsets.UTF_8), 1L);
+    increments.add(inc);
+    return increments;
+  }
+
+  @Override
+  public void onBatchStart() {
+    numBatchesStarted++;
+  }
+
+  @VisibleForTesting
+  public int getNumBatchesStarted() {
+    return numBatchesStarted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5a06871c/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index d1b0182..ab65a38 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -21,10 +21,16 @@ package org.apache.flume.sink.hbase;
 import static org.mockito.Mockito.*;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.NavigableMap;
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -37,64 +43,95 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.Assert;
-
-import com.google.common.primitives.Longs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestHBaseSink {
-  private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
-  private static String tableName = "TestHbaseSink";
-  private static String columnFamily = "TestColumnFamily";
-  private static String inColumn = "iCol";
-  private static String plCol = "pCol";
-  private static Context ctx = new Context();
-  private static String valBase = "testing hbase sink: jham";
-  private static Configuration conf;
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestHBaseSink.class);
+
+  private static final HBaseTestingUtility testUtility =
+      new HBaseTestingUtility();
+  private static final String tableName = "TestHbaseSink";
+  private static final String columnFamily = "TestColumnFamily";
+  private static final String inColumn = "iCol";
+  private static final String plCol = "pCol";
+  private static final String valBase = "testing hbase sink: jham";
+
+  private Configuration conf;
+  private Context ctx;
 
   @BeforeClass
-  public static void setUp() throws Exception {
+  public static void setUpOnce() throws Exception {
     testUtility.startMiniCluster();
-    Map<String, String> ctxMap = new HashMap<String, String>();
-    ctxMap.put("table", tableName);
-    ctxMap.put("columnFamily", columnFamily);
-    ctxMap.put("serializer",
-        "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
-    ctxMap.put("serializer.payloadColumn", plCol);
-    ctxMap.put("serializer.incrementColumn", inColumn);
-    ctx.putAll(ctxMap);
-    conf = new Configuration(testUtility.getConfiguration());
   }
 
   @AfterClass
-  public static void tearDown() throws Exception {
+  public static void tearDownOnce() throws Exception {
     testUtility.shutdownMiniCluster();
   }
 
+  /**
+   * Most common context setup for unit tests using
+   * {@link SimpleHbaseEventSerializer}.
+   */
+  @Before
+  public void setUp() throws IOException {
+    conf = new Configuration(testUtility.getConfiguration());
+    ctx = new Context();
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    testUtility.deleteTable(tableName.getBytes());
+  }
+
+  /**
+   * Set up {@link Context} for use with {@link SimpleHbaseEventSerializer}.
+   */
+  private void initContextForSimpleHbaseEventSerializer() {
+    ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
+    ctx.put("serializer.payloadColumn", plCol);
+    ctx.put("serializer.incrementColumn", inColumn);
+  }
+
+  /**
+   * Set up {@link Context} for use with {@link IncrementHBaseSerializer}.
+   */
+  private void initContextForIncrementHBaseSerializer() {
+    ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", IncrementHBaseSerializer.class.getName());
+  }
 
   @Test
   public void testOneEventWithDefaults() throws Exception {
     //Create a context without setting increment column and payload Column
-    Map<String,String> ctxMap = new HashMap<String,String>();
-    ctxMap.put("table", tableName);
-    ctxMap.put("columnFamily", columnFamily);
-    ctxMap.put("serializer",
-        "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
-    Context tmpctx = new Context();
-    tmpctx.putAll(ctxMap);
+    ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
 
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
@@ -117,12 +154,11 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testOneEvent() throws Exception {
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    initContextForSimpleHbaseEventSerializer();
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
@@ -145,12 +181,11 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testThreeEvents() throws Exception {
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "3");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -183,12 +218,11 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testMultipleBatches() throws Exception {
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "2");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -227,11 +261,17 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test(expected = FlumeException.class)
   public void testMissingTable() throws Exception {
+    logger.info("Running testMissingTable()");
+    initContextForSimpleHbaseEventSerializer();
+
+    // setUp() will create the table, so we delete it.
+    logger.info("Deleting table {}", tableName);
+    testUtility.deleteTable(tableName.getBytes());
+
     ctx.put("batchSize", "2");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -240,7 +280,8 @@ public class TestHBaseSink {
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, new Context());
     sink.setChannel(channel);
-    sink.start();
+
+    logger.info("Writing data into channel");
     Transaction tx = channel.getTransaction();
     tx.begin();
     for(int i = 0; i < 3; i++){
@@ -249,7 +290,25 @@ public class TestHBaseSink {
     }
     tx.commit();
     tx.close();
-    sink.process();
+
+    logger.info("Starting sink and processing events");
+    try {
+      logger.info("Calling sink.start()");
+      sink.start(); // This method will throw.
+
+      // We never get here, but we log in case the behavior changes.
+      logger.error("Unexpected error: Calling sink.process()");
+      sink.process();
+      logger.error("Unexpected error: Calling sink.stop()");
+      sink.stop();
+    } finally {
+      // Re-create the table so tearDown() doesn't throw.
+      testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    }
+
+    // FIXME: The test should never get here, the below code doesn't run.
+    Assert.fail();
+
     HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 2);
     byte[] out;
@@ -266,9 +325,9 @@ public class TestHBaseSink {
     out = results[2];
     Assert.assertArrayEquals(Longs.toByteArray(2), out);
     sink.process();
-    sink.stop();
   }
 
+  // TODO: Move this test to a different class and run it stand-alone.
   /**
   * This test must run last - it shuts down the minicluster :D
   * @throws Exception
@@ -280,8 +339,8 @@ public class TestHBaseSink {
       "and uncomment this annotation to run this test.")
   @Test(expected = EventDeliveryException.class)
   public void testHBaseFailure() throws Exception {
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "2");
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
    HBaseSink sink = new HBaseSink(conf);
    Configurables.configure(sink, ctx);
    //Reset the context to a higher batchSize
@@ -374,8 +433,9 @@ public class TestHBaseSink {
 
   @Test
   public void testTransactionStateOnChannelException() throws Exception {
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "1");
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     // Reset the context to a higher batchSize
@@ -405,15 +465,15 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testTransactionStateOnSerializationException() throws Exception {
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "1");
     ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
         "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     // Reset the context to a higher batchSize
@@ -444,11 +504,11 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testWithoutConfigurationObject() throws Exception{
+    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     tmpContext.put("batchSize", "2");
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
@@ -457,7 +517,7 @@ public class TestHBaseSink {
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
         conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
             HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, tmpContext);
     Channel channel = new MemoryChannel();
@@ -492,11 +552,11 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testZKQuorum() throws Exception{
+    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " +
         "zk3.flume.apache.org:3342";
@@ -516,6 +576,7 @@ public class TestHBaseSink {
 
   @Test (expected = FlumeException.class)
   public void testZKQuorumIncorrectPorts() throws Exception{
+    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " +
@@ -529,4 +590,152 @@ public class TestHBaseSink {
     Configurables.configure(sink, tmpContext);
     Assert.fail();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testCoalesce() throws EventDeliveryException {
+    initContextForIncrementHBaseSerializer();
+    ctx.put("batchSize", "100");
+    ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+        String.valueOf(true));
+
+    final Map<String, Long> expectedCounts = Maps.newHashMap();
+    expectedCounts.put("r1:c1", 10L);
+    expectedCounts.put("r1:c2", 20L);
+    expectedCounts.put("r2:c1", 7L);
+    expectedCounts.put("r2:c3", 63L);
+    HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
+
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
+    Configurables.configure(sink, ctx);
+    Channel channel = createAndConfigureMemoryChannel(sink);
+
+    List<Event> events = Lists.newLinkedList();
+    generateEvents(events, expectedCounts);
+    putEvents(channel, events);
+
+    sink.start();
+    sink.process(); // Calls CoalesceValidator instance.
+    sink.stop();
+  }
+
+  @Test(expected = AssertionError.class)
+  public void negativeTestCoalesce() throws EventDeliveryException {
+    initContextForIncrementHBaseSerializer();
+    ctx.put("batchSize", "10");
+
+    final Map<String, Long> expectedCounts = Maps.newHashMap();
+    expectedCounts.put("r1:c1", 10L);
+    HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
+
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
+    Configurables.configure(sink, ctx);
+    Channel channel = createAndConfigureMemoryChannel(sink);
+
+    List<Event> events = Lists.newLinkedList();
+    generateEvents(events, expectedCounts);
+    putEvents(channel, events);
+
+    sink.start();
+    sink.process(); // Calls CoalesceValidator instance.
+    sink.stop();
+  }
+
+  @Test
+  public void testBatchAware() throws EventDeliveryException {
+    logger.info("Running testBatchAware()");
+    initContextForIncrementHBaseSerializer();
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    Configurables.configure(sink, ctx);
+    Channel channel = createAndConfigureMemoryChannel(sink);
+
+    sink.start();
+    int batchCount = 3;
+    for (int i = 0; i < batchCount; i++) {
+      sink.process();
+    }
+    sink.stop();
+    Assert.assertEquals(batchCount,
+        ((IncrementHBaseSerializer) sink.getSerializer()).getNumBatchesStarted());
+  }
+
+  /**
+   * For testing that the rows coalesced, serialized by
+   * {@link IncrementHBaseSerializer}, are of the expected batch size.
+   */
+  private static class CoalesceValidator
+      implements HBaseSink.DebugIncrementsCallback {
+
+    private final Map<String,Long> expectedCounts;
+    private final Method refGetFamilyMap;
+
+    public CoalesceValidator(Map<String, Long> expectedCounts) {
+      this.expectedCounts = expectedCounts;
+      this.refGetFamilyMap = HBaseSink.reflectLookupGetFamilyMap();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void onAfterCoalesce(Iterable<Increment> increments) {
+      for (Increment inc : increments) {
+        byte[] row = inc.getRow();
+        Map<byte[], NavigableMap<byte[], Long>> families = null;
+        try {
+          families = (Map<byte[], NavigableMap<byte[], Long>>)
+              refGetFamilyMap.invoke(inc);
+        } catch (Exception e) {
+          Throwables.propagate(e);
+        }
+        for (byte[] family : families.keySet()) {
+          NavigableMap<byte[], Long> qualifiers = families.get(family);
+          for (Map.Entry<byte[], Long> entry : qualifiers.entrySet()) {
+            byte[] qualifier = entry.getKey();
+            Long count = entry.getValue();
+            StringBuilder b = new StringBuilder(20);
+            b.append(new String(row, Charsets.UTF_8));
+            b.append(':');
+            b.append(new String(qualifier, Charsets.UTF_8));
+            String key = b.toString();
+            Assert.assertEquals("Expected counts don't match observed for " + key,
+                expectedCounts.get(key), count);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Add number of Events corresponding to counts to the events list.
+   * @param events Destination list.
+   * @param counts How many events to generate for each row:qualifier pair.
+   */
+  private void generateEvents(List<Event> events, Map<String, Long> counts) {
+    for (String key : counts.keySet()) {
+      long count = counts.get(key);
+      for (long i = 0; i < count; i++) {
+        events.add(EventBuilder.withBody(key, Charsets.UTF_8));
+      }
+    }
+  }
+
+  private Channel createAndConfigureMemoryChannel(HBaseSink sink) {
+    Channel channel = new MemoryChannel();
+    Context channelCtx = new Context();
+    channelCtx.put("capacity", String.valueOf(1000L));
+    channelCtx.put("transactionCapacity", String.valueOf(1000L));
+    Configurables.configure(channel, channelCtx);
+    sink.setChannel(channel);
+    channel.start();
+    return channel;
+  }
+
+  private void putEvents(Channel channel, Iterable<Event> events) {
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for (Event event : events) {
+      channel.put(event);
+    }
+    tx.commit();
+    tx.close();
+  }
+
+}
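
For reviewers who want the coalescing semantics at a glance: the sketch below is not part of this commit. It models, with plain String keys instead of the byte[]-keyed nested TreeMaps used in HBaseSink, how a batch of single increments collapses into one count per unique cell. The event bodies follow the "row:qualifier" convention of IncrementHBaseSerializer; the class name CoalesceDemo is hypothetical.

  import java.util.ArrayList;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  public class CoalesceDemo {
    public static void main(String[] args) {
      // As in testCoalesce(): many events, each representing a +1 to one cell.
      List<String> cells = new ArrayList<String>();
      for (int i = 0; i < 10; i++) cells.add("r1:c1");
      for (int i = 0; i < 7; i++) cells.add("r2:c1");

      // The coalescing pass: aggregate {cell} => total count for the batch.
      Map<String, Long> coalesced = new LinkedHashMap<String, Long>();
      for (String cell : cells) {
        Long prev = coalesced.get(cell);
        coalesced.put(cell, prev == null ? 1L : prev + 1L);
      }

      // With coalescing the sink would issue 2 increments instead of 17.
      for (Map.Entry<String, Long> e : coalesced.entrySet()) {
        System.out.println(e.getKey() + " => +" + e.getValue());
      }
    }
  }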