Updated Branches: refs/heads/flume-1.5 12bb474f2 -> ee75bc57c
FLUME-2259. Transaction closure not happening for all the scenario in HBaseSink. (Gopinathan A via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ee75bc57 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ee75bc57 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ee75bc57 Branch: refs/heads/flume-1.5 Commit: ee75bc57cdacf98620035ff0acb132821c01c9bb Parents: 12bb474 Author: Hari Shreedharan <hshreedha...@apache.org> Authored: Wed Jan 15 21:27:16 2014 -0800 Committer: Hari Shreedharan <hshreedha...@apache.org> Committed: Wed Jan 15 21:28:47 2014 -0800 ---------------------------------------------------------------------- .../org/apache/flume/sink/hbase/HBaseSink.java | 112 ++++++++++--------- .../hbase/MockSimpleHbaseEventSerializer.java | 38 +++++++ .../apache/flume/sink/hbase/TestHBaseSink.java | 82 +++++++++++++- 3 files changed, 173 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ee75bc57/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java index d5996c3..f5cb229 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java @@ -231,66 +231,32 @@ public class HBaseSink extends AbstractSink implements Configurable { Transaction txn = channel.getTransaction(); List<Row> actions = new LinkedList<Row>(); List<Increment> incs = new LinkedList<Increment>(); - txn.begin(); - long i = 0; - for(; i < batchSize; i++) { - Event event = channel.take(); - if(event == null){ - status = Status.BACKOFF; - if (i == 0) { - sinkCounter.incrementBatchEmptyCount(); + try { + txn.begin(); + long i = 0; + for (; i < batchSize; i++) { + Event event = channel.take(); + if (event == null) { + if (i == 0) { + status = Status.BACKOFF; + sinkCounter.incrementBatchEmptyCount(); + } else { + sinkCounter.incrementBatchUnderflowCount(); + } + break; } else { - sinkCounter.incrementBatchUnderflowCount(); + serializer.initialize(event, columnFamily); + actions.addAll(serializer.getActions()); + incs.addAll(serializer.getIncrements()); } - break; - } else { - serializer.initialize(event, columnFamily); - actions.addAll(serializer.getActions()); - incs.addAll(serializer.getIncrements()); } - } - if (i == batchSize) { - sinkCounter.incrementBatchCompleteCount(); - } - sinkCounter.addToEventDrainAttemptCount(i); - - putEventsAndCommit(actions, incs, txn); - return status; - } + if (i == batchSize) { + sinkCounter.incrementBatchCompleteCount(); + } + sinkCounter.addToEventDrainAttemptCount(i); - private void putEventsAndCommit(final List<Row> actions, final List<Increment> incs, - Transaction txn) throws EventDeliveryException { - try { - runPrivileged(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - for(Row r : actions) { - if(r instanceof Put) { - ((Put)r).setWriteToWAL(enableWal); - } - // Newer versions of HBase - Increment implements Row. - if(r instanceof Increment) { - ((Increment)r).setWriteToWAL(enableWal); - } - } - table.batch(actions); - return null; - } - }); + putEventsAndCommit(actions, incs, txn); - runPrivileged(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - for (final Increment i : incs) { - i.setWriteToWAL(enableWal); - table.increment(i); - } - return null; - } - }); - - txn.commit(); - sinkCounter.addToEventDrainSuccessCount(actions.size()); } catch (Throwable e) { try{ txn.rollback(); @@ -313,6 +279,42 @@ public class HBaseSink extends AbstractSink implements Configurable { } finally { txn.close(); } + return status; + } + + private void putEventsAndCommit(final List<Row> actions, + final List<Increment> incs, Transaction txn) throws Exception { + + runPrivileged(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + for (Row r : actions) { + if (r instanceof Put) { + ((Put) r).setWriteToWAL(enableWal); + } + // Newer versions of HBase - Increment implements Row. + if (r instanceof Increment) { + ((Increment) r).setWriteToWAL(enableWal); + } + } + table.batch(actions); + return null; + } + }); + + runPrivileged(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + for (final Increment i : incs) { + i.setWriteToWAL(enableWal); + table.increment(i); + } + return null; + } + }); + + txn.commit(); + sinkCounter.addToEventDrainSuccessCount(actions.size()); } private <T> T runPrivileged(final PrivilegedExceptionAction<T> action) throws Exception { http://git-wip-us.apache.org/repos/asf/flume/blob/ee75bc57/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java new file mode 100644 index 0000000..9b2a850 --- /dev/null +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flume.sink.hbase; + +import java.util.List; + +import org.apache.flume.FlumeException; +import org.apache.hadoop.hbase.client.Row; + +class MockSimpleHbaseEventSerializer extends SimpleHbaseEventSerializer { + + public static boolean throwException = false; + + @Override + public List<Row> getActions() throws FlumeException { + if (throwException) { + throw new FlumeException("Exception for testing"); + } + return super.getActions(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/ee75bc57/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java index ab4128e..f41bf53 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java @@ -18,14 +18,15 @@ */ package org.apache.flume.sink.hbase; +import static org.mockito.Mockito.*; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; - import org.apache.flume.Channel; +import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; @@ -35,14 +36,12 @@ import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; -import org.apache.flume.sink.hbase.HBaseSink; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; @@ -60,7 +59,6 @@ public class TestHBaseSink { private static Context ctx = new Context(); private static String valBase = "testing hbase sink: jham"; - @BeforeClass public static void setUp() throws Exception { testUtility.startMiniCluster(); @@ -368,5 +366,81 @@ public class TestHBaseSink { } return results; } + + @Test + public void testTransactionStateOnChannelException() throws Exception { + ctx.put("batchSize", "1"); + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + Configurables.configure(sink, ctx); + // Reset the context to a higher batchSize + Channel channel = spy(new MemoryChannel()); + Configurables.configure(channel, new Context()); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0)); + channel.put(e); + tx.commit(); + tx.close(); + doThrow(new ChannelException("Mock Exception")).when(channel).take(); + try { + sink.process(); + Assert.fail("take() method should throw exception"); + } catch (ChannelException ex) { + Assert.assertEquals("Mock Exception", ex.getMessage()); + } + doReturn(e).when(channel).take(); + sink.process(); + sink.stop(); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + byte[][] results = getResults(table, 1); + byte[] out = results[0]; + Assert.assertArrayEquals(e.getBody(), out); + out = results[1]; + Assert.assertArrayEquals(Longs.toByteArray(1), out); + testUtility.deleteTable(tableName.getBytes()); + } + + @Test + public void testTransactionStateOnSerializationException() throws Exception { + ctx.put("batchSize", "1"); + ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER, + "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer"); + testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); + HBaseSink sink = new HBaseSink(testUtility.getConfiguration()); + Configurables.configure(sink, ctx); + // Reset the context to a higher batchSize + Channel channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + sink.setChannel(channel); + sink.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0)); + channel.put(e); + tx.commit(); + tx.close(); + try { + MockSimpleHbaseEventSerializer.throwException = true; + sink.process(); + Assert.fail("FlumeException expected from serilazer"); + } catch (FlumeException ex) { + Assert.assertEquals("Exception for testing", ex.getMessage()); + } + MockSimpleHbaseEventSerializer.throwException = false; + sink.process(); + sink.stop(); + HTable table = new HTable(testUtility.getConfiguration(), tableName); + byte[][] results = getResults(table, 1); + byte[] out = results[0]; + Assert.assertArrayEquals(e.getBody(), out); + out = results[1]; + Assert.assertArrayEquals(Longs.toByteArray(1), out); + testUtility.deleteTable(tableName.getBytes()); + } + + }