I second Eno's concern regarding the impact of Streams EOS on state stores.

> We do a full recovery today and the EOS proposal will not make this any worse.

Yes, today we do a full state store recovery under certain failures. However, I think the point (or perhaps: open question) is that, with the EOS design, there is now an *increased likelihood* of such failures triggering a full state store recovery. If this increase is significant, then I would consider it a regression that we should address. As Eno said:

> currently we pay the recovery price for a Kafka Streams instance failure.
> Now we might pay it for a transaction failure. Will transaction failures be
> more or less common than the previous types of failures?

Damian voiced similar concerns at the very beginning of this discussion; I'm not sure what his current opinion is here.

-Michael

On Wed, Mar 22, 2017 at 1:04 AM, Sriram Subramanian <r...@confluent.io> wrote:

> To add to this discussion, I do think we should think about this in increments. We do a full recovery today and the EOS proposal will not make this any worse. Using store snapshots is a good option for avoiding store recovery in the future, but as Eno points out, all pluggable stores would need to have this ability. W.r.t. transaction failures, this should not be an issue; we should simply be retrying.
>
> There is one optimization we can do for clean shutdowns. We could store a clean-shutdown file that contains the input offsets. This file gets written when you close the streams instance. On start, you can check the offsets from the shutdown file, compare them with the offsets we get from the consumer, and ensure they match. If they do, you can use the same store instead of recovering. However, if we go with the snapshot approach, this will not be required.
>
> My vote would be to implement V1 and solve the bootstrap problem, which exists today, in future versions.
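
To make Sriram's clean-shutdown optimization concrete, here is a minimal Java sketch of the idea. This is hypothetical: the file name, format, and helper class are made up for illustration and are not part of the KIP.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    /**
     * Hypothetical sketch of the clean-shutdown optimization: persist the input
     * offsets on a clean close; on restart, reuse the local store only if they
     * match the committed offsets the consumer reports.
     */
    public class CleanShutdownCheckpoint {

        private final Path file;

        public CleanShutdownCheckpoint(Path stateDir) {
            this.file = stateDir.resolve("clean-shutdown-offsets");
        }

        /** Written when the streams instance closes cleanly. */
        public void write(Map<TopicPartition, Long> inputOffsets) throws IOException {
            List<String> lines = new ArrayList<>();
            for (Map.Entry<TopicPartition, Long> e : inputOffsets.entrySet()) {
                lines.add(e.getKey().topic() + " " + e.getKey().partition() + " " + e.getValue());
            }
            Files.write(file, lines);
        }

        /** On start: true if the store can be reused, false if it must be recovered. */
        public boolean matches(Map<TopicPartition, Long> committedOffsets) throws IOException {
            if (!Files.exists(file)) {
                return false; // unclean shutdown -> full recovery from the changelog
            }
            Map<TopicPartition, Long> stored = new HashMap<>();
            for (String line : Files.readAllLines(file)) {
                String[] parts = line.split(" ");
                stored.put(new TopicPartition(parts[0], Integer.parseInt(parts[1])),
                           Long.parseLong(parts[2]));
            }
            return stored.equals(committedOffsets);
        }
    }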
> On Tue, Mar 21, 2017 at 4:43 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> > Thanks for your feedback, Eno.
> >
> > For now, I still think that the KIP itself does not need to talk about this in more detail, because we apply the same strategy for EoS as for non-EoS (as of 0.10.2).
> >
> > Thus, in case of a clean shutdown, we write the checkpoint file for a store and thus know we can reuse the store. In case of failure, we need to recreate the store from the changelog.
> >
> > > Will a V1 design that relies on plain store recovery from Kafka for
> > > each transaction abort be good enough, or usable?
> >
> > Why should it not be usable? It's the same strategy as used in 0.10.2, and it runs in production in many companies already.
> >
> > > however it seems to me we might have a regression of sorts
> > > Now we might pay it for a transaction failure.
> >
> > I would assume transaction failures to be quite rare. Maybe the core EoS folks can comment here, too.
> >
> > -Matthias
> >
> > On 3/20/17 3:16 PM, Eno Thereska wrote:
> > > Hi Matthias,
> > >
> > > I'd like to see some more info on how you propose to handle transactions that involve state stores in the KIP itself. The design doc has info about various optimisations like RocksDB snapshots and transactions and such, but will there be a user-visible interface that indicates whether a store has snapshot and/or transactional capabilities? If a user plugs in another store, what guarantees are they expected to get?
> > >
> > > Will a V1 design that relies on plain store recovery from Kafka for each transaction abort be good enough, or usable? If your dataset is large (e.g., 200GB), the recovery time might be so large as to effectively render that Kafka Streams instance unavailable for tens of minutes. You mention that this is not a regression compared to what we currently have; however, it seems to me we might have a regression of sorts: currently we pay the recovery price for a Kafka Streams instance failure. Now we might pay it for a transaction failure. Will transaction failures be more or less common than the previous types of failures? I'd like to see this addressed.
> > >
> > > Thanks
> > > Eno
> > >
> > >> On 15 Mar 2017, at 22:09, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>
> > >> Just a quick follow-up:
> > >>
> > >> Our overall proposal is to implement KIP-129 as is, as a "Streams EoS 1.0" version. The raised concerns are all valid, but hard to quantify at the moment. Implementing KIP-129, which provides a clean design, allows us to gain more insight into the performance implications. This enables us to make an educated decision on whether the "producer per task" model performs well or not, and whether a switch to a "producer per thread" model is mandatory.
> > >>
> > >> We also want to point out that we can move incrementally from the "producer per task" to the "producer per thread" design, or apply some incremental improvements to "producer per task" (as discussed in the doc). Thus, there is no issue with regard to upgrading.
> > >>
> > >> -Matthias
> > >>
> > >> On 3/15/17 2:36 PM, Matthias J. Sax wrote:
> > >>> Hi,
> > >>>
> > >>> I want to pick up this thread again. As there are some concerns about the "producer per task" design, we wrote up an alternative "producer per thread" design and discussed the pros and cons of both approaches:
> > >>>
> > >>> https://docs.google.com/document/d/1CfOJaa6mdg5o7pLf_zXISV4oE0ZeMZwT_sG1QWgL4EE
> > >>>
> > >>> Looking forward to your feedback.
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 3/10/17 3:24 AM, Damian Guy wrote:
> > >>>> Hi Matthias,
> > >>>>
> > >>>> Thanks for the response. I agree with you regarding the use of PartitionGrouper to reduce the number of tasks. It would be good to have an idea of any additional load on the brokers as we increase the number of tasks and therefore producers.
> > >>>>
> > >>>> Thanks,
> > >>>> Damian
> > >>>>
> > >>>> On Wed, 8 Mar 2017 at 01:45 Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>
> > >>>>> Damian, Jun,
> > >>>>>
> > >>>>> Thanks for your input.
> > >>>>>
> > >>>>> About performance tests:
> > >>>>>
> > >>>>> I can follow up with more performance tests using more partitions and also collecting broker metrics.
> > >>>>>
> > >>>>> However, I want to highlight again that even if 1000+ partitions were problematic, one can simply implement the PartitionGrouper interface and reduce the number of tasks to 250 or 100... So I am not sure we should block this KIP, even if there might be some performance penalty for currently single-partitioned tasks.
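
As a sketch of the escape hatch Matthias describes above, a custom PartitionGrouper (interface as it exists in 0.10.2) could cap the task count by bucketing partitions. The cap and the round-robin policy below are illustrative assumptions only; a real implementation must also take co-partitioning requirements into account.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.PartitionGrouper;
    import org.apache.kafka.streams.processor.TaskId;

    /**
     * Illustrative grouper that caps the number of tasks per sub-topology by
     * assigning partitions round-robin into a fixed number of task "buckets".
     * Partitions with the same number land in the same bucket across topics of
     * a group, which keeps co-partitioned topics together.
     */
    public class CappedPartitionGrouper implements PartitionGrouper {

        private static final int MAX_TASKS_PER_GROUP = 100; // example cap, not a proposed default

        @Override
        public Map<TaskId, Set<TopicPartition>> partitionGroups(
                Map<Integer, Set<String>> topicGroups, Cluster metadata) {
            Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
            for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
                int topicGroupId = entry.getKey();
                for (String topic : entry.getValue()) {
                    Integer partitions = metadata.partitionCountForTopic(topic);
                    if (partitions == null) {
                        continue; // topic metadata not available yet
                    }
                    for (int p = 0; p < partitions; p++) {
                        TaskId taskId = new TaskId(topicGroupId, p % MAX_TASKS_PER_GROUP);
                        groups.computeIfAbsent(taskId, id -> new HashSet<>())
                              .add(new TopicPartition(topic, p));
                    }
                }
            }
            return groups;
        }
    }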
> > >>>>> About memory usage:
> > >>>>>
> > >>>>> JMX max-heap and max-off-heap reported 256MB and 133MB for all experiments (thus I did not put them in the spreadsheet). So using 100 producers (each using a max of 32MB of memory) was not an issue with regard to memory consumption. I did not track "current heap/off-heap" memory, as this would require a more advanced test setup to monitor it over time. If you think this is required, we can do some tests though.
> > >>>>>
> > >>>>> However, as 256MB was enough memory, and there are other components next to the producers using memory, I don't expect severely increased memory usage. Producers allocate memory on demand, and if load is shared over multiple producers, overall memory usage should stay about the same, as each single producer should allocate less memory.
> > >>>>>
> > >>>>> About batching:
> > >>>>>
> > >>>>> As you can see from the benchmarks (in the detailed view -- I also added some graphs to the summary now), the average batch size decreases slightly with an increased number of partitions. However, there is no big difference between the "producer per thread" and "producer per task" scenarios.
> > >>>>>
> > >>>>> About acks:
> > >>>>>
> > >>>>> This is covered by KIP-98 already. If the idempotent producer is used, it's required to set max.in.flight.requests.per.connection=1 and retries > 0 -- otherwise a config exception will be thrown. For transactions, it's further required that acks=-1 to avoid a config exception.
> > >>>>>
> > >>>>> Other bits, like min.isr, replication.factor, etc. (i.e., all broker/topic configs) are out of scope, and it's the user's responsibility to set those values correctly to ensure transactionality and idempotency.
> > >>>>>
> > >>>>> -Matthias
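
Expressed as producer configuration, the constraints Matthias lists look roughly as follows. This is a sketch: the transactional.id value is an arbitrary example (Streams would derive its own per task), and the exact constraints are as quoted from KIP-98 above.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class EosProducerConfig {
        // Sketch of the KIP-98 constraints described above.
        public static KafkaProducer<byte[], byte[]> create() {
            Properties p = new Properties();
            p.put("bootstrap.servers", "localhost:9092");
            p.put("key.serializer", ByteArraySerializer.class.getName());
            p.put("value.serializer", ByteArraySerializer.class.getName());
            p.put("enable.idempotence", "true");   // required when transactional.id is set
            p.put("transactional.id", "my-app-0_0"); // example id only
            p.put("acks", "all");                  // i.e., acks=-1, required for transactions
            p.put("retries", "2147483647");        // must be > 0 for idempotence
            p.put("max.in.flight.requests.per.connection", "1"); // as required above
            return new KafkaProducer<>(p);
        }
    }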
> > >>>>> On 3/7/17 9:32 AM, Jun Rao wrote:
> > >>>>>> Hi, Guozhang,
> > >>>>>>
> > >>>>>> Thanks for the KIP. A couple of comments.
> > >>>>>>
> > >>>>>> 1. About the impact on producer batching. My understanding is that typically different sub-topologies in the same task are publishing to different topics. Since producer batching happens at the topic/partition level, using a producer per task may not impact batching much.
> > >>>>>>
> > >>>>>> 2. When processing.guarantee is set to exactly_once, do we want to enforce acks=all in the producer? The default acks is 1 and may cause acked data to be lost later when the leader changes.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>>
> > >>>>>> Jun
> > >>>>>>
> > >>>>>> On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <damian....@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi Matthias,
> > >>>>>>>
> > >>>>>>> Thanks. The perf test is a good start, but I don't think it goes far enough. 100 partitions is not a lot. What happens when there are thousands of partitions? What is the load on the brokers? How much more memory is used by the Streams app, etc.?
> > >>>>>>>
> > >>>>>>> Thanks,
> > >>>>>>> Damian
> > >>>>>>>
> > >>>>>>> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>>
> > >>>>>>>> I want to give a first response:
> > >>>>>>>>
> > >>>>>>>> 1. Producer per task:
> > >>>>>>>>
> > >>>>>>>> First, we did some performance tests indicating that the performance penalty is small. Please have a look here:
> > >>>>>>>>
> > >>>>>>>> https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> > >>>>>>>>
> > >>>>>>>> For the test, we ran with a trunk version and a modified version that uses a producer per task (of course, no transactions, but at-least-once semantics). The scaling factor indicates the number of brokers and (single-threaded) Streams instances. We used SimpleBenchmark, which is part of the AK code base.
> > >>>>>>>>
> > >>>>>>>> Second, as the design is "producer per task" (and not "producer per partition"), it is possible to specify a custom PartitionGrouper that assigns multiple partitions to a single task. This makes it possible to reduce the number of tasks for scenarios with many partitions. Right now, this interface must be implemented solely by the user, but we could also add a new config parameter that specifies a max.number.of.tasks or partitions.per.task so that the user can configure this instead of implementing the interface.
> > >>>>>>>>
> > >>>>>>>> Third, there is the idea of a "Producer Pool" that would allow sharing resources (network connections, memory, etc.) over multiple producers. This would make it possible to separate multiple transactions at the producer level, while resources are shared. There is no detailed design document yet; there would be a KIP for this feature.
> > >>>>>>>>
> > >>>>>>>> Thus, if there should be any performance problems for high-scale scenarios, there are multiple ways to tackle them while keeping the "producer per task" design.
> > >>>>>>>>
> > >>>>>>>> Additionally, a "producer per thread" design would be way more complex, and I summarized the issues in a separate document. I will share a link to the document soon.
> > >>>>>>>>
> > >>>>>>>> 2. StateStore recovery:
> > >>>>>>>>
> > >>>>>>>> Streams EoS will, in its first design, not exploit the improvements that are being added for 0.11 at the moment. However, as 0.10.2 faces the same issue of potentially long recovery, there is no regression in this regard. Thus, I see those improvements as orthogonal, or as add-ons. Nevertheless, we should try to explore those options and, if possible, get them into 0.11 so that Streams with EoS gets the same improvements as the at-least-once scenario.
> > >>>>>>>>
> > >>>>>>>> 3. Caching:
> > >>>>>>>>
> > >>>>>>>> We might need to do some experiments to quantify the impact on caching. If it's severe, the suggested default commit interval of 100ms could also be increased. Also, EoS will not enforce any commit interval, but only change the default value. Thus, a user can freely trade off latency vs. the caching effect.
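
For concreteness, the latency-vs-caching trade-off boils down to two existing Streams knobs (config names as in 0.10.2; the values below are examples only, not proposed defaults):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CachingTradeoff {
        // A larger commit interval restores most of the record-cache benefit
        // under EoS, at the cost of higher end-to-end latency.
        public static Properties props() {
            Properties p = new Properties();
            p.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app"); // example id
            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // vs. the suggested 100ms EoS default
            p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10L * 1024 * 1024);
            return p;
        }
    }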
> > >>>>>>>> Last but not least, there is the idea to allow "read_uncommitted" for intermediate topics. This would be an advanced design for Streams EoS that allows downstream sub-topologies to read uncommitted data optimistically. In case of failure, a cascading abort of transactions would be required. This change will need another KIP.
> > >>>>>>>>
> > >>>>>>>> 4. Idempotent producer:
> > >>>>>>>>
> > >>>>>>>> The transactional part automatically leverages the idempotent properties of the producer. Idempotency is a requirement:
> > >>>>>>>>
> > >>>>>>>>> Note that enable.idempotence must be enabled if a TransactionalId is configured.
> > >>>>>>>>
> > >>>>>>>> See https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> > >>>>>>>>
> > >>>>>>>> All idempotent retries are handled by the producer internally (with or without transactions) if enable.idempotence is set to true.
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 3/3/17 3:34 AM, Eno Thereska wrote:
> > >>>>>>>>> Another question:
> > >>>>>>>>>
> > >>>>>>>>> The KIP doesn't exactly spell out how it uses the idempotence guarantee from KIP-98. It seems that only the transactional part is needed. Or is the idempotence guarantee working behind the scenes and helping in some scenarios for which it is not worthwhile aborting a transaction (e.g., retransmitting a record after a temporary network glitch)?
> > >>>>>>>>>
> > >>>>>>>>> Thanks
> > >>>>>>>>> Eno
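
For reference, this is roughly the per-commit transactional flow KIP-98 proposes and how idempotence sits underneath it, which speaks to Eno's network-glitch scenario. Method names are taken from the KIP-98 design; the wrapper method is a sketch, not Streams code.

    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    public class TransactionalCommit {
        // producer.initTransactions() is assumed to have been called once at
        // startup. Idempotence works behind the scenes: a retried send within
        // the transaction is deduplicated by the broker via producer id plus
        // sequence number, so a transient glitch does not force an abort.
        static void commit(KafkaProducer<byte[], byte[]> producer,
                           ProducerRecord<byte[], byte[]> output,
                           Map<TopicPartition, OffsetAndMetadata> inputOffsets,
                           String groupId) {
            producer.beginTransaction();
            try {
                producer.send(output);                                    // result records
                producer.sendOffsetsToTransaction(inputOffsets, groupId); // input offsets in the same txn
                producer.commitTransaction();
            } catch (KafkaException e) {
                producer.abortTransaction(); // read_committed consumers never see the writes
                throw e;
            }
        }
    }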
> > >>>>>>>>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>> I second the concern with the one-producer-per-task approach. At a high level it seems to make sense, but I think Damian is exactly right that it cuts against the general design of the producer. Many people have high input partition counts and will have high task counts as a result. I think processing 1000 partitions should not be an unreasonable thing to want to do.
> > >>>>>>>>>>
> > >>>>>>>>>> The tricky bits will be:
> > >>>>>>>>>>
> > >>>>>>>>>> - Reduced effectiveness of batching (or more latency and memory to get equivalent batching). This doesn't show up in simple benchmarks because much of the penalty is I/O and CPU on the broker, and the additional threads from all the producers can make a single-threaded benchmark seem faster.
> > >>>>>>>>>> - TCP connection explosion. We maintain one connection per broker. This is already high since each app instance does this. This design, though, will add an additional multiplicative factor based on the partition count of the input.
> > >>>>>>>>>> - Connection and metadata request storms. When an instance with 1000 tasks starts up, it is going to try to create many thousands of connections and issue a thousand metadata requests all at once.
> > >>>>>>>>>> - Memory usage. We currently default to 64MB per producer. This can be tuned down, but the fact that we are spreading the batching over more producers will fundamentally mean we need a lot more memory to get good perf, and the memory usage will change as your task assignment changes, so it will be hard to set correctly unless it is done automatically.
> > >>>>>>>>>> - Metrics explosion (1000 producer instances, each with their own metrics to monitor).
> > >>>>>>>>>> - Thread explosion: 1000 background threads, one per producer, each sending data.
> > >>>>>>>>>>
> > >>>>>>>>>> -Jay
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <damian....@gmail.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Guozhang,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the KIP! This is an important feature for Kafka Streams and will help to unlock a bunch of use cases.
> > >>>>>>>>>>>
> > >>>>>>>>>>> I have some concerns/questions:
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1. Producer per task: I'm worried about the overhead this is going to put on both the streams app and the Kafka brokers. You can easily imagine an app consuming thousands of partitions. What load will this put on the brokers? Am I correct in assuming that there will be metadata requests per producer? The memory overhead in the streams app will also increase fairly significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
> > >>>>>>>>>>> 2. State store recovery: As we already know, restoring the entire changelog can take an extremely long time. Even with a fairly small dataset and an inappropriately tuned segment size, this can take way too long. My concern is that failures happen, recovery then takes "forever", and we end up in a situation where we need to set max.poll.interval to some very large number or else we end up in "rebalance hell". I don't think this provides a very good user experience. You mention RocksDB checkpointing in the doc -- should we explore this idea some more, i.e., understand the penalty of checkpointing? Maybe checkpoint every *n* commits? (A sketch of this idea follows this message.)
> > >>>>>>>>>>> 3. What does EoS mean for caching? If we set the commit interval to 100ms, then the cache is not going to be very effective. Should it just be disabled?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>> Damian
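
Regarding Damian's point 2, the "checkpoint every *n* commits" idea could lean on RocksDB's checkpoint API (RocksJava), roughly as sketched below. This is a hypothetical illustration: the class, the directory layout, and the commit counter are made up, and the cost of each checkpoint is exactly what would need measuring.

    import org.rocksdb.Checkpoint;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    /**
     * Hypothetical "checkpoint every n commits": RocksDB can create a
     * consistent on-disk snapshot (using hard links where possible), which a
     * restarting task could reuse so that only the changelog tail written
     * since the last checkpoint needs replaying.
     */
    public class PeriodicCheckpointer {
        private final RocksDB db;
        private final String checkpointDir; // e.g., <state-dir>/<task-id>/checkpoints
        private final int commitsPerCheckpoint;
        private int commitsSinceCheckpoint = 0;

        public PeriodicCheckpointer(RocksDB db, String checkpointDir, int n) {
            this.db = db;
            this.checkpointDir = checkpointDir;
            this.commitsPerCheckpoint = n;
        }

        /** Call once per successful commit. */
        public void onCommit() throws RocksDBException {
            if (++commitsSinceCheckpoint >= commitsPerCheckpoint) {
                try (Checkpoint cp = Checkpoint.create(db)) {
                    // Target directory must not exist yet, hence the suffix.
                    cp.createCheckpoint(checkpointDir + "-" + System.currentTimeMillis());
                }
                commitsSinceCheckpoint = 0;
            }
        }
    }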
> > >>>>>>>>>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and provide exactly-once processing semantics:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> This KIP enables Streams users to optionally turn on exactly-once processing semantics without changing their app code at all, by leveraging the transactional messaging features provided in KIP-98.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The above wiki page provides a high-level view of the proposed changes, while the detailed implementation design can be found in this Google doc:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> We would love to hear your comments and suggestions.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>> -- Guozhang
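
For context on "without changing their app code": under this proposal, enabling EoS is meant to be a single config switch, roughly as below. This is a sketch against the 0.10.x-era API; the processing.guarantee key is the name used in this thread, and the corresponding StreamsConfig constant did not exist yet at the time of writing.

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class EosExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-example"); // example id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // The one-line opt-in the KIP proposes; key and value as used in this thread.
            props.put("processing.guarantee", "exactly_once");

            KStreamBuilder builder = new KStreamBuilder();     // 0.10.x-era API
            builder.stream("input-topic").to("output-topic");  // app logic itself is unchanged
            new KafkaStreams(builder, props).start();
        }
    }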