m-trieu commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2073738102
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -159,4 +180,76 @@ static final class StreamClosedException extends Exception
{
super(s);
}
}
+
+ static final class InternalStreamTimeout extends Throwable {
+ private static final InternalStreamTimeout INSTANCE = new
InternalStreamTimeout();
+
+ private InternalStreamTimeout() {}
+
+ static boolean isInternalTimeout(Throwable t) {
+ while (t != null) {
+ if (t == INSTANCE) {
+ return true;
+ }
+ t = t.getCause();
+ }
+ return false;
+ }
+ }
+
+ private final class AsyncStreamCloser {
+ private final BlockingQueue<StreamObserver<T>> streamsToClose;
+ private final ExecutorService streamCloserExecutor;
+
+ @GuardedBy("this")
+ private boolean started;
+
+ private AsyncStreamCloser() {
+ streamsToClose = new LinkedBlockingQueue<>();
+ streamCloserExecutor =
+ Executors.newSingleThreadExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("StreamCloserThread-%d").build());
+ }
+
+ private synchronized void start() {
+ if (!started) {
+ streamCloserExecutor.execute(
+ () -> {
+ while (!isPoisoned()) {
+ try {
+ timeoutStream(streamsToClose.take());
+ } catch (InterruptedException e) {
+ // Drain streamsToClose to prevent any dangling
StreamObservers.
+ streamsToClose.forEach(this::timeoutStream);
+ break;
+ }
+ }
+ });
+ started = true;
+ }
+ }
+
+ private void timeoutStream(StreamObserver<T> streamObserver) {
+ try {
+ streamObserver.onError(InternalStreamTimeout.INSTANCE);
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]