davsclaus commented on code in PR #17265:
URL: https://github.com/apache/camel/pull/17265#discussion_r1969983127
##########
components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java:
##########
@@ -174,6 +181,58 @@ public void subscribe(PubSubApiConsumer consumer, ReplayPreset replayPreset, Str
         serverStream.onNext(fetchRequestBuilder.build());
     }
+    public void checkInitialReplayIdValidity(String topic, ByteString replayId) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Checking initialReplayId {} for topic {}", base64EncodeByteString(replayId), topic);
+        }
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final StreamObserver<FetchResponse> responseObserver = new StreamObserver<>() {
+
+            @Override
+            public void onNext(FetchResponse value) {
+                latch.countDown();
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                if (t instanceof StatusRuntimeException e) {
+                    Metadata trailers = e.getTrailers();
+                    if (trailers != null && PUBSUB_ERROR_CORRUPTED_REPLAY_ID
+                            .equals(trailers.get(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER)))) {
+                        error.set(t);
+                    }
+                }
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
Review Comment:
Just to be sure: if the replay id is valid then onNext is invoked, so you
don't need to count down the latch here as well?
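
For context on the question, here is a minimal, self-contained sketch of the latch pattern under discussion. It is not the PR's code: `Observer` is a hypothetical stand-in for gRPC's `StreamObserver` so the example runs without gRPC on the classpath, and the class and method names are made up for illustration. It also records every error, whereas the PR only records the corrupted-replay-id case. The sketch assumes a stream could complete without ever emitting a response; whether the Pub/Sub API actually does that is not established here. Since `CountDownLatch.countDown()` does nothing once the count is already zero, counting down in `onCompleted` as well is harmless when `onNext` has already fired.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchCallbackSketch {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver (same three callbacks).
    interface Observer<T> {
        void onNext(T value);

        void onError(Throwable t);

        void onCompleted();
    }

    // Builds an observer that releases the latch from every callback.
    static Observer<String> observer(CountDownLatch latch, AtomicReference<Throwable> error) {
        return new Observer<>() {
            @Override
            public void onNext(String value) {
                latch.countDown();
            }

            @Override
            public void onError(Throwable t) {
                // Simplification: records any error, not just one specific error code.
                error.set(t);
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                // No effect if onNext or onError already brought the count to zero.
                latch.countDown();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> error = new AtomicReference<>();
        Observer<String> obs = observer(latch, error);

        // Assumed scenario: the stream completes without a prior onNext.
        obs.onCompleted();

        boolean released = latch.await(1, TimeUnit.SECONDS);
        System.out.println("released=" + released + ", error=" + error.get());
    }
}
```

If `onCompleted` did not count down, the waiting thread in this assumed scenario would only be released by the `await` timeout.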