mynameborat commented on a change in pull request #1553:
URL: https://github.com/apache/samza/pull/1553#discussion_r751481543
##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
##########
@@ -234,13 +262,33 @@ private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata ne
     }
   }

+  /**
+   * Wrapper around {@link MetadataResourceUtil} constructor so it can be stubbed during testing.
+   */
   @VisibleForTesting
   MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
     return new MetadataResourceUtil(jobModel, this.metrics, this.config);
   }

+  /**
+   * Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can be stubbed during testing.
+   */
+  @VisibleForTesting
+  Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
+      String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
+    return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+  }
+
   private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
     JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
     return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
   }
+
+  private static void quietlyStop(DiagnosticsManager diagnosticsManager) {
+    try {
+      diagnosticsManager.stop();

Review comment:
Since you are making some changes related to the diagnostics manager, I noticed the following within the `stop()` method:

```
scheduler.shutdown();

// Allow any scheduled publishes to finish, and block for termination
scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);

if (!scheduler.isTerminated()) {
  LOG.warn("Unable to terminate scheduler");
  scheduler.shutdownNow();
}

this.systemProducer.stop();
```

It seems the `systemProducer` can be left running if `awaitTermination` throws an `InterruptedException`, because `this.systemProducer.stop()` is then never reached.
Would you mind fixing this as well to ensure `systemProducer.stop()` is invoked as part of finally?
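
A minimal sketch of the shape I have in mind, assuming the same `scheduler`, `terminationDuration`, and `systemProducer` fields as in the snippet quoted from `stop()`. `DiagnosticsStopSketch` and `StoppableProducer` are hypothetical stand-ins made up for illustration, not the actual Samza classes, and the `LOG.warn` call is left out to keep the example self-contained:

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the producer; not Samza's actual interface. */
interface StoppableProducer {
  void stop();
}

/** Hypothetical holder for the shutdown logic; not the real DiagnosticsManager. */
class DiagnosticsStopSketch {
  private final ScheduledExecutorService scheduler;
  private final StoppableProducer systemProducer;
  private final Duration terminationDuration;

  DiagnosticsStopSketch(ScheduledExecutorService scheduler, StoppableProducer systemProducer,
      Duration terminationDuration) {
    this.scheduler = scheduler;
    this.systemProducer = systemProducer;
    this.terminationDuration = terminationDuration;
  }

  void stop() throws InterruptedException {
    try {
      scheduler.shutdown();

      // Allow any scheduled publishes to finish, and block for termination
      scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);

      if (!scheduler.isTerminated()) {
        scheduler.shutdownNow();
      }
    } finally {
      // Runs even when awaitTermination throws InterruptedException,
      // so the producer is not left running.
      systemProducer.stop();
    }
  }
}
```

With this shape the `InterruptedException` still propagates to the caller, but only after the producer has been stopped. Whether the interrupted path should also call `scheduler.shutdownNow()` is a separate decision that this sketch does not make.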