Hi, Guozhang,

Thanks for the KIP. A couple of comments.
1. About the impact on producer batching: my understanding is that
typically different sub-topologies in the same task publish to different
topics. Since producer batching happens at the topic/partition level,
using a producer per task may not impact batching much.

2. When processing.guarantee is set to exactly_once, do we want to
enforce acks=all in the producer? The default acks is 1 and may cause
acked data to be lost later when the leader changes.

Thanks,

Jun

On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Matthias,
>
> Thanks. The perf test is a good start, but I don't think it goes far
> enough. 100 partitions is not a lot. What happens when there are
> thousands of partitions? What is the load on the brokers? How much more
> memory is used by the Streams app, etc.?
>
> Thanks,
> Damian
>
> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <matth...@confluent.io> wrote:
>
> > Hi,
> >
> > I want to give a first response:
> >
> >
> > 1. Producer per task:
> >
> > First, we did some performance tests, which indicate that the
> > performance penalty is small. Please have a look here:
> >
> > https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> >
> > For the test, we ran a trunk version against a modified version that
> > uses a producer per task (of course, without transactions, i.e., with
> > at-least-once semantics). The scaling factor indicates the number of
> > brokers and (single-threaded) Streams instances. We used
> > SimpleBenchmark, which is part of the AK code base.
> >
> > Second, as the design is "producer per task" (and not "producer per
> > partition"), it is possible to specify a custom PartitionGrouper that
> > assigns multiple partitions to a single task. This allows reducing
> > the number of tasks in scenarios with many partitions. Right now,
> > this interface must be implemented by the user themselves, but we
> > could also add a new config parameter that specifies a
> > max.number.of.tasks or partitions.per.task so that the user can
> > configure this instead of implementing the interface.
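> >
> > To illustrate, a grouper along these lines might look roughly like
> > the sketch below (the class name, the max-partitions-per-task
> > constant, and the packing strategy are all made up for this example,
> > and it ignores co-partitioning requirements for joins):
> >
> >   import java.util.*;
> >   import org.apache.kafka.common.Cluster;
> >   import org.apache.kafka.common.TopicPartition;
> >   import org.apache.kafka.streams.processor.PartitionGrouper;
> >   import org.apache.kafka.streams.processor.TaskId;
> >
> >   // Hypothetical grouper that packs up to MAX_PER_TASK partitions of
> >   // a topic group into one task, reducing the task (and thus
> >   // producer) count.
> >   public class PackingPartitionGrouper implements PartitionGrouper {
> >       private static final int MAX_PER_TASK = 8; // assumed; tune per workload
> >
> >       @Override
> >       public Map<TaskId, Set<TopicPartition>> partitionGroups(
> >               final Map<Integer, Set<String>> topicGroups,
> >               final Cluster metadata) {
> >           final Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
> >           for (final Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
> >               // collect all partitions of this topic group
> >               final List<TopicPartition> partitions = new ArrayList<>();
> >               for (final String topic : entry.getValue()) {
> >                   final Integer count = metadata.partitionCountForTopic(topic);
> >                   for (int p = 0; count != null && p < count; p++) {
> >                       partitions.add(new TopicPartition(topic, p));
> >                   }
> >               }
> >               // pack them into tasks of at most MAX_PER_TASK partitions each
> >               int taskNo = 0;
> >               for (int i = 0; i < partitions.size(); i += MAX_PER_TASK) {
> >                   groups.put(new TaskId(entry.getKey(), taskNo++),
> >                           new HashSet<>(partitions.subList(i,
> >                                   Math.min(i + MAX_PER_TASK, partitions.size()))));
> >               }
> >           }
> >           return groups;
> >       }
> >   }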
> >
> > Third, there is the idea of a "producer pool" that would allow
> > sharing resources (network connections, memory, etc.) across multiple
> > producers. This would allow separating multiple transactions at the
> > producer level while the resources are shared. There is no detailed
> > design document yet, and this would need its own KIP.
> >
> > Thus, if there are any performance problems in high-scale scenarios,
> > there are multiple ways to tackle them while keeping the "producer
> > per task" design.
> >
> > Additionally, a "producer per thread" design would be far more
> > complex, and I have summarized the issues in a separate document. I
> > will share a link to the document soon.
> >
> >
> > 2. StateStore recovery:
> >
> > In its first design, Streams EoS will not exploit the improvements
> > that are currently being added for 0.11. However, as 0.10.2 faces the
> > same issue of potentially long recovery, there is no regression in
> > this regard. Thus, I see those improvements as orthogonal add-ons.
> > Nevertheless, we should try to explore those options and, if
> > possible, get them into 0.11 so that Streams with EoS gets the same
> > improvements as the at-least-once scenario.
> >
> >
> > 3. Caching:
> >
> > We might need to do some experiments to quantify the impact on
> > caching. If it's severe, the suggested default commit interval of
> > 100ms could also be increased. Also, EoS will not enforce any commit
> > interval but only change the default value. Thus, a user can freely
> > trade off latency vs. the caching effect.
> >
> > Last but not least, there is the idea to allow "read_uncommitted" for
> > intermediate topics. This would be an advanced design for Streams EoS
> > that allows downstream sub-topologies to read uncommitted data
> > optimistically. In case of failure, a cascading abort of transactions
> > would be required. This change will need another KIP.
> >
> >
> > 4. Idempotent Producer:
> >
> > The transactional part automatically leverages the idempotent
> > properties of the producer. Idempotency is a requirement:
> >
> > > Note that enable.idempotence must be enabled if a TransactionalId
> > > is configured.
> >
> > See
> >
> > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> >
> > All idempotent retries are handled internally by the producer (with
> > or without transactions) if enable.idempotence is set to true.
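> >
> > For illustration, configuring a task's producer per the KIP-98 API
> > would look roughly like the minimal sketch below (the transactional
> > id value and its naming scheme are made up; a real implementation
> > would derive it from the application id and task id):
> >
> >   import java.util.Properties;
> >   import org.apache.kafka.clients.producer.KafkaProducer;
> >   import org.apache.kafka.clients.producer.ProducerConfig;
> >   import org.apache.kafka.common.serialization.ByteArraySerializer;
> >
> >   public class EosProducerSketch {
> >       public static void main(final String[] args) {
> >           final Properties props = new Properties();
> >           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> >           // hypothetical scheme: one transactional id per task
> >           props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-0_0");
> >           // required whenever a transactional id is configured;
> >           // idempotence in turn entails acks=all and retries, which
> >           // also covers the acks question for exactly_once
> >           props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
> >           props.put(ProducerConfig.ACKS_CONFIG, "all");
> >
> >           final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
> >                   props, new ByteArraySerializer(), new ByteArraySerializer());
> >           producer.initTransactions();
> >           producer.beginTransaction();
> >           // ... send the task's output records here ...
> >           producer.commitTransaction();
> >           producer.close();
> >       }
> >   }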
> >
> >
> > -Matthias
> >
> >
> > On 3/3/17 3:34 AM, Eno Thereska wrote:
> > > Another question:
> > >
> > > The KIP doesn't exactly spell out how it uses the idempotence
> > > guarantee from KIP-98. It seems that only the transactional part is
> > > needed. Or is the idempotence guarantee working behind the scenes
> > > and helping in some scenarios for which it is not worthwhile
> > > aborting a transaction (e.g., retransmitting a record after a
> > > temporary network glitch)?
> > >
> > > Thanks
> > > Eno
> > >
> > >> On Mar 2, 2017, at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
> > >>
> > >> I second the concern with the one-producer-per-task approach. At a
> > >> high level it seems to make sense, but I think Damian is exactly
> > >> right that it cuts against the general design of the producer. Many
> > >> people have high input partition counts and will have high task
> > >> counts as a result. I think processing 1000 partitions should not
> > >> be an unreasonable thing to want to do.
> > >>
> > >> The tricky bits will be:
> > >>
> > >> - Reduced effectiveness of batching (or more latency and memory to
> > >>   get equivalent batching). This doesn't show up in simple
> > >>   benchmarks because much of the penalty is I/O and CPU on the
> > >>   broker, and the additional threads from all the producers can
> > >>   make a single-threaded benchmark seem faster.
> > >> - TCP connection explosion. We maintain one connection per broker.
> > >>   This is already high since each app instance does this. This
> > >>   design, though, will add an additional multiplicative factor
> > >>   based on the partition count of the input.
> > >> - Connection and metadata request storms. When an instance with
> > >>   1000 tasks starts up, it is going to try to create many thousands
> > >>   of connections and issue a thousand metadata requests all at
> > >>   once.
> > >> - Memory usage. We currently default to 64MB per producer. This can
> > >>   be tuned down, but the fact that we are spreading the batching
> > >>   over more producers will fundamentally mean we need a lot more
> > >>   memory to get good perf, and the memory usage will change as your
> > >>   task assignment changes, so it will be hard to set correctly
> > >>   unless it is done automatically.
> > >> - Metrics explosion (1000 producer instances, each with their own
> > >>   metrics to monitor).
> > >> - Thread explosion: 1000 background threads, one per producer, each
> > >>   sending data.
> > >>
> > >> -Jay
> > >>
> > >> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <damian....@gmail.com> wrote:
> > >>
> > >>> Hi Guozhang,
> > >>>
> > >>> Thanks for the KIP! This is an important feature for Kafka Streams
> > >>> and will help to unlock a bunch of use cases.
> > >>>
> > >>> I have some concerns/questions:
> > >>>
> > >>> 1. Producer per task: I'm worried about the overhead this is going
> > >>>    to put on both the Streams app and the Kafka brokers. You can
> > >>>    easily imagine an app consuming thousands of partitions. What
> > >>>    load will this put on the brokers? Am I correct in assuming
> > >>>    that there will be metadata requests per producer? The memory
> > >>>    overhead in the Streams app will also increase fairly
> > >>>    significantly. Should we adjust
> > >>>    ProducerConfig.BUFFER_MEMORY_CONFIG?
> > >>> 2. State store recovery: As we already know, restoring the entire
> > >>>    changelog can take an extremely long time. Even with a fairly
> > >>>    small dataset and an inappropriately tuned segment size, this
> > >>>    can take way too long. My concern is that failures happen, and
> > >>>    then recovery takes "forever", and we end up in a situation
> > >>>    where we need to change max.poll.interval to be some very large
> > >>>    number, or else we end up in "rebalance hell". I don't think
> > >>>    this provides a very good user experience. You mention RocksDB
> > >>>    checkpointing in the doc - should we explore this idea some
> > >>>    more? I.e., understand the penalty of checkpointing. Maybe
> > >>>    checkpoint every *n* commits?
> > >>> 3. What does EoS mean for caching? If we set the commit interval
> > >>>    to 100ms, then the cache is not going to be very effective.
> > >>>    Should it just be disabled?
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams
> > >>>> and provide exactly-once processing semantics:
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
> > >>>>
> > >>>> This KIP enables Streams users to optionally turn on exactly-once
> > >>>> processing semantics without changing their app code at all, by
> > >>>> leveraging the transactional messaging features provided in
> > >>>> KIP-98.
> > >>>>
> > >>>> The above wiki page provides a high-level view of the proposed
> > >>>> changes, while the detailed implementation design can be found in
> > >>>> this Google doc:
> > >>>>
> > >>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
> > >>>>
> > >>>> We would love to hear your comments and suggestions.
> > >>>>
> > >>>> Thanks,
> > >>>> -- Guozhang
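> > >>>>
> > >>>> For illustration, under the proposal this opt-in would be a
> > >>>> single-property change (a minimal sketch; the StreamsConfig
> > >>>> constant names follow the KIP draft and may still change):
> > >>>>
> > >>>>   import java.util.Properties;
> > >>>>   import org.apache.kafka.streams.KafkaStreams;
> > >>>>   import org.apache.kafka.streams.StreamsConfig;
> > >>>>   import org.apache.kafka.streams.kstream.KStreamBuilder;
> > >>>>
> > >>>>   public class EosStreamsSketch {
> > >>>>       public static void main(final String[] args) {
> > >>>>           final KStreamBuilder builder = new KStreamBuilder();
> > >>>>           // ... topology unchanged from the at-least-once app ...
> > >>>>
> > >>>>           final Properties props = new Properties();
> > >>>>           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
> > >>>>           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> > >>>>           // the single change needed to opt into exactly-once
> > >>>>           props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> > >>>>                   StreamsConfig.EXACTLY_ONCE);
> > >>>>
> > >>>>           new KafkaStreams(builder, props).start();
> > >>>>       }
> > >>>>   }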