[ 
https://issues.apache.org/jira/browse/BEAM-4778?focusedWorklogId=130085&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130085
 ]

ASF GitHub Bot logged work on BEAM-4778:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Aug/18 03:51
            Start Date: 02/Aug/18 03:51
    Worklog Time Spent: 10m 
      Work Description: tweise commented on a change in pull request #5958: 
[BEAM-4778] add option to flink job server to clean staged artifacts per-job
URL: https://github.com/apache/beam/pull/5958#discussion_r207094725
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##########
 @@ -138,56 +140,57 @@ public static String generateStagingSessionToken(String sessionId, String basePa
     StagingSessionToken stagingSessionToken = new StagingSessionToken();
     stagingSessionToken.setSessionId(sessionId);
     stagingSessionToken.setBasePath(basePath);
-    return encodeStagingSessionToken(stagingSessionToken);
+    return stagingSessionToken.encode();
   }
 
   private String encodedFileName(ArtifactMetadata artifactMetadata) {
     return "artifact_"
         + Hashing.sha256().hashString(artifactMetadata.getName(), CHARSET).toString();
   }
 
-  private static StagingSessionToken decodeStagingSessionToken(String stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
-    } catch (JsonProcessingException e) {
-      String message =
-          String.format(
-              "Unable to deserialize staging token %s. Expected format: %s. Error: %s",
-              stagingSessionToken,
-              "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
-              e.getMessage());
-      throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
-    }
-  }
+  public void removeArtifacts(String stagingSessionToken) throws Exception {
+    StagingSessionToken parsedToken = StagingSessionToken.decode(stagingSessionToken);
+    ResourceId dir = getJobDirResourceId(parsedToken);
+    ResourceId manifestResourceId = dir.resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);
 
-  private static String encodeStagingSessionToken(StagingSessionToken stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.writeValueAsString(stagingSessionToken);
-    } catch (JsonProcessingException e) {
-      LOG.error("Error {} occurred while serializing {}.", e.getMessage(), stagingSessionToken);
-      throw e;
+    LOG.info("Removing dir {}", dir);
 
 Review comment:
   Do we really need this and the following logging at info level? Why not debug?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 130085)
    Time Spent: 5.5h  (was: 5h 20m)

> Less wasteful ArtifactStagingService
> ------------------------------------
>
>                 Key: BEAM-4778
>                 URL: https://issues.apache.org/jira/browse/BEAM-4778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java]
>  is the main implementation of ArtifactStagingService.
> It stages artifacts into a directory; and in practice the passed staging 
> session token is such that the directory is different for every job. This 
> leads to 2 issues:
>  * It doesn't get cleaned up when the job finishes or even when the 
> JobService shuts down, so we have disk space leaks if running a lot of jobs 
> (e.g. a suite of ValidatesRunner tests)
>  * We repeatedly re-stage the same artifacts. Instead, ideally, we should 
> identify that some artifacts don't need to be staged - based on knowing their 
> md5. The artifact staging protocol has rudimentary support for this but may 
> need to be modified.
> CC: [~angoenka]
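
The two issues described above (leaked per-job staging directories and repeated re-staging of identical artifacts) can be illustrated with a minimal, self-contained sketch. This is not Beam's implementation: the class StagingSketch and its methods md5Hex, stageIfAbsent, and removeJobDir are hypothetical names, and it works against the local filesystem through java.nio rather than through Beam's FileSystems/ResourceId abstraction or the artifact staging protocol.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch only; none of these names exist in Beam. */
public class StagingSketch {

  /** Returns the lowercase hex MD5 digest of the given bytes. */
  static String md5Hex(byte[] content) throws NoSuchAlgorithmException {
    byte[] digest = MessageDigest.getInstance("MD5").digest(content);
    StringBuilder sb = new StringBuilder();
    for (byte b : digest) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }

  /**
   * Stages content under a file name derived from its MD5, so identical
   * artifacts map to the same file. Returns true only if a write happened.
   */
  static boolean stageIfAbsent(Path sharedDir, byte[] content)
      throws IOException, NoSuchAlgorithmException {
    Path target = sharedDir.resolve("artifact_" + md5Hex(content));
    if (Files.exists(target)) {
      return false; // already staged; skip the redundant write
    }
    Files.write(target, content);
    return true;
  }

  /** Recursively deletes a per-job staging directory, if it exists. */
  static void removeJobDir(Path jobDir) throws IOException {
    if (!Files.exists(jobDir)) {
      return;
    }
    List<Path> paths;
    try (Stream<Path> walk = Files.walk(jobDir)) {
      // Reverse order puts children before their parent directories.
      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path p : paths) {
      Files.delete(p);
    }
  }
}
```

In this sketch a job service would call stageIfAbsent for each uploaded artifact and removeJobDir once the job reaches a terminal state. A real implementation would also need to handle concurrent writers and decide when shared, content-addressed artifacts are safe to delete.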



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
