KAFKA-1865 Add a flush() method to the producer.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0636928d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0636928d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0636928d Branch: refs/heads/trunk Commit: 0636928d961a6ceaab46d908f9372d913c3e5faf Parents: 22ff9e9 Author: Jay Kreps <jay.kr...@gmail.com> Authored: Sat Feb 7 12:01:51 2015 -0800 Committer: Jay Kreps <jay.kr...@gmail.com> Committed: Sat Feb 28 14:11:59 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/kafka/clients/Metadata.java | 10 +- .../kafka/clients/producer/KafkaProducer.java | 187 ++++++++++++--- .../kafka/clients/producer/MockProducer.java | 5 + .../apache/kafka/clients/producer/Producer.java | 5 + .../kafka/clients/producer/ProducerRecord.java | 20 +- .../internals/FutureRecordMetadata.java | 10 +- .../producer/internals/RecordAccumulator.java | 77 ++++++- .../clients/producer/internals/RecordBatch.java | 13 +- .../kafka/common/errors/InterruptException.java | 34 +++ .../apache/kafka/common/utils/SystemTime.java | 2 +- .../org/apache/kafka/clients/MetadataTest.java | 103 +++++++++ .../kafka/clients/producer/BufferPoolTest.java | 193 ---------------- .../kafka/clients/producer/MetadataTest.java | 95 -------- .../clients/producer/MockProducerTest.java | 6 + .../kafka/clients/producer/PartitionerTest.java | 69 ------ .../clients/producer/RecordAccumulatorTest.java | 207 ----------------- .../kafka/clients/producer/SenderTest.java | 155 ------------- .../producer/internals/BufferPoolTest.java | 193 ++++++++++++++++ .../producer/internals/PartitionerTest.java | 68 ++++++ .../internals/RecordAccumulatorTest.java | 228 +++++++++++++++++++ .../clients/producer/internals/SenderTest.java | 154 +++++++++++++ .../kafka/api/ProducerSendTest.scala | 62 +++-- .../test/scala/unit/kafka/utils/TestUtils.scala | 4 +- 23 files changed, 1097 insertions(+), 803 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index e8afecd..c8bde8b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -99,19 +99,15 @@ public final class Metadata { /** * Wait for metadata update until the current version is larger than the last version we know of */ - public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) { + public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds"); } long begin = System.currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this.version <= lastVersion) { - try { - if (remainingWaitMs != 0) { - wait(remainingWaitMs); - } - } catch (InterruptedException e) { /* this is fine */ - } + if (remainingWaitMs != 0) + wait(remainingWaitMs); long elapsed = System.currentTimeMillis() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 1fd6917..7397e56 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; @@ -55,10 +56,66 @@ import org.slf4j.LoggerFactory; /** * A Kafka client that publishes records to the Kafka cluster. * <P> - * The producer is <i>thread safe</i> and should generally be shared among all threads for best performance. + * The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than + * having multiple instances. * <p> - * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it - * needs to communicate with. Failure to close the producer after use will leak these resources. + * Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value + * pairs. 
+ * <pre> + * {@code + * Properties props = new Properties(); + * props.put("bootstrap.servers", "localhost:4242"); + * props.put("acks", "all"); + * props.put("retries", 0); + * props.put("batch.size", 16384); + * props.put("linger.ms", 1); + * props.put("buffer.memory", 33554432); + * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + * + * Producer<String, String> producer = new KafkaProducer(props); + * for(int i = 0; i < 100; i++) + * producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); + * + * producer.close(); + * }</pre> + * <p> + * The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server + * as well as a background I/O thread that is responsible for turning these records into requests and transmitting them + * to the cluster. Failure to close the producer after use will leak these resources. + * <p> + * The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends + * and immediately returns. This allows the producer to batch together individual records for efficiency. + * <p> + * The <code>acks</code> config controls the criteria under which requests are considered complete. The "all" setting + * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. + * <p> + * If the request fails, the producer can automatically retry, though since we have specified <code>retries</code> + * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on + * <a href="http://kafka.apache.org/documentation.html#semantics">message delivery semantics</a> for details). + * <p> + * The producer maintains buffers of unsent records for each partition. 
These buffers are of a size specified by + * the <code>batch.size</code> config. Making this larger can result in more batching, but requires more memory (since we will + * generally have one of these buffers for each active partition). + * <p> + * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you + * want to reduce the number of requests you can set <code>linger.ms</code> to something greater than 0. This will + * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will + * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, + * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting + * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that + * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load + * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more + * efficient requests when not under maximal load at the cost of a small amount of latency. + * <p> + * The <code>buffer.memory</code> controls the total amount of memory available to the producer for buffering. If records + * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is + * exhausted additional send calls will block. For uses where you want to avoid any blocking you can set <code>block.on.buffer.full=false</code> which + * will cause the send call to result in an exception. 
+ * <p> + * The <code>key.serializer</code> and <code>value.serializer</code> instruct how to turn the key and value objects the user provides with + * their <code>ProducerRecord</code> into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or + * {@link org.apache.kafka.common.serialization.StringSerializer} for simple string or byte types. */ public class KafkaProducer<K, V> implements Producer<K, V> { @@ -241,8 +298,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { } /** - * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} - * @param record The record to be sent + * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. + * See {@link #send(ProducerRecord, Callback)} for details. */ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) { @@ -261,53 +318,59 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * <p> * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get() - * get()} on this future will result in the metadata for the record or throw any exception that occurred while - * sending the record. + * get()} on this future will block until the associated request completes and then return the metadata for the record + * or throw any exception that occurred while sending the record. 
 * <p> - * If you want to simulate a simple blocking call you can do the following: + * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately: * - * <pre>{@code - * producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get(); + * <pre> + * {@code + * byte[] key = "key".getBytes(); + * byte[] value = "value".getBytes(); + * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value); + * producer.send(record).get(); * }</pre> * <p> - * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that + * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that * will be invoked when the request is complete. * - * <pre>{@code - * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes()); - * producer.send(myRecord, - * new Callback() { - * public void onCompletion(RecordMetadata metadata, Exception e) { - * if(e != null) - * e.printStackTrace(); - * System.out.println("The offset of the record we just sent is: " + metadata.offset()); - * } - * }); - * }</pre> + * <pre> + * {@code + * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); + * producer.send(record, + * new Callback() { + * public void onCompletion(RecordMetadata metadata, Exception e) { + * if(e != null) + * e.printStackTrace(); + * System.out.println("The offset of the record we just sent is: " + metadata.offset()); + * } + * }); + * } + * </pre> * * Callbacks for records being sent to the same partition are guaranteed to execute in order. 
