[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r828152234

## File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ##
@@ -4930,4 +4981,19 @@ public boolean getTaskRescheduleRelaxedLocality() {
   public Map> getDownstreamBlamingHosts(){
     return downstreamBlamingHosts;
   }
+
+  /**
+   * Initialize context from vertex shuffle deletion.
+   * @param deletionHeight
+   */
+  public void initShuffleDeletionContext(int deletionHeight) {
+    VertexShuffleDataDeletionContext vShuffleDeletionContext = new VertexShuffleDataDeletionContext(deletionHeight);
+    vShuffleDeletionContext.setSpannedVertices(this);
+    this.vShuffleDeletionContext = vShuffleDeletionContext;
+  }
+
+  @VisibleForTesting
+  public VertexShuffleDataDeletionContext getVShuffleDeletionContext() {

Review comment:
ack. I will fix it in the next revision.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@tez.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r828125294

## File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ##
@@ -4930,4 +4981,19 @@ public boolean getTaskRescheduleRelaxedLocality() {
   public Map> getDownstreamBlamingHosts(){
     return downstreamBlamingHosts;
   }
+
+  /**
+   * Initialize context from vertex shuffle deletion.
+   * @param deletionHeight
+   */
+  public void initShuffleDeletionContext(int deletionHeight) {
+    VertexShuffleDataDeletionContext vShuffleDeletionContext = new VertexShuffleDataDeletionContext(deletionHeight);
+    vShuffleDeletionContext.setSpannedVertices(this);
+    this.vShuffleDeletionContext = vShuffleDeletionContext;
+  }
+
+  @VisibleForTesting
+  public VertexShuffleDataDeletionContext getVShuffleDeletionContext() {

Review comment:
This is used by testVertexShuffleDelete() in TestVertexImpl.
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r825417887

## File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ##
@@ -1772,6 +1773,17 @@ private static void parseVertexEdges(DAGImpl dag, Map edgePlan
     vertex.setInputVertices(inVertices);
     vertex.setOutputVertices(outVertices);
+    boolean cleanupShuffleDataAtVertexLevel = dag.dagConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
+        TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0 && ShuffleUtils.isTezShuffleHandler(dag.dagConf);
+    if (cleanupShuffleDataAtVertexLevel) {
+      int deletionHeight = dag.dagConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
+          TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT);
+
+      VertexShuffleDataDeletionContext vShuffleDeletionContext =

Review comment:
ack. Will resolve in the next revision.
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r825414491

## File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ##
@@ -3758,6 +3779,36 @@ public VertexState transition(VertexImpl vertex, VertexEvent event) {
     }
   }
+  private static class VertexShuffleDeleteTransition implements
+      SingleArcTransition {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      int incompleteChildrenVertices = vertex.vShuffleDeletionContext.getIncompleteChildrenVertices();
+      incompleteChildrenVertices = incompleteChildrenVertices - 1;
+      vertex.vShuffleDeletionContext.setIncompleteChildrenVertices(incompleteChildrenVertices);
+      // check if all the child vertices are completed
+      if (incompleteChildrenVertices == 0) {
+        LOG.info("Vertex shuffle data deletion for vertex name: " +
+            vertex.getName() + " with vertex id: " + vertex.getVertexId());
+        // Get nodes of all the task attempts in vertex
+        Set nodes = Sets.newHashSet();
+        Map tasksMap = vertex.getTasks();
+        tasksMap.keySet().forEach(taskId -> {
+          Map taskAttemptMap = tasksMap.get(taskId).getAttempts();
+          taskAttemptMap.keySet().forEach(attemptId -> {
+            nodes.add(taskAttemptMap.get(attemptId).getNodeId());
+          });
+        });
+        vertex.appContext.getAppMaster().vertexComplete(
+            vertex.vertexId, nodes);
+      } else {
+        LOG.debug(String.format("The number of incomplete child vertex are %s for the vertex %s",

Review comment:
ack. Will resolve in the next revision.
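The countdown in VertexShuffleDeleteTransition can be illustrated outside Tez. The following is a minimal sketch, not the PR's code: VertexShuffleDeleteSketch, collectNodes and onChildVertexCompleted are hypothetical names, and tasks are modeled as plain maps from task id to (attempt id -> node id) instead of Tez Task/TaskAttempt objects. It decrements the incomplete-children counter once per completed child and, when the counter reaches zero, returns the distinct nodes that ran an attempt of the vertex.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the transition in the hunk above:
// tasks are keyed by task id and map attempt id -> node id (plain strings).
class VertexShuffleDeleteSketch {
    private int incompleteChildrenVertices;

    VertexShuffleDeleteSketch(int spannedChildren) {
        this.incompleteChildrenVertices = spannedChildren;
    }

    // Distinct nodes that ran any attempt of any task in the vertex.
    // Iterating values() avoids the extra get(key) lookups of keySet().forEach(...).
    static Set<String> collectNodes(Map<String, Map<String, String>> tasks) {
        return tasks.values().stream()
            .flatMap(attempts -> attempts.values().stream())
            .collect(Collectors.toSet());
    }

    // Called once per completed child vertex. Returns the nodes whose shuffle
    // data can be deleted once the last child completes, otherwise an empty set.
    Set<String> onChildVertexCompleted(Map<String, Map<String, String>> tasks) {
        incompleteChildrenVertices--;
        if (incompleteChildrenVertices == 0) {
            return collectNodes(tasks);
        }
        return Collections.emptySet();
    }

    int getIncompleteChildrenVertices() {
        return incompleteChildrenVertices;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> tasks = Map.of(
            "task_0", Map.of("attempt_0", "nodeA", "attempt_1", "nodeB"),
            "task_1", Map.of("attempt_0", "nodeA"));
        VertexShuffleDeleteSketch vertex = new VertexShuffleDeleteSketch(2);
        System.out.println(vertex.onChildVertexCompleted(tasks)); // []
        // Last child: prints nodeA and nodeB (set order is unspecified)
        System.out.println(vertex.onChildVertexCompleted(tasks));
    }
}
```

Streaming over values() visits each attempt once, without the tasksMap.get(taskId) lookups of the keySet().forEach(...) form in the hunk. Like the hunk, the sketch assumes each child completion is delivered exactly once; it does not guard against the counter going below zero.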
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r825413725

## File path: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ##
@@ -883,6 +883,22 @@ public TezConfiguration(boolean loadDefaults) {
       + "dag.cleanup.on.completion";
   public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false;
+  /**
+   * Integer value. Instructs AM to delete vertex shuffle data if a vertex and all its
+   * child vertices at a certain depth are completed. Value less than 0 indicates the feature

Review comment:
ack. Will resolve in the next revision.
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r823724534

## File path: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ##
@@ -883,6 +883,25 @@ public TezConfiguration(boolean loadDefaults) {
       + "dag.cleanup.on.completion";
   public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false;
+  /**
+   * Boolean value. Instructs AM to delete vertex shuffle data if a vertex and all its
+   * child vertices at a certain depth are completed.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION = TEZ_AM_PREFIX
+      + "vertex.cleanup.on.completion";
+  public static final boolean TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION_DEFAULT = false;
+
+  /**
+   * Int value. The height from the vertex that it can issue shuffle data deletion upon completion
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_VERTEX_CLEANUP_HEIGHT = TEZ_AM_PREFIX
+      + "vertex.cleanup.height";
+  public static final int TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT = 1;

Review comment:
1. ack. I will make the required changes.
2. Since we will be collapsing the two options into one, as per the previous comments, the value will default to 0, indicating that the feature is disabled. Yes, height = 2 makes more sense!
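With the two options collapsed into a single integer, one read of the height can drive both the on/off decision and the depth. This is a minimal sketch under stated assumptions: the key string assumes TEZ_AM_PREFIX resolves to "tez.am.", a plain Map<String, String> stands in for the Tez configuration object, the default of 0 follows the plan in the comment above, and cleanupHeight/effectiveCleanupHeight are hypothetical helpers.

```java
import java.util.Map;

// Sketch of a single integer option: height > 0 enables vertex-level cleanup and
// bounds how many levels of child vertices must complete first.
class VertexCleanupConfigSketch {
    // Assumed key: TEZ_AM_PREFIX ("tez.am.") + "vertex.cleanup.height"
    static final String TEZ_AM_VERTEX_CLEANUP_HEIGHT = "tez.am.vertex.cleanup.height";
    static final int TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT = 0; // 0 or negative = disabled

    // A plain Map stands in for the Hadoop/Tez Configuration object.
    static int cleanupHeight(Map<String, String> conf) {
        String raw = conf.get(TEZ_AM_VERTEX_CLEANUP_HEIGHT);
        return raw == null ? TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT : Integer.parseInt(raw.trim());
    }

    // Reads the height once; returns 0 when vertex-level cleanup is disabled.
    static int effectiveCleanupHeight(Map<String, String> conf, boolean usesTezShuffleHandler) {
        int height = cleanupHeight(conf);
        return (usesTezShuffleHandler && height > 0) ? height : 0;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(TEZ_AM_VERTEX_CLEANUP_HEIGHT, "2");
        System.out.println(effectiveCleanupHeight(conf, true));     // 2
        System.out.println(effectiveCleanupHeight(Map.of(), true)); // 0 (default: disabled)
        System.out.println(effectiveCleanupHeight(conf, false));    // 0 (not the Tez shuffle handler)
    }
}
```

A caller would test `effectiveCleanupHeight(...) > 0` and reuse the same value as the deletion height, which avoids the two separate dagConf.getInt(...) calls seen in the DAGImpl hunk.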
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r821872406

## File path: tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ##
@@ -1312,6 +1313,86 @@ protected void sendError(ChannelHandlerContext ctx, String message,
       }
     }
+  @Test
+  public void testVertexDelete() throws Exception {
+    final ArrayList failures = new ArrayList(1);
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "simple");
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir = new File("target", TestShuffleHandler.class.
+        getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = ApplicationId.newInstance(12345L, 1);
+    String appAttemptId = "attempt_12345_0001_1_00_00_0_10003_0";
+    String user = "randomUser";
+    List fileMap = new ArrayList();
+    String vertexDirStr =
+        StringUtils.join(Path.SEPARATOR,
+            new String[] { absLogDir.getAbsolutePath(),
+                ShuffleHandler.USERCACHE, user,
+                ShuffleHandler.APPCACHE, appId.toString(), "dag_1/output/" + appAttemptId});
+    File vertexDir = new File(vertexDirStr);
+    Assert.assertFalse("Vetex Directory should not exists", vertexDir.exists());

Review comment:
ack. I will fix this in the next revision.
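The directory the test expects can also be expressed with java.nio path resolution instead of joining strings with Path.SEPARATOR. This is a sketch only: it assumes ShuffleHandler.USERCACHE and ShuffleHandler.APPCACHE are "usercache" and "appcache", takes "application_12345_0001" as the assumed string form of ApplicationId.newInstance(12345L, 1), and attemptOutputDir is a hypothetical helper.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch of the on-disk layout that testVertexDelete builds by string joining.
// Assumption: ShuffleHandler.USERCACHE / APPCACHE are "usercache" / "appcache".
class ShuffleDirLayoutSketch {
    static final String USERCACHE = "usercache";
    static final String APPCACHE = "appcache";

    // <localDir>/usercache/<user>/appcache/<appId>/<dagDir>/output/<attemptId>
    static Path attemptOutputDir(Path localDir, String user, String appId,
                                 String dagDir, String attemptId) {
        return localDir.resolve(USERCACHE).resolve(user)
            .resolve(APPCACHE).resolve(appId)
            .resolve(dagDir).resolve("output").resolve(attemptId);
    }

    public static void main(String[] args) {
        Path dir = attemptOutputDir(Paths.get("target"), "randomUser",
            "application_12345_0001", "dag_1", "attempt_12345_0001_1_00_00_0_10003_0");
        System.out.println(dir);
    }
}
```

Resolving one segment at a time lets the platform supply separators, rather than embedding a literal "/" as the "dag_1/output/" concatenation in the test does.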
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r821871999

## File path: tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ##
@@ -1256,6 +1280,29 @@ private String getBaseLocation(String jobId, String dagId, String user) {
       return baseStr;
     }
+    /**
+     * Delete shuffle data in task directories belonging to a vertex.
+     */
+    private void deleteTaskDirsOfVertex(String jobId, String dagId, String vertexId, String user) throws IOException {
+      String baseStr = getBaseLocation(jobId, dagId, user);
+      FileContext lfc = FileContext.getLocalFSFileContext();
+      for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(baseStr, conf)) {
+        RemoteIterator status = lfc.listStatus(dagPath);
+        final JobID jobID = JobID.forName(jobId);
+        String taskDirPrefix = "attempt" + jobID.toString().replace("job", "") +
+            "_" + dagId + "_" + vertexId + "_";
+        while (status.hasNext()) {
+          FileStatus fileStatus = status.next();
+          Path attemptPath = fileStatus.getPath();
+          if (attemptPath.getName().startsWith(taskDirPrefix)) {
+            if(lfc.delete(attemptPath, true)) {
+              LOG.info("Deleted shuffle data in task directory : " + attemptPath);

Review comment:
ack. I will fix this in the next revision.
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r821871579

## File path: tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ##
@@ -1256,6 +1280,29 @@ private String getBaseLocation(String jobId, String dagId, String user) {
       return baseStr;
     }
+    /**
+     * Delete shuffle data in task directories belonging to a vertex.
+     */
+    private void deleteTaskDirsOfVertex(String jobId, String dagId, String vertexId, String user) throws IOException {
+      String baseStr = getBaseLocation(jobId, dagId, user);
+      FileContext lfc = FileContext.getLocalFSFileContext();
+      for(Path dagPath : lDirAlloc.getAllLocalPathsToRead(baseStr, conf)) {
+        RemoteIterator status = lfc.listStatus(dagPath);
+        final JobID jobID = JobID.forName(jobId);
+        String taskDirPrefix = "attempt" + jobID.toString().replace("job", "") +

Review comment:
ack. I will fix this in the next revision.
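The prefix that deleteTaskDirsOfVertex matches against can be checked with the values from testVertexDelete (dag "1", vertex "00"). Below is a standalone sketch that swaps Hadoop's FileContext and JobID for java.nio and a plain job-id string, assuming JobID.toString() yields the "job_<timestamp>_<id>" form; taskDirPrefix and deleteMatchingDirs are hypothetical names. Unlike the hunk, the prefix is computed once by the caller rather than inside the per-directory loop.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Standalone sketch of the prefix matching in deleteTaskDirsOfVertex,
// using java.nio instead of Hadoop's FileContext and JobID.
class VertexDirCleanupSketch {

    // "job_12345_0001", "1", "00" -> "attempt_12345_0001_1_00_"
    static String taskDirPrefix(String jobId, String dagId, String vertexId) {
        return "attempt" + jobId.replace("job", "") + "_" + dagId + "_" + vertexId + "_";
    }

    // Deletes every direct child of outputDir whose name starts with prefix.
    static List<Path> deleteMatchingDirs(Path outputDir, String prefix) throws IOException {
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(outputDir)) {
            for (Path entry : entries) {
                if (entry.getFileName().toString().startsWith(prefix)) {
                    matches.add(entry);
                }
            }
        }
        // Delete after listing, so the directory is not modified mid-iteration
        for (Path match : matches) {
            deleteRecursively(match);
        }
        return matches;
    }

    private static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order puts children before their parent directories
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("output");
        Files.createDirectories(out.resolve("attempt_12345_0001_1_00_00_0_10003_0").resolve("spill"));
        Files.createDirectories(out.resolve("attempt_12345_0001_1_01_00_0_10003_0"));
        String prefix = taskDirPrefix("job_12345_0001", "1", "00");
        System.out.println(deleteMatchingDirs(out, prefix).size()); // 1
    }
}
```

Keeping the trailing underscore in the prefix means vertex "00" cannot match a directory whose vertex field merely begins with "00".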
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r821870885

## File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ##
@@ -3758,6 +3780,31 @@ public VertexState transition(VertexImpl vertex, VertexEvent event) {
     }
   }
+  private static class VertexDeleteTransition implements
+      SingleArcTransition {
+
+    @Override
+    public void transition(VertexImpl vertex, VertexEvent event) {
+      vertex.incompleteChildrenVertices--;
+      // check if all the child vertices are completed
+      if (vertex.incompleteChildrenVertices == 0) {
+        LOG.info("Vertex shuffle data deletion for vertex name: " +
+            vertex.getName() + " with vertex id: " + vertex.getVertexId());
+        // Get nodes of all the task attempts in vertex
+        Set nodes = Sets.newHashSet();
+        Map tasksMap = vertex.getTasks();
+        tasksMap.keySet().forEach(taskId -> {
+          Map taskAttemptMap = tasksMap.get(taskId).getAttempts();
+          taskAttemptMap.keySet().forEach(attemptId -> {
+            nodes.add(taskAttemptMap.get(attemptId).getNodeId());
+          });
+        });
+        vertex.appContext.getAppMaster().vertexComplete(
+            vertex.vertexId, nodes);
+      }

Review comment:
ack. I will fix this in the next revision.
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r821870419

## File path: tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ##
@@ -1772,6 +1774,52 @@ private static void parseVertexEdges(DAGImpl dag, Map edgePlan
     vertex.setInputVertices(inVertices);
     vertex.setOutputVertices(outVertices);
+    boolean cleanupShuffleDataAtVertexLevel = dag.dagConf.getBoolean(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION,
+        TezConfiguration.TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION_DEFAULT) && ShuffleUtils.isTezShuffleHandler(dag.dagConf);
+    if (cleanupShuffleDataAtVertexLevel) {
+      int deletionHeight = dag.dagConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
+          TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT);
+      getSpannedVerticesAncestors(vertex, ancestors, deletionHeight);

Review comment:
ack. I will fix this in the next revision.
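The body of getSpannedVerticesAncestors(vertex, ancestors, deletionHeight) is not shown in the hunk. One plausible reading, consistent with the option's description (a vertex's shuffle data is deleted once its child vertices down to a given depth have completed), is that it collects the ancestors within deletionHeight edges, i.e. the vertices a completing vertex must notify. The sketch below is hypothetical: SpannedAncestorsSketch and ancestorsWithinHeight are illustrative names, and the DAG is modeled as a map from vertex name to parent names, walked level by level.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: all ancestors of a vertex reachable within `height` edges,
// found with a level-by-level BFS over a parents map (vertex -> input vertices).
class SpannedAncestorsSketch {
    static Set<String> ancestorsWithinHeight(String vertex, Map<String, List<String>> parents, int height) {
        Set<String> ancestors = new LinkedHashSet<>();
        List<String> frontier = List.of(vertex);
        for (int depth = 1; depth <= height && !frontier.isEmpty(); depth++) {
            List<String> next = new ArrayList<>();
            for (String v : frontier) {
                for (String parent : parents.getOrDefault(v, List.of())) {
                    // add() returns false for vertices already reached (e.g. in a diamond)
                    if (ancestors.add(parent)) {
                        next.add(parent);
                    }
                }
            }
            frontier = next;
        }
        return ancestors;
    }

    public static void main(String[] args) {
        // Map 1 -> Reducer 2 -> Reducer 3
        Map<String, List<String>> parents = Map.of(
            "Reducer 2", List.of("Map 1"),
            "Reducer 3", List.of("Reducer 2"));
        System.out.println(ancestorsWithinHeight("Reducer 3", parents, 1)); // [Reducer 2]
        System.out.println(ancestorsWithinHeight("Reducer 3", parents, 2)); // [Reducer 2, Map 1]
    }
}
```

Under this reading, with height 2 on the chain Map 1 -> Reducer 2 -> Reducer 3, the completion of Reducer 3 notifies both Reducer 2 and Map 1, so Map 1's shuffle data becomes deletable only after Reducer 2 and Reducer 3 have both finished.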
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r821868285

## File path: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ##
@@ -883,6 +883,25 @@ public TezConfiguration(boolean loadDefaults) {
       + "dag.cleanup.on.completion";
   public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false;
+  /**
+   * Boolean value. Instructs AM to delete vertex shuffle data if a vertex and all its
+   * child vertices at a certain depth are completed.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION = TEZ_AM_PREFIX
+      + "vertex.cleanup.on.completion";
+  public static final boolean TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION_DEFAULT = false;
+
+  /**
+   * Int value. The height from the vertex that it can issue shuffle data deletion upon completion
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_VERTEX_CLEANUP_HEIGHT = TEZ_AM_PREFIX
+      + "vertex.cleanup.height";
+  public static final int TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT = 1;

Review comment:
Just to add to my previous comment: the intention behind making the height configurable (rather than fixing it at 1) is that shuffle data of more distant ancestors is very rarely requested again; that happens only when multiple shuffle nodes are lost. Letting users control which ancestors' shuffle data is cleaned up therefore gives them more flexibility.
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r821865132

## File path: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ##
@@ -883,6 +883,25 @@ public TezConfiguration(boolean loadDefaults) {
       + "dag.cleanup.on.completion";
   public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false;
+  /**
+   * Boolean value. Instructs AM to delete vertex shuffle data if a vertex and all its
+   * child vertices at a certain depth are completed.
+   */
+  @ConfigurationScope(Scope.AM)

Review comment:
ack. I will fix this in the next revision.
[GitHub] [tez] shameersss1 commented on a change in pull request #60: TEZ-3363: Delete intermediate data at the vertex level for Shuffle Handler
shameersss1 commented on a change in pull request #60:
URL: https://github.com/apache/tez/pull/60#discussion_r821509356

## File path: tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ##
@@ -883,6 +883,25 @@ public TezConfiguration(boolean loadDefaults) {
       + "dag.cleanup.on.completion";
   public static final boolean TEZ_AM_DAG_CLEANUP_ON_COMPLETION_DEFAULT = false;
+  /**
+   * Boolean value. Instructs AM to delete vertex shuffle data if a vertex and all its
+   * child vertices at a certain depth are completed.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION = TEZ_AM_PREFIX
+      + "vertex.cleanup.on.completion";
+  public static final boolean TEZ_AM_VERTEX_CLEANUP_ON_COMPLETION_DEFAULT = false;
+
+  /**
+   * Int value. The height from the vertex that it can issue shuffle data deletion upon completion
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_VERTEX_CLEANUP_HEIGHT = TEZ_AM_PREFIX
+      + "vertex.cleanup.height";
+  public static final int TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT = 1;

Review comment:
Then all the vertex shuffle data of Map 1 will be deleted.