scwhittle commented on code in PR #29879:
URL: https://github.com/apache/beam/pull/29879#discussion_r1446015818


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -311,8 +309,6 @@ public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport report)
     if (result.getWorkerMessageResponses() == null) {
       return new WorkerMessageResponse();
     }
-    WorkerMessageResponse response = result.getWorkerMessageResponses().get(0);
-    logger.debug("Worker Message Response result: {}", response);
-    return response;
+    return new WorkerMessageResponse();

Review Comment:
   How about having no return value if the response is meaningless? This is an
internal class (it could be marked @Internal above to make that clear), so we
can change the method signature later if we do want a return value.
   
   Otherwise, this should be coded so that it returns the real value when one
is present (since this SDK will possibly run for a long time), as the current
behavior could be confusing to someone looking only at the method signature.
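
To make the second option concrete, here is a minimal sketch of "return the real value if present". `ResponseSketch`, `Response`, and `firstOrEmpty` are hypothetical stand-ins, not the actual Dataflow model classes, and the empty-list guard is an extra assumption that the original code did not have.

```java
import java.util.List;

// Hypothetical stand-in types; the real code would use WorkerMessageResponse.
final class ResponseSketch {
  static final class Response {
    final String payload; // null models an empty response

    Response() {
      this(null);
    }

    Response(String payload) {
      this.payload = payload;
    }
  }

  /** Returns the first response the service sent, or an empty one if none arrived. */
  static Response firstOrEmpty(List<Response> responses) {
    if (responses == null || responses.isEmpty()) {
      return new Response();
    }
    return responses.get(0);
  }
}
```

With this shape the signature stays honest: callers get the service's answer when there is one and an empty placeholder otherwise.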



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -587,6 +588,13 @@ public void start() {
         options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
         TimeUnit.MILLISECONDS);
 
+    workerMessageReportTimer = executorSupplier.apply("WorkerMessageTimer");
+    workerMessageReportTimer.scheduleWithFixedDelay(
+        this::reportPeriodicWorkerMessage,
+        0,
+        options.getWindmillHarnessUpdateReportingPeriod().getMillis(),

Review Comment:
   Please add a > 0 check on the reporting period before scheduling, similar to
the one used for the active work refresh.
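
A minimal sketch of such a guard, assuming a plain `ScheduledExecutorService`; `GuardedSchedulingSketch` and `scheduleIfEnabled` are hypothetical names, not part of the worker. The guard matters because `scheduleWithFixedDelay` throws `IllegalArgumentException` when the delay is less than or equal to zero.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class GuardedSchedulingSketch {
  /**
   * Schedules the task with a fixed delay only when periodMillis is positive.
   * Returns null when the period is zero or negative, meaning reporting is disabled.
   */
  static ScheduledFuture<?> scheduleIfEnabled(
      ScheduledExecutorService timer, Runnable task, long periodMillis) {
    if (periodMillis <= 0) {
      return null; // a non-positive period would make scheduleWithFixedDelay throw
    }
    return timer.scheduleWithFixedDelay(task, 0, periodMillis, TimeUnit.MILLISECONDS);
  }
}
```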



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -280,13 +279,12 @@ public WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus)
 
   /** Reports the autoscaling signals to dataflow */
   @Override
-  public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport report)
+  public WorkerMessageResponse reportStreamingMetricsWorkerMessage(StreamingScalingReport report)

Review Comment:
   The method still seems a little confusing: the request type is inconsistent
with the response type and the method name.
   
   I would expect either:
   StreamingScalingResponse reportStreamingScalingMetrics(StreamingScalingReport);
   or
   WorkerMessageResponse reportWorkerMessage(WorkerMessage);
   
   With the second option, we could have helper methods that construct the
WorkerMessage from a StreamingScalingReport.
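
A rough sketch of the second option, using simplified stand-in types rather than the real Dataflow model classes. The helper name `toWorkerMessage`, the `Client` interface, and the field layout are all assumptions made for illustration.

```java
// Hypothetical stand-ins for the Dataflow API model classes.
final class WorkerMessageShapeSketch {
  static final class StreamingScalingReport {
    Integer activeThreadCount;
  }

  static final class WorkerMessage {
    StreamingScalingReport streamingScalingReport;
  }

  static final class WorkerMessageResponse {}

  /** The client method takes and returns the generic message types. */
  interface Client {
    WorkerMessageResponse reportWorkerMessage(WorkerMessage message);
  }

  /** Helper that wraps a scaling report in a generic worker message. */
  static WorkerMessage toWorkerMessage(StreamingScalingReport report) {
    WorkerMessage message = new WorkerMessage();
    message.streamingScalingReport = report;
    return message;
  }
}
```

A caller would then write something like `client.reportWorkerMessage(toWorkerMessage(report))`, so the request type, the response type, and the method name all refer to the same concept.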
   



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java:
##########
@@ -227,10 +230,22 @@ public void testCloudServiceCallMultipleWorkItems() throws Exception {
     client.getWorkItem();
   }
 
-  // @Test
-  // public void testWorkMessageCreation() throws Exception {
-  //   WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
-  // }
+  @Test
+  public void testReportWorkerMessageEmptyResponse() throws Exception {
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.setContentType(Json.MEDIA_TYPE);
+    SendWorkerMessagesResponse workerMessage = new SendWorkerMessagesResponse();
+    workerMessage.setFactory(Transport.getJsonFactory());
+    response.setContent(workerMessage.toPrettyString());
+    when(request.execute()).thenReturn(response);
+    StreamingScalingReport activeThreadsReport =
+        new StreamingScalingReport().setActiveThreadCount(1);
+    WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
+    WorkerMessageResponse expected = new WorkerMessageResponse();
+    WorkerMessageResponse clientResponse =
+        client.reportStreamingMetricsWorkerMessage(activeThreadsReport);

Review Comment:
   It would be a better test to verify what the actual request made was; see
testCloudServiceCall for how it does that.
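
The general idea, sketched without any mocking library: record what the client sends and assert on it, instead of only checking the response. Everything here (`RequestCaptureSketch`, `Transport`, `RecordingTransport`, the JSON body format) is hypothetical and is not how testCloudServiceCall is actually written.

```java
import java.util.ArrayList;
import java.util.List;

final class RequestCaptureSketch {
  static final class ScalingReport {
    final int activeThreadCount;

    ScalingReport(int activeThreadCount) {
      this.activeThreadCount = activeThreadCount;
    }
  }

  /** Minimal transport abstraction: sends a request body and returns the response body. */
  interface Transport {
    String send(String method, String body);
  }

  /** Test double that records every request it is asked to send. */
  static final class RecordingTransport implements Transport {
    final List<String> methods = new ArrayList<>();
    final List<String> bodies = new ArrayList<>();

    @Override
    public String send(String method, String body) {
      methods.add(method);
      bodies.add(body);
      return "{}"; // empty response, as in the test under review
    }
  }

  static final class Client {
    private final Transport transport;

    Client(Transport transport) {
      this.transport = transport;
    }

    void report(ScalingReport report) {
      // Assumed wire format, for illustration only.
      transport.send("POST", "{\"activeThreadCount\":" + report.activeThreadCount + "}");
    }
  }
}
```

A test built this way fails if the report is dropped or serialized incorrectly, which a check on the empty response alone would not catch.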



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
