Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-31 Thread via GitHub


dajac merged PR #14589:
URL: https://github.com/apache/kafka/pull/14589


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-30 Thread via GitHub


dajac commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1376679603


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1500,6 +1502,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 new OffsetFetchResponseData.OffsetFetchResponseGroup()
   .setGroupId(offsetFetchRequest.groupId)
   .setErrorCode(Errors.forException(exception).code)
+  } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) {

Review Comment:
   ditto.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1461,6 +1461,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 new OffsetFetchResponseData.OffsetFetchResponseGroup()
   .setGroupId(offsetFetchRequest.groupId)
   .setErrorCode(Errors.forException(exception).code)
+  } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) {

Review Comment:
   nit: We could remove the `()` after `code`.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4701,6 +4751,89 @@ class KafkaApisTest {
 assertEquals(expectedOffsetFetchResponse, response.data)
   }
 
+  @Test
+  def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
+def makeRequest(version: Short): RequestChannel.Request = {
+  val groups = Map(
+"group-1" -> List(
+  new TopicPartition("foo", 0),
+  new TopicPartition("bar", 0)
+).asJava,
+"group-2" -> List(
+  new TopicPartition("foo", 0),
+  new TopicPartition("bar", 0)
+).asJava
+  ).asJava
+  buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+}
+
+val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+
+val authorizer: Authorizer = mock(classOf[Authorizer])
+
+val acls = Map(
+  "group-1" -> AuthorizationResult.ALLOWED,
+  "group-2" -> AuthorizationResult.ALLOWED,
+  "foo" -> AuthorizationResult.DENIED,
+  "bar" -> AuthorizationResult.ALLOWED
+)
+
+when(authorizer.authorize(
+  any[RequestContext],
+  any[util.List[Action]]
+)).thenAnswer { invocation =>
+  val actions = invocation.getArgument(1, classOf[util.List[Action]])
+  actions.asScala.map { action =>
+acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+  }.asJava
+}
+
+// group-1 and group-2 are allowed and bar is allowed.
+val group1Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+when(groupCoordinator.fetchOffsets(
+  requestChannelRequest.context,
+  new OffsetFetchRequestData.OffsetFetchRequestGroup()
+.setGroupId("group-1")
+.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+  .setName("bar")
+  .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+  false
+)).thenReturn(group1Future)
+
+val group2Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+when(groupCoordinator.fetchOffsets(
+  requestChannelRequest.context,
+  new OffsetFetchRequestData.OffsetFetchRequestGroup()
+.setGroupId("group-2")
+.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+  .setName("bar")
+  .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+  false
+)).thenReturn(group1Future)
+
+createKafkaApis(authorizer = 
Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+// group-2 mocks using the new group coordinator.
+// When the coordinator is not active, a response with error code is 
returned.

Review Comment:
   Should we add a note about `foo` here? The whole point of this test is to 
ensure that the failed topics are not present in the response when there is a 
top level error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-27 Thread via GitHub


dajac commented on PR #14589:
URL: https://github.com/apache/kafka/pull/14589#issuecomment-1783398978

   @dongnuo123 One of the build is still red. You may have to merge trunk to 
get the latest changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1373846331


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()

Review Comment:
   group3Response and group4Response are the same. I wanted to make sure when
```
   group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
   group4Future.complete(group4Response)
   ```
   handleOffsetFetch gives the same response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1373847099


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+.setGroupId("group-4")
+.setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+  val group5Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()

Review Comment:
   Yeah, you're right. Let me write it to a separate method. We probably don't 
need this group 5



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-26 Thread via GitHub


dongnuo123 commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1373846331


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()

Review Comment:
   group3Response and group4Response are the same. I wanted to when
```
   group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
   group4Future.complete(group4Response)
   ```
   handleOffsetFetch gives the same response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-26 Thread via GitHub


dajac commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1373349480


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+.setGroupId("group-4")
+.setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+  val group5Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()

Review Comment:
   I don't fully understand this test case either. My understanding is that we 
could have a weird response only in one case: the coordinator returns a 
response with an error and the kafka apis adds unauthorized topics to it. Did I 
get it right? If so, it may be better to actually add a new (and separate) test 
case for this one.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+.setGroupId("group-4")
+.setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+  val group5Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+.setGroupId("group-5")
+.setTopics(List(
+  new OffsetFetchResponseData.OffsetFetchResponseTopics()
+.setName("unknown")
+.setPartitions(List(
+  new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+.setPartitionIndex(0)
+.setCommittedOffset(-1)
+).asJava),
+  new OffsetFetchResponseData.OffsetFetchResponseTopics()
+.setName("foo")
+.setPartitions(List(
+  new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+.setPartitionIndex(1)
+.setCommittedOffset(-1)
+).asJava)
+).asJava)
+
+  val expectedGroups = List(group1Response, group2Response, 
group3Response, group4Response, group5Response)

Review Comment:
   Is this change really necessary?



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4385,15 +4451,39 @@ class KafkaApisTest {
 .setGroupId("group-3")
 .setErrorCode(Errors.INVALID_GROUP_ID.code)
 
-  val expectedOffsetFetchResponse = new OffsetFetchResponseData()
-.setGroups(List(group1Response, group2Response, group3Response).asJava)
+  val group4Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()

Review Comment:
   Actually, how is this different from `group3Response`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-25 Thread via GitHub


dongnuo123 commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1372463509


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -268,7 +269,9 @@ public 
CompletableFuture consumerGroupHeartb
 ConsumerGroupHeartbeatRequestData request

Review Comment:
   Not sure if I understand, should we specify in group coordinator interface 
or in group coordinator service? I made some change in the group coordinator 
interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-25 Thread via GitHub


dajac commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1371818152


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -310,16 +313,22 @@ public CompletableFuture joinGroup(
 JoinGroupRequestData request,
 BufferSupplier bufferSupplier
 ) {
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+
 if (!isActive.get()) {
-return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
-}
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(request.memberId())
+.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+);
 
-CompletableFuture responseFuture = new 
CompletableFuture<>();
+return responseFuture;
+}
 
 if (!isGroupIdNotEmpty(request.groupId())) {
 responseFuture.complete(new JoinGroupResponseData()

Review Comment:
   We could also use `return CompletableFuture.completedFuture` here.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4341,6 +4371,15 @@ class KafkaApisTest {
 false
   )).thenReturn(group3Future)
 
+  val group4Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()

Review Comment:
   I also feel like that we don't fully repro the issue in this one. We also 
need to test for both the all topics (null) and specific topics (non null) 
cases. Do you agree?



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3267,6 +3267,35 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_ID_NOT_FOUND, 
Errors.forCode(response.data.errorCode))
   }
 
+  @Test
+  def testOffsetDeleteWithInvalidGroupWithTopLevelError(): Unit = {

Review Comment:
   I think that we don't fully reproduce the bug with this test because `if 
(data.topics().isEmpty()) {` would be true in this case as well. Would it be 
possible to have a test which really have partial results awaiting in the 
builder?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -310,16 +313,22 @@ public CompletableFuture joinGroup(
 JoinGroupRequestData request,
 BufferSupplier bufferSupplier
 ) {
+CompletableFuture responseFuture = new 
CompletableFuture<>();
+
 if (!isActive.get()) {
-return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
-}
+responseFuture.complete(new JoinGroupResponseData()

Review Comment:
   nit: Could we also use `return CompletableFuture.completedFuture`)?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -139,7 +138,7 @@ public void testStartupShutdown() throws Exception {
 }
 
 @Test
-public void testConsumerGroupHeartbeatWhenNotStarted() {
+public void testConsumerGroupHeartbeatWhenNotStarted() throws 
ExecutionException, InterruptedException {

Review Comment:
   Is it worth adding a similar test for each API? That would ensure that we 
keep the contract.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -268,7 +269,9 @@ public 
CompletableFuture consumerGroupHeartb
 ConsumerGroupHeartbeatRequestData request

Review Comment:
   Should we also update the javadoc of all those methods to specify the 
contract a little more? I known that the old coordinator won't fully comply 
with this but as it will go away, it should not be a big deal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org