Hi, Guozhang,

Thanks for the KIP. A couple of comments.

1. About the impact on producer batching. My understanding is that
typically different sub-topologies in the same task are publishing to
different topics. Since the producer batching happens at the
topic/partition level, using a producer per task may not impact batching
much.

2. When processing.guarantee is set to exactly_once, do we want to enforce
acks=all in the producer? The default is acks=1, which may cause acked data
to be lost later when the leader changes.
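
For example, something along these lines for the Streams-internal producers
when exactly_once is enabled (just a sketch of what "enforce" could mean;
ACKS_CONFIG exists today, while ENABLE_IDEMPOTENCE_CONFIG is the config
proposed in KIP-98):

  // org.apache.kafka.clients.producer.ProducerConfig, java.util.Properties
  Properties producerOverrides = new Properties();
  producerOverrides.put(ProducerConfig.ACKS_CONFIG, "all");              // don't lose acked data on leader change
  producerOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // per KIP-98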

Thanks,

Jun

On Tue, Mar 7, 2017 at 3:42 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Matthias,
>
> Thanks. The perf test is a good start but I don't think it goes far enough.
> 100 partitions is not a lot. What happens when there are thousands of
> partitions? What is the load on the brokers? How much more memory is used
> by the Streams app, etc.?
>
> Thanks,
> Damian
>
> On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <matth...@confluent.io> wrote:
>
> > Hi,
> >
> > I want to give a first response:
> >
> >
> >
> > 1. Producer per task:
> >
> > First, we did some performance tests, indicating that the performance
> > penalty is small. Please have a look here:
> >
> > https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
> >
> > For the test, we ran with a trunk version and a modified version that
> > uses a producer per task (of course, no transactions, but at-least-once
> > semantics). The scaling factor indicates the number of brokers and
> > (single-threaded) Streams instances. We used SimpleBenchmark, which is
> > part of the AK code base.
> >
> >
> > Second, as the design is "producer per task" (and not "producer per
> > partition"), it is possible to specify a custom PartitionGrouper that
> > assigns multiple partitions to a single task. Thus, it allows reducing
> > the number of tasks for scenarios with many partitions. Right now, the
> > user has to implement this interface themselves, but we could also add
> > a new config parameter that specifies a max.number.of.tasks or
> > partitions.per.task so that the user can configure this instead of
> > implementing the interface. A rough sketch of such a grouper follows.
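> >
> > To make this concrete, here is a minimal sketch (it uses the existing
> > PartitionGrouper interface; the partitions-per-task value is hard-coded
> > because the proposed config does not exist yet):
> >
> >   import java.util.*;
> >   import org.apache.kafka.common.Cluster;
> >   import org.apache.kafka.common.PartitionInfo;
> >   import org.apache.kafka.common.TopicPartition;
> >   import org.apache.kafka.streams.processor.PartitionGrouper;
> >   import org.apache.kafka.streams.processor.TaskId;
> >
> >   public class BucketedPartitionGrouper implements PartitionGrouper {
> >       private static final int PARTITIONS_PER_TASK = 8; // would become a config
> >
> >       @Override
> >       public Map<TaskId, Set<TopicPartition>> partitionGroups(
> >               Map<Integer, Set<String>> topicGroups, Cluster metadata) {
> >           Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
> >           for (Map.Entry<Integer, Set<String>> group : topicGroups.entrySet()) {
> >               for (String topic : group.getValue()) {
> >                   List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
> >                   if (infos == null) continue; // metadata not available yet
> >                   for (PartitionInfo info : infos) {
> >                       // bucket partitions so that up to PARTITIONS_PER_TASK
> >                       // partitions end up in the same task
> >                       TaskId id = new TaskId(group.getKey(),
> >                               info.partition() / PARTITIONS_PER_TASK);
> >                       groups.computeIfAbsent(id, k -> new HashSet<>())
> >                               .add(new TopicPartition(topic, info.partition()));
> >                   }
> >               }
> >           }
> >           return groups;
> >       }
> >   }
> >
> > Such a class can already be plugged in via the partition.grouper config.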
> >
> > Third, there is the idea of a "Producer Pool" that would allow sharing
> > resources (network connections, memory, etc.) across multiple
> > producers. This would allow separating multiple transactions at the
> > producer level while resources are shared. There is no detailed design
> > document yet, and there would be a separate KIP for this feature.
> >
> > Thus, should there be any performance problems in high-scale
> > scenarios, there are multiple ways to tackle them while keeping the
> > "producer per task" design.
> >
> > Additionally, a "producer per thread" design would be way more complex,
> > and I summarized the issues in a separate document. I will share a link
> > to that document soon.
> >
> >
> >
> > 2. StateStore recovery:
> >
> > In its first design, Streams EoS will not exploit the recovery
> > improvements that are currently being added for 0.11. However, as
> > 0.10.2 faces the same issue of potentially long recovery, there is no
> > regression in this regard. Thus, I see those improvements as orthogonal
> > add-ons. Nevertheless, we should try to explore those options and, if
> > possible, get them into 0.11 so that Streams with EoS gets the same
> > improvements as the at-least-once scenario.
> >
> >
> >
> > 3. Caching:
> >
> > We might need to do some experiments to quantify the impact on caching.
> > If it's severe, the suggested default commit interval of 100ms could
> > also be increased. Also, EoS will not enforce any commit interval, but
> > only change the default value. Thus, a user can freely trade off
> > latency vs. the caching effect, for example:
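> >
> > (A sketch of the relevant StreamsConfig fragment; commit.interval.ms
> > exists today, processing.guarantee is the config proposed in this KIP,
> > and 30s is an arbitrary illustration.)
> >
> >   // org.apache.kafka.streams.StreamsConfig, java.util.Properties
> >   Properties props = new Properties();
> >   props.put("processing.guarantee", "exactly_once");             // proposed in KIP-129
> >   props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000); // trade latency for caching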
> >
> > Last but not least, there is the idea to allow "read_uncommitted" for
> > intermediate topics. This would be an advanced design for Streams EoS
> > that allows downstream sub-topologies to read uncommitted data
> > optimistically. In case of failure, a cascading abort of transactions
> > would be required. This change will need another KIP.
> >
> >
> >
> > 4. Idempotent Producer:
> >
> > The transactional part automatically leverages the idempotent properties
> > of the producer. Idempotency is a requirement:
> >
> > > Note that enable.idempotence must be enabled if a TransactionalId is configured.
> >
> > See
> >
> > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
> >
> > All idempotent retries are handled by the producer internally (with or
> > without a transaction) if enable.idempotence is set to true.
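> >
> > To illustrate how Streams would drive this per task, here is a minimal
> > sketch against the producer API proposed in KIP-98 (the transactional
> > configs and methods are taken from that proposal; topic names, the
> > TransactionalId, and the offsets are made up):
> >
> >   import java.util.*;
> >   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> >   import org.apache.kafka.clients.producer.*;
> >   import org.apache.kafka.common.TopicPartition;
> >   import org.apache.kafka.common.serialization.ByteArraySerializer;
> >
> >   public class EosTaskSketch {
> >       public static void main(String[] args) {
> >           Properties cfg = new Properties();
> >           cfg.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >           cfg.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
> >           cfg.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
> >           cfg.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-0_0"); // e.g. appId + taskId
> >           cfg.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);       // required with a TransactionalId
> >
> >           try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(cfg)) {
> >               producer.initTransactions(); // fences older instances with the same TransactionalId
> >               producer.beginTransaction();
> >               producer.send(new ProducerRecord<>("output-topic", "value".getBytes())); // idempotent send
> >               Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
> >                       new TopicPartition("input-topic", 0), new OffsetAndMetadata(42L));
> >               producer.sendOffsetsToTransaction(offsets, "my-app"); // input offsets join the txn
> >               producer.commitTransaction(); // or abortTransaction() on failure
> >           }
> >       }
> >   }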
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 3/3/17 3:34 AM, Eno Thereska wrote:
> > > Another question:
> > >
> > > The KIP doesn’t exactly spell out how it uses the idempotence guarantee
> > > from KIP-98. It seems that only the transactional part is needed. Or is
> > > the idempotence guarantee working behind the scenes and helping for some
> > > scenarios for which it is not worthwhile aborting a transaction (e.g.,
> > > retransmitting a record after a temporary network glitch)?
> > >
> > > Thanks
> > > Eno
> > >
> > >> On Mar 2, 2017, at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
> > >>
> > >> I second the concern with the one producer per task approach. At a
> > >> high level it seems to make sense, but I think Damian is exactly
> > >> right that it cuts against the general design of the producer. Many
> > >> people have high input partition counts and will have high task
> > >> counts as a result. I think processing 1000 partitions should not be
> > >> an unreasonable thing to want to do.
> > >>
> > >> The tricky bits will be:
> > >>
> > >>   - Reduced effectiveness of batching (or more latency and memory to
> > >>   get equivalent batching). This doesn't show up in simple benchmarks
> > >>   because much of the penalty is I/O and CPU on the broker, and the
> > >>   additional threads from all the producers can make a
> > >>   single-threaded benchmark seem faster.
> > >>   - TCP connection explosion. We maintain one connection per broker.
> > >>   This is already high since each app instance does this. This
> > >>   design, though, will add an additional multiplicative factor based
> > >>   on the partition count of the input.
> > >>   - Connection and metadata request storms. When an instance with
> > >>   1000 tasks starts up, it is going to try to create many thousands
> > >>   of connections and issue a thousand metadata requests all at once.
> > >>   - Memory usage. We currently default to 64MB per producer. This can
> > >>   be tuned down, but the fact that we are spreading the batching over
> > >>   more producers will fundamentally mean we need a lot more memory to
> > >>   get good perf, and the memory usage will change as your task
> > >>   assignment changes, so it will be hard to set correctly unless it
> > >>   is done automatically (see the rough numbers after this list).
> > >>   - Metrics explosion (1000 producer instances, each with their own
> > >>   metrics to monitor).
> > >>   - Thread explosion, 1000 background threads, one per producer, each
> > >>   sending data.
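> > >>
> > >> As a rough back-of-envelope sketch (assuming, say, a 5 broker cluster
> > >> and the 64MB default buffer.memory): an instance with 1000 tasks gets
> > >> 1000 producers, i.e. roughly 1000 x 64MB = ~64GB of producer buffers
> > >> and up to 1000 x 5 = 5000 TCP connections, versus 64MB and 5
> > >> connections for a single shared producer.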
> > >>
> > >> -Jay
> > >>
> > >> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <damian....@gmail.com> wrote:
> > >>
> > >>> Hi Guozhang,
> > >>>
> > >>> Thanks for the KIP! This is an important feature for Kafka Streams
> > >>> and will help to unlock a bunch of use cases.
> > >>>
> > >>> I have some concerns/questions:
> > >>>
> > >>>   1. Producer per task: I'm worried about the overhead this is going
> > >>>   to put on both the streams app and the Kafka Brokers. You can
> > >>>   easily imagine an app consuming thousands of partitions. What load
> > >>>   will this put on the brokers? Am I correct in assuming that there
> > >>>   will be metadata requests per Producer? The memory overhead in the
> > >>>   streams app will also increase fairly significantly. Should we
> > >>>   adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
> > >>>   2. State Store recovery: As we already know, restoring the entire
> > >>>   changelog can take an extremely long time. Even with a fairly
> > >>>   small dataset and an inappropriately tuned segment size, this can
> > >>>   take way too long. My concern is that failures happen and then
> > >>>   recovery takes "forever" and we end up in a situation where we
> > >>>   need to change the max.poll.interval to be some very large number,
> > >>>   or else we end up in "rebalance hell". I don't think this provides
> > >>>   a very good user experience. You mention RocksDB checkpointing in
> > >>>   the doc - should we explore this idea some more, i.e., understand
> > >>>   the penalty for checkpointing? Maybe checkpoint every *n* commits?
> > >>>   3. What does EoS mean for Caching? If we set the commit interval
> > >>>   to 100ms then the cache is not going to be very effective. Should
> > >>>   it just be disabled?
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
> > >>>> provide exactly-once processing semantics:
> > >>>>
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
> > >>>>
> > >>>> This KIP enables Streams users to optionally turn on exactly-once
> > >>>> processing semantics without changing their app code at all by
> > >>>> leveraging the transactional messaging features provided in KIP-98.
> > >>>>
> > >>>> The above wiki page provides a high-level view of the proposed
> > >>>> changes, while the detailed implementation design can be found in
> > >>>> this Google doc:
> > >>>>
> > >>>>
> > >>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
> > >>>>
> > >>>> We would love to hear your comments and suggestions.
> > >>>>
> > >>>> Thanks,
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >
> >
> >
>
