mynameborat commented on a change in pull request #1553:
URL: https://github.com/apache/samza/pull/1553#discussion_r751481543



##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
##########
@@ -234,13 +262,33 @@ private void prepareWorkerExecution(JobModel jobModel, 
JobCoordinatorMetadata ne
     }
   }
 
+  /**
+   * Wrapper around {@link MetadataResourceUtil} constructor so it can be 
stubbed during testing.
+   */
   @VisibleForTesting
   MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
     return new MetadataResourceUtil(jobModel, this.metrics, this.config);
   }
 
+  /**
+   * Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can 
be stubbed during testing.
+   */
+  @VisibleForTesting
+  Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
+      String jobId, JobModel jobModel, String containerId, Optional<String> 
execEnvContainerId, Config config) {
+    return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, 
containerId, execEnvContainerId, config);
+  }
+
   private Set<JobMetadataChange> 
checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
     JobCoordinatorMetadata previousMetadata = 
this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
     return 
this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, 
previousMetadata);
   }
+
+  private static void quietlyStop(DiagnosticsManager diagnosticsManager) {
+    try {
+      diagnosticsManager.stop();

Review comment:
       Since you are making some changes related to diagnostics manager, I 
noticed the following within the `stop()` method
   
   ```
   scheduler.shutdown();
   
       // Allow any scheduled publishes to finish, and block for termination
       scheduler.awaitTermination(terminationDuration.toMillis(), 
TimeUnit.MILLISECONDS);
   
       if (!scheduler.isTerminated()) {
         LOG.warn("Unable to terminate scheduler");
         scheduler.shutdownNow();
       }
       this.systemProducer.stop();
       
       ```
       
       It seems like the `systemProducer` can linger in the event Interruption 
occurs due to `awaitTermination`. Would you mind fixing this as well to ensure 
`systemProducer.stop()` is invoked as part of finally?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to