[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1336165006 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -936,4 +939,204 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + +@Test +public void testDeleteOffsets() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = +new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); +requestTopicCollection.add( +new OffsetDeleteRequestData.OffsetDeleteRequestTopic() +.setName("topic") +.setPartitions(Collections.singletonList( +new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) +)) +); +OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("group") +.setTopics(requestTopicCollection); + +OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = +new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); +responsePartitionCollection.add( +new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) +); +OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = +new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); +responseTopicCollection.add( +new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) +); +OffsetDeleteResponseData response = new OffsetDeleteResponseData() +.setTopics(responseTopicCollection); + + +when(runtime.scheduleWriteOperation( +ArgumentMatchers.eq("delete-offset"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() 
+)).thenReturn(CompletableFuture.completedFuture(response)); + +CompletableFuture future = service.deleteOffsets( +requestContext(ApiKeys.OFFSET_DELETE), +request, +BufferSupplier.NO_CACHING +); + +assertTrue(future.isDone()); +assertEquals(response, future.get()); +} Review Comment: nit: can we add new lines between the tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1336168777 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -936,4 +939,204 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + +@Test +public void testDeleteOffsets() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = +new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); +requestTopicCollection.add( +new OffsetDeleteRequestData.OffsetDeleteRequestTopic() +.setName("topic") +.setPartitions(Collections.singletonList( +new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) +)) +); +OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("group") +.setTopics(requestTopicCollection); + +OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = +new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); +responsePartitionCollection.add( +new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) +); +OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = +new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); +responseTopicCollection.add( +new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) +); +OffsetDeleteResponseData response = new OffsetDeleteResponseData() +.setTopics(responseTopicCollection); + + +when(runtime.scheduleWriteOperation( +ArgumentMatchers.eq("delete-offset"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() 
+)).thenReturn(CompletableFuture.completedFuture(response)); + +CompletableFuture future = service.deleteOffsets( +requestContext(ApiKeys.OFFSET_DELETE), +request, +BufferSupplier.NO_CACHING +); + +assertTrue(future.isDone()); +assertEquals(response, future.get()); +} +@Test +public void testDeleteOffsetsInvalidGroupId() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = +new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); +requestTopicCollection.add( +new OffsetDeleteRequestData.OffsetDeleteRequestTopic() +.setName("topic") +.setPartitions(Collections.singletonList( +new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) +)) +); +OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") +.setTopics(requestTopicCollection); + +OffsetDeleteResponseData response = new OffsetDeleteResponseData() +.setErrorCode(Errors.INVALID_GROUP_ID.code()); + +when(runtime.scheduleWriteOperation( +ArgumentMatchers.eq("delete-offset"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() +)).thenReturn(CompletableFuture.completedFuture(response)); + +CompletableFuture future = service.deleteOffsets( +requestContext(ApiKeys.OFFSET_DELETE), +request, +BufferSupplier.NO_CACHING +); + +assertTrue(future.isDone()); +assertEquals(response, future.get()); +} +@Test +public void testDeleteOffsetsCoordinatorNotAvailableException() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = +new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); +requestTopicCollection
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1336165306 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -936,4 +939,204 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + +@Test +public void testDeleteOffsets() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = +new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); +requestTopicCollection.add( +new OffsetDeleteRequestData.OffsetDeleteRequestTopic() +.setName("topic") +.setPartitions(Collections.singletonList( +new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) +)) +); +OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("group") +.setTopics(requestTopicCollection); + +OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = +new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); +responsePartitionCollection.add( +new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) +); +OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = +new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); +responseTopicCollection.add( +new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) +); +OffsetDeleteResponseData response = new OffsetDeleteResponseData() +.setTopics(responseTopicCollection); + + +when(runtime.scheduleWriteOperation( +ArgumentMatchers.eq("delete-offset"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() 
+)).thenReturn(CompletableFuture.completedFuture(response)); + +CompletableFuture future = service.deleteOffsets( +requestContext(ApiKeys.OFFSET_DELETE), +request, +BufferSupplier.NO_CACHING +); + +assertTrue(future.isDone()); +assertEquals(response, future.get()); +} +@Test +public void testDeleteOffsetsInvalidGroupId() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = +new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); +requestTopicCollection.add( +new OffsetDeleteRequestData.OffsetDeleteRequestTopic() +.setName("topic") +.setPartitions(Collections.singletonList( +new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) +)) +); +OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("") +.setTopics(requestTopicCollection); + +OffsetDeleteResponseData response = new OffsetDeleteResponseData() +.setErrorCode(Errors.INVALID_GROUP_ID.code()); + +when(runtime.scheduleWriteOperation( +ArgumentMatchers.eq("delete-offset"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() +)).thenReturn(CompletableFuture.completedFuture(response)); + +CompletableFuture future = service.deleteOffsets( +requestContext(ApiKeys.OFFSET_DELETE), +request, +BufferSupplier.NO_CACHING +); + +assertTrue(future.isDone()); +assertEquals(response, future.get()); +} +@Test Review Comment: nit: line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1336165006 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -936,4 +939,204 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + +@Test +public void testDeleteOffsets() throws Exception { +CoordinatorRuntime runtime = mockRuntime(); +GroupCoordinatorService service = new GroupCoordinatorService( +new LogContext(), +createConfig(), +runtime +); +service.startup(() -> 1); + +OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection = +new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(); +requestTopicCollection.add( +new OffsetDeleteRequestData.OffsetDeleteRequestTopic() +.setName("topic") +.setPartitions(Collections.singletonList( +new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0) +)) +); +OffsetDeleteRequestData request = new OffsetDeleteRequestData().setGroupId("group") +.setTopics(requestTopicCollection); + +OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitionCollection = +new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(); +responsePartitionCollection.add( +new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0) +); +OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopicCollection = +new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(); +responseTopicCollection.add( +new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitionCollection) +); +OffsetDeleteResponseData response = new OffsetDeleteResponseData() +.setTopics(responseTopicCollection); + + +when(runtime.scheduleWriteOperation( +ArgumentMatchers.eq("delete-offset"), +ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), +ArgumentMatchers.any() 
+)).thenReturn(CompletableFuture.completedFuture(response)); + +CompletableFuture future = service.deleteOffsets( +requestContext(ApiKeys.OFFSET_DELETE), +request, +BufferSupplier.NO_CACHING +); + +assertTrue(future.isDone()); +assertEquals(response, future.get()); +} Review Comment: nit: add line between tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r1336162536 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -90,4 +90,29 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + +/** + * Validates the OffsetDelete request. + */ +void validateOffsetDelete() throws KafkaException; + +/** + * Validates the GroupDelete request + */ +void validateGroupDelete() throws KafkaException; + +/** + * Returns true if the group is actively subscribed to the topic. + * + * @param topic the topic name. + * @return whether the group is subscribed to the topic. + */ +boolean isSubscribedToTopic(String topic); Review Comment: got it okie! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r148886 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -90,4 +90,29 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + +/** + * Validates the OffsetDelete request. + */ +void validateOffsetDelete() throws KafkaException; + +/** + * Validates the GroupDelete request Review Comment: nit: period is missing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r131492 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ## @@ -849,6 +853,46 @@ public void validateOffsetFetch( } } +/** + * Validates the OffsetDelete request. + */ +@Override +public void validateOffsetDelete() throws GroupIdNotFoundException { +if (isInState(DEAD)) { +throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId)); +} +} + +/** + * Validates the GroupDelete request. + */ +@Override +public void validateGroupDelete() throws ApiException { +if (isInState(DEAD)) { +throw new GroupIdNotFoundException(String.format("Group %s is in dead state.", groupId)); +} else if (isInState(STABLE) +|| isInState(PREPARING_REBALANCE) +|| isInState(COMPLETING_REBALANCE)) { +throw Errors.NON_EMPTY_GROUP.exception(); +} + +// We avoid writing the tombstone when the generationId is 0, since this group is only using +// Kafka for offset storage. +if (generationId() <= 0) { +throw Errors.UNKNOWN_SERVER_ERROR.exception(); +} +} + + Review Comment: nit: extra line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r129945 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ## @@ -262,6 +267,45 @@ public HeartbeatResponseData genericGroupHeartbeat( ); } +/** + * Handles a GroupDelete request. + * + * @param context The request context. Review Comment: nit: same here, please use tab spaces so that both param descriptions are aligned -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r127696 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -90,4 +90,29 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + +/** + * Validates the OffsetDelete request. + */ +void validateOffsetDelete() throws KafkaException; + +/** + * Validates the GroupDelete request + */ +void validateGroupDelete() throws KafkaException; + +/** + * Returns true if the group is actively subscribed to the topic. + * + * @param topic the topic name. + * @return whether the group is subscribed to the topic. + */ +boolean isSubscribedToTopic(String topic); Review Comment: Also, I thought we had decided to use topicIds instead of topic names throughout the new protocol. Are we using topic names for this API? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r127696 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -90,4 +90,29 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + +/** + * Validates the OffsetDelete request. + */ +void validateOffsetDelete() throws KafkaException; + +/** + * Validates the GroupDelete request + */ +void validateGroupDelete() throws KafkaException; + +/** + * Returns true if the group is actively subscribed to the topic. + * + * @param topic the topic name. + * @return whether the group is subscribed to the topic. + */ +boolean isSubscribedToTopic(String topic); Review Comment: Also, we decided to use topicIds throughout the protocol. Are we using topic names for this API? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #14408: KAFKA-14506: Implement DeleteGroups API and OffsetDelete API
rreddy-22 commented on code in PR #14408: URL: https://github.com/apache/kafka/pull/14408#discussion_r126880 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java: ## @@ -90,4 +90,29 @@ void validateOffsetFetch( int memberEpoch, long lastCommittedOffset ) throws KafkaException; + +/** + * Validates the OffsetDelete request. + */ +void validateOffsetDelete() throws KafkaException; + +/** + * Validates the GroupDelete request + */ +void validateGroupDelete() throws KafkaException; + +/** + * Returns true if the group is actively subscribed to the topic. + * + * @param topic the topic name. Review Comment: nit: can we add a tab space after the parameter name and capitalize "The", i.e. "@param topic    The topic name." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org