scwhittle commented on code in PR #27767: URL: https://github.com/apache/beam/pull/27767#discussion_r1289963961
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill; + +import com.google.auto.value.AutoValue; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +@AutoValue +public abstract class WindmillEndpoints { + public abstract ImmutableMap<String, String> globalDataServers(); + + public abstract ImmutableList<String> windmillServers(); Review Comment: Add a comment explaining what the string represents. Alternatively, should it be some sort of endpoint object instead of a string? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/AbstractWindmillStream.java: ########## @@ -130,16 +127,18 @@ private static long debugDuration(long nowMs, long startMs) { protected abstract void onNewStream(); /** Returns whether there are any pending requests that should be retried on a stream break. 
*/ - protected abstract boolean hasPendingRequests(); + protected boolean hasPendingRequests() { Review Comment: Revert? Since this is related to correctness, a default implementation seems dangerous because it makes it possible to skip implementing it correctly. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill; + +import com.google.auto.value.AutoValue; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +@AutoValue +public abstract class WindmillEndpoints { + public abstract ImmutableMap<String, String> globalDataServers(); Review Comment: Comment on what key and value are ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcCommitWorkStream.java: ########## @@ -56,7 +56,8 @@ final class GrpcCommitWorkStream private final int streamingRpcBatchLimit; private GrpcCommitWorkStream( - CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub stub, + Function<StreamObserver<StreamingCommitResponse>, StreamObserver<StreamingCommitWorkRequest>> + commitWorkRpc, Review Comment: nit: commitWorkRpc is a confusing name to me for a function. What about createCommitWorkRpcFn or startCommitWorkRpcFn etc? 
Ditto for the other files. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java: ########## @@ -294,7 +293,10 @@ private synchronized CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alph if (stubList.size() == 1) { return stubList.get(0); } - return stubList.get(rand.nextInt(stubList.size())); + return stubList + .get(rand.nextInt(stubList.size())) + .withDeadlineAfter( + AbstractWindmillStream.DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS); Review Comment: This deadline is not applied to the stub returned above when there is only one stub (the `stubList.size() == 1` early return). ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStream.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import java.io.PrintWriter; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.StreamObserverFactory; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWorkerMetadataStream; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class GrpcGetWorkerMetadataStream + extends AbstractWindmillStream<WorkerMetadataRequest, WorkerMetadataResponse> + implements GetWorkerMetadataStream { + private static final Logger LOG = LoggerFactory.getLogger(GrpcGetWorkerMetadataStream.class); + private static final WorkerMetadataRequest HEALTH_CHECK_REQUEST = + WorkerMetadataRequest.getDefaultInstance(); + + private final AtomicLong 
metadataVersion; + private final WorkerMetadataRequest workerMetadataRequest; + private final ThrottleTimer getWorkerMetadataThrottleTimer; + private final Consumer<WindmillEndpoints> serverMappingUpdater; + + private GrpcGetWorkerMetadataStream( + Function<StreamObserver<WorkerMetadataResponse>, StreamObserver<WorkerMetadataRequest>> + getWorkerMetadataRpc, + BackOff backoff, + StreamObserverFactory streamObserverFactory, + Set<AbstractWindmillStream<?, ?>> streamRegistry, + int logEveryNStreamFailures, + JobHeader jobHeader, + AtomicLong metadataVersion, + ThrottleTimer getWorkerMetadataThrottleTimer, + Consumer<WindmillEndpoints> serverMappingUpdater) { + super( + getWorkerMetadataRpc, + backoff, + streamObserverFactory, + streamRegistry, + logEveryNStreamFailures); + this.workerMetadataRequest = WorkerMetadataRequest.newBuilder().setHeader(jobHeader).build(); + this.metadataVersion = metadataVersion; + this.getWorkerMetadataThrottleTimer = getWorkerMetadataThrottleTimer; + this.serverMappingUpdater = serverMappingUpdater; + } + + public static GrpcGetWorkerMetadataStream create( + Function<StreamObserver<WorkerMetadataResponse>, StreamObserver<WorkerMetadataRequest>> + getWorkerMetadataRpc, + BackOff backoff, + StreamObserverFactory streamObserverFactory, + Set<AbstractWindmillStream<?, ?>> streamRegistry, + int logEveryNStreamFailures, + JobHeader jobHeader, + AtomicLong metadataVersion, + ThrottleTimer getWorkerMetadataThrottleTimer, + Consumer<WindmillEndpoints> serverMappingUpdater) { + GrpcGetWorkerMetadataStream getWorkerMetadataStream = + new GrpcGetWorkerMetadataStream( + getWorkerMetadataRpc, + backoff, + streamObserverFactory, + streamRegistry, + logEveryNStreamFailures, + jobHeader, + metadataVersion, + getWorkerMetadataThrottleTimer, + serverMappingUpdater); + getWorkerMetadataStream.startStream(); + return getWorkerMetadataStream; + } + + @VisibleForTesting + static GrpcGetWorkerMetadataStream forTesting( + 
Function<StreamObserver<WorkerMetadataResponse>, StreamObserver<WorkerMetadataRequest>> + getWorkerMetadataRpc, + AtomicLong metadataVersion, + JobHeader jobHeader, + Set<AbstractWindmillStream<?, ?>> streamRegistry, + Consumer<WindmillEndpoints> serverMappingUpdater) { + GrpcGetWorkerMetadataStream testStream = + new GrpcGetWorkerMetadataStream( + getWorkerMetadataRpc, + FluentBackoff.DEFAULT.backoff(), + StreamObserverFactory.direct(DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2, 1), + streamRegistry, + 1, // logEveryNStreamFailures + jobHeader, + metadataVersion, + new ThrottleTimer(), + serverMappingUpdater); + testStream.startStream(); + return testStream; + } + + private long updateMetadataVersion(long currentMetadataVersion, long responseMetadataVersion) { + if (currentMetadataVersion < responseMetadataVersion) { + return responseMetadataVersion; + } + + // If the currentMetadataVersion is greater than or equal to one in the response, the response + // data is stale, and we do not want to do anything. + LOG.info( + "Received metadata version={}; Current metadata version={}. " + + "Skipping update because received stale metadata", + responseMetadataVersion, + currentMetadataVersion); + return currentMetadataVersion; + } + + @Override + protected void onResponse(WorkerMetadataResponse response) { + metadataVersion.getAndUpdate( + current -> updateMetadataVersion(current, response.getMetadataVersion())); Review Comment: if we get a version before our current version this seems to leave metadataVersion at the more current version *but* then it goes ahead and returns the stale response instead of filtering it. ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java: ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse; +import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessServerBuilder; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.testing.GrpcCleanupRule; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.util.MutableHandlerRegistry; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class GrpcGetWorkerMetadataStreamTest { + private static final List<WorkerMetadataResponse.Endpoint> DIRECT_PATH_ENDPOINTS = + Lists.newArrayList( + WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("direct_path").build()); + private static final Map<String, WorkerMetadataResponse.Endpoint> GLOBAL_DATA_ENDPOINTS = + Maps.newHashMap(); + private static final JobHeader TEST_JOB_HEADER = + JobHeader.newBuilder() + .setJobId("test_job") + .setWorkerId("test_worker") + .setProjectId("test_project") + .build(); + private static final String name = "Fake server for GrpcGetWorkerMetadataStreamTest"; + @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private final Set<AbstractWindmillStream<?, ?>> streamRegistry = new HashSet<>(); + private ManagedChannel inProcessChannel; + private GrpcGetWorkerMetadataStream stream; + + private GrpcGetWorkerMetadataStream getWorkerMetadataStream( 
Review Comment: nit: add "Test" to the method name, e.g. getWorkerMetadataTestStream? ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcGetWorkerMetadataStreamTest.java: ########## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.grpcclient; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.windmill.AbstractWindmillStream; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessServerBuilder; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.testing.GrpcCleanupRule; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.util.MutableHandlerRegistry; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class GrpcGetWorkerMetadataStreamTest { + private static final List<WorkerMetadataResponse.Endpoint> DIRECT_PATH_ENDPOINTS = + Lists.newArrayList( + WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("direct_path").build()); + private static final Map<String, WorkerMetadataResponse.Endpoint> GLOBAL_DATA_ENDPOINTS = + Maps.newHashMap(); + private static final JobHeader TEST_JOB_HEADER = + JobHeader.newBuilder() + .setJobId("test_job") + .setWorkerId("test_worker") + .setProjectId("test_project") + .build(); + private static final String name = "Fake server for GrpcGetWorkerMetadataStreamTest"; + @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private final Set<AbstractWindmillStream<?, ?>> streamRegistry = new HashSet<>(); + private ManagedChannel inProcessChannel; + private GrpcGetWorkerMetadataStream stream; + + private GrpcGetWorkerMetadataStream getWorkerMetadataStream( + GetWorkerMetadataTestStub getWorkerMetadataTestStub, + AtomicLong metadataVersion, + Consumer<WindmillEndpoints> endpointsConsumer) { + serviceRegistry.addService(getWorkerMetadataTestStub); + return GrpcGetWorkerMetadataStream.forTesting( + responseObserver -> + CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel) + .getWorkerMetadataStream(responseObserver), + metadataVersion, + TEST_JOB_HEADER, + streamRegistry, + endpointsConsumer); + } + + @Before + public void setUp() throws IOException { + Server server = + InProcessServerBuilder.forName(name) + .fallbackHandlerRegistry(serviceRegistry) + .directExecutor() + .build() + .start(); + inProcessChannel = + 
grpcCleanup.register(InProcessChannelBuilder.forName(name).directExecutor().build()); + grpcCleanup.register(server); + grpcCleanup.register(inProcessChannel); + GLOBAL_DATA_ENDPOINTS.put( + "global_data", + WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("global_data").build()); + } + + @After + public void cleanUp() { + inProcessChannel.shutdownNow(); + } + + @Test + public void testGetWorkerMetadata() { + WorkerMetadataResponse mockResponse = + WorkerMetadataResponse.newBuilder() + .setMetadataVersion(1) + .addAllDirectPathEndpoints(DIRECT_PATH_ENDPOINTS) + .putAllGlobalDataEndpoints(GLOBAL_DATA_ENDPOINTS) + .build(); + TestWindmillEndpointsConsumer testWindmillEndpointsConsumer = + new TestWindmillEndpointsConsumer(); + TestGetWorkMetadataRequestObserver requestObserver = + new TestGetWorkMetadataRequestObserver(mockResponse); + GetWorkerMetadataTestStub testStub = new GetWorkerMetadataTestStub(requestObserver); + AtomicLong metadataVersion = new AtomicLong(0); + stream = getWorkerMetadataStream(testStub, metadataVersion, testWindmillEndpointsConsumer); + assertEquals( + GLOBAL_DATA_ENDPOINTS.size(), testWindmillEndpointsConsumer.globalDataEndpoints.size()); + testWindmillEndpointsConsumer.globalDataEndpoints.forEach( + (key, value) -> assertTrue(GLOBAL_DATA_ENDPOINTS.containsKey(key))); + assertEquals( + DIRECT_PATH_ENDPOINTS.size(), testWindmillEndpointsConsumer.windmillEndpoints.size()); + assertTrue( + testWindmillEndpointsConsumer.windmillEndpoints.containsAll( + DIRECT_PATH_ENDPOINTS.stream() + .map(WorkerMetadataResponse.Endpoint::getEndpoint) + .collect(Collectors.toList()))); + assertEquals(mockResponse.getMetadataVersion(), metadataVersion.get()); + } + + @Test + public void testGetWorkerMetadata_consumesSubsequentResponseMetadata() { + WorkerMetadataResponse initialResponse = + WorkerMetadataResponse.newBuilder() + .setMetadataVersion(1) + .addAllDirectPathEndpoints(DIRECT_PATH_ENDPOINTS) + .putAllGlobalDataEndpoints(GLOBAL_DATA_ENDPOINTS) + 
.build(); + TestWindmillEndpointsConsumer testWindmillEndpointsConsumer = + Mockito.spy(new TestWindmillEndpointsConsumer()); + TestGetWorkMetadataRequestObserver requestObserver = + new TestGetWorkMetadataRequestObserver(initialResponse); + GetWorkerMetadataTestStub testStub = new GetWorkerMetadataTestStub(requestObserver); + AtomicLong metadataVersion = new AtomicLong(0); + stream = getWorkerMetadataStream(testStub, metadataVersion, testWindmillEndpointsConsumer); + + List<WorkerMetadataResponse.Endpoint> newDirectPathEndpoints = + Lists.newArrayList( + WorkerMetadataResponse.Endpoint.newBuilder() + .setEndpoint("newWindmillEndpoint") + .build()); + Map<String, WorkerMetadataResponse.Endpoint> newGlobalDataEndpoints = new HashMap<>(); + newGlobalDataEndpoints.put( + "new_global_data", + WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("newGlobalData").build()); + + WorkerMetadataResponse newWorkMetadataResponse = + WorkerMetadataResponse.newBuilder() + .setMetadataVersion(initialResponse.getMetadataVersion() + 1) + .addAllDirectPathEndpoints(newDirectPathEndpoints) + .putAllGlobalDataEndpoints(newGlobalDataEndpoints) + .build(); + + testStub.injectWorkerMetadata(newWorkMetadataResponse); + + assertEquals( + newGlobalDataEndpoints.size(), testWindmillEndpointsConsumer.globalDataEndpoints.size()); + testWindmillEndpointsConsumer.globalDataEndpoints.forEach( + (key, value) -> assertTrue(newGlobalDataEndpoints.containsKey(key))); + assertEquals( + newDirectPathEndpoints.size(), testWindmillEndpointsConsumer.windmillEndpoints.size()); + assertTrue( + testWindmillEndpointsConsumer.windmillEndpoints.containsAll( + newDirectPathEndpoints.stream() + .map(WorkerMetadataResponse.Endpoint::getEndpoint) + .collect(Collectors.toList()))); + assertEquals(newWorkMetadataResponse.getMetadataVersion(), metadataVersion.get()); + } + + @Test + public void testGetWorkerMetadata_doesNotConsumeResponseIfMetadataStale() { + WorkerMetadataResponse freshEndpoints = + 
WorkerMetadataResponse.newBuilder() + .setMetadataVersion(2) + .addAllDirectPathEndpoints(DIRECT_PATH_ENDPOINTS) + .putAllGlobalDataEndpoints(GLOBAL_DATA_ENDPOINTS) + .build(); + TestWindmillEndpointsConsumer testWindmillEndpointsConsumer = + Mockito.spy(new TestWindmillEndpointsConsumer()); + TestGetWorkMetadataRequestObserver requestObserver = + new TestGetWorkMetadataRequestObserver(freshEndpoints); + GetWorkerMetadataTestStub testStub = new GetWorkerMetadataTestStub(requestObserver); + AtomicLong metadataVersion = new AtomicLong(0); + stream = getWorkerMetadataStream(testStub, metadataVersion, testWindmillEndpointsConsumer); + List<WorkerMetadataResponse.Endpoint> staleDirectPathEndpoints = + Lists.newArrayList( + WorkerMetadataResponse.Endpoint.newBuilder() + .setEndpoint("staleWindmillEndpoint") + .build()); + Map<String, WorkerMetadataResponse.Endpoint> staleGlobalDataEndpoints = new HashMap<>(); + staleGlobalDataEndpoints.put( + "stale_global_data", + WorkerMetadataResponse.Endpoint.newBuilder().setEndpoint("staleGlobalData").build()); + + testStub.injectWorkerMetadata( Review Comment: see comment in code. It seems like this test should fail with the current logic. It would also pass if this injection is somehow not reaching the stream so maybe some wiring is wrong? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
