smjn commented on code in PR #20884:
URL: https://github.com/apache/kafka/pull/20884#discussion_r2526951229


##########
server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java:
##########
@@ -399,14 +401,24 @@ public void onComplete(ClientResponse response) {
         }
 
         // Visibility for testing
-        Optional<Errors> checkNetworkError(ClientResponse response, BiConsumer<Errors, Exception> errorConsumer) {
+        Optional<Errors> checkResponseError(ClientResponse response, BiConsumer<Errors, Exception> errorConsumer) {
             if (response.hasResponse()) {
                 return Optional.empty();
             }
 
-            log.debug("Response for RPC {} with key {} is invalid - {}.", name(), this.partitionKey, response);
-
-            if (response.wasDisconnected()) {
+            log.debug("Response for RPC {} with key {} is invalid - {}", name(), this.partitionKey, response);
+
+            if (response.authenticationException() != null) {
+                log.error("Authentication exception", response.authenticationException());
+                Errors error = Errors.forException(response.authenticationException());
+                errorConsumer.accept(error, new AuthenticationException(String.format("Server response for %s indicates authentication exception.", this.partitionKey)));
+                return Optional.of(error);

Review Comment:
   @apoorvmittal10 Thanks for the question.
   
   Specifically for SharePartition, no changes are needed as it does not 
explicitly handle error codes.
   
   For read RPC it checks:
   
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L457
   
   and for write:
   
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L2660
   
   so no special handling is needed. We do want to log these instances, though, 
as they are rare and we must be aware when they arise. That is 
already being done here - 
https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L2936
   
   Furthermore, these are general exceptions that are not tied to any specific 
request. Other sender impls like `AddPartitionsToTxnManager` handle these 
similarly.
   
   These 2 exceptions are not included in the JSON spec either, because of 
their generality.
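
   To illustrate the point about generic handling, here is a minimal, self-contained sketch. It does not use the Kafka classes: `Err`, `Response`, and `completeRequest` are hypothetical stand-ins for `Errors`, `ClientResponse`, and a caller such as SharePartition, and the mapping of failures to error codes is assumed for illustration only. The caller has a single failure branch that logs, so a new error code from the checker needs no caller changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;

// Hypothetical stand-ins only; these are NOT Kafka's Errors or ClientResponse.
class GenericErrorHandlingSketch {
    enum Err { NETWORK_EXCEPTION, SASL_AUTHENTICATION_FAILED, UNKNOWN_SERVER_ERROR }

    // Simplified response carrying only the flags relevant to this discussion.
    static final class Response {
        final boolean hasResponse;
        final Exception authenticationException; // null when authentication succeeded
        final boolean disconnected;

        Response(boolean hasResponse, Exception authenticationException, boolean disconnected) {
            this.hasResponse = hasResponse;
            this.authenticationException = authenticationException;
            this.disconnected = disconnected;
        }
    }

    // Mirrors the shape of the method in the diff: empty means "no transport-level error".
    static Optional<Err> checkResponseError(Response response, BiConsumer<Err, Exception> errorConsumer) {
        if (response.hasResponse) {
            return Optional.empty();
        }
        if (response.authenticationException != null) {
            errorConsumer.accept(Err.SASL_AUTHENTICATION_FAILED, response.authenticationException);
            return Optional.of(Err.SASL_AUTHENTICATION_FAILED);
        }
        if (response.disconnected) {
            errorConsumer.accept(Err.NETWORK_EXCEPTION, new RuntimeException("disconnected"));
            return Optional.of(Err.NETWORK_EXCEPTION);
        }
        errorConsumer.accept(Err.UNKNOWN_SERVER_ERROR, new RuntimeException("no response"));
        return Optional.of(Err.UNKNOWN_SERVER_ERROR);
    }

    // Caller with no per-error-code branches: any error is logged and treated as a failure.
    static boolean completeRequest(Response response, List<String> log) {
        Optional<Err> error = checkResponseError(
            response, (err, ex) -> log.add(err + ": " + ex.getMessage()));
        return !error.isPresent();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Response authFailure = new Response(false, new RuntimeException("bad credentials"), false);
        System.out.println(completeRequest(authFailure, log)); // false
        System.out.println(log); // [SASL_AUTHENTICATION_FAILED: bad credentials]
    }
}
```

   Because the caller only distinguishes success from failure, the authentication case is surfaced through the log entry alone.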


