lhotari commented on code in PR #25884:
URL: https://github.com/apache/pulsar/pull/25884#discussion_r3323883125
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java:
##########
@@ -70,18 +78,79 @@
  * {@code (segment, subscription)} pair. The fan-out is metadata-store writes (not RPCs) and
  * is bounded by the txn's participant count.
  *
- * <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep — those land in
- * P5.2.
+ * <p>Background sweeps: a single elected broker — the owner of partition 0 of
+ * {@code transaction_coordinator_assign} — periodically (a) aborts timed-out open transactions
+ * ({@link #sweepTimeouts}) and (b) garbage-collects finalized transactions whose retention has
+ * elapsed ({@link #sweepGc}). Concurrent sweeps from a stale owner are still safe — every state
+ * transition is a header CAS — so the single-sweeper election is an efficiency measure, not a
+ * correctness one.
  */
 @CustomLog
 public class TransactionCoordinatorV5 {
     private final PulsarService pulsar;
     private final TxnMetadataStore txnStore;
+    private final long timeoutSweepIntervalMs;
+    private final long gcSweepIntervalMs;
+    private final long gcRetentionMs;
+    private volatile ScheduledExecutorService sweepExecutor;
+    private volatile boolean closed;
+
     public TransactionCoordinatorV5(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
+        var config = pulsar.getConfiguration();
+        this.timeoutSweepIntervalMs = TimeUnit.SECONDS.toMillis(
+                config.getTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds());
+        this.gcSweepIntervalMs = TimeUnit.SECONDS.toMillis(
+                config.getTransactionCoordinatorScalableTopicsGcIntervalSeconds());
+        this.gcRetentionMs = TimeUnit.SECONDS.toMillis(
+                config.getTransactionCoordinatorScalableTopicsGcRetentionSeconds());
+    }
+
+    // ---- Lifecycle --------------------------------------------------------
+
+    /**
+     * Start the periodic timeout / GC sweeps on a dedicated single-thread scheduler. Each tick is
+     * gated by {@link #ifElectedSweeper} so only the partition-0 owner does the scan. Idempotent —
+     * a second call is ignored.
+     */
+    public synchronized void start() {
+        if (closed || sweepExecutor != null) {
+            return;
+        }
+        sweepExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-txn-v5-sweep"));
+        sweepExecutor.scheduleWithFixedDelay(() -> runSweep("timeout", this::sweepTimeouts),
+                timeoutSweepIntervalMs, timeoutSweepIntervalMs, TimeUnit.MILLISECONDS);
+        sweepExecutor.scheduleWithFixedDelay(() -> runSweep("gc", this::sweepGc),
+                gcSweepIntervalMs, gcSweepIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    /** Stop the sweeps. Idempotent. */
+    public synchronized void close() {
+        closed = true;
+        if (sweepExecutor != null) {
+            sweepExecutor.shutdownNow();
+            sweepExecutor = null;
+        }
+    }
+
+    /**
+     * Run one sweep cycle on the scheduler thread and block until it completes, so the
+     * fixed-delay scheduling never overlaps two cycles. Errors are logged and swallowed — the next
+     * tick retries.
+     */
+    private void runSweep(String name, Supplier<CompletableFuture<Void>> sweep) {
+        if (closed) {
+            return;
+        }
+        try {
+            sweep.get().get();
+        } catch (Throwable t) {
+            log.warn().attr("sweep", name).exception(t).log("v5 TC sweep cycle failed; will retry");
+        }
Review Comment:
runSweep logs at WARN on shutdown interruption. close() sets closed=true and
then calls shutdownNow(), which interrupts an in-flight sweep.get().get().
The resulting InterruptedException is caught as a Throwable and logged at WARN
as "v5 TC sweep cycle failed; will retry", producing spurious noise on every
shutdown that lands mid-sweep. Suggest checking closed (or Thread.interrupted())
in the catch block and downgrading or suppressing the log in that case.
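A minimal sketch of the suggested handling, under stated assumptions: SweepRunnerSketch is a hypothetical, stripped-down stand-in for the coordinator's sweep loop (not code from this PR), and it records log lines in a list instead of using Pulsar's structured logger. It catches InterruptedException separately, logs quietly at DEBUG when closed is set, and keeps the WARN for genuine sweep failures. One caveat on the Thread.interrupted() alternative: blocking methods that throw InterruptedException conventionally clear the interrupt status first, so inside the catch block it will typically already read false. Checking closed (or the exception type) is the more reliable signal, and the sketch re-asserts the interrupt flag before returning.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for the sweep loop in TransactionCoordinatorV5. Log lines are
// collected in a list instead of going through Pulsar's structured logger.
final class SweepRunnerSketch {
    private final List<String> logs = Collections.synchronizedList(new ArrayList<>());
    private volatile ScheduledExecutorService sweepExecutor;
    private volatile boolean closed;

    List<String> logSnapshot() {
        return List.copyOf(logs); // copy taken under the synchronized list's lock
    }

    synchronized void start(String name, Supplier<CompletableFuture<Void>> sweep, long intervalMs) {
        if (closed || sweepExecutor != null) {
            return; // idempotent, like the original start()
        }
        sweepExecutor = Executors.newSingleThreadScheduledExecutor();
        // Initial delay 0 (unlike the original) so the first tick runs immediately.
        sweepExecutor.scheduleWithFixedDelay(() -> runSweep(name, sweep),
                0, intervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void close() {
        closed = true; // set before shutdownNow(), so an interrupted sweep observes closed == true
        if (sweepExecutor != null) {
            sweepExecutor.shutdownNow(); // interrupts an in-flight sweep.get().get()
            sweepExecutor = null;
        }
    }

    private void runSweep(String name, Supplier<CompletableFuture<Void>> sweep) {
        if (closed) {
            return;
        }
        try {
            sweep.get().get();
        } catch (InterruptedException e) {
            // get() has typically cleared the interrupt status already; restore it for the executor.
            Thread.currentThread().interrupt();
            if (closed) {
                logs.add("DEBUG " + name + ": sweep interrupted by close()");
            } else {
                logs.add("WARN " + name + ": sweep interrupted unexpectedly; will retry");
            }
        } catch (Throwable t) {
            // Real sweep failures (e.g. an ExecutionException wrapping the cause) still warn.
            logs.add("WARN " + name + ": v5 TC sweep cycle failed; will retry: " + t);
        }
    }
}
```

Because close() writes the volatile closed flag before calling shutdownNow(), a sweep thread that observes the interrupt also observes closed == true, so the quiet path is taken on every shutdown that lands mid-sweep, while a sweep future that completes exceptionally still produces the WARN.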
--
This is an automated message from the Apache Git Service.