khandelwal-prateek commented on code in PR #4097:
URL: https://github.com/apache/gobblin/pull/4097#discussion_r1955716130
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java:
##########
@@ -188,4 +189,15 @@ protected void removeTasksFromCurrentJob(List<String>
workUnitIdsToRemove) {
protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
}
+
+ @Override
+ public void close() throws IOException {
+ try {
+ this.workflowServiceStubs.getOptions().getMetricsScope().close();
Review Comment:
Every place that uses `WorkflowServiceStubs` must remember to close the
MetricsScope explicitly. This is hard to maintain, and any change needs to be
updated at each caller.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java:
##########
@@ -218,7 +218,11 @@ public void
handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
throw new RuntimeException(e);
}
}));
- launcher.launchJob(listener);
+ try {
+ launcher.launchJob(listener);
+ } finally {
+ launcher.close();
+ }
Review Comment:
Since launcher implements the `Closeable` interface, we can use
`try-with-resources`, which automatically calls `close()` when the
try block exits, irrespective of whether it completes normally or due to an
exception:
```
try (JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig())) {
  ...
  launcher.launchJob(listener);
} catch (Exception e) {
  ...
}
```
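A minimal, self-contained sketch of that behavior follows. `DemoLauncher` is a hypothetical stand-in, not Gobblin's `JobLauncher`; it only records the order of calls, to show that `close()` runs on both the normal and the exceptional path, and that it runs before the `catch` block.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesDemo {

  /** Hypothetical stand-in for a launcher; it only records the order of calls. */
  static class DemoLauncher implements Closeable {
    private final List<String> events;
    private final boolean failOnLaunch;

    DemoLauncher(List<String> events, boolean failOnLaunch) {
      this.events = events;
      this.failOnLaunch = failOnLaunch;
    }

    void launchJob() {
      events.add("launch");
      if (failOnLaunch) {
        throw new IllegalStateException("launch failed");
      }
    }

    // Narrowed to throw nothing, so callers need no IOException handling.
    @Override
    public void close() {
      events.add("close");
    }
  }

  /** Runs one launch and returns the recorded call order. */
  static List<String> run(boolean failOnLaunch) {
    List<String> events = new ArrayList<>();
    try (DemoLauncher launcher = new DemoLauncher(events, failOnLaunch)) {
      launcher.launchJob();
    } catch (IllegalStateException e) {
      // The resource has already been closed by the time this block runs.
      events.add("caught");
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run(false));
    System.out.println(run(true));
  }
}
```

In both runs the launcher is closed without an explicit `finally` block at the call site.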
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java:
##########
@@ -292,6 +293,12 @@ public synchronized void stop() {
this.containerMetrics.get().stopMetricsReporting();
}
+ try {
+ this.workflowServiceStubs.getOptions().getMetricsScope().close();
+ } catch (Exception e) {
+ logger.error("Exception occurred while closing MetricsScope", e);
+ }
+
Review Comment:
We should handle the cleanup through a wrapper class. Since
`WorkflowServiceStubs` has a `shutdown()` method but does not close the
MetricsScope, we can wrap `WorkflowServiceStubs` in a custom class to handle
cleanup.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java:
##########
@@ -163,6 +164,9 @@ public GobblinTemporalTaskRunner(String applicationName,
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
this.workers = new ArrayList<>();
+ String connectionUri =
clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
+ this.workflowServiceStubs =
TemporalWorkflowClientFactory.createServiceInstance(connectionUri);
Review Comment:
We can modify `TemporalWorkflowClientFactory` to return a custom
`AutoCloseable` wrapper (e.g. `ManagedWorkflowServiceStubs`) instead of
returning `WorkflowServiceStubs` directly. Its `close()` method can ensure
that the metricsScope is closed (and any other resource is cleaned up), and each
caller can rely on that single method. This encapsulates resource management, so
callers don’t need to worry about cleanup.
```
public static ManagedWorkflowServiceStubs createServiceInstance(String connectionUri) throws Exception {
  ...
  return new ManagedWorkflowServiceStubs(stubs, metricsScope);
}
```
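```
public class ManagedWorkflowServiceStubs implements AutoCloseable {
  private final WorkflowServiceStubs workflowServiceStubs;
  private final Scope metricsScope;

  // Matches the constructor call in createServiceInstance above.
  public ManagedWorkflowServiceStubs(WorkflowServiceStubs workflowServiceStubs, Scope metricsScope) {
    this.workflowServiceStubs = workflowServiceStubs;
    this.metricsScope = metricsScope;
  }

  @Override
  public void close() {
    ...
    workflowServiceStubs.shutdown();
    ...
    metricsScope.close();
    ...
  }
}
```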
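To illustrate the close ordering, here is a self-contained sketch of the wrapper idea. It does not use the Temporal SDK: `ServiceStubs` and `MetricsScope` are hypothetical stand-in interfaces for `WorkflowServiceStubs` and the metrics `Scope`, and `ManagedStubs` is an illustrative name. The `try`/`finally` inside `close()` is one possible choice, made so that the scope is still closed if `shutdown()` throws.

```java
import java.util.ArrayList;
import java.util.List;

public class ManagedStubsSketch {

  /** Hypothetical stand-in for WorkflowServiceStubs. */
  interface ServiceStubs {
    void shutdown();
  }

  /** Hypothetical stand-in for the metrics Scope. */
  interface MetricsScope {
    void close();
  }

  /** Owns both resources so that callers have a single close() to call. */
  static final class ManagedStubs implements AutoCloseable {
    private final ServiceStubs stubs;
    private final MetricsScope scope;

    ManagedStubs(ServiceStubs stubs, MetricsScope scope) {
      this.stubs = stubs;
      this.scope = scope;
    }

    ServiceStubs get() {
      return stubs;
    }

    // Declares no checked exception, so try-with-resources needs no catch.
    @Override
    public void close() {
      try {
        stubs.shutdown();
      } finally {
        // Runs even if shutdown() throws.
        scope.close();
      }
    }
  }

  /** Uses the wrapper once and returns the recorded call order. */
  static List<String> demo() {
    List<String> events = new ArrayList<>();
    try (ManagedStubs managed =
        new ManagedStubs(() -> events.add("shutdown"), () -> events.add("scope-closed"))) {
      managed.get(); // callers would use the underlying stubs here
      events.add("work");
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

With this shape, each caller only needs try-with-resources (or a single `close()` call) and never touches the metrics scope directly.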