scwhittle commented on code in PR #29879:
URL: https://github.com/apache/beam/pull/29879#discussion_r1446015818
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -311,8 +309,6 @@ public WorkerMessageResponse
reportWorkerMessage(StreamingScalingReport report)
if (result.getWorkerMessageResponses() == null) {
return new WorkerMessageResponse();
}
- WorkerMessageResponse response = result.getWorkerMessageResponses().get(0);
- logger.debug("Worker Message Response result: {}", response);
- return response;
+ return new WorkerMessageResponse();
Review Comment:
How about having no return value if the value is meaningless? This is an
internal class (it could be marked @Internal above to make that clear), so we
can change the method signature later if we do want a return value.
Otherwise, this should be coded to return the real value when one is present
(since this SDK will possibly run for a long time). Always returning an empty
response could be confusing to someone looking only at the method signature.
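To illustrate the second option, here is a minimal sketch of returning the real response when one is present. The class `FirstResponseSketch`, the helper `firstResponse`, and the nested `WorkerMessageResponse` stand-in are hypothetical names used only for illustration, and returning an `Optional` is my assumption, not something this PR does.

```java
import java.util.List;
import java.util.Optional;

class FirstResponseSketch {
  // Hypothetical stand-in for the generated Dataflow model class.
  static final class WorkerMessageResponse {}

  /**
   * Returns the first response if the service sent one, otherwise an empty Optional.
   * This avoids handing callers a freshly constructed, meaningless response object.
   */
  static Optional<WorkerMessageResponse> firstResponse(List<WorkerMessageResponse> responses) {
    if (responses == null || responses.isEmpty()) {
      return Optional.empty();
    }
    return Optional.ofNullable(responses.get(0));
  }

  public static void main(String[] args) {
    // No responses from the service: nothing to return.
    System.out.println(firstResponse(null).isPresent());
  }
}
```

With this shape the caller can tell "no response" apart from "empty response", which the method signature alone would otherwise hide.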
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -587,6 +588,13 @@ public void start() {
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
TimeUnit.MILLISECONDS);
+ workerMessageReportTimer = executorSupplier.apply("WorkerMessageTimer");
+ workerMessageReportTimer.scheduleWithFixedDelay(
+ this::reportPeriodicWorkerMessage,
+ 0,
+ options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
Review Comment:
Please add a > 0 check on the period here, similar to the one used for the
active work refresh.
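A minimal sketch of the kind of guard I mean, using only `java.util.concurrent`. The class `GuardedSchedule` and the helper `scheduleIfEnabled` are hypothetical names, not existing worker code. The guard matters because `scheduleWithFixedDelay` throws `IllegalArgumentException` when the delay is less than or equal to zero.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class GuardedSchedule {
  /**
   * Schedules the task only when periodMillis is positive.
   * Returns null when reporting is disabled and nothing was scheduled.
   */
  static ScheduledFuture<?> scheduleIfEnabled(
      ScheduledExecutorService timer, Runnable task, long periodMillis) {
    if (periodMillis <= 0) {
      // A non-positive period is treated as "reporting disabled".
      return null;
    }
    return timer.scheduleWithFixedDelay(task, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    try {
      System.out.println(scheduleIfEnabled(timer, () -> {}, 0) == null);
      System.out.println(scheduleIfEnabled(timer, () -> {}, 1000) != null);
    } finally {
      timer.shutdownNow();
    }
  }
}
```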
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -280,13 +279,12 @@ public WorkItemServiceState
reportWorkItemStatus(WorkItemStatus workItemStatus)
/** Reports the autoscaling signals to dataflow */
@Override
- public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport
report)
+ public WorkerMessageResponse
reportStreamingMetricsWorkerMessage(StreamingScalingReport report)
Review Comment:
The method is still a little confusing: the request type is inconsistent with
the response type and the method name.
I would expect either:
StreamingScalingResponse reportStreamingScalingMetrics(StreamingScalingReport);
or
WorkerMessageResponse reportWorkerMessage(WorkerMessage);
With the second, we could have helper methods that construct the
WorkerMessage from a StreamingScalingReport.
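A rough sketch of the second option. The nested classes are simplified stand-ins for the generated Dataflow model types, and the helper name `createWorkerMessageFromStreamingScalingReport` is only a suggestion, so treat every name here as hypothetical.

```java
class WorkerMessageSketch {
  // Simplified stand-ins for the generated Dataflow model classes.
  static final class StreamingScalingReport {
    final int activeThreadCount;

    StreamingScalingReport(int activeThreadCount) {
      this.activeThreadCount = activeThreadCount;
    }
  }

  static final class WorkerMessage {
    final StreamingScalingReport streamingScalingReport;

    WorkerMessage(StreamingScalingReport streamingScalingReport) {
      this.streamingScalingReport = streamingScalingReport;
    }
  }

  static final class WorkerMessageResponse {}

  /** Request and response types now match the method name. */
  interface WorkUnitClient {
    WorkerMessageResponse reportWorkerMessage(WorkerMessage message);
  }

  /** Helper that wraps a scaling report in the generic message type. */
  static WorkerMessage createWorkerMessageFromStreamingScalingReport(
      StreamingScalingReport report) {
    return new WorkerMessage(report);
  }

  public static void main(String[] args) {
    // Trivial client used only to show the call shape.
    WorkUnitClient client = message -> new WorkerMessageResponse();
    WorkerMessage message =
        createWorkerMessageFromStreamingScalingReport(new StreamingScalingReport(1));
    System.out.println(client.reportWorkerMessage(message) != null);
  }
}
```

Callers then build the message with the helper and pass it to the single generic method, so adding another message type later needs only another helper rather than another client method.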
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java:
##########
@@ -227,10 +230,22 @@ public void testCloudServiceCallMultipleWorkItems()
throws Exception {
client.getWorkItem();
}
- // @Test
- // public void testWorkMessageCreation() throws Exception {
- // WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions,
LOG);
- // }
+ @Test
+ public void testReportWorkerMessageEmptyResponse() throws Exception {
+ MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+ response.setContentType(Json.MEDIA_TYPE);
+ SendWorkerMessagesResponse workerMessage = new
SendWorkerMessagesResponse();
+ workerMessage.setFactory(Transport.getJsonFactory());
+ response.setContent(workerMessage.toPrettyString());
+ when(request.execute()).thenReturn(response);
+ StreamingScalingReport activeThreadsReport =
+ new StreamingScalingReport().setActiveThreadCount(1);
+ WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
+ WorkerMessageResponse expected = new WorkerMessageResponse();
+ WorkerMessageResponse clientResponse =
+ client.reportStreamingMetricsWorkerMessage(activeThreadsReport);
Review Comment:
It would be a better test to verify what the actual request was; see
testCloudServiceCall for how it does that.