+1 to adding latency metrics.

To add context on why CPU, memory and GC has a bigger impact than network
in a Mirror for compressed topics without KIP-712 is: *a failing / unstable
mirror cluster will have lag perpetually spiking having much larger impact
on e2e latencies*. To explain a bit more:

Less data moved:
Compressed topics "usually" should move less data over the network and are
useful to reduce the network cost / footprint of replication. Therefore,
network usage may naturally be less than if this data were uncompressed.
Instead the CPU usage bottleneck hits first due to decompression of data.
Prior to KIP-712 we had never been able to operate a mirror at wire speed.

Stability:
If there is a load spike, there can be a few scenarios played out:
- more data in a batch i.e. larger uncompressed size i.e. larger memory
footprint
- more number of batches i.e. larger memory footprint

In either case higher memory usage and more CPU cycles are used due to
this.
If the GC throughput or heap size is insufficient leads to OOMs.

Domino Effect:
Just like any Kafka Consumer, if a consumer instance in a consumer group
terminates it triggers a rebalance. In this case that rebalance happens due
to an OOM. If a Mirror instance that fails due to an OOM triggered by
traffic load (explained above) will result in a domino effect of more
Mirror instances having OOMs as the load increases due to an even smaller
number of running instances remaining in the group. Eventually leading to a
total failure of the mirror cluster.

Memory Limits & Ineffective workarounds:
A question that could be asked couldn't we configure the Mirror instance in
such a way that this doesn't happen? The answer is it's expensive and
difficult.
Let's say we are using a 4 core host with X GBs of memory and configure the
Mirror to use 4 Streams and this configuration leads to an OOM, we could
try to reduce the number of Streams to 3 or 2. That's a 25-50% loss in
efficiency i.e. we may now need 2x the number of nodes (& 2x cost) without
any guarantees that this configuration will never result in an OOM (since
future traffic characteristics are unpredictable) but it may reduce the
probability of an OOM.

Summary:
Since the root cause is memory usage due to decompression of data in
flight, the ideal way to resolve this was to eliminate the decompression of
data which isn't a hard requirement for the mirror to operate since it was
not performing any transformation or repartitioning in our case.

Thanks,
- Ambud

On Mon, Feb 22, 2021 at 9:20 AM Vahid Hashemian <vahid.hashem...@gmail.com>
wrote:

