[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-06-09 Thread Brandon Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044512#comment-16044512
 ] 

Brandon Bradley commented on KAFKA-1955:


[~cmccabe] I was trying to stick to this scope:

> So the scope of this feature would just be to provide a bigger buffer for 
> short outages or latency spikes in the Kafka cluster during which you would 
> hope you don't also experience failures in your producer processes.

So many of the issues you've raised can be safely ignored under that scope. If 
recovery is required as part of the scope, the malloc approach will not work.

> Explore disk-based buffering in new Kafka Producer
> --
>
> Key: KAFKA-1955
> URL: https://issues.apache.org/jira/browse/KAFKA-1955
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.2.0
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1955.patch, 
> KAFKA-1955-RABASED-TO-8th-AUG-2015.patch
>
>
> There are two approaches to using Kafka for capturing event data that has no 
> other "source of truth store":
> 1. Just write to Kafka and try hard to keep the Kafka cluster up as you would 
> a database.
> 2. Write to some kind of local disk store and copy from that to Kafka.
> The cons of the second approach are the following:
> 1. You end up depending on disks on all the producer machines. If you have 
> 10,000 producers, that is 10k places state is kept. These tend to fail a lot.
> 2. You can get data arbitrarily delayed
> 3. You still don't tolerate hard outages since there is no replication in the 
> producer tier
> 4. This tends to make problems with duplicates more common in certain failure 
> scenarios.
> There is one big pro, though: you don't have to keep Kafka running all the 
> time.
> So far we have done nothing in Kafka to help support approach (2), but people 
> have built a lot of buffering things. It's not clear that this is necessarily 
> bad.
> However implementing this in the new Kafka producer might actually be quite 
> easy. Here is an idea for how to do it. Implementation of this idea is 
> probably pretty easy but it would require some pretty thorough testing to see 
> if it was a success.
> The new producer maintains a pool of ByteBuffer instances which it attempts 
> to recycle and uses to buffer and send messages. When unsent data is queuing 
> waiting to be sent to the cluster it is hanging out in this pool.
> One approach to implementing a disk-backed buffer would be to slightly 
> generalize this so that the buffer pool has the option to use a mmap'd file 
> backend for its ByteBuffers. When the BufferPool was created with a 
> totalMemory setting of 1GB it would preallocate a 1GB sparse file and memory 
> map it, then chop the file into batchSize MappedByteBuffer pieces and 
> populate its buffer with those.
> Everything else would work normally except now all the buffered data would be 
> disk backed and in cases where there was significant backlog these would 
> start to fill up and page out.
> We currently allow messages larger than batchSize and to handle these we do a 
> one-off allocation of the necessary size. We would have to disallow this when 
> running in mmap mode. However since the disk buffer will be really big this 
> should not be a significant limitation as the batch size can be pretty big.
> We would want to ensure that the pooling always gives out the most recently 
> used ByteBuffer (I think it does). This way under normal operation where 
> requests are processed quickly a given buffer would be reused many times 
> before any physical disk write activity occurred.
> Note that although this lets the producer buffer very large amounts of data, 
> the buffer isn't really fault-tolerant, since the ordering in the file isn't 
> known, so there is no easy way to recover the producer's buffer after a failure. 
> So the scope of this feature would just be to provide a bigger buffer for 
> short outages or latency spikes in the Kafka cluster during which you would 
> hope you don't also experience failures in your producer processes.
> To complete the feature we would need to:
> a. Get some unit tests that would cover disk-backed usage
> b. Do some manual performance testing of this usage and understand the impact 
> on throughput.
> c. Do some manual testing of failure cases (i.e. if the broker goes down for 
> 30 seconds we should be able to keep taking writes) and observe how well the 
> producer handles the catch up time when it has a large backlog to get rid of.
> d. Add a new configuration for the producer to enable this, something like 
> use.file.buffers=true/false.
> e. Add documentation that covers these new options.
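
A minimal sketch of the mmap'd pool described above, under stated assumptions: the
class and method names (MmapBufferPoolSketch, allocate/deallocate) are illustrative
and this is not Kafka's actual BufferPool API. It only shows the core idea of
preallocating a sparse file of totalMemory bytes, mapping it, slicing it into
batchSize MappedByteBuffer pieces, and handing them out most-recently-used first.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Sketch only: file-backed slices of exactly batchSize, no oversize allocations.
    public class MmapBufferPoolSketch {
        private final Deque<ByteBuffer> free = new ArrayDeque<>();

        public MmapBufferPoolSketch(String path, long totalMemory, int batchSize) throws IOException {
            try (RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
                raf.setLength(totalMemory);                       // sparse preallocation
                FileChannel ch = raf.getChannel();
                for (long pos = 0; pos + batchSize <= totalMemory; pos += batchSize)
                    free.addLast(ch.map(FileChannel.MapMode.READ_WRITE, pos, batchSize));
            }                                                     // mappings stay valid after close
        }

        // LIFO hand-out keeps hot buffers in the page cache under normal operation.
        public synchronized ByteBuffer allocate() {
            ByteBuffer b = free.pollLast();
            if (b == null) throw new IllegalStateException("buffer pool exhausted");
            b.clear();
            return b;
        }

        public synchronized void deallocate(ByteBuffer b) {
            free.addLast(b);
        }
    }

Because the deque is LIFO, a lightly loaded producer keeps reusing the same few
slices and the file pages rarely get written out; only under backlog do older
slices come into play and page to disk.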





[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-06-08 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16043494#comment-16043494
 ] 

Jay Kreps commented on KAFKA-1955:
--

I think the patch I submitted was kind of a cool hack, but after thinking about 
it I don't think it is really what you want.

Here are the considerations I thought we should probably think through:
1. How will recovery work? The patch I gave didn't have a mechanism to recover 
from a failure. In the case of an OS crash the OS gives very weak guarantees on 
what is on disk for any data that hasn't been fsync'd. Not only can arbitrary 
bits of data be missing, but with some FS configurations it is even possible to 
get arbitrary corrupt blocks that haven't been zero'd yet. I think to get this 
right you need a commit log and a recovery procedure that verifies unsync'd data 
on startup. I'm not 100% sure you can do this with just the buffer pool, though 
maybe you can.
2. What are the ordering guarantees for buffered data?
3. How does this interact with transactions/EOS?
4. Should all writes go through the commit log, or should only failures be 
journaled? If you journal all writes prior to sending to the server, that 
amounts to significant overhead and opens the possibility that logging or other 
I/O slows you down. If you journal only failures, your throughput may be very 
high in the non-failure scenario, but when Kafka goes down you suddenly start 
doing I/O that is much slower and your throughput drops precipitously. Either 
may be okay, but it is worth thinking through what the right behavior is.
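
To make consideration 1 concrete, here is a rough illustration (my own sketch,
not code from the patch) of length-plus-CRC framing that a producer-side commit
log could use, so that a recovery procedure can verify unsynced entries on
startup and truncate at the first torn or corrupt record. The class and method
names are assumptions for the example.

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public final class JournalFraming {
        // entry layout: [int length][long crc32 of payload][payload bytes]
        public static ByteBuffer frame(byte[] payload) {
            CRC32 crc = new CRC32();
            crc.update(payload);
            ByteBuffer buf = ByteBuffer.allocate(4 + 8 + payload.length);
            buf.putInt(payload.length).putLong(crc.getValue()).put(payload).flip();
            return buf;
        }

        // Returns the payload if the next entry is complete and its CRC matches,
        // else null, which recovery treats as "truncate the journal here".
        public static byte[] readValid(ByteBuffer journal) {
            if (journal.remaining() < 12) return null;             // torn header
            int len = journal.getInt();
            long storedCrc = journal.getLong();
            if (len < 0 || journal.remaining() < len) return null; // torn payload
            byte[] payload = new byte[len];
            journal.get(payload);
            CRC32 crc = new CRC32();
            crc.update(payload);
            return crc.getValue() == storedCrc ? payload : null;   // corrupt block
        }
    }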


[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-05-23 Thread Brandon Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16021683#comment-16021683
 ] 

Brandon Bradley commented on KAFKA-1955:


I've implemented a barebones malloc/free on top of MappedByteBuffer that passes 
the simple test. It has diverged quite a bit from the current PR. I'm not sure 
if I should start a new PR or force push on top of the current one. My 
intuition is to start a new one so my previous progress stays visible.

In addition, I'm still looking for CI access if anyone can oblige. I've tried 
all the channels I know (dev mailing list, the PR, IRC).






[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-05-22 Thread Brandon Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019726#comment-16019726
 ] 

Brandon Bradley commented on KAFKA-1955:


OK, I was wrong. There are two places where the current implementation reuses 
buffers.






[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-05-22 Thread Brandon Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16019697#comment-16019697
 ] 

Brandon Bradley commented on KAFKA-1955:


The current `BufferPool` implementation does not actually reuse any buffers 
(directly). It bounds the buffer space for the pool and tracks how much space 
has been allocated from the heap for the pool. It may even be possible _not_ to 
use a free list in this implementation, but that is not the issue here.

I believe a malloc/free implementation over `MappedByteBuffer` will be the best 
choice. This would let the producer buffers treat a file as if it were a region 
of memory, at the cost of maintaining a more complex free list.
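
For illustration, a bare-bones first-fit malloc/free over a single
MappedByteBuffer could look like the sketch below. This is not the code in the
PR; the names and the offset-based free() are simplifying assumptions, and there
is no coalescing, alignment, or blocking when space runs out.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.Map;
    import java.util.TreeMap;

    public class MappedMallocSketch {
        private final MappedByteBuffer backing;
        private final TreeMap<Integer, Integer> freeList = new TreeMap<>();  // offset -> free length
        private final Map<Integer, Integer> allocated = new TreeMap<>();     // offset -> handed-out length

        public MappedMallocSketch(String path, int capacity) throws IOException {
            try (RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
                raf.setLength(capacity);
                backing = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, capacity);
            }
            freeList.put(0, capacity);                                       // one big free block to start
        }

        // First-fit allocation: returns a file-backed slice of exactly `size` bytes.
        public synchronized ByteBuffer malloc(int size) {
            Integer offset = null;
            int length = 0;
            for (Map.Entry<Integer, Integer> e : freeList.entrySet()) {
                if (e.getValue() >= size) { offset = e.getKey(); length = e.getValue(); break; }
            }
            if (offset == null) throw new IllegalStateException("out of file-backed buffer space");
            freeList.remove(offset);
            if (length > size) freeList.put(offset + size, length - size);   // keep the remainder free
            allocated.put(offset, size);
            ByteBuffer dup = backing.duplicate();
            dup.position(offset);
            dup.limit(offset + size);
            return dup.slice();
        }

        // Caller passes back the offset it allocated; a real version would hide this in a handle.
        public synchronized void free(int offset) {
            Integer size = allocated.remove(offset);
            if (size != null) freeList.put(offset, size);                    // no coalescing in this sketch
        }
    }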


[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015046#comment-16015046
 ] 

ASF GitHub Bot commented on KAFKA-1955:
---

GitHub user blbradley opened a pull request:

https://github.com/apache/kafka/pull/3083

KAFKA-1955: [WIP] Disk based buffer in Producer

Based on patch from @jkreps in [this JIRA 
ticket](https://issues.apache.org/jira/browse/KAFKA-1955).

- [ ] Get some unit tests that would cover disk-backed usage
- [ ] Do some manual performance testing of this usage and understand the 
impact on throughput.
- [ ] Do some manual testing of failure cases (i.e. if the broker goes down 
for 30 seconds we should be able to keep taking writes) and observe how well 
the producer handles the catch up time when it has a large backlog to get rid 
of.
- [ ] Add a new configuration for the producer to enable this, something 
like use.file.buffers=true/false.
- [ ] Add documentation that covers these new options.

I've brought the patch into sync with trunk. Testing is next, and I've started 
on it. I am flexible on how this can be implemented.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/blbradley/kafka kafka-disk-buffer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3083


commit 6b29fc95c394283ff4f2410ad37f7c8fcbd0d8d7
Author: Brandon Bradley 
Date:   2017-05-17T17:12:53Z

WIP: KAFKA-1955 August 8th 2015 rebase

commit 75d2af1d7f8dda4e2fe41da60455d813d655edd0
Author: Brandon Bradley 
Date:   2017-05-17T22:43:47Z

Merge branch 'trunk' into kafka-disk-buffer

patch works against trunk test suite

commit d3c765db789eef2fe71eca7a45dbca72e356f346
Author: Brandon Bradley 
Date:   2017-05-17T23:14:34Z

fix imports, add whitespace from diff

commit b58118c6413a5e900f5c1ebee112bd24e8d4b119
Author: Brandon Bradley 
Date:   2017-05-17T23:34:04Z

simple file buffer test

commit cd389f073eca18effa6449d9934aea0f90e84139
Author: Brandon Bradley 
Date:   2017-05-17T23:35:21Z

failing unallocated memory check

commit 49b6860e6c3be4bac62937dc835d5b6f97c7ff11
Author: Brandon Bradley 
Date:   2017-05-18T00:35:29Z

allocate buffer dynamically, passing tests

commit ed7aab5357fe9d7805dcb305d0318fb4ea770550
Author: Brandon Bradley 
Date:   2017-05-18T00:46:10Z

failing allocated memory check

commit 875ac83096199e35307a7ef47772907607aba1f1
Author: Brandon Bradley 
Date:   2017-05-18T00:56:47Z

do not add to free list during allocation

commit 4223e14896f4609d5bef80e97ee6d9982d2127a5
Author: Brandon Bradley 
Date:   2017-05-18T01:20:46Z

add license
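
The checklist item above proposes a producer switch along the lines of
use.file.buffers=true/false. Purely as an illustration of how that might look to
users: the property name and behavior are the proposal from this thread, not an
existing Kafka configuration, so the producer below would merely log an
"unknown config" warning today.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FileBufferProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("buffer.memory", 1073741824L);   // 1 GB of (file-backed) buffer space
            props.put("use.file.buffers", "true");     // proposed switch, not in released Kafka

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("events", "key", "value"));
            }
        }
    }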





[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-05-17 Thread Brandon Bradley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16014550#comment-16014550
 ] 

Brandon Bradley commented on KAFKA-1955:


The rebased patch applies cleanly to 68ad80f8. I'm trying to get it updated to 
trunk and submit a proper pull request.






[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2015-09-21 Thread Rahul Amaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14900525#comment-14900525
 ] 

Rahul Amaram commented on KAFKA-1955:
-

I do not completely understand the implementation details, but we are planning 
to use this for a very large buffer, around 80 GB, holding millions of events. 
What would happen in such a scenario? What are the possible downsides of using 
such a huge disk buffer?






[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2015-05-10 Thread Yoel Amram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14537214#comment-14537214
 ] 

Yoel Amram commented on KAFKA-1955:
---

It would really help if there were a way to instrument the producer and get 
info about the number of events buffered to disk, or any other information that 
would indicate a transmission lag or a backlog starting to fill up.
This could then be used by clients both for statistics purposes and even for 
blocking inbound traffic until the buffer is consumed.
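
For what it's worth, clients can already approximate this with the producer's
built-in metrics (buffer-total-bytes and buffer-available-bytes in the
producer-metrics group). A rough sketch, assuming a reasonably recent client
that has Metric#metricValue(); the helper name and threshold are my own.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class BufferBacklogCheck {
        // Returns true when less than minFreeFraction of the producer's buffer
        // memory is still free, i.e. a backlog is building up.
        public static boolean backlogBuilding(KafkaProducer<?, ?> producer, double minFreeFraction) {
            double available = 0, total = 0;
            for (Metric m : producer.metrics().values()) {
                MetricName name = m.metricName();
                if (!"producer-metrics".equals(name.group())) continue;
                Object v = m.metricValue();
                if (!(v instanceof Number)) continue;
                if ("buffer-available-bytes".equals(name.name())) available = ((Number) v).doubleValue();
                if ("buffer-total-bytes".equals(name.name()))     total = ((Number) v).doubleValue();
            }
            return total > 0 && available / total < minFreeFraction;
        }
    }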
 






[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2015-02-14 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14321719#comment-14321719
 ] 

Jay Kreps commented on KAFKA-1955:
--

Created reviewboard https://reviews.apache.org/r/31052/diff/ against branch trunk



