Hello,

my application subscribes to some Change Data Capture events.
During the holidays, there were no new events for more than 3 days, which 
caused the PubSubApiClient to try to resubscribe indefinitely due to a corrupt 
replay id.
Events are retained for 3 days: 
https://developer.salesforce.com/docs/platform/pub-sub-api/references/methods/subscribe-rpc.html?q=replay#replaying-an-event-stream

There seems to be no error handling for this case and I can't add any custom 
error handling without patching the client.
I am using Camel v4.4.3 and Java 21.
I made a small test patch to fix this problem, but I don't know if this is the 
right way.

Patch:
diff --git 
a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
 
b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index 8b3dbd2..efa8da5 100644
--- 
a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++ 
b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -57,6 +57,7 @@ public class PubSubApiClient extends ServiceSupport {
 
     public static final String PUBSUB_ERROR_AUTH_ERROR = 
"sfdc.platform.eventbus.grpc.service.auth.error";
     private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID = 
"sfdc.platform.eventbus.grpc.service.auth.refresh.invalid";
+    private static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID = 
"sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted";
 
     protected PubSubGrpc.PubSubStub asyncStub;
     protected PubSubGrpc.PubSubBlockingStub blockingStub;
@@ -317,6 +318,12 @@ public class PubSubApiClient extends ServiceSupport {
                             
session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
                             LOG.debug("logged in {}", consumer.getTopic());
                         }
+                        case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> {
+                            LOG.error("replay id is corrupt. try resubscribing 
with ReplayPreset.LATEST");
+                            // reset replay id and subscribe from latest
+                            replayId = null;
+                            initialReplayPreset = ReplayPreset.LATEST;
+                        }
                         default -> LOG.error("unexpected errorCode: {}", 
errorCode);
                     }
                 }


I'd appreciate any advice. Thanks :)


Example log: 
2025-01-20T15:03:48.814+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : GRPC Exception

io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation 
failed. Ensure that the Replay ID is valid. rpcId: ****
        at io.grpc.Status.asRuntimeException(Status.java:533) 
~[grpc-api-1.61.1.jar:1.61.1]
        at 
io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
 ~[grpc-stub-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
 ~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
 ~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
 ~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) 
~[grpc-core-1.61.1.jar:1.61.1]
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) 
~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
 ~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
 ~[grpc-core-1.61.1.jar:1.61.1]
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) 
~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) 
~[grpc-core-1.61.1.jar:1.61.1]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
 ~[na:na]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
 ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

2025-01-20T15:03:48.820+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailers:
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 Jan 
2025 14:03:30 GMT
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: 
application/grpc
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: ****
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: 
sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: 
sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
2025-01-20T15:03:48.823+01:00  INFO 9360 --- [ault-executor-2] 
o.a.c.c.s.i.client.PubSubApiClient       : Subscribing to topic: 
/data/AccountChangeEvent.
2025-01-20T15:03:48.823+01:00  INFO 9360 --- [ault-executor-2] 
o.a.c.c.s.i.client.PubSubApiClient       : Subscribe successful.
2025-01-20T15:03:48.916+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : GRPC Exception

io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation 
failed. Ensure that the Replay ID is valid. rpcId: ****
        at io.grpc.Status.asRuntimeException(Status.java:533) 
~[grpc-api-1.61.1.jar:1.61.1]
        at 
io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
 ~[grpc-stub-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) 
~[grpc-core-1.61.1.jar:1.61.1]
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) 
~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
 ~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
 ~[grpc-core-1.61.1.jar:1.61.1]
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) 
~[grpc-core-1.61.1.jar:1.61.1]
        at 
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) 
~[grpc-core-1.61.1.jar:1.61.1]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
 ~[na:na]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
 ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailers:
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 Jan 
2025 14:03:30 GMT
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: 
application/grpc
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: ****
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: 
sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] 
.c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: 
sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
2025-01-20T15:03:48.917+01:00  INFO 9360 --- [ault-executor-2] 
o.a.c.c.s.i.client.PubSubApiClient       : Subscribing to topic: 
/data/AccountChangeEvent.
2025-01-20T15:03:48.918+01:00  INFO 9360 --- [ault-executor-2] 
o.a.c.c.s.i.client.PubSubApiClient       : Subscribe successful.

Sent with Proton Mail secure email.

Reply via email to