lhotari commented on code in PR #25884:
URL: https://github.com/apache/pulsar/pull/25884#discussion_r3323883125


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java:
##########
@@ -70,18 +78,79 @@
  * {@code (segment, subscription)} pair. The fan-out is metadata-store writes (not RPCs) and
  * is bounded by the txn's participant count.
  *
- * <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep — those land in
- * P5.2.
+ * <p>Background sweeps: a single elected broker — the owner of partition 0 of
+ * {@code transaction_coordinator_assign} — periodically (a) aborts timed-out open transactions
+ * ({@link #sweepTimeouts}) and (b) garbage-collects finalized transactions whose retention has
+ * elapsed ({@link #sweepGc}). Concurrent sweeps from a stale owner are still safe — every state
+ * transition is a header CAS — so the single-sweeper election is an efficiency measure, not a
+ * correctness one.
  */
 @CustomLog
 public class TransactionCoordinatorV5 {
 
     private final PulsarService pulsar;
     private final TxnMetadataStore txnStore;
 
+    private final long timeoutSweepIntervalMs;
+    private final long gcSweepIntervalMs;
+    private final long gcRetentionMs;
+    private volatile ScheduledExecutorService sweepExecutor;
+    private volatile boolean closed;
+
     public TransactionCoordinatorV5(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
+        var config = pulsar.getConfiguration();
+        this.timeoutSweepIntervalMs = TimeUnit.SECONDS.toMillis(
+                config.getTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds());
+        this.gcSweepIntervalMs = TimeUnit.SECONDS.toMillis(
+                config.getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+        this.gcRetentionMs = TimeUnit.SECONDS.toMillis(
+                config.getTransactionCoordinatorScalableTopicsGcRetentionSeconds());
+    }
+
+    // ---- Lifecycle --------------------------------------------------------
+
+    /**
+     * Start the periodic timeout / GC sweeps on a dedicated single-thread scheduler. Each tick is
+     * gated by {@link #ifElectedSweeper} so only the partition-0 owner does the scan. Idempotent —
+     * a second call is ignored.
+     */
+    public synchronized void start() {
+        if (closed || sweepExecutor != null) {
+            return;
+        }
+        sweepExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-txn-v5-sweep"));
+        sweepExecutor.scheduleWithFixedDelay(() -> runSweep("timeout", this::sweepTimeouts),
+                timeoutSweepIntervalMs, timeoutSweepIntervalMs, TimeUnit.MILLISECONDS);
+        sweepExecutor.scheduleWithFixedDelay(() -> runSweep("gc", this::sweepGc),
+                gcSweepIntervalMs, gcSweepIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    /** Stop the sweeps. Idempotent. */
+    public synchronized void close() {
+        closed = true;
+        if (sweepExecutor != null) {
+            sweepExecutor.shutdownNow();
+            sweepExecutor = null;
+        }
+    }
+
+    /**
+     * Run one sweep cycle on the scheduler thread and block until it completes, so the
+     * fixed-delay scheduling never overlaps two cycles. Errors are logged and swallowed — the next
+     * tick retries.
+     */
+    private void runSweep(String name, Supplier<CompletableFuture<Void>> sweep) {
+        if (closed) {
+            return;
+        }
+        try {
+            sweep.get().get();
+        } catch (Throwable t) {
+            log.warn().attr("sweep", name).exception(t).log("v5 TC sweep cycle failed; will retry");
+        }

Review Comment:
   runSweep logs at WARN on shutdown interruption: close() sets closed=true and then calls shutdownNow(), which interrupts an in-flight sweep.get().get(). The resulting InterruptedException is caught as Throwable and logged as "v5 TC sweep cycle failed; will retry" at WARN, producing spurious noise whenever shutdown lands mid-sweep. Suggest checking closed (or Thread.interrupted()) in the catch and downgrading or suppressing the log in that case.
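A minimal sketch of the suggested fix follows. It assumes the sweep keeps its current shape: a blocking get() on the future returned by a Supplier<CompletableFuture<Void>>. SweepLoop is a hypothetical stand-in for TransactionCoordinatorV5, and its warnings/debugs lists stand in for the broker's logger so the chosen log level is observable without Pulsar. One detail is worth noting. By convention, a method that exits by throwing InterruptedException clears the thread's interrupt status, so a Thread.interrupted() check inside the catch will usually see false. Checking the volatile closed flag, which close() writes before shutdownNow() interrupts the worker, looks like the more reliable signal. Unlike the PR's close(), the sketch's close() also waits briefly for the worker to exit. That wait exists only so the demo's lists are settled when close() returns.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Hypothetical stand-in for the sweep loop in TransactionCoordinatorV5 (not the PR's code).
 * The two lists replace the broker's logger so the level chosen for each outcome is visible.
 */
public class SweepLoop {
    final List<String> warnings = new CopyOnWriteArrayList<>();
    final List<String> debugs = new CopyOnWriteArrayList<>();
    private volatile ScheduledExecutorService executor;
    private volatile boolean closed;

    synchronized void start(String name, Supplier<CompletableFuture<Void>> sweep, long intervalMs) {
        if (closed || executor != null) {
            return; // idempotent, like the PR's start()
        }
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "sweep-demo");
            t.setDaemon(true); // never keep the JVM alive because of the demo
            return t;
        });
        executor.scheduleWithFixedDelay(() -> runSweep(name, sweep),
                intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void close() {
        closed = true; // written before shutdownNow(), so the interrupted worker observes it
        ScheduledExecutorService ex = executor;
        if (ex != null) {
            ex.shutdownNow(); // interrupts a cycle blocked in get()
            executor = null;
            try {
                // Not in the PR's close(): wait so the demo's lists are settled on return.
                ex.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    void runSweep(String name, Supplier<CompletableFuture<Void>> sweep) {
        if (closed) {
            return;
        }
        try {
            sweep.get().get();
        } catch (InterruptedException e) {
            // By convention get() clears the interrupt status when it throws; restore it.
            Thread.currentThread().interrupt();
            if (closed) {
                debugs.add(name + " sweep interrupted by close()");
            } else {
                warnings.add(name + " sweep interrupted unexpectedly; will retry");
            }
        } catch (Throwable t) {
            // A failure that surfaces during shutdown is expected, not a retryable error.
            if (closed) {
                debugs.add(name + " sweep failed during shutdown: " + t);
            } else {
                warnings.add("v5 TC sweep cycle failed; will retry: " + name + ": " + t);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SweepLoop loop = new SweepLoop();
        CountDownLatch entered = new CountDownLatch(1);
        // A sweep whose future never completes, so close() always lands mid-cycle.
        loop.start("timeout", () -> {
            entered.countDown();
            return new CompletableFuture<Void>();
        }, 10);
        entered.await(5, TimeUnit.SECONDS);
        loop.close();
        System.out.println("warn=" + loop.warnings + " debug=" + loop.debugs);
    }
}
```

In the demo, close() lands while the first cycle is blocked on a future that never completes. With the closed check in place, the interruption is recorded on the debug path instead of as a failed sweep. A genuine failure outside shutdown still takes the warn path and is retried on the next tick. Restoring the interrupt status is a defensive choice that lets the executor still see that the worker was asked to stop.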



-- 
This is an automated message from the Apache Git Service.