parveensania commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2299427487


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -4058,6 +4067,106 @@ public void testStuckCommit() throws Exception {
         removeDynamicFields(result.get(1L)));
   }
 
+  @Test
+  public void testSwitchStreamingWorkerHarness() throws Exception {
+    if (!streamingEngine) {
+      return;
+    }
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    // Start with CloudPath.
+    DataflowWorkerHarnessOptions options =
+        createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false");
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setOptions(options)
+                .setInstructions(instructions)
+                .publishCounters()
+                .build());
+
+    GrpcDispatcherClient mockDispatcherClient = mock(GrpcDispatcherClient.class);
+
+    // FanOutStreamingEngineWorkerHarness creates
+    // CloudWindmillMetadataServiceV1Alpha1Stub and expects the stream to
+    // successfully start. Mocking it here.
+    Channel mockChannel = mock(Channel.class);
+    ClientCall<WorkerMetadataRequest, WorkerMetadataResponse> mockClientCall =
+        mock(ClientCall.class);
+    when(mockChannel.newCall(
+            eq(CloudWindmillMetadataServiceV1Alpha1Grpc.getGetWorkerMetadataMethod()), any()))
+        .thenReturn(mockClientCall);
+    when(mockDispatcherClient.getWindmillMetadataServiceStubBlocking())
+        .thenReturn(CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(mockChannel));
+    java.lang.reflect.Field dispatcherClientField =
+        StreamingDataflowWorker.class.getDeclaredField("dispatcherClient");
+    dispatcherClientField.setAccessible(true);
+    dispatcherClientField.set(worker, mockDispatcherClient);
+
+    // Capture the config observer.
+    ArgumentCaptor<Consumer<StreamingGlobalConfig>> observerCaptor =
+        ArgumentCaptor.forClass(Consumer.class);
+    verify(mockGlobalConfigHandle, atLeastOnce()).registerConfigObserver(observerCaptor.capture());
+    List<Consumer<StreamingGlobalConfig>> observers = observerCaptor.getAllValues();
+
+    worker.start();
+
+    // Use reflection to check the harness type.
+    java.lang.reflect.Field harnessField =
+        StreamingDataflowWorker.class.getDeclaredField("streamingWorkerHarness");
+    harnessField.setAccessible(true);
+    AtomicReference<Object> harnessRef = (AtomicReference<Object>) harnessField.get(worker);

Review Comment:
   Done


