Author: mubarak
Date: Thu Aug 2 06:33:04 2012
New Revision: 1368359
URL: http://svn.apache.org/viewvc?rev=1368359&view=rev
Log:
FLUME-1401: Asynchbase sink should be configurable to support timeout
(Hari Shreedharan via Brock Noland)
Modified:
flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
Modified: flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1368359&r1=1368358&r2=1368359&view=diff
==============================================================================
--- flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Thu Aug 2 06:33:04 2012
@@ -30,7 +30,7 @@ different sources to a centralized data
Apache Flume is a top level project at the Apache Software Foundation.
There are currently two release code lines available, versions 0.9.x and 1.x.
-This documentation applies to the 1.x codeline.
+This documentation applies to the 1.x codeline.
Please click here for
`the Flume 0.9.x User Guide
<http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
@@ -155,7 +155,7 @@ A simple example
Here, we give an example configuration file, describing a single-node Flume
deployment. This configuration lets a user generate events and subsequently
logs them to the console.
.. code-block:: properties
-
+
# example.conf: A single-node Flume configuration
# Name the components on this agent
@@ -175,7 +175,7 @@ Here, we give an example configuration f
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapactiy = 100
-
+
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
@@ -643,7 +643,7 @@ interceptors.*
of indicating to the application writing the log file that it needs to
retain the log or that the event hasn't been sent, for some reason. If
this doesn't make sense, you need only know this: Your application can
- never guarantee data has been received when using a unidirectional
+ never guarantee data has been received when using a unidirectional
asynchronous interface such as ExecSource! As an extension of this
warning - and to be completely clear - there is absolutely zero guarantee
of event delivery when using this source. You have been warned.
@@ -1204,17 +1204,19 @@ This sink is still experimental.
The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
Required properties are in **bold**.
-================ ============================================================ =============================================================================
+================ ============================================================ ====================================================================================
Property Name Default Description
-================ ============================================================ =============================================================================
+================ ============================================================ ====================================================================================
**channel** --
**type** -- The component type name, needs to be ``org.apache.flume.sink.AsyncHBaseSink``
**table** -- The name of the table in Hbase to write to.
**columnFamily** -- The column family in Hbase to write to.
batchSize 100 Number of events to be written per txn.
+timeout -- The length of time (in milliseconds) the sink waits for acks from hbase for all events in a transaction. If no timeout is specified, the sink will wait forever.
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
serializer.* -- Properties to be passed to the serializer.
-================ ============================================================ =============================================================================
+================ ============================================================ ====================================================================================
Example for agent named **agent_foo**:
@@ -1361,8 +1363,8 @@ keep-alive 3
write-timeout 3 Amount of time (in sec) to wait for a write operation
==================== ================================ ========================================================
-.. note:: By default the File Channel uses paths for checkpoint and data
- directories that are within the user home as specified above.
+.. note:: By default the File Channel uses paths for checkpoint and data
+ directories that are within the user home as specified above.
As a result if you have more than one File Channel instances
active within the agent, only one will be able to lock the
directories and cause the other channel initialization to fail.
Modified: flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java?rev=1368359&r1=1368358&r2=1368359&view=diff
==============================================================================
--- flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java (original)
+++ flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java Thu Aug 2 06:33:04 2012
@@ -43,10 +43,12 @@ import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.stumbleupon.async.Callback;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.flume.ChannelException;
import org.apache.flume.instrumentation.SinkCounter;
/**
@@ -74,6 +76,9 @@ import org.apache.flume.instrumentation.
* maximum number of events the sink will commit per transaction. The default
* batch size is 100 events.
* <p>
+* <tt>timeout: </tt> The length of time in milliseconds the sink waits for
+* callbacks from hbase for all events in a transaction.
+* If no timeout is specified, the sink will wait forever.<p>
*
* <strong>Note: </strong> Hbase does not guarantee atomic commits on multiple
* rows. So if a subset of events in a batch are written to disk by Hbase and
@@ -99,6 +104,7 @@ public class AsyncHBaseSink extends Abst
private Transaction txn;
private volatile boolean open = false;
private SinkCounter sinkCounter;
+ private long timeout;
public AsyncHBaseSink(){
conf = HBaseConfiguration.create();
@@ -145,35 +151,40 @@ public class AsyncHBaseSink extends Abst
Status status = Status.READY;
Channel channel = getChannel();
- txn = channel.getTransaction();
- txn.begin();
int i = 0;
- for (; i < batchSize; i++) {
- Event event = channel.take();
- if (event == null) {
- status = Status.BACKOFF;
- if (i == 0) {
- sinkCounter.incrementBatchEmptyCount();
+ try {
+ txn = channel.getTransaction();
+ txn.begin();
+ for (; i < batchSize; i++) {
+ Event event = channel.take();
+ if (event == null) {
+ status = Status.BACKOFF;
+ if (i == 0) {
+ sinkCounter.incrementBatchEmptyCount();
+ } else {
+ sinkCounter.incrementBatchUnderflowCount();
+ }
+ break;
} else {
- sinkCounter.incrementBatchUnderflowCount();
- }
- break;
- } else {
- serializer.setEvent(event);
- List<PutRequest> actions = serializer.getActions();
- List<AtomicIncrementRequest> increments = serializer.getIncrements();
- callbacksExpected.addAndGet(actions.size() + increments.size());
+ serializer.setEvent(event);
+ List<PutRequest> actions = serializer.getActions();
+ List<AtomicIncrementRequest> increments = serializer.getIncrements();
+ callbacksExpected.addAndGet(actions.size() + increments.size());
- for (PutRequest action : actions) {
- client.put(action).addCallbacks(putSuccessCallback,
putFailureCallback);
- }
- for (AtomicIncrementRequest increment : increments) {
- client.atomicIncrement(increment).addCallbacks(
- incrementSuccessCallback, incrementFailureCallback);
+ for (PutRequest action : actions) {
+ client.put(action).addCallbacks(putSuccessCallback,
putFailureCallback);
+ }
+ for (AtomicIncrementRequest increment : increments) {
+ client.atomicIncrement(increment).addCallbacks(
+ incrementSuccessCallback, incrementFailureCallback);
+ }
}
}
+ } catch (Throwable e) {
+ this.handleTransactionFailure(txn);
+ this.checkIfChannelExceptionAndThrow(e);
}
- if(i == batchSize) {
+ if (i == batchSize) {
sinkCounter.incrementBatchCompleteCount();
}
sinkCounter.addToEventDrainAttemptCount(i);
@@ -183,14 +194,14 @@ public class AsyncHBaseSink extends Abst
while ((callbacksReceived.get() < callbacksExpected.get())
&& !txnFail.get()) {
try {
- condition.await();
- } catch (InterruptedException ex) {
- logger.error("Interrupted while waiting for callbacks from HBase.");
- try {
- txn.rollback();
- } finally {
- txn.close();
+ if(!condition.await(timeout, TimeUnit.MILLISECONDS)){
+ txnFail.set(true);
+ logger.warn("HBase callbacks timed out. "
+ + "Transaction will be rolled back.");
}
+ } catch (Exception ex) {
+ logger.error("Exception while waiting for callbacks from HBase.");
+ this.handleTransactionFailure(txn);
Throwables.propagate(ex);
}
}
@@ -215,28 +226,11 @@ public class AsyncHBaseSink extends Abst
} else {
try{
txn.commit();
+ txn.close();
sinkCounter.addToEventDrainSuccessCount(i);
} catch (Throwable e) {
- try{
- txn.rollback();
- } catch (Exception e2) {
- logger.error("Exception in rollback. Rollback might not have been" +
- "successful." , e2);
- }
- logger.error("Failed to commit transaction." +
- "Transaction rolled back.", e);
- if(e instanceof Error || e instanceof RuntimeException){
- logger.error("Failed to commit transaction." +
- "Transaction rolled back.", e);
- Throwables.propagate(e);
- } else {
- logger.error("Failed to commit transaction." +
- "Transaction rolled back.", e);
- throw new EventDeliveryException("Failed to commit transaction." +
- "Transaction rolled back.", e);
- }
- } finally {
- txn.close();
+ this.handleTransactionFailure(txn);
+ this.checkIfChannelExceptionAndThrow(e);
}
}
@@ -283,6 +277,13 @@ public class AsyncHBaseSink extends Abst
if(sinkCounter == null) {
sinkCounter = new SinkCounter(this.getName());
}
+ timeout = context.getLong(HBaseSinkConfigurationConstants.CONFIG_TIMEOUT,
+ HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT);
+ if(timeout <= 0){
+ logger.warn("Timeout should be positive for Hbase sink. "
+ + "Sink will not timeout.");
+ timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
+ }
}
@Override
public void start(){
@@ -419,4 +420,14 @@ public class AsyncHBaseSink extends Abst
return null;
}
}
+
+ private void checkIfChannelExceptionAndThrow(Throwable e)
+ throws EventDeliveryException {
+ if (e instanceof ChannelException) {
+ throw new EventDeliveryException("Error in processing transaction.", e);
+ } else if (e instanceof Error || e instanceof RuntimeException) {
+ Throwables.propagate(e);
+ }
+ throw new EventDeliveryException("Error in processing transaction.", e);
+ }
}
Modified: flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java?rev=1368359&r1=1368358&r2=1368359&view=diff
==============================================================================
--- flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java (original)
+++ flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java Thu Aug 2 06:33:04 2012
@@ -44,5 +44,8 @@ public class HBaseSinkConfigurationConst
*/
public static final String CONFIG_SERIALIZER_PREFIX = CONFIG_SERIALIZER +
".";
+ public static final String CONFIG_TIMEOUT = "timeout";
+
+ public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
}