Here is the code that consumes the messages:
>>>>>>>>
while (!startShutdown) {
    Context context = new Context();
    JSONObject notifJSON = new JSONObject();
    String notificationMsg = "";
    NotificationEvent notifEvent = null;
    initializeContext();
    try {
        consumerConnect();
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.count() == 0) {
            //logger.trace("No records in topic: "+this.topic);
            continue;
        }
        for (ConsumerRecord<String, String> record : records) {
            try {
                long totalStart = System.currentTimeMillis();
                notificationMsg = record.value();
                JSONParser jsonParser = new JSONParser();
                logger.trace("Kafka-Msg: >>" + notificationMsg);
                if (notificationMsg.equals("")) {
                    continue;
                }
                Profiler.start(workerId, "json-parse");
                notifJSON = (JSONObject) jsonParser.parse(notificationMsg);
                Profiler.end(workerId, "json-parse");
                notifEvent = new NotificationEvent(notifJSON);
                if (notifEvent.getTransactionID().equals("")) {
                    notifEvent.generateTransactionID();
                }
                context.setEventObject(notifEvent);
                updateContext(context);
                //======== Fetch template ==========//
                Profiler.start(workerId, "tpl-fetch");
                long start = System.currentTimeMillis();
                Template template = notifTplMngr.fetchTemplate(notifEvent);
                logger.trace("fetch-tpl:" + (System.currentTimeMillis() - start));
                Profiler.end(workerId, "tpl-fetch");
                //======== Personalise template ==========//
                Profiler.start(workerId, "personalisation");
                start = System.currentTimeMillis();
                String message = NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
                notifEvent.setMaskedMessage(NotificationTemplatePersonaliser.getMaskedContent(template, notifEvent));
                logger.trace("personalise:" + (System.currentTimeMillis() - start));
                Profiler.end(workerId, "personalisation");
                context.setEventObject(notifEvent);
                updateContext(context);
                //======== Send notification ==========//
                Profiler.start(workerId, "notif-dispatch");
                postOffice.sendNotification(message, notifEvent);
                Profiler.end(workerId, "notif-dispatch");
                retryCount = 0;
                logger.debug("Time to complete notification dispatch: " + (System.currentTimeMillis() - totalStart));
                if (startShutdown) {
                    break;
                }
            } catch (Exception ex) {
                if (ex instanceof RetriableException) {
                    kafkaLogger.error(ex);
                    logger.warn("", ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_RETRIABLE_FAILURE);
                } else if (ex instanceof InvalidEventException) {
                    JsonLog jsonLog = new JsonLog();
                    jsonLog.setDescription("Invalid event message. Reason: " + ex.getMessage());
                    jsonLog.setOriginalPayload(notificationMsg);
                    jsonLog.setEventType("ERROR");
                    jsonLog.setCode("InvalidEventException");
                    jsonLog.setComponent(kafkaLogger.getSourceClass(ex));
                    // notifEvent is still null if the NotificationEvent constructor threw
                    jsonLog.setSubComponent(notifEvent != null ? notifEvent.getChannelName() : "");
                    kafkaLogger.log(jsonLog);
                    //kafkaLogger.error(ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    logger.warn("Invalid event message. Reason: " + ex.getMessage());
                } else if (ex instanceof EventFailedException) {
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    kafkaLogger.error(ex);
                    logger.warn("Notification event failed. Reason: " + ex.getMessage());
                } else if (ex instanceof org.json.simple.parser.ParseException) {
                    kafkaLogger.error("Exception while parsing notification JSON message.");
                    logger.warn("Exception while parsing notification JSON message.");
                } else {
                    kafkaLogger.error(ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    logger.warn("", ex);
                }
            } finally {
                eventsProcessed++;
            }
        }
    } catch (Exception ex) {
        kafkaLogger.error(ex);
        addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
        logger.warn("", ex);
    }
}
<<<<<<<<<<
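To make the error handling in the loop above easier to follow, here is a minimal standalone sketch of how each exception type ends up being routed. The exception classes are empty stand-ins that only share names with the ones in the loop, JsonParseFailure stands in for org.json.simple.parser.ParseException, and the Route enum and classify method are made up for this illustration; none of them are part of the application.

```java
// Hypothetical stand-ins for the exception types referenced in the loop.
class RetriableException extends Exception {
    RetriableException(String msg) { super(msg); }
}

class InvalidEventException extends Exception {
    InvalidEventException(String msg) { super(msg); }
}

class EventFailedException extends Exception {
    EventFailedException(String msg) { super(msg); }
}

// Stand-in for org.json.simple.parser.ParseException.
class JsonParseFailure extends Exception {
    JsonParseFailure(String msg) { super(msg); }
}

final class FailureRouting {
    // Where a failed record goes; the names are illustrative only.
    enum Route { RETRY_QUEUE, PERMANENT_QUEUE, LOG_ONLY }

    // Mirrors the instanceof chain: retriable errors go to the failed queue
    // with the retriable code, parse errors are only logged, and everything
    // else goes to the failed queue with the permanent code.
    static Route classify(Exception ex) {
        if (ex instanceof RetriableException) {
            return Route.RETRY_QUEUE;
        }
        if (ex instanceof JsonParseFailure) {
            return Route.LOG_ONLY;
        }
        return Route.PERMANENT_QUEUE;
    }

    public static void main(String[] args) {
        Exception[] samples = {
            new RetriableException("broker unavailable"),
            new InvalidEventException("missing channel"),
            new EventFailedException("dispatch failed"),
            new JsonParseFailure("unexpected token"),
            new IllegalStateException("anything else")
        };
        for (Exception ex : samples) {
            System.out.println(ex.getClass().getSimpleName() + " -> " + classify(ex));
        }
    }
}
```

Note that in the real loop a JSON parse failure is never added to the failed queue, which is why it gets its own route here.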
And here are the server properties:
broker.id=0
port=9092
delete.topic.enable=true
message.max.bytes=1500000
listeners=SSL://x.x.x.x:9092
advertised.listeners=SSL://x.x.x.x:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/lotus/kafka-logs
num.partitions=3
auto.topic.creation.enable=false
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/opt/kafka/certificates/kafka.keystore.jks
ssl.keystore.password=xxxx
ssl.key.password=xxxx
ssl.truststore.location=/opt/kafka/certificates/kafka.truststore.jks
ssl.truststore.password=xxxx
security.inter.broker.protocol=SSL
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
zookeeper.connection.timeout.ms=6000
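
A side note on the numbers: in the consumer properties quoted below, session.timeout.ms and max.poll.interval.ms are both 300000 ms, which works out to the same 5 minutes as the startup delay. Whether that is actually the cause is only a guess, not something confirmed. The small sketch below just does the unit conversion; TimeoutCheck, parse and millisProperty are names made up for it, and it relies only on java.util.Properties and java.time.Duration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Properties;

final class TimeoutCheck {
    // Parse "key = value" lines the same way a .properties file is read.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Read a millisecond-valued property as a Duration.
    static Duration millisProperty(Properties props, String key) {
        return Duration.ofMillis(Long.parseLong(props.getProperty(key).trim()));
    }

    public static void main(String[] args) {
        // Values copied from the consumer properties in this thread.
        Properties props = parse(
            "session.timeout.ms = 300000\n"
            + "max.poll.interval.ms = 300000\n"
            + "heartbeat.interval.ms = 3000\n");

        for (String key : new String[] {
                "session.timeout.ms", "max.poll.interval.ms", "heartbeat.interval.ms"}) {
            Duration d = millisProperty(props, key);
            System.out.println(key + " = " + d.getSeconds() + " s");
        }
    }
}
```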
On Tue, May 29, 2018 at 5:59 PM M. Manna <[email protected]> wrote:
> Thanks..
>
> Where is your consumer code that is consuming messages?
>
> On 29 May 2018 at 13:18, Shantanu Deshmukh <[email protected]> wrote:
>
> > No problem, here are consumer properties
> > ---------
> > auto.commit.interval.ms = 3000
> > auto.offset.reset = latest
> > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> > check.crcs = true
> > client.id =
> > connections.max.idle.ms = 540000
> > enable.auto.commit = true
> > exclude.internal.topics = true
> > fetch.max.bytes = 52428800
> > fetch.max.wait.ms = 500
> > fetch.min.bytes = 1
> > group.id = otp-notifications-consumer
> > heartbeat.interval.ms = 3000
> > interceptor.classes = null
> > key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> > max.partition.fetch.bytes = 1048576
> > max.poll.interval.ms = 300000
> > max.poll.records = 5
> > metadata.max.age.ms = 300000
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.sample.window.ms = 30000
> > partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> > receive.buffer.bytes = 65536
> > reconnect.backoff.ms = 50
> > request.timeout.ms = 305000
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.min.time.before.relogin = 60000
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > sasl.mechanism = GSSAPI
> > security.protocol = SSL
> > send.buffer.bytes = 131072
> > session.timeout.ms = 300000
> > ssl.cipher.suites = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.endpoint.identification.algorithm = null
> > ssl.key.password = null
> > ssl.keymanager.algorithm = SunX509
> > ssl.keystore.location = null
> > ssl.keystore.password = null
> > ssl.keystore.type = JKS
> > ssl.protocol = TLS
> > ssl.provider = null
> > ssl.secure.random.implementation = null
> > ssl.trustmanager.algorithm = PKIX
> > ssl.truststore.location = ****
> > ssl.truststore.password = [hidden]
> > ssl.truststore.type = JKS
> > value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> > ------------
> >
> > On Tue, May 29, 2018 at 5:36 PM M. Manna <[email protected]> wrote:
> >
> > > Hi,
> > >
> > > It's not possible to answer questions based on text alone. You need to share your
> > > consumer.properties and server.properties files, and also what exactly you
> > > have changed from the default configuration.
> > >
> > >
> > >
> > > On 29 May 2018 at 12:51, Shantanu Deshmukh <[email protected]> wrote:
> > >
> > > > Hello,
> > > >
> > > > We have a 3-broker Kafka 0.10.0.1 cluster. We have 5 topics, each with 10
> > > > partitions. I have an application which consumes from all these topics by
> > > > creating multiple consumer processes. All of these consumers are in the
> > > > same consumer group. I am noticing that every time we restart this
> > > > application, it takes almost 5 minutes for the consumers to start consuming.
> > > > What might be going wrong?
> > > >
> > >
> >
>