Updated Branches:
  refs/heads/flume-1.3.0 cdddf5aa8 -> fff91b4ba

FLUME-1535. Ability to specify the capacity of MemoryChannel in bytes.

(Ted Malaska via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fff91b4b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fff91b4b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fff91b4b

Branch: refs/heads/flume-1.3.0
Commit: fff91b4ba5c8529e5e1acd31c04e12464994aad0
Parents: cdddf5a
Author: Hari Shreedharan <[email protected]>
Authored: Fri Oct 5 15:23:54 2012 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Fri Oct 5 15:24:37 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/MemoryChannel.java    |  120 ++++++++--
 .../apache/flume/channel/TestMemoryChannel.java    |  183 ++++++++++++++-
 2 files changed, 276 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/fff91b4b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java 
b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
index c72e97c..fc3a1e2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
@@ -37,12 +37,18 @@ public class MemoryChannel extends BasicChannelSemantics {
   private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
   private static final Integer defaultCapacity = 100;
   private static final Integer defaultTransCapacity = 100;
+  private static final double byteCapacitySlotSize = 100;
+  private static final Long defaultByteCapacity = 
(long)(Runtime.getRuntime().maxMemory() * .80);
+  private static final Integer defaultByteCapacityBufferPercentage = 20;
+
   private static final Integer defaultKeepAlive = 3;
 
   public class MemoryTransaction extends BasicTransactionSemantics {
     private LinkedBlockingDeque<Event> takeList;
     private LinkedBlockingDeque<Event> putList;
     private final ChannelCounter channelCounter;
+    private int putByteCounter = 0;
+    private int takeByteCounter = 0;
 
     public MemoryTransaction(int transCapacity, ChannelCounter counter) {
       putList = new LinkedBlockingDeque<Event>(transCapacity);
@@ -52,13 +58,24 @@ public class MemoryChannel extends BasicChannelSemantics {
     }
 
     @Override
-    protected void doPut(Event event) {
+    protected void doPut(Event event) throws InterruptedException {
       channelCounter.incrementEventPutAttemptCount();
-      if(!putList.offer(event)) {
-        throw new ChannelException("Put queue for MemoryTransaction of 
capacity " +
-            putList.size() + " full, consider committing more frequently, " +
-            "increasing capacity or increasing thread count");
+      int eventByteSize = 
(int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
+
+      if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, 
TimeUnit.SECONDS)) {
+        if(!putList.offer(event)) {
+          throw new ChannelException("Put queue for MemoryTransaction of 
capacity " +
+              putList.size() + " full, consider committing more frequently, " +
+              "increasing capacity or increasing thread count");
+        }
+      } else {
+        throw new ChannelException("Put queue for MemoryTransaction of 
byteCapacity " +
+            (lastByteCapacity * (int)byteCapacitySlotSize) + " bytes cannot 
add an " +
+            " event of size " + estimateEventSize(event) + " bytes because " +
+             (bytesRemaining.availablePermits() * (int)byteCapacitySlotSize) + 
" bytes are already used." +
+            " Try consider comitting more frequently, increasing byteCapacity 
or increasing thread count");
       }
+      putByteCounter += eventByteSize;
     }
 
     @Override
@@ -80,6 +97,9 @@ public class MemoryChannel extends BasicChannelSemantics {
           "signalling existence of entry");
       takeList.put(event);
 
+      int eventByteSize = 
(int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
+      takeByteCounter += eventByteSize;
+
       return event;
     }
 
@@ -105,6 +125,10 @@ public class MemoryChannel extends BasicChannelSemantics {
         putList.clear();
         takeList.clear();
       }
+      bytesRemaining.release(takeByteCounter);
+      takeByteCounter = 0;
+      putByteCounter = 0;
+
       queueStored.release(puts);
       if(remainingChange > 0) {
         queueRemaining.release(remainingChange);
@@ -130,6 +154,10 @@ public class MemoryChannel extends BasicChannelSemantics {
         }
         putList.clear();
       }
+      bytesRemaining.release(putByteCounter);
+      putByteCounter = 0;
+      takeByteCounter = 0;
+
       queueStored.release(takes);
       channelCounter.setChannelSize(queue.size());
     }
@@ -155,6 +183,10 @@ public class MemoryChannel extends BasicChannelSemantics {
   // maximum items in a transaction queue
   private volatile Integer transCapacity;
   private volatile int keepAlive;
+  private volatile int byteCapacity;
+  private volatile int lastByteCapacity;
+  private volatile int byteCapacityBufferPercentage;
+  private Semaphore bytesRemaining;
   private ChannelCounter channelCounter;
 
 
@@ -163,37 +195,50 @@ public class MemoryChannel extends BasicChannelSemantics {
     queueLock = 0;
   }
 
+  /**
+   * Read parameters from context
+   * <li>capacity = type int that defines the total number of events allowed 
at one time in the queue.
+   * <li>transactionCapacity = type int that defines the total number of 
events allowed in one transaction.
+   * <li>byteCapacity = type long that defines the max number of bytes used 
for events in the queue.
+   * <li>byteCapacityBufferPercentage = type int that defines the percent of 
buffer between byteCapacity and the estimated event size.
+   * <li>keep-alive = type int that defines the number of seconds to wait for a 
queue permit
+   */
   @Override
   public void configure(Context context) {
-    String strCapacity = context.getString("capacity");
     Integer capacity = null;
-    if(strCapacity == null) {
+    try {
+      capacity = context.getInteger("capacity", defaultCapacity);
+    } catch(NumberFormatException e) {
       capacity = defaultCapacity;
-    } else {
-      try {
-        capacity = Integer.parseInt(strCapacity);
-      } catch(NumberFormatException e) {
-        capacity = defaultCapacity;
-      }
     }
-    String strTransCapacity = context.getString("transactionCapacity");
-    if(strTransCapacity == null) {
+
+    try {
+      transCapacity = context.getInteger("transactionCapacity", 
defaultTransCapacity);
+    } catch(NumberFormatException e) {
       transCapacity = defaultTransCapacity;
-    } else {
-      try {
-        transCapacity = Integer.parseInt(strTransCapacity);
-      } catch(NumberFormatException e) {
-        transCapacity = defaultTransCapacity;
-      }
     }
+
     Preconditions.checkState(transCapacity <= capacity);
 
-    String strKeepAlive = context.getString("keep-alive");
+    try {
+      byteCapacityBufferPercentage = 
context.getInteger("byteCapacityBufferPercentage", 
defaultByteCapacityBufferPercentage);
+    } catch(NumberFormatException e) {
+      byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage;
+    }
+
+    try {
+      byteCapacity = (int)((context.getLong("byteCapacity", 
defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) 
/byteCapacitySlotSize);
+      if (byteCapacity < 1) {
+        byteCapacity = Integer.MAX_VALUE;
+      }
+    } catch(NumberFormatException e) {
+      byteCapacity = (int)((defaultByteCapacity * (1 - 
byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize);
+    }
 
-    if (strKeepAlive == null) {
+    try {
+      keepAlive = context.getInteger("keep-alive", defaultKeepAlive);
+    } catch(NumberFormatException e) {
       keepAlive = defaultKeepAlive;
-    } else {
-      keepAlive = Integer.parseInt(strKeepAlive);
     }
 
     if(queue != null) {
@@ -210,6 +255,26 @@ public class MemoryChannel extends BasicChannelSemantics {
       }
     }
 
+    if (bytesRemaining == null) {
+      bytesRemaining = new Semaphore(byteCapacity);
+      lastByteCapacity = byteCapacity;
+    } else {
+      if (byteCapacity > lastByteCapacity) {
+        bytesRemaining.release(byteCapacity - lastByteCapacity);
+        lastByteCapacity = byteCapacity;
+      } else {
+        try {
+          if(!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, 
keepAlive, TimeUnit.SECONDS)) {
+            LOGGER.warn("Couldn't acquire permits to downsize the byte 
capacity, resizing has been aborted");
+          } else {
+            lastByteCapacity = byteCapacity;
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
     if (channelCounter == null) {
       channelCounter = new ChannelCounter(getName());
     }
@@ -263,4 +328,9 @@ public class MemoryChannel extends BasicChannelSemantics {
   protected BasicTransactionSemantics createTransaction() {
     return new MemoryTransaction(transCapacity, channelCounter);
   }
+
+  private long estimateEventSize(Event event)
+  {
+    return event.getBody().length;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/fff91b4b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
index e070864..4af4a40 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -176,7 +176,7 @@ public class TestMemoryChannel {
   }
 
   @Test
-  public void testBufferEmptyingAfterTakeCommit() {
+  public void testCapacityBufferEmptyingAfterTakeCommit() {
     Context context = new Context();
     Map<String, String> parms = new HashMap<String, String>();
     parms.put("capacity", "3");
@@ -208,7 +208,7 @@ public class TestMemoryChannel {
   }
 
   @Test
-  public void testBufferEmptyingAfterRollback() {
+  public void testCapacityBufferEmptyingAfterRollback() {
     Context context = new Context();
     Map<String, String> parms = new HashMap<String, String>();
     parms.put("capacity", "3");
@@ -232,4 +232,183 @@ public class TestMemoryChannel {
     tx.commit();
     tx.close();
   }
+
+  @Test(expected=ChannelException.class)
+  public void testByteCapacityOverload() {
+    Context context = new Context();
+    Map<String, String> parms = new HashMap<String, String>();
+    parms.put("byteCapacity", "2000");
+    parms.put("byteCapacityBufferPercentage", "20");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    byte[] eventBody = new byte[405];
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    transaction.commit();
+    transaction.close();
+
+    transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    // this should kill  it
+    transaction.commit();
+    Assert.fail();
+
+  }
+
+  public void testByteCapacityBufferEmptyingAfterTakeCommit() {
+    Context context = new Context();
+    Map<String, String> parms = new HashMap<String, String>();
+    parms.put("byteCapacity", "2000");
+    parms.put("byteCapacityBufferPercentage", "20");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    byte[] eventBody = new byte[405];
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    try {
+      channel.put(EventBuilder.withBody(eventBody));
+      throw new RuntimeException("Put was able to overflow byte capacity.");
+    } catch (ChannelException ce)
+    {
+      //Do nothing
+    }
+
+    tx.commit();
+    tx.close();
+
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.take();
+    channel.take();
+    tx.commit();
+    tx.close();
+
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    try {
+      channel.put(EventBuilder.withBody(eventBody));
+      throw new RuntimeException("Put was able to overflow byte capacity.");
+    } catch (ChannelException ce)
+    {
+      //Do nothing
+    }
+    tx.commit();
+    tx.close();
+  }
+
+  @Test
+  public void testByteCapacityBufferEmptyingAfterRollback() {
+    Context context = new Context();
+    Map<String, String> parms = new HashMap<String, String>();
+    parms.put("byteCapacity", "2000");
+    parms.put("byteCapacityBufferPercentage", "20");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    byte[] eventBody = new byte[405];
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    tx.rollback();
+    tx.close();
+
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    channel.put(EventBuilder.withBody(eventBody));
+    tx.commit();
+    tx.close();
+  }
+
+  @Test
+  public void testByteCapacityBufferChangeConfig() {
+    Context context = new Context();
+    Map<String, String> parms = new HashMap<String, String>();
+    parms.put("byteCapacity", "2000");
+    parms.put("byteCapacityBufferPercentage", "20");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    byte[] eventBody = new byte[405];
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody(eventBody));
+
+    parms.put("byteCapacity", "1500");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    channel.put(EventBuilder.withBody(eventBody));
+    try {
+      channel.put(EventBuilder.withBody(eventBody));
+      Assert.fail();
+    } catch ( ChannelException e ) {
+      //success
+    }
+
+    parms.put("byteCapacity", "2500");
+    parms.put("byteCapacityBufferPercentage", "20");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    channel.put(EventBuilder.withBody(eventBody));
+
+    parms.put("byteCapacity", "300");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    channel.put(EventBuilder.withBody(eventBody));
+    try {
+      channel.put(EventBuilder.withBody(eventBody));
+      Assert.fail();
+    } catch ( ChannelException e ) {
+      //success
+    }
+
+    parms.put("byteCapacity", "3300");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    channel.put(EventBuilder.withBody(eventBody));
+
+    try {
+      channel.put(EventBuilder.withBody(eventBody));
+      Assert.fail();
+    } catch ( ChannelException e ) {
+      //success
+    }
+
+    parms.put("byteCapacity", "4000");
+    context.putAll(parms);
+    Configurables.configure(channel,  context);
+
+    channel.put(EventBuilder.withBody(eventBody));
+
+    try {
+      channel.put(EventBuilder.withBody(eventBody));
+      Assert.fail();
+    } catch ( ChannelException e ) {
+      //success
+    }
+  }
 }

Reply via email to