Hello All,
I am using MirrorMaker 2.0 and testing its consumer checkpointing
functionality. I found that RemoteClusterUtils.translateOffsets does not return
checkpoints for consumers that run in assign mode.
I am running MirrorMaker 2.0 from Kafka 2.5.0 (Scala 2.12).
My source Kafka setup is 1 broker and 1 ZooKeeper node running Kafka 1.0.0
(Scala 2.11).
My target Kafka setup is 1 broker and 1 ZooKeeper node running Kafka 1.0.0
(Scala 2.11).
I am only doing one-way replication, from the source cluster (A) to the target
cluster (B).
Mirror Maker Config:
================
clusters = A, B
A.bootstrap.servers = localhost:9082
B.bootstrap.servers = localhost:9092
A->B.enabled = true
A->B.topics = .*
A->B.groups = .*
B->A.enabled = false
B->A.topics = .*
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
emit.heartbeats.interval.seconds = 2
refresh.topics.interval.seconds=1
refresh.groups.interval.seconds=1
emit.checkpoints.interval.seconds=1
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=1
replication.policy.class=com.ie.naukri.replicator.SimpleReplicationPolicy
============================================================================
In the replication policy, I have removed topic renaming, so each topic is
replicated as is (same name in the target cluster as in the source cluster).
Steps to reproduce:
===================
1) Create a topic on the source cluster.
2) Push some data into the topic using the console producer.
3) Start a consumer in assign mode that reads from the above topic, but only
from 1 partition.
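import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// The class name is illustrative; the body is the consumer used in the test.
public class AssignModeConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "TestTopic-123");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
        // No explicit commit: offsets are committed by auto-commit, which is on by default.
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
        // Assign a single partition manually instead of subscribing to the topic.
        TopicPartition tp = new TopicPartition("TestTopic-123", 1);
        consumer.assign(Collections.singleton(tp));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(new String(record.value()) + "__" + record.partition());
                Thread.sleep(2000);
            }
        }
    }
}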
4) Stop the consumer midway, then describe the consumer group on the source
cluster to get the lag information.
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 \
  --group TestTopic-123
GROUP          TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
TestTopic-123  TestTopic-123  0          5               28              23
5) Run the translateOffsets method to print the downstream offsets.
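// translateOffsets takes a Map<String, Object> (not a Properties object); it should
// point at the target cluster (B), where the checkpoints topic lives.
Map<String, Object> targetProps = new HashMap<>();
targetProps.put("bootstrap.servers", "localhost:9092");
Map<TopicPartition, OffsetAndMetadata> newOffsets =
    RemoteClusterUtils.translateOffsets(targetProps, "A", "TestTopic-123",
        Duration.ofMillis(5500));
System.out.println(newOffsets);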
6) An empty map is returned.
Expected outcome: the translated committed offsets should have been returned.
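For reference, this is roughly the calculation I expected to see reflected in the result. It is only a sketch of my understanding of how an upstream committed offset is mapped to a downstream one from an offset sync record; the class name, the method, and the sync values are hypothetical and are not part of the MM2 API.

```java
import java.util.OptionalLong;

public class OffsetTranslationSketch {

    // Hypothetical helper: given one offset sync (an upstream offset and the
    // downstream offset of the same record), translate a committed upstream offset.
    static OptionalLong translateDownstream(long syncUpstreamOffset,
                                            long syncDownstreamOffset,
                                            long committedUpstreamOffset) {
        if (committedUpstreamOffset < syncUpstreamOffset) {
            // The commit is older than the sync, so it cannot be translated.
            return OptionalLong.empty();
        }
        long step = committedUpstreamOffset - syncUpstreamOffset;
        return OptionalLong.of(syncDownstreamOffset + step);
    }

    public static void main(String[] args) {
        // Committed offset 5 is taken from step 4; the sync values (0, 0) are made up.
        System.out.println(translateDownstream(0L, 0L, 5L));
    }
}
```

With topic renaming removed and a freshly created target topic, I would expect the translated offset to be close to the source offset, so a non-empty map containing the assigned partition was what I was looking for.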
My Debugging
===========
While debugging the issue, I found that the checkpoints topic in the target
cluster did not contain this group's committed offsets.
I tried multiple times with different commit frequencies and topic/group names,
but it did not work. Only consumers running in subscribe mode and the console
consumer with the --group flag produce checkpoints.
Question
========
1) Is it intended behavior that no checkpoints are emitted for an assign-mode
consumer, so that its offsets can never be reset on the target cluster? Or is it
a bug?
Any help would be greatly appreciated.