[ 
https://issues.apache.org/jira/browse/BEAM-4778?focusedWorklogId=130331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130331
 ]

ASF GitHub Bot logged work on BEAM-4778:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Aug/18 16:49
            Start Date: 02/Aug/18 16:49
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #5958: [BEAM-4778] add 
option to flink job server to clean staged artifacts per-job
URL: https://github.com/apache/beam/pull/5958
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:

diff --git a/runners/flink/job-server/build.gradle 
b/runners/flink/job-server/build.gradle
index b74525254d9..aa56a475935 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -59,7 +59,11 @@ dependencies {
 runShadow {
   def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : 
"localhost:8099"
   def artifactsDir = project.hasProperty("artifactsDir") ?  
project.property("artifactsDir") : "/tmp/flink-artifacts"
+  def cleanArtifactsPerJob = project.hasProperty("cleanArtifactsPerJob")
   args = ["--job-host=${jobHost}", "--artifacts-dir=${artifactsDir}"]
+  if (cleanArtifactsPerJob)
+    args += ["--clean-artifacts-per-job"]
+
   // Enable remote debugging.
   jvmArgs = ["-Xdebug", 
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 6d6685c5c37..7e9b14a2acd 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -57,6 +57,12 @@
     @Option(name = "--artifacts-dir", usage = "The location to store staged 
artifact files")
     private String artifactStagingPath = "/tmp/beam-artifact-staging";
 
+    @Option(
+      name = "--clean-artifacts-per-job",
+      usage = "When true, remove each job's staged artifacts when it completes"
+    )
+    private Boolean cleanArtifactsPerJob = false;
+
     @Option(name = "--flink-master-url", usage = "Flink master url to submit 
job.")
     private String flinkMasterUrl = "[auto]";
   }
@@ -183,6 +189,11 @@ private InMemoryJobService createJobService() throws 
IOException {
             throw new RuntimeException(exn);
           }
         },
+        (String stagingSessionToken) -> {
+          if (configuration.cleanArtifactsPerJob) {
+            
artifactStagingServer.getService().removeArtifacts(stagingSessionToken);
+          }
+        },
         invoker);
   }
 
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
index 9319a70650b..613ec4d4dd7 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
@@ -182,6 +182,10 @@ static ProxyManifest loadManifest(String retrievalToken) 
throws IOException {
     LOG.info("Loading manifest for retrieval token {}", retrievalToken);
     // look for manifest file at $retrieval_token
     ResourceId manifestResourceId = 
getManifestLocationFromToken(retrievalToken);
+    return loadManifest(manifestResourceId);
+  }
+
+  static ProxyManifest loadManifest(ResourceId manifestResourceId) throws 
IOException {
     ProxyManifest.Builder manifestBuilder = ProxyManifest.newBuilder();
     try (InputStream stream = 
Channels.newInputStream(FileSystems.open(manifestResourceId))) {
       String contents = new String(ByteStreams.toByteArray(stream), 
StandardCharsets.UTF_8);
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
index f059971b6fc..5769045d0de 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
@@ -88,8 +88,10 @@
   public void commitManifest(
       CommitManifestRequest request, StreamObserver<CommitManifestResponse> 
responseObserver) {
     try {
-      ResourceId manifestResourceId = 
getManifestFileResourceId(request.getStagingSessionToken());
-      ResourceId artifactDirResourceId = 
getArtifactDirResourceId(request.getStagingSessionToken());
+      StagingSessionToken stagingSessionToken =
+          StagingSessionToken.decode(request.getStagingSessionToken());
+      ResourceId manifestResourceId = 
getManifestFileResourceId(stagingSessionToken);
+      ResourceId artifactDirResourceId = 
getArtifactDirResourceId(stagingSessionToken);
       ProxyManifest.Builder proxyManifestBuilder =
           ProxyManifest.newBuilder().setManifest(request.getManifest());
       for (ArtifactMetadata artifactMetadata : 
request.getManifest().getArtifactList()) {
@@ -138,7 +140,7 @@ public static String generateStagingSessionToken(String 
sessionId, String basePa
     StagingSessionToken stagingSessionToken = new StagingSessionToken();
     stagingSessionToken.setSessionId(sessionId);
     stagingSessionToken.setBasePath(basePath);
-    return encodeStagingSessionToken(stagingSessionToken);
+    return stagingSessionToken.encode();
   }
 
   private String encodedFileName(ArtifactMetadata artifactMetadata) {
@@ -146,48 +148,49 @@ private String encodedFileName(ArtifactMetadata 
artifactMetadata) {
         + Hashing.sha256().hashString(artifactMetadata.getName(), 
CHARSET).toString();
   }
 
-  private static StagingSessionToken decodeStagingSessionToken(String 
stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
-    } catch (JsonProcessingException e) {
-      String message =
-          String.format(
-              "Unable to deserialize staging token %s. Expected format: %s. 
Error: %s",
-              stagingSessionToken,
-              "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
-              e.getMessage());
-      throw new 
StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
-    }
-  }
+  public void removeArtifacts(String stagingSessionToken) throws Exception {
+    StagingSessionToken parsedToken = 
StagingSessionToken.decode(stagingSessionToken);
+    ResourceId dir = getJobDirResourceId(parsedToken);
+    ResourceId manifestResourceId = dir.resolve(MANIFEST, 
StandardResolveOptions.RESOLVE_FILE);
 
-  private static String encodeStagingSessionToken(StagingSessionToken 
stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.writeValueAsString(stagingSessionToken);
-    } catch (JsonProcessingException e) {
-      LOG.error("Error {} occurred while serializing {}.", e.getMessage(), 
stagingSessionToken);
-      throw e;
+    LOG.debug("Removing dir {}", dir);
+
+    ProxyManifest proxyManifest =
+        
BeamFileSystemArtifactRetrievalService.loadManifest(manifestResourceId);
+    for (Location location : proxyManifest.getLocationList()) {
+      String uri = location.getUri();
+      LOG.debug("Removing artifact: {}", uri);
+      FileSystems.delete(
+          Collections.singletonList(FileSystems.matchNewResource(uri, false /* 
is directory */)));
     }
+
+    ResourceId artifactsResourceId =
+        dir.resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);
+    LOG.debug("Removing artifacts: {}", artifactsResourceId);
+    FileSystems.delete(Collections.singletonList(artifactsResourceId));
+    LOG.debug("Removing manifest: {}", manifestResourceId);
+    FileSystems.delete(Collections.singletonList(manifestResourceId));
+    LOG.debug("Removing empty dir: {}", dir);
+    FileSystems.delete(Collections.singletonList(dir));
+    LOG.info("Removed dir {}", dir);
   }
 
-  private ResourceId getJobDirResourceId(String stagingSessionToken) throws 
Exception {
+  private ResourceId getJobDirResourceId(StagingSessionToken 
stagingSessionToken) {
     ResourceId baseResourceId;
-    StagingSessionToken parsedToken = 
decodeStagingSessionToken(stagingSessionToken);
     // Get or Create the base path
     baseResourceId =
-        FileSystems.matchNewResource(parsedToken.getBasePath(), true /* 
isDirectory */);
+        FileSystems.matchNewResource(stagingSessionToken.getBasePath(), true 
/* isDirectory */);
     // Using sessionId as the subDir to store artifacts and manifest.
     return baseResourceId.resolve(
-        parsedToken.getSessionId(), StandardResolveOptions.RESOLVE_DIRECTORY);
+        stagingSessionToken.getSessionId(), 
StandardResolveOptions.RESOLVE_DIRECTORY);
   }
 
-  private ResourceId getManifestFileResourceId(String stagingSessionToken) 
throws Exception {
+  private ResourceId getManifestFileResourceId(StagingSessionToken 
stagingSessionToken) {
     return getJobDirResourceId(stagingSessionToken)
         .resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);
   }
 
-  private ResourceId getArtifactDirResourceId(String stagingSessionToken) 
throws Exception {
+  private ResourceId getArtifactDirResourceId(StagingSessionToken 
stagingSessionToken) {
     return getJobDirResourceId(stagingSessionToken)
         .resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);
   }
@@ -211,10 +214,13 @@ public void onNext(PutArtifactRequest putArtifactRequest) 
{
         checkNotNull(putArtifactRequest);
         checkNotNull(putArtifactRequest.getMetadata());
         metadata = putArtifactRequest.getMetadata();
+        LOG.info("stored metadata: {}", metadata);
         // Check the base path exists or create the base path
         try {
           ResourceId artifactsDirId =
-              
getArtifactDirResourceId(putArtifactRequest.getMetadata().getStagingSessionToken());
+              getArtifactDirResourceId(
+                  StagingSessionToken.decode(
+                      
putArtifactRequest.getMetadata().getStagingSessionToken()));
           artifactId =
               artifactsDirId.resolve(
                   encodedFileName(metadata.getMetadata()), 
StandardResolveOptions.RESOLVE_FILE);
@@ -329,6 +335,30 @@ private void setBasePath(String basePath) {
       this.basePath = basePath;
     }
 
+    public String encode() {
+      try {
+        return MAPPER.writeValueAsString(this);
+      } catch (JsonProcessingException e) {
+        String message =
+            String.format("Error %s occurred while serializing %s", 
e.getMessage(), this);
+        throw new 
StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
+      }
+    }
+
+    public static StagingSessionToken decode(String stagingSessionToken) 
throws Exception {
+      try {
+        return MAPPER.readValue(stagingSessionToken, 
StagingSessionToken.class);
+      } catch (JsonProcessingException e) {
+        String message =
+            String.format(
+                "Unable to deserialize staging token %s. Expected format: %s. 
Error: %s",
+                stagingSessionToken,
+                "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
+                e.getMessage());
+        throw new 
StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
+      }
+    }
+
     @Override
     public String toString() {
       return "StagingSessionToken{"
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
index 1fc2505a8f2..97758e1b46d 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -38,6 +38,7 @@
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.construction.graph.PipelineValidator;
 import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.fn.function.ThrowingConsumer;
 import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
 import org.apache.beam.vendor.grpc.v1.io.grpc.Status;
 import org.apache.beam.vendor.grpc.v1.io.grpc.StatusException;
@@ -70,26 +71,35 @@
   public static InMemoryJobService create(
       Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
       Function<String, String> stagingServiceTokenProvider,
+      ThrowingConsumer<String> cleanupJobFn,
       JobInvoker invoker) {
-    return new InMemoryJobService(stagingServiceDescriptor, 
stagingServiceTokenProvider, invoker);
+    return new InMemoryJobService(
+        stagingServiceDescriptor, stagingServiceTokenProvider, cleanupJobFn, 
invoker);
   }
 
   private final ConcurrentMap<String, JobPreparation> preparations;
   private final ConcurrentMap<String, JobInvocation> invocations;
+  private final ConcurrentMap<String, String> stagingSessionTokens;
   private final Endpoints.ApiServiceDescriptor stagingServiceDescriptor;
   private final Function<String, String> stagingServiceTokenProvider;
+  private final ThrowingConsumer<String> cleanupJobFn;
   private final JobInvoker invoker;
 
   private InMemoryJobService(
       Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
       Function<String, String> stagingServiceTokenProvider,
+      ThrowingConsumer<String> cleanupJobFn,
       JobInvoker invoker) {
     this.stagingServiceDescriptor = stagingServiceDescriptor;
     this.stagingServiceTokenProvider = stagingServiceTokenProvider;
+    this.cleanupJobFn = cleanupJobFn;
     this.invoker = invoker;
 
     this.preparations = new ConcurrentHashMap<>();
     this.invocations = new ConcurrentHashMap<>();
+
+    // Map "preparation ID" to staging token
+    this.stagingSessionTokens = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -121,12 +131,15 @@ public void prepare(
         return;
       }
 
+      String stagingSessionToken = 
stagingServiceTokenProvider.apply(preparationId);
+      stagingSessionTokens.putIfAbsent(preparationId, stagingSessionToken);
+
       // send response
       PrepareJobResponse response =
           PrepareJobResponse.newBuilder()
               .setPreparationId(preparationId)
               .setArtifactStagingEndpoint(stagingServiceDescriptor)
-              
.setStagingSessionToken(stagingServiceTokenProvider.apply(preparationId))
+              .setStagingSessionToken(stagingSessionToken)
               .build();
       responseObserver.onNext(response);
       responseObserver.onCompleted();
@@ -161,6 +174,26 @@ public void run(RunJobRequest request, 
StreamObserver<RunJobResponse> responseOb
           invoker.invoke(
               preparation.pipeline(), preparation.options(), 
request.getRetrievalToken());
       String invocationId = invocation.getId();
+
+      invocation.addStateListener(
+          (state) -> {
+            if (!JobInvocation.isTerminated(state)) {
+              return;
+            }
+            String stagingSessionToken = 
stagingSessionTokens.get(preparationId);
+            stagingSessionTokens.remove(preparationId);
+            if (cleanupJobFn != null) {
+              try {
+                cleanupJobFn.accept(stagingSessionToken);
+              } catch (Exception e) {
+                LOG.error(
+                    "Failed to remove job staging directory for token {}: {}",
+                    stagingSessionToken,
+                    e);
+              }
+            }
+          });
+
       invocation.start();
       invocations.put(invocationId, invocation);
       RunJobResponse response = 
RunJobResponse.newBuilder().setJobId(invocationId).build();
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index f99a31a4b5c..831edc94280 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -42,4 +42,16 @@
 
   /** Listen for job messages with a {@link Consumer}. */
   void addMessageListener(Consumer<JobMessage> messageStreamObserver);
+
+  static Boolean isTerminated(Enum state) {
+    switch (state) {
+      case DONE:
+      case FAILED:
+      case CANCELLED:
+      case DRAINED:
+        return true;
+      default:
+        return false;
+    }
+  }
 }
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
index 19ffea9351c..d7a0c57c028 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java
@@ -198,6 +198,14 @@ public void generateStagingSessionTokenTest() throws 
Exception {
         "{\"sessionId\":\"abc123\",\"basePath\":\"" + basePath + "\"}", 
stagingToken);
   }
 
+  void checkCleanup(String stagingSessionToken, String stagingSession) throws 
Exception {
+    Assert.assertTrue(
+        Files.exists(Paths.get(stagingDir.toAbsolutePath().toString(), 
stagingSession)));
+    stagingService.removeArtifacts(stagingSessionToken);
+    Assert.assertFalse(
+        Files.exists(Paths.get(stagingDir.toAbsolutePath().toString(), 
stagingSession)));
+  }
+
   @Test
   public void putArtifactsSingleSmallFileTest() throws Exception {
     String fileName = "file1";
@@ -219,6 +227,7 @@ public void putArtifactsSingleSmallFileTest() throws 
Exception {
             BeamFileSystemArtifactStagingService.MANIFEST),
         Paths.get(stagingToken));
     assertFiles(Collections.singleton(fileName), stagingToken);
+    checkCleanup(stagingSessionToken, stagingSession);
   }
 
   @Test
@@ -272,6 +281,8 @@ public void putArtifactsMultipleFilesTest() throws 
Exception {
         Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession, 
"MANIFEST").toString(),
         retrievalToken);
     assertFiles(files.keySet(), retrievalToken);
+
+    checkCleanup(stagingSessionToken, stagingSession);
   }
 
   @Test
@@ -332,6 +343,8 @@ public void putArtifactsMultipleFilesConcurrentlyTest() 
throws Exception {
         Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession, 
"MANIFEST").toString(),
         retrievalToken);
     assertFiles(files.keySet(), retrievalToken);
+
+    checkCleanup(stagingSessionToken, stagingSession);
   }
 
   @Test
@@ -418,6 +431,9 @@ public void 
putArtifactsMultipleFilesConcurrentSessionsTest() throws Exception {
         retrievalToken2);
     assertFiles(files1.keySet(), retrievalToken1);
     assertFiles(files2.keySet(), retrievalToken2);
+
+    checkCleanup(stagingSessionToken1, stagingSession1);
+    checkCleanup(stagingSessionToken2, stagingSession2);
   }
 
   private void assertFiles(Set<String> files, String retrievalToken) throws 
Exception {
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
index 768e6289fb3..af6934071a2 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
@@ -22,6 +22,7 @@
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNull.notNullValue;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -58,7 +59,8 @@
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     stagingServiceDescriptor = 
Endpoints.ApiServiceDescriptor.getDefaultInstance();
-    service = InMemoryJobService.create(stagingServiceDescriptor, session -> 
"token", invoker);
+    service =
+        InMemoryJobService.create(stagingServiceDescriptor, session -> 
"token", null, invoker);
     when(invoker.invoke(TEST_PIPELINE, TEST_OPTIONS, 
TEST_RETRIEVAL_TOKEN)).thenReturn(invocation);
     when(invocation.getId()).thenReturn(TEST_JOB_ID);
   }
@@ -105,6 +107,9 @@ public void testJobSubmissionUsesJobInvokerAndIsSuccess() 
throws Exception {
     assertThat(runRecorder.values, hasSize(1));
     JobApi.RunJobResponse runResponse = runRecorder.values.get(0);
     assertThat(runResponse.getJobId(), is(TEST_JOB_ID));
+
+    verify(invocation, times(1)).addStateListener(any());
+    verify(invocation, times(1)).start();
   }
 
   private static class RecordingObserver<T> implements StreamObserver<T> {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 130331)
    Time Spent: 6.5h  (was: 6h 20m)

> Less wasteful ArtifactStagingService
> ------------------------------------
>
>                 Key: BEAM-4778
>                 URL: https://issues.apache.org/jira/browse/BEAM-4778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java]
>  is the main implementation of ArtifactStagingService.
> It stages artifacts into a directory, and in practice the staging session 
> token passed to it is such that the directory is different for every job. 
> This leads to two issues:
>  * It doesn't get cleaned up when the job finishes or even when the 
> JobService shuts down, so we have disk space leaks if running a lot of jobs 
> (e.g. a suite of ValidatesRunner tests)
>  * We repeatedly re-stage the same artifacts. Ideally, we should instead 
> detect, based on their MD5 hashes, that some artifacts have already been 
> staged and skip them. The artifact staging protocol has rudimentary support 
> for this but may need to be modified.
> CC: [~angoenka]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to