Mirrormaker 2.0 and compacted topics

2020-03-18 Thread Pirow Engelbrecht
Hello,

We're currently evaluating MirrorMaker 2.0 for future inter-cluster 
replication, replacing our bespoke replicator. I understand that MirrorMaker 
2.0 documentation is only slated to be released with Kafka 2.5.0, but I was 
hoping that someone might know whether MirrorMaker 2.0 can be used to replicate 
compacted topics? I am especially interested in the case where a cluster has 
been offline for a few hours, has to catch up via replication, and the 
topic has been compacted since it was last seen by the offline cluster.

Thanks

Pirow Engelbrecht | Senior Design Engineer
Tel +27 12 678 9740 (ext. 9879) | Cell +27 63 148 3376

76 Regency Drive | Irene | Centurion | 0157






Re: Connect - Membership Protocol

2020-03-18 Thread Robin Moffatt
Are the four brokers across two clusters (B1+B2) / (B3+B4), or one cluster?
If one cluster, are you using the same config/offset/status topics for each
Connect cluster? Because that definitely won't work.

In case it's useful:
https://rmoff.net/2019/11/22/common-mistakes-made-when-configuring-multiple-kafka-connect-workers/
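
For completeness, the settings that must be unique per Connect cluster can be
sketched like this in each worker's config (all names below are hypothetical
examples, not values from this thread):

```properties
# Worker config for Connect cluster A -- each distributed Connect cluster
# needs its OWN group.id and its OWN internal topics.
group.id = connect-cluster-a
config.storage.topic = connect-cluster-a-configs
offset.storage.topic = connect-cluster-a-offsets
status.storage.topic = connect-cluster-a-status
```

A second Connect cluster would use a different group.id and a different set of
storage topics, even if it talks to different bootstrap servers.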


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Tue, 17 Mar 2020 at 17:58, mandeep gandhi 
wrote:

> Gentle reminder.
>
> + users.
>
> On Mon, Mar 16, 2020 at 11:27 PM mandeep gandhi 
> wrote:
>
> > Hi,
> >
> > (PS - I did not use users list as this concerns some internals)
> >
> > I was trying to deploy the following config on a K8s cluster  -
> >
> > Namespace -  X1 Connect group id -  G1Bootstrap servers - B1, B2
> > Namespace -  X2 Connect group id -  G1 Bootstrap servers - B3, B4
> >
> > With this configuration, I am seeing multiple rebalances (logs below). So
> > the question is: how does the membership protocol work?
> > I have tried to follow some KIPs, and they mostly say that the bootstrap
> > server is the coordinator of the group. If so, then logically this
> > configuration should work just fine, as both configs have different
> > bootstrap servers. But as far as I have tried to understand the code (and
> > tried to run Kafka Connect), it seems the members get added to the
> > group one by one and the head of the list becomes the group coordinator.
> > (If I change the Connect group id in the 2nd config, things work.)
> >
> > Also, I wanted some pointers on what is happening at the server-side vs
> > client-side during the protocol.
> >
> >
> > Kindly help.
> >
> > Logs -
> >
> > [2020-03-16 10:27:13,386] INFO [Worker clientId=connect-1,
> groupId=streaming-connect] Current config state offset 12 does not match
> group assignment 9164. Forcing rebalance.
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)
> > [2020-03-16 10:27:13,386] INFO [Worker clientId=connect-1,
> groupId=streaming-connect] Rebalance started
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:213)
> > [2020-03-16 10:27:13,386] INFO [Worker clientId=connect-1,
> groupId=streaming-connect] (Re-)joining group
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:505)
> > [2020-03-16 10:27:13,388] INFO [Worker clientId=connect-1,
> groupId=streaming-connect] Successfully joined group with generation 18535
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
> > [2020-03-16 10:27:13,389] INFO [Worker clientId=connect-1,
> groupId=streaming-connect] Joined group at generation 18535 and got
> assignment: Assignment{error=0,
> leader='connect-1-3580fdcc-257e-40e7-8243-f7839021599f', leaderUrl='
> http://10.13.191.32:8083/', offset=9164, connectorIds=[], taskIds=[],
> revokedConnectorIds=[], revokedTaskIds=[], delay=0}
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
> > [2020-03-16 10:27:13,389] WARN [Worker clientId=connect-1,
> groupId=streaming-connect] Catching up to assignment's config offset.
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:909)
> > [2020-03-16 10:27:13,389] INFO [Worker clientId=connect-1,
> groupId=streaming-connect] Current config state offset 12 is behind group
> assignment 9164, reading to end of config log
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:970)
> >
> >
> >
> > Thanks,
> >
> > Mandeep Gandhi
> >
> >
> >
> >
>


Re: Mirrormaker 2.0 and compacted topics

2020-03-18 Thread Sönke Liebau
Hi Pirow,

as far as I understand, MirrorMaker 2.0 does not treat compacted topics any
differently from uncompacted topics.

What that means for your scenario is that your replication may miss some
messages after a long unavailability, if those messages were compacted in
the meantime. However, your replicated topic will always receive the latest
message per key, so it will be "up-to-date" once MM2 is running again.

Not sure if that answers your question?
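
For reference, replicating a compacted topic needs no special handling in the
MM2 config; a minimal connect-mirror-maker.properties sketch (the cluster
aliases, hosts, and topic name below are hypothetical):

```properties
# Minimal MirrorMaker 2.0 sketch -- aliases, hosts and topic are examples.
clusters = source, target
source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = target-kafka:9092

# Enable one-way replication; no compaction-specific settings are required.
source->target.enabled = true
source->target.topics = my-compacted-topic

# Topic configs such as cleanup.policy=compact are synced by default.
```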

Best regards,
Sönke




-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany


RE: Mirrormaker 2.0 and compacted topics

2020-03-18 Thread Pirow Engelbrecht
Hi Sönke,

OK, thanks, so compacted topics are supported, but an exact replica of the 
original topic (i.e. Kafka records at the same offsets) is not possible, as 
there is nothing to replicate. Is my understanding correct?

Thanks

Pirow Engelbrecht | Senior Design Engineer


Re: Mirrormaker 2.0 and compacted topics

2020-03-18 Thread Sönke Liebau
Hi Pirow,

records at the same offsets as in the original topic are not possible for
non-compacted topics either; that is one of the major constraints of
MirrorMaker.

Compaction doesn't change the offsets of messages, so in theory a compacted
topic should behave the same as a non-compacted one, unless of course you miss
messages as in your scenario.

Additionally, as compaction is triggered independently by each cluster, the
topics might contain different numbers of messages per key at any given
time. This should not be relevant for a compaction use case, but it is
good to keep in mind.
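
As a toy illustration (plain Python, not Kafka internals) of the point that
compaction drops superseded values per key while the surviving records keep
their original offsets:

```python
# Toy model of log compaction: only the latest record per key survives,
# but surviving records retain their original offsets.
def compact(log):
    """log: list of (offset, key, value) tuples; returns the compacted log."""
    latest = {}                  # key -> most recent (offset, key, value)
    for record in log:
        _, key, _ = record
        latest[key] = record     # later records overwrite earlier ones
    # Sort by offset so survivors appear in their original order.
    return sorted(latest.values())

log = [(0, "a", 1), (1, "b", 2), (2, "a", 3), (3, "c", 4)]
print(compact(log))  # [(1, 'b', 2), (2, 'a', 3), (3, 'c', 4)]
```

Note that offset 0 is gone (key "a" was overwritten at offset 2), but the
remaining records still sit at offsets 1, 2, and 3 -- nothing is renumbered.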

Best regards,
Sönke



RE: Mirrormaker 2.0 and compacted topics

2020-03-18 Thread Pirow Engelbrecht
Hi Sönke,

Thank you for the clarification – much appreciated.

Kind regards
Pirow Engelbrecht | Senior Design Engineer

Re: MirrorMaker2 not mirroring for 5 minutes when adding a topic

2020-03-18 Thread Ryanne Dolan
Peter, can you share any log lines like "x took y ms" or "x took too long
(y ms)" or "timed out running task x"?

Ryanne

On Tue, Mar 17, 2020 at 10:45 AM Péter Sinóros-Szabó
 wrote:

> Hey,
>
> Running a MM2 cluster to mirror from A->B clusters I noticed that when I
> add a new topic to A cluster, MM2 will notice it:
> [2020-03-17 13:14:05,477] INFO Found 2719 topic-partitions on main. 1 are
> new. 0 were removed. Previously had 2718.
> (org.apache.kafka.connect.mirror.MirrorSourceConnector)
>
> That's fine. It seems that the MM2 connectors simply restart to pick up
> the new configuration.
> My problem is that it takes about 5 minutes for MM2 to start mirroring
> messages again.
>
> What I see in the logs are:
> [2020-03-17 13:14:07,107] INFO Kafka startTimeMs: 1584450847106
> (org.apache.kafka.common.utils.AppInfoParser)
> [2020-03-17 13:14:07,204] INFO [Producer clientId=producer-11] Cluster ID:
> wif2mnkZTayzAEOb2VyoLA (org.apache.kafka.clients.Metadata)
>
> It seems that MM2 restarts the connectors in 2 seconds.
> But then I see the usual logs, but according to MM2's metrics, it is not
> mirroring any messages for about 5 minutes.
>
> [2020-03-17 13:14:10,485] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-3} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 13:14:10,485] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-3} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 13:14:10,485] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 13:14:10,485] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 13:14:10,637] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-3} Finished commitOffsets
> successfully in 152 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 13:14:10,637] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-3} Finished commitOffsets
> successfully in 152 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
>
> So after ~5 minutes, I see that it is subscribing to the topics... and I
> also see in the metrics that that is the time when it starts to mirror
> messages.
>
> [2020-03-17 13:19:39,850] INFO [Consumer clientId=consumer-39,
> groupId=null] Subscribed to partition(s):  ...
> [2020-03-17 13:19:39,851] INFO Starting with 308 previously uncommitted
> partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask)
> [2020-03-17 13:19:39,851] INFO [Consumer clientId=consumer-39,
> groupId=null] Seeking to offset 178508243 for partition
> twTasks.austrac-service.executeTask.default-0 (org.apache.kafka
>
>
> Is there any idea how to speed this up?
>
> Thanks,
> Peter
>


Kafka Streams - partition assignment for the input topic

2020-03-18 Thread Stephen Young
I have a question about partition assignment for a Kafka Streams app. As I
understand it, the more complex your topology is, the greater the number of
internal topics Kafka Streams will create. In my case the app has 8 graphs
in the topology. There are 6 partitions for each graph (this matches the
number of partitions of the input topic), so there are 48 partitions that
the app needs to handle. These get balanced equally across all 3 servers
where the app is running (each server also has 2 threads, so there are 6
available instances of the app).

The problem for me is that the partitions of the input topic have the
heaviest workload. But these 6 partitions are not distributed evenly
amongst the instances. They are just considered 6 partitions amongst the 48
the app needs to balance. But this means if a server gets most or all of
these 6 partitions, it ends up exhausting all of the resources on that
server.

Is there a way of equally balancing these 6 specific partitions amongst the
available instances? I thought writing a custom partition grouper might
help here:

https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#partition-grouper

But the advice seems to be to not do this otherwise you risk breaking the
app.

Thanks!


I'm trying to connect to a kafka broker running in AWS EKS (from outside the EKS cluster).

2020-03-18 Thread Dan Hill
Problem: I'm hitting an error: "no such host" for "
kafka-0.cluster.local:19092".

Has anyone done this before?  Any help would be appreciated.  Thanks! - Dan

My long-term goal is to get an AWS Lambda to send events to a Kafka running
in AWS EKS.

I used the following instructions

(linked to the "outside kubernetes" part) to set up Kafka using the helm
config.  The only modifications are for the "outside kubernetes" part.

I've tried a few variations.  None of them worked.  I still can't connect
to it:
- on a Lambda in the same subnet, and on an EC2 machine in the same subnet
- with a couple of different "outside kubernetes" options.

E.g. if I set up external access using a LoadBalancer, I'll get an external
hostname like (fake) afdsafsafsafdas-13412341.us-east-1.elb.amazon.com:19092.

If I try to run a basic command against this domain, it fails saying there
is no such host for kafka-0.cluster.local:19092.


Regarding segment size config

2020-03-18 Thread 张祥
Hi community,

I understand that there are two configs regarding segment file size:
log.segment.bytes for the broker and segment.bytes for a topic. The default
values are both 1G, and they are required to be an integer, so they cannot
be larger than 2G. My question is, assuming I am not making any mistakes:
what is the reason that the log segment size is limited to below 2G? Thanks.
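
For context, the two configs mentioned above can be set like this (a sketch;
the topic name and sizes are hypothetical examples, and the broker-level
setting requires a restart):

```shell
# Broker-wide default, set in server.properties (~1 GiB here):
#   log.segment.bytes=1073741824

# Per-topic override via the kafka-configs tool (topic name is an example):
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic \
  --alter --add-config segment.bytes=536870912
```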


RE: I'm trying to connect to a kafka broker running in AWS EKS (from outside the EKS cluster).

2020-03-18 Thread Pirow Engelbrecht
Hi Dan,

For Kafka:

  *   Make sure that your listeners and advertised.listeners configuration is
set up correctly for external connectivity, incl. your Kafka listener protocols

For Kubernetes:

  *   How are you exposing the Kafka port? Your port number seems low for
Kubernetes; external NodePorts typically fall in the 30000+ range
  *   There are several different ways of exposing a port externally, e.g.
through a load balancer
  *   Do you have a DNS entry for the Kafka service so it is resolvable externally?
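
The listener side of the first point can be sketched as follows (the hostnames
and ports below are hypothetical placeholders, not your actual values):

```properties
# server.properties sketch: separate internal and external listeners.
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL

# advertised.listeners must carry hostnames that clients can resolve from
# where they run; the ELB hostname below is a placeholder.
advertised.listeners=INTERNAL://kafka-0.kafka-headless.svc.cluster.local:9092,EXTERNAL://my-elb-123.us-east-1.elb.amazonaws.com:19092
```

If clients outside the cluster receive an internal name like
kafka-0.cluster.local in broker metadata, they will fail with "no such host",
which matches the error described above.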

Kind regards

Pirow Engelbrecht | Senior Design Engineer


Kafka JMX monitoring

2020-03-18 Thread 张祥
Hi,

I want to know what the best practice for collecting Kafka JMX metrics is. I
haven't found a decent way to collect and parse JMX in Java (because there is
too much of it), and I have learned that there are tools like jmxtrans to do
this. I wonder if there are more options. Thanks. Regards.
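
One common approach (a sketch, not the only option): expose JMX on each broker
and scrape it with an agent such as the Prometheus JMX exporter. The port
numbers and file paths below are hypothetical placeholders.

```shell
# Expose JMX remotely on the broker (port is an example).
export JMX_PORT=9999

# Attach the Prometheus JMX exporter as a Java agent when starting the
# broker; the jar path, HTTP port, and rules YAML are placeholders.
export KAFKA_OPTS="-javaagent:/opt/jmx_prometheus_javaagent.jar=7071:/opt/kafka-broker-rules.yml"
bin/kafka-server-start.sh config/server.properties
```

The exporter then serves the broker's JMX MBeans as plain HTTP text, which
sidesteps parsing JMX in Java yourself.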