arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1800611909
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -131,46 +132,48 @@ private static GetWorkRequest
withRequestBudget(GetWorkRequest request, GetWorkB
return
request.toBuilder().setMaxItems(budget.items()).setMaxBytes(budget.bytes()).build();
}
- @SuppressWarnings("ReturnValueIgnored")
- void startStreams() {
- getWorkStream.get();
- getDataStream.get();
- commitWorkStream.get();
- workCommitter.get().start();
- // *stream.get() is all memoized in a threadsafe manner.
- started.set(true);
- }
-
- void closeAllStreams() {
- // Supplier<Stream>.get() starts the stream which is an expensive
operation as it initiates the
- // streaming RPCs by possibly making calls over the network. Do not close
the streams unless
- // they have already been started.
- if (started.get()) {
- getWorkStream.get().shutdown();
- getDataStream.get().shutdown();
- workCommitter.get().stop();
- commitWorkStream.get().shutdown();
+ synchronized void start() {
+ if (!started.get()) {
+ // Start these 3 streams in parallel since they each may perform
blocking IO.
+ CompletableFuture.allOf(
+ CompletableFuture.runAsync(getWorkStream::start, streamStarter),
+ CompletableFuture.runAsync(getDataStream::start, streamStarter),
+ CompletableFuture.runAsync(commitWorkStream::start,
streamStarter))
+ .join();
+ workCommitter.start();
+ started.set(true);
}
}
@Override
- public void adjustBudget(long itemsDelta, long bytesDelta) {
- getWorkBudget.set(getWorkBudget.get().apply(itemsDelta, bytesDelta));
+ public synchronized void close() {
if (started.get()) {
- getWorkStream.get().adjustBudget(itemsDelta, bytesDelta);
+ getWorkStream.shutdown();
+ getDataStream.shutdown();
+ workCommitter.stop();
+ commitWorkStream.shutdown();
}
}
@Override
- public GetWorkBudget remainingBudget() {
- return started.get() ? getWorkStream.get().remainingBudget() :
getWorkBudget.get();
+ public void setBudget(long items, long bytes) {
+ GetWorkBudget adjustment =
GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
+ getWorkBudget.set(adjustment);
+ if (started.get()) {
+ getWorkStream.adjustBudget(adjustment);
Review Comment:
```suggestion
getWorkStream.setBudget(newBudget);
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ "requestObserver cannot be null. Missing a call to start() to
initialize stream.");
}
return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ if (isShutdown()) {
+ return;
+ }
+
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed
stream.");
}
- requestObserver().onNext(request);
+ try {
+ lastSendTimeMs.set(Instant.now().getMillis());
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (isShutdown()) {
+ logger.debug("Stream was closed or shutdown during send.", e);
Review Comment:
```suggestion
logger.debug("Stream was shutdown during send.", e);
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -216,12 +207,18 @@ static FanOutStreamingEngineWorkerHarness forTesting(
return fanOutStreamingEngineWorkProvider;
}
- @SuppressWarnings("ReturnValueIgnored")
+ @SuppressWarnings("FutureReturnValueIgnored")
Review Comment:
Could we assign the future to a variable named `unusedFuture` and remove the
`@SuppressWarnings` annotation?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ "requestObserver cannot be null. Missing a call to start() to
initialize stream.");
}
return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ if (isShutdown()) {
+ return;
+ }
+
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed
stream.");
}
- requestObserver().onNext(request);
+ try {
+ lastSendTimeMs.set(Instant.now().getMillis());
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (isShutdown()) {
+ logger.debug("Stream was closed or shutdown during send.", e);
+ return;
+ }
+
+ requestObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public final void start() {
+ if (!isShutdown.get() && started.compareAndSet(false, true)) {
+ // start() should only be executed once during the lifetime of the
stream for idempotency and
+ // when shutdown() has not been called.
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
+ if (isShutdown.get()) {
+ break;
+ }
startTimeMs.set(Instant.now().getMillis());
lastResponseTimeMs.set(0);
streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ requestObserver.reset();
onNewStream();
if (clientClosed.get()) {
halfClose();
}
return;
}
} catch (Exception e) {
- LOG.error("Failed to create new stream, retrying: ", e);
+ logger.error("Failed to create new stream, retrying: ", e);
try {
long sleep = backoff.nextBackOffMillis();
sleepUntil.set(Instant.now().getMillis() + sleep);
- Thread.sleep(sleep);
- } catch (InterruptedException | IOException i) {
+ sleeper.sleep(sleep);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ logger.info(
+ "Interrupted during stream creation backoff. The stream will not
be created.");
Review Comment:
Could we include the stream name in this log message?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -187,13 +206,14 @@ protected void startThrottleTimer() {
commitWorkThrottleTimer.start();
}
- private void flushInternal(Map<Long, PendingRequest> requests) {
+ private void flushInternal(Map<Long, PendingRequest> requests) throws
InterruptedException {
Review Comment:
It doesn't look like any of the method calls inside `flushInternal` throw
`InterruptedException`. Can we remove the `throws` clause from this method?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -317,29 +350,71 @@ public boolean commitWorkItem(
if (!canAccept(commitRequest.getSerializedSize() +
computation.length())) {
return false;
}
- PendingRequest request = new PendingRequest(computation, commitRequest,
onDone);
+
+ PendingRequest request = PendingRequest.create(computation,
commitRequest, onDone);
add(idGenerator.incrementAndGet(), request);
return true;
}
/** Flushes any pending work items to the wire. */
@Override
public void flush() {
- flushInternal(queue);
- queuedBytes = 0;
- queue.clear();
+ try {
+ if (!isShutdown()) {
+ flushInternal(queue);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ queuedBytes = 0;
+ queue.clear();
+ }
}
void add(long id, PendingRequest request) {
- assert (canAccept(request.getBytes()));
+ Preconditions.checkState(canAccept(request.getBytes()));
Review Comment:
Double-checking: since `canAccept` depends on `isShutdown`, is `canAccept`
still guaranteed to be true here?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -142,16 +142,7 @@ private FanOutStreamingEngineWorkerHarness(
connections.get().windmillStreams().values(),
totalGetWorkBudget);
lastBudgetRefresh.set(Instant.now());
});
- this.getWorkerMetadataStream =
- Suppliers.memoize(
- () ->
- streamFactory.createGetWorkerMetadataStream(
- dispatcherClient.getWindmillMetadataServiceStubBlocking(),
- getWorkerMetadataThrottleTimer,
- endpoints ->
- // Run this on a separate thread than the grpc stream
thread.
- newWorkerMetadataPublisher.submit(
- () -> newWindmillEndpoints.add(endpoints))));
+ this.getWorkerMetadataStream = null;
Review Comment:
Does anything prevent us from creating the stream here and starting it in
`start()`?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ "requestObserver cannot be null. Missing a call to start() to
initialize stream.");
}
return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ if (isShutdown()) {
+ return;
+ }
+
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed
stream.");
}
- requestObserver().onNext(request);
+ try {
+ lastSendTimeMs.set(Instant.now().getMillis());
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (isShutdown()) {
+ logger.debug("Stream was closed or shutdown during send.", e);
+ return;
+ }
+
+ requestObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public final void start() {
+ if (!isShutdown.get() && started.compareAndSet(false, true)) {
+ // start() should only be executed once during the lifetime of the
stream for idempotency and
+ // when shutdown() has not been called.
+ startStream();
}
}
/** Starts the underlying stream. */
- protected final void startStream() {
+ private void startStream() {
// Add the stream to the registry after it has been fully constructed.
streamRegistry.add(this);
while (true) {
try {
synchronized (this) {
+ if (isShutdown.get()) {
+ break;
+ }
startTimeMs.set(Instant.now().getMillis());
lastResponseTimeMs.set(0);
streamClosed.set(false);
- // lazily initialize the requestObserver. Gets reset whenever the
stream is reopened.
- requestObserver = requestObserverSupplier.get();
+ requestObserver.reset();
onNewStream();
if (clientClosed.get()) {
halfClose();
}
return;
Review Comment:
Would it be better to call `streamRegistry.add(this);` here, and remove
`streamRegistry.remove(this);`?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,21 +308,26 @@ public final void appendSummaryHtml(PrintWriter writer) {
writer.format(", %dms backoff remaining", sleepLeft);
}
writer.format(
- ", current stream is %dms old, last send %dms, last response %dms,
closed: %s",
+ ", current stream is %dms old, last send %dms, last response %dms,
closed: %s, "
+ + "isShutdown: %s, shutdown time: %s",
debugDuration(nowMs, startTimeMs.get()),
debugDuration(nowMs, lastSendTimeMs.get()),
debugDuration(nowMs, lastResponseTimeMs.get()),
- streamClosed.get());
+ streamClosed.get(),
+ isShutdown.get(),
+ shutdownTime.get());
}
- // Don't require synchronization on stream, see the appendSummaryHtml
comment.
+ /**
+ * @implNote Don't require synchronization on stream, see the {@link
+ * #appendSummaryHtml(PrintWriter)} comment.
+ */
protected abstract void appendSpecificHtml(PrintWriter writer);
@Override
public final synchronized void halfClose() {
- // Synchronization of close and onCompleted necessary for correct retry
logic in onNewStream.
Review Comment:
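Is the removed comment (that synchronizing close and onCompleted is necessary for the correct retry logic in onNewStream) no longer true?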
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##########
@@ -47,16 +53,21 @@ public interface WindmillStream {
Instant startTime();
/**
- * Shutdown the stream. There should be no further interactions with the
stream once this has been
- * called.
+ * Shuts down the stream. No further interactions should be made with the
stream, and the stream
+ * will no longer try to connect internally. Any pending retries or
in-flight requests will be
+ * cancelled and all responses dropped and considered invalid.
*/
void shutdown();
/** Handle representing a stream of GetWork responses. */
@ThreadSafe
interface GetWorkStream extends WindmillStream {
/** Adjusts the {@link GetWorkBudget} for the stream. */
- void adjustBudget(long itemsDelta, long bytesDelta);
+ void adjustBudget(long newItems, long newBytes);
Review Comment:
```suggestion
void setBudget(long newItems, long newBytes);
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
private StreamObserver<RequestT> requestObserver() {
if (requestObserver == null) {
throw new NullPointerException(
- "requestObserver cannot be null. Missing a call to startStream() to
initialize.");
+ "requestObserver cannot be null. Missing a call to start() to
initialize stream.");
}
return requestObserver;
}
/** Send a request to the server. */
protected final void send(RequestT request) {
- lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
+ if (isShutdown()) {
+ return;
+ }
+
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed
stream.");
}
- requestObserver().onNext(request);
+ try {
+ lastSendTimeMs.set(Instant.now().getMillis());
+ requestObserver.onNext(request);
+ } catch (StreamObserverCancelledException e) {
+ if (isShutdown()) {
+ logger.debug("Stream was closed or shutdown during send.", e);
+ return;
+ }
+
+ requestObserver.onError(e);
+ }
+ }
+ }
+
+ @Override
+ public final void start() {
+ if (!isShutdown.get() && started.compareAndSet(false, true)) {
Review Comment:
1. `isShutdown.get()` returns false.
2. A different thread calls `shutdown()`, and `isShutdown` becomes true.
3. `started` is set to true and `startStream()` is called.
Is this a valid sequence? Do we need to prevent it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]