Hi Thanks for reporting. This seems like a rare situation but can happen again if there is no activity for that long period of time. You are welcome to create a JIRA and send your patch as a PR against the main branch. This makes it easier for us and others to help review.
I am not sure if you were able to manually test your "fix" in your system or not ? On Mon, Jan 20, 2025 at 3:49 PM j_b_s34 <j_b_...@proton.me.invalid> wrote: > Hello, > > my application subscribes to some Change Data Capture events. > During the holidays, there were no new events for more than 3 days, which > caused the PubSubApiClient to try to resubscribe indefinitely due to a > corrupt replay id. > Events are retained for 3 days: > https://developer.salesforce.com/docs/platform/pub-sub-api/references/methods/subscribe-rpc.html?q=replay#replaying-an-event-stream > > There seems to be no error handling for this case and I can't add any > custom error handling without patching the client. > I am using Camel v4.4.3 and Java 21. > I made a small test patch to fix this problem, but I don't know if this is > the right way. > > Patch: > diff --git > a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java > b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java > index 8b3dbd2..efa8da5 100644 > --- > a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java > +++ > b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java > @@ -57,6 +57,7 @@ public class PubSubApiClient extends ServiceSupport { > > public static final String PUBSUB_ERROR_AUTH_ERROR = > "sfdc.platform.eventbus.grpc.service.auth.error"; > private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID = > "sfdc.platform.eventbus.grpc.service.auth.refresh.invalid"; > + private static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID = > "sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted"; > > protected PubSubGrpc.PubSubStub asyncStub; > protected PubSubGrpc.PubSubBlockingStub blockingStub; > @@ -317,6 +318,12 @@ public class PubSubApiClient extends ServiceSupport { > > session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff); > LOG.debug("logged in {}", > consumer.getTopic()); > } > + case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> { > + LOG.error("replay id is corrupt. try > resubscribing with ReplayPreset.LATEST"); > + // reset replay id and subscribe from latest > + replayId = null; > + initialReplayPreset = ReplayPreset.LATEST; > + } > default -> LOG.error("unexpected errorCode: {}", > errorCode); > } > } > > > I'd appreciate any advice. Thanks :) > > > Example log: > 2025-01-20T15:03:48.814+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : GRPC Exception > > io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation > failed. Ensure that the Replay ID is valid. rpcId: **** > at io.grpc.Status.asRuntimeException(Status.java:533) > ~[grpc-api-1.61.1.jar:1.61.1] > at > io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) > ~[grpc-stub-1.61.1.jar:1.61.1] > at > io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) > ~[grpc-core-1.61.1.jar:1.61.1] > at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) > ~[grpc-core-1.61.1.jar:1.61.1] > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) > ~[na:na] > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) > ~[na:na] > at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na] > > 2025-01-20T15:03:48.820+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailers: > 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 > Jan 2025 14:03:30 GMT > 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: > application/grpc > 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: **** > 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: > sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted > 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe > 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: > sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted > 2025-01-20T15:03:48.823+01:00 INFO 9360 --- [ault-executor-2] > o.a.c.c.s.i.client.PubSubApiClient : Subscribing to topic: > /data/AccountChangeEvent. > 2025-01-20T15:03:48.823+01:00 INFO 9360 --- [ault-executor-2] > o.a.c.c.s.i.client.PubSubApiClient : Subscribe successful. > 2025-01-20T15:03:48.916+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : GRPC Exception > > io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation > failed. Ensure that the Replay ID is valid. rpcId: **** > at io.grpc.Status.asRuntimeException(Status.java:533) > ~[grpc-api-1.61.1.jar:1.61.1] > at > io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) > ~[grpc-stub-1.61.1.jar:1.61.1] > at > io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) > ~[grpc-core-1.61.1.jar:1.61.1] > at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > ~[grpc-core-1.61.1.jar:1.61.1] > at > io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) > ~[grpc-core-1.61.1.jar:1.61.1] > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) > ~[na:na] > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) > ~[na:na] > at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na] > > 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailers: > 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 > Jan 2025 14:03:30 GMT > 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: > application/grpc > 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: **** > 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: > sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted > 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe > 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] > .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: > sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted > 2025-01-20T15:03:48.917+01:00 INFO 9360 --- [ault-executor-2] > o.a.c.c.s.i.client.PubSubApiClient : Subscribing to topic: > /data/AccountChangeEvent. > 2025-01-20T15:03:48.918+01:00 INFO 9360 --- [ault-executor-2] > o.a.c.c.s.i.client.PubSubApiClient : Subscribe successful. > > Sent with Proton Mail secure email. > -- Claus Ibsen ----------------- @davsclaus Camel in Action 2: https://www.manning.com/ibsen2