smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1875674980
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,87 @@ public void startup(
log.info("Starting up.");
numPartitions = shareGroupTopicPartitionCount.getAsInt();
+ setupRecordPruning();
log.info("Startup complete.");
}
+ // visibility for tests
+ void setupRecordPruning() {
+ log.info("Scheduling share state topic prune job.");
+ timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+ @Override
+ public void run() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+ .whenComplete((res, exp) -> {
+ if (exp != null) {
+ log.error("Received error in share state topic prune, stopping job.", exp);
+ return;
+ }
Review Comment:
@dajac
I think I misunderstood this comment:
```
I also wonder whether we should re-reschedule in case of failure too.
thenRun would only reschedule if the future succeeds. It would make it more
robust to unexpected failures.
```
`performRecordPruning`, which returns the futures, never completes them
exceptionally. I thought you meant that we should not reschedule on
failure.
Will rectify.
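For reference, a minimal sketch of the difference between the two callbacks, assuming already-completed futures so that everything runs synchronously. This is not the `ShareCoordinatorService` code: `RescheduleSketch`, `withThenRun`, `withWhenComplete` and `oneOkOneFailed` are hypothetical names, and the counter stands in for re-adding the prune `TimerTask`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class RescheduleSketch {

    // Hypothetical input: one future that succeeded and one that failed.
    static List<CompletableFuture<Void>> oneOkOneFailed() {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture(null));
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("prune failed"));
        futures.add(failed);
        return futures;
    }

    // thenRun: the callback is skipped when the combined future fails,
    // so a "reschedule" placed here would stop the job after one error.
    static int withThenRun(List<CompletableFuture<Void>> futures) {
        AtomicInteger reschedules = new AtomicInteger();
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> reschedules.incrementAndGet());
        return reschedules.get();
    }

    // whenComplete: the callback runs on success and on failure,
    // so the error can be logged and the job rescheduled either way.
    static int withWhenComplete(List<CompletableFuture<Void>> futures) {
        AtomicInteger reschedules = new AtomicInteger();
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .whenComplete((res, exp) -> {
                if (exp != null) {
                    System.err.println("Prune round failed, rescheduling anyway: " + exp);
                }
                reschedules.incrementAndGet();
            });
        return reschedules.get();
    }

    public static void main(String[] args) {
        System.out.println("thenRun reschedules: " + withThenRun(oneOkOneFailed()));
        System.out.println("whenComplete reschedules: " + withWhenComplete(oneOkOneFailed()));
    }
}
```

With one failed input, the `thenRun` variant never reaches its callback, while the `whenComplete` variant does, which is why the latter is the more robust place to reschedule.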
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.