darinspivey opened a new issue, #25279: URL: https://github.com/apache/pulsar/issues/25279
### Search before reporting - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Read release policy - [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker. ### User environment Version: 4.0.8 via Helm chart `pulsar-4.4.0` (but the bug is also present in current master) Related: #25119 (different bug, same area — phase 2 __compaction cursor) ### Issue Description ## Summary This bug was found because we update topic policies frequently in our pulsar cluster, and we noticed an accumulation of thousands of backlogged events in `__change_events`. Because compaction is triggered automatically when change events are added, we see this bug: Compaction phase 2 fails every time with a `ConnectException` because the broker disconnects the compaction consumer as part of processing its own seek request, causing `channelInactive` to fire on the client and kill the in-flight seek future before the broker sends the success response. Since `AbstractTwoPhaseCompactor.phaseTwoSeekThenLoop` has no retry logic for transient seek failures, every compaction attempt aborts. The `__compaction` subscription backlog grows without bound. ## Affected Topics Any topic with compaction enabled. Most visibly affects `__change_events` system topics because `SystemTopic.isCompactionEnabled()` hardcodes `true` and the effective `compactionThreshold` is `0` bytes, so compaction is triggered on any non-zero backlog. Frequent topic-level policy writes (each write appends a message to `__change_events`) cause compaction to be triggered and fail in a continuous loop. ## Expected Behavior Compaction phase 2 seeks the `__compaction` reader back to the start of the compacted range and reads forward, producing a new compacted ledger. ## Actual Behavior Compaction fails on every attempt and the `__compaction` subscription backlog grows indefinitely. ### Error messages ```text [BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.compaction.AbstractTwoPhaseCompactor - Commencing phase two of compaction for persistent://my-tenant/my-namespace/__change_events-partition-2, from 1218818:0:2:-1 to 1330087:5:2:-1, compacting 12 keys to ledger 1341961 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.client.impl.ConsumerImpl - [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Seeking subscription to the message 1218818:0:2:-1 [pulsar-io-3-4] INFO o.a.p.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=PulsarCompactorSubscription{topic=persistent://my-tenant/my-namespace/__change_events-partition-2, name=__compaction}, consumerId=27241, consumerName=6QQBD, address=[id: 0x6da7169d, L:/10.10.x.x:6650 - R:/10.10.x.x:53634] [SR:10.10.x.x, state:Connected]} [pulsar-io-3-4] INFO o.a.p.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PulsarCompactorSubscription{topic=persistent://my-tenant/my-namespace/__change_events-partition-2, name=__compaction}, ...} [pulsar-io-3-4] INFO o.a.p.broker.service.persistent.PersistentSubscription - [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset [BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.b.mledger.impl.ManagedCursorImpl - [my-tenant/my-namespace/persistent/__change_events-partition-2] Initiate reset readPosition from 1330087:6 to 1218818:0 (ackSet is null) on cursor __compaction [pulsar-io-3-6] INFO o.a.p.client.impl.ConnectionHandler - [persistent://my-tenant/my-namespace/__change_events-partition-2] [__compaction] Closed connection [id: 0xb3fcdb76, L:/10.10.x.x:53634 ! R:/10.10.x.x:6650] -- Will try again in 0.1 s, hostUrl: null [pulsar-io-3-6] ERROR o.a.p.client.impl.ConsumerImpl - [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Failed to reset subscription: Disconnected from server at /10.10.x.x:6650 [broker-client-shared-internal-executor-5-1] WARN o.a.p.broker.service.persistent.PersistentTopic - [persistent://my-tenant/my-namespace/__change_events-partition-2] Compaction failure. java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Failed to seek the subscription __compaction of the topic persistent://my-tenant/my-namespace/__change_events-partition-2 to the message 1218818:0:2:-1 at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?] at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?] at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?] at org.apache.pulsar.client.impl.ConsumerImpl.failSeek(ConsumerImpl.java:2631) ~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8] at org.apache.pulsar.client.impl.ConsumerImpl.lambda$seekAsyncInternal$60(ConsumerImpl.java:2603) ~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8] at org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:342) ~[org.apache.pulsar-pulsar-client-original-4.0.8.jar:4.0.8] Caused by: org.apache.pulsar.client.api.PulsarClientException$ConnectException: Failed to seek the subscription __compaction of the topic persistent://my-tenant/my-namespace/__change_events-partition-2 to the message 1218818:0:2:-1 Disconnected from server at /10.10.x.x:6650 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.b.mledger.impl.ManagedCursorImpl - [my-tenant/my-namespace/persistent/__change_events-partition-2] reset readPosition to 1218818:0 (ackSet is null) before current read readPosition 1330087:6 on cursor __compaction [BookKeeperClientWorker-OrderedExecutor-0-0] INFO o.a.p.broker.service.ServerCnx - [/10.10.x.x:53634] [persistent://my-tenant/my-namespace/__change_events-partition-2][__compaction] Reset subscription to message id 1218818:0 (ackSet is null) ``` ### Reproducing the issue 1. Create a namespace with frequent topic-level policy writes to cause `__change_events` backlog to grow. For example, calling`setMaxUnackedMessagesPerConsumer` per topic on every consumer restart. This is a real-world use case that triggered these findings. 2. Observe compaction triggering automatically (threshold = 0 bytes for system topics) or trigger manually: `pulsar-admin topics compact persistent://my-tenant/my-namespace/__change_events` 3. Observe compaction failure in broker logs ### Additional information ## Root Cause The sequence in `PersistentSubscription.resetCursorInternal` (`PersistentSubscription.java:916`) is: 1. disconnect active consumers via `dispatcher.disconnectActiveConsumers(true)` 2. reset the managed cursor position via `ManagedCursorImpl.internalResetCursor` 3. send `commandSender.sendSuccessResponse(requestId)` back in `ServerCnx.handleSeek`'s `thenRun`. Step 1 closes the consumer's Netty channel server-side, which fires `channelInactive` on the client (`ClientCnx.java:328`). `channelInactive` immediately fails all entries in`pendingRequests` with `ConnectException` (`ClientCnx.java:341–344`), including the seek request that triggered the reset. By the time the server sends the success response in step 3, the client has already failed the seek future and aborted compaction. The disconnect in step 1 is necessary for correctness — the dispatcher may have messages in flight to the consumer that would be inconsistent with the new cursor position — so the server-side ordering cannot simply be reversed. The fix belongs on the compactor side. `AbstractTwoPhaseCompactor.phaseTwoSeekThenLoop` has no retry logic — a failed `seekAsync` propagates directly to `whenComplete` and aborts: ```java reader.seekAsync(from).thenCompose((v) -> { // phase two loop }).whenComplete((res, exception) -> { if (exception != null) { deleteLedger(bk, ledger).whenComplete((res2, exception2) -> { promise.completeExceptionally(exception); // no retry }); } }); ``` ## Potential Fix The seek is idempotent and the ConnectException is always transient in this context — the broker disconnects the consumer as part of processing the seek, so by the time the consumer reconnects the cursor is already at the correct position. Retrying seekAsync with a short delay allows the consumer to reconnect, at which point the seek succeeds immediately. But I leave that up to the implementors. ### Are you willing to submit a PR? - [ ] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
