[ https://issues.apache.org/jira/browse/KAFKA-156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13631474#comment-13631474 ]
Scott Carey commented on KAFKA-156:
-----------------------------------

Jay -- I agree, the duplication issue does not depend on whether there is a disk or memory queue. However, in both cases one can choose either to duplicate messages or to drop them on failure. In the in-memory case, biasing toward dropping a message rather than duplicating it on failure is more acceptable than in the on-disk case, because an in-memory queue is more likely to suffer loss than a disk queue. For example, a producer may crash or be kill -9'd, and we would expect in-flight, in-memory data to be lost.

My thoughts on this issue are biased by our legacy system -- each producer-equivalent would log locally, and the equivalent of the broker would then 'harvest' these logs with no possible duplication. Loss is possible if the disks failed on the client, but that would take down the whole app anyway. Furthermore, we have used SSDs on those servers since late 2008 and have not had a single SSD failure where data was lost (a couple had their performance degrade to abysmal levels, but the data was still there). Additionally, the local spooling lets us restart and service the nodes that collect the data without data loss. Replication in Kafka will allow us to do rolling restarts of brokers and achieve similar operational utility.

The need for 'spill to disk' is certainly less with replication active. However, it doesn't take long to fill our entire memory buffer with messages on some of our clients -- even a 10-second window of unavailability means losing messages unless we can spill to disk.

On your proposal:
* What happens if there is a 'bubble' in sequence ids from the broker's perspective? What does the broker do? How does the client know to re-send?
* What happens when two clients assign themselves the same id?
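To make the 'spill to disk' idea concrete, here is a minimal Python sketch of a producer-side buffer that overflows to a local spool file instead of dropping messages while brokers are unavailable. The class and field names are illustrative only, not Kafka producer code:

```python
import collections
import os
import tempfile

class SpoolingBuffer:
    """Holds messages in memory up to a limit, then spills the overflow
    to a local spool file instead of dropping it while brokers are down."""

    def __init__(self, mem_limit: int, spool_path: str):
        self.mem_limit = mem_limit
        self.spool_path = spool_path
        self.queue = collections.deque()
        self.spooled = 0  # count of messages written to disk

    def append(self, msg: bytes) -> None:
        if len(self.queue) < self.mem_limit:
            self.queue.append(msg)
        else:
            # Memory is full: length-prefix the message and append to the spool.
            with open(self.spool_path, "ab") as f:
                f.write(len(msg).to_bytes(4, "big") + msg)
            self.spooled += 1

    def drain(self):
        """Yield buffered messages in arrival order once brokers return:
        the in-memory messages first, then the spooled overflow."""
        while self.queue:
            yield self.queue.popleft()
        if os.path.exists(self.spool_path):
            with open(self.spool_path, "rb") as f:
                while header := f.read(4):
                    yield f.read(int.from_bytes(header, "big"))
            os.remove(self.spool_path)

# Demo: a tiny memory limit forces three of five messages onto disk.
buf = SpoolingBuffer(mem_limit=2,
                     spool_path=os.path.join(tempfile.mkdtemp(), "spool.bin"))
for i in range(5):
    buf.append(f"m{i}".encode())
recovered = list(buf.drain())
```

A real spooler would also need fsync policy, spool-size limits, and crash recovery of a partially written file; the point here is only that a bounded memory window plus a disk overflow survives outages longer than 10 seconds without loss.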
Answer to the question on my proposal:
* It is assumed that the final batch commit is idempotent, so if the client fails to get the final ACK (step 4, "Acknowledge Batch Commit"), it will go back to step 3 and send the batch commit message again. If it reaches the same broker, the broker can simply acknowledge, since it has already committed the batch. If it reaches a replica, there are two cases:
a) The other broker has the UUID info (which is replicated?) and can restart the process at the right point.
b) Failover to another broker starts the process over at step 1 with the same UUID, and when the crashed broker comes back online, the brokers in the replica set reconcile to remove the duplicate. There are a limited number of in-flight or recently in-flight batches.

I think b) will work, but I don't know enough about how a broker replica set reconciles in 0.8 when one fails. If we assume strict ordering on whether the replica or the client gets the ACK for a batch commit first, a repair process should be consistent.

A two-phase produce doesn't have to be serial from batch to batch -- a few pipelined requests could be supported, though too many could be used for DOS. A high-water-mark approach is more difficult to pipeline, but probably does not need it.

One idea I had is far more radical. It boils down to these questions: Why even have a separate producer protocol at all? Why isn't the consumer protocol good enough for getting data to the brokers? I admit this is tricky and I have not thought it through well, but I think it is worth sharing. The consumer protocol is highly reliable and makes it easy to enforce once-only semantics. If there were some sort of client-initiated broker 'pull' using the consumer protocol, there might be opportunities for overall simplification of the protocol and more code sharing. A producer would be required to assign an offset id and increment it per message.
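The same-broker retry case above (steps 3-4) can be sketched as a toy model: the broker remembers which (producer UUID, batch id) pairs it has committed, so a re-sent batch commit is acknowledged without being applied twice. Class and function names are hypothetical, not Kafka code:

```python
import uuid

class Broker:
    """Toy broker: batch commit is idempotent because the broker remembers
    which (producer UUID, batch id) pairs it has already committed."""

    def __init__(self):
        self.log = []            # the committed message log
        self.committed = set()   # in-flight or recently in-flight batches

    def commit_batch(self, producer_id, batch_id, messages):
        key = (producer_id, batch_id)
        if key not in self.committed:
            self.log.extend(messages)   # first commit: append to the log
            self.committed.add(key)
        return "ACK"                    # a re-sent commit is simply re-acked

def produce(broker, producer_id, batch_id, messages, ack_lost_once=False):
    """Steps 3-4 of the proposal: send the batch commit; if the ACK is lost,
    go back to step 3 and send the same batch commit again."""
    broker.commit_batch(producer_id, batch_id, messages)
    if ack_lost_once:
        # The client never saw the ACK, so it cannot tell whether the commit
        # happened; re-sending is safe because the commit is idempotent.
        broker.commit_batch(producer_id, batch_id, messages)

broker = Broker()
pid = uuid.uuid4()
produce(broker, pid, batch_id=1, messages=["a", "b"], ack_lost_once=True)
# Despite the duplicate send, "a" and "b" appear in the log exactly once.
```

The replica-failover cases a) and b) are harder precisely because this `committed` set would have to be replicated (case a) or reconciled after the fact (case b); the sketch only covers the single-broker path.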
The producer would trigger the broker to initiate a request to read all of the batches from that starting ID to the "end", commit them, then read from the last offset to the new "end", and repeat. This makes a producer act like a broker -- except that it wants to drop data much faster, and therefore needs to know how far along the broker is in pulling data down. Perhaps it can safely assume that if batch "1 to 50" was requested, and subsequently batch "51 to 100" is requested, the request for the latter batch indicates that the first has been successfully committed -- but that serializes batches and prevents pipelining. Alternatively, the "1 to 50 is committed" message can ride along with the "get 51 to 100" request.

What I find useful here is the same thing that is great about the consumer protocol: putting the burden of tracking progress on the party obtaining the data is cleaner in the face of failure. This bears similarity to your proposal, but with inversion of control -- the broker asks for the next batch when it is ready. If there is a broker failure, the replica can pull in the messages, and duplicate removal can occur when the brokers reconcile (the offset id in the topic will be consistent, since both sides use offsets). Producers are then responsible for buffering up to the threshold they can tolerate, and can spool to disk if they please (perhaps re-using some broker code to do so).

> Messages should not be dropped when brokers are unavailable
> -----------------------------------------------------------
>
>                 Key: KAFKA-156
>                 URL: https://issues.apache.org/jira/browse/KAFKA-156
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Sharad Agarwal
>             Fix For: 0.8
>
>
> When none of the brokers is available, the producer should spool the messages to
> disk and keep retrying until brokers come back.
> This will also enable broker upgrades/maintenance without message loss.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira