On 7/30/09 2:32 PM, "william kinney" <william.kin...@gmail.com> wrote:

> Hrm I think I found an issue. In my RecordReader, I do an
> Arrays.copyOfRange() to get the protobuf binary section for
> RecordReader.next(key,value). In the profile dumps from child map
> processes, this one call takes up ~90% of the CPU Samples time.
> So, I wrapped the line w/ a System.nanoTime(), and got:
>
> Local process:
>   total(ms): 902.138069, avg: 112.145 ns
>
> Hadoop Child processes:
>   1) total(ms): 6953.47106, avg: 726.906 ns
>   2) total(ms): 6503.962176, avg: 802.270 ns
>   3) total(ms): 5482.494256, avg: 677.589 ns
>   4) total(ms): 5291.664592, avg: 661.764 ns
>   5) total(ms): 5568.289465, avg: 697.353 ns
>   6) total(ms): 5638.778551, avg: 702.290 ns
>    ...etc
>
> So for some reason, that call is taking over 6 times longer in hadoop...
>
> The buffer size for it is 65536 for both processes.
>
> Any ideas?
>

That is a very interesting result.  Try counting the number of times that
call is made, to confirm the count is the same in both runs -- if the
average size of the copy is much smaller in one, it will be slower per byte.
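
A minimal sketch of that counting, assuming the copy can be routed through a
small wrapper.  CopyStats and its method names are hypothetical, not part of
Hadoop or of the RecordReader discussed here; only Arrays.copyOfRange and
System.nanoTime come from the thread.

```java
import java.util.Arrays;

// Hypothetical helper (not part of Hadoop or the original RecordReader):
// wraps Arrays.copyOfRange and tracks how often it runs and how much it copies.
final class CopyStats {
    private long calls;   // number of copies performed
    private long bytes;   // total bytes copied
    private long nanos;   // total time spent inside copyOfRange

    byte[] timedCopy(byte[] src, int from, int to) {
        long start = System.nanoTime();
        byte[] out = Arrays.copyOfRange(src, from, to);
        nanos += System.nanoTime() - start;
        calls++;
        bytes += out.length;
        return out;
    }

    long calls() { return calls; }
    long bytes() { return bytes; }
    double avgBytesPerCall() { return calls == 0 ? 0.0 : (double) bytes / calls; }
    double avgNanosPerCall() { return calls == 0 ? 0.0 : (double) nanos / calls; }

    // One line to log at the end of the local run and of each map task.
    String summary() {
        return "calls=" + calls + " bytes=" + bytes
            + " avgBytes=" + avgBytesPerCall()
            + " avgNs=" + avgNanosPerCall();
    }
}
```

Logging summary() once per run in both environments shows whether the call
count and average copy size match before the per-call times are compared.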

Other ideas -- is one using a native ByteBuffer underneath the covers
somewhere and the other not?   Is there some other difference in buffering
on either side of that copy?



>
> ------
> "You might want to try -Xmx486m as an experiment on the local test to see if
> it affects the behavior.  If you are doing a lot of garbage creation it may."
>   -Tried it, no changes.
>
> "Hmm, that was a random guess, because it would obviously affect CPU use.
> Another thing to try -- make sure your writer that is writing into HDFS is
> wrapped with a buffer (try 32k or 64k).  That's another random guess for
> something that might not show up well in stack traces without a profiler --
> but also might already be done"
>   - So you're saying when writing the file into the HDFS, I should
> make sure it ends in 64k chunk (ie, zero-out until i reach such a
> point)? So all file sizes are a multiple of 64kb?

No, just that it's using something like a BufferedOutputStream when writing
out from your custom format (HDFS does this itself so it shouldn't be
necessary) and a BufferedInputStream for reading.
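
For illustration, a sketch of that wrapping with a 64k buffer.  BufferedIo is
a hypothetical helper; the byte-array streams stand in for whatever raw
streams the job really uses (in Hadoop they would come from the file system
API, which is not shown here).

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper: puts a 64k buffer between the caller and a raw stream,
// so many small reads/writes become a few large ones underneath.
final class BufferedIo {
    static final int BUFFER_SIZE = 64 * 1024;

    static OutputStream buffered(OutputStream raw) {
        return new BufferedOutputStream(raw, BUFFER_SIZE);
    }

    static InputStream buffered(InputStream raw) {
        return new BufferedInputStream(raw, BUFFER_SIZE);
    }

    // Writes and re-reads the payload one byte at a time through the buffers.
    static byte[] roundTrip(byte[] payload) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            OutputStream out = buffered(sink);
            try {
                for (byte b : payload) out.write(b);
            } finally {
                out.close(); // close() flushes the last partial buffer
            }
            InputStream in = buffered(new ByteArrayInputStream(sink.toByteArray()));
            ByteArrayOutputStream copy = new ByteArrayOutputStream();
            try {
                int b;
                while ((b = in.read()) != -1) copy.write(b);
            } finally {
                in.close();
            }
            return copy.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The file length is unaffected by the buffer size; nothing needs to be padded
to a 64k boundary.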

>
> "There is definitely a mystery here.  I expect the task scheduling delays and
> some startup inefficiency but the overall difference is odd.  What about a
> local test on a single, larger file versus a hadoop job on that same single,
> larger file (which would have just one map job)?  This test may be very
> enlightening."
>   - Total job time was 20 seconds for the 506MB file. Task took 19
> seconds. Local process on the same file took ~ 3 seconds.

OK, so this single-file case is where we need to drill down (and it is what
your timing results above measure).  Scheduling may be a few seconds of that.
>
> "Hmm that difference seems a bit troubling.  For one, you are running two
> tasks at once per node -- is there any way to do your local, non MR test
> with two concurrent processes or threads?"
>   - Does the above test answer this? Only one task was executed on the
> node that took 19 seconds.
>
Yeah, it looks like we have ruled that out.

> ext3 filesystem.
>
> "Make sure your OS readahead on the device is set to a good value (at least
> 2048 blocks, preferably 8192 ish):"
>   - For RA its showing 256, BSZ is 2048. RA should be 8192 ? Should
> BSZ then be larger? What about "SSZ"?

I'm referring to /sbin/blockdev --getra <device>
which reports just "RA".  SSZ is sector size -- that can't change, and I think
"BSZ" is block size, and is also static.

Use
/sbin/blockdev --setra <value> <device> to set the readahead.  This will
increase sequential throughput somewhat at the device level, but more so if
there are two or more concurrent reads.   It doesn't affect random I/O
performance.  Basically, if the block layer detects a sequence of I/Os that
are sequential, it starts reading "ahead" of the last I/O and keeps
increasing the size of this readahead as long as the sequential access
continues, up to a max size.

>
> Since the performance is high for the local process, would that then
> mean my disk i/o is sufficient, as you suggested? Do I still need to
> change any of these settings?

You seem CPU bound, especially considering your evidence above.  I/O tuning
might help somewhere, but not for this use case.

>
> "If even a single task on a single large file is slower in MB/sec than your
> test program, then I suspect read/write buffer issues or misuse somewhere."
>   - Do you know of an instance where I'd have buffer issues with the
> Child process, and not local? The only difference I can think of is of
> course how the buffer is filled, FileInputStream vs FSDataInputStream.
> But once it is filled, why would reading portions of that buffer (ie,
> Arrays.copyOfRange()) take long in one instance but not another?

I am not familiar enough with that part of Hadoop to know.  In general, that
buffer may be too small, or be backed by a native (direct) ByteBuffer, which
will be slow for small reads into Java memory.
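
One way to check that guess on a given JVM is a rough comparison of small
reads from a heap buffer and from a direct buffer.  This is only a sketch:
SmallReadBench is a hypothetical class, the 64-byte chunk is an arbitrary
choice, and a naive timing loop like this is easily skewed by JIT warm-up.

```java
import java.nio.ByteBuffer;

// Hypothetical micro-benchmark: copies a buffer out in small chunks, the way a
// record reader might pull short records into byte[]s.
final class SmallReadBench {
    // Copies chunk-sized pieces into fresh byte[]s; returns total bytes copied.
    static long drain(ByteBuffer src, int chunk) {
        if (chunk <= 0) throw new IllegalArgumentException("chunk must be > 0");
        ByteBuffer view = src.duplicate(); // independent position, shared data
        view.clear();                      // position = 0, limit = capacity
        long copied = 0;
        while (view.remaining() >= chunk) {
            byte[] dst = new byte[chunk];
            view.get(dst);                 // bulk get into Java heap memory
            copied += dst.length;
        }
        return copied;
    }

    // Elapsed nanoseconds for `rounds` full passes over the buffer.
    static long timeNanos(ByteBuffer src, int chunk, int rounds) {
        long copied = 0;
        long start = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            copied += drain(src, chunk);
        }
        long elapsed = System.nanoTime() - start;
        if (copied < 0) throw new IllegalStateException("overflow");
        return elapsed;
    }

    // Runs one warm-up pass per buffer, then reports both timings.
    static String report(int chunk, int rounds) {
        ByteBuffer heap = ByteBuffer.allocate(65536);
        ByteBuffer direct = ByteBuffer.allocateDirect(65536);
        timeNanos(heap, chunk, rounds);
        timeNanos(direct, chunk, rounds);
        return "heap ns=" + timeNanos(heap, chunk, rounds)
            + " direct ns=" + timeNanos(direct, chunk, rounds);
    }
}
```

Calling SmallReadBench.report(64, 2000) from a main method on one of the
nodes gives a first impression; the numbers are indicative at best.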

>
> Would it be helpful to get a histogram of the Arrays.copyOfRange(),
> rather than the average and total? Perhaps for the most part it is
> fine (~ 120 ns), but chokes sometimes (therefore increasing total time
> and average).
>

A crude histogram could be useful, but I wouldn't spend too much time on it.
Some basic characterization of the difference measured other than averages
and totals might be insightful, but I'm unsure how useful the results would
be.
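
If you do try it, a crude histogram needs very little code.  The sketch below
uses power-of-two nanosecond buckets; NanoHistogram is a hypothetical class
and the bucketing scheme is just one simple choice.

```java
// Hypothetical helper: counts durations in power-of-two buckets, so bucket i
// holds values in [2^i, 2^(i+1)) ns (zero and negative values go to bucket 0).
final class NanoHistogram {
    private final long[] buckets = new long[64];

    void record(long nanos) {
        int idx = nanos <= 0 ? 0 : 63 - Long.numberOfLeadingZeros(nanos);
        buckets[idx]++;
    }

    long count(int bucket) {
        return buckets[bucket];
    }

    // One line per non-empty bucket, labelled with the bucket's lower bound.
    String render() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < buckets.length; i++) {
            if (buckets[i] == 0) continue;
            sb.append(">= ").append(1L << i).append(" ns: ")
              .append(buckets[i]).append('\n');
        }
        return sb.toString();
    }
}
```

Recording System.nanoTime() deltas around each copy and printing render() at
the end of a task would show whether a few very slow calls or a uniform
slowdown explains the higher average.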

> Thanks for the help,
> Will
>
>
> On Thu, Jul 30, 2009 at 2:19 PM, Scott Carey<sc...@richrelevance.com> wrote:
>> Comments inline:
>>
>> On 7/30/09 7:37 AM, "william kinney" <william.kin...@gmail.com> wrote:
>>
>>> Local is executed on a Hadoop node (when no job is running), So same
>>> JRE/hardware.
>>>
>>> JRE:
>>> java version "1.6.0_13"
>>> Java(TM) SE Runtime Environment (build 1.6.0_13-b03)
>>> Java HotSpot(TM) 64-Bit Server VM (build 11.3-b02, mixed mode)
>>>
>>> JVM arguments for child task:
>>> /usr/java/jdk1.6.0_13/jre/bin/java
>>> -Djava.library.path=/usr/lib/hadoop/lib/native/Linux-amd64-64:/disk1/hadoop/mapred/local/taskTracker/jobcache/job_200907242015_0048/attempt_200907242015_0048_m_000008_0/work
>>> -Xmx486m
>>
>> You might benefit from JVM 1.6.0_14 with -XX:+UseCompressedOops, but that
>> would probably help both roughly equally.  It won't help much if you aren't
>> creating significant work for the garbage collector though.
>>
>>>
>>> Local call has no JVM arguments, just:
>>> java -cp <myjar>.jar com......RecordReaderTest <fileToTest>
>>>
>>
>> You might want to try -Xmx486m as an experiment on the local test to see if
>> it affects the behavior.  If you are doing a lot of garbage creation it may.
>>
>>
>>>
>>> Data is not compressed.
>>
>> Hmm, that was a random guess, because it would obviously affect CPU use.
>> Another thing to try -- make sure your writer that is writing into HDFS is
>> wrapped with a buffer (try 32k or 64k).  That's another random guess for
>> something that might not show up well in stack traces without a profiler --
>> but also might already be done.
>>
>>>
>>> JobTracker:
>>> Running: Started around 20, but as the job progressed it slowly
>>> increased to at the end: 432 (when Pending was 0). Running dropped to
>>> 0/Status was marked Succeeded about 10 seconds after that. Is this
>>> normal? The total # of Tasks was 1449.
>>
>> This is the "one new task per heartbeat" scheduler slowness.  The next
>> version of the Fair Scheduler will schedule many tasks in one heartbeat
>> which should make this faster.
>> It's a big reason that fewer, larger files were faster.  Though if you are CPU
>> bound, you only need 2 tasks running at the same time per node on your
>> hardware to be at near top efficiency.  Fewer tasks per node (say, 4) with
>> more RAM each (800MB) might do better on this sort of workload.
>>
>>
>>>
>>> Stack Traces.
>>> Looked at about 20 stack traces from 2 different nodes. Consistently saw:
>>> 2 x org.apache.hadoop.dfs.DFSClient$LeaseChecker @ Thread.sleep()
>>> "Comm thread for attempt_200907242015_0050_m_001409_0" @ Thread.sleep()
>>> "IPC Client (47) connection to <master-hostname>/192.168.1.100:8020
>>> from wkinney" @ Object.wait()
>>> "IPC Client (47) connection to /127.0.0.1:49202 from an unknown user"
>>> @ Object.wait()
>>> VM, GC, Signal Dispatcher, Low Memory Detector, CompilerThread,
>>> Finalizer, Reference Handler...
>>
>> Sounds like the usual threads that don't do much.
>>
>>>
>>> Then would sometimes see FastDateFormat thread, parseFrom(), or
>>> somewhere near there (e.g. MapRunner.run())
>>
>> The meat of the task.
>>
>>>
>>> Finally, I consistently saw this:
>>> "Thread-5" daemon prio=10 tid=0x0000000040bbfc00 nid=0x2f87 in
>>> Object.wait() [0x00007fb7498ce000..0x00007fb7498cebf0]
>>>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>>>         at java.lang.Object.wait(Native Method)
>>>         - waiting on <0x00007fb769fdec00> (a java.util.LinkedList)
>>>         at org.apache.hadoop.dfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:1905)
>>>         - locked <0x00007fb769fdec00> (a java.util.LinkedList)
>>> I'm guessing this is normal DataNode activity...
>>
>> Yes, this is the normal dfs thread.  It can be hard to catch it doing work
>> with just stack traces and no profiler attached.
>>
>>>
>>> Will
>>>
>>>
>>
>> There is definitely a mystery here.  I expect the task scheduling delays and
>> some startup inefficiency but the overall difference is odd.  What about a
>> local test on a single, larger file versus a hadoop job on that same single,
>> larger file (which would have just one map job)?  This test may be very
>> enlightening.
>>
>>
>>> On Thu, Jul 30, 2009 at 1:31 AM, Scott Carey<sc...@richrelevance.com> wrote:
>>>> What is the JRE for the Hadoop nodes versus local?  What are the JVM
>>>> arguments for the child tasks and the local version (and heap size)?  What
>>>> are the hardware and platform details for the nodes versus the local test?
>>>> Is the data compressed in Hadoop (check the config)?
>>>>
>>>> You mention the TaskTracker web interface during a job, but what about the
>>>> JobTracker interface?  This should show the global view of currently
>>>> scheduled maps versus total slots.
>>>>
>>>> Lastly, check out some more stack traces on the tasks.  Are they all
>>>> still in the DateFormat stuff?  Surely some of them should be in your
>>>> parseFrom() method too?
>>>>
>>>>
>>>> On 7/29/09 9:07 PM, "william kinney" <william.kin...@gmail.com> wrote:
>>>>
>>>> OK:
>>>>  implemented some iotop/iostat monitoring in ganglia. Looks pretty
>>>> standard (job was 23:00 to 23:06):
>>>>   - Single Node Disk Read: http://imagebin.org/57716
>>>>   - Single Node Disk Write: http://imagebin.org/57717
>>>>
>>>> On each node, noticed that the two TaskTracker$Child processes were
>>>> consuming close to 90% of each core. The TaskTracker and DataNode were
>>>> close to 0%. For the TT children, I did jstack dumps, but didn't
>>>> really see much that popped out other than a lot of time spent in a
>>>> SimpleDateFormat section and the protobuf parse. I switched the SDF
>>>> out with commons.lang FastDateFormat, which reduced the total time for
>>>> both the Hadoop job and the local/non-hadoop test, so still a
>>>> discrepancy between local and hadoop runs.
>>>>
>>>> "You can look at the logs for an individual task, and see how much data it
>>>> read, and how long it took.  It might be hitting your 50MB/sec or close in
>>>> a burst, or perhaps not."
>>>>  - I decided to log the performance of each RecordReader use within
>>>> hadoop, which is essentially 1:1 for TaskTracker$Child process since I
>>>> have 1 InputSplit per file (ie, no splitting), right?. Saw:
>>>> Example 1) 527639090 bytes in : 18050 ms. (27.8778 MB/s)
>>>> Example 2) 533770314 bytes in : 23494 ms. (21.6669 MB/s)
>>>> Example 3) 529711886 bytes in : 20092 ms. (25.1429 MB/s)
>>>> ...etc
>>>> For reference, the non-hadoop/local test:
>>>> 530710906 bytes in : 9133 ms. (55.41721 MB/s)
>>>>
>>>> Regarding the JobTracker only doing 1 task / node / 2 seconds, that
>>>> will definitely hurt. Although the above discrepancy takes priority
>>>> for me, for now.
>>>>
>>>> "What does the web interface tell you about the number of concurrent map
>>>> tasks during the run?  Does it approach the max task slots?"
>>>>  - Yeah it definitely does, from the TaskTracker page on each node,
>>>> I'm seeing almost always 2 "RUNNING" tasks (and an accumulating list
>>>> of "COMMIT_PENDING" tasks under Non-Running, which slowly grows as the
>>>> job progresses). Normal?
>>>>
>>>> Also, I used a profiler to profile a local/non-hadoop test of the
>>>> RecordReader/Map():
>>>>  class: %Time
>>>>      org.apache.commons.lang.time.FastDateFormat.format(long):  46%
>>>>      com......parseFrom(byte[]):  42%
>>>>      java.io.FileInputStream.read(byte[], int, int): 5%
>>>>      ...rest are 1%'ish
>>>>  I guess this doesn't show anything helpful. I'll try to attach it to
>>>> hadoop remotely...anyone have any experience doing this w/ YourKit
>>>> Java Profiler?
>>>>
>>>> Anyways, decided to test the "large files" vs "small files" theory again:
>>>>  Small files (1449 files, ranging 10-100MB. average: 32 MB)
>>>>    - HDFS bytes read  49,057,491,374
>>>>    - Map input records  737,850,142
>>>>    - Finished in: 7mins, 26sec
>>>>    ... 104.898 MB/s
>>>>  Large files (22 files, around 500MB. average 514MB)
>>>>    - HDFS bytes read  11,852,421,152
>>>>    - Map input records 179,657,432
>>>>    - Finished in: 1mins, 8sec
>>>>    ... 166.225 MB/s
>>>>
>>>>   Not sure why before the large files were taking longer, perhaps the
>>>> SimpleDateFormat>FastDateFormat change? Anyways, good to see where I
>>>> need to take the file sizes to...but still 166 MB/s is not the rate I
>>>> was hoping for (given the # of nodes and local performance).
>>>>
>>>> So I guess in summary, hadoop TaskTracker$Child processes that are
>>>> doing the Map() and RecordReader are about 50% slower than the normal,
>>>> local non-hadoop version. In addition, their rate (~25MB/s) * Num
>>>> Nodes (10) suggests ~ 250MB/s total job performance, but I'm only
>>>> seeing ~166MB/s.
>>>>
>>>> Will
>>>>
>>>> On Tue, Jul 28, 2009 at 6:35 PM, Scott Carey<sc...@richrelevance.com>
>>>> wrote:
>>>>> See below:
>>>>>
>>>>>
>>>>> On 7/28/09 12:15 PM, "william kinney" <william.kin...@gmail.com> wrote:
>>>>>
>>>>>> Sorry, forgot to include that detail.
>>>>>>
>>>>>> Some data from ganglia:
>>>>>>
>>>>>>   CPU:
>>>>>>     - on all 10 nodes, I am seeing for the life of the job 85-95% CPU
>>>>>> usage, with about 10% of that being "System" CPU, vs "User".
>>>>>>     - Single node graph: http://imagebin.org/57520
>>>>>>     - Cluster graph: http://imagebin.org/57523
>>>>>
>>>>> Ok, CPU is definitely loaded.  Identify which processes are primarily
>>>>> responsible (Tasks? Datanode? Tasktracker?) You'll want to make the
>>>>> processes eating CPU during a run spit out some stack traces to 'profile'
>>>>> the activity.  Use either the 'jstack' utility with the JDK, or do a 'kill
>>>>> -3 <pid>' on a java process to spit out the stack trace to stdout.  You'll
>>>>> want to do this a handful of times on a single job if possible to identify
>>>>> any trends.
>>>>>
>>>>>>
>>>>>>   Memory:
>>>>>>     - Memory used before job is about 0.4GB, During job it fluctuates
>>>>>> up to 0.6GB and 0.7GB, then back down to 0.4GB. Most of the node
>>>>>> memory (8GB) is showing as "Cached".
>>>>>>     - Single node graph: http://imagebin.org/57522
>>>>>
>>>>> So the OS is mostly just caching disk files in RAM.
>>>>>
>>>>>>
>>>>>>   Network:
>>>>>>     - IN and OUT: Each node 6-12MB/s, cumulative about 30-44MB/s.
>>>>>>     - Single node graph: http://imagebin.org/57521
>>>>>>     - Cluster graph: http://imagebin.org/57525
>>>>>>
>>>>>
>>>>> That is not insignificant, but cumulative across the cluster it's not
>>>>> much.
>>>>>
>>>>>> iostat (disk) (sampled most of the nodes, below values are ranges I saw):
>>>>>>     tps: 0.41-1.27
>>>>>>     Blk_read/s: 46-58
>>>>>>     Blk_wrtn/s: 20-23
>>>>>> (have two disks per node, both SAS, 10k RPM)
>>>>>>
>>>>>
>>>>> Did you do iostat with a parameter to have it spit out more than one row?
>>>>> By default, it spits out data averaged since boot time, like vmstat.
>>>>> My favorite iostat params for monitoring are:
>>>>> iostat -mx 5
>>>>> iostat -dmx 5
>>>>> (or 10 or 15 or 60 second intervals depending on what I'm doing)  Ganglia
>>>>> might have some I/O info -- you want both iops and some sort of bytes/sec
>>>>> measurement.
>>>>>
>>>>>> ---
>>>>>> Are those Blk_read/wrtn/s as in block size (4096?) = bytes/second?
>>>>>>
>>>>>
>>>>> I think it's the 512 byte block notion, but I always use -m to put it in
>>>>> useful units.
>>>>>
>>>>>> Also, from the job page (different job, same Map method, just more
>>>>>> data...~40GB. 781 files):
>>>>>> Map input records       629,738,080
>>>>>> Map input bytes         41,538,992,880
>>>>>>
>>>>>> Anything else I can look into?
>>>>>
>>>>> Based on your other email:
>>>>>
>>>>> There are almost 800 map tasks, these seem to mostly be data local.  The
>>>>> current implementation of the JobTracker schedules rather slowly, and can
>>>>> at best place one new task per node per 2 seconds or so on a small cluster.
>>>>> So, with 10 servers, it will take at least 80 seconds just to schedule all
>>>>> the tasks.
>>>>> If each server can run 8 tasks concurrently, then if the average task
>>>>> doesn't take somewhat longer than 16 seconds, the system will not reach
>>>>> full utilization.
>>>>>
>>>>> What does the web interface tell you about the number of concurrent map
>>>>> tasks during the run?  Does it approach the max task slots?
>>>>>
>>>>> You can look at the logs for an individual task, and see how much data it
>>>>> read, and how long it took.  It might be hitting your 50MB/sec or close in
>>>>> a burst, or perhaps not.
>>>>>
>>>>> Given the sort of bottlenecks I often see, I suspect the scheduling.  But,
>>>>> you have almost maxed CPU use, so it's probably not that.  Getting stack
>>>>> dumps to see what the processor is doing during your test will help narrow
>>>>> it down.
>>>>>
>>>>>
>>>>>>
>>>>>> Do my original numbers (only 2x performance) jump out at you as being
>>>>>> way off? Or is it common to see that in a setup similar to mine?
>>>>>>
>>>>>> I should also note that given it's a custom binary format, I do not
>>>>>> support Splitting (isSplittable() is false). I don't think that would
>>>>>> count for such a large discrepancy in expected performance, would it?
>>>>>>
>>>>>
>>>>> If the files are all larger than the block size, it would cause a lot more
>>>>> network activity -- but unless your switch or network is broken or not
>>>>> gigabit -- there is a lot of capacity left in the network.
>>>>>
>>>>>> Thanks for the help,
>>>>>> Will
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 28, 2009 at 12:58 PM, Scott Carey<sc...@richrelevance.com>
>>>>>> wrote:
>>>>>>> Well, the first thing to do in any performance bottleneck investigation
>>>>>>> is to look at the machine hardware resource usage.
>>>>>>>
>>>>>>> During your test, what is the CPU use and disk usage?  What about
>>>>>>> network utilization?
>>>>>>> Top, vmstat, iostat, and some network usage monitoring would be useful.
>>>>>>> It could be many things causing your lack of scalability, but without
>>>>>>> actually monitoring your machines to see if there is an obvious
>>>>>>> bottleneck it's just random guessing and hunches.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 7/28/09 8:18 AM, "william kinney" <william.kin...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks in advance for the help!
>>>>>>>>
>>>>>>>> I have a performance question relating to how fast I can expect Hadoop
>>>>>>>> to scale. Running Cloudera's 0.18.3-10.
>>>>>>>>
>>>>>>>> I have custom binary format, which is just Google Protocol Buffer
>>>>>>>> (protobuf) serialized data:
>>>>>>>>
>>>>>>>>   669 files, ~30GB total size (ranging 10MB to 100MB each).
>>>>>>>>   128MB block size.
>>>>>>>>   10 Hadoop Nodes.
>>>>>>>>
>>>>>>>> I tested my InputFormat and RecordReader for my input format, and it
>>>>>>>> showed about 56MB/s performance (single thread, no hadoop, passed in
>>>>>>>> test file via FileInputStream instead of FSDataInputStream) on
>>>>>>>> hardware similar to what I have in my cluster.
>>>>>>>> I also then tested some simple Map logic along w/ the above, and got
>>>>>>>> around 54MB/s. I believe that difference can be accounted for by parsing
>>>>>>>> the protobuf data into java objects.
>>>>>>>>
>>>>>>>> Anyways, when I put this logic into a job that has
>>>>>>>>   - no reduce (.setNumReduceTasks(0);)
>>>>>>>>   - no emit
>>>>>>>>   - just protobuf parsing calls (like above)
>>>>>>>>
>>>>>>>> I get a finish time of 10mins, 25sec, which is about 106.24 MB/s.
>>>>>>>>
>>>>>>>> So my question, why is the rate only 2x what I see on a single thread,
>>>>>>>> non-hadoop test? Would it not be:
>>>>>>>>   54MB/s x 10 (Num Nodes) - small hadoop overhead ?
>>>>>>>>
>>>>>>>> Is there any area of my configuration I should look into for tuning?
>>>>>>>>
>>>>>>>> Any way I could get more accurate performance monitoring of my job?
>>>>>>>>
>>>>>>>> On a side note, I tried the same job after combining the files into
>>>>>>>> about 11 files (still 30GB in size), and actually saw a decrease in
>>>>>>>> performance (~90MB/s).
>>>>>>>>
>>>>>>>> Any help is appreciated. Thanks!
>>>>>>>>
>>>>>>>> Will
>>>>>>>>
>>>>>>>> some hadoop-site.xml values:
>>>>>>>> dfs.replication  3
>>>>>>>> io.file.buffer.size   65536
>>>>>>>> dfs.datanode.handler.count  3
>>>>>>>> mapred.tasktracker.map.tasks.maximum  6
>>>>>>>> dfs.namenode.handler.count  5
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
