mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r873586510
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -319,27 +319,21 @@ public void testReplication() throws Exception {
waitForCondition(() ->
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new
TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not
translated downstream to primary cluster.");
- waitForCondition(() ->
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
- Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new
TopicPartition("test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not
translated upstream to primary cluster.");
-
Map<TopicPartition, OffsetAndMetadata> primaryOffsets =
primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS));
primaryClient.close();
backupClient.close();
// Failback consumer group to primary cluster
- try (Consumer<byte[], byte[]> backupConsumer =
primary.kafka().createConsumer(Collections.singletonMap("group.id",
consumerGroupName))) {
- backupConsumer.assign(primaryOffsets.keySet());
- primaryOffsets.forEach(backupConsumer::seek);
- backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- backupConsumer.commitAsync();
-
- assertTrue(backupConsumer.position(new
TopicPartition("test-topic-1", 0)) > 0, "Consumer failedback to zero upstream
offset.");
- assertTrue(backupConsumer.position(new
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero
downstream offset.");
- assertTrue(backupConsumer.position(
- new TopicPartition("test-topic-1", 0)) <=
NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected upstream offset.");
- assertTrue(backupConsumer.position(
+ try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumer(Collections.singletonMap("group.id",
consumerGroupName))) {
+ primaryConsumer.assign(primaryOffsets.keySet());
+ primaryOffsets.forEach(primaryConsumer::seek);
+ primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ primaryConsumer.commitAsync();
+
+ assertTrue(primaryConsumer.position(new
TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero
downstream offset.");
Review Comment:
Same as above: `remoteConsumerOffsets()` now returns offsets only for remote
topics that are being mirrored from the backup cluster, and these are prefixed
with `backup.`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]