Rebalancing and Its Impact
Hi,

I'm new to Kafka, and I'm designing a wrapper library around Kafka, in both Java and Go (using the Confluent client / kafka-go), for internal use. For my use case, commitSync is a crucial step: we should only do the next read after properly committing the previous one. Repeated processing is not a big issue, since our client service is idempotent, but data loss is a major issue and must not occur. I will create X consumers initially and keep polling from them. I would therefore like to know more about the negative scenarios that could happen here, their impact, and how to handle them properly:

1) Network issue during consumer processing. What happens when the network goes off for a brief period and comes back? Does the Kafka consumer handle this automatically and come back alive when the network returns, or do we have to reinitialise it? If it comes back alive, does it resume work from where it left off? E.g., consumer X read 50 records from partition Y, so internally the consumer position moved forward by 50; but before committing, a network issue happens and then connectivity returns. Will the consumer still have the metadata about what it read in the last poll? Can it go on to commit the +50 offset?

2) Rebalancing in consumer groups. What is the impact on existing consumer processes: will a working consumer instance pause and resume during a rebalance, or do we have to reinitialise it? How long can a rebalance take? If the consumer comes back alive after the rebalance, does it have metadata about its last read?

3) What happens when a consumer joins during a rebalance? Ideally that is itself another rebalance scenario. Will the rebalance in progress be discarded and a new one started, or will the new consumer wait for the existing rebalance to complete?

Kindly help me understand these scenarios in detail and suggest solutions if possible. It would also be very helpful if you could point me to a resource (an online article, book, or anything else) that provides insight into the intricate details of Kafka.

Thanks and Regards,
Janardhanan V S
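For reference, a minimal Java sketch of the commit-after-process loop described above; the broker address, topic, and group id are placeholders and error handling is elided. Because offsets are committed only after processing, a consumer that rejoins after a network blip or rebalance resumes from its last committed offset, so uncommitted work is redelivered rather than lost (at-least-once).

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class AtLeastOnceConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "wrapper-group");            // placeholder
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually, after processing
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // A rebalance is taking partitions away. Offsets already committed
                        // below are safe; an uncommitted batch is simply redelivered to
                        // whichever consumer owns the partition next.
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Fetching resumes from the last committed offset of each partition.
                    }
                });

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        process(record); // idempotent handler, so redelivery after a failure is harmless
                    }
                    // Commit only after the whole batch is processed. If the group
                    // rebalanced meanwhile this throws (e.g. CommitFailedException)
                    // and the batch is redelivered: at-least-once, never data loss.
                    consumer.commitSync();
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            // application logic goes here (placeholder)
            System.out.printf("%s-%d@%d: %s%n",
                    record.topic(), record.partition(), record.offset(), record.value());
        }
    }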
Unusual initial rebalance delay
Hi,

I see a delay of about 4 to 5 minutes before the initial rebalance triggers when using the KafkaConsumer.subscribe(Pattern pattern, ConsumerRebalanceListener listener) signature to subscribe. Because of this, none of the subscribers fetch any messages for that duration, although messages are present in the Kafka topic my subscribers are subscribing to.

I am using Camel Kafka to subscribe to Kafka topics. I looked into the Camel Kafka implementation to rule out the possibility that Camel Kafka is causing the issue. When I switch to KafkaConsumer.subscribe(Collection topics), it works fine and no delay is noticed.

I am using Kafka v2.0.1 with Scala v2.12 and Camel Kafka v2.23.x.

Below is a portion of the log lines showing the rebalance delay. Any clue on how to go about identifying the root cause?

[2019-03-15 15:52:08,143] INFO [Log partition=queue.REF_REF_TestInboundOrdered-15, dir=/kafka/kafka-logs-58c06252374b] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2019-03-15 15:52:08,143] INFO [Log partition=queue.REF_REF_TestInboundOrdered-15, dir=/kafka/kafka-logs-58c06252374b] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)
[2019-03-15 15:52:08,144] INFO Created log for partition queue.REF_REF_TestInboundOrdered-15 in /kafka/kafka-logs-58c06252374b with properties {compression.type -> producer, message.format.version -> 2.0-IV1, file.delete.delay.ms -> 6, max.message.bytes -> 11534336, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 8640, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 60480, segment.bytes -> 262144000, retention.ms -> 60480, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2019-03-15 15:52:08,144] INFO [Partition queue.REF_REF_TestInboundOrdered-15 broker=1001] No checkpointed highwatermark is found for partition queue.REF_REF_TestInboundOrdered-15 (kafka.cluster.Partition)
[2019-03-15 15:52:08,144] INFO Replica loaded for partition queue.REF_REF_TestInboundOrdered-15 with initial high watermark 0 (kafka.cluster.Replica)
[2019-03-15 15:52:08,144] INFO [Partition queue.REF_REF_TestInboundOrdered-15 broker=1001] queue.REF_REF_TestInboundOrdered-15 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2019-03-15 15:52:08,149] INFO [ReplicaAlterLogDirsManager on broker 1001] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2019-03-15 15:56:09,739] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-03-15 15:56:25,030] INFO [GroupCoordinator 1001]: Preparing to rebalance group queue.REF_REF_TestInboundOrdered_GROUP with old generation 1 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
[2019-03-15 15:56:25,103] INFO [GroupCoordinator 1001]: Stabilized group queue.REF_REF_TestInboundOrdered_GROUP generation 2 (__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)
[2019-03-15 15:56:25,106] INFO [GroupCoordinator 1001]: Assignment received from leader for group queue.REF_REF_TestInboundOrdered_GROUP for generation 2 (kafka.coordinator.group.GroupCoordinator)

Regards
Viswa Ramamoorthy
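One factor worth checking: a pattern subscription can only match topics the consumer knows about from its cluster metadata, and metadata is refreshed every metadata.max.age.ms, which defaults to 300000 ms (5 minutes). That default lines up with the 4-5 minute delay above. A sketch to test this theory, with placeholder broker address and group id:

    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PatternSubscribeProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-probe");            // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Pattern subscriptions only see newly matching topics on a metadata
            // refresh; the default refresh interval is 300000 ms (5 minutes).
            props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000"); // 30 s instead of 5 min

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Pattern.compile("queue\\.REF_REF_.*"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // When this fires, the group has stabilized.
                    System.out.println("Assigned: " + partitions);
                }
            });
            // poll loop omitted
        }
    }

If the initial assignment then arrives within roughly the lowered refresh interval, the metadata refresh is the cause; note that a lower value does increase metadata request load on the cluster.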
Preventing blocking upon producer buffer full in Kafka 2.1
Hi all,

I was wondering how I could prevent blocking when using KafkaProducer to send records while the buffer is full. I noticed from the v0.9 docs that there was a block.on.buffer.full config that could be set to false to achieve that behaviour; however, it was deprecated and is unavailable in v2.1. Would setting max.block.ms to 0 behave the same? If not, what's the best way to do so?

Thanks,
Justin Borromeo
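Setting max.block.ms to 0 should give the fail-fast behaviour: send() then throws a TimeoutException immediately when the accumulator buffer is full, instead of blocking. One caveat: the same setting also applies to the initial metadata fetch, so the very first send() can fail before the producer has ever reached the cluster. A minimal sketch, with placeholder broker address and topic:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.TimeoutException;

    public class NonBlockingSend {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // Fail immediately instead of blocking when the buffer is full
            // (or when metadata is not yet available, e.g. on the first send).
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "0");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                try {
                    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
                } catch (TimeoutException e) {
                    // Buffer full: drop the record, spill it elsewhere, or retry later.
                }
            }
        }
    }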
Re: [EXTERNAL] Re: Permanent topic
Hi,

According to the Cloudera Manager Kafka documentation:

log.retention.ms: The maximum time before a new log segment is rolled out. If both log.retention.ms and log.retention.bytes are set, a segment is deleted when either limit is exceeded. The special value of -1 is interpreted as unlimited. This property is used in Kafka 1.4.0 and later in place of log.retention.hours.

Regards,
Harpreet Singh

On Fri, Mar 15, 2019 at 1:58 PM M. Manna wrote:
> There is a difference between retention.bytes and retention.ms. Yes,
> retention.bytes can be set to -1, but nowhere do the docs mention
> retention.ms = -1.
>
> It may be that -1 is accepted as a value, which could simply mean it is
> not being validated. But it's not in the official docs, and I would not
> use this unless the official docs support it.
>
> Could anyone else shed some light on this?
>
> Thanks,
>
> On Fri, 15 Mar 2019 at 17:27, Maxim Kheyfets wrote:
> > Good time of the day,
> >
> > We are using Kafka 1.0.1 and want to create a permanent topic. One online
> > post suggests setting retention.ms and retention.bytes to -1. The sample
> > below shows the system accepts -1, but I don't see this documented
> > anywhere explicitly in the official documentation.
> >
> > Could you confirm, and/or point me to the right official page?
> >
> > Thank you,
> > Maxim
> >
> > kafka-topics.sh --create --zookeeper zk.local/kafka --replication-factor 3 --partitions 30 --topic maxim-test
> > kafka-configs.sh --zookeeper zk.local/kafka --entity-type topics --entity-name maxim-test --alter --add-config retention.ms=-1
> > kafka-configs.sh --zookeeper zk.local/kafka --entity-type topics --entity-name maxim-test --alter --add-config retention.bytes=-1
> >
> > --describe shows it as successful:
> > zk.local
> >
> > Topic:maxim-test PartitionCount:30 ReplicationFactor:3 Configs:retention.ms=-1,retention.bytes=-1
> > [per-partition listing snipped; quoted in full in the original message below]
> >
> > --
> > *Maxim Kheyfets*
> > Senior DevOps Engineer
> >
> > maxim.kheyf...@clearme.com | www.clearme.com
Re: Permanent topic
There is a difference between retention.bytes and retention.ms. Yes, retention.bytes can be set to -1, but nowhere do the docs mention retention.ms = -1.

It may be that -1 is accepted as a value, which could simply mean it is not being validated. But it's not in the official docs, and I would not use this unless the official docs support it.

Could anyone else shed some light on this?

Thanks,

On Fri, 15 Mar 2019 at 17:27, Maxim Kheyfets wrote:
> Good time of the day,
>
> We are using Kafka 1.0.1 and want to create a permanent topic. One online
> post suggests setting retention.ms and retention.bytes to -1. The sample
> below shows the system accepts -1, but I don't see this documented
> anywhere explicitly in the official documentation.
>
> Could you confirm, and/or point me to the right official page?
>
> Thank you,
> Maxim
>
> kafka-topics.sh --create --zookeeper zk.local/kafka --replication-factor 3 --partitions 30 --topic maxim-test
> kafka-configs.sh --zookeeper zk.local/kafka --entity-type topics --entity-name maxim-test --alter --add-config retention.ms=-1
> kafka-configs.sh --zookeeper zk.local/kafka --entity-type topics --entity-name maxim-test --alter --add-config retention.bytes=-1
>
> --describe shows it as successful:
> zk.local
>
> Topic:maxim-test PartitionCount:30 ReplicationFactor:3 Configs:retention.ms=-1,retention.bytes=-1
> [per-partition listing snipped; quoted in full in the original message below]
>
> --
> *Maxim Kheyfets*
> Senior DevOps Engineer
>
> maxim.kheyf...@clearme.com | www.clearme.com
Permanent topic
Good time of the day,

We are using Kafka 1.0.1 and want to create a permanent topic. One online post suggests setting retention.ms and retention.bytes to -1. The sample below shows the system accepts -1, but I don't see this documented anywhere explicitly in the official documentation.

Could you confirm, and/or point me to the right official page?

Thank you,
Maxim

kafka-topics.sh --create --zookeeper zk.local/kafka --replication-factor 3 --partitions 30 --topic maxim-test
kafka-configs.sh --zookeeper zk.local/kafka --entity-type topics --entity-name maxim-test --alter --add-config retention.ms=-1
kafka-configs.sh --zookeeper zk.local/kafka --entity-type topics --entity-name maxim-test --alter --add-config retention.bytes=-1

--describe shows it as successful:
zk.local

Topic:maxim-test PartitionCount:30 ReplicationFactor:3 Configs:retention.ms=-1,retention.bytes=-1
Topic: msg-opt-in Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: msg-opt-in Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: msg-opt-in Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: msg-opt-in Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: msg-opt-in Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: msg-opt-in Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: msg-opt-in Partition: 6 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: msg-opt-in Partition: 7 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: msg-opt-in Partition: 8 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: msg-opt-in Partition: 9 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: msg-opt-in Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: msg-opt-in Partition: 11 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: msg-opt-in Partition: 12 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: msg-opt-in Partition: 13 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: msg-opt-in Partition: 14 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: msg-opt-in Partition: 15 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: msg-opt-in Partition: 16 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: msg-opt-in Partition: 17 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: msg-opt-in Partition: 18 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: msg-opt-in Partition: 19 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: msg-opt-in Partition: 20 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: msg-opt-in Partition: 21 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: msg-opt-in Partition: 22 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: msg-opt-in Partition: 23 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: msg-opt-in Partition: 24 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: msg-opt-in Partition: 25 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: msg-opt-in Partition: 26 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: msg-opt-in Partition: 27 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: msg-opt-in Partition: 28 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: msg-opt-in Partition: 29 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1

--
*Maxim Kheyfets*
Senior DevOps Engineer

maxim.kheyf...@clearme.com | www.clearme.com
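For what it's worth, more recent versions of the official topic-config documentation do state that retention.ms set to -1 means no time limit is applied (that wording may not have been present in the 1.0.1 docs). The same override can also be applied programmatically; a minimal Java AdminClient sketch, with the broker address as a placeholder:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class MakeTopicPermanent {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "maxim-test");
                // -1 disables the respective retention limit.
                Config retention = new Config(Arrays.asList(
                        new ConfigEntry("retention.ms", "-1"),
                        new ConfigEntry("retention.bytes", "-1")));
                admin.alterConfigs(Collections.singletonMap(topic, retention)).all().get();
            }
        }
    }

Note that alterConfigs is non-incremental: it replaces the topic's whole override set, so include every override you want to keep (newer brokers offer incrementalAlterConfigs for targeted changes).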
Re: Kafka - Connect for logs processing
Hi Hans,

Thanks for the quick response. I am going to look into it.

Thanks
Pulkit

On Fri, Mar 15, 2019 at 11:39 AM Hans Jespersen wrote:
> Take a look at kafka-connect-spooldir and see if it meets your needs.
>
> https://www.confluent.io/connector/kafka-connect-spooldir/
>
> This connector can monitor a directory and pick up any new files that are
> created. It is great for picking up batch files, parsing them, and
> publishing each line as if it were published in real time.
>
> -hans
>
> > On Mar 15, 2019, at 7:52 AM, Pulkit Manchanda wrote:
> >
> > Hi All,
> >
> > I am building a data pipeline to send logs from one data source to the
> > other node. I am using Kafka Connect standalone for this integration.
> > Everything works fine, but the problem is that on Day 1 the log file is
> > renamed to log_Day0 and a new log file, log_Day1, is created, and my
> > Kafka Connect instance doesn't process the new log file.
> > Looking for a solution. Any help is appreciated.
> >
> > Thanks
> > Pulkit
Re: Kafka - Connect for logs processing
Take a look at kafka-connect-spooldir and see if it meets your needs.

https://www.confluent.io/connector/kafka-connect-spooldir/

This connector can monitor a directory and pick up any new files that are created. It is great for picking up batch files, parsing them, and publishing each line as if it were published in real time.

-hans

> On Mar 15, 2019, at 7:52 AM, Pulkit Manchanda wrote:
>
> Hi All,
>
> I am building a data pipeline to send logs from one data source to the
> other node. I am using Kafka Connect standalone for this integration.
> Everything works fine, but the problem is that on Day 1 the log file is
> renamed to log_Day0 and a new log file, log_Day1, is created, and my
> Kafka Connect instance doesn't process the new log file.
> Looking for a solution. Any help is appreciated.
>
> Thanks
> Pulkit
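For illustration, a sketch of a standalone-worker connector config for the spooldir CSV source; the topic name, paths, and file pattern here are assumptions, and the exact property names should be checked against the connector's own documentation:

    # spooldir-source.properties (illustrative)
    name=logs-spooldir-source
    connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
    tasks.max=1
    # placeholder destination topic
    topic=logs
    # directory the connector watches for new files
    input.path=/var/log/app/incoming
    # successfully processed files are moved here
    finished.path=/var/log/app/finished
    # files that fail to parse are moved here
    error.path=/var/log/app/error
    # matches each day's renamed file, e.g. log_Day0, log_Day1
    input.file.pattern=^log_Day[0-9]+$
    # infer the schema from the data instead of declaring key/value schemas
    schema.generation.enabled=true

The pattern-based pickup is what addresses the daily-rename problem above: each newly created log_DayN file matching input.file.pattern is ingested once and then moved out of the watched directory.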
Kafka - Connect for logs processing
Hi All,

I am building a data pipeline to send logs from one data source to the other node. I am using Kafka Connect standalone for this integration. Everything works fine, but the problem is that on Day 1 the log file is renamed to log_Day0 and a new log file, log_Day1, is created, and my Kafka Connect instance doesn't process the new log file. Looking for a solution. Any help is appreciated.

Thanks
Pulkit