apoorvmittal10 commented on code in PR #17980:
URL: https://github.com/apache/kafka/pull/17980#discussion_r1863751566


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -294,17 +294,27 @@ public CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Part
 
                 futures.put(topicIdPartition, future);
             } else {
-                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
             }
         });
 
         CompletableFuture<Void> allFutures = CompletableFuture.allOf(
             futures.values().toArray(new CompletableFuture[0]));
         return allFutures.thenApply(v -> {
             Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
result = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> 
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
-                .setPartitionIndex(topicIdPartition.partition())
-                .setErrorCode(future.join().code())));
+            futures.forEach((topicIdPartition, future) -> {
+                ShareAcknowledgeResponseData.PartitionData partitionData = new 
ShareAcknowledgeResponseData.PartitionData()
+                        .setPartitionIndex(topicIdPartition.partition());

Review Comment:
   nit: indentation, should be 4 spaces prior.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -294,17 +294,27 @@ public CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Part
 
                 futures.put(topicIdPartition, future);
             } else {
-                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
             }
         });
 
         CompletableFuture<Void> allFutures = CompletableFuture.allOf(
             futures.values().toArray(new CompletableFuture[0]));
         return allFutures.thenApply(v -> {
             Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
result = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> 
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
-                .setPartitionIndex(topicIdPartition.partition())
-                .setErrorCode(future.join().code())));
+            futures.forEach((topicIdPartition, future) -> {
+                ShareAcknowledgeResponseData.PartitionData partitionData = new 
ShareAcknowledgeResponseData.PartitionData()
+                        .setPartitionIndex(topicIdPartition.partition());
+                Throwable t = future.join();
+                if (t == null) {
+                    partitionData.setErrorCode(Errors.NONE.code())
+                            .setErrorMessage(Errors.NONE.message());

Review Comment:
   Why to set a error message in case of null?



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -294,17 +294,27 @@ public CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Part
 
                 futures.put(topicIdPartition, future);
             } else {
-                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
             }
         });
 
         CompletableFuture<Void> allFutures = CompletableFuture.allOf(
             futures.values().toArray(new CompletableFuture[0]));
         return allFutures.thenApply(v -> {
             Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
result = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> 
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
-                .setPartitionIndex(topicIdPartition.partition())
-                .setErrorCode(future.join().code())));
+            futures.forEach((topicIdPartition, future) -> {
+                ShareAcknowledgeResponseData.PartitionData partitionData = new 
ShareAcknowledgeResponseData.PartitionData()
+                        .setPartitionIndex(topicIdPartition.partition());
+                Throwable t = future.join();
+                if (t == null) {
+                    partitionData.setErrorCode(Errors.NONE.code())

Review Comment:
   Do we need to set the error code explicitly in case of `no error/t== null`, 
isn't by default the `short` value of error code should be `0`?



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -294,17 +294,27 @@ public CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.Part
 
                 futures.put(topicIdPartition, future);
             } else {
-                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION));
+                futures.put(topicIdPartition, 
CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
             }
         });
 
         CompletableFuture<Void> allFutures = CompletableFuture.allOf(
             futures.values().toArray(new CompletableFuture[0]));
         return allFutures.thenApply(v -> {
             Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
result = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> 
result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData()
-                .setPartitionIndex(topicIdPartition.partition())
-                .setErrorCode(future.join().code())));
+            futures.forEach((topicIdPartition, future) -> {
+                ShareAcknowledgeResponseData.PartitionData partitionData = new 
ShareAcknowledgeResponseData.PartitionData()
+                        .setPartitionIndex(topicIdPartition.partition());
+                Throwable t = future.join();
+                if (t == null) {
+                    partitionData.setErrorCode(Errors.NONE.code())
+                            .setErrorMessage(Errors.NONE.message());
+                } else {
+                    partitionData.setErrorCode(Errors.forException(t).code())
+                            .setErrorMessage(t.getMessage());

Review Comment:
   nit: indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to