AndrewJSchofield commented on code in PR #18968:
URL: https://github.com/apache/kafka/pull/18968#discussion_r1965580105
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -317,16 +319,12 @@ public
CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> wr
CoordinatorRecord record = generateShareStateRecord(partitionData,
key);
// build successful response if record is correctly created
- WriteShareGroupStateResponseData responseData = new
WriteShareGroupStateResponseData()
- .setResults(
- Collections.singletonList(
-
WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
- Collections.singletonList(
-
WriteShareGroupStateResponse.toResponsePartitionResult(
- key.partition()
- ))
- ))
- );
+ WriteShareGroupStateResponseData responseData = new
WriteShareGroupStateResponseData().setResults(
Review Comment:
I think the previous style of
```
new WriteShareGroupStateResponse()
.setResults(
```
was more in line with the rest of the code you've written.
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -808,6 +811,106 @@ public
CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestC
});
}
+ @Override
+ public CompletableFuture<InitializeShareGroupStateResponseData>
initializeState(RequestContext context, InitializeShareGroupStateRequestData
request) {
+ // Send an empty response if the coordinator is not active.
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(
+ generateErrorInitStateResponse(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ "Share coordinator is not available."
+ )
+ );
+ }
+
+ String groupId = request.groupId();
+ // Send an empty response if groupId is invalid.
+ if (isGroupIdEmpty(groupId)) {
+ log.error("Group id must be specified and non-empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new InitializeShareGroupStateResponseData()
+ );
+ }
+
+ // Send an empty response if topic data is empty.
+ if (isEmpty(request.topics())) {
+ log.error("Topic Data is empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new InitializeShareGroupStateResponseData()
+ );
+ }
+
+ // A map to store the futures for each topicId and partition.
+ Map<Uuid, Map<Integer,
CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new
HashMap<>();
+
+ // The request received here could have multiple keys of structure
group:topic:partition. However,
+ // the initializeState method in ShareCoordinatorShard expects a
single key in the request. Hence, we will
+ // be looping over the keys below and constructing new
InitializeShareGroupStateRequestData objects to pass
+ // onto the shard method.
+
+ for (InitializeShareGroupStateRequestData.InitializeStateData
topicData : request.topics()) {
+ Uuid topicId = topicData.topicId();
+ for (InitializeShareGroupStateRequestData.PartitionData
partitionData : topicData.partitions()) {
+ SharePartitionKey coordinatorKey =
SharePartitionKey.getInstance(request.groupId(), topicId,
partitionData.partition());
+
+ InitializeShareGroupStateRequestData
requestForCurrentPartition = new InitializeShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId)
+ .setPartitions(List.of(partitionData))));
+
+ CompletableFuture<InitializeShareGroupStateResponseData>
initializeFuture = runtime.scheduleWriteOperation(
+ "initialize-share-group-state",
+ topicPartitionFor(coordinatorKey),
+ Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+ coordinator ->
coordinator.initializeState(requestForCurrentPartition)
+ ).exceptionally(deleteException ->
Review Comment:
nit: initializeException?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -808,6 +811,106 @@ public
CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestC
});
}
+ @Override
+ public CompletableFuture<InitializeShareGroupStateResponseData>
initializeState(RequestContext context, InitializeShareGroupStateRequestData
request) {
+ // Send an empty response if the coordinator is not active.
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(
+ generateErrorInitStateResponse(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ "Share coordinator is not available."
+ )
+ );
+ }
+
+ String groupId = request.groupId();
+ // Send an empty response if groupId is invalid.
+ if (isGroupIdEmpty(groupId)) {
+ log.error("Group id must be specified and non-empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new InitializeShareGroupStateResponseData()
+ );
+ }
+
+ // Send an empty response if topic data is empty.
+ if (isEmpty(request.topics())) {
+ log.error("Topic Data is empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new InitializeShareGroupStateResponseData()
+ );
+ }
+
+ // A map to store the futures for each topicId and partition.
+ Map<Uuid, Map<Integer,
CompletableFuture<InitializeShareGroupStateResponseData>>> futureMap = new
HashMap<>();
+
+ // The request received here could have multiple keys of structure
group:topic:partition. However,
+ // the initializeState method in ShareCoordinatorShard expects a
single key in the request. Hence, we will
+ // be looping over the keys below and constructing new
InitializeShareGroupStateRequestData objects to pass
+ // onto the shard method.
+
+ for (InitializeShareGroupStateRequestData.InitializeStateData
topicData : request.topics()) {
+ Uuid topicId = topicData.topicId();
+ for (InitializeShareGroupStateRequestData.PartitionData
partitionData : topicData.partitions()) {
+ SharePartitionKey coordinatorKey =
SharePartitionKey.getInstance(request.groupId(), topicId,
partitionData.partition());
+
+ InitializeShareGroupStateRequestData
requestForCurrentPartition = new InitializeShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(new
InitializeShareGroupStateRequestData.InitializeStateData()
+ .setTopicId(topicId)
+ .setPartitions(List.of(partitionData))));
+
+ CompletableFuture<InitializeShareGroupStateResponseData>
initializeFuture = runtime.scheduleWriteOperation(
+ "initialize-share-group-state",
+ topicPartitionFor(coordinatorKey),
+ Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+ coordinator ->
coordinator.initializeState(requestForCurrentPartition)
+ ).exceptionally(deleteException ->
+ handleOperationException(
+ "initialize-share-group-state",
+ request,
+ deleteException,
+ (error, message) ->
InitializeShareGroupStateResponse.toErrorResponseData(
+ topicData.topicId(),
+ partitionData.partition(),
+ error,
+ "Unable to initialize share group state: " +
deleteException.getMessage()
+ ),
+ log
+ ));
+
+ futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+ .put(partitionData.partition(), initializeFuture);
+ }
+ }
+
+ // Combine all futures into a single CompletableFuture<Void>.
+ CompletableFuture<Void> combinedFuture =
CompletableFuture.allOf(futureMap.values().stream()
+ .flatMap(map ->
map.values().stream()).toArray(CompletableFuture[]::new));
+
+ // Transform the combined CompletableFuture<Void> into
CompletableFuture<DeleteShareGroupStateResponseData>.
+ return combinedFuture.thenApply(v -> {
+ List<InitializeShareGroupStateResponseData.InitializeStateResult>
initializeStateResult = new ArrayList<>(futureMap.size());
+ futureMap.forEach(
+ (topicId, topicEntry) -> {
+
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults =
new ArrayList<>(topicEntry.size());
+ topicEntry.forEach(
+ (partitionId, responseFuture) -> {
+ // ResponseFut would already be completed by now
since we have used
Review Comment:
The variable name was different in the code you copied here :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]