scwhittle commented on code in PR #27767:
URL: https://github.com/apache/beam/pull/27767#discussion_r1289963961


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@AutoValue
+public abstract class WindmillEndpoints {
+  public abstract ImmutableMap<String, String> globalDataServers();
+
+  public abstract ImmutableList<String> windmillServers();

Review Comment:
   add a comment about what the string is
   
   alternatively, should it be some sort of endpoint object instead of a string?
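   If the endpoint-object route is taken, a minimal sketch of what such a value type could look like (the class name and the assumption that endpoints are "host:port" strings are mine, not the PR's):

```java
import java.util.Objects;

// Hypothetical endpoint value type; assumes endpoints arrive as "host:port" strings.
final class WindmillServiceAddress {
  private final String host;
  private final int port;

  private WindmillServiceAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  // Parse a "host:port" string into a structured address.
  static WindmillServiceAddress parse(String address) {
    int sep = address.lastIndexOf(':');
    if (sep < 0) {
      throw new IllegalArgumentException("Expected host:port, got: " + address);
    }
    return new WindmillServiceAddress(
        address.substring(0, sep), Integer.parseInt(address.substring(sep + 1)));
  }

  String host() {
    return host;
  }

  int port() {
    return port;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof WindmillServiceAddress)) {
      return false;
    }
    WindmillServiceAddress other = (WindmillServiceAddress) o;
    return host.equals(other.host) && port == other.port;
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, port);
  }
}
```

   A typed address also gives callers equality and a place to hang validation, instead of re-parsing strings at every use site.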



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java:
##########
@@ -130,16 +127,18 @@ private static long debugDuration(long nowMs, long startMs) {
   protected abstract void onNewStream();
 
   /** Returns whether there are any pending requests that should be retried on a stream break. */
-  protected abstract boolean hasPendingRequests();
+  protected boolean hasPendingRequests() {

Review Comment:
   revert? Since this is related to correctness, a default implementation seems dangerous: it makes it possible to skip implementing it correctly.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@AutoValue
+public abstract class WindmillEndpoints {
+  public abstract ImmutableMap<String, String> globalDataServers();

Review Comment:
   Comment on what the key and value are
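   A possible Javadoc along the lines requested (the wording of what the key and value mean is an assumption; the PR author would know the actual semantics):

```
  /**
   * Map of global data tag (key) to the address of the server (value) that should be
   * used to fetch that global data.
   */
  public abstract ImmutableMap<String, String> globalDataServers();
```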



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java:
##########
@@ -56,7 +56,8 @@ final class GrpcCommitWorkStream
   private final int streamingRpcBatchLimit;
 
   private GrpcCommitWorkStream(
-      CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub,
+      Function<StreamObserver<StreamingCommitResponse>, StreamObserver<StreamingCommitWorkRequest>>
+          commitWorkRpc,

Review Comment:
   nit: commitWorkRpc is a confusing name to me for a function. What about createCommitWorkRpcFn or startCommitWorkRpcFn, etc.?
   
   ditto for the other files



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java:
##########
@@ -294,7 +293,10 @@ private synchronized CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alph
     if (stubList.size() == 1) {
       return stubList.get(0);
     }
-    return stubList.get(rand.nextInt(stubList.size()));
+    return stubList
+        .get(rand.nextInt(stubList.size()))
+        .withDeadlineAfter(
+            AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);

Review Comment:
   this deadline is not being applied to the stub returned above when there is only 1 stub
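   One way to make sure the decoration is applied on both return paths is to funnel them through a single decorated return. A generic sketch (the stub type is replaced with a type parameter, and `withDeadline` stands in for the `withDeadlineAfter(...)` call; names are illustrative):

```java
import java.util.List;
import java.util.Random;
import java.util.function.UnaryOperator;

final class StubPicker {
  private StubPicker() {}

  // Pick an element (short-circuiting for a single-element list) and apply the
  // decoration, e.g. a deadline, on every return path rather than just one.
  static <T> T pickAndDecorate(List<T> stubs, Random rand, UnaryOperator<T> withDeadline) {
    T chosen = stubs.size() == 1 ? stubs.get(0) : stubs.get(rand.nextInt(stubs.size()));
    return withDeadline.apply(chosen);
  }
}
```

   Structuring it this way makes it impossible for a new early return to silently skip the deadline.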



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWorkerMetadataStream;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class GrpcGetWorkerMetadataStream
+    extends AbstractWindmillStream<WorkerMetadataRequest, WorkerMetadataResponse>
+    implements GetWorkerMetadataStream {
+  private static final Logger LOG = LoggerFactory.getLogger(GrpcGetWorkerMetadataStream.class);
+  private static final WorkerMetadataRequest HEALTH_CHECK_REQUEST =
+      WorkerMetadataRequest.getDefaultInstance();
+
+  private final AtomicLong metadataVersion;
+  private final WorkerMetadataRequest workerMetadataRequest;
+  private final ThrottleTimer getWorkerMetadataThrottleTimer;
+  private final Consumer<WindmillEndpoints> serverMappingUpdater;
+
+  private GrpcGetWorkerMetadataStream(
+      Function<StreamObserver<WorkerMetadataResponse>, StreamObserver<WorkerMetadataRequest>>
+          getWorkerMetadataRpc,
+      BackOff backoff,
+      StreamObserverFactory streamObserverFactory,
+      Set<AbstractWindmillStream<?, ?>> streamRegistry,
+      int logEveryNStreamFailures,
+      JobHeader jobHeader,
+      AtomicLong metadataVersion,
+      ThrottleTimer getWorkerMetadataThrottleTimer,
+      Consumer<WindmillEndpoints> serverMappingUpdater) {
+    super(
+        getWorkerMetadataRpc,
+        backoff,
+        streamObserverFactory,
+        streamRegistry,
+        logEveryNStreamFailures);
+    this.workerMetadataRequest = WorkerMetadataRequest.newBuilder().setHeader(jobHeader).build();
+    this.metadataVersion = metadataVersion;
+    this.getWorkerMetadataThrottleTimer = getWorkerMetadataThrottleTimer;
+    this.serverMappingUpdater = serverMappingUpdater;
+  }
+
+  public static GrpcGetWorkerMetadataStream create(
+      Function<StreamObserver<WorkerMetadataResponse>, StreamObserver<WorkerMetadataRequest>>
+          getWorkerMetadataRpc,
+      BackOff backoff,
+      StreamObserverFactory streamObserverFactory,
+      Set<AbstractWindmillStream<?, ?>> streamRegistry,
+      int logEveryNStreamFailures,
+      JobHeader jobHeader,
+      AtomicLong metadataVersion,
+      ThrottleTimer getWorkerMetadataThrottleTimer,
+      Consumer<WindmillEndpoints> serverMappingUpdater) {
+    GrpcGetWorkerMetadataStream getWorkerMetadataStream =
+        new GrpcGetWorkerMetadataStream(
+            getWorkerMetadataRpc,
+            backoff,
+            streamObserverFactory,
+            streamRegistry,
+            logEveryNStreamFailures,
+            jobHeader,
+            metadataVersion,
+            getWorkerMetadataThrottleTimer,
+            serverMappingUpdater);
+    getWorkerMetadataStream.startStream();
+    return getWorkerMetadataStream;
+  }
+
+  @VisibleForTesting
+  static GrpcGetWorkerMetadataStream forTesting(
+      Function<StreamObserver<WorkerMetadataResponse>, StreamObserver<WorkerMetadataRequest>>
+          getWorkerMetadataRpc,
+      AtomicLong metadataVersion,
+      JobHeader jobHeader,
+      Set<AbstractWindmillStream<?, ?>> streamRegistry,
+      Consumer<WindmillEndpoints> serverMappingUpdater) {
+    GrpcGetWorkerMetadataStream testStream =
+        new GrpcGetWorkerMetadataStream(
+            getWorkerMetadataRpc,
+            FluentBackoff.DEFAULT.backoff(),
+            StreamObserverFactory.direct(DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, 1),
+            streamRegistry,
+            1, // logEveryNStreamFailures
+            jobHeader,
+            metadataVersion,
+            new ThrottleTimer(),
+            serverMappingUpdater);
+    testStream.startStream();
+    return testStream;
+  }
+
+  private long updateMetadataVersion(long currentMetadataVersion, long responseMetadataVersion) {
+    if (currentMetadataVersion < responseMetadataVersion) {
+      return responseMetadataVersion;
+    }
+
+    // If the currentMetadataVersion is greater than or equal to the one in the response, the response
+    // data is stale, and we do not want to do anything.
+    LOG.info(
+        "Received metadata version={}; Current metadata version={}. "
+            + "Skipping update because received stale metadata",
+        responseMetadataVersion,
+        currentMetadataVersion);
+    return currentMetadataVersion;
+  }
+
+  @Override
+  protected void onResponse(WorkerMetadataResponse response) {
+    metadataVersion.getAndUpdate(
+        current -> updateMetadataVersion(current, response.getMetadataVersion()));

Review Comment:
   if we get a version before our current version, this seems to leave metadataVersion at the more current version *but* then it goes ahead and consumes the stale response instead of filtering it.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.testing.GrpcCleanupRule;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.util.MutableHandlerRegistry;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+@RunWith(JUnit4.class)
+public class GrpcGetWorkerMetadataStreamTest {
+  private static final List<WorkerMetadataResponse.Endpoint> DIRECT_PATH_ENDPOINTS =
+      Lists.newArrayList(
+          WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("direct_path").build());
+  private static final Map<String, WorkerMetadataResponse.Endpoint> GLOBAL_DATA_ENDPOINTS =
+      Maps.newHashMap();
+  private static final JobHeader TEST_JOB_HEADER =
+      JobHeader.newBuilder()
+          .setJobId("test_job")
+          .setWorkerId("test_worker")
+          .setProjectId("test_project")
+          .build();
+  private static final String name = "Fake server for GrpcGetWorkerMetadataStreamTest";
+  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  private final Set<AbstractWindmillStream<?, ?>> streamRegistry = new HashSet<>();
+  private ManagedChannel inProcessChannel;
+  private GrpcGetWorkerMetadataStream stream;
+
+  private GrpcGetWorkerMetadataStream getWorkerMetadataStream(

Review Comment:
   nit: add "test" to the name? e.g. getWorkerMetadataTestStream



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.grpcclient;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.testing.GrpcCleanupRule;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.util.MutableHandlerRegistry;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+@RunWith(JUnit4.class)
+public class GrpcGetWorkerMetadataStreamTest {
+  private static final List<WorkerMetadataResponse.Endpoint> DIRECT_PATH_ENDPOINTS =
+      Lists.newArrayList(
+          WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("direct_path").build());
+  private static final Map<String, WorkerMetadataResponse.Endpoint> GLOBAL_DATA_ENDPOINTS =
+      Maps.newHashMap();
+  private static final JobHeader TEST_JOB_HEADER =
+      JobHeader.newBuilder()
+          .setJobId("test_job")
+          .setWorkerId("test_worker")
+          .setProjectId("test_project")
+          .build();
+  private static final String name = "Fake server for GrpcGetWorkerMetadataStreamTest";
+  @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  private final Set<AbstractWindmillStream<?, ?>> streamRegistry = new HashSet<>();
+  private ManagedChannel inProcessChannel;
+  private GrpcGetWorkerMetadataStream stream;
+
+  private GrpcGetWorkerMetadataStream getWorkerMetadataStream(
+      GetWorkerMetadataTestStub getWorkerMetadataTestStub,
+      AtomicLong metadataVersion,
+      Consumer<WindmillEndpoints> endpointsConsumer) {
+    serviceRegistry.addService(getWorkerMetadataTestStub);
+    return GrpcGetWorkerMetadataStream.forTesting(
+        responseObserver ->
+            CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)
+                .getWorkerMetadataStream(responseObserver),
+        metadataVersion,
+        TEST_JOB_HEADER,
+        streamRegistry,
+        endpointsConsumer);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    Server server =
+        InProcessServerBuilder.forName(name)
+            .fallbackHandlerRegistry(serviceRegistry)
+            .directExecutor()
+            .build()
+            .start();
+    inProcessChannel =
+        grpcCleanup.register(InProcessChannelBuilder.forName(name).directExecutor().build());
+    grpcCleanup.register(server);
+    grpcCleanup.register(inProcessChannel);
+    GLOBAL_DATA_ENDPOINTS.put(
+        "global_data",
+        WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("global_data").build());
+  }
+
+  @After
+  public void cleanUp() {
+    inProcessChannel.shutdownNow();
+  }
+
+  @Test
+  public void testGetWorkerMetadata() {
+    WorkerMetadataResponse mockResponse =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addAllDirectPathEndpoints(DIRECT_PATH_ENDPOINTS)
+            .putAllGlobalDataEndpoints(GLOBAL_DATA_ENDPOINTS)
+            .build();
+    TestWindmillEndpointsConsumer testWindmillEndpointsConsumer =
+        new TestWindmillEndpointsConsumer();
+    TestGetWorkMetadataRequestObserver requestObserver =
+        new TestGetWorkMetadataRequestObserver(mockResponse);
+    GetWorkerMetadataTestStub testStub = new GetWorkerMetadataTestStub(requestObserver);
+    AtomicLong metadataVersion = new AtomicLong(0);
+    stream = getWorkerMetadataStream(testStub, metadataVersion, testWindmillEndpointsConsumer);
+    assertEquals(
+        GLOBAL_DATA_ENDPOINTS.size(), testWindmillEndpointsConsumer.globalDataEndpoints.size());
+    testWindmillEndpointsConsumer.globalDataEndpoints.forEach(
+        (key, value) -> assertTrue(GLOBAL_DATA_ENDPOINTS.containsKey(key)));
+    assertEquals(
+        DIRECT_PATH_ENDPOINTS.size(), testWindmillEndpointsConsumer.windmillEndpoints.size());
+    assertTrue(
+        testWindmillEndpointsConsumer.windmillEndpoints.containsAll(
+            DIRECT_PATH_ENDPOINTS.stream()
+                .map(WorkerMetadataResponse.Endpoint::getEndpoint)
+                .collect(Collectors.toList())));
+    assertEquals(mockResponse.getMetadataVersion(), metadataVersion.get());
+  }
+
+  @Test
+  public void testGetWorkerMetadata_consumesSubsequentResponseMetadata() {
+    WorkerMetadataResponse initialResponse =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addAllDirectPathEndpoints(DIRECT_PATH_ENDPOINTS)
+            .putAllGlobalDataEndpoints(GLOBAL_DATA_ENDPOINTS)
+            .build();
+    TestWindmillEndpointsConsumer testWindmillEndpointsConsumer =
+        Mockito.spy(new TestWindmillEndpointsConsumer());
+    TestGetWorkMetadataRequestObserver requestObserver =
+        new TestGetWorkMetadataRequestObserver(initialResponse);
+    GetWorkerMetadataTestStub testStub = new GetWorkerMetadataTestStub(requestObserver);
+    AtomicLong metadataVersion = new AtomicLong(0);
+    stream = getWorkerMetadataStream(testStub, metadataVersion, testWindmillEndpointsConsumer);
+
+    List<WorkerMetadataResponse.Endpoint> newDirectPathEndpoints =
+        Lists.newArrayList(
+            WorkerMetadataResponse.Endpoint.newBuilder()
+                .setEndpoint("newWindmillEndpoint")
+                .build());
+    Map<String, WorkerMetadataResponse.Endpoint> newGlobalDataEndpoints = new HashMap<>();
+    newGlobalDataEndpoints.put(
+        "new_global_data",
+        WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("newGlobalData").build());
+
+    WorkerMetadataResponse newWorkMetadataResponse =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(initialResponse.getMetadataVersion() + 1)
+            .addAllDirectPathEndpoints(newDirectPathEndpoints)
+            .putAllGlobalDataEndpoints(newGlobalDataEndpoints)
+            .build();
+
+    testStub.injectWorkerMetadata(newWorkMetadataResponse);
+
+    assertEquals(
+        newGlobalDataEndpoints.size(), testWindmillEndpointsConsumer.globalDataEndpoints.size());
+    testWindmillEndpointsConsumer.globalDataEndpoints.forEach(
+        (key, value) -> assertTrue(newGlobalDataEndpoints.containsKey(key)));
+    assertEquals(
+        newDirectPathEndpoints.size(), testWindmillEndpointsConsumer.windmillEndpoints.size());
+    assertTrue(
+        testWindmillEndpointsConsumer.windmillEndpoints.containsAll(
+            newDirectPathEndpoints.stream()
+                .map(WorkerMetadataResponse.Endpoint::getEndpoint)
+                .collect(Collectors.toList())));
+    assertEquals(newWorkMetadataResponse.getMetadataVersion(), metadataVersion.get());
+  }
+
+  @Test
+  public void testGetWorkerMetadata_doesNotConsumeResponseIfMetadataStale() {
+    WorkerMetadataResponse freshEndpoints =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(2)
+            .addAllDirectPathEndpoints(DIRECT_PATH_ENDPOINTS)
+            .putAllGlobalDataEndpoints(GLOBAL_DATA_ENDPOINTS)
+            .build();
+    TestWindmillEndpointsConsumer testWindmillEndpointsConsumer =
+        Mockito.spy(new TestWindmillEndpointsConsumer());
+    TestGetWorkMetadataRequestObserver requestObserver =
+        new TestGetWorkMetadataRequestObserver(freshEndpoints);
+    GetWorkerMetadataTestStub testStub = new GetWorkerMetadataTestStub(requestObserver);
+    AtomicLong metadataVersion = new AtomicLong(0);
+    stream = getWorkerMetadataStream(testStub, metadataVersion, testWindmillEndpointsConsumer);
+    List<WorkerMetadataResponse.Endpoint> staleDirectPathEndpoints =
+        Lists.newArrayList(
+            WorkerMetadataResponse.Endpoint.newBuilder()
+                .setEndpoint("staleWindmillEndpoint")
+                .build());
+    Map<String, WorkerMetadataResponse.Endpoint> staleGlobalDataEndpoints = new HashMap<>();
+    staleGlobalDataEndpoints.put(
+        "stale_global_data",
+        WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("staleGlobalData").build());
+
+    testStub.injectWorkerMetadata(

Review Comment:
   see comment in code. It seems like this test should fail with the current logic. It would also pass if this injection is somehow not reaching the stream, so maybe some wiring is wrong?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
