m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1757419759
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -311,71 +401,87 @@ public void onNext(ResponseT response) {
@Override
public void onError(Throwable t) {
- onStreamFinished(t);
+ if (maybeTeardownStream()) {
+ return;
+ }
+
+ Status status = Status.fromThrowable(t);
+ setLastError(status.toString());
+
+ if (t instanceof StreamObserverCancelledException) {
+ logger.error(
+ "StreamObserver was unexpectedly cancelled for stream={},
worker={}. stacktrace={}",
+ getClass(),
+ backendWorkerToken,
+ t.getStackTrace(),
+ t);
+ } else if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
Review Comment:
done
made it depend on Status codes (onCompleted will use Status.OK)
seperated tracking restart count vs error count
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]