Updated Branches: refs/heads/flume-1.3.0 cdddf5aa8 -> fff91b4ba
FLUME-1535. Ability to specify the capacity of MemoryChannel in bytes. (Ted Malaska via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fff91b4b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fff91b4b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fff91b4b Branch: refs/heads/flume-1.3.0 Commit: fff91b4ba5c8529e5e1acd31c04e12464994aad0 Parents: cdddf5a Author: Hari Shreedharan <[email protected]> Authored: Fri Oct 5 15:23:54 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Oct 5 15:24:37 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/MemoryChannel.java | 120 ++++++++-- .../apache/flume/channel/TestMemoryChannel.java | 183 ++++++++++++++- 2 files changed, 276 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/fff91b4b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java index c72e97c..fc3a1e2 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java @@ -37,12 +37,18 @@ public class MemoryChannel extends BasicChannelSemantics { private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class); private static final Integer defaultCapacity = 100; private static final Integer defaultTransCapacity = 100; + private static final double byteCapacitySlotSize = 100; + private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80); + private static final Integer defaultByteCapacityBufferPercentage = 20; + private static final Integer defaultKeepAlive = 3; public class MemoryTransaction extends BasicTransactionSemantics { private LinkedBlockingDeque<Event> takeList; private LinkedBlockingDeque<Event> putList; private final ChannelCounter channelCounter; + private int putByteCounter = 0; + private int takeByteCounter = 0; public MemoryTransaction(int transCapacity, ChannelCounter counter) { putList = new LinkedBlockingDeque<Event>(transCapacity); @@ -52,13 +58,24 @@ public class MemoryChannel extends BasicChannelSemantics { } @Override - protected void doPut(Event event) { + protected void doPut(Event event) throws InterruptedException { channelCounter.incrementEventPutAttemptCount(); - if(!putList.offer(event)) { - throw new ChannelException("Put queue for MemoryTransaction of capacity " + - putList.size() + " full, consider committing more frequently, " + - "increasing capacity or increasing thread count"); + int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); + + if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, TimeUnit.SECONDS)) { + if(!putList.offer(event)) { + throw new ChannelException("Put queue for MemoryTransaction of capacity " + + putList.size() + " full, consider committing more frequently, " + + "increasing capacity or increasing thread count"); + } + } else { + throw new ChannelException("Put queue for MemoryTransaction of byteCapacity " + + (lastByteCapacity * (int)byteCapacitySlotSize) + " bytes cannot add an " + + " event of size " + estimateEventSize(event) + " bytes because " + + (bytesRemaining.availablePermits() * (int)byteCapacitySlotSize) + " bytes are already used." + + " Try consider comitting more frequently, increasing byteCapacity or increasing thread count"); } + putByteCounter += eventByteSize; } @Override @@ -80,6 +97,9 @@ public class MemoryChannel extends BasicChannelSemantics { "signalling existence of entry"); takeList.put(event); + int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); + takeByteCounter += eventByteSize; + return event; } @@ -105,6 +125,10 @@ public class MemoryChannel extends BasicChannelSemantics { putList.clear(); takeList.clear(); } + bytesRemaining.release(takeByteCounter); + takeByteCounter = 0; + putByteCounter = 0; + queueStored.release(puts); if(remainingChange > 0) { queueRemaining.release(remainingChange); @@ -130,6 +154,10 @@ public class MemoryChannel extends BasicChannelSemantics { } putList.clear(); } + bytesRemaining.release(putByteCounter); + putByteCounter = 0; + takeByteCounter = 0; + queueStored.release(takes); channelCounter.setChannelSize(queue.size()); } @@ -155,6 +183,10 @@ public class MemoryChannel extends BasicChannelSemantics { // maximum items in a transaction queue private volatile Integer transCapacity; private volatile int keepAlive; + private volatile int byteCapacity; + private volatile int lastByteCapacity; + private volatile int byteCapacityBufferPercentage; + private Semaphore bytesRemaining; private ChannelCounter channelCounter; @@ -163,37 +195,50 @@ public class MemoryChannel extends BasicChannelSemantics { queueLock = 0; } + /** + * Read parameters from context + * <li>capacity = type long that defines the total number of events allowed at one time in the queue. + * <li>transactionCapacity = type long that defines the total number of events allowed in one transaction. + * <li>byteCapacity = type long that defines the max number of bytes used for events in the queue. + * <li>byteCapacityBufferPercentage = type int that defines the percent of buffer between byteCapacity and the estimated event size. + * <li>keep-alive = type int that defines the number of second to wait for a queue permit + */ @Override public void configure(Context context) { - String strCapacity = context.getString("capacity"); Integer capacity = null; - if(strCapacity == null) { + try { + capacity = context.getInteger("capacity", defaultCapacity); + } catch(NumberFormatException e) { capacity = defaultCapacity; - } else { - try { - capacity = Integer.parseInt(strCapacity); - } catch(NumberFormatException e) { - capacity = defaultCapacity; - } } - String strTransCapacity = context.getString("transactionCapacity"); - if(strTransCapacity == null) { + + try { + transCapacity = context.getInteger("transactionCapacity", defaultTransCapacity); + } catch(NumberFormatException e) { transCapacity = defaultTransCapacity; - } else { - try { - transCapacity = Integer.parseInt(strTransCapacity); - } catch(NumberFormatException e) { - transCapacity = defaultTransCapacity; - } } + Preconditions.checkState(transCapacity <= capacity); - String strKeepAlive = context.getString("keep-alive"); + try { + byteCapacityBufferPercentage = context.getInteger("byteCapacityBufferPercentage", defaultByteCapacityBufferPercentage); + } catch(NumberFormatException e) { + byteCapacityBufferPercentage = defaultByteCapacityBufferPercentage; + } + + try { + byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize); + if (byteCapacity < 1) { + byteCapacity = Integer.MAX_VALUE; + } + } catch(NumberFormatException e) { + byteCapacity = (int)((defaultByteCapacity * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize); + } - if (strKeepAlive == null) { + try { + keepAlive = context.getInteger("keep-alive", defaultKeepAlive); + } catch(NumberFormatException e) { keepAlive = defaultKeepAlive; - } else { - keepAlive = Integer.parseInt(strKeepAlive); } if(queue != null) { @@ -210,6 +255,26 @@ public class MemoryChannel extends BasicChannelSemantics { } } + if (bytesRemaining == null) { + bytesRemaining = new Semaphore(byteCapacity); + lastByteCapacity = byteCapacity; + } else { + if (byteCapacity > lastByteCapacity) { + bytesRemaining.release(byteCapacity - lastByteCapacity); + lastByteCapacity = byteCapacity; + } else { + try { + if(!bytesRemaining.tryAcquire(lastByteCapacity - byteCapacity, keepAlive, TimeUnit.SECONDS)) { + LOGGER.warn("Couldn't acquire permits to downsize the byte capacity, resizing has been aborted"); + } else { + lastByteCapacity = byteCapacity; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + if (channelCounter == null) { channelCounter = new ChannelCounter(getName()); } @@ -263,4 +328,9 @@ public class MemoryChannel extends BasicChannelSemantics { protected BasicTransactionSemantics createTransaction() { return new MemoryTransaction(transCapacity, channelCounter); } + + private long estimateEventSize(Event event) + { + return event.getBody().length; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/fff91b4b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index e070864..4af4a40 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java @@ -176,7 +176,7 @@ public class TestMemoryChannel { } @Test - public void testBufferEmptyingAfterTakeCommit() { + public void testCapacityBufferEmptyingAfterTakeCommit() { Context context = new Context(); Map<String, String> parms = new HashMap<String, String>(); parms.put("capacity", "3"); @@ -208,7 +208,7 @@ public class TestMemoryChannel { } @Test - public void testBufferEmptyingAfterRollback() { + public void testCapacityBufferEmptyingAfterRollback() { Context context = new Context(); Map<String, String> parms = new HashMap<String, String>(); parms.put("capacity", "3"); @@ -232,4 +232,183 @@ public class TestMemoryChannel { tx.commit(); tx.close(); } + + @Test(expected=ChannelException.class) + public void testByteCapacityOverload() { + Context context = new Context(); + Map<String, String> parms = new HashMap<String, String>(); + parms.put("byteCapacity", "2000"); + parms.put("byteCapacityBufferPercentage", "20"); + context.putAll(parms); + Configurables.configure(channel, context); + + byte[] eventBody = new byte[405]; + + Transaction transaction = channel.getTransaction(); + transaction.begin(); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + transaction.commit(); + transaction.close(); + + transaction = channel.getTransaction(); + transaction.begin(); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + // this should kill it + transaction.commit(); + Assert.fail(); + + } + + public void testByteCapacityBufferEmptyingAfterTakeCommit() { + Context context = new Context(); + Map<String, String> parms = new HashMap<String, String>(); + parms.put("byteCapacity", "2000"); + parms.put("byteCapacityBufferPercentage", "20"); + context.putAll(parms); + Configurables.configure(channel, context); + + byte[] eventBody = new byte[405]; + + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + try { + channel.put(EventBuilder.withBody(eventBody)); + throw new RuntimeException("Put was able to overflow byte capacity."); + } catch (ChannelException ce) + { + //Do nothing + } + + tx.commit(); + tx.close(); + + tx = channel.getTransaction(); + tx.begin(); + channel.take(); + channel.take(); + tx.commit(); + tx.close(); + + tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + try { + channel.put(EventBuilder.withBody(eventBody)); + throw new RuntimeException("Put was able to overflow byte capacity."); + } catch (ChannelException ce) + { + //Do nothing + } + tx.commit(); + tx.close(); + } + + @Test + public void testByteCapacityBufferEmptyingAfterRollback() { + Context context = new Context(); + Map<String, String> parms = new HashMap<String, String>(); + parms.put("byteCapacity", "2000"); + parms.put("byteCapacityBufferPercentage", "20"); + context.putAll(parms); + Configurables.configure(channel, context); + + byte[] eventBody = new byte[405]; + + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + tx.rollback(); + tx.close(); + + tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + channel.put(EventBuilder.withBody(eventBody)); + tx.commit(); + tx.close(); + } + + @Test + public void testByteCapacityBufferChangeConfig() { + Context context = new Context(); + Map<String, String> parms = new HashMap<String, String>(); + parms.put("byteCapacity", "2000"); + parms.put("byteCapacityBufferPercentage", "20"); + context.putAll(parms); + Configurables.configure(channel, context); + + byte[] eventBody = new byte[405]; + + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(eventBody)); + + parms.put("byteCapacity", "1500"); + context.putAll(parms); + Configurables.configure(channel, context); + + channel.put(EventBuilder.withBody(eventBody)); + try { + channel.put(EventBuilder.withBody(eventBody)); + Assert.fail(); + } catch ( ChannelException e ) { + //success + } + + parms.put("byteCapacity", "2500"); + parms.put("byteCapacityBufferPercentage", "20"); + context.putAll(parms); + Configurables.configure(channel, context); + + channel.put(EventBuilder.withBody(eventBody)); + + parms.put("byteCapacity", "300"); + context.putAll(parms); + Configurables.configure(channel, context); + + channel.put(EventBuilder.withBody(eventBody)); + try { + channel.put(EventBuilder.withBody(eventBody)); + Assert.fail(); + } catch ( ChannelException e ) { + //success + } + + parms.put("byteCapacity", "3300"); + context.putAll(parms); + Configurables.configure(channel, context); + + channel.put(EventBuilder.withBody(eventBody)); + + try { + channel.put(EventBuilder.withBody(eventBody)); + Assert.fail(); + } catch ( ChannelException e ) { + //success + } + + parms.put("byteCapacity", "4000"); + context.putAll(parms); + Configurables.configure(channel, context); + + channel.put(EventBuilder.withBody(eventBody)); + + try { + channel.put(EventBuilder.withBody(eventBody)); + Assert.fail(); + } catch ( ChannelException e ) { + //success + } + } }
