Re: How to acknowledge after consuming the message from Kafka topic?
Hi Daniel,

This is my code. I hope it is understandable, thanks :)

    const kafka = require('kafka-node');
    const ConsumerGroup = kafka.ConsumerGroup;

    let options = {
      kafkaHost: '127.0.0.1:9092',
      groupId: 'DualTest',
      autoCommit: false, // offsets are committed manually below
      // autoCommitIntervalMs: 5000,
      protocol: ['roundrobin'],
      fromOffset: 'latest',
      outOfRangeOffset: 'earliest',
      sessionTimeout: 15000,
      fetchMaxBytes: 10 * 1024 * 1024,
    }

    let consumerGroup = new ConsumerGroup(options, 'topicName')

    // Log each message, then commit manually.
    consumerGroup.on('message', (message) => {
      console.log(message);
      consumerGroup.commit((err, res) => {
        if (err) {
          console.error(err)
        } else {
          console.log(res)
        }
      });
    });

    consumerGroup.on('error', (err) => {
      console.log(`Error occurred ${err}`)
    });

Here, the autoCommit property is set to false and I commit manually with consumerGroup.commit(), but when I restart the consumer it consumes all the offsets from the start.

Thanks

On Mon, Jan 21, 2019 at 11:48 PM Daniel Hinojosa <dhinoj...@evolutionnext.com> wrote:

> Show some code Rahul.
>
> [...]
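A note on the two offset options in the configuration above, offered as a sketch rather than a diagnosis. As far as the kafka-node README describes them, fromOffset is only consulted when the group has no saved offset, and outOfRangeOffset when the saved offset is no longer valid on the broker. The helper below is hypothetical (resolveStartOffset and pickByPolicy are not kafka-node functions) and models that decision for a single partition with plain numbers, assuming "latest" means the log-end offset.

```javascript
// Hypothetical model, NOT a kafka-node API: decides where a consumer group
// starts reading one partition, using plain numbers for the offsets.
function pickByPolicy(policy, earliest, latest) {
  if (policy === 'earliest') return earliest;
  if (policy === 'latest') return latest;
  throw new Error(`unsupported policy in this sketch: ${policy}`);
}

function resolveStartOffset({ committed, earliest, latest, fromOffset, outOfRangeOffset }) {
  // No saved offset for this group: fromOffset decides.
  if (committed === undefined || committed === null) {
    return pickByPolicy(fromOffset, earliest, latest);
  }
  // Saved offset falls outside what the broker still holds: outOfRangeOffset decides.
  if (committed < earliest || committed > latest) {
    return pickByPolicy(outOfRangeOffset, earliest, latest);
  }
  // Otherwise the group resumes exactly where it committed.
  return committed;
}

// Same policy values as the options object in the message above.
const groupOptions = { fromOffset: 'latest', outOfRangeOffset: 'earliest' };
// Assumed partition bounds for illustration: offsets 0..99 exist, 100 is the log end.
const bounds = { earliest: 0, latest: 100 };

const resumed = resolveStartOffset({ ...groupOptions, ...bounds, committed: 42 });
const freshGroup = resolveStartOffset({ ...groupOptions, ...bounds, committed: undefined });
const outOfRange = resolveStartOffset({ ...groupOptions, ...bounds, committed: 250 });

console.log({ resumed, freshGroup, outOfRange });
```

In this model, a restart that begins at the earliest offset matches only the out-of-range branch, since a group with no saved offset would start at the log end. Whether that is what happens in the setup above is not confirmed anywhere in this thread.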
Re: How to acknowledge after consuming the message from Kafka topic?
Hi Hans,

Let me correct this: I am using the kafka-node client, not node-kafka.

On Tue, Jan 22, 2019 at 3:56 AM Hans Jespersen wrote:

> Do you mean this node-kafka from 4 years ago (
> https://github.com/sutoiku/node-kafka)?
>
> If so that’s a very very old client, only supports Apache Kafka 0.8 and
> stores offsets in zookeeper (which Kafka 0.9 and above no longer do).
>
> I recommend you use a more up to date nodejs kafka client than this one.
>
> -hans
>
> [...]
Re: NullPointerException in KafkaStreams during startup
That is expected... It's not possible to change the subscription during a rolling restart. You need to stop all instances and afterwards start new instances with the new subscription.

I did not look into the details of your change, but you might also need to reset your application before starting new instances, because changing the subscription might be a "breaking" change:

https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html

-Matthias

On 1/21/19 2:49 PM, Johan Horvius wrote:

> Hello!
>
> I'm having trouble when deploying a new version of a service during the
> re-balancing step where the topology doesn't match what KafkaStreams
> library assumes and there's a NPE while creating tasks.
>
> [...]
NullPointerException in KafkaStreams during startup
Hello!

I'm having trouble deploying a new version of a service: during the re-balancing step the topology doesn't match what the KafkaStreams library assumes, and there's an NPE while creating tasks.

Background info:
I'm running a Spring Boot service which utilizes KafkaStreams, currently subscribed to two topics that have 10 partitions each. The service runs as 2 instances for increased reliability and load balancing. In the next version of the service I've added another stream listening to a different topic. The service is deployed with a rolling strategy where first 2 instances of the new version are added and then the old version's 2 instances are shut down.

When I try to deploy the new version, the partitions are withdrawn and re-assigned, and during task creation the NPE happens and KafkaStreams goes into a failed state.

Kafka is backed by 3 brokers in a cluster.

I've tried to re-create the scenario in a simpler setting but have been unable to do so. The re-balancing works fine when I run it locally with dummy test topics.

I'm attaching the log from the service.

While trying to figure out what was wrong, the only conclusion I could come up with was that KafkaStreams got confused: it built an original topology, then during the re-balance got tasks in another order, and did not re-build the internal topology before trying to create tasks. As a result, the KafkaStreams node groups associated with a task key such as 3_3 would not match up with the expected consumer/producer combo.

Hopefully you can shed some light on what could be wrong.
Regards
Johan Horvius

2019-01-14 08:54:51.855 INFO 1 [ main] trationDelegate$BeanPostProcessorChecker : Bean 'configurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$18dc8d9e] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-01-14 08:54:53.232 INFO 1 [ main] ConfigServicePropertySourceLocator : Located environment: name=notification-service, profiles=[test], label=master, version=6fd287b2d6140f58f59ca90519271d5d95d2e1bc, state=null
2019-01-14 08:54:53.382 INFO 1 [ main] NotificationApplication : The following profiles are active: test
2019-01-14 08:54:56.808 INFO 1 [ main] RepositoryConfigurationDelegate : Bootstrapping Spring Data repositories in DEFAULT mode.
2019-01-14 08:54:57.049 INFO 1 [ main] RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 225ms. Found 9 repository interfaces.
2019-01-14 08:54:58.046 INFO 1 [ main] GenericScope : BeanFactory id=7f4306d9-958f-3b0c-8c7e-3d928ee18106
2019-01-14 08:54:58.111 INFO 1 [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$d0749c24] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-01-14 08:54:59.013 INFO 1 [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration$$EnhancerBySpringCGLIB$$fcc28aa1] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-01-14 08:54:59.156 INFO 1 [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.hateoas.config.HateoasConfiguration' of type [org.springframework.hateoas.config.HateoasConfiguration$$EnhancerBySpringCGLIB$$7c42d7d3] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-01-14 08:54:59.207 INFO 1 [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$18dc8d9e] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-01-14 08:55:00.683 INFO 1 [ main] TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2019-01-14 08:55:00.717 INFO 1 [ main] Http11NioProtocol: Initializing ProtocolHandler ["http-nio-8080"]
2019-01-14 08:55:00.754 INFO 1 [ main] StandardService : Starting service [Tomcat]
2019-01-14 08:55:00.756 INFO 1 [ main] StandardEngine : Starting Servlet Engine: Apache Tomcat/9.0.13
2019-01-14 0
Re: How to acknowledge after consuming the message from Kafka topic?
Do you mean this node-kafka from 4 years ago (https://github.com/sutoiku/node-kafka)?

If so, that’s a very old client: it only supports Apache Kafka 0.8 and stores offsets in ZooKeeper (which Kafka 0.9 and above no longer do).

I recommend you use a more up-to-date Node.js Kafka client than this one.

-hans

> On Jan 21, 2019, at 10:02 AM, Rahul Singh wrote:
>
> I am using node-kafka, I have used consumer.commit to commit offsets but
> don't know why when I restart the consumer it consume the committed offsets.
>
> Thanks
>
> [...]
Re: Debugging periodic high kafka log flush times
Hi Rajiv,

Did you ever find out what was causing this issue?

I see something similar on my Kafka cluster, except that the 95th percentile of the log flush time goes above 10 or 20 min, and then I start to see under-replicated partitions. Apart from the difference in time, the scenario is much the same.

The infrastructure is slightly different as well, but I use even bigger machines, and Kafka is version 10.0.2.1.

Thanks,
Martin Cremona.
Re: How to acknowledge after consuming the message from Kafka topic?
Show some code, Rahul.

On Mon, Jan 21, 2019 at 11:02 AM Rahul Singh <rahul.si...@smartsensesolutions.com> wrote:

> I am using node-kafka, I have used consumer.commit to commit offsets but
> don't know why when I restart the consumer it consume the committed
> offsets.
>
> Thanks
>
> [...]
Re: How to acknowledge after consuming the message from Kafka topic?
I am using node-kafka. I have used consumer.commit to commit offsets, but I don't know why the consumer consumes the already committed offsets again when I restart it.

Thanks

On Mon, Jan 21, 2019, 10:24 PM Hans Jespersen wrote:

> Are you using kafka-node or node-rdkafka? In either case you should call
> Consumer.commit(cb) or something similar to manually commit offsets (aka
> acknowledge messages).
>
> Alternatively so can set a config parameter on the consumer to autoCommit.
>
> https://github.com/SOHU-Co/kafka-node/blob/master/README.md#consumer
>
> https://github.com/Blizzard/node-rdkafka/blob/master/README.md
>
> -hans
>
> [...]
Re: How to acknowledge after consuming the message from Kafka topic?
Are you using kafka-node or node-rdkafka? In either case you should call Consumer.commit(cb) or something similar to manually commit offsets (aka acknowledge messages).

Alternatively, you can set a config parameter on the consumer to autoCommit.

https://github.com/SOHU-Co/kafka-node/blob/master/README.md#consumer

https://github.com/Blizzard/node-rdkafka/blob/master/README.md

-hans

> On Jan 21, 2019, at 5:17 AM, Rahul Singh wrote:
>
> I am using in Node with node-kafka module.
>
> [...]
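As a rough illustration of what "commit to acknowledge" means in the reply above, here is a minimal in-memory sketch. FakePartition and consume are made-up names for illustration only, not kafka-node or node-rdkafka APIs. The sketch assumes the committed value is the offset of the next message to read, which is how Kafka consumer groups store it.

```javascript
// In-memory stand-in for one partition plus the committed offset of one group.
// Illustration only: nothing here talks to a real Kafka broker.
class FakePartition {
  constructor(messages) {
    this.messages = messages;
    this.committed = 0; // offset of the next message the group should read
  }

  // What a (re)started consumer in the group would receive.
  fetchFromCommitted() {
    return this.messages.slice(this.committed);
  }

  // Acknowledge a processed message by storing the offset that follows it.
  commit(lastProcessedOffset) {
    this.committed = lastProcessedOffset + 1;
  }
}

// Process up to `limit` messages; commit after each one only if `commitEach` is true.
function consume(partition, { limit, commitEach }) {
  const seen = [];
  let offset = partition.committed;
  for (const value of partition.fetchFromCommitted().slice(0, limit)) {
    seen.push(value);
    if (commitEach) partition.commit(offset);
    offset += 1;
  }
  return seen;
}

// Case 1: commit after every message, stop after two, then "restart".
const acked = new FakePartition(['a', 'b', 'c', 'd']);
const firstRun = consume(acked, { limit: 2, commitEach: true });
const afterRestart = consume(acked, { limit: Infinity, commitEach: true });

// Case 2: never commit, so a restart sees everything again.
const unacked = new FakePartition(['a', 'b', 'c', 'd']);
consume(unacked, { limit: 2, commitEach: false });
const redelivered = consume(unacked, { limit: Infinity, commitEach: false });

console.log({ firstRun, afterRestart, redelivered });
```

The second case is the at-least-once behaviour to expect whenever a commit never reaches the broker: processed messages are delivered again after a restart.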
Re: How to acknowledge after consuming the message from Kafka topic?
I am using it in Node with the node-kafka module.

On Mon, Jan 21, 2019 at 6:45 PM M. Manna wrote:

> Please read KafkaConsumer javadoc - your answer is already there.
>
> Thanks,
>
> [...]
Re: How to acknowledge after consuming the message from Kafka topic?
Please read the KafkaConsumer javadoc - your answer is already there.

Thanks,

On Mon, 21 Jan 2019 at 13:13, Rahul Singh <rahul.si...@smartsensesolutions.com> wrote:

> Hi All,
>
> I am testing kafka locally, I am able to produce and consume message. But,
> after consuming the message from topic I want to acknowledge.
>
> Looking for solution. Please revert if anyone have.
>
> Thanks & Regards
> Rahul Singh
How to acknowledge after consuming the message from Kafka topic?
Hi All,

I am testing Kafka locally, and I am able to produce and consume messages. But after consuming a message from the topic, I want to acknowledge it.

I am looking for a solution. Please reply if anyone has one.

Thanks & Regards
Rahul Singh
Deploying Kafka topics in a kerberized Zookeeper without superuser (in a CI flow)
I'm running Kafka 1.1.1 and Zookeeper 3.4.6 in a cluster, both guarded by Kerberos. My app stack includes a module containing topic configurations, and my continuous integration build autodeploys changes to topics with kafka-topics.sh and kafka-configs.sh.

When I try to use a non-superuser principal to authenticate in the scripts, kafka-topics.sh creates the topic metadata in Zookeeper in such a way that Kafka cannot process it to create the actual topics: no partitions are created on the brokers. Also, running kafka-configs.sh to alter configs of existing topics fails with "NoAuth for /configs/".

When I authenticate with the superuser principal "kafka", everything works fine. But making the "kafka" superuser credentials available in a CI context seems insecure.

Is it possible to use kafka-topics.sh and kafka-configs.sh in a kerberized environment with a non-superuser Kerberos principal, and if so, how? Can you suggest an alternative way to achieve CI for Kafka topics?

Best regards,
Kristjan Peil