Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
Thanks for your suggestion. However, this doesn't seem applicable to our
Kafka version; we are using 0.10.0.1.

On Tue, May 29, 2018 at 7:04 PM Manikumar  wrote:

> Please check the "group.initial.rebalance.delay.ms" broker config property. This
> will be the delay for the initial consumer rebalance.
>
> From the docs:
>
> "The rebalance will be further delayed by the value of
> group.initial.rebalance.delay.ms as new members join the group,
> up to a maximum of max.poll.interval.ms"
>
>
> http://kafka.apache.org/documentation/#upgrade_1100_notable
>
> On Tue, May 29, 2018 at 6:51 PM, Shantanu Deshmukh 
> wrote:
>
> > No, no dynamic topic creation.
> >
> > On Tue, May 29, 2018 at 6:38 PM Jaikiran Pai 
> > wrote:
> >
> > > Are your topics dynamically created? If so, see this thread:
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg67224.html
> > >
> > > -Jaikiran
> > >
> > >
> > > On 29/05/18 5:21 PM, Shantanu Deshmukh wrote:
> > > > Hello,
> > > >
> > > > We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
> > > > partitions. I have an application which consumes from all these topics by
> > > > creating multiple consumer processes. All of these consumers are under the
> > > > same consumer group. I am noticing that every time we restart this
> > > > application, it takes almost 5 minutes for consumers to start consuming.
> > > > What might be going wrong?
> > > >
> > >
> > >
> >
>


Re: Long start time for consumer

2018-05-29 Thread Manikumar
Please check the "group.initial.rebalance.delay.ms" broker config property. This
will be the delay for the initial consumer rebalance.

From the docs:

"The rebalance will be further delayed by the value of
group.initial.rebalance.delay.ms as new members join the group,
up to a maximum of max.poll.interval.ms"


http://kafka.apache.org/documentation/#upgrade_1100_notable
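For reference, this is how the setting would look in the broker's
server.properties. Note that it only exists on 0.11.0+ brokers (it was
introduced as a notable change in that release), and the 3000 ms value shown
here is the documented default, not a value taken from this thread:

-----
# server.properties (broker side, Kafka 0.11.0+ only)
# Delay the first rebalance of a newly created consumer group so that more
# members can join before partitions are assigned.
group.initial.rebalance.delay.ms=3000
-----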

On Tue, May 29, 2018 at 6:51 PM, Shantanu Deshmukh 
wrote:

> No, no dynamic topic creation.
>
> On Tue, May 29, 2018 at 6:38 PM Jaikiran Pai 
> wrote:
>
> > Are your topics dynamically created? If so, see this thread:
> > https://www.mail-archive.com/dev@kafka.apache.org/msg67224.html
> >
> > -Jaikiran
> >
> >
> > On 29/05/18 5:21 PM, Shantanu Deshmukh wrote:
> > > Hello,
> > >
> > > We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
> > > partitions. I have an application which consumes from all these topics by
> > > creating multiple consumer processes. All of these consumers are under the
> > > same consumer group. I am noticing that every time we restart this
> > > application, it takes almost 5 minutes for consumers to start consuming.
> > > What might be going wrong?
> > >
> >
> >
>


Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
I cannot, because some of these messages are high priority. Setting the poll
timeout to 4 seconds means there might be a delay of 4 seconds plus regular
processing time, which is not desirable.

Also, will it impact heartbeating?
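(A side note on both concerns: the configuration dump later in this thread
includes max.poll.interval.ms, which only exists in 0.10.1.0+ clients, and on
those clients heartbeats are sent from a background thread rather than from
poll() itself. If that holds here, a longer poll timeout should not affect
heartbeating; liveness is instead governed by the settings sketched below.
The values are illustrative only, not taken from this thread.)

-----
import java.util.Properties;

public class LivenessConfig {
    // Illustrative values only. On 0.10.1.0+ clients, liveness is governed
    // by two separate timeouts, neither of which is the poll(timeout) argument.
    public static Properties livenessProps() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "30000");    // background heartbeats must arrive within this window
        props.put("heartbeat.interval.ms", "3000");  // how often the heartbeat thread contacts the coordinator
        props.put("max.poll.interval.ms", "300000"); // maximum allowed gap between successive poll() calls
        return props;
    }
}
-----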

On Tue, May 29, 2018 at 6:17 PM M. Manna  wrote:

> Have you tried increasing the poll timeout, e.g. to 4000 ms, to see if that
> helps matters?
>
> On 29 May 2018 at 13:44, Shantanu Deshmukh  wrote:
>
> > Here is the code which consumes messages
> >
> > 
> > while (!startShutdown) {
> >     Context context = new Context();
> >     JSONObject notifJSON = new JSONObject();
> >     String notificationMsg = "";
> >     NotificationEvent notifEvent = null;
> >     initializeContext();
> >     try {
> >         consumerConnect();
> >         ConsumerRecords<String, String> records = consumer.poll(100);
> >         if (records.count() == 0) {
> >             //logger.trace("No records in topic: " + this.topic);
> >             continue;
> >         }
> >         for (ConsumerRecord<String, String> record : records) {
> >             try {
> >                 long totalStart = System.currentTimeMillis();
> >                 notificationMsg = record.value();
> >                 JSONParser jsonParser = new JSONParser();
> >                 logger.trace("Kafka-Msg: >>" + notificationMsg);
> >                 if (notificationMsg.equals("")) {
> >                     continue;
> >                 }
> >                 Profiler.start(workerId, "json-parse");
> >                 notifJSON = (JSONObject) jsonParser.parse(notificationMsg);
> >                 Profiler.end(workerId, "json-parse");
> >                 notifEvent = new NotificationEvent(notifJSON);
> >                 if (notifEvent.getTransactionID().equals("")) {
> >                     notifEvent.generateTransactionID();
> >                 }
> >                 context.setEventObject(notifEvent);
> >                 updateContext(context);
> >
> >                 // == Fetch template ==
> >                 Profiler.start(workerId, "tpl-fetch");
> >                 long start = System.currentTimeMillis();
> >                 Template template = notifTplMngr.fetchTemplate(notifEvent);
> >                 logger.trace("fetch-tpl:" + (System.currentTimeMillis() - start));
> >                 Profiler.end(workerId, "tpl-fetch");
> >
> >                 // == Personalise template ==
> >                 Profiler.start(workerId, "personalisation");
> >                 start = System.currentTimeMillis();
> >                 String message = NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
> >                 notifEvent.setMaskedMessage(NotificationTemplatePersonaliser.getMaskedContent(template, notifEvent));
> >                 logger.trace("personalise:" + (System.currentTimeMillis() - start));
> >                 Profiler.end(workerId, "personalisation");
> >
> >                 context.setEventObject(notifEvent);
> >                 updateContext(context);
> >
> >                 // == Send notification ==
> >                 Profiler.start(workerId, "notif-dispatch");
> >                 postOffice.sendNotification(message, notifEvent);
> >                 Profiler.end(workerId, "notif-dispatch");
> >
> >                 retryCount = 0;
> >                 logger.debug("Time to complete notification dispatch :" + (System.currentTimeMillis() - totalStart));
> >                 if (startShutdown) {
> >                     break;
> >                 }
> >             } catch (Exception ex) {
> >                 if (ex instanceof RetriableException) {
> >                     kafkaLogger.error(ex);
> >                     logger.warn("", ex);
> >                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_RETRIABLE_FAILURE);
> >                 } else if (ex instanceof InvalidEventException) {
> >                     JsonLog jsonLog = new JsonLog();
> >                     jsonLog.setDescription("Invalid event message. Reason: " + ex.getMessage());
> >                     jsonLog.setOriginalPayload(notificationMsg);
> >                     jsonLog.setEventType("ERROR");
> >                     jsonLog.setCode("InvalidEventException");
> >                     jsonLog.setComponent(kafkaLogger.getSourceClass(ex));
> >                     jsonLog.setSubComponent(notifEvent.getChannelName());
> >                     kafkaLogger.log(jsonLog);
> >                     //kafkaLogger.error(ex);
> >                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
> >                     logger.warn("Invalid event message. Reason: " + ex.getMessage());
> >                 } else if (ex instanceof EventFailedException) {
> >                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
> >                     kafkaLogger.error(ex);
> >                     logger.warn("Notification event failed. Reason: " + ex.get

Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
No, no dynamic topic creation.

On Tue, May 29, 2018 at 6:38 PM Jaikiran Pai 
wrote:

> Are your topics dynamically created? If so, see this thread:
> https://www.mail-archive.com/dev@kafka.apache.org/msg67224.html
>
> -Jaikiran
>
>
> On 29/05/18 5:21 PM, Shantanu Deshmukh wrote:
> > Hello,
> >
> > We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
> > partitions. I have an application which consumes from all these topics by
> > creating multiple consumer processes. All of these consumers are under the
> > same consumer group. I am noticing that every time we restart this
> > application, it takes almost 5 minutes for consumers to start consuming.
> > What might be going wrong?
> >
>
>


Re: Long start time for consumer

2018-05-29 Thread Jaikiran Pai
Are your topics dynamically created? If so, see this thread:
https://www.mail-archive.com/dev@kafka.apache.org/msg67224.html


-Jaikiran


On 29/05/18 5:21 PM, Shantanu Deshmukh wrote:

Hello,

We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
partitions. I have an application which consumes from all these topics by
creating multiple consumer processes. All of these consumers are under the
same consumer group. I am noticing that every time we restart this
application, it takes almost 5 minutes for consumers to start consuming.
What might be going wrong?





Re: Long start time for consumer

2018-05-29 Thread M. Manna
Have you tried increasing the poll timeout, e.g. to 4000 ms, to see if that
helps matters?
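
To be clear about what a larger timeout costs: poll(timeout) blocks only while
no records are available and returns as soon as data arrives, so on a busy
topic a 4000 ms timeout adds no per-message latency. A minimal sketch; the
broker address, group id and topic name are placeholders, not values from this
thread:

-----
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "demo-group");              // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder
        try {
            while (true) {
                // Blocks for up to 4000 ms, but returns immediately once
                // records arrive, so the timeout is only paid on idle topics.
                ConsumerRecords<String, String> records = consumer.poll(4000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
-----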

On 29 May 2018 at 13:44, Shantanu Deshmukh  wrote:

> Here is the code which consumes messages
>
> 
> while (!startShutdown) {
>     Context context = new Context();
>     JSONObject notifJSON = new JSONObject();
>     String notificationMsg = "";
>     NotificationEvent notifEvent = null;
>     initializeContext();
>     try {
>         consumerConnect();
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         if (records.count() == 0) {
>             //logger.trace("No records in topic: " + this.topic);
>             continue;
>         }
>         for (ConsumerRecord<String, String> record : records) {
>             try {
>                 long totalStart = System.currentTimeMillis();
>                 notificationMsg = record.value();
>                 JSONParser jsonParser = new JSONParser();
>                 logger.trace("Kafka-Msg: >>" + notificationMsg);
>                 if (notificationMsg.equals("")) {
>                     continue;
>                 }
>                 Profiler.start(workerId, "json-parse");
>                 notifJSON = (JSONObject) jsonParser.parse(notificationMsg);
>                 Profiler.end(workerId, "json-parse");
>                 notifEvent = new NotificationEvent(notifJSON);
>                 if (notifEvent.getTransactionID().equals("")) {
>                     notifEvent.generateTransactionID();
>                 }
>                 context.setEventObject(notifEvent);
>                 updateContext(context);
>
>                 // == Fetch template ==
>                 Profiler.start(workerId, "tpl-fetch");
>                 long start = System.currentTimeMillis();
>                 Template template = notifTplMngr.fetchTemplate(notifEvent);
>                 logger.trace("fetch-tpl:" + (System.currentTimeMillis() - start));
>                 Profiler.end(workerId, "tpl-fetch");
>
>                 // == Personalise template ==
>                 Profiler.start(workerId, "personalisation");
>                 start = System.currentTimeMillis();
>                 String message = NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
>                 notifEvent.setMaskedMessage(NotificationTemplatePersonaliser.getMaskedContent(template, notifEvent));
>                 logger.trace("personalise:" + (System.currentTimeMillis() - start));
>                 Profiler.end(workerId, "personalisation");
>
>                 context.setEventObject(notifEvent);
>                 updateContext(context);
>
>                 // == Send notification ==
>                 Profiler.start(workerId, "notif-dispatch");
>                 postOffice.sendNotification(message, notifEvent);
>                 Profiler.end(workerId, "notif-dispatch");
>
>                 retryCount = 0;
>                 logger.debug("Time to complete notification dispatch :" + (System.currentTimeMillis() - totalStart));
>                 if (startShutdown) {
>                     break;
>                 }
>             } catch (Exception ex) {
>                 if (ex instanceof RetriableException) {
>                     kafkaLogger.error(ex);
>                     logger.warn("", ex);
>                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_RETRIABLE_FAILURE);
>                 } else if (ex instanceof InvalidEventException) {
>                     JsonLog jsonLog = new JsonLog();
>                     jsonLog.setDescription("Invalid event message. Reason: " + ex.getMessage());
>                     jsonLog.setOriginalPayload(notificationMsg);
>                     jsonLog.setEventType("ERROR");
>                     jsonLog.setCode("InvalidEventException");
>                     jsonLog.setComponent(kafkaLogger.getSourceClass(ex));
>                     jsonLog.setSubComponent(notifEvent.getChannelName());
>                     kafkaLogger.log(jsonLog);
>                     //kafkaLogger.error(ex);
>                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
>                     logger.warn("Invalid event message. Reason: " + ex.getMessage());
>                 } else if (ex instanceof EventFailedException) {
>                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
>                     kafkaLogger.error(ex);
>                     logger.warn("Notification event failed. Reason: " + ex.getMessage());
>                 } else if (ex instanceof org.json.simple.parser.ParseException) {
>                     kafkaLogger.error("Exception while parsing notification JSON message.");
>                     logger.warn("Exception while parsing notification JSON message.");
>                 } else {
>                     kafkaLogger.error(ex);
>                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
>                     logger.warn

Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
Here is the code which consumes messages


while (!startShutdown) {
    Context context = new Context();
    JSONObject notifJSON = new JSONObject();
    String notificationMsg = "";
    NotificationEvent notifEvent = null;
    initializeContext();
    try {
        consumerConnect();
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.count() == 0) {
            //logger.trace("No records in topic: " + this.topic);
            continue;
        }
        for (ConsumerRecord<String, String> record : records) {
            try {
                long totalStart = System.currentTimeMillis();
                notificationMsg = record.value();
                JSONParser jsonParser = new JSONParser();
                logger.trace("Kafka-Msg: >>" + notificationMsg);
                if (notificationMsg.equals("")) {
                    continue;
                }
                Profiler.start(workerId, "json-parse");
                notifJSON = (JSONObject) jsonParser.parse(notificationMsg);
                Profiler.end(workerId, "json-parse");
                notifEvent = new NotificationEvent(notifJSON);
                if (notifEvent.getTransactionID().equals("")) {
                    notifEvent.generateTransactionID();
                }
                context.setEventObject(notifEvent);
                updateContext(context);

                // == Fetch template ==
                Profiler.start(workerId, "tpl-fetch");
                long start = System.currentTimeMillis();
                Template template = notifTplMngr.fetchTemplate(notifEvent);
                logger.trace("fetch-tpl:" + (System.currentTimeMillis() - start));
                Profiler.end(workerId, "tpl-fetch");

                // == Personalise template ==
                Profiler.start(workerId, "personalisation");
                start = System.currentTimeMillis();
                String message = NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
                notifEvent.setMaskedMessage(NotificationTemplatePersonaliser.getMaskedContent(template, notifEvent));
                logger.trace("personalise:" + (System.currentTimeMillis() - start));
                Profiler.end(workerId, "personalisation");

                context.setEventObject(notifEvent);
                updateContext(context);

                // == Send notification ==
                Profiler.start(workerId, "notif-dispatch");
                postOffice.sendNotification(message, notifEvent);
                Profiler.end(workerId, "notif-dispatch");

                retryCount = 0;
                logger.debug("Time to complete notification dispatch :" + (System.currentTimeMillis() - totalStart));
                if (startShutdown) {
                    break;
                }
            } catch (Exception ex) {
                if (ex instanceof RetriableException) {
                    kafkaLogger.error(ex);
                    logger.warn("", ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_RETRIABLE_FAILURE);
                } else if (ex instanceof InvalidEventException) {
                    JsonLog jsonLog = new JsonLog();
                    jsonLog.setDescription("Invalid event message. Reason: " + ex.getMessage());
                    jsonLog.setOriginalPayload(notificationMsg);
                    jsonLog.setEventType("ERROR");
                    jsonLog.setCode("InvalidEventException");
                    jsonLog.setComponent(kafkaLogger.getSourceClass(ex));
                    jsonLog.setSubComponent(notifEvent.getChannelName());
                    kafkaLogger.log(jsonLog);
                    //kafkaLogger.error(ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    logger.warn("Invalid event message. Reason: " + ex.getMessage());
                } else if (ex instanceof EventFailedException) {
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    kafkaLogger.error(ex);
                    logger.warn("Notification event failed. Reason: " + ex.getMessage());
                } else if (ex instanceof org.json.simple.parser.ParseException) {
                    kafkaLogger.error("Exception while parsing notification JSON message.");
                    logger.warn("Exception while parsing notification JSON message.");
                } else {
                    kafkaLogger.error(ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    logger.warn("", ex);
                }
            } finally {
                eventsProcessed++;
            }
        }
    } catch (Exception ex) {
        kafkaLogger.error(ex);
        addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
        logger.warn("", ex);
    }
}
<<
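
For comparison, a minimal sketch of the same loop using consumer.wakeup() for
shutdown instead of checking the startShutdown flag between polls: wakeup()
makes a blocked poll() throw WakeupException, so the loop can use a longer
poll timeout without delaying shutdown. The topic name and the process() body
are placeholders, not the application's actual code:

-----
import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class NotificationWorker implements Runnable {
    private final KafkaConsumer<String, String> consumer;

    public NotificationWorker(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Call from a shutdown hook or another thread; a blocked poll() will
    // throw WakeupException instead of waiting for the next flag check.
    public void shutdown() {
        consumer.wakeup();
    }

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("notifications")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown; fall through to close().
        } finally {
            consumer.close();
        }
    }

    private void process(String message) {
        // Placeholder for the parse/personalise/dispatch logic above.
    }
}
-----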

And here are server properties.

broker.id=0
port=9092
dele

Re: Long start time for consumer

2018-05-29 Thread M. Manna
Thanks.

Where is your consumer code that consumes the messages?

On 29 May 2018 at 13:18, Shantanu Deshmukh  wrote:

> No problem, here are consumer properties
> -
> auto.commit.interval.ms = 3000
> auto.offset.reset = latest
> bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 540000
> enable.auto.commit = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = otp-notifications-consumer
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 5
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = SSL
> send.buffer.bytes = 131072
> session.timeout.ms = 300000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = 
> ssl.truststore.password = [hidden]
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> 
>
> On Tue, May 29, 2018 at 5:36 PM M. Manna  wrote:
>
> > Hi,
> >
> > It's not possible to answer questions based on the description alone. You
> > need to share your consumer.properties and server.properties files, and
> > also what exactly you have changed from the default configuration.
> >
> >
> >
> > > On 29 May 2018 at 12:51, Shantanu Deshmukh 
> > > wrote:
> >
> > > Hello,
> > >
> > > We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
> > > partitions. I have an application which consumes from all these topics by
> > > creating multiple consumer processes. All of these consumers are under the
> > > same consumer group. I am noticing that every time we restart this
> > > application, it takes almost 5 minutes for consumers to start consuming.
> > > What might be going wrong?
> > >
> >
>


Re: Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
No problem, here are consumer properties
-
auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 5
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 300000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = 
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
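
For context, this is roughly how a consumer with the configuration above would
be constructed in code. The class name is invented for illustration, only
values printed above are repeated here, and the truststore path and password
are placeholders since the archive hides them:

-----
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OtpConsumerFactory {
    public static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092");
        props.put("group.id", "otp-notifications-consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "3000");
        props.put("auto.offset.reset", "latest");
        props.put("max.poll.records", "5");
        props.put("heartbeat.interval.ms", "3000");
        props.put("request.timeout.ms", "305000");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/path/to/truststore.jks"); // placeholder
        props.put("ssl.truststore.password", "changeit");                // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }
}
-----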


On Tue, May 29, 2018 at 5:36 PM M. Manna  wrote:

> Hi,
>
> It's not possible to answer questions based on the description alone. You need
> to share your consumer.properties and server.properties files, and also what
> exactly you have changed from the default configuration.
>
>
>
> On 29 May 2018 at 12:51, Shantanu Deshmukh  wrote:
>
> > Hello,
> >
> > We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
> > partitions. I have an application which consumes from all these topics by
> > creating multiple consumer processes. All of these consumers are under the
> > same consumer group. I am noticing that every time we restart this
> > application, it takes almost 5 minutes for consumers to start consuming.
> > What might be going wrong?
> >
>


Re: Long start time for consumer

2018-05-29 Thread M. Manna
Hi,

It's not possible to answer questions based on the description alone. You need
to share your consumer.properties and server.properties files, and also what
exactly you have changed from the default configuration.



On 29 May 2018 at 12:51, Shantanu Deshmukh  wrote:

> Hello,
>
> We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
> partitions. I have an application which consumes from all these topics by
> creating multiple consumer processes. All of these consumers are under the
> same consumer group. I am noticing that every time we restart this
> application, it takes almost 5 minutes for consumers to start consuming.
> What might be going wrong?
>


Long start time for consumer

2018-05-29 Thread Shantanu Deshmukh
Hello,

We have a 3-broker Kafka 0.10.0.1 cluster with 5 topics, each with 10
partitions. I have an application which consumes from all these topics by
creating multiple consumer processes. All of these consumers are under the
same consumer group. I am noticing that every time we restart this
application, it takes almost 5 minutes for consumers to start consuming.
What might be going wrong?