a-jubar commented on code in PR #17265:
URL: https://github.com/apache/camel/pull/17265#discussion_r1971081518
##########
components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java:
##########
@@ -174,6 +181,58 @@ public void subscribe(PubSubApiConsumer consumer,
ReplayPreset replayPreset, Str
serverStream.onNext(fetchRequestBuilder.build());
}
+ public void checkInitialReplayIdValidity(String topic, ByteString
replayId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking initialReplayId {} for topic {}",
base64EncodeByteString(replayId), topic);
+ }
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final StreamObserver<FetchResponse> responseObserver = new
StreamObserver<>() {
+
+ @Override
+ public void onNext(FetchResponse value) {
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (t instanceof StatusRuntimeException e) {
+ Metadata trailers = e.getTrailers();
+ if (trailers != null && PUBSUB_ERROR_CORRUPTED_REPLAY_ID
+ .equals(trailers.get(Metadata.Key.of("error-code",
Metadata.ASCII_STRING_MARSHALLER)))) {
+ error.set(t);
+ }
+ }
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
Review Comment:
Yes, if the replay id is valid the requested event is returned by the server
and onNext gets invoked.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]