`import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder;
import io.grpc.Status; import io.grpc.stub.StreamObserver; import
java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.List; import java.util.UUID; import java.util.Map; import
org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class ImprovedGrpcClient implements AutoCloseable { private static
final Logger log = LoggerFactory.getLogger(ImprovedGrpcClient.class);
// Configuration constants private static final int MAX_CONCURRENT_REQUESTS
= 10; private static final int STREAM_TIMEOUT_SECONDS = 30; private static
final int RECONNECT_DELAY_SECONDS = 5; private static final int
MAX_RECONNECT_ATTEMPTS = 3; // gRPC and stream management private final
ManagedChannel channel; private final YourServiceGrpc.YourServiceStub
asyncStub; private volatile StreamObserver<YourRequestType>
requestObserver; // Concurrency controls private final Semaphore
requestThrottle; private final AtomicBoolean isStreamInitialized; private
final Object streamLock; // Request tracking private final Map<String,
CompletableFuture<YourResponseType>> responseFutures; private final
ConcurrentLinkedQueue<YourRequestType> requestQueue; // Executors private
final ScheduledExecutorService streamMonitor; private final ExecutorService
responseProcessor; public ImprovedGrpcClient(String host, int port) { //
Channel configuration with robust settings this.channel =
ManagedChannelBuilder.forAddress(host, port) .usePlaintext() // Use
.useTransportSecurity() in production .keepAliveTime(30, TimeUnit.SECONDS)
.keepAliveTimeout(10, TimeUnit.SECONDS) .maxInboundMessageSize(100 * 1024 *
1024) .enableRetry() .build(); this.asyncStub =
YourServiceGrpc.newStub(channel); // Initialize concurrent controls
this.requestThrottle = new Semaphore(MAX_CONCURRENT_REQUESTS);
this.isStreamInitialized = new AtomicBoolean(false); this.streamLock = new
Object(); this.responseFutures = new ConcurrentHashMap<>();
this.requestQueue = new ConcurrentLinkedQueue<>(); // Create executors int
corePoolSize = Runtime.getRuntime().availableProcessors();
this.responseProcessor = new ThreadPoolExecutor( corePoolSize, corePoolSize
* 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100) );
this.streamMonitor = Executors.newSingleThreadScheduledExecutor(); // Start
monitoring the stream initializeStreamMonitoring(); } private void
initializeStreamMonitoring() { streamMonitor.scheduleAtFixedRate(() -> {
try { if (!isStreamInitialized.get()) { reconnectStream(); }
processQueuedRequests(); } catch (Exception e) { log.error("Stream
monitoring error", e); } }, 0, 5, TimeUnit.SECONDS); } public
CompletableFuture<Boolean> initializeStream(StreamInitRequest initRequest)
{ CompletableFuture<Boolean> initFuture = new CompletableFuture<>();
synchronized (streamLock) { if (isStreamInitialized.get()) {
initFuture.complete(true); return initFuture; } try {
StreamResponseObserver responseObserver = new
StreamResponseObserver(initFuture); requestObserver =
asyncStub.bidirectionalStream(responseObserver); // Send initialization
request YourRequestType request = YourRequestType.newBuilder()
.setInit(initRequest) .build(); requestObserver.onNext(request); } catch
(Exception e) { log.error("Stream initialization failed", e);
initFuture.completeExceptionally(e); } } return
initFuture.orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS); } public
CompletableFuture<YourResponseType> sendRequest(YourRequestType request) {
// Validate stream is initialized if (!isStreamInitialized.get()) { return
CompletableFuture.failedFuture( new IllegalStateException("Stream not
initialized") ); } // Generate unique request ID String requestId =
UUID.randomUUID().toString(); CompletableFuture<YourResponseType>
responseFuture = new CompletableFuture<>(); try { // Acquire throttle
permit with timeout if (!requestThrottle.tryAcquire(STREAM_TIMEOUT_SECONDS,
TimeUnit.SECONDS)) { return CompletableFuture.failedFuture( new
TimeoutException("Could not acquire request throttle") ); } // Store future
for later correlation responseFutures.put(requestId, responseFuture); //
Queue the request requestQueue.offer(request); } catch
(InterruptedException e) { Thread.currentThread().interrupt(); return
CompletableFuture.failedFuture(e); } // Add timeout and cleanup return
responseFuture .orTimeout(STREAM_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.whenComplete((response, throwable) -> { responseFutures.remove(requestId);
requestThrottle.release(); }); } private void processQueuedRequests() { if
(!isStreamInitialized.get()) return; YourRequestType request; while
((request = requestQueue.poll()) != null) { try {
requestObserver.onNext(request); } catch (Exception e) { log.error("Failed
to send queued request", e); reconnectStream(); break; } } } private void
reconnectStream() { synchronized (streamLock) { if
(isStreamInitialized.get()) return; log.warn("Attempting to reconnect
stream"); // Close existing stream if it exists if (requestObserver !=
null) { try { requestObserver.onCompleted(); } catch (Exception e) {
log.error("Error closing existing stream", e); } } // Fail all pending
futures responseFutures.values().forEach(future ->
future.completeExceptionally(new RuntimeException("Stream disconnected"))
); responseFutures.clear(); // Reinitialize stream // Note: You would
typically call initializeStream with original parameters here } } private
class StreamResponseObserver implements StreamObserver<YourResponseType> {
private final CompletableFuture<Boolean> initFuture; public
StreamResponseObserver(CompletableFuture<Boolean> initFuture) {
this.initFuture = initFuture; } @Override public void
onNext(YourResponseType response) { try { // Handle initialization response
if (!isStreamInitialized.get() && isInitializationResponse(response)) {
handleStreamInitialization(response); return; } // Correlate and complete
response future correlateAndCompleteResponse(response); } catch (Exception
e) { log.error("Error processing response", e); } } @Override public void
onError(Throwable t) { log.error("Stream error", t); Status status =
Status.fromThrowable(t); // Handle different error scenarios switch
(status.getCode()) { case UNAVAILABLE: case CANCELLED: case
DEADLINE_EXCEEDED: reconnectStream(); break; default: // Fail all futures
for non-recoverable errors responseFutures.values().forEach(future ->
future.completeExceptionally(t) ); responseFutures.clear(); } } @Override
public void onCompleted() { log.info("Stream completed");
reconnectStream(); } private void
handleStreamInitialization(YourResponseType response) { synchronized
(streamLock) { if (isSuccessfulInitialization(response)) {
isStreamInitialized.set(true); initFuture.complete(true); } else {
initFuture.completeExceptionally( new RuntimeException("Stream
initialization failed") ); } } } private void
correlateAndCompleteResponse(YourResponseType response) { String requestId
= extractRequestId(response); CompletableFuture<YourResponseType> future =
responseFutures.get(requestId); if (future != null) { if
(isSuccessfulResponse(response)) { future.complete(response); } else {
future.completeExceptionally( new RuntimeException("Request failed: " +
getErrorMessage(response)) ); } } } } @Override public void close() { try {
// Shutdown stream and channel if (requestObserver != null) {
requestObserver.onCompleted(); } channel.shutdown().awaitTermination(5,
TimeUnit.SECONDS); // Shutdown executors streamMonitor.shutdown();
responseProcessor.shutdown(); streamMonitor.awaitTermination(5,
TimeUnit.SECONDS); responseProcessor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) { Thread.currentThread().interrupt();
log.error("Shutdown interrupted", e); } } // Utility methods to be
implemented based on your specific response types private boolean
isInitializationResponse(YourResponseType response) { // Implement logic to
identify initialization response return false; } private boolean
isSuccessfulInitialization(YourResponseType response) { // Implement logic
to check successful initialization return false; } private boolean
isSuccessfulResponse(YourResponseType response) { // Implement logic to
check successful response return false; } private String
extractRequestId(YourResponseType response) { // Implement logic to extract
request ID return ""; } private String getErrorMessage(YourResponseType
response) { // Implement logic to get error message return ""; } } `
I am not able to understand what is wrong with this code. After a few
messages are processed successfully, the stream suddenly errors out with
"Client close", "Invalid wire in XML" or "Failed to read message". Does this
have something to do with a race condition? Please help.
I tried implementing the above synchronously and it worked. I want to
implement some kind of concurrency for requests and responses on the gRPC
client side.
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion visit
https://groups.google.com/d/msgid/grpc-io/21e0602f-e008-4a9d-b69f-dd7d40b747e1n%40googlegroups.com.