scwhittle commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2016168525
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -88,19 +105,24 @@ private WindmillStreamSender(
streamingEngineStreamFactory.createDirectCommitWorkStream(
connection,
streamingEngineThrottleTimers.commitWorkThrottleTimer());
this.workCommitter = workCommitterFactory.apply(commitWorkStream);
- this.getWorkStream =
- streamingEngineStreamFactory.createDirectGetWorkStream(
- connection,
- withRequestBudget(getWorkRequest, getWorkBudget.get()),
- streamingEngineThrottleTimers.getWorkThrottleTimer(),
- FixedStreamHeartbeatSender.create(getDataStream),
- getDataClientFactory.apply(getDataStream),
- workCommitter,
- workItemScheduler);
+ this.activeGetWorkStream = new AtomicReference<>();
+ this.getWorkStreamFactory =
+ () ->
+ streamingEngineStreamFactory.createDirectGetWorkStream(
+ connection,
+ withRequestBudget(getWorkRequest, getWorkBudget.get()),
+ streamingEngineThrottleTimers.getWorkThrottleTimer(),
+ FixedStreamHeartbeatSender.create(getDataStream),
+ getDataClientFactory.apply(getDataStream),
+ workCommitter,
+ workItemScheduler);
// 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
this.streamStarter =
Executors.newFixedThreadPool(
Review Comment:
How about newCachedThreadPool? It seems like 2 of these threads are only used for
start() and won't be used afterwards, so we might as well let them go away.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -88,19 +105,24 @@ private WindmillStreamSender(
streamingEngineStreamFactory.createDirectCommitWorkStream(
connection,
streamingEngineThrottleTimers.commitWorkThrottleTimer());
this.workCommitter = workCommitterFactory.apply(commitWorkStream);
- this.getWorkStream =
- streamingEngineStreamFactory.createDirectGetWorkStream(
- connection,
- withRequestBudget(getWorkRequest, getWorkBudget.get()),
- streamingEngineThrottleTimers.getWorkThrottleTimer(),
- FixedStreamHeartbeatSender.create(getDataStream),
- getDataClientFactory.apply(getDataStream),
- workCommitter,
- workItemScheduler);
+ this.activeGetWorkStream = new AtomicReference<>();
+ this.getWorkStreamFactory =
+ () ->
+ streamingEngineStreamFactory.createDirectGetWorkStream(
+ connection,
+ withRequestBudget(getWorkRequest, getWorkBudget.get()),
+ streamingEngineThrottleTimers.getWorkThrottleTimer(),
Review Comment:
Should we just use the same throttle timer, heartbeat sender, and
GetDataClient factory for each stream?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,15 +64,26 @@
@Internal
@ThreadSafe
final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender
{
- private static final String STREAM_STARTER_THREAD_NAME =
"StartWindmillStreamThread-%d";
- private final AtomicBoolean started;
- private final AtomicReference<GetWorkBudget> getWorkBudget;
- private final GetWorkStream getWorkStream;
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillStreamSender.class);
+ private static final String STREAM_MANAGER_THREAD_NAME_FORMAT =
"WindmillStreamManagerThread";
+ private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final GetDataStream getDataStream;
private final CommitWorkStream commitWorkStream;
private final WorkCommitter workCommitter;
private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
private final ExecutorService streamStarter;
+ private final String backendWorkerToken;
+
+ @GuardedBy("activeGetWorkStream")
+ private final AtomicReference<GetWorkStream> activeGetWorkStream;
+
+ @GuardedBy("activeGetWorkStream")
+ private final AtomicReference<GetWorkBudget> getWorkBudget;
+
+ @GuardedBy("activeGetWorkStream")
Review Comment:
This doesn't seem like it needs to be guarded — is there any reason it has to be
called serially?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
long getCurrentActiveCommitBytes() {
return workCommitter.currentActiveCommitBytes();
}
+
+ /**
+ * Creates, starts, and gracefully terminates {@link GetWorkStream} before
the clientside deadline
+ * to prevent {@link
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
+ * If at any point the server closes the stream, reconnects immediately.
+ */
+ private void getWorkStreamLoop(CountDownLatch waitForInitialStream) {
+ @Nullable GetWorkStream newStream = null;
+ while (isRunning.get()) {
+ synchronized (activeGetWorkStream) {
+ newStream = getWorkStreamFactory.get();
Review Comment:
Maybe this could be outside the synchronized block? We can create the new
stream before swapping it in as the active one. We could create the new stream
with no budget and just set its budget once the old stream is half-closed.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -126,35 +148,49 @@ private static GetWorkRequest
withRequestBudget(GetWorkRequest request, GetWorkB
}
synchronized void start() {
- if (!started.get()) {
+ if (isRunning.compareAndSet(false, true)) {
checkState(!streamStarter.isShutdown(), "WindmillStreamSender has
already been shutdown.");
-
// Start these 3 streams in parallel since they each may perform
blocking IO.
+ CountDownLatch waitForInitialStream = new CountDownLatch(1);
+ streamStarter.execute(() -> getWorkStreamLoop(waitForInitialStream));
CompletableFuture.allOf(
- CompletableFuture.runAsync(getWorkStream::start, streamStarter),
CompletableFuture.runAsync(getDataStream::start, streamStarter),
CompletableFuture.runAsync(commitWorkStream::start,
streamStarter))
.join();
+ try {
+ waitForInitialStream.await();
+ } catch (InterruptedException e) {
+ close();
+ LOG.error("GetWorkStream to {} was never able to start.",
backendWorkerToken);
+ throw new IllegalStateException("GetWorkStream unable to start
aborting.", e);
+ }
workCommitter.start();
- started.set(true);
}
}
@Override
public synchronized void close() {
+ isRunning.set(false);
streamStarter.shutdownNow();
- getWorkStream.shutdown();
getDataStream.shutdown();
workCommitter.stop();
commitWorkStream.shutdown();
}
@Override
public void setBudget(long items, long bytes) {
- GetWorkBudget budget =
GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
- getWorkBudget.set(budget);
- if (started.get()) {
- getWorkStream.setBudget(budget);
+ synchronized (activeGetWorkStream) {
+ GetWorkBudget budget =
GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
Review Comment:
This budget build and the set on the atomic could be outside the synchronized block.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,15 +64,26 @@
@Internal
@ThreadSafe
final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender
{
- private static final String STREAM_STARTER_THREAD_NAME =
"StartWindmillStreamThread-%d";
- private final AtomicBoolean started;
- private final AtomicReference<GetWorkBudget> getWorkBudget;
- private final GetWorkStream getWorkStream;
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillStreamSender.class);
+ private static final String STREAM_MANAGER_THREAD_NAME_FORMAT =
"WindmillStreamManagerThread";
+ private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final GetDataStream getDataStream;
private final CommitWorkStream commitWorkStream;
private final WorkCommitter workCommitter;
private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
private final ExecutorService streamStarter;
+ private final String backendWorkerToken;
+
+ @GuardedBy("activeGetWorkStream")
+ private final AtomicReference<GetWorkStream> activeGetWorkStream;
Review Comment:
If this is guarded, I don't think it needs to be an atomic.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -126,35 +148,49 @@ private static GetWorkRequest
withRequestBudget(GetWorkRequest request, GetWorkB
}
synchronized void start() {
- if (!started.get()) {
+ if (isRunning.compareAndSet(false, true)) {
checkState(!streamStarter.isShutdown(), "WindmillStreamSender has
already been shutdown.");
-
// Start these 3 streams in parallel since they each may perform
blocking IO.
+ CountDownLatch waitForInitialStream = new CountDownLatch(1);
+ streamStarter.execute(() -> getWorkStreamLoop(waitForInitialStream));
CompletableFuture.allOf(
- CompletableFuture.runAsync(getWorkStream::start, streamStarter),
CompletableFuture.runAsync(getDataStream::start, streamStarter),
CompletableFuture.runAsync(commitWorkStream::start,
streamStarter))
.join();
+ try {
+ waitForInitialStream.await();
+ } catch (InterruptedException e) {
+ close();
+ LOG.error("GetWorkStream to {} was never able to start.",
backendWorkerToken);
+ throw new IllegalStateException("GetWorkStream unable to start
aborting.", e);
+ }
workCommitter.start();
- started.set(true);
}
}
@Override
public synchronized void close() {
+ isRunning.set(false);
streamStarter.shutdownNow();
- getWorkStream.shutdown();
getDataStream.shutdown();
workCommitter.stop();
commitWorkStream.shutdown();
}
@Override
public void setBudget(long items, long bytes) {
- GetWorkBudget budget =
GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
- getWorkBudget.set(budget);
- if (started.get()) {
- getWorkStream.setBudget(budget);
+ synchronized (activeGetWorkStream) {
+ GetWorkBudget budget =
GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
+ getWorkBudget.set(budget);
+ if (isRunning.get()) {
Review Comment:
Remove the running check and just rely on the null check below? It seems like if
activeGetWorkStream is set it is OK to call, and it's one less interleaving to
think about.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,15 +64,26 @@
@Internal
@ThreadSafe
final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender
{
- private static final String STREAM_STARTER_THREAD_NAME =
"StartWindmillStreamThread-%d";
- private final AtomicBoolean started;
- private final AtomicReference<GetWorkBudget> getWorkBudget;
- private final GetWorkStream getWorkStream;
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillStreamSender.class);
+ private static final String STREAM_MANAGER_THREAD_NAME_FORMAT =
"WindmillStreamManagerThread";
+ private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
+
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final GetDataStream getDataStream;
private final CommitWorkStream commitWorkStream;
private final WorkCommitter workCommitter;
private final StreamingEngineThrottleTimers streamingEngineThrottleTimers;
private final ExecutorService streamStarter;
+ private final String backendWorkerToken;
+
+ @GuardedBy("activeGetWorkStream")
+ private final AtomicReference<GetWorkStream> activeGetWorkStream;
+
+ @GuardedBy("activeGetWorkStream")
+ private final AtomicReference<GetWorkBudget> getWorkBudget;
Review Comment:
Ditto — if this is guarded, it probably doesn't need to be atomic.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
long getCurrentActiveCommitBytes() {
return workCommitter.currentActiveCommitBytes();
}
+
+ /**
+ * Creates, starts, and gracefully terminates {@link GetWorkStream} before
the clientside deadline
+ * to prevent {@link
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
+ * If at any point the server closes the stream, reconnects immediately.
+ */
+ private void getWorkStreamLoop(CountDownLatch waitForInitialStream) {
+ @Nullable GetWorkStream newStream = null;
+ while (isRunning.get()) {
+ synchronized (activeGetWorkStream) {
+ newStream = getWorkStreamFactory.get();
+ newStream.start();
+ waitForInitialStream.countDown();
+ activeGetWorkStream.set(newStream);
+ }
+ try {
+ // Try to gracefully terminate the stream.
+ if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES,
TimeUnit.MINUTES)) {
+ newStream.halfClose();
+ }
+
+ // If graceful termination is unsuccessful, forcefully shutdown.
+ if (!newStream.awaitTermination(30, TimeUnit.SECONDS)) {
+ newStream.shutdown();
+ }
+
+ } catch (InterruptedException e) {
+ // continue until !isRunning.
Review Comment:
Could we instead force isRunning to be set to false here? We don't expect
interruptions to happen for any other reason.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -57,15 +64,26 @@
@Internal
@ThreadSafe
final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender
{
- private static final String STREAM_STARTER_THREAD_NAME =
"StartWindmillStreamThread-%d";
- private final AtomicBoolean started;
- private final AtomicReference<GetWorkBudget> getWorkBudget;
- private final GetWorkStream getWorkStream;
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillStreamSender.class);
+ private static final String STREAM_MANAGER_THREAD_NAME_FORMAT =
"WindmillStreamManagerThread";
+ private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
Review Comment:
Could note that this needs to be less than the stream deadline configured in the other file.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
long getCurrentActiveCommitBytes() {
return workCommitter.currentActiveCommitBytes();
}
+
+ /**
+ * Creates, starts, and gracefully terminates {@link GetWorkStream} before
the clientside deadline
+ * to prevent {@link
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
+ * If at any point the server closes the stream, reconnects immediately.
+ */
+ private void getWorkStreamLoop(CountDownLatch waitForInitialStream) {
+ @Nullable GetWorkStream newStream = null;
+ while (isRunning.get()) {
+ synchronized (activeGetWorkStream) {
+ newStream = getWorkStreamFactory.get();
+ newStream.start();
+ waitForInitialStream.countDown();
+ activeGetWorkStream.set(newStream);
+ }
+ try {
+ // Try to gracefully terminate the stream.
+ if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES,
TimeUnit.MINUTES)) {
+ newStream.halfClose();
+ }
+
+ // If graceful termination is unsuccessful, forcefully shutdown.
+ if (!newStream.awaitTermination(30, TimeUnit.SECONDS)) {
Review Comment:
Maybe it would be better to increase this? If we lose GetWork responses then the
Windmill worker has to retry; if a little more time lets them flush, that
seems fine.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -189,6 +189,12 @@ private static <T extends AbstractStub<T>> T
withDefaultDeadline(T stub) {
return stub.withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS,
TimeUnit.SECONDS);
}
+ private static <T extends AbstractStub<T>> T withLongDeadline(T stub) {
Review Comment:
withDirectPathDeadline?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
long getCurrentActiveCommitBytes() {
return workCommitter.currentActiveCommitBytes();
}
+
+ /**
+ * Creates, starts, and gracefully terminates {@link GetWorkStream} before
the clientside deadline
+ * to prevent {@link
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
+ * If at any point the server closes the stream, reconnects immediately.
+ */
+ private void getWorkStreamLoop(CountDownLatch waitForInitialStream) {
+ @Nullable GetWorkStream newStream = null;
+ while (isRunning.get()) {
+ synchronized (activeGetWorkStream) {
+ newStream = getWorkStreamFactory.get();
+ newStream.start();
+ waitForInitialStream.countDown();
+ activeGetWorkStream.set(newStream);
+ }
+ try {
+ // Try to gracefully terminate the stream.
+ if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES,
TimeUnit.MINUTES)) {
+ newStream.halfClose();
Review Comment:
I think as we half-close here we probably want to create a new stream to
take over, so that we aren't idle while waiting for the old stream to terminate.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -217,11 +217,11 @@ public void terminate(Throwable terminationException) {
}
private String constructStreamCancelledErrorMessage(long totalSecondsWaited)
{
- return deadlineSeconds > 0
+ return inactivityTimeout > 0
? "Waited "
+ totalSecondsWaited
+ "s which exceeds given deadline of "
- + deadlineSeconds
+ + inactivityTimeout
+ "s for the outboundObserver to become ready meaning "
+ "that the stream deadline was not respected."
Review Comment:
This seems like the wrong message if it isn't actually the stream deadline.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]