Re: Long start time for consumer
Thanks for your suggestion. However, this doesn't seem applicable to our Kafka version; we are using 0.10.0.1.

On Tue, May 29, 2018 at 7:04 PM Manikumar wrote:
> Please check the "group.initial.rebalance.delay.ms" broker config property.
> This will be the delay for the initial consumer rebalance.
>
> From the docs:
>
> "The rebalance will be further delayed by the value of
> group.initial.rebalance.delay.ms as new members join the group,
> up to a maximum of max.poll.interval.ms"
>
> http://kafka.apache.org/documentation/#upgrade_1100_notable
> [...]
Re: Long start time for consumer
Please check the "group.initial.rebalance.delay.ms" broker config property. This will be the delay for the initial consumer rebalance.

From the docs:

"The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms"

http://kafka.apache.org/documentation/#upgrade_1100_notable

On Tue, May 29, 2018 at 6:51 PM, Shantanu Deshmukh wrote:
> No, no dynamic topic creation.
> [...]
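To illustrate the quoted doc sentence, here is a rough, hypothetical sketch of how the initial rebalance delay accumulates: each member that joins during the initial rebalance extends the wait by group.initial.rebalance.delay.ms (default 3000 ms), capped at max.poll.interval.ms. This is only an illustration of the described behaviour, not the broker's actual implementation, and note this broker property only exists from Kafka 0.11.0 onward.

```java
// Hypothetical sketch of the documented behaviour: each join event during
// the initial rebalance extends the delay by initialRebalanceDelayMs, but
// the total wait is capped at maxPollIntervalMs. Not the broker's code.
public class InitialRebalanceDelaySketch {

    static long totalInitialDelayMs(int joinEvents,
                                    long initialRebalanceDelayMs,
                                    long maxPollIntervalMs) {
        long delay = 0;
        for (int i = 0; i < joinEvents; i++) {
            // each new member extends the delay, up to the cap
            delay = Math.min(delay + initialRebalanceDelayMs, maxPollIntervalMs);
        }
        return delay;
    }

    public static void main(String[] args) {
        // with the default 3000 ms delay and a 300000 ms (5 min) cap:
        System.out.println(totalInitialDelayMs(10, 3000, 300000));  // 30000
        System.out.println(totalInitialDelayMs(200, 3000, 300000)); // 300000 (capped)
    }
}
```

With 10 consumers joining at startup this adds about 30 seconds; the cap means the delay can never exceed max.poll.interval.ms.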
Re: Long start time for consumer
I cannot, because there are messages which need high priority. Setting the poll interval to 4 seconds means there might be a delay of 4 seconds plus regular processing time, which is not desirable. Also, will it impact heartbeating?

On Tue, May 29, 2018 at 6:17 PM M. Manna wrote:
> Have you tried increasing the poll time, e.g. to 4000, to see if that
> helps matters?
> [...]
Re: Long start time for consumer
No, no dynamic topic creation.

On Tue, May 29, 2018 at 6:38 PM Jaikiran Pai wrote:
> Are your topics dynamically created? If so, see this thread:
> https://www.mail-archive.com/dev@kafka.apache.org/msg67224.html
>
> -Jaikiran
> [...]
Re: Long start time for consumer
Are your topics dynamically created? If so, see this thread:
https://www.mail-archive.com/dev@kafka.apache.org/msg67224.html

-Jaikiran

On 29/05/18 5:21 PM, Shantanu Deshmukh wrote:
> Hello,
>
> We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
> partitions. I have an application which consumes from all these topics by
> creating multiple consumer processes, all under the same consumer group. I
> am noticing that every time we restart this application, it takes almost 5
> minutes for the consumers to start consuming. What might be going wrong?
Re: Long start time for consumer
Have you tried increasing the poll time, e.g. to 4000, to see if that helps matters?

On 29 May 2018 at 13:44, Shantanu Deshmukh wrote:
> Here is the code which consumes messages: [...]
Re: Long start time for consumer
Here is the code which consumes messages:

    while (true && startShutdown == false) {
        Context context = new Context();
        JSONObject notifJSON = new JSONObject();
        String notificationMsg = "";
        NotificationEvent notifEvent = null;
        initializeContext();
        try {
            consumerConnect();
            ConsumerRecords records = consumer.poll(100);
            if (records.count() == 0) {
                //logger.trace("No records in topic: " + this.topic);
                continue;
            }
            for (ConsumerRecord record : records) {
                try {
                    long totalStart = System.currentTimeMillis();
                    notificationMsg = record.value();
                    JSONParser jsonParser = new JSONParser();
                    logger.trace("Kafka-Msg: >>" + notificationMsg);
                    if (notificationMsg.equals("")) {
                        continue;
                    }
                    Profiler.start(workerId, "json-parse");
                    notifJSON = (JSONObject) jsonParser.parse(notificationMsg);
                    Profiler.end(workerId, "json-parse");
                    notifEvent = new NotificationEvent(notifJSON);
                    if (notifEvent.getTransactionID().equals("") == true) {
                        notifEvent.generateTransactionID();
                    }
                    context.setEventObject(notifEvent);
                    updateContext(context);

                    // == Fetch template ==
                    Profiler.start(workerId, "tpl-fetch");
                    long start = System.currentTimeMillis();
                    Template template = notifTplMngr.fetchTemplate(notifEvent);
                    logger.trace("fetch-tpl:" + (System.currentTimeMillis() - start));
                    Profiler.end(workerId, "tpl-fetch");

                    // == Personalise template ==
                    Profiler.start(workerId, "personalisation");
                    start = System.currentTimeMillis();
                    String message = NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
                    notifEvent.setMaskedMessage(NotificationTemplatePersonaliser.getMaskedContent(template, notifEvent));
                    logger.trace("personalise:" + (System.currentTimeMillis() - start));
                    Profiler.end(workerId, "personalisation");

                    context.setEventObject(notifEvent);
                    updateContext(context);

                    // == Send notification ==
                    Profiler.start(workerId, "notif-dispatch");
                    postOffice.sendNotification(message, notifEvent);
                    Profiler.end(workerId, "notif-dispatch");

                    retryCount = 0;
                    logger.debug("Time to complete notification dispatch :" + (System.currentTimeMillis() - totalStart));
                    if (startShutdown == true) {
                        break;
                    }
                } catch (Exception ex) {
                    if (ex instanceof RetriableException) {
                        kafkaLogger.error(ex);
                        logger.warn("", ex);
                        addToFailedQueue(notifJSON, ex.getMessage(), CODE_RETRIABLE_FAILURE);
                    } else if (ex instanceof InvalidEventException) {
                        JsonLog jsonLog = new JsonLog();
                        jsonLog.setDescription("Invalid event message. Reason: " + ex.getMessage());
                        jsonLog.setOriginalPayload(notificationMsg);
                        jsonLog.setEventType("ERROR");
                        jsonLog.setCode("InvalidEventException");
                        jsonLog.setComponent(kafkaLogger.getSourceClass(ex));
                        jsonLog.setSubComponent(notifEvent.getChannelName());
                        kafkaLogger.log(jsonLog);
                        //kafkaLogger.error(ex);
                        addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                        logger.warn("Invalid event message. Reason: " + ex.getMessage());
                    } else if (ex instanceof EventFailedException) {
                        addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                        kafkaLogger.error(ex);
                        logger.warn("Notification event failed. Reason: " + ex.getMessage());
                    } else if (ex instanceof org.json.simple.parser.ParseException) {
                        kafkaLogger.error("Exception while parsing notification JSON message.");
                        logger.warn("Exception while parsing notification JSON message.");
                    } else {
                        kafkaLogger.error(ex);
                        addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                        logger.warn("", ex);
                    }
                } finally {
                    eventsProcessed++;
                }
            }
        } catch (Exception ex) {
            kafkaLogger.error(ex);
            addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
            logger.warn("", ex);
        }
    }

And here are the server properties:

    broker.id=0
    port=9092
    dele
Re: Long start time for consumer
Thanks. Where is your consumer code that is consuming messages?

On 29 May 2018 at 13:18, Shantanu Deshmukh wrote:
> No problem, here are the consumer properties: [...]
Re: Long start time for consumer
No problem, here are the consumer properties:

    auto.commit.interval.ms = 3000
    auto.offset.reset = latest
    bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 54
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = otp-notifications-consumer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 30
    max.poll.records = 5
    metadata.max.age.ms = 30
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 3
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 6
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    send.buffer.bytes = 131072
    session.timeout.ms = 30
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location =
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

On Tue, May 29, 2018 at 5:36 PM M. Manna wrote:
> Hi,
>
> It's not possible to answer questions based on text alone. You need to
> share your consumer.properties and server.properties files, and also what
> exactly you have changed from the default configuration.
> [...]
Re: Long start time for consumer
Hi,

It's not possible to answer questions based on text alone. You need to share your consumer.properties and server.properties files, and also what exactly you have changed from the default configuration.

On 29 May 2018 at 12:51, Shantanu Deshmukh wrote:
> Hello,
>
> We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
> partitions. I have an application which consumes from all these topics by
> creating multiple consumer processes, all under the same consumer group. I
> am noticing that every time we restart this application, it takes almost 5
> minutes for the consumers to start consuming. What might be going wrong?
Long start time for consumer
Hello,

We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10 partitions. I have an application which consumes from all these topics by creating multiple consumer processes, all under the same consumer group. I am noticing that every time we restart this application, it takes almost 5 minutes for the consumers to start consuming. What might be going wrong?