[ https://issues.apache.org/jira/browse/BEAM-4778?focusedWorklogId=126757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-126757 ]
ASF GitHub Bot logged work on BEAM-4778:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jul/18 17:36
            Start Date: 24/Jul/18 17:36
    Worklog Time Spent: 10m
      Work Description: angoenka commented on a change in pull request #5958: [BEAM-4778] add option to flink job server to clean staged artifacts per-job
URL: https://github.com/apache/beam/pull/5958#discussion_r204845207

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##########
@@ -138,56 +140,57 @@ public static String generateStagingSessionToken(String sessionId, String basePa
     StagingSessionToken stagingSessionToken = new StagingSessionToken();
     stagingSessionToken.setSessionId(sessionId);
     stagingSessionToken.setBasePath(basePath);
-    return encodeStagingSessionToken(stagingSessionToken);
+    return stagingSessionToken.encode();
   }

   private String encodedFileName(ArtifactMetadata artifactMetadata) {
     return "artifact_"
         + Hashing.sha256().hashString(artifactMetadata.getName(), CHARSET).toString();
   }

-  private static StagingSessionToken decodeStagingSessionToken(String stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
-    } catch (JsonProcessingException e) {
-      String message =
-          String.format(
-              "Unable to deserialize staging token %s. Expected format: %s. Error: %s",
-              stagingSessionToken,
-              "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
-              e.getMessage());
-      throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
-    }
-  }
+  public void removeJobArtifacts(String stagingSessionToken) throws Exception {
+    StagingSessionToken parsedToken = StagingSessionToken.decode(stagingSessionToken);
+    ResourceId dir = getJobDirResourceId(parsedToken);
+    ResourceId manifestResourceId = dir.resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);

-  private static String encodeStagingSessionToken(StagingSessionToken stagingSessionToken)
-      throws Exception {
-    try {
-      return MAPPER.writeValueAsString(stagingSessionToken);
-    } catch (JsonProcessingException e) {
-      LOG.error("Error {} occurred while serializing {}.", e.getMessage(), stagingSessionToken);
-      throw e;
+    LOG.info("Removing dir {}", dir);
+
+    ProxyManifest proxyManifest =
+        BeamFileSystemArtifactRetrievalService.loadManifest(manifestResourceId);
+    for (Location location : proxyManifest.getLocationList()) {
+      String uri = location.getUri();
+      LOG.info("Removing artifact: {}", uri);
+      FileSystems.delete(
+          Collections.singletonList(FileSystems.matchNewResource(uri, false /* is directory */)));
     }
+
+    ResourceId artifactsResourceId =
+        dir.resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);
+    LOG.info("Removing artifacts: {}", artifactsResourceId);
+    FileSystems.delete(Collections.singletonList(artifactsResourceId));
+    LOG.info("Removing manifest: {}", manifestResourceId);
+    FileSystems.delete(Collections.singletonList(manifestResourceId));
+    LOG.info("Removing empty dir: {}", dir);
+    FileSystems.delete(Collections.singletonList(dir));

 Review comment:
   DirectoryNotEmptyException is good enough.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 126757)
    Time Spent: 3h 50m  (was: 3h 40m)

> Less wasteful ArtifactStagingService
> ------------------------------------
>
>                 Key: BEAM-4778
>                 URL: https://issues.apache.org/jira/browse/BEAM-4778
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Eugene Kirpichov
>            Assignee: Ryan Williams
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java]
> is the main implementation of ArtifactStagingService.
> It stages artifacts into a directory; and in practice the passed staging
> session token is such that the directory is different for every job. This
> leads to 2 issues:
> * It doesn't get cleaned up when the job finishes or even when the
> JobService shuts down, so we have disk space leaks if running a lot of jobs
> (e.g. a suite of ValidatesRunner tests)
> * We repeatedly re-stage the same artifacts. Instead, ideally, we should
> identify that some artifacts don't need to be staged - based on knowing their
> md5. The artifact staging protocol has rudimentary support for this but may
> need to be modified.
> CC: [~angoenka]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
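
Illustration of the review comment on removeJobArtifacts above: the job directory is deleted last and non-recursively, so any unexpected leftover file surfaces as a DirectoryNotEmptyException instead of being silently wiped. The sketch below shows the same ordering on the local file system with java.nio only. It is not the Beam implementation: the class name, the MANIFEST and artifacts names, and the one-file-name-per-line manifest format are all assumptions made for the example, and Beam's FileSystems.delete may behave differently on other backends.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for per-job cleanup; uses the local file system only. */
final class JobDirCleanupSketch {
  // Assumed layout: <jobDir>/MANIFEST and <jobDir>/artifacts/<file>.
  static final String MANIFEST = "MANIFEST";
  static final String ARTIFACTS = "artifacts";

  /**
   * Deletes only what the manifest lists, then the now-empty directories.
   * Files.delete is non-recursive, so a stray file makes the matching
   * directory delete fail with DirectoryNotEmptyException.
   */
  static void removeJobArtifacts(Path jobDir) throws IOException {
    Path manifest = jobDir.resolve(MANIFEST);
    Path artifactsDir = jobDir.resolve(ARTIFACTS);

    // Assumed manifest format: one artifact file name per line.
    for (String name : Files.readAllLines(manifest)) {
      if (!name.isEmpty()) {
        Files.delete(artifactsDir.resolve(name));
      }
    }

    Files.delete(artifactsDir); // throws if unlisted artifacts remain
    Files.delete(manifest);
    Files.delete(jobDir); // throws if anything unexpected is left behind
  }

  private JobDirCleanupSketch() {}
}
```

Calling JobDirCleanupSketch.removeJobArtifacts(jobDir) on a directory that contains only the manifest and the listed artifacts removes it completely. If something else was written there, the call fails at the last step and the extra file is left untouched, which is the failure mode the review comment accepts.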