I'm not sure what "looking up entries... in batches of 100 from Riak" devolves 
into in the Java client, but Riak doesn't have a native multiget. It either does 
100 get ops or a [search>]mapreduce. That might inform some of your performance 


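To make that concrete, here is a rough sketch of what a client-side "multiget"
amounts to when there is no native one: one independent fetch per key, fanned
out and then joined. Note that fetchOne is a hypothetical stand-in written for
this example, not the Riak Java client API, and the values it returns are made
up purely for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MultiGetSketch {
  // Hypothetical stand-in for a single-key fetch; NOT the Riak client API.
  // A real fetch would go over the network to one of the cluster nodes.
  def fetchOne(key: String): Future[Option[String]] =
    Future {
      Some(s"value-for-$key")
    }

  // A client-side "multiget": N independent fetches, joined at the end.
  // The batch completes only when the slowest individual fetch completes.
  def multiGet(keys: Seq[String]): Future[Map[String, String]] =
    Future
      .sequence(keys.map(k => fetchOne(k).map(v => k -> v)))
      .map(_.collect { case (k, Some(v)) => k -> v }.toMap)

  def main(args: Array[String]): Unit = {
    val keys = (1 to 100).map(n => s"job-$n")
    val result = Await.result(multiGet(keys), 10.seconds)
    println(s"fetched ${result.size} of ${keys.size} keys")
  }
}
```

The point of the sketch is only that a batch of 100 is 100 separate round
trips, so the time spent waiting on the batch is bounded by the slowest of
them rather than by a single request.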

Sent from my iRotaryPhone

> On Apr 14, 2014, at 8:26, Sean Allen <s...@monkeysnatchbanana.com> wrote:
> I'm seeing something very odd trying to scale out part of the code I'm working on.
> It runs inside of Storm and looks up entries from a 10-node Riak cluster.
> I've hit a wall that we can't get past. We are looking up entries (JSON
> representations of a job) in batches of 100 from Riak. Each batch gets
> handled by a bolt in Storm, and adding more bolts (each an instance of the
> bolt class with a dedicated thread) results in no increase in performance.
> I instrumented the code and saw that the time spent waiting for all Riak
> futures to finish increases as more bolts are added. Thinking that perhaps
> there was contention around the RiakCluster object that we were sharing per
> JVM, I tried giving each bolt instance its own cluster object, and there
> wasn't any change.
> Note that changing neither the thread pool size given to withExecutor nor the
> withExecutionAttempts value has any impact.
> We're working off of the develop branch for the java client. We've been using 
> d3cc30d but I also tried with cef7570 and had the same issue.
> A simplified version of the Scala code running this:
>   // called once upon bolt initialization.
>   def prepare(config: JMap[_, _],
>               context: TopologyContext,
>               collector: OutputCollector): Unit = {
>     ...
>     val nodes = RiakNode.Builder.buildNodes(new RiakNode.Builder,
>       (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
>     riak = new RiakCluster.Builder(nodes)
>       // varying this has made no difference
>       .withExecutionAttempts(1)
>       // nor has varying this
>       .withExecutor(new ScheduledThreadPoolExecutor(200))
>       .build()
>     riak.start
>     ...
>   }
>   private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
>     val location = new Location("jobseeker-job-view")
>       .setBucketType("no-siblings")
>       .setKey(jobLocationId)
>     val fop = new FetchOperation.Builder(location)
>       .withTimeout(75)
>       .withR(1)
>       .build
>     riak.execute(fop)
>   }
>   def execute(tuple: Tuple): Unit = {
>     val indexType = tuple.getStringByField("index_type")
>     val indexName = tuple.getStringByField("index_name")
>     val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]
>     var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty
>     // this always returns in a standard time based on batch size
>     time("dispatch-calls") {
>       lookups = batch.filter(_.key.isDefined).map {
>         payload => {(payload, get(payload.key.get))}
>       }
>     }
>     val futures = lookups.map(_._2)
>     // this is what takes longer and longer when more bolts are added.
>     // it doesn't matter what the sleep time is.
>     time("waiting-on-futures") {
>       while (futures.count(!_.isDone) > 0) {
>         Thread.sleep(25L)
>       }
>     }
>     // everything from here to the end returns in a fixed amount of time
>     // and doesn't change with the number of bolts
>     ...
>   }
> It seems like we are running into contention somewhere in the Riak Java
> client. My first thought was the LinkedBlockingQueue that serves as the retry
> queue in RiakCluster, but I've tried running with only a single execution
> attempt as well as a custom client version where I removed all retries from
> the codebase, and I still experience the same problem.
> I'm still digging through the code looking for possible points of contention.
> Any thoughts?
> _______________________________________________
> riak-users mailing list
> riak-users@lists.basho.com
> http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com