> As Henry mentions in the KIP, we are seeing a great deal of improvements
> and efficiency by using the mirroring enhancement proposed in this KIP, and
> believe it would be equally beneficial to everyone that runs Kafka and
> Kafka Mirror at scale.
>
> I'm bumping up this thread in case there are additional feedback or
> comments.
>
> Thanks,
> --Vahid
>
> On Sat, Feb 13, 2021, 13:59 Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> > Glad to hear that latency and thruput aren't negatively affected
> somehow. I
> > would love to see this KIP move forward.
> >
> > Ryanne
> >
> > On Sat, Feb 13, 2021, 3:00 PM Henry Cai <h...@pinterest.com> wrote:
> >
> > > Ryanne,
> > >
> > > Yes, network performance is also important.  In our deployment, we are
> > > bottlenecked on the CPU/memory on the mirror hosts.  We are using c5.2x
> > and
> > > m5.2x nodes in AWS, before the deployment, CPU would peak to 80% but
> > there
> > > is enough network bandwidth left on those hosts.  Having said that, we
> > > maintain the same network throughput before and after the switch.
> > >
> > > On Fri, Feb 12, 2021 at 12:20 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > wrote:
> > >
> > >> Hey Henry, great KIP. The performance improvements are impressive!
> > >> However, often cpu, ram, gc are not the metrics most important to a
> > >> replication pipeline -- often the network is mostly saturated anyway.
> Do
> > >> you know how this change affects latency or thruput? I suspect less GC
> > >> pressure means slightly less p99 latency, but it would be great to see
> > that
> > >> confirmed. I don't think it's necessary that this KIP improves these
> > >> metrics, but I think it's important to show that they at least aren't
> > made
> > >> worse.
> > >>
> > >> I suspect any improvement in MM1 would be magnified in MM2, given
> there
> > >> is a lot more machinery between consumer and producer in MM2.
> > >>
> > >>
> > >> I'd like to do some performance analysis based on these changes.
> Looking
> > >> forward to a PR!
> > >>
> > >> Ryanne
> > >>
> > >> On Wed, Feb 10, 2021, 3:50 PM Henry Cai <h...@pinterest.com> wrote:
> > >>
> > >>> On the question "whether shallow mirror is only applied on mirror
> maker
> > >>> v1", the code change is mostly on consumer and producer code path,
> the
> > >>> change to mirrormaker v1 is very trivial.  We chose to modify the
> > >>> consumer/producer path (instead of creating a new mirror product) so
> > other
> > >>> use cases can use that feature as well.  The change to mirror maker
> v2
> > >>> should be straightforward as well but we don't have that environment
> in
> > >>> house.  I think the community can easily port this change to mirror
> > maker
> > >>> v2.
> > >>>
> > >>>
> > >>>
> > >>> On Wed, Feb 10, 2021 at 12:58 PM Vahid Hashemian <
> > >>> vahid.hashem...@gmail.com> wrote:
> > >>>
> > >>>> Retitled the thread to conform to the common format.
> > >>>>
> > >>>> On Fri, Feb 5, 2021 at 4:00 PM Ning Zhang <ning2008w...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>> > Hello Henry,
> > >>>> >
> > >>>> > This is a very interesting proposal.
> > >>>> > https://issues.apache.org/jira/browse/KAFKA-10728 reflects the
> > >>>> similar
> > >>>> > concern of re-compressing data in mirror maker.
> > >>>> >
> > >>>> > Probably one thing may need to clarify is: how "shallow" mirroring
> > is
> > >>>> only
> > >>>> > applied to mirrormaker use case, if the changes need to be made on
> > >>>> generic
> > >>>> > consumer and producer (e.g. by adding `fetch.raw.bytes` and
> > >>>> > `send.raw.bytes` to producer and consumer config)
> > >>>> >
> > >>>> > On 2021/02/05 00:59:57, Henry Cai <h...@pinterest.com.INVALID>
> > wrote:
> > >>>> > > Dear Community members,
> > >>>> > >
> > >>>> > > We are proposing a new feature to improve the performance of
> Kafka
> > >>>> mirror
> > >>>> > > maker:
> > >>>> > >
> > >>>> >
> > >>>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring
> > >>>> > >
> > >>>> > > The current Kafka MirrorMaker process (with the underlying
> > Consumer
> > >>>> and
> > >>>> > > Producer library) uses significant CPU cycles and memory to
> > >>>> > > decompress/recompress, deserialize/re-serialize messages and
> copy
> > >>>> > multiple
> > >>>> > > times of messages bytes along the mirroring/replicating stages.
> > >>>> > >
> > >>>> > > The KIP proposes a *shallow mirror* feature which brings back
> the
> > >>>> shallow
> > >>>> > > iterator concept to the mirror process and also proposes to skip
> > the
> > >>>> > > unnecessary message decompression and recompression steps.  We
> > >>>> argue in
> > >>>> > > many cases users just want a simple replication pipeline to
> > >>>> replicate the
> > >>>> > > message as it is from the source cluster to the destination
> > >>>> cluster.  In
> > >>>> > > many cases the messages in the source cluster are already
> > >>>> compressed and
> > >>>> > > properly batched, users just need an identical copy of the
> message
> > >>>> bytes
> > >>>> > > through the mirroring without any transformation or
> > repartitioning.
> > >>>> > >
> > >>>> > > We have a prototype implementation in house with MirrorMaker v1
> > and
> > >>>> > > observed *CPU usage dropped from 50% to 15%* for some mirror
> > >>>> pipelines.
> > >>>> > >
> > >>>> > > We name this feature: *shallow mirroring* since it has some
> > >>>> resemblance
> > >>>> > to
> > >>>> > > the old Kafka 0.7 namesake feature but the implementations are
> not
> > >>>> quite
> > >>>> > > the same.  ‘*Shallow*’ means 1. we *shallowly* iterate
> > RecordBatches
> > >>>> > inside
> > >>>> > > MemoryRecords structure instead of deep iterating records inside
> > >>>> > > RecordBatch; 2. We *shallowly* copy (share) pointers inside
> > >>>> ByteBuffer
> > >>>> > > instead of deep copying and deserializing bytes into objects.
> > >>>> > >
> > >>>> > > Please share discussions/feedback along this email thread.
> > >>>> > >
> > >>>> >
> > >>>>
> > >>>>
> > >>>> --
> > >>>>
> > >>>> Thanks!
> > >>>> --Vahid
> > >>>>
> > >>>
> >
>

Reply via email to