smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1867985680
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,43 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
+    private void setupRecordPruning() {
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                for (int i = 0; i < numPartitions; i++) {
Review Comment:
No, the shard (state machine) does not maintain that mapping. The runtime
maintains it and calls the appropriate shard based on the context. This
information is not exposed outside the runtime. To expose it we would need
access to
https://github.com/apache/kafka/blob/trunk/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java#L1877
and even then the shard could not do this.
The runtime is encapsulated in the ShareCoordinatorService, and only the
service can issue calls to the runtime. The shard only serves to provide data
related to partitions.
With the loop approach, for a given internal topic-partition only the correct
shard honours the request. The others fail silently with NOT_COORDINATOR.
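To make the loop approach concrete, here is a minimal, self-contained sketch. All names in it (`PruneLoopSketch`, `SketchRuntime`, `schedulePrune`, `pruneAll`) are hypothetical stand-ins, not the real Kafka classes or the code in this PR. It only illustrates the idea that the caller tries every partition and drops the NOT_COORDINATOR failures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-ins for illustration only; not the real Kafka classes.
class PruneLoopSketch {

    /** Raised when this node does not host the shard for a partition. */
    static class NotCoordinatorException extends RuntimeException {
        NotCoordinatorException(int partition) {
            super("NOT_COORDINATOR for partition " + partition);
        }
    }

    /** Owns the partition -> shard mapping; callers cannot read it. */
    static class SketchRuntime {
        private final Map<Integer, String> shardsByPartition = new HashMap<>();

        SketchRuntime(int... ownedPartitions) {
            for (int p : ownedPartitions) {
                shardsByPartition.put(p, "shard-" + p);
            }
        }

        CompletableFuture<String> schedulePrune(int partition) {
            String shard = shardsByPartition.get(partition);
            if (shard == null) {
                return CompletableFuture.failedFuture(new NotCoordinatorException(partition));
            }
            return CompletableFuture.completedFuture(shard + " pruned");
        }
    }

    /** Service side: try every partition, ignore the ones not hosted here. */
    static List<String> pruneAll(SketchRuntime runtime, int numPartitions) {
        List<String> results = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            runtime.schedulePrune(i)
                .exceptionally(t -> {
                    if (t instanceof NotCoordinatorException) {
                        return null; // expected for partitions hosted elsewhere
                    }
                    throw new CompletionException(t); // anything else still surfaces
                })
                .thenAccept(r -> {
                    if (r != null) {
                        results.add(r);
                    }
                });
        }
        return results;
    }

    public static void main(String[] args) {
        // This node hosts partitions 1 and 3 out of 5.
        System.out.println(pruneAll(new SketchRuntime(1, 3), 5));
    }
}
```

The service never learns which partitions it hosts; it simply asks for all of them and lets the runtime reject the rest.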
The flow is:
```
                                                                ShareCoordinatorShard.callback
                                                                               |
                                                                               |
                       add task             task with correct shard
ShareCoordinatorService ----> Runtime -----> ==================== ----> EventProcessor
                                 |                  QUEUE
                                 |
                   obtain shard from TP in task
```
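The flow above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual CoordinatorRuntime implementation: `FlowSketch`, `SketchRuntime`, `Shard`, `schedule` and `drain` are hypothetical names. The sketch follows the diagram by resolving the shard from the partition when the task is added, and uses a plain in-memory queue in place of the real event processor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of the flow; not the real Kafka runtime.
class FlowSketch {

    /** State machine for one partition; it knows nothing about the mapping. */
    static class Shard {
        private final int partition;

        Shard(int partition) {
            this.partition = partition;
        }

        String prune() {
            return "pruned partition " + partition;
        }
    }

    static class SketchRuntime {
        private final Map<Integer, Shard> shards = new HashMap<>();
        private final Queue<Supplier<String>> queue = new ArrayDeque<>();

        SketchRuntime(int... ownedPartitions) {
            for (int p : ownedPartitions) {
                shards.put(p, new Shard(p));
            }
        }

        /** "add task": obtain the shard from the partition, then enqueue the bound task. */
        boolean schedule(int partition, Function<Shard, String> callback) {
            Shard shard = shards.get(partition);
            if (shard == null) {
                return false; // stands in for NOT_COORDINATOR
            }
            queue.add(() -> callback.apply(shard));
            return true;
        }

        /** Stand-in for the event processor: run queued tasks in order. */
        List<String> drain() {
            List<String> results = new ArrayList<>();
            Supplier<String> task;
            while ((task = queue.poll()) != null) {
                results.add(task.get()); // invokes the shard callback
            }
            return results;
        }
    }

    public static void main(String[] args) {
        SketchRuntime runtime = new SketchRuntime(2);
        runtime.schedule(0, Shard::prune); // rejected, partition 0 is not hosted here
        runtime.schedule(2, Shard::prune); // accepted and queued
        System.out.println(runtime.drain());
    }
}
```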