smjn commented on code in PR #18929:
URL: https://github.com/apache/kafka/pull/18929#discussion_r2077338659
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -662,6 +665,34 @@ public void run() {
));
}
+ CompletableFuture<AlterShareGroupOffsetsResponseData> persisterInitialize(
+ InitializeShareGroupStateParameters request,
+ AlterShareGroupOffsetsResponseData response
+ ) {
+ return persister.initializeState(request)
+ .handle((result, exp) -> {
+ if (exp == null) {
+ return response;
+ }
+ // build new AlterShareGroupOffsetsResponseData for error
response
+ AlterShareGroupOffsetsResponseData data = new
AlterShareGroupOffsetsResponseData();
+ GroupTopicPartitionData<PartitionStateData> gtp =
request.groupTopicPartitionData();
+ log.error("Unable to initialize share group state for {}, {}
while altering share group offsets", gtp.groupId(), gtp.topicsData(), exp);
+ Errors error = Errors.forException(exp);
+ gtp.topicsData().forEach(topicData -> {
+ data.responses().add(new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
+ .setTopicName(topicData.topicId().toString())
+ .setPartitions(new
ArrayList<>(topicData.partitions().size())));
+ topicData.partitions().forEach(partition ->
data.responses().get(data.responses().size() - 1)
+ .partitions().add(new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition.partition())
+ .setErrorCode(error.code())));
Review Comment:
error message also
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]