Re: Moving partition(s) to different broker
Thanks, Peter, for the quick response.

> You may need to enable unclean leader election, as well.

We have already set unclean leader election to true. Unfortunately, both brokers went down within 10 minutes of each other. Another option is to increase the replication factor from 2 to 3. Right now the goal is to have no offline partitions in the cluster.

> Is your goal just to have no offline partitions or to recover the data contained in the affected partition?

Our goal is to have no offline partitions. We are fine with losing the data in the affected partition.

--Senthil

On Tue, Nov 12, 2019 at 1:18 PM Peter Bukowinski wrote:
> If the only replicas for that topic partition exist on brokers 15 and 24 and they are both down, then you cannot recover the partition until either of them is replaced or repaired and rejoins the cluster. You may need to enable unclean leader election, as well.
>
> As you’ve discovered, adding replicas to an offline partition doesn’t work. A topic/partition needs to be in a healthy state for that to work.
>
> Is your goal just to have no offline partitions or to recover the data contained in the affected partition? Producers and consumers should still be able to access the topic in its current state.
>
> --
> Peter
>
> > On Nov 11, 2019, at 11:34 PM, SenthilKumar K wrote:
> >
> > Hi Experts,
> >
> > We have seen a problem with a partition leader, i.e. it is set to -1.
> >
> > describe o/p:
> > Topic: 1453 Partition: 47 Leader: -1 Replicas: 24,15 Isr: 24
> >
> > Kafka Version: 2.2.0
> > Replication: 2
> > Partitions: 48
> >
> > Brokers 24 and 15 are both down due to disk errors, and we lost partition 47. I tried increasing the replica count (2 to 3) of that partition alone using the Kafka partition reassignment tool, but that didn't help.
> >
> > {
> >   "version": 1,
> >   "partitions": [{
> >     "topic": "1453",
> >     "partition": 47,
> >     "replicas": [22, 11, 5],
> >     "log_dirs": ["any", "any", "any"]
> >   }]
> > }
> >
> > Reassignment of partition 1453-47 is still in progress; it has been stuck for more than 3 hours.
> >
> > How can we recover partition 47? Please advise. Thanks!
> >
> > --Senthil
Re: Moving partition(s) to different broker
If the only replicas for that topic partition exist on brokers 15 and 24 and they are both down, then you cannot recover the partition until either of them is replaced or repaired and rejoins the cluster. You may need to enable unclean leader election, as well.

As you’ve discovered, adding replicas to an offline partition doesn’t work. A topic/partition needs to be in a healthy state for that to work.

Is your goal just to have no offline partitions or to recover the data contained in the affected partition? Producers and consumers should still be able to access the topic in its current state.

--
Peter

> On Nov 11, 2019, at 11:34 PM, SenthilKumar K wrote:
>
> Hi Experts,
>
> We have seen a problem with a partition leader, i.e. it is set to -1.
>
> describe o/p:
> Topic: 1453 Partition: 47 Leader: -1 Replicas: 24,15 Isr: 24
>
> Kafka Version: 2.2.0
> Replication: 2
> Partitions: 48
>
> Brokers 24 and 15 are both down due to disk errors, and we lost partition 47. I tried increasing the replica count (2 to 3) of that partition alone using the Kafka partition reassignment tool, but that didn't help.
>
> {
>   "version": 1,
>   "partitions": [{
>     "topic": "1453",
>     "partition": 47,
>     "replicas": [22, 11, 5],
>     "log_dirs": ["any", "any", "any"]
>   }]
> }
>
> Reassignment of partition 1453-47 is still in progress; it has been stuck for more than 3 hours.
>
> How can we recover partition 47? Please advise. Thanks!
>
> --Senthil
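The diagnosis in this reply can be sketched as a small script. This is only an illustration, assuming the one-line describe format quoted in the thread and a set of live broker IDs that the operator supplies by hand. The names `parse_partition` and `diagnose` are invented for this sketch and are not part of any Kafka tool.

```python
import re

# Matches one partition line of the describe output as quoted in this thread.
LINE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def _to_ids(text):
    """Turn '24,15' into [24, 15]; an empty string becomes []."""
    return [int(x) for x in text.split(",") if x]


def parse_partition(line):
    """Parse a single describe line into a dict (hypothetical helper)."""
    m = LINE.search(line)
    if m is None:
        raise ValueError(f"unrecognised describe line: {line!r}")
    return {
        "topic": m["topic"],
        "partition": int(m["partition"]),
        "leader": int(m["leader"]),
        "replicas": _to_ids(m["replicas"]),
        "isr": _to_ids(m["isr"]),
    }


def diagnose(info, live_brokers):
    """Classify a partition given the broker IDs currently alive (assumed input)."""
    if info["leader"] != -1:
        return "online"
    alive = [b for b in info["replicas"] if b in live_brokers]
    if not alive:
        return "offline: no replica broker is alive; a replica must rejoin first"
    if any(b in info["isr"] for b in alive):
        return "offline: an in-sync replica is alive; leader election should recover it"
    return "offline: only out-of-sync replicas are alive; needs unclean leader election"


info = parse_partition("Topic: 1453 Partition: 47 Leader: -1 Replicas: 24,15 Isr: 24")
print(diagnose(info, live_brokers={22, 11, 5}))
```

With brokers 15 and 24 both absent from the live set, the sketch reports that no replica broker is alive, which matches the situation described here: brokers 22, 11 and 5 hold no copy of partition 47, so they have nothing to serve or replicate from.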
Moving partition(s) to different broker
Hi Experts,

We have seen a problem with a partition leader, i.e. it is set to -1.

describe o/p:
Topic: 1453 Partition: 47 Leader: -1 Replicas: 24,15 Isr: 24

Kafka Version: 2.2.0
Replication: 2
Partitions: 48

Brokers 24 and 15 are both down due to disk errors, and we lost partition 47. I tried increasing the replica count (2 to 3) of that partition alone using the Kafka partition reassignment tool, but that didn't help.

{
  "version": 1,
  "partitions": [{
    "topic": "1453",
    "partition": 47,
    "replicas": [22, 11, 5],
    "log_dirs": ["any", "any", "any"]
  }]
}

Reassignment of partition 1453-47 is still in progress; it has been stuck for more than 3 hours.

How can we recover partition 47? Please advise. Thanks!

--Senthil
Re: Flink vs Kafka streams
I have not found relying on partitions for parallelism to be a disadvantage. At Flurry, we have several pipelines using both the lower-level Kafka API (for legacy reasons) and Kafka Streams + Kafka Connect. They process over 10B events per day at around 200k rps. We also use the same system to send over 10M notifications per day, just to give you an example of a non-deterministic traffic pattern.

- Praveen

On Fri, Nov 8, 2019 at 10:43 PM Navneeth Krishnan wrote:
> Thanks Peter, even with ECS we have autoscaling enabled but the issue is during autoscaling we need to stop the job and start with new parallelism which creates a downtime.
>
> Thanks
>
> On Fri, Nov 8, 2019 at 1:01 PM Peter Groesbeck wrote:
> > We use EMR instead of ECS but if that’s an option for your team, you can configure auto scaling rules in your cloud formation so that your task/job load dynamically controls cluster sizing.
> >
> > Sent from my iPhone
> >
> > > On Nov 8, 2019, at 1:40 AM, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> > >
> > > Hello All,
> > >
> > > I have a streaming job running in production which is processing over 2 billion events per day and it does some heavy processing on each event. We have been facing some challenges in managing flink in production like scaling in and out, restarting the job with savepoint etc. Flink provides a lot of features which seemed as an obvious choice at that time but now with all the operational overhead we are thinking should we still use flink for our stream processing requirements or choose kafka streams.
> > >
> > > We currently deploy flink on ECR. Bringing up a new cluster for another stream job is too expensive but on the flip side running it on the same cluster becomes difficult since there are no ways to say this job has to be run on a dedicated server versus this can run on a shared instance. Also savepoint point, cancel and submit a new job results in some downtime. The most critical part being there is no shared state among all tasks sort of a global state. We sort of achieve this today using an external redis cache but that incurs cost as well.
> > >
> > > If we are moving to kafka streams, it makes our deployment life much easier, each new stream job will be a microservice that can scale independently. With global state it's much easier to share state without using external cache. But the disadvantage is we have to rely on the partitions for parallelism. Although this might initially sound easier, when we need to scale much higher this will become a bottleneck.
> > >
> > > Do you guys have any suggestions on this? We need to decide which way to move forward and any suggestions would be of much greater help.
> > >
> > > Thanks
Re: Kafka Streams - StateRestoreListener called when new partitions assigned
Hello Javier,

When a rebalance happens and newly assigned tasks (and hence input partitions) need to be restored, the state of the instance also transitions to REBALANCING, and it only transitions back to RUNNING after all tasks have completed restoring and are being processed normally.

Looking at the docs, I think we can add a specific paragraph to the "developer guide" in the web docs on exactly when the state transitions happen, to clarify those things. Could you create a JIRA ticket for improving our docs so that we do not drop this on the floor?

Guozhang

On Tue, Nov 12, 2019 at 4:16 AM Javier Holguera wrote:
> Hi,
>
> I understand that the state store listener that can be set using KafkaStreams.setGlobalStateRestoreListener will be invoked when the streams app starts if it doesn't find the state locally (e.g., running on an ephemeral docker container).
>
> However, I wonder if the process happens as well if there is a rebalance and the streams app gets assigned new partitions for which it doesn't have state yet. Does the app go into the rebalancing state and stay there until the state for those new partitions has been restored? Or does it go back to running even if it hasn't happened yet?
>
> Thanks.
>
> Regards,
> Javier.

--
-- Guozhang
Kafka Streams - StateRestoreListener called when new partitions assigned
Hi,

I understand that the state store listener that can be set using KafkaStreams.setGlobalStateRestoreListener will be invoked when the streams app starts if it doesn't find the state locally (e.g., running on an ephemeral docker container).

However, I wonder if the process happens as well if there is a rebalance and the streams app gets assigned new partitions for which it doesn't have state yet. Does the app go into the rebalancing state and stay there until the state for those new partitions has been restored? Or does it go back to running even if it hasn't happened yet?

Thanks.

Regards,
Javier.
Re: MirrorMaker 2 Plugin class loader Error
Rajeev, the config errors are unavoidable at present and can be ignored or silenced. The Plugin error is concerning, and was previously described by Vishal. I suppose it's possible there is a dependency conflict in these builds. Can you send me the hash that you're building from? I'll try to reproduce. Ryanne On Fri, Nov 8, 2019, 7:31 PM Rajeev Chakrabarti wrote: > Hi Folks, > I'm trying to run MM 2 with the current trunk. Also, tried with the 2.4 > branch and I'm getting "ERROR Plugin class loader for connector: > 'org.apache.kafka.connect.mirror.MirrorSourceConnector'" errors for all the > connectors. It does not seem to be creating topics in the destination > cluster but has created the internal topics at both the source and > destination and has populated the heartbeats topic. But none of the source > topics created or replicated. I'm also getting a bunch of "not known > configs" like 'consumer.group.id' was supplied but isn't a known config. > What am I doing wrong? > Regards,Rajeev >
Re: Running Kafka Stream Application in YARN
Consider using Flink, Spark, or Samza instead. Ryanne On Fri, Nov 8, 2019, 4:27 AM Debraj Manna wrote: > Hi > > Is there any documentation or link I can refer to for the steps for > deploying the Kafka Streams application in YARN? > > Kafka Client - 0.11.0.3 > Kafka Broker - 2.2.1 > YARN - 2.6.0 >
Re: Detecting cluster down in consumer
Sachin, assuming you are using something like MM2, I recommend the following approaches: 1) have an external system monitor the clusters and trigger a failover by terminating the existing consumer group and launching a replacement. This can be done manually or can be automated if your infrastructure is sufficiently advanced. MM2's checkpoints make it possible to do this without losing progress or skipping records. 2) add failover logic around your KafkaConsumers to detect failure and reconfigure. 3) run consumer groups in both clusters, i.e. "active/active", with each configured to process records originating in their local cluster only. Set up health checks and a load balancer s.t. producers send to the healthiest cluster. In this approach, no intervention is required to failover or failback. Under normal operation, your secondary consumer group doesn't process anything, but will step in and process new records whenever the secondary cluster becomes active. Ryanne On Mon, Nov 11, 2019, 5:55 AM Sachin Kale wrote: > Hi, > > We are working on a prototype where we write to two Kafka cluster > (primary-secondary) and read from one of them (based on which one is > primary) to increase the availability. There is a flag which is used to > determine which cluster is primary and other becomes secondary. On > detecting primary cluster is down, secondary is promoted to primary. > > How do we detect cluster downtime failures in Kafka Consumer? I tried > different things but poll() makes sure to mask all the exceptions and > returns 0 records. > > > -Sachin- >
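Approach 2 above (failover logic around the consumers) could be sketched roughly as follows. This is a minimal illustration under stated assumptions, not a recommended implementation: `FailoverMonitor`, `broker_reachable`, the "primary"/"secondary" labels and the threshold of three consecutive failures are all hypothetical, and a plain TCP connect to a bootstrap address is only a crude proxy for cluster health. It sidesteps the problem that poll() returns 0 records without raising by probing the cluster separately.

```python
import socket


def broker_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


class FailoverMonitor:
    """Flip the active cluster after `threshold` consecutive failed probes."""

    def __init__(self, probe, threshold=3):
        self.probe = probe          # callable: cluster label -> bool (healthy?)
        self.threshold = threshold  # assumed value; tune to tolerate brief blips
        self.failures = 0
        self.active = "primary"

    def check(self):
        """Probe the active cluster once and return the label to use next."""
        if self.probe(self.active):
            self.failures = 0
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                # Promote the other cluster and start counting afresh.
                self.active = "secondary" if self.active == "primary" else "primary"
                self.failures = 0
        return self.active


# Simulated outage: the primary never answers, the secondary always does.
monitor = FailoverMonitor(probe=lambda cluster: cluster != "primary")
print([monitor.check() for _ in range(4)])
```

In a real deployment the probe would wrap something like `broker_reachable` against each cluster's bootstrap servers (addresses not given in this thread), and the application would close its consumer and recreate it against the newly active cluster whenever `check()` returns a different label.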
Re: [External] AW: Consumer Lags and receive no records anymore
I believe the behavior has changed over time. There is a way to explicitly set a partitioner, and they provide a round-robin one: https://github.com/axbaretto/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java

On 11/10/19, 5:45 AM, "Oliver Eckle" wrote:

Hi Dave,

thank you. I saw some tutorial where they told it otherwise, which confuses me a little. If it's done round-robin, my "world view" makes sense again.

Oliver

-----Original Message-----
From: Tauzell, Dave
Sent: Friday, November 8, 2019 16:18
To: users@kafka.apache.org
Subject: Re: [External] AW: Consumer Lags and receive no records anymore

A null key results in the client sending to partitions in a round-robin order. Use a key if you want to ensure that specific messages end up on the same partition.

-Dave

On 11/8/19, 1:06 AM, "Oliver Eckle" wrote:

Hi,

Don’t get me wrong, I just want to understand what's going on. So how do I figure out how many partitions are required? Trial and error? And as far as I understand, if I have null as the key for the record, the record is stored in all partitions. Is it then not also processed by each consumer, even if I have more than one consumer? So could you explain why the consumer stops getting data?

Thx

-----Original Message-----
From: M. Manna
Sent: Friday, November 8, 2019 00:51
To: Kafka Users
Subject: Re: Consumer Lags and receive no records anymore

Hi again,

On Thu, 7 Nov 2019 at 23:40, Oliver Eckle wrote:
> Hi,
>
> slow consumers - that could be the case. But why is that an issue? I mean I try to use kafka exactly for that and the ability to recover. So e.g. if there is some burst scenario where a lot of data arrives and has to be processed, a "slow consumer" will be the default case. What I could understand is that having constantly slow consumers will be an issue, e.g. if there is some compaction on the topic or data will be erased without having been read.
>
> This is what I think about the "lagging topic". The scenario is like this:
>
> Producer --- Topic C ---> Consumer --- Processing ---> external REST Endpoint
>
> Sending a record to the external REST endpoint takes around 300ms. So if I have the "burst scenario" I mentioned above, there is maybe a lag of 1000-2000 records. So the consumer is pulling 500 and processes them, which means it takes around 150s for the app to process the records. This could create some timeouts I guess ... so that’s the reason why I try to lower the poll records to 50, e.g., because then it takes only 15s until the poll is committed.
>
> Yeah, having some liveness probe sounds pretty elegant .. will give that a try ...
> Anyway, I need to understand why that is happening to deal with the scenario the correct way. Killing the consumer after it stops consuming messages seems to me more like a workaround.
>
> Regards
>

As per your previous replies, if you have 2 partitions on that topic, you can distribute all data between 2 consumers in your cgroup, and process information. But given your data burst case, I would advise you to increase your number of partitions and spread the burst across them. Just like any other tool, Kafka requires a certain level of configuration to achieve what you want. I would recommend you increase your partitions and consumers to spread the load.

Regards,

> -----Original Message-----
> From: M. Manna
> Sent: Friday, November 8, 2019 00:24
> To: users@kafka.apache.org
> Subject: Re: Consumer Lags and receive no records anymore
>
> Hi,
>
> > On 7 Nov 2019, at 22:39, Oliver Eckle wrote:
> >
> > Have a consumer group with one consumer for the topic .. by misunderstanding I have two partitions on the topic ..
> > Due to having no key set for the record - I think having several consumers makes no sense, or am I wrong.
>
> I am not sure why that would be an issue. If you have 1 consumer in your cgroup, yes all the topic partitions will be assigned to that consumer. Slow consumer means your consumers aren’t consuming messages as fast as you are producing (or, fast enough).
>
> > Is there any possibility to work around that?
> > Cause for example on lagging topic is put to an external REST service, which takes around 300ms to be handled.
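The arithmetic discussed in this thread can be written down as a short sketch. It assumes the figures quoted above (about 300 ms per record for the external REST call, batches of 500 or 50 records, a lag of up to 2000 records) and the Kafka consumer default of 300000 ms for `max.poll.interval.ms`; the function names are invented for illustration. It also assumes records are processed one at a time and that lag is spread evenly over the consumers, which requires at least as many partitions as consumers.

```python
# Assumed figure from the thread: ~300 ms per record for the external REST call.
PER_RECORD_MS = 300
# Kafka consumer default for max.poll.interval.ms (5 minutes); change if overridden.
MAX_POLL_INTERVAL_MS = 300_000


def batch_seconds(max_poll_records, per_record_ms=PER_RECORD_MS):
    """Seconds needed to process one poll() batch sequentially."""
    return max_poll_records * per_record_ms / 1000


def largest_safe_batch(per_record_ms=PER_RECORD_MS,
                       max_poll_interval_ms=MAX_POLL_INTERVAL_MS):
    """Largest batch that can still be processed before the next poll() is due."""
    return max_poll_interval_ms // per_record_ms


def drain_seconds(lag, consumers, per_record_ms=PER_RECORD_MS):
    """Seconds to work off `lag` records split evenly across `consumers`."""
    return lag * per_record_ms / 1000 / consumers


print(batch_seconds(500))       # 150.0, the "around 150s" from the thread
print(batch_seconds(50))        # 15.0, after lowering max.poll.records to 50
print(largest_safe_batch())     # 1000 records fit into the default interval
print(drain_seconds(2000, 1))   # 600.0 with a single consumer
print(drain_seconds(2000, 2))   # 300.0 with two consumers on two partitions
```

Under these assumptions a batch of 500 still fits inside the default poll interval, so a lower `max.poll.interval.ms` or slower calls than 300 ms would be needed to explain a timeout. The last two lines show why adding partitions and consumers shortens the time a burst takes to drain.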
Re: kafka-console-consumer --value-deserializer with access to headers
You have a typo: the warning refers to 'value.serializer', but you mean 'value.deserializer'. Please try again.

Regards,

On Mon, 11 Nov 2019 at 14:28, Jorg Heymans wrote:
> Don't think that option is available there, specifying 'value.deserializer' in my consumer-config.properties file gives
>
> [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
>
> Does there exist a description of what properties the consumer-config properties file accepts ? I could find only a few references to it in the documentation.
>
> Jorg
>
> On 2019/11/11 13:00:03, "M. Manna" wrote:
> > Hi,
> >
> > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans wrote:
> > > Hi,
> > >
> > > I have created a class implementing Deserializer, providing an implementation for
> > >
> > > public String deserialize(String topic, Headers headers, byte[] data)
> > >
> > > that does some conditional processing based on headers, and then calls the other serde method
> > >
> > > public String deserialize(String topic, byte[] data)
> > >
> > > What i'm seeing is that kafka-console-consumer only uses the second method when a value deserializer is specified. Is there a way to force it to invoke the first method, so i can do processing with headers ? I tried implementing the deprecated 'ExtendedSerializer' but it does not make a difference.
> > >
> > > Thanks,
> > > Jorg
> >
> > Have you tried providing a separate prop file using consumer.config argument? Please see the reference here:
> >
> > --consumer.config Consumer config properties file. Note that [consumer-property] takes precedence over this config.
> >
> > Try that and see how it goes.
> >
> > Thanks,
Re: kafka-console-consumer --value-deserializer with access to headers
Don't think that option is available there, specifying 'value.deserializer' in my consumer-config.properties file gives [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) Does there exist a description of what properties the consumer-config properties file accepts ? I could find only a few references to it in the documentation. Jorg On 2019/11/11 13:00:03, "M. Manna" wrote: > Hi, > > > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans wrote: > > > Hi, > > > > I have created a class implementing Deserializer, providing an > > implementation for > > > > public String deserialize(String topic, Headers headers, byte[] data) > > > > that does some conditional processing based on headers, and then calls the > > other serde method > > > > public String deserialize(String topic, byte[] data) > > > > What i'm seeing is that kafka-console-consumer only uses the second method > > when a value deserializer is specified. Is there a way to force it to > > invoke the first method, so i can do processing with headers ? I tried > > implementing the deprecated 'ExtendedSerializer' but it does not make a > > difference. > > > > Thanks, > > Jorg > > > > Have you tried providing a separate prop file using consumer.config > argument? Please see the reference here: > > --consumer.config Consumer config properties file. > Note >that [consumer-property] takes >precedence over this config. > > Try that and see how it goes. > > Thanks, >
Re: Detecting cluster down in consumer
Hi,

On Mon, 11 Nov 2019 at 11:55, Sachin Kale wrote:
> Hi,
>
> We are working on a prototype where we write to two Kafka cluster (primary-secondary) and read from one of them (based on which one is primary) to increase the availability. There is a flag which is used to determine which cluster is primary and other becomes secondary. On detecting primary cluster is down, secondary is promoted to primary.
>
> How do we detect cluster downtime failures in Kafka Consumer? I tried different things but poll() makes sure to mask all the exceptions and returns 0 records.
>
> -Sachin-

These two links suggest how to approach it:

https://www.slideshare.net/gwenshap/multicluster-and-failover-for-apache-kafka-kafka-summit-sf-17
https://www.confluent.io/blog/3-ways-prepare-disaster-recovery-multi-datacenter-apache-kafka-deployments

If you are in the container world (e.g. K8s, YARN or Mesos), using a liveness probe can help you determine whether there has been a failover. On a traditional cloud, it is simply a heartbeat mechanism that tells you whether the services are usable or not. An example would be to set up monitoring alerts using SolarWinds (or similar monitoring agents) and use Cruise Control or Kafka Monitor to set up alerts.

Maybe others can also suggest something that I cannot think of right now.

Thanks,
Re: kafka-console-consumer --value-deserializer with access to headers
Hi, On Mon, 11 Nov 2019 at 10:58, Jorg Heymans wrote: > Hi, > > I have created a class implementing Deserializer, providing an > implementation for > > public String deserialize(String topic, Headers headers, byte[] data) > > that does some conditional processing based on headers, and then calls the > other serde method > > public String deserialize(String topic, byte[] data) > > What i'm seeing is that kafka-console-consumer only uses the second method > when a value deserializer is specified. Is there a way to force it to > invoke the first method, so i can do processing with headers ? I tried > implementing the deprecated 'ExtendedSerializer' but it does not make a > difference. > > Thanks, > Jorg > Have you tried providing a separate prop file using consumer.config argument? Please see the reference here: --consumer.config Consumer config properties file. Note that [consumer-property] takes precedence over this config. Try that and see how it goes. Thanks,
Detecting cluster down in consumer
Hi, We are working on a prototype where we write to two Kafka cluster (primary-secondary) and read from one of them (based on which one is primary) to increase the availability. There is a flag which is used to determine which cluster is primary and other becomes secondary. On detecting primary cluster is down, secondary is promoted to primary. How do we detect cluster downtime failures in Kafka Consumer? I tried different things but poll() makes sure to mask all the exceptions and returns 0 records. -Sachin-
kafka-console-consumer --value-deserializer with access to headers
Hi, I have created a class implementing Deserializer, providing an implementation for public String deserialize(String topic, Headers headers, byte[] data) that does some conditional processing based on headers, and then calls the other serde method public String deserialize(String topic, byte[] data) What i'm seeing is that kafka-console-consumer only uses the second method when a value deserializer is specified. Is there a way to force it to invoke the first method, so i can do processing with headers ? I tried implementing the deprecated 'ExtendedSerializer' but it does not make a difference. Thanks, Jorg