That is, in the * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>: * - * <pre>{@code + * <pre> + * {@code * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2); - * }</pre> + * } + * </pre> * <p> * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or * they will delay the sending of messages from other threads. If you want to execute blocking or computationally * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body * to parallelize processing. - * <p> - * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on it's size, which is - * controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called faster than the - * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in - * this case is to block the send call until the I/O thread catches up and more buffer space is available. However - * in cases where non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will cause the - * producer to instead throw an exception when buffer memory is exhausted. * * @param record The record to send * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null * indicates no callback) + * + * @throws InterruptException If the thread is interrupted while blocked + * @throws SerializationException If the key or value are not valid objects given the configured serializers + * @throws BufferExhaustedException If <code>block.on.buffer.full=false</code> and the buffer is full. 
+ * */ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { @@ -352,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); - throw new KafkaException(e); + throw new InterruptException(e); } catch (KafkaException e) { this.errors.record(); throw e; @@ -364,7 +427,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @param topic The topic we want metadata for * @param maxWaitMs The maximum time in ms for waiting on the metadata */ - private void waitOnMetadata(String topic, long maxWaitMs) { + private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { if (metadata.fetch().partitionsForTopic(topic) != null) { return; } else { @@ -399,20 +462,72 @@ public class KafkaProducer<K, V> implements Producer<K, V> { ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration."); } + + /** + * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is + * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition + * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>). + * A request is considered completed when it is successfully acknowledged + * according to the <code>acks</code> configuration you have specified or else it results in an error. + * <p> + * Other threads can continue sending records while one thread is blocked waiting for a flush call to complete, + * however no guarantee is made about the completion of records sent after the flush call begins. + * <p> + * This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call + * gives a convenient way to ensure all previously sent messages have actually completed. 
+ * <p> + * This example shows how to consume from one Kafka topic and produce to another Kafka topic: + * <pre> + * {@code + * for(ConsumerRecord<String, String> record: consumer.poll(100)) + * producer.send(new ProducerRecord("my-topic", record.key(), record.value())); + * producer.flush(); + * consumer.commit(); + * } + * </pre> + * + * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur + * we need to set <code>retries=<large_number></code> in our config. + * + * @throws InterruptException If the thread is interrupted while blocked + */ + @Override + public void flush() { + log.trace("Flushing accumulated records in producer."); + this.accumulator.beginFlush(); + this.sender.wakeup(); + try { + this.accumulator.awaitFlushCompletion(); + } catch (InterruptedException e) { + throw new InterruptException("Flush interrupted.", e); + } + } + /** + * Get the partition metadata for the given topic. This can be used for custom partitioning. + * @throws InterruptException If the thread is interrupted while blocked + */ @Override public List<PartitionInfo> partitionsFor(String topic) { - waitOnMetadata(topic, this.metadataFetchTimeoutMs); + try { + waitOnMetadata(topic, this.metadataFetchTimeoutMs); + } catch (InterruptedException e) { + throw new InterruptException(e); + } return this.metadata.fetch().partitionsForTopic(topic); } + /** + * Get the full set of internal metrics maintained by the producer. + */ @Override public Map<MetricName, ? extends Metric> metrics() { return Collections.unmodifiableMap(this.metrics.metrics()); } /** - * Close this producer. This method blocks until all in-flight requests complete. + * Close this producer. This method blocks until all previously sent requests complete. 
+ * @throws InterruptException If the thread is interrupted while blocked */ @Override public void close() { @@ -421,7 +536,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { try { this.ioThread.join(); } catch (InterruptedException e) { - throw new KafkaException(e); + throw new InterruptException(e); } this.metrics.close(); this.keySerializer.close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 84530f2..6913090 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -128,6 +128,11 @@ public class MockProducer implements Producer<byte[], byte[]> { return offset; } } + + public synchronized void flush() { + while (!this.completions.isEmpty()) + completeNext(); + } public List<PartitionInfo> partitionsFor(String topic) { return this.cluster.partitionsForTopic(topic); http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 17fe541..5b3e75e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -45,6 +45,11 @@ public interface Producer<K, V> extends Closeable { * Send a record and invoke the given callback when the record has been acknowledged by the server */ public Future<RecordMetadata> send(ProducerRecord<K, V> 
record, Callback callback); + + /** + * Flush any accumulated records from the producer. Blocks until all sends are complete. + */ + public void flush(); /** * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index 4990692..75cd51e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -102,15 +102,21 @@ public final class ProducerRecord<K, V> { @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof ProducerRecord)) return false; + if (this == o) + return true; + else if (!(o instanceof ProducerRecord)) + return false; - ProducerRecord that = (ProducerRecord) o; + ProducerRecord<?, ?> that = (ProducerRecord<?, ?>) o; - if (key != null ? !key.equals(that.key) : that.key != null) return false; - if (partition != null ? !partition.equals(that.partition) : that.partition != null) return false; - if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false; - if (value != null ? !value.equals(that.value) : that.value != null) return false; + if (key != null ? !key.equals(that.key) : that.key != null) + return false; + else if (partition != null ? !partition.equals(that.partition) : that.partition != null) + return false; + else if (topic != null ? !topic.equals(that.topic) : that.topic != null) + return false; + else if (value != null ? 
!value.equals(that.value) : that.value != null) + return false; return true; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index 4a2da41..e2d9ca8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -51,13 +51,17 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { return valueOrError(); } - private RecordMetadata valueOrError() throws ExecutionException { + RecordMetadata valueOrError() throws ExecutionException { if (this.result.error() != null) throw new ExecutionException(this.result.error()); else - return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset); + return value(); } - + + RecordMetadata value() { + return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset); + } + public long relativeOffset() { return this.relativeOffset; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ecfe214..d5c79e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; @@ -55,6 +56,7 @@ public final class RecordAccumulator { private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class); private volatile boolean closed; + private volatile AtomicInteger flushesInProgress; private int drainIndex; private final int batchSize; private final long lingerMs; @@ -62,6 +64,7 @@ public final class RecordAccumulator { private final BufferPool free; private final Time time; private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; + private final IncompleteRecordBatches incomplete; /** * Create a new record accumulator @@ -89,12 +92,14 @@ public final class RecordAccumulator { Map<String, String> metricTags) { this.drainIndex = 0; this.closed = false; + this.flushesInProgress = new AtomicInteger(0); this.batchSize = batchSize; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>(); String metricGrpName = "producer-metrics"; this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time , metricGrpName , metricTags); + this.incomplete = new IncompleteRecordBatches(); this.time = time; registerMetrics(metrics, metricGrpName, metricTags); } @@ -146,9 +151,8 @@ public final class RecordAccumulator { RecordBatch last = dq.peekLast(); if (last != null) { FutureRecordMetadata future = last.tryAppend(key, value, callback); - if (future != null) { + if (future != null) return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); - } } } @@ -161,8 +165,7 @@ public final class RecordAccumulator { if (last != null) { FutureRecordMetadata 
future = last.tryAppend(key, value, callback); if (future != null) { - // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen - // often... + // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false); } @@ -172,6 +175,7 @@ public final class RecordAccumulator { FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback)); dq.addLast(batch); + incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); } } @@ -226,7 +230,7 @@ public final class RecordAccumulator { long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; - boolean sendable = full || expired || exhausted || closed; + boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { readyNodes.add(leader); } else { @@ -266,7 +270,6 @@ public final class RecordAccumulator { * @param maxSize The maximum number of bytes to drain * @param now The current unix time in milliseconds * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize. - * TODO: There may be a starvation issue due to iteration order */ public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { if (nodes.isEmpty()) @@ -324,8 +327,32 @@ public final class RecordAccumulator { * Deallocate the record batch */ public void deallocate(RecordBatch batch) { + incomplete.remove(batch); free.deallocate(batch.records.buffer(), batch.records.capacity()); } + + /** + * Are there any threads currently waiting on a flush? 
+ */ + private boolean flushInProgress() { + return flushesInProgress.get() > 0; + } + + /** + * Initiate the flushing of data from the accumulator...this makes all requests immediately ready + */ + public void beginFlush() { + this.flushesInProgress.getAndIncrement(); + } + + /** + * Mark all partitions as ready to send and block until the send is complete + */ + public void awaitFlushCompletion() throws InterruptedException { + for (RecordBatch batch: this.incomplete.all()) + batch.produceFuture.await(); + this.flushesInProgress.decrementAndGet(); + } /** * Close this accumulator and force all the record buffers to be drained @@ -334,7 +361,9 @@ public final class RecordAccumulator { this.closed = true; } - + /* + * Metadata about a record just appended to the record accumulator + */ public final static class RecordAppendResult { public final FutureRecordMetadata future; public final boolean batchIsFull; @@ -347,6 +376,9 @@ public final class RecordAccumulator { } } + /* + * The set of nodes that have at least one complete record batch in the accumulator + */ public final static class ReadyCheckResult { public final Set<Node> readyNodes; public final long nextReadyCheckDelayMs; @@ -358,4 +390,35 @@ public final class RecordAccumulator { this.unknownLeadersExist = unknownLeadersExist; } } + + /* + * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet + */ + private final static class IncompleteRecordBatches { + private final Set<RecordBatch> incomplete; + + public IncompleteRecordBatches() { + this.incomplete = new HashSet<RecordBatch>(); + } + + public void add(RecordBatch batch) { + synchronized (incomplete) { + this.incomplete.add(batch); + } + } + + public void remove(RecordBatch batch) { + synchronized (incomplete) { + boolean removed = this.incomplete.remove(batch); + if (!removed) + throw new IllegalStateException("Remove from the incomplete set failed. 
This should be impossible."); + } + } + + public Iterable<RecordBatch> all() { + synchronized (incomplete) { + return new ArrayList<RecordBatch>(this.incomplete); + } + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index dd0af8a..06182db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -16,6 +16,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; @@ -39,7 +40,7 @@ public final class RecordBatch { public long lastAttemptMs; public final MemoryRecords records; public final TopicPartition topicPartition; - private final ProduceRequestResult produceFuture; + public final ProduceRequestResult produceFuture; private final List<Thunk> thunks; public RecordBatch(TopicPartition tp, MemoryRecords records, long now) { @@ -77,7 +78,6 @@ public final class RecordBatch { * @param exception The exception that occurred (or null if the request was successful) */ public void done(long baseOffset, RuntimeException exception) { - this.produceFuture.done(topicPartition, baseOffset, exception); log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.", topicPartition, baseOffset, @@ -86,14 +86,17 @@ public final class RecordBatch { for (int i = 0; i < this.thunks.size(); i++) { try { Thunk thunk = 
this.thunks.get(i); - if (exception == null) - thunk.callback.onCompletion(thunk.future.get(), null); - else + if (exception == null) { + RecordMetadata metadata = new RecordMetadata(this.topicPartition, baseOffset, thunk.future.relativeOffset()); + thunk.callback.onCompletion(metadata, null); + } else { thunk.callback.onCompletion(null, exception); + } } catch (Exception e) { log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e); } } + this.produceFuture.done(topicPartition, baseOffset, exception); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java new file mode 100644 index 0000000..fee322f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.KafkaException; + +/** + * An unchecked wrapper for InterruptedException + */ +public class InterruptException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public InterruptException(InterruptedException cause) { + super(cause); + Thread.currentThread().interrupt(); + } + + public InterruptException(String message, InterruptedException cause) { + super(message, cause); + Thread.currentThread().interrupt(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java index d682bd4..18725de 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java @@ -36,7 +36,7 @@ public class SystemTime implements Time { try { Thread.sleep(ms); } catch (InterruptedException e) { - // no stress + // just wake up early } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java new file mode 100644 index 0000000..928087d --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class MetadataTest { + + private long refreshBackoffMs = 100; + private long metadataExpireMs = 1000; + private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs); + private AtomicBoolean backgroundError = new AtomicBoolean(false); + + @After + public void tearDown() { + assertFalse(backgroundError.get()); + } + + @Test + public void testMetadata() throws Exception { + long time = 0; + metadata.update(Cluster.empty(), time); + assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); + metadata.requestUpdate(); + assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0); + time += refreshBackoffMs; + assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0); + String topic = "my-topic"; + Thread t1 = asyncFetch(topic); + Thread t2 = asyncFetch(topic); + assertTrue("Awaiting update", t1.isAlive()); + assertTrue("Awaiting update", t2.isAlive()); + metadata.update(TestUtils.singletonCluster(topic, 1), time); + t1.join(); + t2.join(); + assertFalse("No update needed.", 
metadata.timeToNextUpdate(time) == 0); + time += metadataExpireMs; + assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0); + } + + /** + * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't + * wait forever with a max timeout value of 0 + * + * @throws Exception + * @see https://issues.apache.org/jira/browse/KAFKA-1836 + */ + @Test + public void testMetadataUpdateWaitTime() throws Exception { + long time = 0; + metadata.update(Cluster.empty(), time); + assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); + // first try with a max wait time of 0 and ensure that this returns back without waiting forever + try { + metadata.awaitUpdate(metadata.requestUpdate(), 0); + fail("Wait on metadata update was expected to timeout, but it didn't"); + } catch (TimeoutException te) { + // expected + } + // now try with a higher timeout value once + final long twoSecondWait = 2000; + try { + metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait); + fail("Wait on metadata update was expected to timeout, but it didn't"); + } catch (TimeoutException te) { + // expected + } + } + + private Thread asyncFetch(final String topic) { + Thread thread = new Thread() { + public void run() { + while (metadata.fetch().partitionsForTopic(topic) == null) { + try { + metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs); + } catch (Exception e) { + backgroundError.set(true); + } + } + } + }; + thread.start(); + return thread; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java deleted file mode 100644 index 4ae43ed..0000000 --- 
a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.producer; - -import org.apache.kafka.clients.producer.internals.BufferPool; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.test.TestUtils; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.junit.Assert.*; - -public class BufferPoolTest { - private MockTime time = new MockTime(); - private Metrics metrics = new Metrics(time); - String metricGroup = "TestMetrics"; - Map<String, String> metricTags = new LinkedHashMap<String, String>(); - - /** - * Test the simple non-blocking allocation paths - */ - @Test - public void testSimple() throws Exception { - long totalMemory = 64 * 1024; - int size = 1024; - BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = 
pool.allocate(size); - assertEquals("Buffer size should equal requested size.", size, buffer.limit()); - assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory()); - assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory()); - buffer.putInt(1); - buffer.flip(); - pool.deallocate(buffer); - assertEquals("All memory should be available", totalMemory, pool.availableMemory()); - assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory()); - buffer = pool.allocate(size); - assertEquals("Recycled buffer should be cleared.", 0, buffer.position()); - assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit()); - pool.deallocate(buffer); - assertEquals("All memory should be available", totalMemory, pool.availableMemory()); - assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory()); - buffer = pool.allocate(2 * size); - pool.deallocate(buffer); - assertEquals("All memory should be available", totalMemory, pool.availableMemory()); - assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory()); - } - - /** - * Test that we cannot try to allocate more memory then we have in the whole pool - */ - @Test(expected = IllegalArgumentException.class) - public void testCantAllocateMoreMemoryThanWeHave() throws Exception { - BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(1024); - assertEquals(1024, buffer.limit()); - pool.deallocate(buffer); - buffer = pool.allocate(1025); - } - - @Test - public void testNonblockingMode() throws Exception { - BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags); - pool.allocate(1); - try { - pool.allocate(2); - fail("The buffer allocated more than it has!"); - } catch (BufferExhaustedException e) { - // this is 
good - } - } - - /** - * Test that delayed allocation blocks - */ - @Test - public void testDelayedAllocation() throws Exception { - BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags); - ByteBuffer buffer = pool.allocate(1024); - CountDownLatch doDealloc = asyncDeallocate(pool, buffer); - CountDownLatch allocation = asyncAllocate(pool, 5 * 1024); - assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount()); - doDealloc.countDown(); // return the memory - allocation.await(); - } - - private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) { - final CountDownLatch latch = new CountDownLatch(1); - Thread thread = new Thread() { - public void run() { - try { - latch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - pool.deallocate(buffer); - } - }; - thread.start(); - return latch; - } - - private CountDownLatch asyncAllocate(final BufferPool pool, final int size) { - final CountDownLatch completed = new CountDownLatch(1); - Thread thread = new Thread() { - public void run() { - try { - pool.allocate(size); - } catch (InterruptedException e) { - e.printStackTrace(); - } finally { - completed.countDown(); - } - } - }; - thread.start(); - return completed; - } - - /** - * This test creates lots of threads that hammer on the pool - */ - @Test - public void testStressfulSituation() throws Exception { - int numThreads = 10; - final int iterations = 50000; - final int poolableSize = 1024; - final int totalMemory = numThreads / 2 * poolableSize; - final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags); - List<StressTestThread> threads = new ArrayList<StressTestThread>(); - for (int i = 0; i < numThreads; i++) - threads.add(new StressTestThread(pool, iterations)); - for (StressTestThread thread : threads) - thread.start(); - for (StressTestThread thread : threads) - thread.join(); 
- for (StressTestThread thread : threads) - assertTrue("Thread should have completed all iterations successfully.", thread.success.get()); - assertEquals(totalMemory, pool.availableMemory()); - } - - public static class StressTestThread extends Thread { - private final int iterations; - private final BufferPool pool; - public final AtomicBoolean success = new AtomicBoolean(false); - - public StressTestThread(BufferPool pool, int iterations) { - this.iterations = iterations; - this.pool = pool; - } - - public void run() { - try { - for (int i = 0; i < iterations; i++) { - int size; - if (TestUtils.RANDOM.nextBoolean()) - // allocate poolable size - size = pool.poolableSize(); - else - // allocate a random size - size = TestUtils.RANDOM.nextInt((int) pool.totalMemory()); - ByteBuffer buffer = pool.allocate(size); - pool.deallocate(buffer); - } - success.set(true); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java deleted file mode 100644 index 743aa7e..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.clients.producer; - -import org.apache.kafka.clients.Metadata; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.test.TestUtils; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class MetadataTest { - - private long refreshBackoffMs = 100; - private long metadataExpireMs = 1000; - private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs); - - @Test - public void testMetadata() throws Exception { - long time = 0; - metadata.update(Cluster.empty(), time); - assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); - metadata.requestUpdate(); - assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0); - time += refreshBackoffMs; - assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0); - String topic = "my-topic"; - Thread t1 = asyncFetch(topic); - Thread t2 = asyncFetch(topic); - assertTrue("Awaiting update", t1.isAlive()); - assertTrue("Awaiting update", t2.isAlive()); - metadata.update(TestUtils.singletonCluster(topic, 1), time); - t1.join(); - t2.join(); - assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); - time += metadataExpireMs; - assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0); - } - - /** - * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't - * wait forever with a max timeout value of 0 - * - * @throws Exception - * @see 
https://issues.apache.org/jira/browse/KAFKA-1836 - */ - @Test - public void testMetadataUpdateWaitTime() throws Exception { - long time = 0; - metadata.update(Cluster.empty(), time); - assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); - // first try with a max wait time of 0 and ensure that this returns back without waiting forever - try { - metadata.awaitUpdate(metadata.requestUpdate(), 0); - fail("Wait on metadata update was expected to timeout, but it didn't"); - } catch (TimeoutException te) { - // expected - } - // now try with a higher timeout value once - final long twoSecondWait = 2000; - try { - metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait); - fail("Wait on metadata update was expected to timeout, but it didn't"); - } catch (TimeoutException te) { - // expected - } - } - - private Thread asyncFetch(final String topic) { - Thread thread = new Thread() { - public void run() { - while (metadata.fetch().partitionsForTopic(topic) == null) { - try { - metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs); - } catch (TimeoutException e) { - // let it go - } - } - } - }; - thread.start(); - return thread; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 75513b0..6372f1a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -67,6 +67,12 @@ public class MockProducerTest { assertEquals(e, err.getCause()); } assertFalse("No more requests to complete", producer.completeNext()); + + Future<RecordMetadata> md3 = producer.send(record1); + Future<RecordMetadata> md4 
= producer.send(record2); + assertTrue("Requests should not be completed.", !md3.isDone() && !md4.isDone()); + producer.flush(); + assertTrue("Requests should be completed.", md3.isDone() && md4.isDone()); } private boolean isError(Future<?> future) { http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java deleted file mode 100644 index 404bedb..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.kafka.clients.producer; - -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.List; - -import org.apache.kafka.clients.producer.internals.Partitioner; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.junit.Test; - -public class PartitionerTest { - - private byte[] key = "key".getBytes(); - private Partitioner partitioner = new Partitioner(); - private Node node0 = new Node(0, "localhost", 99); - private Node node1 = new Node(1, "localhost", 100); - private Node node2 = new Node(2, "localhost", 101); - private Node[] nodes = new Node[] {node0, node1, node2}; - private String topic = "test"; - // Intentionally make the partition list not in partition order to test the edge cases. - private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes), - new PartitionInfo(topic, 2, node1, nodes, nodes), - new PartitionInfo(topic, 0, node0, nodes, nodes)); - private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions); - - @Test - public void testUserSuppliedPartitioning() { - assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition("test", key, 0, cluster)); - } - - @Test - public void testKeyPartitionIsStable() { - int partition = partitioner.partition("test", key, null, cluster); - assertEquals("Same key should yield same partition", partition, partitioner.partition("test", key, null, cluster)); - } - - @Test - public void testRoundRobinWithUnavailablePartitions() { - // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition, - // and (2) the available partitions are selected in a round robin way. 
- int countForPart0 = 0; - int countForPart2 = 0; - for (int i = 1; i <= 100; i++) { - int part = partitioner.partition("test", null, null, cluster); - assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2); - if (part == 0) - countForPart0++; - else - countForPart2++; - } - assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java deleted file mode 100644 index 8333863..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.kafka.clients.producer; - -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kafka.clients.producer.internals.RecordAccumulator; -import org.apache.kafka.clients.producer.internals.RecordBatch; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.LogEntry; -import org.apache.kafka.common.record.Record; -import org.apache.kafka.common.record.Records; -import org.apache.kafka.common.utils.MockTime; -import org.junit.Test; - -public class RecordAccumulatorTest { - - private String topic = "test"; - private int partition1 = 0; - private int partition2 = 1; - private int partition3 = 2; - private Node node1 = new Node(0, "localhost", 1111); - private Node node2 = new Node(1, "localhost", 1112); - private TopicPartition tp1 = new TopicPartition(topic, partition1); - private TopicPartition tp2 = new TopicPartition(topic, partition2); - private TopicPartition tp3 = new TopicPartition(topic, partition3); - private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null); - private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null); - private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null); - private MockTime time = new MockTime(); - private byte[] key = "key".getBytes(); - private byte[] value = "value".getBytes(); - private 
int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); - private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); - private Metrics metrics = new Metrics(time); - String metricGroup = "TestMetrics"; - Map<String, String> metricTags = new LinkedHashMap<String, String>(); - - @Test - public void testFull() throws Exception { - long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags); - int appends = 1024 / msgSize; - for (int i = 0; i < appends; i++) { - accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); - } - accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); - assertEquals(1, batches.size()); - RecordBatch batch = batches.get(0); - Iterator<LogEntry> iter = batch.records.iterator(); - for (int i = 0; i < appends; i++) { - LogEntry entry = iter.next(); - assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); - assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value()); - } - assertFalse("No more records", iter.hasNext()); - } - - @Test - public void testAppendLarge() throws Exception { - int batchSize = 512; - RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); - } - - @Test - public void testLinger() throws Exception { - long 
lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); - accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); - time.sleep(10); - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); - assertEquals(1, batches.size()); - RecordBatch batch = batches.get(0); - Iterator<LogEntry> iter = batch.records.iterator(); - LogEntry entry = iter.next(); - assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key()); - assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value()); - assertFalse("No more records", iter.hasNext()); - } - - @Test - public void testPartialDrain() throws Exception { - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags); - int appends = 1024 / msgSize + 1; - List<TopicPartition> partitions = asList(tp1, tp2); - for (TopicPartition tp : partitions) { - for (int i = 0; i < appends; i++) - accum.append(tp, key, value, CompressionType.NONE, null); - } - assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); - - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id()); - assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); - } - - @SuppressWarnings("unused") - @Test - public void testStressfulSituation() throws Exception { - final int numThreads = 5; - final int msgs = 10000; - final int numParts = 2; - final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, 
metrics, time, metricTags); - List<Thread> threads = new ArrayList<Thread>(); - for (int i = 0; i < numThreads; i++) { - threads.add(new Thread() { - public void run() { - for (int i = 0; i < msgs; i++) { - try { - accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - }); - } - for (Thread t : threads) - t.start(); - int read = 0; - long now = time.milliseconds(); - while (read < numThreads * msgs) { - Set<Node> nodes = accum.ready(cluster, now).readyNodes; - List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id()); - if (batches != null) { - for (RecordBatch batch : batches) { - for (LogEntry entry : batch.records) - read++; - accum.deallocate(batch); - } - } - } - - for (Thread t : threads) - t.join(); - } - - - @Test - public void testNextReadyCheckDelay() throws Exception { - // Next check time will use lingerMs since this test won't trigger any retries/backoff - long lingerMs = 10L; - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags); - // Just short of going over the limit so we trigger linger time - int appends = 1024 / msgSize; - - // Partition on node1 only - for (int i = 0; i < appends; i++) - accum.append(tp1, key, value, CompressionType.NONE, null); - RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); - assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); - - time.sleep(lingerMs / 2); - - // Add partition on node2 only - for (int i = 0; i < appends; i++) - accum.append(tp3, key, value, CompressionType.NONE, null); - result = accum.ready(cluster, time.milliseconds()); - assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - assertEquals("Next check time should be defined by node1, half 
remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); - - // Add data for another partition on node1, enough to make data sendable immediately - for (int i = 0; i < appends + 1; i++) - accum.append(tp2, key, value, CompressionType.NONE, null); - result = accum.ready(cluster, time.milliseconds()); - assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); - // Note this can actually be < linger time because it may use delays from partitions that aren't sendable - // but have leaders with other sendable data. - assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java deleted file mode 100644 index 558942a..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.clients.producer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link Sender} that drive it by hand against a {@link MockClient} and a {@link MockTime}: each call to
 * {@code sender.run(...)} performs one step, and the mock client is told explicitly when to respond or disconnect.
 */
public class SenderTest {

    private static final int MAX_REQUEST_SIZE = 1024 * 1024;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = 0;
    private static final int REQUEST_TIMEOUT_MS = 10000;

    private TopicPartition tp = new TopicPartition("test", 0);
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(time);
    private int batchSize = 16 * 1024;
    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
    private Cluster cluster = TestUtils.singletonCluster("test", 1);
    private Metrics metrics = new Metrics(time);
    Map<String, String> metricTags = new LinkedHashMap<String, String>();
    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
    private Sender sender = new Sender(client,
                                       metadata,
                                       this.accumulator,
                                       MAX_REQUEST_SIZE,
                                       ACKS_ALL,
                                       MAX_RETRIES,
                                       REQUEST_TIMEOUT_MS,
                                       metrics,
                                       time,
                                       "clientId");

    /**
     * Seeds the metadata with the single-topic test cluster before every test.
     */
    @Before
    public void setup() {
        metadata.update(cluster, time.milliseconds());
    }

    /**
     * Appends one record, steps the sender until the produce request is in flight, answers it successfully and checks
     * that the record's future completes with the offset from the response.
     */
    @Test
    public void testSimple() throws Exception {
        long offset = 0;
        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
        sender.run(time.milliseconds()); // connect
        sender.run(time.milliseconds()); // send produce request
        assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
        sender.run(time.milliseconds());
        // The expected in-flight count is zero; it is unrelated to the produce offset, which merely happens to be 0 too.
        assertEquals("All requests completed.", 0, client.inFlightRequestCount());
        sender.run(time.milliseconds());
        assertTrue("Request should be completed", future.isDone());
        assertEquals(offset, future.get().offset());
    }

    /**
     * Uses a sender configured with one retry. First a disconnect is followed by a successful resend; then the
     * connection is dropped {@code maxRetries + 1} times and the future is expected to fail with the exception type of
     * {@link Errors#NETWORK_EXCEPTION}.
     */
    @Test
    public void testRetries() throws Exception {
        // create a sender with retries = 1
        int maxRetries = 1;
        Sender sender = new Sender(client,
                                   metadata,
                                   this.accumulator,
                                   MAX_REQUEST_SIZE,
                                   ACKS_ALL,
                                   maxRetries,
                                   REQUEST_TIMEOUT_MS,
                                   new Metrics(),
                                   time,
                                   "clientId");
        // do a successful retry
        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
        sender.run(time.milliseconds()); // connect
        sender.run(time.milliseconds()); // send produce request
        assertEquals(1, client.inFlightRequestCount());
        client.disconnect(client.requests().peek().request().destination());
        assertEquals(0, client.inFlightRequestCount());
        sender.run(time.milliseconds()); // receive error
        sender.run(time.milliseconds()); // reconnect
        sender.run(time.milliseconds()); // resend
        assertEquals(1, client.inFlightRequestCount());
        long offset = 0;
        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
        sender.run(time.milliseconds());
        assertTrue("Request should have retried and completed", future.isDone());
        assertEquals(offset, future.get().offset());

        // do an unsuccessful retry
        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
        sender.run(time.milliseconds()); // send produce request
        for (int i = 0; i < maxRetries + 1; i++) {
            client.disconnect(client.requests().peek().request().destination());
            sender.run(time.milliseconds()); // receive error
            sender.run(time.milliseconds()); // reconnect
            sender.run(time.milliseconds()); // resend
        }
        sender.run(time.milliseconds());
        completedWithError(future, Errors.NETWORK_EXCEPTION);
    }

    /**
     * Asserts that {@code future} is done and that {@code get()} throws an {@link ExecutionException} whose cause has
     * the same class as {@code error.exception()}.
     *
     * @param future the record future to inspect
     * @param error the error whose exception class the failure cause must match
     */
    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
        assertTrue("Request should be completed", future.isDone());
        try {
            future.get();
            fail("Should have thrown an exception.");
        } catch (ExecutionException e) {
            assertEquals(error.exception().getClass(), e.getCause().getClass());
        }
    }

    /**
     * Builds a produce response {@link Struct} (current PRODUCE response schema) holding one topic entry with one
     * partition response.
     *
     * @param topic value of the {@code topic} field
     * @param part value of the {@code partition} field
     * @param offset value of the {@code base_offset} field
     * @param error error code, narrowed to {@code short} for the {@code error_code} field
     * @return the populated response struct
     */
    private Struct produceResponse(String topic, int part, long offset, int error) {
        Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
        Struct response = struct.instance("responses");
        response.set("topic", topic);
        Struct partResp = response.instance("partition_responses");
        partResp.set("partition", part);
        partResp.set("error_code", (short) error);
        partResp.set("base_offset", offset);
        response.set("partition_responses", new Object[] {partResp});
        struct.set("responses", new Object[] {response});
        return struct;
    }

}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0636928d/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java new file mode 100644 index 0000000..2c69382 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.producer.internals; + +import org.apache.kafka.clients.producer.BufferExhaustedException; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.TestUtils; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.*; + +public class BufferPoolTest { + private MockTime time = new MockTime(); + private Metrics metrics = new Metrics(time); + String metricGroup = "TestMetrics"; + Map<String, String> metricTags = new LinkedHashMap<String, String>(); + + /** + * Test the simple non-blocking allocation paths + */ + @Test + public void testSimple() throws Exception { + long totalMemory = 64 * 1024; + int size = 1024; + BufferPool pool = new BufferPool(totalMemory, size, false, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(size); + assertEquals("Buffer size should equal requested size.", size, buffer.limit()); + assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory()); + assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory()); + buffer.putInt(1); + buffer.flip(); + pool.deallocate(buffer); + assertEquals("All memory should be available", totalMemory, pool.availableMemory()); + assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory()); + buffer = pool.allocate(size); + assertEquals("Recycled buffer should be cleared.", 0, buffer.position()); + assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit()); + pool.deallocate(buffer); + assertEquals("All memory should be available", totalMemory, pool.availableMemory()); + assertEquals("Still a single buffer on the 
free list", totalMemory - size, pool.unallocatedMemory()); + buffer = pool.allocate(2 * size); + pool.deallocate(buffer); + assertEquals("All memory should be available", totalMemory, pool.availableMemory()); + assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory()); + } + + /** + * Test that we cannot try to allocate more memory then we have in the whole pool + */ + @Test(expected = IllegalArgumentException.class) + public void testCantAllocateMoreMemoryThanWeHave() throws Exception { + BufferPool pool = new BufferPool(1024, 512, true, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(1024); + assertEquals(1024, buffer.limit()); + pool.deallocate(buffer); + buffer = pool.allocate(1025); + } + + @Test + public void testNonblockingMode() throws Exception { + BufferPool pool = new BufferPool(2, 1, false, metrics, time, metricGroup, metricTags); + pool.allocate(1); + try { + pool.allocate(2); + fail("The buffer allocated more than it has!"); + } catch (BufferExhaustedException e) { + // this is good + } + } + + /** + * Test that delayed allocation blocks + */ + @Test + public void testDelayedAllocation() throws Exception { + BufferPool pool = new BufferPool(5 * 1024, 1024, true, metrics, time, metricGroup, metricTags); + ByteBuffer buffer = pool.allocate(1024); + CountDownLatch doDealloc = asyncDeallocate(pool, buffer); + CountDownLatch allocation = asyncAllocate(pool, 5 * 1024); + assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1L, allocation.getCount()); + doDealloc.countDown(); // return the memory + allocation.await(); + } + + private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) { + final CountDownLatch latch = new CountDownLatch(1); + Thread thread = new Thread() { + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + pool.deallocate(buffer); + } + }; + 
thread.start(); + return latch; + } + + private CountDownLatch asyncAllocate(final BufferPool pool, final int size) { + final CountDownLatch completed = new CountDownLatch(1); + Thread thread = new Thread() { + public void run() { + try { + pool.allocate(size); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + completed.countDown(); + } + } + }; + thread.start(); + return completed; + } + + /** + * This test creates lots of threads that hammer on the pool + */ + @Test + public void testStressfulSituation() throws Exception { + int numThreads = 10; + final int iterations = 50000; + final int poolableSize = 1024; + final long totalMemory = numThreads / 2 * poolableSize; + final BufferPool pool = new BufferPool(totalMemory, poolableSize, true, metrics, time, metricGroup, metricTags); + List<StressTestThread> threads = new ArrayList<StressTestThread>(); + for (int i = 0; i < numThreads; i++) + threads.add(new StressTestThread(pool, iterations)); + for (StressTestThread thread : threads) + thread.start(); + for (StressTestThread thread : threads) + thread.join(); + for (StressTestThread thread : threads) + assertTrue("Thread should have completed all iterations successfully.", thread.success.get()); + assertEquals(totalMemory, pool.availableMemory()); + } + + public static class StressTestThread extends Thread { + private final int iterations; + private final BufferPool pool; + public final AtomicBoolean success = new AtomicBoolean(false); + + public StressTestThread(BufferPool pool, int iterations) { + this.iterations = iterations; + this.pool = pool; + } + + public void run() { + try { + for (int i = 0; i < iterations; i++) { + int size; + if (TestUtils.RANDOM.nextBoolean()) + // allocate poolable size + size = pool.poolableSize(); + else + // allocate a random size + size = TestUtils.RANDOM.nextInt((int) pool.totalMemory()); + ByteBuffer buffer = pool.allocate(size); + pool.deallocate(buffer); + } + success.set(true); + } catch (Exception e) 
{ + e.printStackTrace(); + } + } + } + +}