Sharing of State Stores

2020-07-29 Thread Charles Devilliers
Hello,

I have some rudimentary questions on state stores. My service is planned to
have two transformers, each listening to a different topic. Both topics
have the same number of partitions and the upstream producers to those
topics are consistent with respect to key schema. My question centers
around the fact that both transformers need to consult and update the same
persistent state store in order to make decisions with respect to record
processing. I am not implementing a custom key partitioner, I'm using the
default. Also, there is no re-keying done by either transformer.
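
For reference, the topology is wired roughly like the sketch below (a trimmed
example; the topic names, the store name, and the single ExampleTransformer
standing in for both of my transformers are placeholders):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class SharedStoreTopology {

    // Placeholder transformer that consults and updates the shared store per record.
    static class ExampleTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore("shared-store");
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            Long seen = store.get(key);                   // consult shared state
            store.put(key, seen == null ? 1L : seen + 1); // update shared state
            return KeyValue.pair(key, value);
        }

        @Override
        public void close() { }
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // One persistent store, registered once...
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("shared-store"),
                Serdes.String(), Serdes.Long());
        builder.addStateStore(storeBuilder);

        // ...and attached by name to the transformers on both input topics.
        builder.stream("topic-a", Consumed.with(Serdes.String(), Serdes.String()))
               .transform(ExampleTransformer::new, "shared-store"); // downstream omitted
        builder.stream("topic-b", Consumed.with(Serdes.String(), Serdes.String()))
               .transform(ExampleTransformer::new, "shared-store"); // downstream omitted

        return builder.build();
    }
}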

Given the above scenario, I have the following questions:

1) Will a given key always hash to the same kstream consumer group member
for both transformers? You can imagine why this is important given that
they share a state store. My concern is that rebalancing may occur, and
somehow the key space for one of the transformers is moved to another pod,
but not both.

2) If transformer A processes a record R for a given key K, and the shared
state store is updated at key K as a result of that processing, does the
second transformer B have access to the updated state store value as soon
as transformer A is done processing the record? (Assume the record is
updated with a state store put()).

I have been told that in order to ensure that the partition assignments are
consistent across pods, for both input topics, I have to do some exotic
merging of the kstreams that process the input topics, which feels strange
and wrong.

Are there any other constraints or considerations that involve sharing a
state store across transformers that I should be thinking about in my
architecture for this service, but didn't mention?

Thanks for clarifying.


Re: Mirrormaker 2 logs - WARN Catching up to assignment's config offset

2020-07-29 Thread Iftach Ben-Yosef
Hey Ryanne,

Interesting points. I wasn't aware of the relationship between the number of
partitions and the number of Connect workers. I will test this out and update.

Thanks!

On Wed, Jul 29, 2020, 21:01 Ryanne Dolan  wrote:

> Iftach, you can try deleting Connect's internal config and status topics.
> The status topic records, among other things, the offsets within the config
> topics iirc, so if you delete the configs without deleting the status,
> you'll get messages such as those. Just don't delete the mm2-offsets
> topics, as doing so would result in MM2 starting from the beginning of all
> source partitions and re-replicating everything.
>
> You can also check that there are enough partitions in the config and
> status topics to account for all the Connect workers. It's possible that
> you're in a rebalance loop from too many consumers to those internal
> topics.
>
> Ryanne
>
> On Wed, Jul 29, 2020 at 1:03 AM Iftach Ben-Yosef 
> wrote:
>
> > Hello
> >
> > I'm running a mirrormaker 2 cluster which copies from 3 source clusters
> > into 1 destination. Yesterday I restarted the cluster and it took 1 of
> the
> > mirrored topics a pretty long time to recover (2~ hours)
> >
> > Since the restart the mm2 cluster has been sending a lot of these warning
> > messages from all 3 source clusters
> >
> >  WARN [Worker clientId=connect-4, groupId=local-ny-mm2] Catching up to
> > assignment's config offset.
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)
> >
> > Here is a snippet of how the logs look like
> >
> > TDOUT: [2020-07-29 05:44:17,143] INFO [Worker clientId=connect-2,
> > groupId=local-chi-mm2] Rebalance started
> > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
> > STDOUT: [2020-07-29 05:44:17,143] INFO [Worker clientId=connect-2,
> > groupId=local-chi-mm2] (Re-)joining group
> > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:552)
> > STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
> > groupId=local-chi-mm2] Successfully joined group with generation 9005
> > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:503)
> > STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
> > groupId=local-chi-mm2] Joined group at generation 9005 with protocol
> > version 2 and got assignment: Assignment{error=0,
> > leader='connect-1-cb7aa52c-a29a-4cf9-8f50-b691ba38aa3d',
> > leaderUrl='NOTUSED/local-chi', offset=1178, connectorIds=[], taskIds=[],
> > revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance
> delay: 0
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
> > STDOUT: [2020-07-29 05:44:17,144] WARN [Worker clientId=connect-2,
> > groupId=local-chi-mm2] Catching up to assignment's config offset.
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)
> > STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
> > groupId=local-chi-mm2] Current config state offset -1 is behind group
> > assignment 1178, reading to end of config log
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1081)
> > STDOUT: [2020-07-29 05:44:17,183] INFO [Worker clientId=connect-6,
> > groupId=local-chi-mm2] Successfully joined group with generation 9317
> > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:503)
> > STDOUT: [2020-07-29 05:44:17,184] INFO [Worker clientId=connect-6,
> > groupId=local-chi-mm2] Joined group at generation 9317 with protocol
> > version 2 and got assignment: Assignment{error=0,
> > leader='connect-5-9a82cf55-e113-4112-86bf-d3cabcd44f54',
> > leaderUrl='NOTUSED/local-chi', offset=1401, connectorIds=[], taskIds=[],
> > revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance
> delay: 0
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
> > STDOUT: [2020-07-29 05:44:17,184] WARN [Worker clientId=connect-6,
> > groupId=local-chi-mm2] Catching up to assignment's config offset.
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)
> > STDOUT: [2020-07-29 05:44:17,184] INFO [Worker clientId=connect-6,
> > groupId=local-chi-mm2] Current config state offset -1 is behind group
> > assignment 1401, reading to end of config log
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1081)
> > STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
> > groupId=local-sa-mm2] Finished reading to end of log and updated config
> > snapshot, new config log offset: -1
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1085)
> > STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
> > groupId=local-sa-mm2] Current config state offset -1 does not match group
> > assignment 1387. Forcing rebalance.
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1053)
> > STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
> > groupId=local-sa-mm2] Rebalance started
> > 

Re: Mirrormaker 2 logs - WARN Catching up to assignment's config offset

2020-07-29 Thread Ryanne Dolan
Iftach, you can try deleting Connect's internal config and status topics.
The status topic records, among other things, the offsets within the config
topics iirc, so if you delete the configs without deleting the status,
you'll get messages such as those. Just don't delete the mm2-offsets
topics, as doing so would result in MM2 starting from the beginning of all
source partitions and re-replicating everything.

You can also check that there are enough partitions in the config and
status topics to account for all the Connect workers. It's possible that
you're in a rebalance loop from too many consumers to those internal topics.
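
For example, something like this AdminClient sketch will print the partition
counts (the internal topic names here are only illustrative; use whatever
config/status topic names your MM2 setup actually creates):

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class CheckConnectInternalTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Topic names below are placeholders for your Connect/MM2 internal topics.
            Map<String, TopicDescription> topics = admin
                .describeTopics(Arrays.asList("mm2-configs.source.internal",
                                              "mm2-status.source.internal"))
                .all()
                .get();
            topics.forEach((name, description) ->
                System.out.println(name + ": " + description.partitions().size() + " partitions"));
        }
    }
}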

Ryanne

On Wed, Jul 29, 2020 at 1:03 AM Iftach Ben-Yosef 
wrote:

> Hello
>
> I'm running a mirrormaker 2 cluster which copies from 3 source clusters
> into 1 destination. Yesterday I restarted the cluster and it took 1 of the
> mirrored topics a pretty long time to recover (2~ hours)
>
> Since the restart the mm2 cluster has been sending a lot of these warning
> messages from all 3 source clusters
>
>  WARN [Worker clientId=connect-4, groupId=local-ny-mm2] Catching up to
> assignment's config offset.
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)
>
> Here is a snippet of how the logs look like
>
> TDOUT: [2020-07-29 05:44:17,143] INFO [Worker clientId=connect-2,
> groupId=local-chi-mm2] Rebalance started
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
> STDOUT: [2020-07-29 05:44:17,143] INFO [Worker clientId=connect-2,
> groupId=local-chi-mm2] (Re-)joining group
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:552)
> STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
> groupId=local-chi-mm2] Successfully joined group with generation 9005
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:503)
> STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
> groupId=local-chi-mm2] Joined group at generation 9005 with protocol
> version 2 and got assignment: Assignment{error=0,
> leader='connect-1-cb7aa52c-a29a-4cf9-8f50-b691ba38aa3d',
> leaderUrl='NOTUSED/local-chi', offset=1178, connectorIds=[], taskIds=[],
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
> STDOUT: [2020-07-29 05:44:17,144] WARN [Worker clientId=connect-2,
> groupId=local-chi-mm2] Catching up to assignment's config offset.
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)
> STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
> groupId=local-chi-mm2] Current config state offset -1 is behind group
> assignment 1178, reading to end of config log
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1081)
> STDOUT: [2020-07-29 05:44:17,183] INFO [Worker clientId=connect-6,
> groupId=local-chi-mm2] Successfully joined group with generation 9317
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:503)
> STDOUT: [2020-07-29 05:44:17,184] INFO [Worker clientId=connect-6,
> groupId=local-chi-mm2] Joined group at generation 9317 with protocol
> version 2 and got assignment: Assignment{error=0,
> leader='connect-5-9a82cf55-e113-4112-86bf-d3cabcd44f54',
> leaderUrl='NOTUSED/local-chi', offset=1401, connectorIds=[], taskIds=[],
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
> STDOUT: [2020-07-29 05:44:17,184] WARN [Worker clientId=connect-6,
> groupId=local-chi-mm2] Catching up to assignment's config offset.
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)
> STDOUT: [2020-07-29 05:44:17,184] INFO [Worker clientId=connect-6,
> groupId=local-chi-mm2] Current config state offset -1 is behind group
> assignment 1401, reading to end of config log
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1081)
> STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
> groupId=local-sa-mm2] Finished reading to end of log and updated config
> snapshot, new config log offset: -1
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1085)
> STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
> groupId=local-sa-mm2] Current config state offset -1 does not match group
> assignment 1387. Forcing rebalance.
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1053)
> STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
> groupId=local-sa-mm2] Rebalance started
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
> STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
> groupId=local-sa-mm2] (Re-)joining group
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:552)
> STDOUT: [2020-07-29 05:44:17,276] INFO [Worker clientId=connect-8,
> groupId=local-sa-mm2] Successfully joined group with generation 9077
> 

Re: Partition assignment not well distributed over threads

2020-07-29 Thread Sophie Blee-Goldman
Hey Giselle,

How many stream threads is each instance configured with? If the total number
of threads across all instances exceeds the total number of tasks, then some
threads won't get any assigned tasks. There's a known bug where tasks might not
get evenly distributed over all instances in this scenario, as Streams would
only attempt to balance the tasks over the threads. See KAFKA-9173. Luckily,
this should be fixed in 2.6, which is just about to be released.

Instances that joined later, or restarted, would be more likely to have these
threads with no assigned tasks due to the stickiness optimization, as you
guessed.

If the problem you've run into is due to running more stream threads than
tasks, I would recommend just decreasing the number of threads per instance to
get a balanced assignment. This won't hurt performance in any way since those
extra threads would have just been sitting idle anyways. Or better yet,
upgrade to 2.6.
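
For reference, the thread count is just the num.stream.threads config (a
minimal sketch; the application id and bootstrap servers are placeholders, and
the comment assumes your 20 input partitions translate into 20 tasks):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProperties {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // 5 instances x 4 threads = 20 threads, one per task, so no thread sits idle.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        return props;
    }
}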

Regarding the colocation question: no, the assignment doesn't take that into
account at the moment. Typically Streams applications won't be running on the
same machine as the broker. Clearly it has been difficult enough to optimize
for two things at the same time, stickiness and balance, without introducing a
third :)

On Wed, Jul 29, 2020 at 4:58 AM Giselle Van Dongen <
giselle.vandon...@klarrio.com> wrote:

> We have a Kafka Streams (2.4) app consisting of 5 instances. It reads from
> a Kafka topic with 20 partitions (5 brokers).
>
> We notice that the partition assignment does not always lead to well
> distributed load over the different threads. We notice this at startup as
> well as after a recovery of a failed thread.
>
> 1. At startup, some instances get a significantly lower load and sometimes
> even no load. It seems like instances that come up slightly later get no
> partitions assigned (because of sticky assignment?).
>
> 2. When one thread (container) dies and comes back it often does not
> receive any or very few partitions to work on. We assume this has to do
> with the sticky assignment. Is there any way we can make this distribution
> more equal?
>
> I was also wondering whether Kafka Streams takes into account colocation
> of Kafka brokers with stream processing threads when assigning partitions.
> Do partitions on brokers get assigned to the streams thread that is
> colocated with it on the same machine?
>


Consumer cannot move past missing offset

2020-07-29 Thread Thomas Becker
We are having issues with some of our older consumers getting stuck reading a 
topic. The issue seems to occur at specific offsets. Here's an excerpt from 
kafka-dump-log on the topic partition around the offset in question:

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []

This is the last batch in the segment, and the "bad offset" is 13920987. Note 
that it is listed as the lastOffset contained in the batch, though it doesn't 
actually exist (this is a compacted topic). When I seek to this offset and 
begin consumption, different things happen depending on which consumer version 
is being used:

0.10.0.0 - throws RecordTooLargeException. To me, this looks like a red 
herring. The old consumer assumes the record must be too large because it got 
some data back but was not able to parse a single record from it.

0.10.1.0 - gets stuck fetching the same empty batch over and over again.

2.4.1 - works properly, fetching offset 13920991 which is the next valid offset 
and continues.
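
For reference, the consumption test is essentially the following (a minimal
sketch against the 2.4.1 client; bootstrap servers, group id, topic name,
partition, and deserializers are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekPastMissingOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-test");             // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("the-topic", 0); // placeholder topic/partition

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(tp));
            consumer.seek(tp, 13920987L); // the "missing" offset from the dump above
            // The 2.4.1 client skips ahead to the next valid offset (13920991) and
            // returns records; the 0.10.x clients described above get stuck here
            // (they use the equivalent poll(long) API).
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.println(r.offset() + " -> " + r.key()));
        }
    }
}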

We are running Kafka version 2.4.1 on the broker side. I did some searching in 
JIRA but was unable to find anything to explain this.



--


Tommy Becker
Principal Engineer
Pronouns: he/him/his
O: 919.460.4747
E: thomas.bec...@xperi.com
www.xperi.com





Partition assignment not well distributed over threads

2020-07-29 Thread Giselle Van Dongen
We have a Kafka Streams (2.4) app consisting of 5 instances. It reads from a 
Kafka topic with 20 partitions (5 brokers). 

We notice that the partition assignment does not always lead to well 
distributed load over the different threads. We notice this at startup as well 
as after a recovery of a failed thread.

1. At startup, some instances get a significantly lower load and sometimes even 
no load. It seems like instances that come up slightly later get no partitions 
assigned (because of sticky assignment?). 

2. When one thread (container) dies and comes back it often does not receive 
any or very few partitions to work on. We assume this has to do with the sticky 
assignment. Is there any way we can make this distribution more equal?

I was also wondering whether Kafka Streams takes into account colocation of 
Kafka brokers with stream processing threads when assigning partitions. Do 
partitions on brokers get assigned to the streams thread that is colocated with 
it on the same machine? 


Re: Re: Kafka compatibility between 2.2.x to 0.10.0.0

2020-07-29 Thread dixingxin...@163.com
Hi Vitalii Stoianov, thanks for the reply.
We are going to upgrade both clients and brokers from 0.10.0.0 to 2.2.x. We have 
multiple clusters now, so we will probably upgrade the clients first and then 
upgrade the Kafka clusters one by one.
I know this upgrade can't be rolled back, so if someone has done this before, 
please tell me what problems we might face.




Best,
Xingxing Di
 
From: Vitalii Stoianov
Date: 2020-07-25 16:12
To: users
Subject: Re: Kafka compatibility between 2.2.x to 0.10.0.0
Hi  Xingxing Di,
 
Are you going to upgrade clients from 0.10.0.0 to 2.2.x or clients +
brokers ?
1. I think so. For more information on what incompatibilities clients may
notice when working with an old broker, please check this page:
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
2. The message format changes. Check the upgrade guide; there are some steps
after which you won't be able to roll back the server if something goes wrong:
https://kafka.apache.org/22/documentation.html#upgrade
 
Also check the release notes for each release between 0.10.0.0 and 2.2.x. There
can be some crucial information that will impact your decision to upgrade,
for example for 2.2.1:
https://kafka.apache.org/22/documentation.html#upgrade_221_notable
 
A few important changes to notice:
In Kafka 0.10.1 the log roll mechanics were changed; it now uses the Kafka
message createTime. For more info check
https://kafka.apache.org/22/documentation.html#upgrade_10_1_breaking; if
the createTime is incorrect on messages it may lead to some issues on the
broker.
Message format changes in 0.11.0:
https://kafka.apache.org/22/documentation.html#upgrade_1100_notable; if the
broker message format is changed to the new one, I think there is no way to
roll back.
You can find more at https://kafka.apache.org/22/documentation.html#upgrade
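
For reference, the rolling-upgrade procedure in that guide boils down to
pinning two broker settings before swapping the binaries (a rough sketch of
the server.properties changes, assuming a 0.10.0 starting point; follow the
official steps for the exact sequence):

# Step 1: on every broker, before upgrading the binaries
inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

# Step 2: once all brokers run 2.2.x, bump the protocol version and do one
# more rolling restart
inter.broker.protocol.version=2.2

# Step 3: only when you are sure you will not downgrade, bump the message
# format (this is the irreversible step mentioned above)
log.message.format.version=2.2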
 
Regards,
Vitalii.
 
On Fri, Jul 24, 2020 at 2:03 PM dixingxin...@163.com 
wrote:
 
> Hi kafka comunity:
> I have two questions:
> 1. Is it ok to use kafka-clients 2.2.1(java/scala)  to access kafka
> cluster 0.10.0.0 ?
> 2. Is there any risk to rolling upgrade kafka from 0.10.0.0 to 2.2.x? we
> will probably do the upgrade in a few month.
>
>
>
> Best,
> Xingxing Di
>


Mirrormaker 2 logs - WARN Catching up to assignment's config offset

2020-07-29 Thread Iftach Ben-Yosef
Hello

I'm running a mirrormaker 2 cluster which copies from 3 source clusters
into 1 destination. Yesterday I restarted the cluster and it took 1 of the
mirrored topics a pretty long time to recover (~2 hours).

Since the restart the mm2 cluster has been sending a lot of these warning
messages from all 3 source clusters:

 WARN [Worker clientId=connect-4, groupId=local-ny-mm2] Catching up to
assignment's config offset.
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)

Here is a snippet of what the logs look like:

STDOUT: [2020-07-29 05:44:17,143] INFO [Worker clientId=connect-2,
groupId=local-chi-mm2] Rebalance started
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
STDOUT: [2020-07-29 05:44:17,143] INFO [Worker clientId=connect-2,
groupId=local-chi-mm2] (Re-)joining group
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:552)
STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
groupId=local-chi-mm2] Successfully joined group with generation 9005
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:503)
STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
groupId=local-chi-mm2] Joined group at generation 9005 with protocol
version 2 and got assignment: Assignment{error=0,
leader='connect-1-cb7aa52c-a29a-4cf9-8f50-b691ba38aa3d',
leaderUrl='NOTUSED/local-chi', offset=1178, connectorIds=[], taskIds=[],
revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
STDOUT: [2020-07-29 05:44:17,144] WARN [Worker clientId=connect-2,
groupId=local-chi-mm2] Catching up to assignment's config offset.
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)
STDOUT: [2020-07-29 05:44:17,144] INFO [Worker clientId=connect-2,
groupId=local-chi-mm2] Current config state offset -1 is behind group
assignment 1178, reading to end of config log
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1081)
STDOUT: [2020-07-29 05:44:17,183] INFO [Worker clientId=connect-6,
groupId=local-chi-mm2] Successfully joined group with generation 9317
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:503)
STDOUT: [2020-07-29 05:44:17,184] INFO [Worker clientId=connect-6,
groupId=local-chi-mm2] Joined group at generation 9317 with protocol
version 2 and got assignment: Assignment{error=0,
leader='connect-5-9a82cf55-e113-4112-86bf-d3cabcd44f54',
leaderUrl='NOTUSED/local-chi', offset=1401, connectorIds=[], taskIds=[],
revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
STDOUT: [2020-07-29 05:44:17,184] WARN [Worker clientId=connect-6,
groupId=local-chi-mm2] Catching up to assignment's config offset.
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1020)
STDOUT: [2020-07-29 05:44:17,184] INFO [Worker clientId=connect-6,
groupId=local-chi-mm2] Current config state offset -1 is behind group
assignment 1401, reading to end of config log
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1081)
STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
groupId=local-sa-mm2] Finished reading to end of log and updated config
snapshot, new config log offset: -1
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1085)
STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
groupId=local-sa-mm2] Current config state offset -1 does not match group
assignment 1387. Forcing rebalance.
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1053)
STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
groupId=local-sa-mm2] Rebalance started
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
STDOUT: [2020-07-29 05:44:17,239] INFO [Worker clientId=connect-8,
groupId=local-sa-mm2] (Re-)joining group
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:552)
STDOUT: [2020-07-29 05:44:17,276] INFO [Worker clientId=connect-8,
groupId=local-sa-mm2] Successfully joined group with generation 9077
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:503)

Is anyone familiar with this and can help? So far I didn't find anything
useful online specifically for mm2. I tried to delete the config topics for
each source cluster from the destination cluster and then restarted the
mm2 service. This did seem to help somewhat, but I am still getting those
warnings and more log lines than expected per minute.


Thanks,
Iftach
