Hi Christian,

thank you for the detailed reply, It sheds light on some other issues that 
we're having and I'm beginning to believe that our map_js_vm_count and 
reduce_js_vm_count settings are set too low for our ring size.
I will definitely be learning some Erlang.

best regards,
Deyan

On Jul 23, 2013, at 9:32 AM, Christian Dahlqvist <[email protected]> wrote:

> Hi Deyan,
> 
> As mentioned, it is recommended to write reduce phases it so that it can run 
> recursively [1], so that you can avoid having to use the 
> 'reduce_phase_only_1' parameter. Once you have a reduce function that behaves 
> this way, you can tune it by overriding the size of the 
> 'reduce_phase_batch_size' parameter, which by default is 20. You can also 
> specify the 'do_prereduce' parameter on the preceding map phase to make the 
> first reduce iteration run in parallel across the cluster before finishing 
> off at the coordinating node. This can significantly reduce the amount of 
> data transferred across the cluster and speed up performance, although it may 
> make the reduce phase functions a bit more difficult to write as they need to 
> be able to handle a mix of output from the preceding map phase as well as 
> results from previous iterations of the reduce phase.
> 
> As JavaScript map and reduce functions use VMs from a pool specified in the 
> app.config file (map_js_vm_count and reduce_js_vm_count parameters), you will 
> need to tune the size of these parameters based on your processing needs. As 
> map phases run in parallel close to the partitions that hold the data, they 
> often require considerably more VMs available than reduce phase functions. 
> The exact number depends on the number of your ring size, the number of map 
> phases you have in the job and the number of concurrent jobs you will be 
> running. [2]
> 
> Using JavaScript functions is however considerably slower than using 
> functions implemented in Erlang. For any functions that will execute 
> regularly or as part of large jobs, we always recommend rewriting them in 
> Erlang. In addition to speed things up, it removes the reliance on the 
> JavaScript pools.
> 
> [1] http://docs.basho.com/riak/latest/dev/advanced/mapreduce/#How-Phases-Work
> [2] 
> http://riak-users.197444.n3.nabble.com/Follow-up-Riak-Map-Reduce-error-preflist-exhausted-td4024330.html
> 
> 
> Best regards,
> 
> Christian
> 
> 
> On 22 Jul 2013, at 17:03, Deyan Dyankov <[email protected]> wrote:
> 
>> Thanks Christian,
>> 
>> I was able to modify the job code in a similar manner as you suggested and 
>> the issue is now resolved. However, I'd still like to understand the cause 
>> of these timeouts and what parameter should be raised, if possible, to 
>> mitigate them? This particular job was not expected to perform in real time 
>> and we were willing to wait for it. We may have other such cases in the 
>> future..
>> 
>> best regards,
>> Deyan
>> 
>> On Jul 15, 2013, at 4:49 PM, Christian Dahlqvist <[email protected]> wrote:
>> 
>>> Hi Deyan,
>>> 
>>> When running mapreduce jobs, reduce phases often end up being the 
>>> bottleneck. This is especially true when all input data needs to be 
>>> gathered on the coordinating node before it can be executed, as is the case 
>>> if the reduce_phase_only_1 flag is enabled. Having this flag set will cause 
>>> the mapreduce job to not scale very well.
>>> 
>>> Depending on your exact requirements, it may be worthwhile considering 
>>> gathering the histogram data periodically, e.g. per hour and/or day. These 
>>> aggregates can then be stored in separate buckets with a key that describes 
>>> the content, e.g. <cust>_<setup>_<date> . Once this has been done, you can 
>>> efficiently retrieve a limited number of objects that cover the period you 
>>> want to get statistics for directly through the descriptive keys, and 
>>> process these in the application layer. Even though this periodically 
>>> requires a bit more work, it will most likely be much more efficient at 
>>> query time and scale better.
>>> 
>>> Best regards,
>>> 
>>> Christian
>>> 
>>> 
>>> On 14 Jul 2013, at 12:16, Deyan Dyankov <[email protected]> wrote:
>>> 
>>>> Hi everyone,
>>>> 
>>>> first time here. Thanks in advance.
>>>> 
>>>> I am experiencing issues with MapReduce and it seems to timeout after a 
>>>> certain volume data threshold is reached. The reducer is only one and here 
>>>> is the mapreduce initiation script:
>>>> #!/usr/bin/env ruby
>>>> […]
>>>> @client = Riak::Client.new(
>>>>   :nodes => [
>>>>     {:host => 'db1', :pb_port => 8087, :http_port => 8098},
>>>>     {:host => 'db2', :pb_port => 8087, :http_port => 8098},
>>>>     {:host => 'db3', :pb_port => 8087, :http_port => 8098}
>>>>   ],
>>>>   :protocol => 'pbc'
>>>> )
>>>> 
>>>> start_key = "#{cust}:#{setup}:#{start_time}"
>>>> end_key = "#{cust}:#{setup}:#{end_time}"
>>>> 
>>>> result = Riak::MapReduce.new(@client).
>>>>   index(bucket_name, index_name, start_key..end_key).
>>>>   map('map95th').
>>>>   reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => 
>>>> true).
>>>>   run()
>>>> 
>>>> puts result
>>>> 
>>>> The following is the code for the map95th and reduce95th javascript 
>>>> functions:
>>>> function map95th(v, keyData, arg) {
>>>>   var key_elements = v['key'].split(':');
>>>>   var cust = key_elements[0];
>>>>   var setup = key_elements[1];
>>>>   var sid = key_elements[2];
>>>>   var ts = key_elements[3];
>>>> 
>>>>   var result_key = cust + ':' + setup + ':' + ts;
>>>>   var obj = {}
>>>>   var obj_data = Riak.mapValuesJson(v)[0];
>>>> 
>>>>   obj_data['bps'] = (obj_data['rx_bytes'] + obj_data['tx_bytes']) / 60;
>>>>   return_val = obj_data['bps'];
>>>>   return [ return_val ];
>>>> }
>>>> 
>>>> // if used, this must be a single reducer! Call from Ruby like this:
>>>> //  reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep 
>>>> => true).
>>>> function reduce95th(values) {
>>>>   var sorted = values.sort(function(a,b) { return a - b; });
>>>>   var pct = sorted.length / 100;
>>>>   var element_95th = pct * 95;
>>>>   element_95th = parseInt(element_95th, 10) + 1;
>>>> 
>>>>   return [ sorted[element_95th] ];
>>>> }
>>>> 
>>>> 
>>>> 
>>>> Now here is the interesting part. The MR goes through one record per 
>>>> minute. If I run it for a period of less than ~20 days, it executes. 
>>>> Otherwise, it times out:
>>>> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$
>>>> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb 
>>>> yellingtone default $((`date +%s` - 20 * 86400)) `date +%s`
>>>> 125581.51666666666
>>>> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb 
>>>> yellingtone default $((`date +%s` - 30 * 86400)) `date +%s`
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:182:in
>>>>  `decode_response': Expected success from Riak but received 0. 
>>>> {"phase":1,"error":"timeout","input":null,"type":null,"stack":null} 
>>>> (Riak::ProtobuffsFailedRequest)
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:116:in
>>>>  `mapred'
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:325:in
>>>>  `block in mapred'
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:435:in
>>>>  `block in recover_from'
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/innertube-1.0.2/lib/innertube.rb:127:in
>>>>  `take'
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:433:in
>>>>  `recover_from'
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:379:in
>>>>  `protobuffs'
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:133:in
>>>>  `backend'
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:324:in
>>>>  `mapred'
>>>>    from 
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/map_reduce.rb:217:in
>>>>  `run'
>>>>    from ./95h.rb:29:in `<main>'
>>>> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$
>>>> 
>>>> The records being processed look lie this:
>>>> {"rx_bytes":3485395.0,"tx_bytes":1658479.0}
>>>> 
>>>> When running the script with more than 20 days worth of data (two records 
>>>> per minute are processed, which amounts to 2 * 60 * 24 * 20 = more than 
>>>> 57,600 processed), the script times out and here are some things from the 
>>>> logs:
>>>> ==> /var/log/riak/erlang.log.1 <==
>>>> Erlang has closed
>>>> 
>>>> ==> /var/log/riak/error.log <==
>>>> 2013-07-14 13:03:51.580 [error] <0.709.0>@riak_pipe_vnode:new_worker:768 
>>>> Pipe worker startup failed:fitting was gone before startup
>>>> 
>>>> ==> /var/log/riak/console.log <==
>>>> 2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in 
>>>> state wait_for_input terminated with reason: timeout
>>>> 
>>>> ==> /var/log/riak/error.log <==
>>>> 2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in 
>>>> state wait_for_input terminated with reason: timeout
>>>> 
>>>> ==> /var/log/riak/console.log <==
>>>> 2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process 
>>>> <0.22049.4326> with 0 neighbours exited with reason: timeout in 
>>>> gen_fsm:terminate/7 line 611
>>>> 
>>>> ==> /var/log/riak/crash.log <==
>>>> 2013-07-14 13:03:51 =CRASH REPORT====
>>>>   crasher:
>>>>     initial call: riak_pipe_vnode_worker:init/1
>>>>     pid: <0.22049.4326>
>>>>     registered_name: []
>>>>     exception exit: 
>>>> {timeout,[{gen_fsm,terminate,7,[{file,"gen_fsm.erl"},{line,611}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,227}]}]}
>>>>     ancestors: 
>>>> [<0.710.0>,<0.709.0>,riak_core_vnode_sup,riak_core_sup,<0.129.0>]
>>>>     messages: []
>>>>     links: [<0.710.0>,<0.709.0>]
>>>>     dictionary: 
>>>> [{eunit,[{module,riak_pipe_vnode_worker},{partition,388211372416021087647853783690262677096107081728},{<0.709.0>,<0.709.0>},{details,{fitting_details,{fitting,<18125.23420.4566>,#Ref<18125.0.5432.50467>,<<"C�������������������">>,1},1,riak_kv_w_reduce,{rct,#Fun<riak_kv_w_reduce.0.20542221>,{struct,[{<<"reduce_phase_only_1">>,true}]}},{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined},[{log,sink},{trace,[error]},{sink,{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined}},{sink_type,{fsm,10,infinity}}],64}}]}]
>>>>     trap_exit: false
>>>>     status: running
>>>>     heap_size: 832040
>>>>     stack_size: 24
>>>>     reductions: 1456611
>>>>   neighbours:
>>>> 
>>>> ==> /var/log/riak/error.log <==
>>>> 2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process 
>>>> <0.22049.4326> with 0 neighbours exited with reason: timeout in 
>>>> gen_fsm:terminate/7 line 611
>>>> 
>>>> ==> /var/log/riak/crash.log <==
>>>> 2013-07-14 13:03:52 =SUPERVISOR REPORT====
>>>>      Supervisor: {<0.710.0>,riak_pipe_vnode_worker_sup}
>>>>      Context:    child_terminated
>>>>      Reason:     timeout
>>>>      Offender:   
>>>> [{pid,<0.22049.4326>},{name,undefined},{mfargs,{riak_pipe_vnode_worker,start_link,undefined}},{restart_type,temporary},{shutdown,2000},{child_type,worker}]
>>>> 
>>>> 
>>>> ==> /var/log/riak/console.log <==
>>>> 2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor 
>>>> riak_pipe_vnode_worker_sup had child undefined started with 
>>>> {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with 
>>>> reason timeout in context child_terminated
>>>> 
>>>> ==> /var/log/riak/error.log <==
>>>> 2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor 
>>>> riak_pipe_vnode_worker_sup had child undefined started with 
>>>> {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with 
>>>> reason timeout in context child_terminated
>>>> 
>>>> 
>>>> The data is in leveldb and is accessed through secondary indexes. 
>>>> This is a 3 node cluster with 32GB ram, current usage is about 12G per 
>>>> node. n_val=3. The same issues occurs on a similar 2 node cluster with 8GB 
>>>> of ram (usage is ~6G).
>>>> 
>>>> The following is my app.config:
>>>> [
>>>>  {riak_api, [
>>>>             {pb_ip,   "0.0.0.0" },
>>>>             {pb_port, 8087 },
>>>>             {pb_backlog, 100 }
>>>>             ]},
>>>>  {riak_core, [
>>>>               {default_bucket_props, [
>>>>                     {n_val, 3},
>>>>                     {last_write_wins, true}
>>>>                     ]},
>>>>               {ring_state_dir, "/storage/riak/ring"},
>>>>               {ring_creation_size, 256},
>>>>               {http, [ {"0.0.0.0", 8098 } ]},
>>>>               {https, [{ "0.0.0.0", 8069 }]},
>>>>               {ssl, [
>>>>                      {certfile, "/etc/ssl/riak/server.crt"},
>>>>                      {cacertfile, "/etc/ssl/riak/root.crt"},
>>>>                      {keyfile, "/etc/ssl/riak/server.key"}
>>>>                     ]},
>>>>               {handoff_port, 8099 },
>>>>               {dtrace_support, false},
>>>>               {enable_health_checks, true},
>>>>               {platform_bin_dir, "/usr/sbin"},
>>>>               {platform_data_dir, "/storage/riak"},
>>>>               {platform_etc_dir, "/etc/riak"},
>>>>               {platform_lib_dir, "/usr/lib/riak/lib"},
>>>>               {platform_log_dir, "/var/log/riak"}
>>>>              ]},
>>>>  {riak_kv, [
>>>>             {storage_backend, riak_kv_eleveldb_backend},
>>>>             {anti_entropy, {on, []}},
>>>>             {anti_entropy_build_limit, {1, 3600000}},
>>>>             {anti_entropy_expire, 604800000},
>>>>             {anti_entropy_concurrency, 2},
>>>>             {anti_entropy_tick, 15000},
>>>>             {anti_entropy_data_dir, "/storage/riak/anti_entropy"},
>>>>             {anti_entropy_leveldb_opts, [{write_buffer_size, 4194304},
>>>>                                          {max_open_files, 20}]},
>>>> 
>>>>             {mapred_name, "mapred"},
>>>>             {mapred_2i_pipe, true},
>>>>             {map_js_vm_count, 16 },
>>>>             {reduce_js_vm_count, 12 },
>>>>             {hook_js_vm_count, 20 },
>>>>             {js_max_vm_mem, 8},
>>>>             {js_thread_stack, 16},
>>>>             {js_source_dir, "/etc/riak/mapreduce/js_source"},
>>>>             {http_url_encoding, on},
>>>>             {vnode_vclocks, true},
>>>>             {listkeys_backpressure, true},
>>>>             {vnode_mailbox_limit, {1, 5000}}
>>>>            ]},
>>>> 
>>>>  {riak_search, [
>>>>                 {enabled, true}
>>>>                ]},
>>>> 
>>>>  {merge_index, [
>>>>                 {data_root, "/storage/riak/merge_index"},
>>>>                 {buffer_rollover_size, 1048576},
>>>>                 {max_compact_segments, 20}
>>>>                ]},
>>>> 
>>>>  {bitcask, [
>>>>              {data_root, "/storage/riak/bitcask"}
>>>>            ]},
>>>> 
>>>>  {eleveldb, [
>>>>              {cache_size, 1024},
>>>>              {max_open_files, 64},
>>>>              {data_root, "/storage/riak/leveldb"}
>>>>             ]},
>>>> 
>>>>  {lager, [
>>>>             {handlers, [
>>>>                            {lager_file_backend, [
>>>>                                {"/var/log/riak/error.log", error, 
>>>> 10485760, "$D0", 5},
>>>>                                {"/var/log/riak/console.log", info, 
>>>> 10485760, "$D0", 5}
>>>>                            ]}
>>>>                        ] },
>>>> 
>>>>             {crash_log, "/var/log/riak/crash.log"},
>>>>             {crash_log_msg_size, 65536},
>>>>             {crash_log_size, 10485760},
>>>>             {crash_log_date, "$D0"},
>>>>             {crash_log_count, 5},
>>>>             {error_logger_redirect, true}
>>>>         ]},
>>>> 
>>>>  {riak_sysmon, [
>>>>          {process_limit, 30},
>>>>          {port_limit, 2},
>>>>          {gc_ms_limit, 0},
>>>>          {heap_word_limit, 40111000},
>>>>          {busy_port, true},
>>>>          {busy_dist_port, true}
>>>>         ]},
>>>> 
>>>>  {sasl, [
>>>>          {sasl_error_logger, false}
>>>>         ]},
>>>> 
>>>> Sorry to bug you with such a long e-mail but I wanted to be as thorough as 
>>>> possible. I tried raising a few options but it didn't help: 
>>>> map_js_vm_count, reduce_js_vm_count, js_max_vm_mem
>>>> I also tried adding a timeout argument to the map reduce caller code but 
>>>> even if I set it to 60,000 or more (this is milliseconds), the script is 
>>>> terminating with timeout error after 10-12 secs. The same behaviour is 
>>>> observed if I use http instead of pbc.
>>>> 
>>>> What seems to be the problem? Is this a matter of configuration? I am 
>>>> surprised about the fact that the job runs with 20-25 days of data and not 
>>>> more.
>>>> 
>>>> thank you for your efforts,
>>>> Deyan
>>>> _______________________________________________
>>>> riak-users mailing list
>>>> [email protected]
>>>> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>>> 
>> 
> 

_______________________________________________
riak-users mailing list
[email protected]
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com

Reply via email to