[ 
https://issues.apache.org/jira/browse/HDFS-347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13495903#comment-13495903
 ] 

Todd Lipcon commented on HDFS-347:
----------------------------------

The current implementation puts all the communication with the DN over the unix 
socket. I think it would be worth having three modes for this configuration:

1) Disabled -- code paths identical to today
2) Enabled for FD passing only -- only connects via unix socket if it's about 
to try to do fd-passing. Otherwise, it uses loopback TCP
3) Enabled for FD passing and all communication
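
As a rough illustration of how the three modes could drive transport selection,
here is a minimal Java sketch. Everything in it is an assumption made for
illustration: the class name, the enum constants, and the configuration key
"example.domain.socket.mode" are hypothetical and do not come from the patch.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical three-way setting; all names here are illustrative only. */
class DomainSocketModeSketch {
    enum Mode {
        DISABLED,     // option 1: code paths identical to today
        FD_PASSING,   // option 2: unix socket only when about to pass an fd
        ALL_TRAFFIC   // option 3: unix socket for fd passing and data
    }

    enum Transport { TCP_LOOPBACK, UNIX_SOCKET }

    // Hypothetical configuration key, not a real HDFS setting.
    static final String KEY = "example.domain.socket.mode";

    /** Reads the mode from a plain map; an unset key means DISABLED. */
    static Mode parse(Map<String, String> conf) {
        String raw = conf.get(KEY);
        if (raw == null) {
            return Mode.DISABLED;
        }
        return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    /** Picks the transport for one local connection attempt. */
    static Transport choose(Mode mode, boolean aboutToPassFd) {
        switch (mode) {
            case ALL_TRAFFIC:
                return Transport.UNIX_SOCKET;
            case FD_PASSING:
                return aboutToPassFd ? Transport.UNIX_SOCKET
                                     : Transport.TCP_LOOPBACK;
            default:
                return Transport.TCP_LOOPBACK;
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEY, "fd_passing");
        Mode mode = parse(conf);
        System.out.println(mode + " / fd passing: " + choose(mode, true));
        System.out.println(mode + " / plain read: " + choose(mode, false));
    }
}
```

Keeping DISABLED as the default when the key is unset matches the idea of
shipping the new code as opt-in at first.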

The reason for option 1 is obvious: it's a lot of new code and we'd be wise to 
introduce it as "experimental" initially.
The reason for option 2 is that, if we only use it for fd passing, we don't 
need to care about performance or subtle bugs in the data transfer path. The FD 
transfer has the nice property that it either works or doesn't work - it's much 
less likely that it would pass a 'corrupt' FD. Also, the unix socket path seems 
to be much slower than TCP in the current implementation (see more below).
The reason for option 3 is that, according to benchmarks seen elsewhere (and 
'netperf'), the unix sockets should be able to go 2x the speed of TCP loopback 
once we spend some time optimizing them. This would have some benefits:
- faster performance, with no semantic difference (eg metrics and architectural 
layering maintained)
- improvements on the write path as well as the read path

If the data-over-unix-sockets path is significantly faster than the existing 
TCP path (I think it should be possible to get 2x), then that seems like the 
kind of thing we'd want on by default for every MR task, etc, since we'd get 
the speedup without any cost of lost metrics or QoS opportunities in the DN. I 
can see still wanting fd passing for applications like HBase that are heavily 
random-access oriented, but for streaming, I think if we can get 'close' to the 
optimal, the metrics are worth more than the last little bit of juice.

I spent some time looking at the performance of unix sockets (data path over 
unix, _not_ fd passing) in your current patch, and found that the data path is 
at least 2x slower in my benchmark, and uses 3x as much CPU. This seems to be 
due to a number of things:
- The domain socket doesn't implement transferTo (aka sendfile). So, we end up 
doing a lot more copies on the sending side to go in and out of kernel space
- "CallIntMethod" is showing up a lot in my 'perf top' output. This seems to be 
from within the readByteBuffer0 call. I think we can optimize this 
significantly as follows:
-- Assume that the Java code always passes a direct buffer into the native 
code. If the user supplies a non direct buffer, use a cached 32KB (or so) 
direct buffer inside the InputStream to read into and then copy into the 
user-supplied array-based buffer. Given that our read path always uses direct 
buffers, this should be an easy simplification.
-- Pass the buffer's offset and remaining length in via parameters to the JNI 
function, rather than calling "back" into Java with CallIntMethod. This should 
have significantly better performance, since the JIT will take care of inlining 
and lock elision on the Java side.
- In the read() call, you're currently calling {{fdRef()}} and {{fdUnref()}} 
every time. Looking at the implementation of the similar pieces of the JDK, 
they get around this kind of overhead. It would be interesting to try 
'breaking' the code to not do the ref counting on read, to see if it's a 
bottleneck. My guess is that it might be, since the atomic operations end up 
issuing a reasonably costly memory barrier, somewhat needlessly.
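
To make the direct-buffer suggestion above concrete, here is a minimal Java
sketch of the shape I have in mind. It is not the patch's code: the class name
is made up, and readDirect0 is a pure-Java stand-in for a JNI function so that
the sketch runs without native code. The real stream would also throw
IOException, which the stand-in never needs to. The point is only that the
"native" side receives a direct buffer plus an explicit offset and length, and
that heap buffers go through a cached 32KB direct bounce buffer.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

/** Sketch only: readDirect0 simulates what a JNI read function would do. */
class DirectBufferReadSketch extends InputStream {
    private static final int BOUNCE_SIZE = 32 * 1024;

    // Cached direct buffer, used only when the caller supplies a heap buffer.
    private final ByteBuffer bounce = ByteBuffer.allocateDirect(BOUNCE_SIZE);

    // Stand-in for the socket: bytes are served from this array.
    private final byte[] source;
    private int sourcePos = 0;

    DirectBufferReadSketch(byte[] source) {
        this.source = source;
    }

    // Stand-in for a native method. It gets offset and length as parameters,
    // so it never has to call back into Java to ask for position()/remaining().
    private int readDirect0(ByteBuffer dst, int offset, int length) {
        if (sourcePos >= source.length) {
            return -1; // end of stream
        }
        int n = Math.min(length, source.length - sourcePos);
        for (int i = 0; i < n; i++) {
            dst.put(offset + i, source[sourcePos + i]);
        }
        sourcePos += n;
        return n;
    }

    /** Reads into dst, advancing its position by the number of bytes read. */
    public int read(ByteBuffer dst) {
        if (!dst.hasRemaining()) {
            return 0;
        }
        if (dst.isDirect()) {
            int n = readDirect0(dst, dst.position(), dst.remaining());
            if (n > 0) {
                dst.position(dst.position() + n);
            }
            return n;
        }
        // Heap buffer: read into the bounce buffer, then copy out.
        bounce.clear();
        int want = Math.min(dst.remaining(), bounce.capacity());
        int n = readDirect0(bounce, 0, want);
        if (n > 0) {
            bounce.limit(n); // position is still 0 after clear()
            dst.put(bounce);
        }
        return n;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (len == 0) {
            return 0;
        }
        return read(ByteBuffer.wrap(b, off, len));
    }

    @Override
    public int read() {
        byte[] one = new byte[1];
        int n = read(one, 0, 1);
        return n < 0 ? -1 : (one[0] & 0xff);
    }
}
```

With this shape the heap-buffer path costs one extra copy per read, while the
direct-buffer path (the one our read path takes) hands the buffer straight to
the native side with no callbacks.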

Overall, I'd try to model the code a little closer to the built-in JDK 
implementations of SocketChannel, etc.

All of the above only matters if the data path is going over the unix sockets 
(option 3 above). Hence the suggestion that we could do a more minimal initial 
implementation without offering option 3, or at least not recommending option 
3, and then work to do the optimization for the data path separately.

Regarding the test plan, have you thought about how we can verify this? It's a lot 
of new code if we assume that the data path may run over it. I'm particularly 
concerned about things like timeout handling or races on socket close which 
could lock up a datanode or cause an FD leak. Explaining a test plan that 
covers things like this would be helpful. (One of the original reasons that I 
liked importing the Android code was that it's likely to have been well tested, 
whereas this patch has nearly the same amount of new code, except that it 
hasn't been baked anywhere yet).

I have some comments on the code itself, but I want to take a few more passes 
through it to understand it all better before I post - no sense nit picking 
small things when there are bigger questions per above.

                
> DFS read performance suboptimal when client co-located on nodes with data
> -------------------------------------------------------------------------
>
>                 Key: HDFS-347
>                 URL: https://issues.apache.org/jira/browse/HDFS-347
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>          Components: data-node, hdfs client, performance
>            Reporter: George Porter
>            Assignee: Colin Patrick McCabe
>         Attachments: all.tsv, BlockReaderLocal1.txt, HADOOP-4801.1.patch, 
> HADOOP-4801.2.patch, HADOOP-4801.3.patch, HDFS-347-016_cleaned.patch, 
> HDFS-347.016.patch, HDFS-347.017.clean.patch, HDFS-347.017.patch, 
> HDFS-347.018.clean.patch, HDFS-347.018.patch2, HDFS-347.019.patch, 
> HDFS-347.020.patch, HDFS-347.021.patch, HDFS-347.022.patch, 
> HDFS-347-branch-20-append.txt, hdfs-347.png, hdfs-347.txt, local-reads-doc
>
>
> One of the major strategies Hadoop uses to get scalable data processing is to 
> move the code to the data.  However, putting the DFS client on the same 
> physical node as the data blocks it acts on doesn't improve read performance 
> as much as expected.
> After looking at Hadoop and O/S traces (via HADOOP-4049), I think the problem 
> is due to the HDFS streaming protocol causing many more read I/O operations 
> (iops) than necessary.  Consider the case of a DFSClient fetching a 64 MB 
> disk block from the DataNode process (running in a separate JVM) running on 
> the same machine.  The DataNode will satisfy the single disk block request by 
> sending data back to the HDFS client in 64-KB chunks.  In BlockSender.java, 
> this is done in the sendChunk() method, relying on Java's transferTo() 
> method.  Depending on the host O/S and JVM implementation, transferTo() is 
> implemented as either a sendfilev() syscall or a pair of mmap() and write().  
> In either case, each chunk is read from the disk by issuing a separate I/O 
> operation for each chunk.  The result is that the single request for a 64-MB 
> block ends up hitting the disk as over a thousand smaller requests for 64-KB 
> each.
> Since the DFSClient runs in a different JVM and process than the DataNode, 
> shuttling data from the disk to the DFSClient also results in context 
> switches each time network packets get sent (in this case, the 64-kb chunk 
> turns into a large number of 1500 byte packet send operations).  Thus we see 
> a large number of context switches for each block send operation.
> I'd like to get some feedback on the best way to address this, but I think 
> providing a mechanism for a DFSClient to directly open data blocks that 
> happen to be on the same machine would help.  It could do this by examining the set of 
> LocatedBlocks returned by the NameNode, marking those that should be resident 
> on the local host.  Since the DataNode and DFSClient (probably) share the 
> same hadoop configuration, the DFSClient should be able to find the files 
> holding the block data, and it could directly open them and send data back to 
> the client.  This would avoid the context switches imposed by the network 
> layer, and would allow for much larger read buffers than 64KB, which should 
> reduce the number of iops imposed by each read block operation.
