jolshan commented on a change in pull request #11331: URL: https://github.com/apache/kafka/pull/11331#discussion_r745182731
########## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ########## @@ -270,36 +273,320 @@ public void testFetchNormal() { } @Test - public void testFetchWithNoId() { + public void testFetchWithNoTopicId() { // Should work and default to using old request type. buildFetcher(); - TopicPartition noId = new TopicPartition("noId", 0); - TopicIdPartition noIdPart = new TopicIdPartition(Uuid.ZERO_UUID, noId); - assignFromUserNoId(singleton(noId)); - subscriptions.seek(noId, 0); + TopicIdPartition noId = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("noId", 0)); + assignFromUserNoId(singleton(noId.topicPartition())); + subscriptions.seek(noId.topicPartition(), 0); - // fetch should use request version 12 + // Fetch should use request version 12 assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - client.prepareResponse(fullFetchResponse(noIdPart, this.records, Errors.NONE, 100L, 0)); + client.prepareResponse( + fetchRequestMatcher((short) 12, noId, 0, Optional.of(validLeaderEpoch)), + fullFetchResponse(noId, this.records, Errors.NONE, 100L, 0) + ); consumerClient.poll(time.timer(0)); assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords(); - assertTrue(partitionRecords.containsKey(noId)); + assertTrue(partitionRecords.containsKey(noId.topicPartition())); - List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(noId); + List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(noId.topicPartition()); assertEquals(3, records.size()); - assertEquals(4L, subscriptions.position(noId).offset); // this is the next fetching position + assertEquals(4L, subscriptions.position(noId.topicPartition()).offset); // this is the next fetching position long offset = 1; for (ConsumerRecord<byte[], byte[]> record : records) { assertEquals(offset, record.offset()); offset += 1; } } + @Test + public void testFetchWithTopicId() { + buildFetcher(); + + TopicIdPartition tp = new TopicIdPartition(topicId, new TopicPartition(topicName, 0)); + assignFromUser(singleton(tp.topicPartition())); + subscriptions.seek(tp.topicPartition(), 0); + + // Fetch should use latest version + assertEquals(1, fetcher.sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + client.prepareResponse( + fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), tp, 0, Optional.of(validLeaderEpoch)), + fullFetchResponse(tp, this.records, Errors.NONE, 100L, 0) + ); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords(); + assertTrue(partitionRecords.containsKey(tp.topicPartition())); + + List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp.topicPartition()); + assertEquals(3, records.size()); + assertEquals(4L, subscriptions.position(tp.topicPartition()).offset); // this is the next fetching position + long offset = 1; + for (ConsumerRecord<byte[], byte[]> record : records) { + assertEquals(offset, record.offset()); + offset += 1; + } + } + + @Test + public void testFetchForgetTopicIdWhenUnassigned() { + buildFetcher(); + + TopicIdPartition foo = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition bar = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0)); + + // Assign foo and bar. + subscriptions.assignFromUser(singleton(foo.topicPartition())); + client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(foo), tp -> validLeaderEpoch)); + subscriptions.seek(foo.topicPartition(), 0); + + // Fetch should use latest version. + assertEquals(1, fetcher.sendFetches()); + + client.prepareResponse( + fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), + singletonMap(foo, new PartitionData( + foo.topicId(), + 0, + FetchRequest.INVALID_LOG_START_OFFSET, + fetchSize, + Optional.of(validLeaderEpoch)) + ), + emptyList() + ), + fullFetchResponse(1, foo, this.records, Errors.NONE, 100L, 0) + ); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchedRecords(); + + // Assign bar and unassign foo. + subscriptions.assignFromUser(singleton(bar.topicPartition())); + client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(bar), tp -> validLeaderEpoch)); + subscriptions.seek(bar.topicPartition(), 0); + + // Fetch should use latest version. + assertEquals(1, fetcher.sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + client.prepareResponse( + fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), + singletonMap(bar, new PartitionData( + bar.topicId(), + 0, + FetchRequest.INVALID_LOG_START_OFFSET, + fetchSize, + Optional.of(validLeaderEpoch)) + ), + singletonList(foo) + ), + fullFetchResponse(1, bar, this.records, Errors.NONE, 100L, 0) + ); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchedRecords(); + } + + @Test + public void testFetchForgetTopicIdWhenReplaced() { + buildFetcher(); + + TopicIdPartition fooWithOldTopicId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition fooWithNewTopicId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + + // Assign foo with old topic id. + subscriptions.assignFromUser(singleton(fooWithOldTopicId.topicPartition())); + client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(fooWithOldTopicId), tp -> validLeaderEpoch)); + subscriptions.seek(fooWithOldTopicId.topicPartition(), 0); + + // Fetch should use latest version. + assertEquals(1, fetcher.sendFetches()); + + client.prepareResponse( + fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), + singletonMap(fooWithOldTopicId, new PartitionData( + fooWithOldTopicId.topicId(), + 0, + FetchRequest.INVALID_LOG_START_OFFSET, + fetchSize, + Optional.of(validLeaderEpoch)) + ), + emptyList() + ), + fullFetchResponse(1, fooWithOldTopicId, this.records, Errors.NONE, 100L, 0) + ); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchedRecords(); + + // Replace foo with old topic id with foo with new topic id. + subscriptions.assignFromUser(singleton(fooWithNewTopicId.topicPartition())); + client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(fooWithNewTopicId), tp -> validLeaderEpoch)); + subscriptions.seek(fooWithNewTopicId.topicPartition(), 0); + + // Fetch should use latest version. + assertEquals(1, fetcher.sendFetches()); + assertFalse(fetcher.hasCompletedFetches()); + + // foo with old topic id should be removed from the session. + client.prepareResponse( + fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), + singletonMap(fooWithNewTopicId, new PartitionData( + fooWithNewTopicId.topicId(), + 0, + FetchRequest.INVALID_LOG_START_OFFSET, + fetchSize, + Optional.of(validLeaderEpoch)) + ), + singletonList(fooWithOldTopicId) + ), + fullFetchResponse(1, fooWithNewTopicId, this.records, Errors.NONE, 100L, 0) + ); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchedRecords(); + } + + @Test + public void testFetchTopicIdUpgradeDowngrade() { + buildFetcher(); + + TopicIdPartition fooWithoutId = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("foo", 0)); + TopicIdPartition fooWithId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + + // Assign foo without a topic id. + subscriptions.assignFromUser(singleton(fooWithoutId.topicPartition())); + client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, singleton(fooWithoutId), tp -> validLeaderEpoch)); + subscriptions.seek(fooWithoutId.topicPartition(), 0); + + // Fetch should use version 12. + assertEquals(1, fetcher.sendFetches()); + + client.prepareResponse( + fetchRequestMatcher((short) 12, + singletonMap(fooWithoutId, new PartitionData( + fooWithoutId.topicId(), + 0, + FetchRequest.INVALID_LOG_START_OFFSET, + fetchSize, + Optional.of(validLeaderEpoch)) + ), + emptyList() + ), + fullFetchResponse(1, fooWithoutId, this.records, Errors.NONE, 100L, 0) + ); + consumerClient.poll(time.timer(0)); + assertTrue(fetcher.hasCompletedFetches()); + fetchedRecords(); + + // Upgrade. + subscriptions.assignFromUser(singleton(fooWithId.topicPartition())); Review comment: For my understanding, is this line necessary? We are assigning the same topic partition, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org