Hi Christian,

thank you for the detailed reply. It sheds light on some other issues we're having, and I'm beginning to believe that our map_js_vm_count and reduce_js_vm_count settings are set too low for our ring size. I will definitely be learning some Erlang.
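In the meantime, here is a minimal sketch of what a recursion-safe variant of our reduce phase could look like, along the lines you describe (untested, and reduce_sorted_samples is only an illustrative name). Rather than collapsing the input to a single number, which only works when the whole input is visible at once (i.e. with reduce_phase_only_1), it returns the merged, sorted samples, so it stays correct however Riak batches or pre-reduces it; the 95th percentile would then be picked from the final list in the application layer:

// Untested sketch of a reduce phase that is safe to run recursively:
// its output has the same shape as its input (a flat list of numbers),
// so re-reducing a mix of fresh map output and earlier reduce results is fine.
function reduce_sorted_samples(values, arg) {
  // values holds numbers emitted by map95th and/or by previous
  // invocations of this function; merge and sort them.
  return values.sort(function (a, b) { return a - b; });
}

The obvious trade-off is that this does not shrink the data sent to the coordinating node, but it removes the need for reduce_phase_only_1 and keeps the result correct regardless of reduce_phase_batch_size.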
best regards,
Deyan

On Jul 23, 2013, at 9:32 AM, Christian Dahlqvist <[email protected]> wrote:

> Hi Deyan,
>
> As mentioned, it is recommended to write reduce phases so that they can run recursively [1], which lets you avoid having to use the 'reduce_phase_only_1' parameter. Once you have a reduce function that behaves this way, you can tune it by overriding the 'reduce_phase_batch_size' parameter, which defaults to 20. You can also specify the 'do_prereduce' parameter on the preceding map phase to make the first reduce iteration run in parallel across the cluster before finishing off at the coordinating node. This can significantly reduce the amount of data transferred across the cluster and speed up performance, although it may make the reduce phase functions a bit more difficult to write, as they need to handle a mix of output from the preceding map phase and results from previous iterations of the reduce phase.
>
> As JavaScript map and reduce functions use VMs from a pool specified in the app.config file (the map_js_vm_count and reduce_js_vm_count parameters), you will need to tune these parameters based on your processing needs. Because map phases run in parallel close to the partitions that hold the data, they often require considerably more VMs than reduce phase functions. The exact number depends on your ring size, the number of map phases in the job, and the number of concurrent jobs you will be running. [2]
>
> Using JavaScript functions is, however, considerably slower than using functions implemented in Erlang. For any functions that will execute regularly or as part of large jobs, we always recommend rewriting them in Erlang. In addition to speeding things up, this removes the reliance on the JavaScript VM pools.
>
> [1] http://docs.basho.com/riak/latest/dev/advanced/mapreduce/#How-Phases-Work
> [2] http://riak-users.197444.n3.nabble.com/Follow-up-Riak-Map-Reduce-error-preflist-exhausted-td4024330.html
>
>
> Best regards,
>
> Christian
>
>
> On 22 Jul 2013, at 17:03, Deyan Dyankov <[email protected]> wrote:
>
>> Thanks Christian,
>>
>> I was able to modify the job code in a similar manner to the one you suggested and the issue is now resolved. However, I'd still like to understand the cause of these timeouts and which parameter, if any, should be raised to mitigate them. This particular job was not expected to run in real time and we were willing to wait for it. We may have other such cases in the future.
>>
>> best regards,
>> Deyan
>>
>> On Jul 15, 2013, at 4:49 PM, Christian Dahlqvist <[email protected]> wrote:
>>
>>> Hi Deyan,
>>>
>>> When running mapreduce jobs, reduce phases often end up being the bottleneck. This is especially true when all input data needs to be gathered on the coordinating node before the reduce function can be executed, as is the case when the reduce_phase_only_1 flag is enabled. Having this flag set means the mapreduce job will not scale very well.
>>>
>>> Depending on your exact requirements, it may be worthwhile to gather the histogram data periodically, e.g. per hour and/or day. These aggregates can then be stored in separate buckets under keys that describe the content, e.g. <cust>_<setup>_<date>.
>>> Once this has been done, you can efficiently retrieve a limited number of objects covering the period you want statistics for directly through the descriptive keys, and process them in the application layer. Even though this requires a bit more work on a periodic basis, it will most likely be much more efficient at query time and scale better.
>>>
>>> Best regards,
>>>
>>> Christian
>>>
>>>
>>> On 14 Jul 2013, at 12:16, Deyan Dyankov <[email protected]> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> first time here. Thanks in advance.
>>>>
>>>> I am experiencing issues with MapReduce: it seems to time out after a certain data volume threshold is reached. There is only one reducer, and here is the mapreduce initiation script:
>>>>
>>>> #!/usr/bin/env ruby
>>>> […]
>>>> @client = Riak::Client.new(
>>>>   :nodes => [
>>>>     {:host => 'db1', :pb_port => 8087, :http_port => 8098},
>>>>     {:host => 'db2', :pb_port => 8087, :http_port => 8098},
>>>>     {:host => 'db3', :pb_port => 8087, :http_port => 8098}
>>>>   ],
>>>>   :protocol => 'pbc'
>>>> )
>>>>
>>>> start_key = "#{cust}:#{setup}:#{start_time}"
>>>> end_key = "#{cust}:#{setup}:#{end_time}"
>>>>
>>>> result = Riak::MapReduce.new(@client).
>>>>   index(bucket_name, index_name, start_key..end_key).
>>>>   map('map95th').
>>>>   reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
>>>>   run()
>>>>
>>>> puts result
>>>>
>>>> The following is the code for the map95th and reduce95th JavaScript functions:
>>>>
>>>> function map95th(v, keyData, arg) {
>>>>   var key_elements = v['key'].split(':');
>>>>   var cust = key_elements[0];
>>>>   var setup = key_elements[1];
>>>>   var sid = key_elements[2];
>>>>   var ts = key_elements[3];
>>>>
>>>>   var result_key = cust + ':' + setup + ':' + ts;
>>>>   var obj = {};
>>>>   var obj_data = Riak.mapValuesJson(v)[0];
>>>>
>>>>   obj_data['bps'] = (obj_data['rx_bytes'] + obj_data['tx_bytes']) / 60;
>>>>   return_val = obj_data['bps'];
>>>>   return [ return_val ];
>>>> }
>>>>
>>>> // if used, this must be a single reducer! Call from Ruby like this:
>>>> // reduce('reduce95th', :arg => { 'reduce_phase_only_1' => true }, :keep => true).
>>>> function reduce95th(values) {
>>>>   var sorted = values.sort(function(a,b) { return a - b; });
>>>>   var pct = sorted.length / 100;
>>>>   var element_95th = pct * 95;
>>>>   element_95th = parseInt(element_95th, 10) + 1;
>>>>
>>>>   return [ sorted[element_95th] ];
>>>> }
>>>>
>>>> Now here is the interesting part. The MR goes through one record per minute. If I run it for a period of less than ~20 days, it executes. Otherwise, it times out:
>>>>
>>>> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$
>>>> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 20 * 86400)) `date +%s`
>>>> 125581.51666666666
>>>> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$ ./95h.rb yellingtone default $((`date +%s` - 30 * 86400)) `date +%s`
>>>> /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:182:in `decode_response': Expected success from Riak but received 0.
>>>> {"phase":1,"error":"timeout","input":null,"type":null,"stack":null} (Riak::ProtobuffsFailedRequest)
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client/beefcake_protobuffs_backend.rb:116:in `mapred'
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:325:in `block in mapred'
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:435:in `block in recover_from'
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/innertube-1.0.2/lib/innertube.rb:127:in `take'
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:433:in `recover_from'
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:379:in `protobuffs'
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:133:in `backend'
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/client.rb:324:in `mapred'
>>>>     from /Users/deyandyankov/.rvm/gems/ruby-1.9.3-p392/gems/riak-client-1.1.1/lib/riak/map_reduce.rb:217:in `run'
>>>>     from ./95h.rb:29:in `<main>'
>>>> [deyandyankov@azobook ~/repos/loshko/mapreduce/ruby (master)]$
>>>>
>>>> The records being processed look like this:
>>>> {"rx_bytes":3485395.0,"tx_bytes":1658479.0}
>>>>
>>>> When running the script with more than 20 days' worth of data (two records per minute are processed, which amounts to 2 * 60 * 24 * 20 = more than 57,600 records), the script times out and here are some things from the logs:
>>>>
>>>> ==> /var/log/riak/erlang.log.1 <==
>>>> Erlang has closed
>>>>
>>>> ==> /var/log/riak/error.log <==
>>>> 2013-07-14 13:03:51.580 [error] <0.709.0>@riak_pipe_vnode:new_worker:768 Pipe worker startup failed:fitting was gone before startup
>>>>
>>>> ==> /var/log/riak/console.log <==
>>>> 2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout
>>>>
>>>> ==> /var/log/riak/error.log <==
>>>> 2013-07-14 13:03:51.584 [error] <0.22049.4326> gen_fsm <0.22049.4326> in state wait_for_input terminated with reason: timeout
>>>>
>>>> ==> /var/log/riak/console.log <==
>>>> 2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611
>>>>
>>>> ==> /var/log/riak/crash.log <==
>>>> 2013-07-14 13:03:51 =CRASH REPORT====
>>>>   crasher:
>>>>     initial call: riak_pipe_vnode_worker:init/1
>>>>     pid: <0.22049.4326>
>>>>     registered_name: []
>>>>     exception exit: {timeout,[{gen_fsm,terminate,7,[{file,"gen_fsm.erl"},{line,611}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,227}]}]}
>>>>     ancestors: [<0.710.0>,<0.709.0>,riak_core_vnode_sup,riak_core_sup,<0.129.0>]
>>>>     messages: []
>>>>     links: [<0.710.0>,<0.709.0>]
>>>>     dictionary:
>>>>       [{eunit,[{module,riak_pipe_vnode_worker},{partition,388211372416021087647853783690262677096107081728},{<0.709.0>,<0.709.0>},{details,{fitting_details,{fitting,<18125.23420.4566>,#Ref<18125.0.5432.50467>,<<"C�������������������">>,1},1,riak_kv_w_reduce,{rct,#Fun<riak_kv_w_reduce.0.20542221>,{struct,[{<<"reduce_phase_only_1">>,true}]}},{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined},[{log,sink},{trace,[error]},{sink,{fitting,<18125.23418.4566>,#Ref<18125.0.5432.50467>,sink,undefined}},{sink_type,{fsm,10,infinity}}],64}}]}]
>>>>     trap_exit: false
>>>>     status: running
>>>>     heap_size: 832040
>>>>     stack_size: 24
>>>>     reductions: 1456611
>>>>   neighbours:
>>>>
>>>> ==> /var/log/riak/error.log <==
>>>> 2013-07-14 13:03:51.940 [error] <0.22049.4326> CRASH REPORT Process <0.22049.4326> with 0 neighbours exited with reason: timeout in gen_fsm:terminate/7 line 611
>>>>
>>>> ==> /var/log/riak/crash.log <==
>>>> 2013-07-14 13:03:52 =SUPERVISOR REPORT====
>>>>   Supervisor: {<0.710.0>,riak_pipe_vnode_worker_sup}
>>>>   Context:    child_terminated
>>>>   Reason:     timeout
>>>>   Offender:   [{pid,<0.22049.4326>},{name,undefined},{mfargs,{riak_pipe_vnode_worker,start_link,undefined}},{restart_type,temporary},{shutdown,2000},{child_type,worker}]
>>>>
>>>> ==> /var/log/riak/console.log <==
>>>> 2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated
>>>>
>>>> ==> /var/log/riak/error.log <==
>>>> 2013-07-14 13:03:52.059 [error] <0.710.0> Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.22049.4326> exit with reason timeout in context child_terminated
>>>>
>>>> The data is in leveldb and is accessed through secondary indexes. This is a 3 node cluster with 32GB RAM; current usage is about 12G per node. n_val=3. The same issue occurs on a similar 2 node cluster with 8GB of RAM (usage is ~6G).
>>>>
>>>> The following is my app.config:
>>>>
>>>> [
>>>>  {riak_api, [
>>>>              {pb_ip, "0.0.0.0" },
>>>>              {pb_port, 8087 },
>>>>              {pb_backlog, 100 }
>>>>             ]},
>>>>  {riak_core, [
>>>>               {default_bucket_props, [
>>>>                   {n_val, 3},
>>>>                   {last_write_wins, true}
>>>>               ]},
>>>>               {ring_state_dir, "/storage/riak/ring"},
>>>>               {ring_creation_size, 256},
>>>>               {http, [ {"0.0.0.0", 8098 } ]},
>>>>               {https, [{ "0.0.0.0", 8069 }]},
>>>>               {ssl, [
>>>>                      {certfile, "/etc/ssl/riak/server.crt"},
>>>>                      {cacertfile, "/etc/ssl/riak/root.crt"},
>>>>                      {keyfile, "/etc/ssl/riak/server.key"}
>>>>                     ]},
>>>>               {handoff_port, 8099 },
>>>>               {dtrace_support, false},
>>>>               {enable_health_checks, true},
>>>>               {platform_bin_dir, "/usr/sbin"},
>>>>               {platform_data_dir, "/storage/riak"},
>>>>               {platform_etc_dir, "/etc/riak"},
>>>>               {platform_lib_dir, "/usr/lib/riak/lib"},
>>>>               {platform_log_dir, "/var/log/riak"}
>>>>              ]},
>>>>  {riak_kv, [
>>>>             {storage_backend, riak_kv_eleveldb_backend},
>>>>             {anti_entropy, {on, []}},
>>>>             {anti_entropy_build_limit, {1, 3600000}},
>>>>             {anti_entropy_expire, 604800000},
>>>>             {anti_entropy_concurrency, 2},
>>>>             {anti_entropy_tick, 15000},
>>>>             {anti_entropy_data_dir, "/storage/riak/anti_entropy"},
>>>>             {anti_entropy_leveldb_opts, [{write_buffer_size, 4194304},
>>>>                                          {max_open_files, 20}]},
>>>>
>>>>             {mapred_name, "mapred"},
>>>>             {mapred_2i_pipe, true},
>>>>             {map_js_vm_count, 16 },
>>>>             {reduce_js_vm_count, 12 },
>>>>             {hook_js_vm_count, 20 },
>>>>             {js_max_vm_mem, 8},
>>>>             {js_thread_stack, 16},
>>>>             {js_source_dir, "/etc/riak/mapreduce/js_source"},
>>>>             {http_url_encoding, on},
>>>>             {vnode_vclocks, true},
>>>>             {listkeys_backpressure, true},
>>>>             {vnode_mailbox_limit, {1, 5000}}
>>>>            ]},
>>>>
>>>>  {riak_search, [
>>>>                 {enabled, true}
>>>>                ]},
>>>>
>>>>  {merge_index, [
>>>>                 {data_root, "/storage/riak/merge_index"},
>>>>                 {buffer_rollover_size, 1048576},
>>>>                 {max_compact_segments, 20}
>>>>                ]},
>>>>
>>>>  {bitcask, [
>>>>             {data_root, "/storage/riak/bitcask"}
>>>>            ]},
>>>>
>>>>  {eleveldb, [
>>>>              {cache_size, 1024},
>>>>              {max_open_files, 64},
>>>>              {data_root, "/storage/riak/leveldb"}
>>>>             ]},
>>>>
>>>>  {lager, [
>>>>           {handlers, [
>>>>                       {lager_file_backend, [
>>>>                           {"/var/log/riak/error.log", error, 10485760, "$D0", 5},
>>>>                           {"/var/log/riak/console.log", info, 10485760, "$D0", 5}
>>>>                       ]}
>>>>                      ] },
>>>>
>>>>           {crash_log, "/var/log/riak/crash.log"},
>>>>           {crash_log_msg_size, 65536},
>>>>           {crash_log_size, 10485760},
>>>>           {crash_log_date, "$D0"},
>>>>           {crash_log_count, 5},
>>>>           {error_logger_redirect, true}
>>>>          ]},
>>>>
>>>>  {riak_sysmon, [
>>>>                 {process_limit, 30},
>>>>                 {port_limit, 2},
>>>>                 {gc_ms_limit, 0},
>>>>                 {heap_word_limit, 40111000},
>>>>                 {busy_port, true},
>>>>                 {busy_dist_port, true}
>>>>                ]},
>>>>
>>>>  {sasl, [
>>>>          {sasl_error_logger, false}
>>>>         ]},
>>>>
>>>> Sorry to bug you with such a long e-mail, but I wanted to be as thorough as possible. I tried raising a few options, but it didn't help: map_js_vm_count, reduce_js_vm_count, js_max_vm_mem. I also tried adding a timeout argument to the mapreduce caller code, but even if I set it to 60,000 or more (this is milliseconds), the script terminates with a timeout error after 10-12 seconds. The same behaviour is observed if I use http instead of pbc.
>>>>
>>>> What seems to be the problem? Is this a matter of configuration? I am surprised that the job runs with 20-25 days of data but not more.
>>>>
>>>> thank you for your efforts,
>>>> Deyan
>>>>
>>>> _______________________________________________
>>>> riak-users mailing list
>>>> [email protected]
>>>> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
>>>
>>
>
_______________________________________________
riak-users mailing list
[email protected]
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
