Suggestions on canarying traffic for Kafka consumers
Hi Folks,

Canarying traffic is an excellent way to reduce the impact of shipping a release with a bug. Canarying is relatively easy with queueing backends like SQS and Redis: with SQS, for example, each canary container/instance can self-regulate its throughput by comparing it against the throughput of the rest of the fleet using some quota logic. With Kafka consumers, however, partitions are assigned to application containers/instances, so I'm finding it hard to decide how to split traffic between the canary and production deployments.

These are the approaches I have in mind so far.

### Kafka Consumer as Separate Deployment that makes RPC calls to application containers

Run the Kafka consumer as a separate deployment that makes RPC calls to the application containers via a load balancer or Envoy. The load balancer/Envoy handles splitting the traffic between canary and production containers.

### Kafka Stateful Proxy

Run a Kafka proxy wrapper on top of the brokers that maintains Kafka consumer groups for each topic, with a partition assignment strategy that mirrors the brokers' own partition assignment. The mirroring ensures that load is split evenly; if it is not, the problem is not with the proxy but at the broker level, where partitions themselves are unbalanced across brokers. Application containers poll this proxy for new items and report back once processing has finished - essentially we abstract the polling loop into a listener that polls for items and reports their status. It can be implemented as push- or pull-based, with the usual pros and cons of each.

### Use of other queueing backends like SQS

A separate Kafka consumer group deployment exposes Kafka topics as SQS queues, one per application use case. Although this stacks multiple components and looks unintuitive, it seems to be the simplest solution of all, and it is flexible enough to build other functionality on top.

### Topic Splitting

Each Kafka consumer use case sets up an infrastructure component that creates its own canary/prod topics, derived from the main Kafka topic according to the canary traffic percentage. This felt like a complex solution, so I ruled it out, but I'd be interested if anyone has positive thoughts on this approach.

### 2 Kafka Consumer Groups

The canary and production deployments each use their own Kafka consumer group, and both use a hash function to decide which deployment processes which item. To drive x percent of the traffic through the canary, something like `digest(partition, offset) % 100 < x` can be used. This approach wastes some resources, since both groups read every record, but it is still a very decent way to split traffic between canary and production.

---

I wanted to get some input on the above approaches, and most importantly to see how others are solving this problem of canarying Kafka event streams - and maybe to reinforce some of the approaches above.

Thanks
Srinivas
SRE @ zomato
@eightnoteight
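A minimal sketch of the "2 Kafka Consumer Groups" hash split described above, assuming SHA-256 as the digest and 100 buckets (the function names and digest choice here are illustrative, not part of the proposal):

```python
import hashlib

def is_canary(partition: int, offset: int, canary_pct: int) -> bool:
    """Deterministically decide whether a (partition, offset) record
    belongs to the canary deployment, routing roughly canary_pct percent
    of records its way. Both deployments evaluate the same function on
    every record, so each record is processed by exactly one of them."""
    key = f"{partition}:{offset}".encode()
    # Take 8 bytes of the digest as an integer and bucket it 0..99.
    bucket = int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % 100
    return bucket < canary_pct

def is_production(partition: int, offset: int, canary_pct: int) -> bool:
    """The production deployment simply negates the canary check."""
    return not is_canary(partition, offset, canary_pct)
```

Because the digest is stable, the split survives rebalances and restarts; the resource wastage mentioned above comes from both consumer groups still fetching and hashing every record.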
Re: MirrorMaker 2 Reload Configuration
Hi All,

After inspecting a few internal topics and running a console consumer to see the payloads in the mm2-configs topic, I confirmed that the properties are indeed not getting refreshed. I assume MM2 internally joins the existing cluster, so to refresh the config I completely stopped the MM2 cluster, i.e. reduced the MM2 deployment capacity to 0, waited a couple of minutes, and then scaled the capacity back up to the previous number. With this approach MM2 started loading the configuration from mm2.properties again.

Thanks

On Fri, Nov 13, 2020 at 4:02 PM Péter Sinóros-Szabó wrote:
>
> Hi Ryanne,
>
> I will open an issue in Jira.
> I see mm2-config and mm2-status topics on both the source and destination
> clusters.
> Should I purge all of them? Or is it enough to purge just the destination
> topics?
>
> Thanks,
> Peter
>
> On Wed, 11 Nov 2020 at 19:33, Ryanne Dolan wrote:
> >
> > Hey guys, this is because the configuration gets loaded into the internal
> > mm2-config topics, and these may get out of sync with the mm2.properties
> > file in some scenarios. I believe this occurs whenever an old/bad
> > configuration gets written to Kafka, which MM2 can read successfully but
> > which causes MM2 to get stuck before it can write any updates back to the
> > mm2-config topics. Just modifying the mm2.properties file does not resolve
> > the issue, since Workers read from the mm2-config topics, not the
> > mm2.properties file directly.
> >
> > The fix is to truncate or delete the mm2-config and mm2-status topics. N.B.
> > do _not_ delete the mm2-offsets topics, as this would cause MM2 to
> > restart replication from offset 0.
> >
> > I'm not sure why deleting these topics works, but it seems to cause Connect
> > to wait for the new configuration to be loaded from mm2.properties, rather
> > than reading the old configuration from mm2-config and getting stuck.
> >
> > Can someone report the issue in jira?
> >
> > Ryanne
> >
> > On Wed, Nov 11, 2020 at 9:35 AM Péter Sinóros-Szabó wrote:
> > >
> > > Hi,
> > >
> > > I have a similar issue. I changed the source cluster bootstrap address and
> > > MM2 picked it up only partially. Some parts of it still use the old
> > > address, some the new. The old and the new address list is routed to the
> > > same cluster, same brokers, just on a different network path.
> > >
> > > So is there any way to force the configuration update?
> > >
> > > Cheers,
> > > Peter
> > >
> > > On Wed, 4 Nov 2020 at 18:39, Ning Zhang wrote:
> > > >
> > > > if your new topics are not named "topic1" or "topic2", maybe you want to
> > > > use regex * to allow more topics to be considered by MM2
> > > >
> > > > # regex which defines which topics gets replicated. For eg "foo-.*"
> > > > src-cluster->dst-cluster.topics = topic1,topic2
> > > >
> > > > On 2020/10/30 01:48:00, "Devaki, Srinivas" wrote:
> > > > > Hi Folks,
> > > > >
> > > > > I'm running mirror maker as a dedicated cluster as given in the
> > > > > mirrormaker 2 doc. but for some reason when I add new topics and
> > > > > deploy the mirror maker it's not detecting the new topics at all, even
> > > > > the config dumps in the mirror maker startup logs don't show the newly
> > > > > added topics.
> > > > >
> > > > > I've attached the config that I'm using, initially I assumed that
> > > > > there might be some refresh configuration option either in connect or
> > > > > mirror maker, but the connect rest api doesn't seem to be working in
> > > > > this mode and also couldn't find any refresh configuration option.
> > > > >
> > > > > Any ideas on this?
> > > > > Thank you in advance
> > > > >
> > > > > ```
> > > > > clusters = src-cluster, dst-cluster
> > > > >
> > > > > # disable topic prefixes
> > > > > src-cluster.replication.policy.separator =
> > > > > dst-cluster.replication.policy.separator =
> > > > > replication.policy.separator =
> > > > > source.cluster.alias =
> > > > > target.cluster.alias =
> > > > >
> > > > > # enable idempotence
> > > > > source.cluster.producer.enable.idempotence =
Re: MirrorMaker 2 Reload Configuration
Hi Folks,

I also ran a console consumer on the `mm2-configs` Kafka topic created by MirrorMaker, and found that even after restarting MirrorMaker 2 with the new config, the config registered in the mm2-configs topic still points to the legacy MirrorMaker configuration.

Thanks

On Fri, Oct 30, 2020 at 7:18 AM Devaki, Srinivas wrote:
>
> Hi Folks,
>
> I'm running mirror maker as a dedicated cluster as given in the
> mirrormaker 2 doc. but for some reason when I add new topics and
> deploy the mirror maker it's not detecting the new topics at all, even
> the config dumps in the mirror maker startup logs don't show the newly
> added topics.
>
> I've attached the config that I'm using, initially I assumed that
> there might be some refresh configuration option either in connect or
> mirror maker, but the connect rest api doesn't seem to be working in
> this mode and also couldn't find any refresh configuration option.
>
> Any ideas on this? Thank you in advance
>
> ```
> clusters = src-cluster, dst-cluster
>
> # disable topic prefixes
> src-cluster.replication.policy.separator =
> dst-cluster.replication.policy.separator =
> replication.policy.separator =
> source.cluster.alias =
> target.cluster.alias =
>
> # enable idempotence
> source.cluster.producer.enable.idempotence = true
> target.cluster.producer.enable.idempotence = true
>
> # connection information for each cluster
> # This is a comma separated host:port pairs for each cluster
> # for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
> src-cluster.bootstrap.servers = sng-kfnode1.internal:9092,sng-kfnode1.internal:9092,sng-kfnode1.internal:9092
> dst-cluster.bootstrap.servers = prod-online-v2-kafka-1.internal:9092,prod-online-v2-kafka-2.internal:9092,prod-online-v2-kafka-3.internal:9092,prod-online-v2-kafka-4.internal:9092,prod-online-v2-kafka-5.internal:9092
>
> # regex which defines which topics gets replicated. For eg "foo-.*"
> src-cluster->dst-cluster.topics = topic1,topic2
>
> # client-id
> src-cluster.client.id = prod-mm2-onlinev1-to-onlinev2-consumer-v0
> dst-cluster.client.id = prod-mm2-onlinev1-to-onlinev2-producer-v0
>
> # group.instance.id=_mirror_make_instance_1
> # consumer should periodically emit heartbeats
> src-cluster->dst-cluster.consumer.auto.offset.reset = earliest
> src-cluster->dst-cluster.consumer.overrides.auto.offset.reset = earliest
>
> # connector should periodically emit heartbeats
> src-cluster->dst-cluster.emit.heartbeats.enabled = true
>
> # frequency of heartbeats, default is 5 seconds
> src-cluster->dst-cluster.emit.heartbeats.interval.seconds = 10
>
> # connector should periodically emit consumer offset information
> src-cluster->dst-cluster.emit.checkpoints.enabled = true
>
> # frequency of checkpoints, default is 5 seconds
> src-cluster->dst-cluster.emit.checkpoints.interval.seconds = 10
>
> # whether to monitor source cluster ACLs for changes
> src-cluster->dst-cluster.sync.topic.acls.enabled = false
>
> # whether or not to monitor source cluster for configuration changes
> src-cluster->dst-cluster.sync.topic.configs.enabled = true
>
> # add retention.ms to the default list given in the DefaultConfigPropertyFilter
> # https://github.com/apache/kafka/blob/889fd31b207b86db6d059792131d14389639d9e4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java#L33-L38
> src-cluster->dst-cluster.config.properties.blacklist = follower\\.replication\\.throttled\\.replicas, \
>     leader\\.replication\\.throttled\\.replicas, \
>     message\\.timestamp\\.difference\\.max\\.ms, \
>     message\\.timestamp\\.type, \
>     unclean\\.leader\\.election\\.enable, \
>     min\\.insync\\.replicas, \
>     retention\\.ms
>
> # connector should periodically check for new topics
> src-cluster->dst-cluster.refresh.topics.enabled = true
>
> # frequency to check source cluster for new topics, default is 5 seconds
> src-cluster->dst-cluster.refresh.topics.interval.seconds = 300
>
> # enable and configure individual replication flows
> src-cluster->dst-cluster.enabled = true
> dst-cluster->src-cluster.enabled = false
>
> # Setting replication factor of newly created remote topics
> # replication.factor=2
>
> # Internal Topic Settings
> #
> # The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
> # "mm2-offset-syncs.B.internal"
MirrorMaker 2 Reload Configuration
Hi Folks,

I'm running MirrorMaker as a dedicated cluster, as described in the MirrorMaker 2 doc, but for some reason when I add new topics and redeploy MirrorMaker, it doesn't detect the new topics at all - even the config dumps in the MirrorMaker startup logs don't show the newly added topics.

I've attached the config that I'm using. Initially I assumed there might be some refresh configuration option in either Connect or MirrorMaker, but the Connect REST API doesn't seem to work in this mode, and I couldn't find any refresh configuration option either.

Any ideas on this? Thank you in advance

```
clusters = src-cluster, dst-cluster

# disable topic prefixes
src-cluster.replication.policy.separator =
dst-cluster.replication.policy.separator =
replication.policy.separator =
source.cluster.alias =
target.cluster.alias =

# enable idempotence
source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
src-cluster.bootstrap.servers = sng-kfnode1.internal:9092,sng-kfnode1.internal:9092,sng-kfnode1.internal:9092
dst-cluster.bootstrap.servers = prod-online-v2-kafka-1.internal:9092,prod-online-v2-kafka-2.internal:9092,prod-online-v2-kafka-3.internal:9092,prod-online-v2-kafka-4.internal:9092,prod-online-v2-kafka-5.internal:9092

# regex which defines which topics gets replicated. For eg "foo-.*"
src-cluster->dst-cluster.topics = topic1,topic2

# client-id
src-cluster.client.id = prod-mm2-onlinev1-to-onlinev2-consumer-v0
dst-cluster.client.id = prod-mm2-onlinev1-to-onlinev2-producer-v0

# group.instance.id=_mirror_make_instance_1
# consumer should periodically emit heartbeats
src-cluster->dst-cluster.consumer.auto.offset.reset = earliest
src-cluster->dst-cluster.consumer.overrides.auto.offset.reset = earliest

# connector should periodically emit heartbeats
src-cluster->dst-cluster.emit.heartbeats.enabled = true

# frequency of heartbeats, default is 5 seconds
src-cluster->dst-cluster.emit.heartbeats.interval.seconds = 10

# connector should periodically emit consumer offset information
src-cluster->dst-cluster.emit.checkpoints.enabled = true

# frequency of checkpoints, default is 5 seconds
src-cluster->dst-cluster.emit.checkpoints.interval.seconds = 10

# whether to monitor source cluster ACLs for changes
src-cluster->dst-cluster.sync.topic.acls.enabled = false

# whether or not to monitor source cluster for configuration changes
src-cluster->dst-cluster.sync.topic.configs.enabled = true

# add retention.ms to the default list given in the DefaultConfigPropertyFilter
# https://github.com/apache/kafka/blob/889fd31b207b86db6d059792131d14389639d9e4/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java#L33-L38
src-cluster->dst-cluster.config.properties.blacklist = follower\\.replication\\.throttled\\.replicas, \
    leader\\.replication\\.throttled\\.replicas, \
    message\\.timestamp\\.difference\\.max\\.ms, \
    message\\.timestamp\\.type, \
    unclean\\.leader\\.election\\.enable, \
    min\\.insync\\.replicas, \
    retention\\.ms

# connector should periodically check for new topics
src-cluster->dst-cluster.refresh.topics.enabled = true

# frequency to check source cluster for new topics, default is 5 seconds
src-cluster->dst-cluster.refresh.topics.interval.seconds = 300

# enable and configure individual replication flows
src-cluster->dst-cluster.enabled = true
dst-cluster->src-cluster.enabled = false

# Setting replication factor of newly created remote topics
# replication.factor=2

# Internal Topic Settings
#
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor=3
# 14 days
checkpoints.topic.retention.ms=1209600000
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
```

Thanks
Re: Consumer Group rebalancing events
Hi Tess D'erberwill, Таисия Дорошенков,

Thanks a lot for the reply. One major issue I found with our existing metrics is that they are not granular enough: if a rebalance finishes in under 10 seconds, I might lose that event entirely when my metrics scrape interval is 10 seconds. Any suggestions on how to log the exact events rather than relying on metrics? The major concern is losing events.

Thanks

On Tue, Aug 25, 2020 at 7:59 PM Tess D'erberwill wrote:
> Hi, Srinivas Devaki!
>
> There are Kafka JMX metrics - NumGroupsCompletingRebalance and
> NumGroupsPreparingRebalance. I think you can monitor them with some
> application and calculate statistics.
> As for us - we added the Jolokia agent to our Kafka docker image, and Jolokia
> exposes all JMX metrics via HTTP. We run an additional service which gets the
> necessary metrics from this endpoint and analyses them.
>
> On 2020/08/23 16:39:35, "Devaki, Srinivas" wrote:
> > Hi Folks,
> >
> > I'm trying to find the consumer group rebalancing events, so to plot how
> > much time consumer groups usually take in rebalancing during application
> > deployments.
> >
> > I've tried to check the logs of `server.log`, `controller.log`,
> > `state-change.log`, but couldn't find anything about rebalancing.
> >
> > Thanks & Regards
> > Srinivas Devaki
> > sre @ zomato
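One way to capture exact events rather than sampled metrics is to log timestamps from the consumer's own rebalance callbacks, client-side. This stdlib-only sketch mirrors the shape of the Java client's ConsumerRebalanceListener callbacks; the class name and log format are illustrative, not any client's actual API:

```python
import logging
import time

logging.basicConfig(level=logging.INFO)

class RebalanceLogger:
    """Records wall-clock timestamps around rebalance phases so every
    rebalance is logged with its exact duration, instead of hoping a
    metrics scraper samples at the right moment. Wire these two methods
    into your client's rebalance listener hook (e.g. a
    ConsumerRebalanceListener in the Java client)."""

    def __init__(self, group_id: str):
        self.group_id = group_id
        self._revoked_at = None
        self.durations = []  # seconds taken by each completed rebalance

    def on_partitions_revoked(self, partitions):
        # Revocation marks the start of a rebalance for this member.
        self._revoked_at = time.monotonic()
        logging.info("group=%s rebalance started, revoked=%s",
                     self.group_id, sorted(partitions))

    def on_partitions_assigned(self, partitions):
        # Assignment marks the end; log and record the elapsed time.
        if self._revoked_at is not None:
            took = time.monotonic() - self._revoked_at
            self.durations.append(took)
            logging.info("group=%s rebalance finished in %.3fs, assigned=%s",
                         self.group_id, took, sorted(partitions))
            self._revoked_at = None
```

Since every rebalance emits a log line, even sub-second events survive, and the logs can be shipped wherever deployment timelines are plotted.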
Consumer Group rebalancing events
Hi Folks,

I'm trying to find consumer group rebalancing events, so I can plot how much time consumer groups usually take to rebalance during application deployments.

I've tried checking the logs in `server.log`, `controller.log`, and `state-change.log`, but couldn't find anything about rebalancing.

Thanks & Regards
Srinivas Devaki
sre @ zomato
Re: Resource based kafka assignor
Also, I want to clarify one more doubt: is there any way for a client to explicitly trigger a rebalance without dying itself?

On Thu, Jan 30, 2020 at 7:54 PM Devaki, Srinivas wrote:
> Hi All,
>
> We have a set of logstash consumer groups running under the same set of
> instances, we have decided to run separate consumer groups subscribing
> multiple topics instead of running single consumer group for all topics (the
> reasoning behind this decision is because of how our elasticsearch cluster
> is designed).
>
> Since we are running multiple consumer groups, sometimes we have detected
> that a few ec2 nodes are receiving multiple high throughput topics in
> different consumer groups, which was expected based on the implementation
> of the round robin assignor.
>
> So I've decided to make a partition assignor which will consider the
> assignment based on other consumer group assignment.
>
> Could you please give me some pointers on how to proceed. This is my
> initial ideas on the problem.
>
> Solution #0:
> write an assignor, and use a specific consumer id pattern across all
> consumer groups, and in the assignor do a describe on all consumer groups
> and based on the topic throughput and the other consumer group assignment
> decide the assignment of this topic
>
> Thanks
Resource based kafka assignor
Hi All,

We have a set of logstash consumer groups running on the same set of instances. We decided to run separate consumer groups, each subscribing to multiple topics, instead of a single consumer group for all topics (the reasoning behind this decision comes from how our elasticsearch cluster is designed).

Since we run multiple consumer groups, we have sometimes found that a few EC2 nodes receive multiple high-throughput topics from different consumer groups, which is expected given how the round robin assignor is implemented.

So I've decided to write a partition assignor that takes the assignments of other consumer groups into account.

Could you please give me some pointers on how to proceed? This is my initial idea on the problem.

Solution #0:
Write an assignor, use a specific consumer id pattern across all consumer groups, and in the assignor describe all consumer groups; then, based on topic throughput and the other consumer groups' assignments, decide the assignment for this topic.

Thanks
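A rough, stdlib-only sketch of the greedy idea behind Solution #0, assuming per-topic throughput numbers and the load that other consumer groups already placed on each node are available (all names and data structures here are illustrative; a real implementation would live inside a custom assignor):

```python
from collections import defaultdict

def assign_by_load(topics, existing_load, nodes):
    """Greedy placement: sort topics by throughput (heaviest first) and
    hand each one to the node with the least total load so far, where
    'load' starts from what other consumer groups already put on each node.

    topics        -- {topic_name: throughput}
    existing_load -- {node: throughput already assigned by other groups}
    nodes         -- list of candidate nodes
    """
    load = defaultdict(float, existing_load)
    assignment = {}
    for topic, tput in sorted(topics.items(), key=lambda kv: -kv[1]):
        target = min(nodes, key=lambda n: load[n])
        assignment[topic] = target
        load[target] += tput
    return assignment

# Hypothetical example: node-a already carries 90 units from another
# group, so the heaviest new topic goes to node-b first.
print(assign_by_load(
    {"orders": 100.0, "logs": 80.0, "clicks": 10.0},
    {"node-a": 90.0, "node-b": 0.0},
    ["node-a", "node-b"],
))
```

Heaviest-first greedy placement is the classic makespan-balancing heuristic; the hard part in practice is the one the original mail points at - getting the other groups' assignments (e.g. via a describe-consumer-groups call) and throughput estimates into the assignor at rebalance time.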