reuvenlax commented on code in PR #23333:
URL: https://github.com/apache/beam/pull/23333#discussion_r1004668562


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -832,21 +845,22 @@ public void start() {
     ExecutionStateSampler.instance().start();
 
     // Periodically report workers counters and other updates.
-    globalWorkerUpdatesTimer = new Timer("GlobalWorkerUpdatesTimer");
-    globalWorkerUpdatesTimer.schedule(
-        new TimerTask() {
+    globalWorkerUpdatesTimer = executorSupplier.get();
+    globalWorkerUpdatesTimer.scheduleWithFixedDelay(
+        new Runnable() {
           @Override
           public void run() {
             reportPeriodicWorkerUpdates();
           }
         },
         0,
-        options.getWindmillHarnessUpdateReportingPeriod().getMillis());
+        options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
+        TimeUnit.MILLISECONDS);
 
-    refreshWorkTimer = new Timer("RefreshWork");
+    refreshWorkTimer = executorSupplier.get();

Review Comment:
   Is there any way to avoid losing the thread names? We sometimes use them when debugging threadz.
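One possible way to keep the names is to give each executor a ThreadFactory that names its thread. This sketch assumes executorSupplier could be changed to accept a name. The `Function<String, ScheduledExecutorService>` shape and the `EXECUTOR_FACTORY` field are hypothetical, not the PR's actual type.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;

public class NamedExecutors {
  // Builds a ThreadFactory whose threads carry the given name, so they stay
  // recognizable in thread dumps the way new Timer("RefreshWork") threads were.
  static ThreadFactory namedThreadFactory(String name) {
    return runnable -> new Thread(runnable, name);
  }

  // Hypothetical replacement for executorSupplier: callers pass the name the
  // old java.util.Timer used, e.g. "GlobalWorkerUpdatesTimer" or "RefreshWork".
  static final Function<String, ScheduledExecutorService> EXECUTOR_FACTORY =
      name -> Executors.newSingleThreadScheduledExecutor(namedThreadFactory(name));
}
```

A call site would then read `refreshWorkTimer = EXECUTOR_FACTORY.apply("RefreshWork");`. Guava's `ThreadFactoryBuilder#setNameFormat` is an alternative to the hand-written factory.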



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -832,21 +845,22 @@ public void start() {
     ExecutionStateSampler.instance().start();
 
     // Periodically report workers counters and other updates.
-    globalWorkerUpdatesTimer = new Timer("GlobalWorkerUpdatesTimer");
-    globalWorkerUpdatesTimer.schedule(
-        new TimerTask() {
+    globalWorkerUpdatesTimer = executorSupplier.get();
+    globalWorkerUpdatesTimer.scheduleWithFixedDelay(
+        new Runnable() {

Review Comment:
   I think you can replace this with this::reportPeriodicWorkerUpdates
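The following self-contained sketch shows the suggested shape. `MethodReferenceSketch` stands in for StreamingDataflowWorker, and its `reportPeriodicWorkerUpdates` body is a placeholder that only counts calls. The 0 initial delay mirrors the diff.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MethodReferenceSketch {
  final AtomicInteger reports = new AtomicInteger();

  // Placeholder for the real method; here it only counts invocations.
  void reportPeriodicWorkerUpdates() {
    reports.incrementAndGet();
  }

  ScheduledFuture<?> start(ScheduledExecutorService executor, long periodMillis) {
    // The method reference replaces the anonymous Runnable from the diff.
    return executor.scheduleWithFixedDelay(
        this::reportPeriodicWorkerUpdates, 0, periodMillis, TimeUnit.MILLISECONDS);
  }
}
```

`Timer.schedule(task, delay, period)` is documented as fixed-delay execution. Using `scheduleWithFixedDelay` rather than `scheduleAtFixedRate` therefore keeps the old timing semantics.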



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1871,16 +1946,17 @@ private void schedulePeriodicGlobalConfigRequests() {
     LOG.info("windmillServerStub is now ready");
 
     // Now start a thread that periodically refreshes the windmill service endpoint.
-    globalConfigRefreshTimer = new Timer("GlobalConfigRefreshTimer");
-    globalConfigRefreshTimer.schedule(
-        new TimerTask() {
+    globalConfigRefreshTimer = executorSupplier.get();
+    globalConfigRefreshTimer.scheduleWithFixedDelay(
+        new Runnable() {

Review Comment:
   Again this could be this::getGlobalConfig
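A method reference still works if the target method returns a value, because Java discards the result when adapting it to `Runnable`. This diff does not show whether getGlobalConfig returns anything. The `Optional<String>` return type and the placeholder value below are assumptions for illustration only.

```java
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ValueReturningReferenceSketch {
  final AtomicInteger fetches = new AtomicInteger();

  // Hypothetical stand-in for a config fetch that returns a value.
  Optional<String> getGlobalConfig() {
    fetches.incrementAndGet();
    return Optional.of("placeholder-endpoint");
  }

  ScheduledFuture<?> schedulePeriodicGlobalConfigRequests(
      ScheduledExecutorService executor, long periodMillis) {
    // Compiles because a reference to a non-void method is compatible with a
    // void functional interface such as Runnable; the Optional is discarded.
    return executor.scheduleWithFixedDelay(
        this::getGlobalConfig, 0, periodMillis, TimeUnit.MILLISECONDS);
  }
}
```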



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -2382,6 +2458,7 @@ public List<Windmill.KeyedGetDataRequest> getKeysToRefresh(Instant refreshDeadli
                       .setKey(shardedKey.key())
                       .setShardingKey(shardedKey.shardingKey())
                       .setWorkToken(work.getWorkItem().getWorkToken())
+                      .addAllLatencyAttribution(work.getLatencyAttributionList())

Review Comment:
   Is there a reason this is on KeyedGetDataRequest and not on the work-item commit?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -857,32 +871,34 @@ public void run() {
             }
           },
           options.getActiveWorkRefreshPeriodMillis(),
-          options.getActiveWorkRefreshPeriodMillis());
+          options.getActiveWorkRefreshPeriodMillis(),
+          TimeUnit.MILLISECONDS);
     }
     if (windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0) {
       int periodMillis = Math.max(options.getStuckCommitDurationMillis() / 10, 100);
-      refreshWorkTimer.schedule(
-          new TimerTask() {
+      refreshWorkTimer.scheduleWithFixedDelay(
+          new Runnable() {
             @Override
             public void run() {
               invalidateStuckCommits();
             }
           },
           periodMillis,
-          periodMillis);
+          periodMillis,
+          TimeUnit.MILLISECONDS);
     }
 
     if (options.getPeriodicStatusPageOutputDirectory() != null) {
-      statusPageTimer = new Timer("DumpStatusPages");
-      statusPageTimer.schedule(
-          new TimerTask() {
+      statusPageTimer = executorSupplier.get();
+      statusPageTimer.scheduleWithFixedDelay(
+          new Runnable() {

Review Comment:
   Replace with a lambda: () -> { ... }
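Below is a sketch of the lambda form. The swap also changes how failures behave, which is worth keeping in mind. Per the ScheduledExecutorService Javadoc, if any run of a `scheduleWithFixedDelay` task throws, later runs are suppressed. An uncaught exception in a java.util.Timer task likewise kills the timer's thread, so in both cases the periodic task stops. Wrapping the body in try/catch keeps it running. The original body is not shown in this excerpt, so `dumpStatusPages` and the stderr logging are hypothetical placeholders.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LambdaTaskSketch {
  final AtomicInteger attempts = new AtomicInteger();

  // Hypothetical stand-in for the status-page dump. It fails on its first
  // call to show that the periodic task survives an exception.
  void dumpStatusPages() {
    if (attempts.incrementAndGet() == 1) {
      throw new IllegalStateException("simulated failure");
    }
  }

  ScheduledFuture<?> start(ScheduledExecutorService executor, long periodMillis) {
    return executor.scheduleWithFixedDelay(
        () -> {
          try {
            dumpStatusPages();
          } catch (RuntimeException e) {
            // Log and continue: an escaping exception would suppress all later runs.
            System.err.println("Status page dump failed: " + e);
          }
        },
        periodMillis,
        periodMillis,
        TimeUnit.MILLISECONDS);
  }
}
```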



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1197,13 +1239,36 @@ public State getState() {
     }
 
     public void setState(State state) {
+      Instant now = clock.get();

Review Comment:
   I believe this function is called multiple times per work item. We need to performance test this to make sure pipelines don't get more expensive (and we don't make e2e latency even worse).
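A hand-rolled timing loop like the one below can give a rough first estimate of what one clock read costs before a real pipeline benchmark is run. It uses java.time.Instant and a `Supplier<Instant>` as stand-ins for the worker's clock. That substitution is an assumption, since the PR's clock type is not shown here. A harness such as JMH gives far more trustworthy numbers than this loop.

```java
import java.time.Instant;
import java.util.function.Supplier;

public class ClockCostSketch {
  // Written once per measurement so the JIT cannot treat the reads as dead code.
  static volatile long sink;

  // Returns the average wall-clock nanoseconds per clock.get() call (rough).
  static double averageNanosPerCall(Supplier<Instant> clock, int iterations) {
    long acc = 0;
    // Warm-up pass that gives the JIT a chance to compile the loop body.
    for (int i = 0; i < iterations; i++) {
      acc += clock.get().getNano();
    }
    long start = System.nanoTime();
    for (int i = 0; i < iterations; i++) {
      acc += clock.get().getNano();
    }
    long elapsed = System.nanoTime() - start;
    sink = acc;
    return (double) elapsed / iterations;
  }

  public static void main(String[] args) {
    double nanos = averageNanosPerCall(Instant::now, 1_000_000);
    System.out.printf("~%.1f ns per clock read%n", nanos);
  }
}
```

To judge the overhead, multiply the per-read cost by the number of setState calls per work item and compare the result with per-item processing time.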



-- 
This is an automated message from the Apache Git Service.