smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1879319264
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +251,96 @@ public void startup(
log.info("Starting up.");
numPartitions = shareGroupTopicPartitionCount.getAsInt();
+ Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
+ setupRecordPruning(offsets);
log.info("Startup complete.");
}
+ private void setupRecordPruning(Map<TopicPartition, Long> offsets) {
+ log.info("Scheduling share state topic prune job.");
+ timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+ @Override
+ public void run() {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp, offsets)));
+
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+ .whenComplete((res, exp) -> {
+ if (exp != null) {
+ log.error("Received error in share state topic prune.", exp);
+ }
+ // Perpetual recursion, failure or not.
+ setupRecordPruning(offsets);
+ });
+ }
+ });
+ }
+
+ private CompletableFuture<Void> performRecordPruning(TopicPartition tp, Map<TopicPartition, Long> offsets) {
+ // This future will always be completed normally, exception or not.
+ CompletableFuture<Void> fut = new CompletableFuture<>();
+ runtime.scheduleWriteOperation(
Review Comment:
No, the write operation is used here for the write consistency that the
method offers.
`ShareCoordinatorShard.replay` calls `offsetsManager.updateState` with
various last-written offset values. The `replay` method itself is called when
other write RPCs produce records. However, that does not mean the offset set in
`replay` has been committed.
The coordinator enqueues write operations in a queue and guarantees that
when `scheduleWriteOperation` completes, the records it generated have been
replicated, along with any records written before it.
This was discussed extensively with the coordinator framework owner, @dajac,
and we arrived at this solution.
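
To illustrate the ordering argument, here is a minimal toy sketch. It is not the coordinator runtime: `OrderedCommitLog`, `write`, and `commitUpTo` are hypothetical names, and replication is simulated by a manual call. It only shows why the completion of a later write implies that every earlier record has been committed, assuming commits advance strictly in offset order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Toy model of an ordered write queue. Hypothetical; not Kafka's CoordinatorRuntime. */
class OrderedCommitLog {
    /** A write that has been appended but not yet committed. */
    private static final class Pending {
        final long offset;
        final CompletableFuture<Long> future;

        Pending(long offset, CompletableFuture<Long> future) {
            this.offset = offset;
            this.future = future;
        }
    }

    private final Deque<Pending> pending = new ArrayDeque<>();
    private long nextOffset = 0L;
    private long committedOffset = -1L;

    /** Appends one record; the future completes with its offset once it is committed. */
    synchronized CompletableFuture<Long> write() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pending.addLast(new Pending(nextOffset++, future));
        return future;
    }

    /** Simulates replication: commits all appended records up to and including upTo, in order. */
    synchronized void commitUpTo(long upTo) {
        committedOffset = Math.max(committedOffset, Math.min(upTo, nextOffset - 1));
        while (!pending.isEmpty() && pending.peekFirst().offset <= committedOffset) {
            Pending p = pending.pollFirst();
            p.future.complete(p.offset);
        }
    }

    synchronized long committedOffset() {
        return committedOffset;
    }
}
```

In this model, if two ordinary writes are followed by a third write acting as a barrier, the barrier's future cannot complete until the committed offset has passed both earlier records. That is the property the prune job relies on when it goes through a write operation before acting on an offset.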