[ 
https://issues.apache.org/jira/browse/TEZ-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173259#comment-17173259
 ] 

Sungwoo commented on TEZ-4075:
------------------------------

I would like to submit a patch that reduces lock contention in ShuffleManager, 
but I am not sure what to do with the field 
*completedInputSet* in ShuffleManager. The final patch in this Jira uses this 
code (which was removed in TEZ-2196):

{code:java}
    if (!completedInputSet.get(inputIdentifier)) {
      synchronized (completedInputSet) {
        if (!completedInputSet.get(inputIdentifier)) {
{code}

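In the rest of the code of ShuffleManager, however, *completedInputSet* is not 
guarded with *synchronized (completedInputSet)*; it appears to be guarded with 
*lock.lock()* instead. These are two different locks, so a thread holding one 
does not exclude a thread holding the other.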
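
To illustrate the concern, here is a minimal sketch of guarding the BitSet 
with a single lock on every access, so that the check and the update happen 
atomically. The class *CompletedInputTracker* and its methods are hypothetical 
and are not part of ShuffleManager; the sketch assumes that *lock* is a 
ReentrantLock and that *inputIdentifier* is an int index.

```java
import java.util.BitSet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper for illustration only; not the actual ShuffleManager code.
class CompletedInputTracker {
    // Assumption: a single ReentrantLock guards all shared state.
    private final ReentrantLock lock = new ReentrantLock();

    // Guarded by 'lock'. BitSet is not thread-safe on its own.
    private final BitSet completedInputSet;

    CompletedInputTracker(int numInputs) {
        this.completedInputSet = new BitSet(numInputs);
    }

    /** Marks the input as completed; returns true only for the first caller. */
    boolean markCompleted(int inputIdentifier) {
        lock.lock();
        try {
            if (completedInputSet.get(inputIdentifier)) {
                return false; // already completed by another thread
            }
            completedInputSet.set(inputIdentifier);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Reads also take the lock so they observe the latest writes. */
    boolean isCompleted(int inputIdentifier) {
        lock.lock();
        try {
            return completedInputSet.get(inputIdentifier);
        } finally {
            lock.unlock();
        }
    }

    int completedCount() {
        lock.lock();
        try {
            return completedInputSet.cardinality();
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape there is no unsynchronized first check, which trades a little 
contention for a single, consistent locking rule. Whether that trade-off is 
acceptable for ShuffleManager is exactly the open question.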

Could someone explain the intended use of *lock.lock()* in ShuffleManager? From 
my analysis, it looks like *lock.lock()* is used to guard:

{code:java}
final BitSet completedInputSet;
private final BlockingQueue<InputHost> pendingHosts;
private long lastProgressTime;
private long totalBytesShuffledTillNow;
{code}

(*pendingHosts* is thread-safe, but the comment in the code says that it should 
be guarded with *lock.lock()*.)
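
One possible reason for locking around a thread-safe queue (an assumption, 
not something confirmed by the Tez source) is that a compound operation, such 
as checking the queue and then updating another field, must be atomic as a 
whole. A minimal sketch with a hypothetical class *PendingHostQueue*, using 
String in place of InputHost:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper for illustration only; not the actual ShuffleManager code.
class PendingHostQueue {
    private final ReentrantLock lock = new ReentrantLock();

    // Thread-safe for single calls, but the check-then-add below is a
    // compound operation and therefore still needs the external lock.
    private final BlockingQueue<String> pendingHosts = new LinkedBlockingQueue<>();

    // Guarded by 'lock'.
    private long lastProgressTime;

    /** Adds the host unless it is already pending, and records the time. */
    boolean addIfAbsent(String host, long now) {
        lock.lock();
        try {
            if (pendingHosts.contains(host)) {
                return false;
            }
            pendingHosts.add(host);
            lastProgressTime = now; // updated together with the queue
            return true;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return pendingHosts.size();
        } finally {
            lock.unlock();
        }
    }

    long lastProgressTime() {
        lock.lock();
        try {
            return lastProgressTime;
        } finally {
            lock.unlock();
        }
    }
}
```

Without the lock, two threads could both pass the *contains* check and enqueue 
the same host twice, even though each individual queue call is thread-safe.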



> Tez: Reimplement tez.runtime.transfer.data-via-events.enabled
> -------------------------------------------------------------
>
>                 Key: TEZ-4075
>                 URL: https://issues.apache.org/jira/browse/TEZ-4075
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Gopal Vijayaraghavan
>            Assignee: Richard Zhang
>            Priority: Major
>             Fix For: 0.10.1
>
>         Attachments: TEZ-4075.10.patch, TEZ-4075.15.patch, TEZ-4075.16.patch, 
> TEZ-4075.enable-dme.16.patch, Tez-4075.5.patch, Tez-4075.8.patch
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> This was factored out by TEZ-2196, which skips buffers for 1-partition 
> data exchanges (and therefore goes directly to disk).
> {code}
>     if (shufflePayload.hasData()) {
>       DataProto dataProto = shufflePayload.getData();
>       FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(),
>           dataProto.getCompressedLength(), srcAttemptIdentifier);
>       moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
>       shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
>     } else {
>       shuffleManager.addKnownInput(shufflePayload.getHost(),
>           shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
>     }
> {code}
> got removed in 
> https://github.com/apache/tez/commit/1ba1f927c16a1d7c273b6cd1a8553e5269d1541a
> It would be better to buffer data up to the 512-byte event size limit before 
> writing to disk, since creating a new file always incurs disk traffic, even 
> if the file is eventually served out of the buffer cache.
> The total overhead of receiving an event, then firing an HTTP call to fetch 
> the data, etc. adds approximately 100-150 ms to a query. Transferring the 
> data through the event skips the disk entirely in this case and also removes 
> the extra IOPS incurred.
> This channel is not suitable for large-scale event transport, but 
> specifically the workload here deals with 1-row control tables which consume 
> more bandwidth with HTTP headers and hostnames than the 93 byte payload.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
