Hello,
My application subscribes to some Change Data Capture events.
During the holidays, no new events arrived for more than 3 days. After that,
the PubSubApiClient kept trying to resubscribe indefinitely, and every attempt
failed because the stored replay id was rejected as corrupt.
Events are retained for only 3 days, so the replay id had presumably expired:
https://developer.salesforce.com/docs/platform/pub-sub-api/references/methods/subscribe-rpc.html?q=replay#replaying-an-event-stream
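To make the retention problem concrete, here is a minimal standalone sketch of a check that could run before resubscribing. It assumes the 3-day window is 72 hours and that the application records the time of the last received event. The class and method names are hypothetical and are not part of Camel or the Pub/Sub API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Camel: guesses whether a stored replay id
// has aged out of the event retention window.
class ReplayRetentionCheck {

    // Assumption: the 3-day retention mentioned above corresponds to 72 hours.
    static final Duration RETENTION = Duration.ofHours(72);

    // Returns true when the last event was received at least RETENTION ago,
    // so a replay id saved at that time is probably no longer accepted.
    static boolean isLikelyExpired(Instant lastEventTime, Instant now) {
        return Duration.between(lastEventTime, now).compareTo(RETENTION) >= 0;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2025-01-20T14:03:30Z");
        // Last event 80 hours ago: outside the assumed retention window.
        System.out.println(isLikelyExpired(now.minus(Duration.ofHours(80)), now));
        // Last event 2 hours ago: still inside the window.
        System.out.println(isLikelyExpired(now.minus(Duration.ofHours(2)), now));
    }
}
```

A check like this only avoids the failing subscribe call in the common case. The server-side error still needs handling, because the client cannot know the exact retention boundary.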
The client does not seem to handle this error, and I can't add custom error
handling without patching the client.
I am using Camel v4.4.3 and Java 21.
I made a small test patch that fixes the problem for me, but I don't know
whether this is the right approach.
Patch:
diff --git a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index 8b3dbd2..efa8da5 100644
--- a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++ b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -57,6 +57,7 @@ public class PubSubApiClient extends ServiceSupport {
     public static final String PUBSUB_ERROR_AUTH_ERROR = "sfdc.platform.eventbus.grpc.service.auth.error";
     private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID = "sfdc.platform.eventbus.grpc.service.auth.refresh.invalid";
+    private static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID = "sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted";
     protected PubSubGrpc.PubSubStub asyncStub;
     protected PubSubGrpc.PubSubBlockingStub blockingStub;
@@ -317,6 +318,12 @@ public class PubSubApiClient extends ServiceSupport {
                         session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
                         LOG.debug("logged in {}", consumer.getTopic());
                     }
+                    case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> {
+                        LOG.error("replay id is corrupt. try resubscribing with ReplayPreset.LATEST");
+                        // reset replay id and subscribe from latest
+                        replayId = null;
+                        initialReplayPreset = ReplayPreset.LATEST;
+                    }
                     default -> LOG.error("unexpected errorCode: {}", errorCode);
                 }
             }
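One possible variation on the patch is to make the fallback preset configurable instead of hard-coding LATEST. The standalone sketch below models only the decision logic. The `Preset` enum is a hypothetical stand-in for the client's `ReplayPreset`, and `ReplayState` and `next` are illustrative names that do not exist in Camel.

```java
import java.util.Optional;

// Standalone model of the fallback decision; not the actual Camel client code.
class ReplayFallbackSketch {

    // Hypothetical stand-in for the ReplayPreset enum used by the client.
    enum Preset { LATEST, EARLIEST, CUSTOM }

    // Error code taken from the "error-code" trailer in the log.
    static final String CORRUPTED_REPLAY_ID =
            "sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted";

    // What the next subscribe call should use.
    record ReplayState(Preset preset, Optional<String> replayId) {}

    // Drops the stored replay id when the server reports it as corrupted and
    // switches to the given fallback preset; any other error code leaves the
    // state unchanged.
    static ReplayState next(ReplayState current, String errorCode, Preset fallback) {
        if (fallback == Preset.CUSTOM) {
            // CUSTOM requires a replay id, which is exactly what we no longer have.
            throw new IllegalArgumentException("fallback must be LATEST or EARLIEST");
        }
        if (CORRUPTED_REPLAY_ID.equals(errorCode)) {
            return new ReplayState(fallback, Optional.empty());
        }
        return current;
    }

    public static void main(String[] args) {
        ReplayState stale = new ReplayState(Preset.CUSTOM, Optional.of("stale-id"));
        ReplayState recovered = next(stale, CORRUPTED_REPLAY_ID, Preset.EARLIEST);
        System.out.println(recovered.preset() + " " + recovered.replayId().isPresent());
    }
}
```

The trade-off is that falling back to LATEST skips anything published while the subscription was down, whereas EARLIEST replays whatever is still retained and may redeliver events the consumer has already processed.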
I'd appreciate any advice. Thanks :)
Example log:
2025-01-20T15:03:48.814+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : GRPC Exception
io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation failed. Ensure that the Replay ID is valid. rpcId: ****
    at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.61.1.jar:1.61.1]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) ~[grpc-stub-1.61.1.jar:1.61.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.61.1.jar:1.61.1]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
2025-01-20T15:03:48.820+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailers:
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 Jan 2025 14:03:30 GMT
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: application/grpc
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: ****
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe
2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
2025-01-20T15:03:48.823+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribing to topic: /data/AccountChangeEvent.
2025-01-20T15:03:48.823+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribe successful.
2025-01-20T15:03:48.916+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : GRPC Exception
io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation failed. Ensure that the Replay ID is valid. rpcId: ****
    at io.grpc.Status.asRuntimeException(Status.java:533) ~[grpc-api-1.61.1.jar:1.61.1]
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481) ~[grpc-stub-1.61.1.jar:1.61.1]
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.61.1.jar:1.61.1]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.61.1.jar:1.61.1]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailers:
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20 Jan 2025 14:03:30 GMT
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value: application/grpc
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: ****
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe
2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2] .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode: sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
2025-01-20T15:03:48.917+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribing to topic: /data/AccountChangeEvent.
2025-01-20T15:03:48.918+01:00 INFO 9360 --- [ault-executor-2] o.a.c.c.s.i.client.PubSubApiClient : Subscribe successful.