Ismael,

On the network performance side, the issue is throughput.  For networking
purposes, you gain throughput by combining data into bigger batches;
otherwise you pay higher overhead in network handshaking/round-trip delay
and waste space in the underlying network packet buffers.  MirrorMaker
fetches inbound data through FetchResponse from the source broker, which
can return data in megabytes (comprising multiple batches in the same
response); however, the outbound ProduceRequests to the target broker are
in the kilobyte batch-size range if we don't optimize.  (Those batches are
in kilobytes because when the application clients originally produced to
the source broker, they tended to select small batch sizes to achieve low
latency.)  As a result, the outbound throughput cannot match the inbound
throughput.
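
To make the mismatch concrete, here is a minimal sketch of the
latency-oriented producer settings that typically create those small
source-side batches.  The broker address and the exact numbers are
illustrative assumptions, not taken from the KIP; the config keys are the
standard Kafka producer configs:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class LowLatencyProducerSketch {
    public static void main(String[] args) {
        // With settings like these, each RecordBatch arriving at the source
        // broker is only a few KB, so a naive mirror re-sends the data in
        // small ProduceRequests even though its own FetchResponses
        // delivered megabytes at a time.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "source-broker:9092"); // assumed address
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // small batches: 16 KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);      // send immediately for low latency
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // ... application sends individual small records here ...
        }
    }
}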

In order to restore parity between the inbound FetchResponse (which allows
packing multiple batches) and the outbound ProduceRequest, we propose
bringing back the multiple-batch packing capability of ProduceRequest.  I
think KIP-98 took a shortcut by removing multiple-batch packing to avoid
the extra work needed to support transactions across multiple batches.
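
To illustrate what "multiple batch packing" means at the byte level, here
is a rough sketch using Kafka's internal MemoryRecords/RecordBatch classes
(internal, non-public APIs; the methods below do exist in the
common.record package, but this is only an illustration, not the KIP's
actual implementation):

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

public class ShallowIterationSketch {
    // A FetchResponse hands back a MemoryRecords buffer that may contain
    // many RecordBatches back-to-back.  Shallow iteration walks the batch
    // headers only; the compressed payloads are never decompressed or
    // deserialized, so the same bytes could be packed into one outbound
    // ProduceRequest if the protocol allowed multiple batches again.
    static void inspectShallow(MemoryRecords fetched) {
        for (RecordBatch batch : fetched.batches()) { // batch-level, not record-level
            System.out.printf("batch: %d bytes, codec=%s%n",
                    batch.sizeInBytes(), batch.compressionType());
        }
    }
}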

For our use case, we view MM as a simple replication pipe that moves data
from one data center to another; the remote broker is almost like a
follower of the broker in the source cluster.  Broker-to-broker
replication uses FetchRequest/FetchResponse, which does pack multiple
batches to achieve optimal network throughput.  On the broker code path,
FetchResponse and ProduceRequest go through the same handling code, and
the broker simply appends the MemoryRecords (which can contain multiple
batches) to the same log segment file as one unit.  So it's not
particularly hard to restore the multiple-batch packing feature for
ProduceRequest.
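
As a heavily simplified sketch of that shared path (the Log interface and
method name here are paraphrased stand-ins for the broker's internal Scala
log classes, e.g. appendAsLeader/appendAsFollower, and are not the real
signatures):

import org.apache.kafka.common.record.MemoryRecords;

public class BrokerAppendSketch {
    // Stand-in for the broker's internal log; the real class is Scala and
    // has richer signatures than this.
    interface Log {
        void append(MemoryRecords records); // all contained batches go in as one unit
    }

    private final Log log;
    BrokerAppendSketch(Log log) { this.log = log; }

    // Replica fetcher path: MemoryRecords from a FetchResponse, possibly
    // holding many batches, is appended in a single call.
    void handleReplicaFetch(MemoryRecords fromFetchResponse) {
        log.append(fromFetchResponse);
    }

    // Produce path: with multi-batch ProduceRequests restored as KIP-712
    // proposes, the same single-call append would apply.
    void handleProduce(MemoryRecords fromProduceRequest) {
        log.append(fromProduceRequest);
    }
}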



On Tue, Mar 30, 2021 at 12:34 AM Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Henry,
>
> Can you clarify why this "network performance" issue is only related to
> shallow mirroring? Generally, we want the protocol to be generic and not
> have a number of special cases. The more special cases you have, the
> tougher it becomes to test all the edge cases.
>
> Ismael
>
> On Mon, Mar 29, 2021 at 9:51 PM Henry Cai <h...@pinterest.com.invalid>
> wrote:
>
> > It's interesting this VOTE thread finally becomes a DISCUSS thread.
> >
> > For MM2 concern, I will take a look to see whether I can add the support
> > for MM2.
> >
> > For Ismael's concern on multiple batches in the ProduceRequest
> > (conflicting with KIP-98), here is my take:
> >
> > 1. We do need to group multiple batches in the same request; otherwise
> > the network performance will suffer.
> > 2. For the concern on transactional message support as in KIP-98: since
> > MM1 and MM2 currently don't support transactional messages, KIP-712 will
> > not attempt to support transactions either.  I will add a config option
> > on the producer config: allowMultipleBatches.  By default this option
> > will be off, and the user needs to explicitly turn it on to use the
> > shallow mirror feature.  And if we detect that both this option and
> > transactions are turned on, we will throw an exception to protect
> > current transaction processing.
> > 3. In the future, when MM2 starts to support exactly-once and
> > transactional messages (is that KIP-656?), we can revisit this code.
> > The current transactional message format already makes the compromise
> > that the messages in the same RecordBatch (MessageSet) share the same
> > sequence-id/transaction-id, so those messages need to be committed all
> > together.  I think when we support the shallow mirror with transactional
> > semantics, we will group all batches in the same ProduceRequest within
> > the same transaction boundary, and they will need to be committed all
> > together.  On the broker side, all batches coming from a ProduceRequest
> > (or FetchResponse) are committed to the same log segment file as one
> > unit (current behavior).
> >
> > On Mon, Mar 29, 2021 at 8:46 AM Ryanne Dolan <ryannedo...@gmail.com>
> > wrote:
> >
> > > Ah, I see, thanks Ismael. Now I understand your concern.
> > >
> > > From KIP-98, re this change in v3:
> > >
> > > "This allows us to remove the message set size since each message set
> > > already contains a field for the size. More importantly, since there is
> > > only one message set to be written to the log, partial produce failures
> > are
> > > no longer possible. The full message set is either successfully written
> > to
> > > the log (and replicated) or it is not."
> > >
> > > The schema and size field don't seem to be an issue, as KIP-712 already
> > > addresses.
> > >
> > > The partial produce failure issue is something I don't understand. I
> > > can't tell if this was done out of convenience at the time or if there
> > > is something incompatible with partial produce success/failure and
> > > EOS. Does anyone know?
> > >
> > > Ryanne
> > >
> > > On Mon, Mar 29, 2021, 1:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > > > Ryanne,
> > > >
> > > > You misunderstood the referenced comment. It is about the produce
> > > > request change to have multiple batches:
> > > >
> > > > "Up to ProduceRequest V2, a ProduceRequest can contain multiple
> > > > batches of messages stored in the record_set field, but this was
> > > > disabled in V3. We are proposing to bring the multiple batches
> > > > feature back to improve the network throughput of the mirror maker
> > > > producer when the original batch size from source broker is too
> > > > small."
> > > >
> > > > This is unrelated to shallow iteration.
> > > >
> > > > Ismael
> > > >
> > > > On Sun, Mar 28, 2021, 10:15 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Ismael, I don't think KIP-98 is related. Shallow iteration was
> > > > > removed in KAFKA-732, which predates KIP-98 by a few years.
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Sun, Mar 28, 2021, 11:25 PM Ismael Juma <ism...@juma.me.uk>
> > > > > wrote:
> > > > >
> > > > > > Thanks for the KIP. I have a few high level comments:
> > > > > >
> > > > > > 1. Like Tom, I'm not convinced about the proposal to make this
> > > > > > change to MirrorMaker 1 if we intend to deprecate it and remove
> > > > > > it. I would rather us focus our efforts on the implementation we
> > > > > > intend to support going forward.
> > > > > > 2. The producer/consumer configs seem pretty dangerous for
> > > > > > general usage, but the KIP doesn't address the potential
> > > > > > downsides.
> > > > > > 3. How does the ProduceRequest change impact exactly-once (if at
> > > > > > all)? The change we are reverting was done as part of KIP-98.
> > > > > > Have we considered the original reasons for the change?
> > > > > >
> > > > > > Thanks,
> > > > > > Ismael
> > > > > >
> > > > > > On Wed, Feb 10, 2021 at 12:58 PM Vahid Hashemian <
> > > > > > vahid.hashem...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Retitled the thread to conform to the common format.
> > > > > > >
> > > > > > > On Fri, Feb 5, 2021 at 4:00 PM Ning Zhang <ning2008w...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Henry,
> > > > > > > >
> > > > > > > > This is a very interesting proposal.
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-10728 reflects a
> > > > > > > > similar concern about re-compressing data in mirror maker.
> > > > > > > >
> > > > > > > > Probably one thing that may need clarifying is: how is
> > > > > > > > "shallow" mirroring only applied to the mirror maker use
> > > > > > > > case, if the changes need to be made on the generic consumer
> > > > > > > > and producer (e.g. by adding `fetch.raw.bytes` and
> > > > > > > > `send.raw.bytes` to producer and consumer config)?
> > > > > > > >
> > > > > > > > On 2021/02/05 00:59:57, Henry Cai <h...@pinterest.com.INVALID>
> > > > > > > > wrote:
> > > > > > > > > Dear Community members,
> > > > > > > > >
> > > > > > > > > We are proposing a new feature to improve the performance
> > > > > > > > > of Kafka mirror maker:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring
> > > > > > > > >
> > > > > > > > > The current Kafka MirrorMaker process (with the underlying
> > > > > > > > > Consumer and Producer library) uses significant CPU cycles
> > > > > > > > > and memory to decompress/recompress and
> > > > > > > > > deserialize/re-serialize messages, and copies message bytes
> > > > > > > > > multiple times along the mirroring/replicating stages.
> > > > > > > > >
> > > > > > > > > The KIP proposes a *shallow mirror* feature which brings
> > > > > > > > > back the shallow iterator concept to the mirror process and
> > > > > > > > > also proposes to skip the unnecessary message decompression
> > > > > > > > > and recompression steps.  We argue that in many cases users
> > > > > > > > > just want a simple replication pipeline to replicate the
> > > > > > > > > message as-is from the source cluster to the destination
> > > > > > > > > cluster.  In many cases the messages in the source cluster
> > > > > > > > > are already compressed and properly batched, and users just
> > > > > > > > > need an identical copy of the message bytes through the
> > > > > > > > > mirroring without any transformation or repartitioning.
> > > > > > > > >
> > > > > > > > > We have a prototype implementation in house with
> > > > > > > > > MirrorMaker v1 and observed *CPU usage dropped from 50% to
> > > > > > > > > 15%* for some mirror pipelines.
> > > > > > > > >
> > > > > > > > > We name this feature *shallow mirroring* since it has some
> > > > > > > > > resemblance to the old Kafka 0.7 namesake feature, but the
> > > > > > > > > implementations are not quite the same.  ‘*Shallow*’ means:
> > > > > > > > > 1. we *shallowly* iterate RecordBatches inside the
> > > > > > > > > MemoryRecords structure instead of deep-iterating records
> > > > > > > > > inside each RecordBatch; 2. we *shallowly* copy (share)
> > > > > > > > > pointers inside the ByteBuffer instead of deep-copying and
> > > > > > > > > deserializing bytes into objects.
> > > > > > > > >
> > > > > > > > > Please share discussions/feedback along this email thread.
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Thanks!
> > > > > > > --Vahid
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
