parveensania commented on code in PR #34148:
URL: https://github.com/apache/beam/pull/34148#discussion_r2124146001
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -296,6 +326,70 @@ public void
testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers()
assertTrue(currentBackends.globalDataStreams().isEmpty());
}
+ @Test
+ public void testOnNewWorkerMetadata_endpointTypeChanged() throws
InterruptedException {
+ GetWorkBudgetDistributor getWorkBudgetDistributor =
mock(GetWorkBudgetDistributor.class);
+ fanOutStreamingEngineWorkProvider =
+ newFanOutStreamingEngineWorkerHarness(
+ GetWorkBudget.builder().setItems(1).setBytes(1).build(),
+ getWorkBudgetDistributor,
+ noOpProcessWorkItemFn());
+
+ String workerToken = "workerToken1";
+ String workerToken2 = "workerToken2";
+
+ WorkerMetadataResponse firstWorkerMetadata =
+ WorkerMetadataResponse.newBuilder()
+ .setMetadataVersion(1)
+ .addWorkEndpoints(
+ WorkerMetadataResponse.Endpoint.newBuilder()
+ .setBackendWorkerToken(workerToken)
+ .build())
+ .addWorkEndpoints(
+ WorkerMetadataResponse.Endpoint.newBuilder()
+ .setBackendWorkerToken(workerToken2)
+ .build())
+ .setExternalEndpoint(AUTHENTICATING_SERVICE)
+ .setEndpointType(EndpointType.DIRECTPATH)
+ .putAllGlobalDataEndpoints(DEFAULT)
+ .build();
+
+ WorkerMetadataResponse secondWorkerMetadata =
+ WorkerMetadataResponse.newBuilder()
+ .setMetadataVersion(2)
Review Comment:
There is a strange behavior of StreamObserver of GetWorkerMetadataTestStub.
When non-incremental metadata versions are injected (2->1 or 1->1), the second
endpoint injection gets dropped and consumeWorkerMetadata is not invoked.
But in the case of incremental versions (1->2), consumeWorkerMetadata is
invoked both times.
I verified the behavior is not because of the version check condition in
consumeWorkerMetadata; the method itself does not get run. I am still debugging
this and will update the test case once I identify the issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]