junrao commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1881135478


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning() {
+        log.info("Scheduling share-group state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share-group state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning();
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+                Long lastPrunedOffset = lastPrunedOffsets.get(tp);
+                if (lastPrunedOffset != null && lastPrunedOffset.longValue() == off) {
+                    log.debug("{} already pruned till offset {}", tp, off);
+                    fut.complete(null);
+                    return;
+                }
+
+                log.info("Pruning records in {} till offset {}.", tp, off);
+                writer.deleteRecords(tp, off)
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.debug("Exception while deleting records in {} till offset {}.", tp, off, exp);
+                            fut.completeExceptionally(exp);
+                            return;
+                        }
+                        fut.complete(null);
+                        // Best effort prevention of issuing duplicate delete calls.
+                        lastPrunedOffsets.put(tp, off);

Review Comment:
   This approach is OK, but it breaks the CoordinatorRuntime pattern that all 
internal state is updated only by CoordinatorRuntime threads: 
`lastPrunedOffsets` is now updated by the request I/O threads. We could 
instead follow how CoordinatorRuntime waits for a record to be replicated. 
It appends the record to the local log and registers a `PartitionListener` 
for `onHighWatermarkUpdated()`. Once a new HWM is received, 
`CoordinatorRuntime.onHighWatermarkUpdated` enqueues a 
`CoordinatorInternalEvent`, whose processing triggers the update of the 
internal state. We could follow a similar approach for `LowWaterMark`. This 
can be done in a follow-up Jira.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

