Hi

Thanks for reporting. This seems like a rare situation, but it can happen
again whenever there is no activity for longer than the retention period.
You are welcome to create a JIRA and send your patch as a PR against the
main branch. This makes it easier for us and others to help review.

Were you able to manually test your "fix" on your own system?
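
If it helps with testing, the decision your patch makes can be checked in
isolation without waiting for the retention window to pass. The following
is only a minimal sketch under assumptions: ReplayFallbackSketch,
ReplayState and afterError are hypothetical names, the enum is a simplified
stand-in for the real ReplayPreset, and nothing here touches the actual
PubSubApiClient or gRPC classes. It only models "corrupted replay id ->
drop the stored id and resubscribe from LATEST".

```java
// Hypothetical, stand-alone model of the fallback decision in the patch.
// It does not use Camel or the Salesforce Pub/Sub API classes.
final class ReplayFallbackSketch {

    /** Simplified stand-in for the Pub/Sub API replay presets. */
    enum ReplayPreset { LATEST, EARLIEST, CUSTOM }

    /** Error code taken from the trailers in the reported log. */
    static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID =
            "sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted";

    /** Replay position to use for the next subscribe attempt. */
    static final class ReplayState {
        final String replayId;      // null means "no stored replay id"
        final ReplayPreset preset;

        ReplayState(String replayId, ReplayPreset preset) {
            this.replayId = replayId;
            this.preset = preset;
        }
    }

    /**
     * Returns the state for the next subscribe attempt. A corrupted replay
     * id resets the position to LATEST; any other code leaves it unchanged.
     */
    static ReplayState afterError(ReplayState current, String errorCode) {
        if (PUBSUB_ERROR_CORRUPTED_REPLAY_ID.equals(errorCode)) {
            return new ReplayState(null, ReplayPreset.LATEST);
        }
        return current;
    }

    public static void main(String[] args) {
        ReplayState stale = new ReplayState("expired-id", ReplayPreset.CUSTOM);
        ReplayState next = afterError(stale, PUBSUB_ERROR_CORRUPTED_REPLAY_ID);
        System.out.println(next.preset + " " + next.replayId); // prints "LATEST null"
    }
}
```

Note that falling back to LATEST means events published between the lost
replay id and the resubscribe are skipped, so it may be worth logging that
clearly or making the fallback configurable.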


On Mon, Jan 20, 2025 at 3:49 PM j_b_s34 <j_b_...@proton.me.invalid> wrote:

> Hello,
>
> my application subscribes to some Change Data Capture events.
> During the holidays, there were no new events for more than 3 days, which
> caused the PubSubApiClient to try to resubscribe indefinitely due to a
> corrupt replay id.
> Events are retained for 3 days:
> https://developer.salesforce.com/docs/platform/pub-sub-api/references/methods/subscribe-rpc.html?q=replay#replaying-an-event-stream
>
> There seems to be no error handling for this case and I can't add any
> custom error handling without patching the client.
> I am using Camel v4.4.3 and Java 21.
> I made a small test patch to fix this problem, but I don't know if this is
> the right way.
>
> Patch:
> diff --git a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
> index 8b3dbd2..efa8da5 100644
> --- a/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
> +++ b/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
> @@ -57,6 +57,7 @@ public class PubSubApiClient extends ServiceSupport {
>
>      public static final String PUBSUB_ERROR_AUTH_ERROR = "sfdc.platform.eventbus.grpc.service.auth.error";
>      private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID = "sfdc.platform.eventbus.grpc.service.auth.refresh.invalid";
> +    private static final String PUBSUB_ERROR_CORRUPTED_REPLAY_ID = "sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted";
>
>      protected PubSubGrpc.PubSubStub asyncStub;
>      protected PubSubGrpc.PubSubBlockingStub blockingStub;
> @@ -317,6 +318,12 @@ public class PubSubApiClient extends ServiceSupport {
>                              session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
>                              LOG.debug("logged in {}", consumer.getTopic());
>                          }
> +                        case PUBSUB_ERROR_CORRUPTED_REPLAY_ID -> {
> +                            LOG.error("replay id is corrupt. try resubscribing with ReplayPreset.LATEST");
> +                            // reset replay id and subscribe from latest
> +                            replayId = null;
> +                            initialReplayPreset = ReplayPreset.LATEST;
> +                        }
>                          default -> LOG.error("unexpected errorCode: {}", errorCode);
>                      }
>                  }
>
>
> I'd appreciate any advice. Thanks :)
>
>
> Example log:
> 2025-01-20T15:03:48.814+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : GRPC Exception
>
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation
> failed. Ensure that the Replay ID is valid. rpcId: ****
>         at io.grpc.Status.asRuntimeException(Status.java:533)
> ~[grpc-api-1.61.1.jar:1.61.1]
>         at
> io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
> ~[grpc-stub-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> ~[na:na]
>         at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> ~[na:na]
>         at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
>
> 2025-01-20T15:03:48.820+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailers:
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20
> Jan 2025 14:03:30 GMT
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value:
> application/grpc
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: ****
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value:
> sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe
> 2025-01-20T15:03:48.821+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode:
> sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
> 2025-01-20T15:03:48.823+01:00  INFO 9360 --- [ault-executor-2]
> o.a.c.c.s.i.client.PubSubApiClient       : Subscribing to topic:
> /data/AccountChangeEvent.
> 2025-01-20T15:03:48.823+01:00  INFO 9360 --- [ault-executor-2]
> o.a.c.c.s.i.client.PubSubApiClient       : Subscribe successful.
> 2025-01-20T15:03:48.916+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : GRPC Exception
>
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The Replay ID validation
> failed. Ensure that the Replay ID is valid. rpcId: ****
>         at io.grpc.Status.asRuntimeException(Status.java:533)
> ~[grpc-api-1.61.1.jar:1.61.1]
>         at
> io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
> ~[grpc-stub-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> ~[grpc-core-1.61.1.jar:1.61.1]
>         at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> ~[na:na]
>         at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> ~[na:na]
>         at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
>
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailers:
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: date, Value: Mon, 20
> Jan 2025 14:03:30 GMT
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: content-type, Value:
> application/grpc
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: rpc-id, Value: ****
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: error-code, Value:
> sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : Trailer: type, Value: Subscribe
> 2025-01-20T15:03:48.917+01:00 ERROR 9360 --- [ault-executor-2]
> .c.PubSubApiClient$FetchResponseObserver : unexpected errorCode:
> sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted
> 2025-01-20T15:03:48.917+01:00  INFO 9360 --- [ault-executor-2]
> o.a.c.c.s.i.client.PubSubApiClient       : Subscribing to topic:
> /data/AccountChangeEvent.
> 2025-01-20T15:03:48.918+01:00  INFO 9360 --- [ault-executor-2]
> o.a.c.c.s.i.client.PubSubApiClient       : Subscribe successful.
>
> Sent with Proton Mail secure email.
>


-- 
Claus Ibsen
-----------------
@davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
