RE: Re: MirrorMaker 2 Reload Configuration
Hi Peter,

I am running into a similar issue. Did you create a JIRA ticket for this, or is there any workaround you have found?

Thanks and regards,
Praveen

On 2020/11/13 14:45:23 Péter Sinóros-Szabó wrote:
> Hi,
>
> I tried as well to stop all instances of MM2, but it didn't help for me.
> I had to stop all MM2 instances, delete the mm2-config and mm2-status
> topics on the destination cluster and start up all MM2 instances again.
>
> Peter
Re: Client session timed out
Hello All,

Please provide some inputs and help to resolve this issue. I have cross-verified and it is not a DNS issue. Not sure why we are facing this issue. Should I report a bug for this? Please let me know.

Regards,
Praveen Kumar K S

On Tue, Oct 27, 2020 at 8:41 PM Sabina Marx wrote:
> Does anyone have any idea what we can do?
>
> All Zookeepers (3) and Kafkas are running (5 nodes, meaning 5 physical hosts).
> Then I reboot one physical host. I still have the redundancy. But when the
> physical host comes up and zookeeper and then Kafka come up, I have Kafka
> timing out and not connecting to the existing Kafka cluster.
>
> Log:
> Started Apache Kafka.
> INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
> INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
> INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
> INFO starting (kafka.server.KafkaServer)
> INFO Connecting to zookeeper on X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181 (kafka.server.KafkaServer)
> INFO [ZooKeeperClient Kafka server] Initializing a new session to X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181. (kafka.zookeeper.ZooKeeperClient)
> INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:host.name=Kafka03.X.X (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:java.vendor=Debian (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:java.class.path=/opt/kafka/bin/../libs/activation-1.1.1.jar:/opt/kafka/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/bin/../libs/audience-annotations-0.5.0.j
> INFO Client environment:java.library.path=/usr/java/packages/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:java.compiler= (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:os.version=4.19.0-10-amd64 (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:user.name=it (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:user.home=/home/it (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:os.memory.free=980MB (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
> INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
> INFO Initiating client connection, connectString=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@48b67364 (org.apache.zookeeper.ZooKeeper)
> INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
> INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
> INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
> INFO Opening socket connection to server kafka01.X.X/X.X.X.X:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> INFO Socket connection established, initiating session, client: /X.X.X.X:45952, server: kafka01.X.X/X.X.X.X:2181 (org.apache.zookeeper.ClientCnxn)
> WARN Client session timed out, have not heard from server in 6003ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
> INFO Client session timed out, have not heard from server in 6003ms for sessionid 0x0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> INFO Opening socket connection to server kafka05.X.X/X.X.X.X:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> INFO Socket connection established, initiating session, client: /X.X.X.X:51582, server: kafka05.X.X/X.X.X.X:2181 (org.apache.zookeeper.ClientCnxn)
> WARN Client session timed out, have not heard from server in 6003ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
> INFO Client session timed out, have
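A detail worth noting in the log above: the broker's session timeout is 18000 ms (sessionTimeout=18000), yet the client gives up after ~6000 ms. That is likely because the ZooKeeper client divides the session timeout across the servers in the connect string (18000 ms / 3 servers ≈ 6000 ms) when establishing a connection. If the rebooted host is simply slow to come up, one mitigation (an assumption, not a confirmed fix for this thread) is to raise the broker-side ZooKeeper timeouts in server.properties:

```properties
# server.properties — broker-side ZooKeeper timeouts (values illustrative).
# The ZooKeeper client spreads the session timeout across the servers in
# the connect string, which is likely why the log shows ~6000 ms
# (18000 ms / 3 servers) before each reconnect attempt.
zookeeper.session.timeout.ms=30000
zookeeper.connection.timeout.ms=30000
```

Whether larger timeouts help here depends on why the ZooKeeper servers are not answering in time; they only paper over slow startup, not a real connectivity problem.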
Re: Client session timed out
Hello, Can someone please help me to understand what is the issue ? Regards, Praveen Kumar K S +91-9986855625 On Thu, Oct 22, 2020 at 6:52 AM Praveen Kumar K S wrote: > Hello Experts, > > Any help to debug and resolve this issue is highly appreciated. > > Regards, > Praveen > > On Wed, 21 Oct, 2020, 11:26 Sabina Marx, wrote: > >> Hi Praveen, >> >> it seems to be the same problem, your log looks quite similar to mine. >> But I have no solution until now. >> >> Regards >> Sabina >> >> Von: Praveen Kumar K S >> Antworten an: "users@kafka.apache.org" >> Datum: Dienstag, 20. Oktober 2020 um 20:07 >> An: "users@kafka.apache.org" >> Betreff: Re: Client session timed out >> >> Hello, >> >> I'm not sure if I can add my issue in this thread. But it seems like I'm >> facing the same problem. >> >> KAFKA_VERSION=2.5.1 >> ZK_VERSION=3.5.8 >> >> I run 3 node zookeeper cluster and 3 node kafka cluster as docker >> containers in docker swarm environment. When I install it for first time, >> everything goes well. Zookeeper and Kafka are able to form the cluster. >> Services are healthy. >> >> But when I issue docker update command, kafka is not coming up though the >> zookeeper cluster is healthy. Below is the sequence of steps. >> >> docker service update one_zookeeper --image x.x.x/v1/zookeeper:latest >> --force >> docker service update one_zookeeper1 --image x.x.x/v1/zookeeper:latest >> --force >> docker service update one_zookeeper2 --image x.x.x/v1/zookeeper:latest >> --force >> >> Zookeeper is healthy now. I'm able to query leader and follower. >> >> Now, I'm updating kafka and it doesn't work. >> docker service update one_kafka --image x.x.com/v1/kafka:latest< >> http://x.x.com/v1/kafka:latest> --force >> >> PFA Kafka log. >> >> While kafka update has failed, I see that kafka1 and kafka2 are running >> and healthy. 
>> >> docker service ls | grep kafka >> one_kafkareplicated 0/1 >> one_kafka1 replicated 1/1 >> one_kafka2 replicated 1/1 >> >> To cross verify, I have just brought down the services zookeeper and >> kafka without data loss. I preserve >> zookeeperdata,zookeeperlogs,zookeepertxns and kafkadata,kafkalogs. >> >> docker stack remove one >> docker stack deploy -c cluster-zookeeper.yml one >> docker stack deploy -c cluster-kafka.yml one >> >> Now, all the services are healthy. >> >> I'm not sure why kafka deployment is failing only during update. There is >> no change in the configuration in either zookeeper or kafka. >> >> Please help me resolve this issue and let me know if you need any >> additional details. >> >> Regards, >> Praveen Kumar K S >> +91-9986855625 >> >> >> On Tue, Oct 20, 2020 at 3:54 PM Sabina Marx > sabina.m...@sneo.io>> wrote: >> Yes, it's the same problem. >> >> Am 19.10.20, 19:50 schrieb "Mich Talebzadeh" > <mailto:mich.talebza...@gmail.com>>: >> >> can you try to disable automatic start and on the node just booted, >> start >> zookeeper first, check the log that it is connected and then start >> Kafka? >> >> I assume everything is set-up OK including in >> $KAFKA_HOME/config/server.properties values for broker.id< >> http://broker.id>, hostname, >> zookeeper.connect=:2181,,server2>:2181, :2181 and >> also >> zookeeper.connection.timeout.ms< >> http://zookeeper.connection.timeout.ms>=6000 (default) >> >> HTH >> >> >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> < >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> >* >> >> >> >> >> >> *Disclaimer:* Use it at your own risk. Any and all responsibility for >> any >> loss, damage or destruction of data or any other property which may >> arise >> from relying on this email's technical content is explicitly >> disclaimed. 
>> The author will in no case be liable for any monetary damages arising >> from >> such loss, damage or destruction. >> >> >> >> >> On Mon, 19 Oct 2020 at 18:17, Sabina Marx > <mailto:sabina.m...@sneo.io>> wrote: >> >> > Yes, you have it >>
Re: Client session timed out
Hello Experts, Any help to debug and resolve this issue is highly appreciated. Regards, Praveen On Wed, 21 Oct, 2020, 11:26 Sabina Marx, wrote: > Hi Praveen, > > it seems to be the same problem, your log looks quite similar to mine. But > I have no solution until now. > > Regards > Sabina > > Von: Praveen Kumar K S > Antworten an: "users@kafka.apache.org" > Datum: Dienstag, 20. Oktober 2020 um 20:07 > An: "users@kafka.apache.org" > Betreff: Re: Client session timed out > > Hello, > > I'm not sure if I can add my issue in this thread. But it seems like I'm > facing the same problem. > > KAFKA_VERSION=2.5.1 > ZK_VERSION=3.5.8 > > I run 3 node zookeeper cluster and 3 node kafka cluster as docker > containers in docker swarm environment. When I install it for first time, > everything goes well. Zookeeper and Kafka are able to form the cluster. > Services are healthy. > > But when I issue docker update command, kafka is not coming up though the > zookeeper cluster is healthy. Below is the sequence of steps. > > docker service update one_zookeeper --image x.x.x/v1/zookeeper:latest > --force > docker service update one_zookeeper1 --image x.x.x/v1/zookeeper:latest > --force > docker service update one_zookeeper2 --image x.x.x/v1/zookeeper:latest > --force > > Zookeeper is healthy now. I'm able to query leader and follower. > > Now, I'm updating kafka and it doesn't work. > docker service update one_kafka --image x.x.com/v1/kafka:latest< > http://x.x.com/v1/kafka:latest> --force > > PFA Kafka log. > > While kafka update has failed, I see that kafka1 and kafka2 are running > and healthy. > > docker service ls | grep kafka > one_kafkareplicated 0/1 > one_kafka1 replicated 1/1 > one_kafka2 replicated 1/1 > > To cross verify, I have just brought down the services zookeeper and kafka > without data loss. I preserve zookeeperdata,zookeeperlogs,zookeepertxns and > kafkadata,kafkalogs. 
> > docker stack remove one > docker stack deploy -c cluster-zookeeper.yml one > docker stack deploy -c cluster-kafka.yml one > > Now, all the services are healthy. > > I'm not sure why kafka deployment is failing only during update. There is > no change in the configuration in either zookeeper or kafka. > > Please help me resolve this issue and let me know if you need any > additional details. > > Regards, > Praveen Kumar K S > +91-9986855625 > > > On Tue, Oct 20, 2020 at 3:54 PM Sabina Marx sabina.m...@sneo.io>> wrote: > Yes, it's the same problem. > > Am 19.10.20, 19:50 schrieb "Mich Talebzadeh" <mailto:mich.talebza...@gmail.com>>: > > can you try to disable automatic start and on the node just booted, > start > zookeeper first, check the log that it is connected and then start > Kafka? > > I assume everything is set-up OK including in > $KAFKA_HOME/config/server.properties values for broker.id< > http://broker.id>, hostname, > zookeeper.connect=:2181,,server2>:2181, :2181 and > also > > zookeeper.connection.timeout.ms<http://zookeeper.connection.timeout.ms>=6000 > (default) > > HTH > > > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > < > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > >* > > > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for > any > loss, damage or destruction of data or any other property which may > arise > from relying on this email's technical content is explicitly > disclaimed. > The author will in no case be liable for any monetary damages arising > from > such loss, damage or destruction. 
> > > > > On Mon, 19 Oct 2020 at 18:17, Sabina Marx sabina.m...@sneo.io>> wrote: > > > Yes, you have it > > > > Holen Sie sich Outlook für iOS<https://aka.ms/o0ukef> > > > > Von: Mich Talebzadeh mich.talebza...@gmail.com>> > > Gesendet: Monday, October 19, 2020 7:09:53 PM > > An: users@kafka.apache.org<mailto:users@kafka.apache.org> < > users@kafka.apache.org<mailto:users@kafka.apache.org>> > > Betreff: Re: Client session timed out > > > > Ok I think it is clearer now. > > > > As I understand all your Zookeepers and Kafkas are running. (5 nodes > > meaning 5 physical hosts?). Then you have to reboot one physical > host. You > > sti
Re: Client session timed out
Hello,

I'm not sure if I can add my issue in this thread, but it seems like I'm facing the same problem.

KAFKA_VERSION=2.5.1
ZK_VERSION=3.5.8

I run a 3 node zookeeper cluster and a 3 node kafka cluster as docker containers in a docker swarm environment. When I install it for the first time, everything goes well. Zookeeper and Kafka are able to form the cluster. Services are healthy.

But when I issue the docker update command, kafka is not coming up though the zookeeper cluster is healthy. Below is the sequence of steps.

docker service update one_zookeeper --image x.x.x/v1/zookeeper:latest --force
docker service update one_zookeeper1 --image x.x.x/v1/zookeeper:latest --force
docker service update one_zookeeper2 --image x.x.x/v1/zookeeper:latest --force

Zookeeper is healthy now. I'm able to query leader and follower.

Now, I'm updating kafka and it doesn't work.
docker service update one_kafka --image x.x.com/v1/kafka:latest --force

PFA Kafka log.

While the kafka update has failed, I see that kafka1 and kafka2 are running and healthy.

docker service ls | grep kafka
one_kafka    replicated 0/1
one_kafka1   replicated 1/1
one_kafka2   replicated 1/1

To cross verify, I have just brought down the zookeeper and kafka services without data loss. I preserve zookeeperdata, zookeeperlogs, zookeepertxns and kafkadata, kafkalogs.

docker stack remove one
docker stack deploy -c cluster-zookeeper.yml one
docker stack deploy -c cluster-kafka.yml one

Now, all the services are healthy.

I'm not sure why the kafka deployment is failing only during update. There is no change in the configuration in either zookeeper or kafka.

Please help me resolve this issue and let me know if you need any additional details.

Regards,
Praveen Kumar K S
+91-9986855625

On Tue, Oct 20, 2020 at 3:54 PM Sabina Marx wrote:
> Yes, it's the same problem.
> > Am 19.10.20, 19:50 schrieb "Mich Talebzadeh" : > > can you try to disable automatic start and on the node just booted, > start > zookeeper first, check the log that it is connected and then start > Kafka? > > I assume everything is set-up OK including in > $KAFKA_HOME/config/server.properties values for broker.id, > hostname, > zookeeper.connect=:2181,,server2>:2181, :2181 and > also > zookeeper.connection.timeout.ms=6000 (default) > > HTH > > > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > < > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > >* > > > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for > any > loss, damage or destruction of data or any other property which may > arise > from relying on this email's technical content is explicitly > disclaimed. > The author will in no case be liable for any monetary damages arising > from > such loss, damage or destruction. > > > > > On Mon, 19 Oct 2020 at 18:17, Sabina Marx wrote: > > > Yes, you have it > > > > Holen Sie sich Outlook für iOS<https://aka.ms/o0ukef> > > > > Von: Mich Talebzadeh > > Gesendet: Monday, October 19, 2020 7:09:53 PM > > An: users@kafka.apache.org > > Betreff: Re: Client session timed out > > > > Ok I think it is clearer now. > > > > As I understand all your Zookeepers and Kafkas are running. (5 nodes > > meaning 5 physical hosts?). Then you have to reboot one physical > host. You > > still have the redundancy. But when the physical host comes up and > your > > zookeeper and then Kafka come up, you have Kafka timing out and not > > connecting to the existing Kafka cluster? > > > > Does that make sense? > > > > > > > > > > > > > > LinkedIn * > > > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > > < > > > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > > >* > > > > > > > > > > > > *Disclaimer:* Use it at your own risk. 
Any and all responsibility > for any > > loss, damage or destruction of data or any other property which may > arise > > from relying on this email's technical content is explicitly > disclaimed. > > The author will in no case be liable for any monetary damages > arising from > > such loss, damage or destruction. > > > > > > > > > > On Mon, 19 Oct
Kafka Zookeeper Compatibility Matrix
Hello,

May I know if Kafka 2.5.1 is compatible with ZooKeeper 3.5.8? Also, where can I find the compatibility matrix?

Thanks!

Regards,
Praveen Kumar K S
Re: Kafka upgrade from 0.10 to 2.3.x
Thanks guys. We went with the rolling upgrade since we couldn't redo the upgrade test on our test and staging envs. They were already upgraded in rolling fashion and we didn't want to spend cycles redoing the test for a downtime upgrade.

If a downtime upgrade is possible, it would have been nice to have the documentation say so. Maybe it's widely understood that a downtime upgrade is ok? But it was a bit confusing when it was mentioned for some upgrade versions but not others.

Praveen

On Wed, May 13, 2020 at 8:51 AM Israel Ekpo wrote:
> It would be a good idea to attempt this upgrade in a different environment
> like QA or STAGING to make sure there are no surprises first before
> upgrading PRODUCTION
>
> Going from 0.10 to 2.3.x is a significant jump with potential for issues
>
> Running a trial in a lower environment with exact setup as PROD will
> prepare you better for a production upgrade experience
>
> On Wed, May 13, 2020 at 3:27 AM M. Manna wrote:
>
> > I have done this before. What Matthias said below is correct.
> >
> > First you've got to stop all apps to prevent data consumption (if that's
> > what you also mean by having downtime)
> >
> > Then, you can go ahead and replace the bin.
> >
> > Regards,
> >
> > On Tue, 12 May 2020 at 18:33, Matthias J. Sax wrote:
> >
> > > I guess you can just stop all servers, update the binaries (and
> > > potentially configs), and afterward restart the servers.
> > >
> > > Of course, you might want to stop all applications that connect to the
> > > cluster first.
> > >
> > > -Matthias
> > >
> > > On 5/11/20 9:50 AM, Praveen wrote:
> > > > Hi folks,
> > > >
> > > > I'd like to take downtime to upgrade to 2.3.x from 0.10.2.1. But I
> > > > can't find it in the doc for the 2.3.x upgrade that I can take
> > > > downtime to do this. The instructions are for rolling upgrade only.
> > > >
> > > > Has anyone tried this?
> > > >
> > > > Praveen
Kafka upgrade from 0.10 to 2.3.x
Hi folks,

I'd like to take downtime to upgrade from 0.10.2.1 to 2.3.x. But I can't find anything in the doc for the 2.3.x upgrade saying that I can take downtime to do this. The instructions are for a rolling upgrade only.

Has anyone tried this?

Praveen
Re: Flink vs Kafka streams
I have not found relying on partitions for parallelism to be a disadvantage. At Flurry, we have several pipelines using both the low-level Kafka consumer API (for legacy reasons) and Kafka Streams + Kafka Connect. They process over 10B events per day at around 200k rps. We also use the same system to send over 10M notifications per day, just to give you an example of a non-deterministic traffic pattern.

- Praveen

On Fri, Nov 8, 2019 at 10:43 PM Navneeth Krishnan wrote:
> Thanks Peter, even with ECS we have autoscaling enabled but the issue is
> during autoscaling we need to stop the job and start with new
> parallelism, which creates a downtime.
>
> Thanks
>
> On Fri, Nov 8, 2019 at 1:01 PM Peter Groesbeck wrote:
>
> > We use EMR instead of ECS but if that's an option for your team, you can
> > configure auto scaling rules in your cloud formation so that your task/job
> > load dynamically controls cluster sizing.
> >
> > Sent from my iPhone
> >
> > > On Nov 8, 2019, at 1:40 AM, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> > >
> > > Hello All,
> > >
> > > I have a streaming job running in production which is processing over 2
> > > billion events per day and it does some heavy processing on each event.
> > > We have been facing some challenges in managing flink in production like
> > > scaling in and out, restarting the job with savepoint etc. Flink provides
> > > a lot of features which seemed an obvious choice at that time, but now
> > > with all the operational overhead we are thinking should we still use
> > > flink for our stream processing requirements or choose kafka streams.
> > >
> > > We currently deploy flink on ECR. Bringing up a new cluster for another
> > > stream job is too expensive, but on the flip side running it on the same
> > > cluster becomes difficult since there is no way to say this job has to be
> > > run on a dedicated server versus this can run on a shared instance. Also,
> > > savepoint, cancel and submit of a new job results in some downtime. The
> > > most critical part is that there is no shared state among all tasks, sort
> > > of a global state. We sort of achieve this today using an external redis
> > > cache, but that incurs cost as well.
> > >
> > > If we are moving to kafka streams, it makes our deployment life much
> > > easier; each new stream job will be a microservice that can scale
> > > independently. With global state it's much easier to share state without
> > > using an external cache. But the disadvantage is we have to rely on the
> > > partitions for parallelism. Although this might initially sound easier,
> > > when we need to scale much higher this will become a bottleneck.
> > >
> > > Do you guys have any suggestions on this? We need to decide which way to
> > > move forward and any suggestions would be of much greater help.
> > >
> > > Thanks
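The partition-bound parallelism Navneeth describes can be sketched with a toy model (plain Python, not Kafka code; names are illustrative): Kafka Streams creates one stream task per input partition, so application instances beyond the partition count sit idle.

```python
def assign_tasks(num_partitions: int, instances: list) -> dict:
    """Toy model of Kafka Streams task assignment: one task per input
    partition, spread round-robin over instances. Any instance beyond
    the partition count receives no tasks."""
    assignment = {inst: [] for inst in instances}
    for task in range(num_partitions):
        assignment[instances[task % len(instances)]].append(task)
    return assignment

# 8 partitions, 10 instances: two instances are idle, so scaling out
# past the partition count buys no extra parallelism.
a = assign_tasks(8, ["inst-%d" % i for i in range(10)])
idle = [inst for inst, tasks in a.items() if not tasks]
print(len(idle))  # → 2
```

This is the bottleneck the poster anticipates: to scale higher you must provision more partitions up front (or repartition later), rather than just adding instances.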
Low level kafka consumer API to KafkaStreams App.
Hi there,

I have a kafka application that uses the kafka consumer low-level API to help us process data from a single partition concurrently. Our use case is to send out 800k messages per sec. We are able to do that with 4 boxes using 10k threads each, with each request taking 50ms in a thread (1000/50 * 10k * 4).

I understand that kafka in general uses partitions as its parallelism model. It is my understanding that if I want the exact same behavior with kafka streams, I'd need to create 40k partitions for this topic. Is that right? What is the overhead of creating thousands of partitions? If we end up wanting to send out millions of messages per second, is increasing the partitions the only way?

Best,
Praveen
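The capacity arithmetic in the message can be checked directly (a sketch of the numbers quoted: 50 ms per request, 10k threads per box, 4 boxes, and one single-threaded consumer per partition in the partition-parallelism model):

```python
# Each thread completes 1000 ms / 50 ms = 20 requests per second.
msgs_per_sec_per_thread = 1000 / 50
threads_per_box = 10_000
boxes = 4

total = msgs_per_sec_per_thread * threads_per_box * boxes
print(int(total))  # → 800000

# To match the same in-flight concurrency with one consumer thread per
# partition, you would need as many partitions as worker threads:
partitions_needed = threads_per_box * boxes
print(partitions_needed)  # → 40000
```

This confirms the 40k-partition figure follows only if each partition is processed strictly single-threaded; decoupling polling from processing with a worker pool (as the poster already does) is the usual way to avoid partition counts that large.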
Ksql
Has anybody used KSQL? What are its limitations, and what is it best for? I would appreciate it if you could share some information on this.

Best Regards,
Praveen Joshi

Sent from my iPhone
Kafka Consumer Committing Offset Even After Re-Assignment
I have 4 consumers on 2 boxes (running two consumers each) and 16 partitions, so each consumer takes 4 partitions. In Kafka 0.9.0.1, I'm noticing that even when a consumer is no longer assigned a partition, it is able to commit offsets to it.

Box 1 started:
t1 - Box 1, Consumer 1 - owns 8 partitions
     Box 1, Consumer 2 - owns 8 partitions
Consumers start polling and are submitting tasks to a task pool for processing.

Box 2 started:
t2 - Box 1, Consumer 1 - owns 4 partitions
     Box 1, Consumer 2 - owns 4 partitions
     Box 2, Consumer 1 - owns 4 partitions
     Box 2, Consumer 2 - owns 4 partitions
Partition-1 is now reassigned to Box 2, Consumer 1. But Box 1, Consumer 1 had already submitted some of the records for processing when it owned the partition earlier.

t3 - Box 1, Consumer 1 - after the tasks finish executing, even though it no longer owns the partition, it is still able to commit the offset.
t4 - Box 2, Consumer 1 - commits offsets as well, overwriting the offset committed by Box 1, Consumer 1.

Is this expected? Should I be using the ConsumerRebalanceListener to prevent commits to partitions not owned by the consumer?

- Praveen
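The guard the question hints at can be sketched outside Kafka entirely (plain Python stand-in, illustrative names — not the Kafka client API): track the live assignment the way a ConsumerRebalanceListener's onPartitionsAssigned/onPartitionsRevoked callbacks would, and drop commits coming from worker tasks whose partition was revoked.

```python
class RebalanceAwareCommitter:
    """Track currently-owned partitions and refuse commits for partitions
    revoked by a rebalance (mirrors what a ConsumerRebalanceListener-based
    guard would do around a worker pool)."""

    def __init__(self):
        self.owned = set()
        self.committed = {}

    def on_partitions_assigned(self, partitions):
        self.owned |= set(partitions)

    def on_partitions_revoked(self, partitions):
        # In the real listener you would flush finished work here first.
        self.owned -= set(partitions)

    def try_commit(self, partition, offset):
        if partition not in self.owned:
            return False  # stale worker result: don't overwrite the new owner
        self.committed[partition] = offset
        return True

c = RebalanceAwareCommitter()
c.on_partitions_assigned([1, 2])
assert c.try_commit(1, 100)
c.on_partitions_revoked([1])     # partition 1 moved to another consumer
assert not c.try_commit(1, 150)  # late task result is dropped
```

In the real client, the usual recommendation is exactly this: commit or discard in-flight work inside onPartitionsRevoked(), before the new owner starts consuming.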
Re: Writing streams to kafka topic
Thanks. I was able to quickly build a simple example out of this. Also saw the issue with punctuate and your “tick” feed recommendation for now. - Praveen On Fri, Sep 1, 2017 at 9:48 AM, Matthias J. Sax <matth...@confluent.io> wrote: > Hi, > > this is not supported by the DSL layer. What you would need to do, is to > add a custom stateful transform() operator after there window > (`stream.groupByKey().aggregate().toStream().transform().to()`), that > buffers the output and remembers the latest result. Second, you would > schedule a punctuation that emit the data whenever you want. > > Hope this helps. > > > -Matthias > > On 8/31/17 9:52 PM, Praveen wrote: > > Hi, > > > > I have a use case where I want to schedule processing of events in the > > future. I am not really sure if this a proper use of stream processing > > application. But I was looking at KTable and kafka streams api to see if > > this was possible. > > > > So far the pattern I have is: > > FEED -> changelog stream -> groupByKey() -> window -> write to > > different kafka topic > > > > The window here i believe would be the TumblingWindow for my use case. > I'd > > like to write back to a kafka topic only after the window retention ends. > > The documentation > > <http://docs.confluent.io/current/streams/developer- > guide.html#writing-streams-back-to-kafka> > > says that streams may only be written "continuously" to the kafka topic. > Is > > that the case? > > > > - Praveen > > > >
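Matthias's buffer-and-punctuate suggestion can be sketched as a toy state store (plain Python standing in for a stateful transform() with a scheduled punctuation; the key/value shapes are made up for illustration):

```python
class LatestResultBuffer:
    """Toy version of the transform() described above: keep only the
    latest aggregate per (key, window), emit everything on a punctuate
    tick instead of continuously."""

    def __init__(self):
        self.latest = {}

    def transform(self, key, value):
        self.latest[key] = value  # overwrite: only the newest result survives
        return None               # emit nothing downstream yet

    def punctuate(self):
        out = sorted(self.latest.items())
        self.latest.clear()       # next window interval starts fresh
        return out

buf = LatestResultBuffer()
buf.transform(("user-1", "window-0"), 3)
buf.transform(("user-1", "window-0"), 7)  # supersedes the earlier result
buf.transform(("user-2", "window-0"), 1)
print(buf.punctuate())  # → [(('user-1', 'window-0'), 7), (('user-2', 'window-0'), 1)]
```

In Kafka Streams the buffer would live in a state store and punctuate() would be scheduled via the ProcessorContext, with its output forwarded to the destination topic.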
Writing streams to kafka topic
Hi, I have a use case where I want to schedule processing of events in the future. I am not really sure if this a proper use of stream processing application. But I was looking at KTable and kafka streams api to see if this was possible. So far the pattern I have is: FEED -> changelog stream -> groupByKey() -> window -> write to different kafka topic The window here i believe would be the TumblingWindow for my use case. I'd like to write back to a kafka topic only after the window retention ends. The documentation <http://docs.confluent.io/current/streams/developer-guide.html#writing-streams-back-to-kafka> says that streams may only be written "continuously" to the kafka topic. Is that the case? - Praveen
Fat partition - Kafka Spark streaming
Hi,

We have a Kafka-Spark streaming integrated app that listens to Twitter, pushes the tweets to Kafka, and later consumes them from a Spark app. We are constantly seeing one of the Kafka partitions always having more data than the other partitions, and we are not able to zero in on the root cause.

We use the tweet id as the key, and we partition based on it. We established that tweet ids (snowflake) have a very even distribution, and we don't see any issues with the distribution for an even, prime, or odd number of partitions. But still partition 3 has more data, and the offset range of this partition is always larger than the other partitions' offset ranges.

Any suggestions or directions to debug this further would be much appreciated. Thank you.

Gurupraveen
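One way to rule out key skew is to replay a sample of real keys through the producer's key-to-partition mapping and count hits per partition. A sketch of that check (plain Python; note zlib.crc32 is only a stand-in here — the Java producer's default partitioner actually uses murmur2, so to reproduce the observed skew exactly you'd replay real tweet ids through the actual client):

```python
import zlib
from collections import Counter

def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in for the default partitioner: hash the key bytes,
    # then take the result modulo the partition count.
    return zlib.crc32(key.encode()) % num_partitions

# Replay a sample of sequential, snowflake-style ids and count per partition.
counts = Counter(partition_for(str(1_200_000_000_000_000 + i), 8)
                 for i in range(10_000))
print(dict(sorted(counts.items())))
```

If a replay like this comes out even but the topic is still skewed, the imbalance is probably not in the keys: look for a producer that sets the partition explicitly, a custom partitioner, or messages produced with null keys by a different code path.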
Re: Where is offset recorded?
Kafka used to use ZooKeeper for managing offsets, but it has since changed to storing offsets in a separate topic called __consumer_offsets. This is covered in the documentation, see here:
https://kafka.apache.org/090/documentation.html#impl_offsettracking

On Mon, Feb 20, 2017 at 5:02 PM, Jean Changyi wrote:
> Hi,
>
> Thank you for reading my question.
>
> I'm still using kafka-2.1.1-0.9.0.0. From some articles I learned that
> offsets were stored in zookeeper, however I can only find some z-nodes
> about console-consumer in my zookeeper.
>
> My question is: where is my offset record? That is, where can I find my
> consumer groups' offset records? By the way, I found some binary files in
> my kafka log directory (/tmp/kafka-logs on my computer):
> /tmp/kafka-logs/__consumer_offset-${number}/, do they have something to do
> with offsets?
>
> Best regards
>
> ---
> Jean
>
> jeanking...@gmail.com
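Those __consumer_offsets-N directories are indeed the offsets topic's partitions. The broker maps a group to one of them as abs(groupId.hashCode) % offsets.topic.num.partitions (50 by default). A small sketch that reproduces Java's String.hashCode so you can locate a group's partition from outside the JVM ("test" is an example group name):

```python
def java_string_hashcode(s: str) -> int:
    """Reproduce Java's String.hashCode() with 32-bit overflow semantics."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h

def offsets_partition(group_id: str, num_offsets_partitions: int = 50) -> int:
    # The broker keeps a group's offsets in __consumer_offsets partition
    # abs(groupId.hashCode) % offsets.topic.num.partitions (default 50);
    # Kafka's "abs" masks off the sign bit rather than negating.
    return (java_string_hashcode(group_id) & 0x7FFFFFFF) % num_offsets_partitions

print(offsets_partition("test"))  # → 48
```

In practice, kafka-consumer-groups.sh with --describe is the simpler way to inspect a group's committed offsets without reading the topic directly.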
Re: How does one deploy to consumers without causing re-balancing for real time use case?
Hey Onur,

I was just watching your talk on rebalancing from last year - https://www.youtube.com/watch?v=QaeXDh12EhE. Nice talk!

I think I have an idea as to why it takes 1 hr in my case, based on the talk in the video. With 32 boxes/consumers in the same group, I believe the group coordinator's state machine gets disrupted each time a new consumer is added, until the very last one. Also, I have the heartbeat set to 97 seconds (because normal processing could take that long and we don't want the coordinator to think the consumer is dead). I think both of these coupled together are why the cluster restart takes > 1 hr.

I'm curious how LinkedIn does clean cluster restarts? How do you handle the scenario described above?

Praveen

On Wed, Feb 15, 2017 at 10:22 AM, Praveen <praveev...@gmail.com> wrote:
> I still think a clean cluster start should not take > 1 hr for balancing
> though. Is this expected or am i doing something different?
>
> I thought this would be a common use case.
>
> Praveen
>
> On Fri, Feb 10, 2017 at 10:26 AM, Onur Karaman <okara...@linkedin.com.invalid> wrote:
>
>> Pradeep is right.
>>
>> close() will try and send out a LeaveGroupRequest while a kill -9 will not.
>>
>> On Fri, Feb 10, 2017 at 10:19 AM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
>>
>> > I believe if you're calling the .close() method on shutdown, then the
>> > LeaveGroupRequest will be made. If you're doing a kill -9, I'm not sure if
>> > that request will be made.
>> >
>> > On Fri, Feb 10, 2017 at 8:47 AM, Praveen <praveev...@gmail.com> wrote:
>> >
>> > > @Pradeep - I just read your thread, the 1hr pause was when all the
>> > > consumers were shutdown simultaneously. I'm testing out rolling restart
>> > > to get the actual numbers. The initial numbers are promising.
>> > > >> > > `STOP (1) (1min later kicks off) -> REBALANCE -> START (1) -> >> REBALANCE >> > > (takes 1min to get a partition)` >> > > >> > > In your thread, Ewen says - >> > > >> > > "The LeaveGroupRequest is only sent on a graceful shutdown. If a >> > > consumer knows it is going to >> > > shutdown, it is good to proactively make sure the group knows it >> needs to >> > > rebalance work because some of the partitions that were handled by the >> > > consumer need to be handled by some other group members." >> > > >> > > So does this mean that the consumer should inform the group ahead of >> > > time before it goes down? Currently, I just shutdown the process. >> > > >> > > >> > > On Fri, Feb 10, 2017 at 8:35 AM, Pradeep Gollakota < >> pradeep...@gmail.com >> > > >> > > wrote: >> > > >> > > > I asked a similar question a while ago. There doesn't appear to be a >> > way >> > > to >> > > > not triggering the rebalance. But I'm not sure why it would be >> taking > >> > > 1hr >> > > > in your case. For us it was pretty fast. >> > > > >> > > > https://www.mail-archive.com/users@kafka.apache.org/msg23925.html >> > > > >> > > > >> > > > >> > > > On Fri, Feb 10, 2017 at 4:28 AM, Krzysztof Lesniewski, Nexiot AG < >> > > > krzysztof.lesniew...@nexiot.ch> wrote: >> > > > >> > > > > Would be great to get some input on it. >> > > > > >> > > > > - Krzysztof Lesniewski >> > > > > >> > > > > >> > > > > On 06.02.2017 08:27, Praveen wrote: >> > > > > >> > > > >> I have a 16 broker kafka cluster. There is a topic with 32 >> > partitions >> > > > >> containing real time data and on the other side, I have 32 boxes >> w/ >> > 1 >> > > > >> consumer reading from these partitions. >> > > > >> >> > > > >> Today our deployment strategy is stop, deploy and start the >> > processes >> > > on >> > > > >> all the 32 consumers. This triggers re-balancing and takes a long >> > > period >> > > > >> of >> > > > >> time (> 1hr). Such a long pause isn't good for real time >> processing. 
>> > > > >> >> > > > >> I was thinking of rolling deploy but I think that will still >> cause >> > > > >> re-balancing b/c we will still have consumers go down and come >> up. >> > > > >> >> > > > >> How do you deploy to consumers without triggering re-balancing >> (or >> > > > >> triggering one that doesn't affect your SLA) when doing real time >> > > > >> processing? >> > > > >> >> > > > >> Thanks, >> > > > >> Praveen >> > > > >> >> > > > >> >> > > > > >> > > > >> > > >> > >> > >
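The trade-off Praveen describes above is governed by a handful of consumer settings. A sketch of the relevant properties (values are illustrative, not recommendations):

```properties
# Liveness: how long the coordinator waits without a heartbeat before
# declaring the consumer dead and triggering a rebalance.
session.timeout.ms=97000
# Heartbeats should fire several times per session timeout, typically ~1/3 of it.
heartbeat.interval.ms=30000
# On 0.10.1+ clients (KIP-62) heartbeats run on a background thread, and
# max.poll.interval.ms separately bounds processing time between poll() calls,
# so session.timeout.ms can stay short even with slow record processing.
max.poll.interval.ms=120000
```

On older clients, where heartbeats are only sent from the poll loop, a long session timeout like 97 s is the only way to survive long processing, and it makes every restart-triggered rebalance correspondingly slow to settle.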
Re: How does one deploy to consumers without causing re-balancing for real time use case?
I still think a clean cluster start should not take > 1 hr for balancing though. Is this expected or am I doing something different? I thought this would be a common use case. Praveen On Fri, Feb 10, 2017 at 10:26 AM, Onur Karaman < okara...@linkedin.com.invalid> wrote: > Pradeep is right. > > close() will try and send out a LeaveGroupRequest while a kill -9 will not. > > On Fri, Feb 10, 2017 at 10:19 AM, Pradeep Gollakota <pradeep...@gmail.com> > wrote: > > > I believe if you're calling the .close() method on shutdown, then the > > LeaveGroupRequest will be made. If you're doing a kill -9, I'm not sure > if > > that request will be made. > > > > On Fri, Feb 10, 2017 at 8:47 AM, Praveen <praveev...@gmail.com> wrote: > > > > > @Pradeep - I just read your thread, the 1hr pause was when all the > > > consumers where shutdown simultaneously. I'm testing out rolling > restart > > > to get the actual numbers. The initial numbers are promising. > > > > > > `STOP (1) (1min later kicks off) -> REBALANCE -> START (1) -> REBALANCE > > > (takes 1min to get a partition)` > > > > > > In your thread, Ewen says - > > > > > > "The LeaveGroupRequest is only sent on a graceful shutdown. If a > > > consumer knows it is going to > > > shutdown, it is good to proactively make sure the group knows it needs > to > > > rebalance work because some of the partitions that were handled by the > > > consumer need to be handled by some other group members." > > > > > > So does this mean that the consumer should inform the group ahead of > > > time before it goes down? Currently, I just shutdown the process. > > > > > > > > > On Fri, Feb 10, 2017 at 8:35 AM, Pradeep Gollakota < > pradeep...@gmail.com > > > > > > wrote: > > > > > > > I asked a similar question a while ago. There doesn't appear to be a > > way > > > to > > > > not triggering the rebalance. But I'm not sure why it would be > taking > > > > 1hr > > > > in your case. For us it was pretty fast. 
> > > > > > > > https://www.mail-archive.com/users@kafka.apache.org/msg23925.html > > > > > > > > > > > > > > > > On Fri, Feb 10, 2017 at 4:28 AM, Krzysztof Lesniewski, Nexiot AG < > > > > krzysztof.lesniew...@nexiot.ch> wrote: > > > > > > > > > Would be great to get some input on it. > > > > > > > > > > - Krzysztof Lesniewski > > > > > > > > > > > > > > > On 06.02.2017 08:27, Praveen wrote: > > > > > > > > > >> I have a 16 broker kafka cluster. There is a topic with 32 > > partitions > > > > >> containing real time data and on the other side, I have 32 boxes > w/ > > 1 > > > > >> consumer reading from these partitions. > > > > >> > > > > >> Today our deployment strategy is stop, deploy and start the > > processes > > > on > > > > >> all the 32 consumers. This triggers re-balancing and takes a long > > > period > > > > >> of > > > > >> time (> 1hr). Such a long pause isn't good for real time > processing. > > > > >> > > > > >> I was thinking of rolling deploy but I think that will still cause > > > > >> re-balancing b/c we will still have consumers go down and come up. > > > > >> > > > > >> How do you deploy to consumers without triggering re-balancing (or > > > > >> triggering one that doesn't affect your SLA) when doing real time > > > > >> processing? > > > > >> > > > > >> Thanks, > > > > >> Praveen > > > > >> > > > > >> > > > > > > > > > > > > > > >
Re: How does one deploy to consumers without causing re-balancing for real time use case?
@Pradeep - I just read your thread, the 1hr pause was when all the consumers were shut down simultaneously. I'm testing out rolling restart to get the actual numbers. The initial numbers are promising. `STOP (1) (1min later kicks off) -> REBALANCE -> START (1) -> REBALANCE (takes 1min to get a partition)` In your thread, Ewen says - "The LeaveGroupRequest is only sent on a graceful shutdown. If a consumer knows it is going to shutdown, it is good to proactively make sure the group knows it needs to rebalance work because some of the partitions that were handled by the consumer need to be handled by some other group members." So does this mean that the consumer should inform the group ahead of time before it goes down? Currently, I just shut down the process. On Fri, Feb 10, 2017 at 8:35 AM, Pradeep Gollakota <pradeep...@gmail.com> wrote: > I asked a similar question a while ago. There doesn't appear to be a way to > not triggering the rebalance. But I'm not sure why it would be taking > 1hr > in your case. For us it was pretty fast. > > https://www.mail-archive.com/users@kafka.apache.org/msg23925.html > > > > On Fri, Feb 10, 2017 at 4:28 AM, Krzysztof Lesniewski, Nexiot AG < > krzysztof.lesniew...@nexiot.ch> wrote: > > > Would be great to get some input on it. > > > > - Krzysztof Lesniewski > > > > > > On 06.02.2017 08:27, Praveen wrote: > > > >> I have a 16 broker kafka cluster. There is a topic with 32 partitions > >> containing real time data and on the other side, I have 32 boxes w/ 1 > >> consumer reading from these partitions. > >> > >> Today our deployment strategy is stop, deploy and start the processes on > >> all the 32 consumers. This triggers re-balancing and takes a long period > >> of > >> time (> 1hr). Such a long pause isn't good for real time processing. > >> > >> I was thinking of rolling deploy but I think that will still cause > >> re-balancing b/c we will still have consumers go down and come up. 
> >> > >> How do you deploy to consumers without triggering re-balancing (or > >> triggering one that doesn't affect your SLA) when doing real time > >> processing? > >> > >> Thanks, > >> Praveen > >> > >> > > >
How does one deploy to consumers without causing re-balancing for real time use case?
I have a 16-broker Kafka cluster. There is a topic with 32 partitions containing real-time data, and on the other side I have 32 boxes w/ 1 consumer each reading from these partitions. Today our deployment strategy is to stop, deploy, and start the processes on all 32 consumers. This triggers re-balancing and takes a long period of time (> 1 hr). Such a long pause isn't good for real-time processing. I was thinking of a rolling deploy, but I think that will still cause re-balancing b/c we will still have consumers go down and come up. How do you deploy to consumers without triggering re-balancing (or triggering one that doesn't affect your SLA) when doing real-time processing? Thanks, Praveen
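Worth noting for readers on clients newer than this thread: Kafka 2.3 added static group membership (KIP-345) for exactly this scenario. A consumer that restarts and rejoins within the session timeout reclaims its previous partitions without triggering a rebalance, which makes rolling deploys cheap. A sketch of the settings involved (values illustrative):

```properties
# Static membership (Kafka 2.3+, KIP-345): a stable per-instance id that
# survives restarts, e.g. derived from the host name.
group.instance.id=consumer-box-01
# A restart that completes within the session timeout keeps the old
# assignment; no rebalance is triggered for the rest of the group.
session.timeout.ms=120000
```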
Re: Is it a bad idea to use periods within a consumer group name? "my-service.topic1_consumer_group"
Not that I know of. We at Flurry have been using periods in our group names for a while now and haven't encountered any issues b/c of that. On Tue, Dec 13, 2016 at 5:13 PM, Jeff Widman wrote: > I vaguely remember reading somewhere that it's a bad idea to use periods > within Kafka consumer group names because it can potentially conflict with > metric names. > > I've searched, and not finding anything, so am I just mis-remembering? > > It is operationally convenient because zookeeper CLI allows tab completion > within periods. >
KafkaProducer 0.9.0.1 Client - Async `send` stops sending
I am running a map-reduce job to queue data from HDFS into Kafka. The mappers simply open a file and send the data to Kafka using the KafkaProducer 0.9.0.1 client. The issue I am debugging is that the producer's async `send` stops sending to the brokers after some time. I was able to capture the behavior with logs enabled. The producer starts off doing the following:

1. NetworkClient - Initiates connections with the nodes
2. NetworkClient - Sends a metadata request
3. KafkaProducer - Sends records down to the rest of the system
4. RecordAccumulator - Allocates a new byte message buffer
5. Sender - Creates producer requests to send the messages to the brokers

Steps 3 - 5 repeat for a while. Then all of a sudden no more records go out, and the network client just sends metadata requests every minute. My code is waiting on all the async producer sends to complete; I am checking this with `Future.get()`. When I look at the heap dump, I can still see byte buffers in the RecordAccumulator's BufferPool. So it looks like Kafka is not attempting to send those records anymore. I would appreciate any insight on what's happening here. I've also attached my thread dump and producer configs. Thanks, Praveen
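Without the attached configs it is hard to say more, but a few producer settings bound how long a send can sit around rather than hang forever. A sketch (property names from the 0.9 producer; values illustrative, not a diagnosis):

```properties
# Upper bound on how long send() may block waiting for metadata or for
# free space in the accumulator's buffer pool.
max.block.ms=60000
# How long the Sender waits for a broker response before considering the
# request failed and retrying.
request.timeout.ms=30000
retries=3
acks=all
```

It also helps to bound the wait on the returned future, e.g. `future.get(2, TimeUnit.MINUTES)`, so a batch that is never transmitted surfaces as a `TimeoutException` in the mapper instead of a silent hang, and to pass a `Callback` to `send()` so per-record errors get logged even when nothing ever calls `get()`.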
Re: How to move a broker out of rotation?
Nice. That's a useful set of functionality. Thanks, I'll take a look. Praveen On Thu, Sep 29, 2016 at 4:07 PM, Todd Palino <tpal...@gmail.com> wrote: > There’s not a good answer for this with just the Kafka tools. We opened > sourced the tool that we use for removing brokers and rebalancing > partitions in a cluster: > https://github.com/linkedin/kafka-tools > > So when we want to remove a broker (with an ID of 1 in this example) from a > cluster, we run: > kafka-assigner -z zookeeper.example.com:2181 -e remove -b 1 > > That runs a bunch of partition reassignments to move all replicas off that > broker and distribute them to the other brokers in the cluster. > > -Todd > > > On Thu, Sep 29, 2016 at 3:53 PM, Praveen <praveev...@gmail.com> wrote: > > > I have 16 brokers. Now one of the brokers (B-16) got completely messed up > > and is sent for repair. > > > > But I can still see some partitions including the B-16 in its replicas, > > thereby becoming under-replicated. > > > > Is there a proper way to take broker out of rotation? > > > > Praveen > > > > > > -- > *Todd Palino* > Staff Site Reliability Engineer > Data Infrastructure Streaming > > > > linkedin.com/in/toddpalino >
How to move a broker out of rotation?
I have 16 brokers. One of the brokers (B-16) got completely messed up and was sent for repair. But I can still see B-16 in the replica lists of some partitions, which are therefore under-replicated. Is there a proper way to take a broker out of rotation? Praveen
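With the stock tools, the equivalent of the kafka-assigner approach from the reply above is a manual pass with kafka-reassign-partitions.sh: write a JSON file that gives each affected partition a replica set excluding the dead broker. A sketch (topic name and broker ids are illustrative; each list repeats the partition's current replicas with broker 16 swapped for a surviving one):

```json
{
  "version": 1,
  "partitions": [
    {"topic": "events", "partition": 0, "replicas": [2, 3, 4]},
    {"topic": "events", "partition": 7, "replicas": [5, 6, 7]}
  ]
}
```

Feed it to the tool with `kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file move.json --execute`, then check progress with the same command and `--verify`.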
Kafka 0.9.1 - Uneven Partition to Consumer assignment
I have a topic with 16 partitions. I also have 24 consumer threads (8 per process, one process per box) subscribed to that same topic. This configuration ensures that there is plenty of room for a 1:1 partition-to-consumer assignment, with some standby consumers to take over in case a process dies. But during a Kafka restart (one of many), I somehow ended up in a state where the 16 partitions were assigned to only 12 consumer threads.

```
6 partitions to 4 consumers on Box 1
5 partitions to 4 consumers on Box 2
5 partitions to 4 consumers on Box 3
```

I believe at least some of the other consumers on each box were still active. As a result, a few of the consumers were overloaded, resulting in a bigger lag. I initially thought this was due to Kafka rebalancing, but that doesn't seem to be the case. I am using the default RangeAssignor. I don't believe anything will change with the RoundRobinAssignor since I only have 1 topic. (Feel free to correct me if I am wrong about the round robin assignor.) How can I ensure that the partition-to-consumer assignment is always uniform? Would appreciate any insights. Thanks, Praveen
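As a rough illustration of why the assignment came out lopsided: a range-style assignor only divides partitions among the members the coordinator currently considers live. Here is a toy model (a sketch of the arithmetic, not Kafka's actual code; member names are made up): if only 12 of the 24 threads are seen as members at the moment of assignment, the 16 partitions get spread unevenly across those 12, no matter how many threads exist.

```python
def range_assign(num_partitions, members):
    """Toy model of a per-topic range assignor: sort member ids, give each
    floor(P/C) partitions, and the first P mod C members one extra."""
    members = sorted(members)
    base, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for i, m in enumerate(members):
        n = base + (1 if i < extra else 0)
        assignment[m] = list(range(start, start + n))
        start += n
    return assignment

# 12 live members (3 boxes x 4 threads) sharing 16 partitions:
# four members get 2 partitions each, eight get 1.
live = [f"box{b}-thread{t}" for b in range(3) for t in range(4)]
loads = sorted(len(p) for p in range_assign(16, live).values())
print(loads)  # -> [1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2]
```

For a single topic, range and round-robin produce the same per-member counts (as suspected above); the skew here comes from members missing at assignment time, not from the strategy.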
Communication between Kafka clients & Kafka
Do Kafka clients (producers and consumers) use RPC to communicate with the Kafka cluster? Regards, Praveen