[ https://issues.apache.org/jira/browse/TEZ-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173259#comment-17173259 ]
Sungwoo commented on TEZ-4075:
------------------------------

I would like to submit a patch to reduce lock contention in ShuffleManager, but I am having trouble figuring out what to do with the field *completedInputSet* in ShuffleManager. The final patch in this Jira uses this code (which was removed in TEZ-2196):
{code:java}
if (!completedInputSet.get(inputIdentifier)) {
  synchronized (completedInputSet) {
    if (!completedInputSet.get(inputIdentifier)) {
{code}
In the rest of the ShuffleManager code, however, *completedInputSet* is not guarded with *synchronized (completedInputSet)*; it appears to be guarded with *lock.lock()* instead.

Could someone explain the intended use of *lock.lock()* in ShuffleManager? From my analysis, it looks like *lock.lock()* is used to guard:
{code:java}
final BitSet completedInputSet;
private final BlockingQueue<InputHost> pendingHosts;
private long lastProgressTime;
private long totalBytesShuffledTillNow;
{code}
(*pendingHosts* is thread-safe, but the comment in the code says that it should be guarded with *lock.lock()*.)

> Tez: Reimplement tez.runtime.transfer.data-via-events.enabled
> -------------------------------------------------------------
>
>                 Key: TEZ-4075
>                 URL: https://issues.apache.org/jira/browse/TEZ-4075
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Gopal Vijayaraghavan
>            Assignee: Richard Zhang
>            Priority: Major
>             Fix For: 0.10.1
>
>         Attachments: TEZ-4075.10.patch, TEZ-4075.15.patch, TEZ-4075.16.patch,
> TEZ-4075.enable-dme.16.patch, Tez-4075.5.patch, Tez-4075.8.patch
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> This was factored out by TEZ-2196, which skips buffers for 1-partition
> data exchanges (and therefore goes to disk directly).
> {code}
>   if (shufflePayload.hasData()) {
>     shuffleManager.addKnownInput(shufflePayload.getHost(),
>         shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
>     DataProto dataProto = shufflePayload.getData();
>     FetchedInput fetchedInput =
>         inputAllocator.allocate(dataProto.getRawLength(),
>             dataProto.getCompressedLength(), srcAttemptIdentifier);
>     moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
>     shuffleManager.addCompletedInputWithData(srcAttemptIdentifier,
>         fetchedInput);
>   } else {
>     shuffleManager.addKnownInput(shufflePayload.getHost(),
>         shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
>   }
> {code}
> got removed in
> https://github.com/apache/tez/commit/1ba1f927c16a1d7c273b6cd1a8553e5269d1541a
> It would be better to buffer up to the 512-byte limit on event size before
> writing to disk, since creating a new file always incurs disk traffic, even
> if the file is eventually served out of the buffer cache.
> The total overhead of receiving an event, then firing an HTTP call to fetch
> the data, etc. adds approximately 100-150 ms to a query. Transferring the
> data through the event skips the disk entirely for this case and also
> removes the extra IOPS incurred.
> This channel is not suitable for large-scale event transport, but the
> workload here specifically deals with 1-row control tables, which consume
> more bandwidth on HTTP headers and hostnames than on the 93-byte payload.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
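
To make the locking question in the comment concrete: java.util.BitSet is documented as not safe for multithreaded use without external synchronization, so an unsynchronized first check plus *synchronized (completedInputSet)* does not coordinate with code paths that take *lock.lock()* instead. Below is a minimal sketch of the alternative, where a single ReentrantLock guards every read and write of the set. The class and method names (CompletedInputTracker, markCompleted, completedCount) are hypothetical and are not taken from ShuffleManager; this only illustrates the pattern and is not a proposed patch.

```java
import java.util.BitSet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper, not part of Tez: every access to the BitSet
// goes through the same ReentrantLock.
class CompletedInputTracker {
  private final ReentrantLock lock = new ReentrantLock();
  private final BitSet completedInputSet; // guarded by lock

  CompletedInputTracker(int numInputs) {
    this.completedInputSet = new BitSet(numInputs);
  }

  // Returns true only for the first caller that marks this input as completed.
  boolean markCompleted(int inputIdentifier) {
    lock.lock();
    try {
      if (completedInputSet.get(inputIdentifier)) {
        return false; // already recorded by an earlier caller
      }
      completedInputSet.set(inputIdentifier);
      return true;
    } finally {
      lock.unlock();
    }
  }

  // Number of inputs marked so far, read under the same lock.
  int completedCount() {
    lock.lock();
    try {
      return completedInputSet.cardinality();
    } finally {
      lock.unlock();
    }
  }
}
```

Whether holding one lock here is acceptable depends on how hot this path is, which is the contention concern raised above. The sketch only shows that the check and the update need the same guard as every other access to the set.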