[ https://issues.apache.org/jira/browse/TEZ-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290228#comment-15290228 ]
Bikas Saha commented on TEZ-3222:
---------------------------------

Thanks for the update! And sorry for the delayed response.

{code}
@@ -78,10 +78,10 @@ public class BroadcastEdgeManager extends EdgeManagerPluginOnDemand {
   }

   @Override
-  public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
       int sourceTaskIndex, int destinationTaskIndex) throws Exception {
-    return commonRouteMeta[sourceTaskIndex];
+    return CompositeEventRouteMetadata.create(1, sourceTaskIndex, 0);
   }
{code}

This should probably use the same caching logic instead of creating new objects.

{code}
@@ -360,8 +360,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         partitionRange = basePartitionRange;
       }
-      return EventRouteMetadata.create(partitionRange, targetIndicesToSend,
-          sourceIndices[destinationTaskIndex]);
+      return CompositeEventRouteMetadata.create(partitionRange, targetIndicesToSend[0],
+          sourceIndices[destinationTaskIndex][0]);
     }
{code}

This is not clear to me. The main reason for the array type in EventRouteMetadata is this auto-reduce edge manager case, where a single source CDME expands to multiple DMEs for the same destination task; the expansion count is the number of partitions coalesced during auto-reduce. Hence it's not clear how passing only the first element of the array would work. If the above is true, then perhaps we could look at adding EventRouteMetadata as a member of CDME, cloned from the source for the destination.
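The caching suggestion for the broadcast case could look roughly like the following. This is only a sketch: {{BroadcastRouteCache}} is a hypothetical helper, and the minimal {{CompositeEventRouteMetadata}} stand-in (its field names and {{create}} argument order) is an assumption modeled on the diff above, not the actual Tez class.

```java
// Hypothetical stand-in for Tez's CompositeEventRouteMetadata.
// Field names and create(count, target, source) argument order are
// assumptions inferred from the patch hunks quoted above.
final class CompositeEventRouteMetadata {
    private final int count;
    private final int target;
    private final int source;

    private CompositeEventRouteMetadata(int count, int target, int source) {
        this.count = count;
        this.target = target;
        this.source = source;
    }

    static CompositeEventRouteMetadata create(int count, int target, int source) {
        return new CompositeEventRouteMetadata(count, target, source);
    }

    int getCount() { return count; }
    int getTarget() { return target; }
    int getSource() { return source; }
}

// Sketch of the caching idea: build the per-source-task metadata once
// and return the same immutable object on every subsequent call, in the
// spirit of the existing commonRouteMeta array, instead of allocating a
// new object per routing call.
final class BroadcastRouteCache {
    private final CompositeEventRouteMetadata[] cache;

    BroadcastRouteCache(int numSourceTasks) {
        cache = new CompositeEventRouteMetadata[numSourceTasks];
    }

    CompositeEventRouteMetadata route(int sourceTaskIndex) {
        CompositeEventRouteMetadata m = cache[sourceTaskIndex];
        if (m == null) {
            // Same arguments as the patch: one event, target = sourceTaskIndex,
            // source index 0 (broadcast writes a single output).
            m = CompositeEventRouteMetadata.create(1, sourceTaskIndex, 0);
            cache[sourceTaskIndex] = m;
        }
        return m;
    }
}
```

Since the metadata is immutable and depends only on the source task index, returning a cached instance is safe and avoids one allocation per routed event.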
And in the destination, the CDME with route metadata gets expanded into DMEs in the same manner as the following code in Edge (which could be moved into a helper method on CDME):

{code}
-        int numEvents = routeMeta.getNumEvents();
-        int[] sourceIndices = routeMeta.getSourceIndices();
-        int[] targetIndices = routeMeta.getTargetIndices();
-        while (numEventsDone < numEvents && listSize++ < listMaxSize) {
-          DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone],
-              targetIndices[numEventsDone]);
-          numEventsDone++;
-          TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
-              tezEvent.getEventReceivedTime());
-          tezEventToSend.setDestinationInfo(destinationMetaInfo);
-          listToAdd.add(tezEventToSend);
-        }
{code}

This would also keep the API unchanged for the edge plugin. Does the above sound correct to you? I am looking at this code after a while and I may have gotten it all wrong :)

The code changes in all the inputs look quite similar to each other. Any potential for common methods?

> Reduce messaging overhead for auto-reduce parallelism case
> ----------------------------------------------------------
>
>                 Key: TEZ-3222
>                 URL: https://issues.apache.org/jira/browse/TEZ-3222
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Jonathan Eagles
>            Assignee: Jonathan Eagles
>         Attachments: TEZ-3222.1.patch, TEZ-3222.2.patch, TEZ-3222.3.patch, TEZ-3222.4.patch
>
>
> A dag with 15k x 1000k vertex may auto-reduce to 15k x 1. And while the data size is appropriate for 1 task attempt, this results in an increase in task attempt message processing of 1000x.
> This jira aims to reduce the message processing in the auto-reduced task while keeping the amount of message processing in the AM the same or less.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
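The helper-method idea from the comment above could be sketched as follows. All class names and the consecutive-index expansion arithmetic here are assumptions for illustration: these are minimal stand-ins, not the real Tez event classes, and the real CDME carries payloads and version info this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal stand-in for Tez's DataMovementEvent.
final class DataMovementEvent {
    final int sourceIndex;
    final int targetIndex;

    DataMovementEvent(int sourceIndex, int targetIndex) {
        this.sourceIndex = sourceIndex;
        this.targetIndex = targetIndex;
    }
}

// Hypothetical minimal stand-in for a CDME carrying its route metadata,
// as the comment proposes: count / target / source mirror the
// CompositeEventRouteMetadata.create() arguments in the patch.
final class CompositeDataMovementEvent {
    private final int count;   // number of DMEs this composite expands into
    private final int target;  // first destination (target) index
    private final int source;  // first source index

    CompositeDataMovementEvent(int count, int target, int source) {
        this.count = count;
        this.target = target;
        this.source = source;
    }

    // Sketch of the proposed helper on CDME: expand the composite event
    // into individual DMEs on the destination side, replacing the
    // per-event loop in Edge. The assumption that source and target
    // indices advance in lockstep is illustrative only.
    List<DataMovementEvent> expandRouteMeta() {
        List<DataMovementEvent> expanded = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            expanded.add(new DataMovementEvent(source + i, target + i));
        }
        return expanded;
    }
}
```

With a helper like this, the AM ships one composite event per source task and the destination performs the fan-out locally, which is the messaging reduction the JIRA is after, while the edge-plugin API stays unchanged.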