smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1879319264
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +251,96 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
+        setupRecordPruning(offsets);
         log.info("Startup complete.");
     }
+
+    private void setupRecordPruning(Map<TopicPartition, Long> offsets) {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp, offsets)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning(offsets);
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp, Map<TopicPartition, Long> offsets) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+        runtime.scheduleWriteOperation(
Review Comment:
No, the write operation used here is for the write consistency offered by the method.
`ShareCoordinatorShard.replay` calls `offsetsManager.updateState` with various last written offset values. The `replay` method itself is invoked when other write RPCs produce records. However, that does not mean the offset set in `replay` has been committed.
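For intuition, here is a minimal sketch of what that replay-time bookkeeping amounts to (hypothetical `OffsetsManagerSketch`, not the actual offsets manager class): a purely in-memory update, which by itself says nothing about replication.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch, not the real offsets manager. replay() fires when a
// record has been appended locally, so this map tracks last WRITTEN offsets;
// nothing here proves those offsets have been replicated.
final class OffsetsManagerSketch {
    private final Map<TopicPartition, Long> lastWritten = new ConcurrentHashMap<>();

    // Called from replay(): in-memory only, commit status unknown.
    void updateState(TopicPartition tp, long offset) {
        lastWritten.put(tp, offset);
    }

    Long lastWrittenOffset(TopicPartition tp) {
        return lastWritten.get(tp);
    }
}
```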
Now, the coordinator enqueues write operations and guarantees that when `scheduleWriteOperation` completes, the records it generated have been replicated, along with any records written before it.
The framework, however, gives no consistency guarantees between write and read operations. Consider a write op that writes an offset into the offsets manager. We only know that this offset has been written, not that it has been replicated. A subsequent read could return the same offset, but there would still be no guarantee that it has been replicated. Only when the next write operation completes do we have a guarantee that the previous offset has been committed.
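As a toy model of that timeline (plain Java with assumed names, not the coordinator framework API): completing a later write op is what certifies the earlier write as committed, which is exactly why the prune job issues a write rather than a read.

```java
import java.util.Map;
import java.util.concurrent.*;

// Toy model of the guarantee described above, not Kafka code. Write ops go
// through a single queue; when op N's future completes, everything enqueued
// before it (including the offset recorded by replay) counts as replicated.
public final class WriteBarrierDemo {
    private final ExecutorService writeQueue = Executors.newSingleThreadExecutor();
    private final Map<String, Long> lastWritten = new ConcurrentHashMap<>();   // replay-style, maybe uncommitted
    private final Map<String, Long> lastCommitted = new ConcurrentHashMap<>(); // safe to prune up to

    // Analogue of replay(): record the offset in memory immediately.
    void replay(String tp, long offset) {
        lastWritten.put(tp, offset);
    }

    // Analogue of scheduleWriteOperation(): the returned future completes only
    // after the writer thread has drained everything queued ahead of this op,
    // so the offset it returns is known to be committed.
    CompletableFuture<Long> scheduleWriteOperation(String tp) {
        return CompletableFuture.supplyAsync(() -> {
            long offset = lastWritten.getOrDefault(tp, -1L);
            lastCommitted.put(tp, offset);
            return offset;
        }, writeQueue);
    }

    public static void main(String[] args) throws Exception {
        WriteBarrierDemo demo = new WriteBarrierDemo();
        demo.replay("share-state-0", 42L);   // written, commit status unknown
        // A plain read here could also return 42, but would prove nothing.
        long safe = demo.scheduleWriteOperation("share-state-0").get();
        System.out.println("Safe to prune up to committed offset " + safe);
        demo.writeQueue.shutdown();
    }
}
```

The toy only captures the ordering argument; the actual write op and pruning logic live in the part of the diff truncated above.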
This was discussed extensively with the coordinator framework owner @dajac, and we arrived at this solution.