Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dajac merged PR #14589: URL: https://github.com/apache/kafka/pull/14589 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dajac commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1376679603 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1500,6 +1502,8 @@ class KafkaApis(val requestChannel: RequestChannel, new OffsetFetchResponseData.OffsetFetchResponseGroup() .setGroupId(offsetFetchRequest.groupId) .setErrorCode(Errors.forException(exception).code) + } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) { Review Comment: ditto. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1461,6 +1461,8 @@ class KafkaApis(val requestChannel: RequestChannel, new OffsetFetchResponseData.OffsetFetchResponseGroup() .setGroupId(offsetFetchRequest.groupId) .setErrorCode(Errors.forException(exception).code) + } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) { Review Comment: nit: We could remove the `()` after `code`. ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4701,6 +4751,89 @@ class KafkaApisTest { assertEquals(expectedOffsetFetchResponse, response.data) } + @Test + def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = { +def makeRequest(version: Short): RequestChannel.Request = { + val groups = Map( +"group-1" -> List( + new TopicPartition("foo", 0), + new TopicPartition("bar", 0) +).asJava, +"group-2" -> List( + new TopicPartition("foo", 0), + new TopicPartition("bar", 0) +).asJava + ).asJava + buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version)) +} + +val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion) + +val authorizer: Authorizer = mock(classOf[Authorizer]) + +val acls = Map( + "group-1" -> AuthorizationResult.ALLOWED, + "group-2" -> AuthorizationResult.ALLOWED, + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.ALLOWED +) + +when(authorizer.authorize( + any[RequestContext], + any[util.List[Action]] +)).thenAnswer { invocation => + val actions = invocation.getArgument(1, classOf[util.List[Action]]) + actions.asScala.map { action => +acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED) + }.asJava +} + +// group-1 and group-2 are allowed and bar is allowed. +val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() +when(groupCoordinator.fetchOffsets( + requestChannelRequest.context, + new OffsetFetchRequestData.OffsetFetchRequestGroup() +.setGroupId("group-1") +.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava)).asJava), + false +)).thenReturn(group1Future) + +val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() +when(groupCoordinator.fetchOffsets( + requestChannelRequest.context, + new OffsetFetchRequestData.OffsetFetchRequestGroup() +.setGroupId("group-2") +.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava)).asJava), + false +)).thenReturn(group1Future) + +createKafkaApis(authorizer = Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching) + +// group-2 mocks using the new group coordinator. +// When the coordinator is not active, a response with error code is returned. Review Comment: Should we add a note about `foo` here? The whole point of this test is to ensure that the failed topics are not present in the response when there is a top level error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dajac commented on PR #14589: URL: https://github.com/apache/kafka/pull/14589#issuecomment-1783398978 @dongnuo123 One of the build is still red. You may have to merge trunk to get the latest changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dongnuo123 commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1373846331 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() Review Comment: group3Response and group4Response are the same. I wanted to make sure when ``` group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception) group4Future.complete(group4Response) ``` handleOffsetFetch gives the same response. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dongnuo123 commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1373847099 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() +.setGroupId("group-4") +.setErrorCode(Errors.INVALID_GROUP_ID.code) + + val group5Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() Review Comment: Yeah, you're right. Let me write it to a separate method. We probably don't need this group 5 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dongnuo123 commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1373846331 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() Review Comment: group3Response and group4Response are the same. I wanted to when ``` group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception) group4Future.complete(group4Response) ``` handleOffsetFetch gives the same response. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dajac commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1373349480 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() +.setGroupId("group-4") +.setErrorCode(Errors.INVALID_GROUP_ID.code) + + val group5Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() Review Comment: I don't fully understand this test case either. My understanding is that we could have a weird response only in one case: the coordinator returns a response with an error and the kafka apis adds unauthorized topics to it. Did I get it right? If so, it may be better to actually add a new (and separate) test case for this one. ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() +.setGroupId("group-4") +.setErrorCode(Errors.INVALID_GROUP_ID.code) + + val group5Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() +.setGroupId("group-5") +.setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() +.setName("unknown") +.setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() +.setPartitionIndex(0) +.setCommittedOffset(-1) +).asJava), + new OffsetFetchResponseData.OffsetFetchResponseTopics() +.setName("foo") +.setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() +.setPartitionIndex(1) +.setCommittedOffset(-1) +).asJava) +).asJava) + + val expectedGroups = List(group1Response, group2Response, group3Response, group4Response, group5Response) Review Comment: Is this change really necessary? ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4385,15 +4451,39 @@ class KafkaApisTest { .setGroupId("group-3") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedOffsetFetchResponse = new OffsetFetchResponseData() -.setGroups(List(group1Response, group2Response, group3Response).asJava) + val group4Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() Review Comment: Actually, how is this different from `group3Response`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dongnuo123 commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1372463509 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -268,7 +269,9 @@ public CompletableFuture consumerGroupHeartb ConsumerGroupHeartbeatRequestData request Review Comment: Not sure if I understand, should we specify in group coordinator interface or in group coordinator service? I made some change in the group coordinator interface. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dajac commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1371818152 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -310,16 +313,22 @@ public CompletableFuture joinGroup( JoinGroupRequestData request, BufferSupplier bufferSupplier ) { +CompletableFuture responseFuture = new CompletableFuture<>(); + if (!isActive.get()) { -return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); -} +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(request.memberId()) +.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) +); -CompletableFuture responseFuture = new CompletableFuture<>(); +return responseFuture; +} if (!isGroupIdNotEmpty(request.groupId())) { responseFuture.complete(new JoinGroupResponseData() Review Comment: We could also use `return CompletableFuture.completedFuture` here. ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4341,6 +4371,15 @@ class KafkaApisTest { false )).thenReturn(group3Future) + val group4Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() Review Comment: I also feel like that we don't fully repro the issue in this one. We also need to test for both the all topics (null) and specific topics (non null) cases. Do you agree? ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3267,6 +3267,35 @@ class KafkaApisTest { assertEquals(Errors.GROUP_ID_NOT_FOUND, Errors.forCode(response.data.errorCode)) } + @Test + def testOffsetDeleteWithInvalidGroupWithTopLevelError(): Unit = { Review Comment: I think that we don't fully reproduce the bug with this test because `if (data.topics().isEmpty()) {` would be true in this case as well. Would it be possible to have a test which really have partial results awaiting in the builder? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -310,16 +313,22 @@ public CompletableFuture joinGroup( JoinGroupRequestData request, BufferSupplier bufferSupplier ) { +CompletableFuture responseFuture = new CompletableFuture<>(); + if (!isActive.get()) { -return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); -} +responseFuture.complete(new JoinGroupResponseData() Review Comment: nit: Could we also use `return CompletableFuture.completedFuture`)? ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ## @@ -139,7 +138,7 @@ public void testStartupShutdown() throws Exception { } @Test -public void testConsumerGroupHeartbeatWhenNotStarted() { +public void testConsumerGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException { Review Comment: Is it worth adding a similar test for each API? That would ensure that we keep the contract. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -268,7 +269,9 @@ public CompletableFuture consumerGroupHeartb ConsumerGroupHeartbeatRequestData request Review Comment: Should we also update the javadoc of all those methods to specify the contract a little more? I known that the old coordinator won't fully comply with this but as it will go away, it should not be a big deal. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org