[ https://issues.apache.org/jira/browse/TEZ-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15290228#comment-15290228 ]

Bikas Saha commented on TEZ-3222:
---------------------------------

Thanks for the update! And sorry for the delayed response.

{code}
@@ -78,10 +78,10 @@ public class BroadcastEdgeManager extends EdgeManagerPluginOnDemand {
   }
   
   @Override
-  public EventRouteMetadata routeCompositeDataMovementEventToDestination(
+  public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
       int sourceTaskIndex, int destinationTaskIndex)
       throws Exception {
-    return commonRouteMeta[sourceTaskIndex];
+    return CompositeEventRouteMetadata.create(1, sourceTaskIndex, 0);
   }{code}
This should probably use the same caching logic instead of creating new objects.
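
For illustration, a minimal sketch of what that caching could look like, mirroring the existing commonRouteMeta pattern (the field name and the lazy initialization here are my assumptions, not the patch):
{code}
// Hypothetical caching sketch; assumes commonCompositeRouteMeta is allocated
// with one slot per source task (e.g. in initialize()), like commonRouteMeta.
private CompositeEventRouteMetadata[] commonCompositeRouteMeta;

@Override
public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
    int sourceTaskIndex, int destinationTaskIndex) throws Exception {
  CompositeEventRouteMetadata routeMeta = commonCompositeRouteMeta[sourceTaskIndex];
  if (routeMeta == null) {
    // Create once per source task and reuse across destinations, since the
    // broadcast route depends only on the source task index.
    routeMeta = CompositeEventRouteMetadata.create(1, sourceTaskIndex, 0);
    commonCompositeRouteMeta[sourceTaskIndex] = routeMeta;
  }
  return routeMeta;
}
{code}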

{code}
@@ -360,8 +360,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
         partitionRange = basePartitionRange;
       }
 
-      return EventRouteMetadata.create(partitionRange, targetIndicesToSend, 
-          sourceIndices[destinationTaskIndex]);
+      return CompositeEventRouteMetadata.create(partitionRange, targetIndicesToSend[0], 
+          sourceIndices[destinationTaskIndex][0]);
     }{code}
This is not clear to me. The main reason for the array type in EventRouteMetadata 
is this auto-reduce edge manager case, where a single source CDME expands into 
multiple DMEs for the same destination task, the expansion count being the 
number of partitions coalesced during auto-reduce. Hence it's not clear how 
passing only the first element of the array would work.
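
As a concrete illustration (the numbers here are made up), suppose auto-reduce coalesces 4 partitions into one destination task; the array form then carries all 4 routes for that task:
{code}
// Made-up example: 4 partitions coalesced into 1 destination task.
int partitionRange = 4;
int[] targetIndicesToSend = {0, 1, 2, 3}; // one target index per coalesced partition
int[] sourceIndices = {0, 1, 2, 3};       // matching source output indices

// EventRouteMetadata.create(partitionRange, targetIndicesToSend, sourceIndices)
// describes all 4 DMEs for this destination; passing only element [0] of each
// array would seem to describe just the first of them.
{code}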

If the above is true, then perhaps we could look at adding EventRouteMetadata as 
a member of CDME, cloned from the source for the destination. In the 
destination, the CDME with route metadata would get expanded into DMEs in the same 
manner as the following code in Edge (which could be moved into a helper method 
on CDME)
{code}
-              int numEvents = routeMeta.getNumEvents();
-              int[] sourceIndices = routeMeta.getSourceIndices();
-              int[] targetIndices = routeMeta.getTargetIndices();
-              while (numEventsDone < numEvents && listSize++ < listMaxSize) {
-                DataMovementEvent e = compEvent.expand(sourceIndices[numEventsDone],
-                    targetIndices[numEventsDone]);
-                numEventsDone++;
-                TezEvent tezEventToSend = new TezEvent(e, tezEvent.getSourceInfo(),
-                    tezEvent.getEventReceivedTime());
-                tezEventToSend.setDestinationInfo(destinationMetaInfo);
-                listToAdd.add(tezEventToSend);
-              }{code}

This would also keep the API unchanged for the edge plugin.
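
For concreteness, a rough sketch of what such a helper on CDME could look like (the method name and signature are my invention, lifted from the Edge loop above; only expand(), getNumEvents(), getSourceIndices() and getTargetIndices() are existing calls):
{code}
// Hypothetical helper on CompositeDataMovementEvent; not existing API.
// Expands this CDME into one DME per route described by the metadata.
public List<DataMovementEvent> expandWithRouteMeta(EventRouteMetadata routeMeta) {
  int numEvents = routeMeta.getNumEvents();
  int[] sourceIndices = routeMeta.getSourceIndices();
  int[] targetIndices = routeMeta.getTargetIndices();
  List<DataMovementEvent> expanded = new ArrayList<>(numEvents);
  for (int i = 0; i < numEvents; i++) {
    expanded.add(expand(sourceIndices[i], targetIndices[i]));
  }
  return expanded;
}
{code}
The Edge code would then just iterate the returned list, wrapping each DME in a TezEvent and applying the listMaxSize throttling as it does today.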

Does the above sound correct to you? I am looking at this code after a while, so 
I may have gotten it all wrong :)

The code changes in all the inputs look quite similar to each other. Any potential 
for common methods?

> Reduce messaging overhead for auto-reduce parallelism case
> ----------------------------------------------------------
>
>                 Key: TEZ-3222
>                 URL: https://issues.apache.org/jira/browse/TEZ-3222
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Jonathan Eagles
>            Assignee: Jonathan Eagles
>         Attachments: TEZ-3222.1.patch, TEZ-3222.2.patch, TEZ-3222.3.patch, 
> TEZ-3222.4.patch
>
>
> A dag with 15k x 1000k vertex may auto-reduce to 15k x 1. And while the data 
> size is appropriate for 1 task attempt, this results in an increase in task 
> attempt message processing of 1000x.
> This jira aims to reduce the message processing in the auto-reduced task 
> while keeping the amount of message processing in the AM the same or less.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
