Based on a recent suggestion by Joel, I am experimenting with using flush() to
simulate batched-sync behavior.
The essence of my single-threaded producer code is:
for (int i = 0; i < numRecords;) {
    // 1 - Send a batch
    for (int batchCounter = 0; batchCounter < batchSz; ++batchCounter) {
        Future<RecordMetadata> f = producer.send(record, null);
        futureList.add(f);
        i++;
    }
    // 2 - Flush after sending the batch
    producer.flush();
    // 3 - Ensure all messages were sent
    for (Future<RecordMetadata> f : futureList) {
        f.get();
    }
    // Clear the list so the next iteration does not re-wait on old futures
    futureList.clear();
}
There are actually two batch sizes in play here. One is the number of messages
between successive flush() calls made by the client. The other is the batch.size
setting, which controls the batching done internally by the underlying async API.
Intuitively, we want to either
A) set both batch sizes to be equal, OR
B) set the underlying batch.size to a sufficiently large value so as to
effectively disable internal batch management.
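For reference, here is a minimal sketch of how the two configurations could be built. Note that batch.size is specified in bytes, not records, so option (a) assumes a known (here hypothetical, ~1 KB) record size to translate the per-flush record count into bytes; the 64 MB value for option (b) is likewise just an illustrative "large enough" choice:

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds producer Properties for the two experiments:
    //   largeInternalBatch == false -> option (a): batch.size matches the
    //       per-flush batch (recordsPerFlush * assumed bytes per record)
    //   largeInternalBatch == true  -> option (b): batch.size set very large
    //       so that only the explicit client flush() drives batching
    static Properties buildProps(boolean largeInternalBatch, int recordsPerFlush) {
        final int assumedRecordBytes = 1024;          // assumption, not from the post
        final int veryLargeBatchBytes = 64 * 1024 * 1024; // assumption: "large enough"

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("acks", "1");                           // matches the test setup
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        int batchBytes = largeInternalBatch
                ? veryLargeBatchBytes
                : recordsPerFlush * assumedRecordBytes;
        props.put("batch.size", Integer.toString(batchBytes));
        return props;
    }
}
```

With these Properties a KafkaProducer would be constructed as usual; only batch.size differs between the two runs.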
The numbers below are in MB/s. The 'Batch' column indicates the number of events
between each explicit client flush().
The setup is a 1-node broker with acks=1.
1 partition
                        Batch=4k   Batch=8k   Batch=16k
Equal batch sizes (a)      16         32         52
Large batch.size  (b)     140        123        124

4 partitions
                        Batch=4k   Batch=8k   Batch=16k
Equal batch sizes (a)      35         61         82
Large batch.size  (b)       7          7          7

8 partitions
                        Batch=4k   Batch=8k   Batch=16k
Equal batch sizes (a)      49         70         99
Large batch.size  (b)       7          8          7
There are two issues noticeable in these numbers:
1 - Case A is much faster than case B for 4 and 8 partitions.
2 - Single-partition mode outperforms all others, and there case B is faster
than case A.
Side note: I used the client APIs from trunk while the broker is running
0.8.2 (I don't think it matters, but I wanted to point it out).