Can you format the code so it is readable and provide the stacktrace?
On Thursday, January 23, 2025 at 11:39:21 AM UTC+5:30 Ankit Rathod wrote:
> `import io.grpc.ManagedChannel;
> import io.grpc.ManagedChannelBuilder;
> import io.grpc.Status;
> import io.grpc.stub.StreamObserver;
> import java.util.concurrent.*;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.List;
> import java.util.UUID;
> import java.util.Map;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class ImprovedGrpcClient implements AutoCloseable {
>     private static final Logger log = LoggerFactory.getLogger(ImprovedGrpcClient.class);
>
>     // Configuration constants
>     private static final int MAX_CONCURRENT_REQUESTS = 10;
>     private static final int STREAM_TIMEOUT_SECONDS = 30;
>     private static final int RECONNECT_DELAY_SECONDS = 5;
>     private static final int MAX_RECONNECT_ATTEMPTS = 3;
>
>     // gRPC and stream management
>     private final ManagedChannel channel;
>     private final YourServiceGrpc.YourServiceStub asyncStub;
>     private volatile StreamObserver<YourRequestType> requestObserver;
>
>     // Concurrency controls
>     private final Semaphore requestThrottle;
>     private final AtomicBoolean isStreamInitialized;
>     private final Object streamLock;
>
>     // Request tracking
>     private final Map<String, CompletableFuture<YourResponseType>> responseFutures;
>     private final ConcurrentLinkedQueue<YourRequestType> requestQueue;
>
>     // Executors
>     private final ScheduledExecutorService streamMonitor;
>     private final ExecutorService responseProcessor;
>
>     public ImprovedGrpcClient(String host, int port) {
>         // Channel configuration with robust settings
>         this.channel = ManagedChannelBuilder.forAddress(host, port)
>             .usePlaintext() // Use .useTransportSecurity() in production
>             .keepAliveTime(30, TimeUnit.SECONDS)
>             .keepAliveTimeout(10, TimeUnit.SECONDS)
>             .maxInboundMessageSize(100 * 1024 * 1024)
>             .enableRetry()
>             .build();
>         this.asyncStub = YourServiceGrpc.newStub(channel);
>
>         // Initialize concurrent controls
>         this.requestThrottle = new Semaphore(MAX_CONCURRENT_REQUESTS);
>         this.isStreamInitialized = new AtomicBoolean(false);
>         this.streamLock = new Object();
>         this.responseFutures = new ConcurrentHashMap<>();
>         this.requestQueue = new ConcurrentLinkedQueue<>();
>
>         // Create executors
>         int corePoolSize = Runtime.getRuntime().availableProcessors();
>         this.responseProcessor = new ThreadPoolExecutor(
>             corePoolSize,
>             corePoolSize * 2,
>             60L,
>             TimeUnit.SECONDS,
>             new LinkedBlockingQueue<>(100)
>         );
>         this.streamMonitor = Executors.newSingleThreadScheduledExecutor();
>
>         // Start monitoring the stream
>         initializeStreamMonitoring();
>     }
>
>     private void initializeStreamMonitoring() {
>         streamMonitor.scheduleAtFixedRate(() -> {
>             try {
>                 if (!isStreamInitialized.get()) {
>                     reconnectStream();
>                 }
>                 processQueuedRequests();
>             } catch (Exception e) {
>                 log.error("Stream monitoring error", e);
>             }
>         }, 0, 5, TimeUnit.SECONDS);
>     }
>
>     public CompletableFuture<Boolean> initializeStream(StreamInitRequest initRequest) {
>         CompletableFuture<Boolean> initFuture = new CompletableFuture<>();
>         synchronized (streamLock) {
>             if (isStreamInitialized.get()) {
>                 initFuture.complete(true);
>                 return initFuture;
>             }
>             try {
>                 StreamResponseObserver responseObserver = new StreamResponseObserver(initFuture);
>                 requestObserver = asyncStub.bidirectionalStream(responseObserver);
>
>                 // Send initialization request
>                 YourRequestType request = YourRequestType.newBuilder()
>                     .setInit(initRequest)
>                     .build();
>                 requestObserver.onNext(request);
>             } catch (Exception e) {
>                 log.error("Stream initialization failed", e);
>                 initFuture.completeExceptionally(e);
>             }
>         }
>         return initFuture.orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS);
>     }
>
>     public CompletableFuture<YourResponseType> sendRequest(YourRequestType request) {
>         // Validate stream is initialized
>         if (!isStreamInitialized.get()) {
>             return CompletableFuture.failedFuture(
>                 new IllegalStateException("Stream not initialized")
>             );
>         }
>
>         // Generate unique request ID
>         String requestId = UUID.randomUUID().toString();
>         CompletableFuture<YourResponseType> responseFuture = new CompletableFuture<>();
>         try {
>             // Acquire throttle permit with timeout
>             if (!requestThrottle.tryAcquire(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
>                 return CompletableFuture.failedFuture(
>                     new TimeoutException("Could not acquire request throttle")
>                 );
>             }
>
>             // Store future for later correlation
>             responseFutures.put(requestId, responseFuture);
>
>             // Queue the request
>             requestQueue.offer(request);
>         } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>             return CompletableFuture.failedFuture(e);
>         }
>
>         // Add timeout and cleanup
>         return responseFuture
>             .orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS)
>             .whenComplete((response, throwable) -> {
>                 responseFutures.remove(requestId);
>                 requestThrottle.release();
>             });
>     }
>
>     private void processQueuedRequests() {
>         if (!isStreamInitialized.get()) return;
>         YourRequestType request;
>         while ((request = requestQueue.poll()) != null) {
>             try {
>                 requestObserver.onNext(request);
>             } catch (Exception e) {
>                 log.error("Failed to send queued request", e);
>                 reconnectStream();
>                 break;
>             }
>         }
>     }
>
>     private void reconnectStream() {
>         synchronized (streamLock) {
>             if (isStreamInitialized.get()) return;
>             log.warn("Attempting to reconnect stream");
>
>             // Close existing stream if it exists
>             if (requestObserver != null) {
>                 try {
>                     requestObserver.onCompleted();
>                 } catch (Exception e) {
>                     log.error("Error closing existing stream", e);
>                 }
>             }
>
>             // Fail all pending futures
>             responseFutures.values().forEach(future ->
>                 future.completeExceptionally(new RuntimeException("Stream disconnected"))
>             );
>             responseFutures.clear();
>
>             // Reinitialize stream
>             // Note: You would typically call initializeStream with original parameters here
>         }
>     }
>
>     private class StreamResponseObserver implements StreamObserver<YourResponseType> {
>         private final CompletableFuture<Boolean> initFuture;
>
>         public StreamResponseObserver(CompletableFuture<Boolean> initFuture) {
>             this.initFuture = initFuture;
>         }
>
>         @Override
>         public void onNext(YourResponseType response) {
>             try {
>                 // Handle initialization response
>                 if (!isStreamInitialized.get() && isInitializationResponse(response)) {
>                     handleStreamInitialization(response);
>                     return;
>                 }
>
>                 // Correlate and complete response future
>                 correlateAndCompleteResponse(response);
>             } catch (Exception e) {
>                 log.error("Error processing response", e);
>             }
>         }
>
>         @Override
>         public void onError(Throwable t) {
>             log.error("Stream error", t);
>             Status status = Status.fromThrowable(t);
>
>             // Handle different error scenarios
>             switch (status.getCode()) {
>                 case UNAVAILABLE:
>                 case CANCELLED:
>                 case DEADLINE_EXCEEDED:
>                     reconnectStream();
>                     break;
>                 default:
>                     // Fail all futures for non-recoverable errors
>                     responseFutures.values().forEach(future ->
>                         future.completeExceptionally(t)
>                     );
>                     responseFutures.clear();
>             }
>         }
>
>         @Override
>         public void onCompleted() {
>             log.info("Stream completed");
>             reconnectStream();
>         }
>
>         private void handleStreamInitialization(YourResponseType response) {
>             synchronized (streamLock) {
>                 if (isSuccessfulInitialization(response)) {
>                     isStreamInitialized.set(true);
>                     initFuture.complete(true);
>                 } else {
>                     initFuture.completeExceptionally(
>                         new RuntimeException("Stream initialization failed")
>                     );
>                 }
>             }
>         }
>
>         private void correlateAndCompleteResponse(YourResponseType response) {
>             String requestId = extractRequestId(response);
>             CompletableFuture<YourResponseType> future = responseFutures.get(requestId);
>             if (future != null) {
>                 if (isSuccessfulResponse(response)) {
>                     future.complete(response);
>                 } else {
>                     future.completeExceptionally(
>                         new RuntimeException("Request failed: " + getErrorMessage(response))
>                     );
>                 }
>             }
>         }
>     }
>
>     @Override
>     public void close() {
>         try {
>             // Shutdown stream and channel
>             if (requestObserver != null) {
>                 requestObserver.onCompleted();
>             }
>             channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
>
>             // Shutdown executors
>             streamMonitor.shutdown();
>             responseProcessor.shutdown();
>             streamMonitor.awaitTermination(5, TimeUnit.SECONDS);
>             responseProcessor.awaitTermination(5, TimeUnit.SECONDS);
>         } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>             log.error("Shutdown interrupted", e);
>         }
>     }
>
>     // Utility methods to be implemented based on your specific response types
>     private boolean isInitializationResponse(YourResponseType response) {
>         // Implement logic to identify initialization response
>         return false;
>     }
>
>     private boolean isSuccessfulInitialization(YourResponseType response) {
>         // Implement logic to check successful initialization
>         return false;
>     }
>
>     private boolean isSuccessfulResponse(YourResponseType response) {
>         // Implement logic to check successful response
>         return false;
>     }
>
>     private String extractRequestId(YourResponseType response) {
>         // Implement logic to extract request ID
>         return "";
>     }
>
>     private String getErrorMessage(YourResponseType response) {
>         // Implement logic to get error message
>         return "";
>     }
> }`
>
> I cannot work out what is wrong with this code. After a few messages are
> processed successfully, the stream suddenly errors out, saying Client
> close, Invalid wire in XML, or Failed to read message. Could this be
> caused by a race condition? Please help.
>
> I tried a synchronous implementation of the above and it worked. I want to
> add some kind of concurrency for requests and responses on the gRPC
> client side.
>
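Without the stack trace this is only a guess, but one thing worth checking: the StreamObserver Javadoc notes that individual observers are not thread-safe, so an application that writes to one from several threads has to synchronize those calls itself. In the quoted code, processQueuedRequests() calls requestObserver.onNext on the monitor thread without holding streamLock, while reconnectStream() can call requestObserver.onCompleted() from a gRPC callback thread. A minimal sketch of serializing those calls follows. Observer is a hypothetical stand-in for io.grpc.stub.StreamObserver so the example runs without gRPC on the classpath, and SerializedObserver, RecordingObserver, and run are illustrative names, not gRPC APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializedObserverSketch {

    /** Hypothetical stand-in mirroring the three methods of io.grpc.stub.StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Wraps a non-thread-safe observer so only one thread touches it at a time. */
    static final class SerializedObserver<T> implements Observer<T> {
        private final Observer<T> delegate;
        private boolean closed; // guarded by "this"

        SerializedObserver(Observer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void onNext(T value) {
            if (closed) {
                throw new IllegalStateException("stream already closed");
            }
            delegate.onNext(value);
        }

        @Override
        public synchronized void onError(Throwable t) {
            if (closed) return; // terminate at most once
            closed = true;
            delegate.onError(t);
        }

        @Override
        public synchronized void onCompleted() {
            if (closed) return; // terminate at most once
            closed = true;
            delegate.onCompleted();
        }
    }

    /** Deliberately unsynchronized sink, standing in for the real request stream. */
    static final class RecordingObserver implements Observer<Integer> {
        final List<Integer> received = new ArrayList<>();
        int completions;

        @Override public void onNext(Integer v) { received.add(v); }
        @Override public void onError(Throwable t) { }
        @Override public void onCompleted() { completions++; }
    }

    /** Sends threads * perThread messages concurrently and returns how many arrived. */
    public static int run(int threads, int perThread) {
        RecordingObserver sink = new RecordingObserver();
        Observer<Integer> safe = new SerializedObserver<>(sink);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        try {
            for (int t = 0; t < threads; t++) {
                pool.execute(() -> {
                    for (int i = 0; i < perThread; i++) {
                        safe.onNext(i);
                    }
                    done.countDown();
                });
            }
            done.await(); // all writers finished; their writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        safe.onCompleted();
        safe.onCompleted(); // second call is ignored by the wrapper
        if (sink.completions != 1) {
            throw new IllegalStateException("stream completed more than once");
        }
        return sink.received.size();
    }

    public static void main(String[] args) {
        System.out.println(run(8, 1000));
    }
}
```

In a real client the wrapper would implement StreamObserver<YourRequestType> and wrap the observer returned by asyncStub.bidirectionalStream(...). Whether this resolves the errors described above is an assumption until the stack trace confirms where they originate.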