Hi all,
Below are logs from the two implementations:
1) old Kafka brokers with the old Kafka client version (0.8.x)
2) new Kafka brokers (3.6.1) with the new Kafka client version (3.6.1)
The old setup has no authentication; on the new Kafka cluster we use authentication. We moved to the new one for security and robustness.
*Old Kafka brokers with the old Kafka API implementation:*
2025-01-16 08:46:12,380 INFO
(com.cisco.cr.kafka.controller.EventsController:98) [http-nio-8080-exec-8]
- Consume process begin...
2025-01-16 08:46:12,380 INFO
(com.test.cr.kafka.services.NextgenConsumerDriver:82)
[http-nio-8080-exec-8] - Validating request...
2025-01-16 08:46:12,387 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-8]
- Latest offset in Kafka for topic: crt_org_ce_ts1crt & partition: 0 is
16136630 and in user input is 16130077
2025-01-16 08:46:12,387 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:117) [http-nio-8080-exec-8]
- Earliest offset in Kafka for topic: crt_org_ce_ts1crt & partition: 0 is
16129521 and in user input is 16130077
2025-01-16 08:46:12,415 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16130077 - 1043773 KB.
2025-01-16 08:46:12,502 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16130372 - 1043496 KB.
2025-01-16 08:46:12,543 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16130670 - 1044428 KB.
2025-01-16 08:46:12,587 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16130968 - 1044939 KB.
2025-01-16 08:46:12,622 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16131265 - 1045057 KB.
2025-01-16 08:46:12,650 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16131567 - 1042968 KB.
2025-01-16 08:46:12,676 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16131863 - 1043547 KB.
2025-01-16 08:46:12,698 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16132161 - 1045497 KB.
2025-01-16 08:46:12,723 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16132459 - 1043030 KB.
2025-01-16 08:46:12,746 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16132755 - 1045957 KB.
2025-01-16 08:46:12,772 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16133054 - 1043704 KB.
2025-01-16 08:46:12,789 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16133350 - 533178 KB.
2025-01-16 08:46:12,805 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16133499 - 1042816 KB.
2025-01-16 08:46:12,832 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16133797 - 1045895 KB.
2025-01-16 08:46:12,859 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16134093 - 1044584 KB.
2025-01-16 08:46:12,884 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16134398 - 1045775 KB.
2025-01-16 08:46:12,906 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16134702 - 1043018 KB.
2025-01-16 08:46:12,930 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16134999 - 1045228 KB.
2025-01-16 08:46:12,952 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16135290 - 1044921 KB.
2025-01-16 08:46:12,979 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16135579 - 1046197 KB.
2025-01-16 08:46:13,002 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16135876 - 1042978 KB.
2025-01-16 08:46:13,029 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136172 - 1045985 KB.
2025-01-16 08:46:13,050 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136463 - 600146 KB.
2025-01-16 08:46:13,057 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136630 - 0 KB.
2025-01-16 08:46:14,059 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136630 - 0 KB.
2025-01-16 08:46:15,061 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:167) [http-nio-8080-exec-8]
- Total size of messages read from offset 16136630 - 0 KB.
2025-01-16 08:46:15,061 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:221) [http-nio-8080-exec-8]
- No messages were found in 3 successive fetches. Stopping the consume
process here.
2025-01-16 08:46:15,061 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:242) [http-nio-8080-exec-8]
- Closing consumer... crt_consumer_crt_org_ce_ts1crt_0_Webex-Webex
2025-01-16 08:46:15,061 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:249) [http-nio-8080-exec-8]
- Summary ::: Total number of messages read from offset 16130077 to
16136630 are - 6553. Count of filtered results - 54
2025-01-16 08:46:15,081 INFO
(com.cisco.cr.kafka.controller.EventsController:154)
[http-nio-8080-exec-8] - Consume process end.
Old code: In the old code we were using FetchRequest, FetchResponse and SimpleConsumer without authentication; this is the low-level API.
public class ChangeEventsKafkaConsumer{
private CEConsumeResponse ceResponse;
public ChangeEventsKafkaConsumer() {
super();
}
public ChangeEventsKafkaConsumer(ConfigBean confBean,
CEConsumeResponse res, long offset) {
super(confBean);
this.ceResponse = res;
this.offset = offset;
}
@Override
public long readMessages(long a_maxRead, String a_topic, int a_partition,
List<String> ng_replicaBrokers, int a_port) throws CustomException {
return 0l;
}
public List<JSONObject> consume(long offset, String a_topic,
int a_partition, List<String> ng_replicaBrokers, int a_port,
CEConsumeRequest inputReq) throws CustomException {
List<JSONObject> msglist = new ArrayList<JSONObject>();
int waitTime = Integer.valueOf(Config
.getConsumerPropValue("pull.wait.time"));
int limit = Integer.valueOf(Config.getConsumerPropValue("pull.size.limit"));
int fetchSize = Integer.valueOf(Config
.getConsumerPropValue("pull.each.fetch.size"));
int emptyLoopLimit = Integer.valueOf(Config
.getConsumerPropValue("pull.empty.loop.limit"));
/*
* Fetching Metadata.
*/
PartitionMetadata metadata = findLeader(ng_replicaBrokers, a_port,
a_topic, a_partition);
if (metadata == null) {
log.error("Can't find metadata for Topic and Partition.");
throw new CustomException(
"Can't find metadata for Topic and Partition.");
}
if (metadata.leader() == null) {
log.error("Can't find Leader for Topic and Partition.");
throw new CustomException(
"Can't find Leader for Topic and Partition.");
}
/*
* Getting lead broker.
*/
String leadBroker = metadata.leader().host();
log.debug("Lead broker for Partion : {}, Topic :{} is {}", a_partition,
a_topic, leadBroker);
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
configBean.getConnectionTimeOut(),
configBean.getKafkaBufferSize(), clientName);
/*
* Fetching kafka offset from Kafka
*/
long kafkaOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
/*
* Getting earliest available offset in kafka.
*/
long kafkaEarliestOffset = getLastOffset(consumer, a_topic,
a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
/*
* User input offset.
*/
long readOffset = offset;
log.info("Latest offset in Kafka for topic: {} & partition: {} is {} and in user input is {}",
a_topic, a_partition, kafkaOffset, readOffset);
log.info("Earliest offset in Kafka for topic: {} & partition: {} is {} and in user input is {}",
a_topic, a_partition, kafkaEarliestOffset, readOffset);
if (readOffset == 0 && readOffset < kafkaEarliestOffset) {
log.warn("Resetting the offset to earliest available offset in kafka.");
readOffset = kafkaEarliestOffset;
}
boolean end = false;
long startTime = Calendar.getInstance().getTimeInMillis();
long endTime = Calendar.getInstance().getTimeInMillis();
int emptyFetchCount = 0;
do {
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, fetchSize)
.build();
FetchResponse fetchResponse = consumer.fetch(req);
/*
* Reading data from kafka response.
*/
JSONObject obj = null;
ByteBufferMessageSet set = fetchResponse.messageSet(a_topic,
a_partition);
log.info("Total size of messages read from offset {} - {} KB.",
readOffset, set.sizeInBytes());
for (MessageAndOffset messageAndOffset : set) {
String message = null;
if (messageAndOffset.offset() < readOffset) {
log.warn("Found an old offset: {} ,Expecting: {}",
messageAndOffset.offset(), readOffset);
continue;
}
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
try {
message = new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException ue) {
log.warn(ue.getMessage(), ue);
message = new String(bytes);
}
log.debug(
"client name : {} , Offset is : {} , Message is : {} ",
clientName, String.valueOf(messageAndOffset.offset()),
message);
CONSUME_LOG.debug(a_topic + "\t" + a_partition + "\t"
+ String.valueOf(messageAndOffset.offset()));
obj = new JSONObject(message);
if (!inputReq.getApplicationArea().isSubscription()
|| (obj.has("SUBSCRIBER_NAMES") && obj.getString(
"SUBSCRIBER_NAMES").contains(
inputReq.getSender().getReferenceID()))) {
if (inputReq.getApplicationArea().getChangeEventsType() == null
|| (obj.has("NOTIFICATION_EVENT_NAME") && inputReq
.getApplicationArea()
.getChangeEventsType()
.contains(
obj.getString("NOTIFICATION_EVENT_NAME")))) {
msglist.add(new JSONObject(message));
}
}
readOffset = messageAndOffset.nextOffset();
}
endTime = Calendar.getInstance().getTimeInMillis();
if (msglist.size() >= Math.round(limit
/ inputReq.getApplicationArea().getReqInfo().size())
|| (endTime - startTime) >= waitTime) {
log.info("Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
msglist.size(), (endTime - startTime));
end = true;
} else if (set.sizeInBytes() == 0) {
emptyFetchCount++;
try {
if(emptyFetchCount == emptyLoopLimit) {
log.info("No messages were found in 3 successive fetches. Stopping the
consume process here.");
end = true;
} else {
Thread.sleep(1000);
}
} catch (InterruptedException ie) {
CONSUME_LOG.warn(ie.getMessage(), ie);
}
} else {
emptyFetchCount = 0;
}
req = null;
} while (!end);
*New KafkaConsumer implementation with the new Kafka brokers:*
2025-01-16 08:46:56,307 INFO
(com.test.cr.kafka.controller.EventsController:99) [http-nio-8080-exec-3]
- Consume process begin...
2025-01-16 08:46:56,307 INFO
(com.test.cr.kafka.services.NextgenConsumerDriver:76)
[http-nio-8080-exec-3] - Validating request...
2025-01-16 08:46:56,307 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:68) [http-nio-8080-exec-3] -
Consume messages Topic: ORG_CHANGE_EVENT_YS1_18_OCT_2024 & partition: 0
2025-01-16 08:46:56,307 INFO
(com.test.cr.kafka.services.KafkaConsumerFactory:45)
[http-nio-8080-exec-3] - KafkaUser:app_client kafkaBrokerStr:
ys1kafka-aln-01.test.com:9092,ys1kafka-aln-02.cisco.com:9092,
ys1kafka-rdn-01.test.com:9092,ys1kafka-rdn-02.test.com:9092
GroupID:crt_org_change_events_lt
2025-01-16 08:46:56,308 INFO
(org.apache.kafka.clients.consumer.ConsumerConfig:370)
[http-nio-8080-exec-3] - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = none
bootstrap.servers = [kafka-aln-01.test.com:9092,
kafka-aln-02.test.com:9092, kafka-rdn-01.test.com:9092,
kafka-rdn-02.test.com:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = crt_consumer_http-nio-8080-exec-3
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 10
fetch.min.bytes = 1
group.id =
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor, class
org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = PLAIN
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
2025-01-16 08:46:56,308 WARN
(org.apache.kafka.clients.consumer.KafkaConsumer:688)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Support for using the
empty group id by consumers is deprecated and will be removed in the next
major release.
2025-01-16 08:46:56,330 INFO
(org.apache.kafka.common.utils.AppInfoParser:119) [http-nio-8080-exec-3] -
Kafka version: 3.6.1
2025-01-16 08:46:56,330 INFO
(org.apache.kafka.common.utils.AppInfoParser:120) [http-nio-8080-exec-3] -
Kafka commitId: 5e3c2b738d253ff5
2025-01-16 08:46:56,330 INFO
(org.apache.kafka.common.utils.AppInfoParser:121) [http-nio-8080-exec-3] -
Kafka startTimeMs: 1737046016330
2025-01-16 08:46:56,330 INFO
(org.apache.kafka.clients.consumer.KafkaConsumer:1067)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Assigned to
partition(s): ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
2025-01-16 08:46:56,330 INFO
(org.apache.kafka.clients.consumer.KafkaConsumer:1067)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Assigned to
partition(s): ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
2025-01-16 08:46:56,832 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:95) [http-nio-8080-exec-3] -
EarliestOffset :0 LatestOffset :16136566 Time :502 ms
2025-01-16 08:46:56,832 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:97) [http-nio-8080-exec-3] -
Latest & Earliest offset in Kafka for topic:
ORG_CHANGE_EVENT_YS1_18_OCT_2024 & partition: 0 Latest is 16136566 Earliest
is 0 and in user input is 16130077
2025-01-16 08:46:56,832 INFO
(org.apache.kafka.clients.consumer.KafkaConsumer:1564)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Seeking to offset
16130077 for partition ORG_CHANGE_EVENT_YS1_18_OCT_2024-0
2025-01-16 08:46:57,157 INFO (org.apache.kafka.clients.Metadata:287)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Cluster ID:
WyVVmQEwTbeRp73DYJgk7Q
2025-01-16 08:46:58,741 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:46:58,823 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:46:58,898 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:46:58,964 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :188
2025-01-16 08:47:00,243 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:00,316 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:00,383 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:00,455 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :235
2025-01-16 08:47:01,735 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:01,799 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:01,966 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:02,035 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :294
2025-01-16 08:47:02,912 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:02,981 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :500
2025-01-16 08:47:03,050 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :272
2025-01-16 08:47:05,087 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :0
2025-01-16 08:47:08,088 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :0
2025-01-16 08:47:11,088 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:114) [http-nio-8080-exec-3]
- Poll Records Count :0
2025-01-16 08:47:11,088 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:157) [http-nio-8080-exec-3]
- No messages were found in 3 successive fetches. Stopping the consume
process here.
2025-01-16 08:47:11,088 INFO
(com.test.cr.kafka.services.TestKafkaConsumer:179) [http-nio-8080-exec-3]
- Summary ::: Total number of messages read from offset 16130077 to
16136566 are - 6489. Count of filtered results - 54
2025-01-16 08:47:11,088 INFO
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1025)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Resetting generation
and member id due to: consumer pro-actively leaving the group
2025-01-16 08:47:11,089 INFO
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[http-nio-8080-exec-3] - [Consumer
clientId=crt_consumer_http-nio-8080-exec-3, groupId=] Request joining group
due to: consumer pro-actively leaving the group
2025-01-16 08:47:11,138 INFO (org.apache.kafka.common.metrics.Metrics:693)
[http-nio-8080-exec-3] - Metrics scheduler closed
2025-01-16 08:47:11,139 INFO (org.apache.kafka.common.metrics.Metrics:697)
[http-nio-8080-exec-3] - Closing reporter
org.apache.kafka.common.metrics.JmxReporter
2025-01-16 08:47:11,139 INFO (org.apache.kafka.common.metrics.Metrics:703)
[http-nio-8080-exec-3] - Metrics reporters closed
2025-01-16 08:47:11,140 INFO
(org.apache.kafka.common.utils.AppInfoParser:83) [http-nio-8080-exec-3] -
App info kafka.consumer for crt_consumer_http-nio-8080-exec-3 unregistered
2025-01-16 08:47:11,159 INFO
(com.cisco.cr.kafka.controller.EventsController:156)
[http-nio-8080-exec-3] - Consume process end.
New code: Here we are using the KafkaConsumer API with SASL authentication (the consumer itself comes from KafkaConsumerFactory; a sketch of that configuration follows the code below).
public List<JSONObject> consume(long offset, String topicName, int partition,
CEConsumeRequest inputReq) throws CustomException {
List<JSONObject> msglist = new ArrayList<JSONObject>();
int waitTime = Integer.valueOf(Config
.getConsumerPropValue("pull.wait.time"));
int limit = Integer.valueOf(Config.getConsumerPropValue("pull.size.limit"));
int emptyLoopLimit = Integer.valueOf(Config
.getConsumerPropValue("pull.empty.loop.limit"));
int fetchSize = Integer.valueOf(Config
.getConsumerPropValue("pull.each.fetch.size"));
log.info(
"Consume messages Topic: {} & partition: {}",
topicName, partition);
TopicPartition topicPartition = new TopicPartition(topicName, partition);
long readOffset = offset;
clientName = "crt_consumer_" + Thread.currentThread().getName();
try (KafkaConsumer<String,String> consumer =
KafkaConsumerFactory.createConsumer(clientName,fetchSize,topicPartition)){
consumer.assign(Collections.singletonList(topicPartition));
long OffsetStartTime = System.currentTimeMillis();
long OffsetEndTime= System.currentTimeMillis();
long kafkaEarliestOffset=0;
long latestOffset=0;
Admin adminClient = KafkaConsumerFactory.getAdminClient();
try {
kafkaEarliestOffset = adminClient.listOffsets(
Map.of(topicPartition, OffsetSpec.earliest())
).all().get().get(topicPartition).offset();
latestOffset = adminClient.listOffsets(
Map.of(topicPartition, OffsetSpec.latest())
).all().get().get(topicPartition).offset();
OffsetEndTime = System.currentTimeMillis();
} catch (Exception e) {
log.error("Failed to fetch earliest/latest offsets via the AdminClient", e);
}
log.info("EarliestOffset :{} LatestOffset :{} Time :{}
ms",kafkaEarliestOffset,latestOffset,(OffsetEndTime - OffsetStartTime));
log.info("Latest & Earliest offset in Kafka for topic: {} & partition: {} Latest is {} Earliest is {} and in user input is {}",
topicName, partition, latestOffset, kafkaEarliestOffset, readOffset);
if (readOffset == 0 && readOffset < kafkaEarliestOffset) {
log.warn("Resetting the offset to earliest available offset in kafka.");
readOffset = kafkaEarliestOffset;
}
boolean end = false;
long startTime = System.currentTimeMillis();
long endTime = System.currentTimeMillis();
int emptyFetchCount = 0;
consumer.seek(topicPartition, offset);
do {
JSONObject obj = null;
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
log.info("Poll Records Count :{}", records.count());
for (ConsumerRecord<String, String> consumerRecord : records) {
long currentOffset = consumerRecord.offset();
if (currentOffset < readOffset) {
log.warn("Found an old offset: {}, Expecting: {}",
currentOffset, readOffset);
continue;
}
String message = consumerRecord.value();
log.debug("client name : {} , Offset is : {} , Message is : {} ",
clientName, currentOffset, message);
CONSUME_LOG.debug(topicName + "\t" + partition + "\t"
+ String.valueOf(currentOffset));
obj = new JSONObject(message);
if (!inputReq.getApplicationArea().isSubscription()
|| (obj.has("SUBSCRIBER_NAMES") && obj.getString(
"SUBSCRIBER_NAMES").contains(
inputReq.getSender().getReferenceID()))) {
if (inputReq.getApplicationArea().getChangeEventsType() == null
|| (obj.has("NOTIFICATION_EVENT_NAME") && inputReq
.getApplicationArea()
.getChangeEventsType()
.contains(
obj.getString("NOTIFICATION_EVENT_NAME")))) {
msglist.add(obj);
}
}
readOffset = currentOffset + 1;
}
endTime = System.currentTimeMillis();
if (msglist.size() >= Math.round(limit
/ inputReq.getApplicationArea().getReqInfo().size())
|| (endTime - startTime) >= waitTime) {
log.info("Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
msglist.size(), (endTime - startTime));
end = true;
//consumer.commitSync();
} else if (records.isEmpty()) {
emptyFetchCount++;
try {
if(emptyFetchCount == emptyLoopLimit) {
log.info("No messages were found in 3 successive fetches. Stopping the
consume process here.");
end = true;
} else {
Thread.sleep(1000);
}
} catch (InterruptedException ie) {
CONSUME_LOG.warn(ie.getMessage(), ie);
}
} else {
emptyFetchCount = 0;
}
} while (!end);
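For reference, KafkaConsumerFactory.createConsumer is not shown above. Below is a minimal sketch of what it presumably sets, reconstructed from the ConsumerConfig dump in the new log; the JAAS line is a placeholder and anything not visible in that dump is an assumption, not our exact code:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerFactory {

    public static KafkaConsumer<String, String> createConsumer(String clientId, int fetchSize,
            TopicPartition topicPartition) {
        Properties props = new Properties();
        // Broker list and client id, as printed in the factory log line / ConsumerConfig dump
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-aln-01.test.com:9092,kafka-aln-02.test.com:9092,"
                        + "kafka-rdn-01.test.com:9092,kafka-rdn-02.test.com:9092");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        // Plain string payloads, manual offset handling (enable.auto.commit=false, auto.offset.reset=none)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10"); // value visible in the dump
        // SASL_SSL with the PLAIN mechanism, as in the dump; credentials are placeholders here
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"app_client\" password=\"<hidden>\";");
        // fetchSize/topicPartition presumably feed other settings not reconstructed here;
        // getAdminClient() (used for listOffsets) is assumed to share the same bootstrap/SASL settings.
        return new KafkaConsumer<>(props);
    }
}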
The new KafkaConsumer API is much slower: in the logs above the old implementation finishes in about 3 seconds, while the new one takes about 15 seconds for a similar offset range. I have tried a lot of settings, such as increasing MAX_POLL_RECORDS_CONFIG and FETCH_MIN_BYTES_CONFIG, but it made no difference.
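For example, the overrides were along these lines, added to the Properties built in the factory sketch above (a rough sketch with illustrative values; the exact numbers varied between attempts):

// Illustrative tuning overrides - real values differed per attempt
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5000");              // default 500
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");            // default 1
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");              // related setting, 10 in the config dump above
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10485760"); // related setting, default 1048576 (1 MB)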
Is this kind of timing expected with the new KafkaConsumer API? How can I improve it?
Please help with this.
Regards,
Giri