Hello, my application subscribes to some Change Data Capture events. During the holidays, there were no new events for more than 3 days, which caused the PubSubApiClient to try to resubscribe indefinitely due to a corrupt replay id. Events are retained for 3 days: https://developer.salesforce.com/docs/platform/pub-sub-api/references/methods/subscribe-rpc.html?q=replay#replaying-an-event-stream
There seems to be no error handling for this case and I can't add any custom error handling without patching the client. I am using Camel v4.4.3 and Java 21. I made a small test patch to fix this problem, but I don't know if this is the right way. Patch: diff --git a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java index 8b3dbd2..efa8da5 100644 --- a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java +++ b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java @@ -57,6 +57,7 @@ public class PubSubApiClient extends ServiceSupport { public static final String PUBSUB_ERROR_AUTH_ERROR = "sfdc.platform.eventbus.grpc.service.auth.error"; private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID = "sfdc.platform.eventbus.grpc.service.auth.refresh.invalid"; + private static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID = "sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted"; protected PubSubGrpc.PubSubStub asyncStub; protected PubSubGrpc.PubSubBlockingStub blockingStub; @@ -317,6 +318,12 @@ public class PubSubApiClient extends ServiceSupport { session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff); LOG.debug("logged in {}", consumer.getTopic()); } + case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> { + LOG.error("replay id is corrupt. try resubscribing with ReplayPreset.LATEST"); + // reset replay id and subscribe from latest + replayId = null; + initialReplayPreset = ReplayPreset.LATEST; + } default -> LOG.error("unexpected errorCode: {}", errorCode); } } I'd appreciate any advice. Thanks :) Example log: 2025-01-20T15:03:48.814+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : GRPC Exception io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation failed. Ensure that the Replay ID is valid. rpcId: **** at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.61.1.jar:1.61.1] at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) ~[grpc-stub-1.61.1.jar:1.61.1] at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.61.1.jar:1.61.1] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na] 2025-01-20T15:03:48.820+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailers: 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 Jan 2025 14:03:30 GMT 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: application/grpc 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: **** 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted 2025-01-20T15:03:48.823+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribing to topic: /data/AccountChangeEvent. 2025-01-20T15:03:48.823+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribe successful. 2025-01-20T15:03:48.916+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : GRPC Exception io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation failed. Ensure that the Replay ID is valid. rpcId: **** at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.61.1.jar:1.61.1] at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) ~[grpc-stub-1.61.1.jar:1.61.1] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.61.1.jar:1.61.1] at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.61.1.jar:1.61.1] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na] 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailers: 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 Jan 2025 14:03:30 GMT 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: application/grpc 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: **** 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted 2025-01-20T15:03:48.917+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribing to topic: /data/AccountChangeEvent. 2025-01-20T15:03:48.918+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribe successful. Sent with Proton Mail secure email.