Re: [PR] KAFKA-16386: Convert NETWORK_EXCEPTIONs from KIP-890 transaction verification [kafka]
jolshan commented on PR #15559:
URL: https://github.com/apache/kafka/pull/15559#issuecomment-2064671000

I wonder if I should backport this to 3.7 as well.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
jolshan merged PR #15559:
URL: https://github.com/apache/kafka/pull/15559
jolshan commented on PR #15559:
URL: https://github.com/apache/kafka/pull/15559#issuecomment-2019067710

[testAlterSinkConnectorOffsetsOverriddenConsumerGroupId](https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=trunk&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.test=testAlterSinkConnectorOffsetsOverriddenConsumerGroupId)
[testSeparateOffsetsTopic](https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=trunk&tests.container=org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest&tests.test=testSeparateOffsetsTopic)

These caught my eye but have been flaky/failing since before this change. Here are JIRAs for them: https://issues.apache.org/jira/browse/KAFKA-15914 and https://issues.apache.org/jira/browse/KAFKA-14089

The other tests look known and/or unrelated. I will go ahead and merge.
jolshan commented on PR #15559:
URL: https://github.com/apache/kafka/pull/15559#issuecomment-2018411363

Getting a fresh build before merging 👍
squah-confluent commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1536046613

## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ##
@@ -1365,6 +1365,16 @@ object GroupMetadataManager {
   def maybeConvertOffsetCommitError(error: Errors) : Errors = {
     error match {
+      case Errors.NETWORK_EXCEPTION =>
+        // When committing offsets transactionally, we now verify the transaction with the
+        // transaction coordinator. Verification can fail with `NETWORK_EXCEPTION`, a retriable
+        // error which older clients may not expect and retry correctly. We have the option of
+        // translating `NETWORK_EXCEPTION` to either `COORDINATOR_LOAD_IN_PROGRESS` or

Review Comment:
Thanks, I've reworded the comments.
jolshan commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1536023033

## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ##
@@ -1365,6 +1365,16 @@ object GroupMetadataManager {
   def maybeConvertOffsetCommitError(error: Errors) : Errors = {
     error match {
+      case Errors.NETWORK_EXCEPTION =>
+        // When committing offsets transactionally, we now verify the transaction with the
+        // transaction coordinator. Verification can fail with `NETWORK_EXCEPTION`, a retriable
+        // error which older clients may not expect and retry correctly. We have the option of
+        // translating `NETWORK_EXCEPTION` to either `COORDINATOR_LOAD_IN_PROGRESS` or

Review Comment:
Thanks for the explanation. I wonder if we could leave out the sentence about "we have the option..." and just include "We use `COORDINATOR_LOAD_IN_PROGRESS` because it retries the request without an unnecessary coordinator lookup."
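The translation under discussion in this comment can be sketched in plain Java. This is only an illustration under stated assumptions: the `Errors` enum below is a local stand-in for a few constants of Kafka's real `Errors` enum, and `convertOffsetCommitError` is a hypothetical name, not the actual `maybeConvertOffsetCommitError` implementation.

```java
// Sketch only: a local stand-in for a few constants of Kafka's Errors enum,
// declared here so the example compiles without the Kafka client library.
final class OffsetCommitErrorSketch {
    enum Errors {
        NONE,
        NETWORK_EXCEPTION,
        COORDINATOR_LOAD_IN_PROGRESS
    }

    // Hypothetical helper modelled on the translation discussed in the thread.
    static Errors convertOffsetCommitError(Errors error) {
        switch (error) {
            case NETWORK_EXCEPTION:
                // Per the review comment, COORDINATOR_LOAD_IN_PROGRESS makes the
                // client retry the request without an unnecessary coordinator lookup.
                return Errors.COORDINATOR_LOAD_IN_PROGRESS;
            default:
                // Every other error is passed through unchanged in this sketch.
                return error;
        }
    }

    public static void main(String[] args) {
        System.out.println(convertOffsetCommitError(Errors.NETWORK_EXCEPTION));
        System.out.println(convertOffsetCommitError(Errors.NONE));
    }
}
```

Keeping the mapping in one function means every caller on the offset-commit path sees the same translated error, which is the concern raised elsewhere in this review.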
squah-confluent commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1536004118

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ##
@@ -1899,8 +1900,19 @@ public void testCommitTransactionalOffsets() throws ExecutionException, Interrup
         assertEquals(response, future.get());
     }

-    @Test
-    public void testCommitTransactionalOffsetsWithWrappedError() throws ExecutionException, InterruptedException {
+    private static Stream testCommitTransactionalOffsetsWithWrappedErrorSource() {
+        return Stream.of(
+            Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code()),
+            Arguments.arguments(new NetworkException(), Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("testCommitTransactionalOffsetsWithWrappedErrorSource")

Review Comment:
That appears to work, thanks!
dajac commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1535973164

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ##
@@ -1899,8 +1900,19 @@ public void testCommitTransactionalOffsets() throws ExecutionException, Interrup
         assertEquals(response, future.get());
     }

-    @Test
-    public void testCommitTransactionalOffsetsWithWrappedError() throws ExecutionException, InterruptedException {
+    private static Stream testCommitTransactionalOffsetsWithWrappedErrorSource() {
+        return Stream.of(
+            Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code()),
+            Arguments.arguments(new NetworkException(), Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("testCommitTransactionalOffsetsWithWrappedErrorSource")

Review Comment:
nit: Would it be possible to use a CsvSource with Errors,Errors? I am not sure if CsvSource works with Enums though.
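The idea behind an `Errors,Errors` CSV row can be illustrated without JUnit: each row is a pair of enum constant names that are converted with `Enum.valueOf` before the check runs. The sketch below is a rough stand-in, not the test in the PR; the `Errors` enum is a local copy of four constant names, and `translate` and `rowPasses` are hypothetical helpers that mirror the two cases in the diff above.

```java
// Sketch only: plain-Java, table-driven version of an "input,expected" CSV check.
final class CsvEnumRowsSketch {
    // Local stand-in for a few constants of Kafka's Errors enum.
    enum Errors {
        NETWORK_EXCEPTION,
        NOT_ENOUGH_REPLICAS,
        COORDINATOR_LOAD_IN_PROGRESS,
        COORDINATOR_NOT_AVAILABLE
    }

    // Hypothetical translation under test, mirroring the two cases in the diff.
    static Errors translate(Errors error) {
        switch (error) {
            case NETWORK_EXCEPTION:
                return Errors.COORDINATOR_LOAD_IN_PROGRESS;
            case NOT_ENOUGH_REPLICAS:
                return Errors.COORDINATOR_NOT_AVAILABLE;
            default:
                return error;
        }
    }

    // Each row has the form "input,expected", using enum constant names.
    static final String[] ROWS = {
        "NOT_ENOUGH_REPLICAS,COORDINATOR_NOT_AVAILABLE",
        "NETWORK_EXCEPTION,COORDINATOR_LOAD_IN_PROGRESS"
    };

    // Parses one row into two enum constants and checks the translation.
    static boolean rowPasses(String row) {
        String[] columns = row.split(",");
        Errors input = Errors.valueOf(columns[0].trim());
        Errors expected = Errors.valueOf(columns[1].trim());
        return translate(input) == expected;
    }

    public static void main(String[] args) {
        for (String row : ROWS) {
            System.out.println(row + " -> " + (rowPasses(row) ? "ok" : "FAILED"));
        }
    }
}
```

With JUnit 5, the same rows would go into a `@CsvSource` annotation and the test method would declare two enum-typed parameters; the framework's implicit argument conversion handles the string-to-enum step.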
squah-confluent commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1535903580

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -830,7 +830,11 @@ class ReplicaManager(val config: KafkaConfig,
         val customException =
           error match {
             case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
+            // Transaction verification can fail with a retriable error that older clients may not
+            // retry correctly. Translate these to `NOT_ENOUGH_REPLICAS`, which will cause such
+            // clients to retry the produce request.

Review Comment:
I've updated the comment. Let me know if it looks okay now.
artemlivshits commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1535865337

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -830,7 +830,11 @@ class ReplicaManager(val config: KafkaConfig,
         val customException =
           error match {
             case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
+            // Transaction verification can fail with a retriable error that older clients may not
+            // retry correctly. Translate these to `NOT_ENOUGH_REPLICAS`, which will cause such
+            // clients to retry the produce request.

Review Comment:
There are multiple errors that could cause clients to retry the produce request; we chose this one because it doesn't trigger a metadata refresh.
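The produce-path choice described in this comment can be sketched as follows. This is a minimal illustration under stated assumptions: the `Errors` enum is a local stand-in for a few constants of Kafka's `Errors` enum, `convertVerificationError` is a hypothetical name, and the real match in `ReplicaManager` builds exceptions and may cover more cases than the ones visible in this thread.

```java
// Sketch only: maps transaction-verification errors to an error that older
// producers are expected to retry.
final class ProduceVerificationErrorSketch {
    // Local stand-in for a few constants of Kafka's Errors enum.
    enum Errors {
        NONE,
        INVALID_TXN_STATE,
        CONCURRENT_TRANSACTIONS,
        NETWORK_EXCEPTION,
        NOT_ENOUGH_REPLICAS
    }

    // Hypothetical helper modelled on the cases quoted in the review.
    static Errors convertVerificationError(Errors error) {
        switch (error) {
            case CONCURRENT_TRANSACTIONS:
            case NETWORK_EXCEPTION:
                // Per the review comment, NOT_ENOUGH_REPLICAS was chosen because it
                // causes a retry without triggering a metadata refresh.
                return Errors.NOT_ENOUGH_REPLICAS;
            default:
                // Other errors, such as INVALID_TXN_STATE, are not remapped here.
                return error;
        }
    }

    public static void main(String[] args) {
        System.out.println(convertVerificationError(Errors.NETWORK_EXCEPTION));
        System.out.println(convertVerificationError(Errors.INVALID_TXN_STATE));
    }
}
```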
squah-confluent commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1535637194

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ##
@@ -921,7 +921,10 @@ private[group] class GroupCoordinator(
     ): Unit = {
       val (error, verificationGuard) = errorAndGuard
       if (error != Errors.NONE) {
-        val finalError = GroupMetadataManager.maybeConvertOffsetCommitError(error)
+        val finalError = error match {
+          case Errors.NETWORK_EXCEPTION => Errors.COORDINATOR_LOAD_IN_PROGRESS

Review Comment:
I've added a comment and moved the error translation in the old group coordinator to `maybeConvertOffsetCommitError`. I also moved the error translation in the new group coordinator to `handleOperationException` based on feedback from @dajac.
squah-confluent commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1535637194

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ##
@@ -921,7 +921,10 @@ private[group] class GroupCoordinator(
     ): Unit = {
       val (error, verificationGuard) = errorAndGuard
       if (error != Errors.NONE) {
-        val finalError = GroupMetadataManager.maybeConvertOffsetCommitError(error)
+        val finalError = error match {
+          case Errors.NETWORK_EXCEPTION => Errors.COORDINATOR_LOAD_IN_PROGRESS

Review Comment:
I've added a comment and moved the error translation in both group coordinators now.
squah-confluent commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1535636055

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -831,6 +831,7 @@ class ReplicaManager(val config: KafkaConfig,
           error match {
             case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
             case Errors.CONCURRENT_TRANSACTIONS |
+                 Errors.NETWORK_EXCEPTION |

Review Comment:
Good idea, I've added a comment.
squah-confluent commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1534709657

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ##
@@ -921,7 +921,10 @@ private[group] class GroupCoordinator(
     ): Unit = {
       val (error, verificationGuard) = errorAndGuard
       if (error != Errors.NONE) {
-        val finalError = GroupMetadataManager.maybeConvertOffsetCommitError(error)
+        val finalError = error match {
+          case Errors.NETWORK_EXCEPTION => Errors.COORDINATOR_LOAD_IN_PROGRESS

Review Comment:
Thank you for the review!

`maybeConvertOffsetCommitError` is also used in [`GroupMetadataManager.createPutCacheCallback`](https://github.com/apache/kafka/blob/7e85d7d32e8cb4f3fa9b01fd4197ee43d64ba6d0/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L430). I wasn't sure whether it made sense to translate network exceptions on that path too. I'm happy to move the translation there if you think it makes sense.

I have the same uncertainty about [`handleOperationException`](https://github.com/apache/kafka/blob/7e85d7d32e8cb4f3fa9b01fd4197ee43d64ba6d0/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java#L1084) in the new group coordinator. All the other group coordinator methods use it to translate errors and I was not sure if it made sense to translate network exceptions there, for all methods.

Also, I agree, a comment to explain the choice of error would be good. I'll add one here and in ReplicaManager too.
artemlivshits commented on code in PR #15559:
URL: https://github.com/apache/kafka/pull/15559#discussion_r1534632389

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ##
@@ -921,7 +921,10 @@ private[group] class GroupCoordinator(
     ): Unit = {
       val (error, verificationGuard) = errorAndGuard
       if (error != Errors.NONE) {
-        val finalError = GroupMetadataManager.maybeConvertOffsetCommitError(error)
+        val finalError = error match {
+          case Errors.NETWORK_EXCEPTION => Errors.COORDINATOR_LOAD_IN_PROGRESS

Review Comment:
Should we add a comment why we're translating to this error and not the other?

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -831,6 +831,7 @@ class ReplicaManager(val config: KafkaConfig,
           error match {
             case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
             case Errors.CONCURRENT_TRANSACTIONS |
+                 Errors.NETWORK_EXCEPTION |

Review Comment:
Should we add a comment why we're translating to NotEnoughReplicasException error and not something else? I think there is a bit of thinking that went into this logic and the code doesn't reflect it.

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ##
@@ -921,7 +921,10 @@ private[group] class GroupCoordinator(
     ): Unit = {
       val (error, verificationGuard) = errorAndGuard
       if (error != Errors.NONE) {
-        val finalError = GroupMetadataManager.maybeConvertOffsetCommitError(error)
+        val finalError = error match {
+          case Errors.NETWORK_EXCEPTION => Errors.COORDINATOR_LOAD_IN_PROGRESS

Review Comment:
I'm curious why we singled out this error explicitly rather than putting it into the function tasked specifically with error translation?