I'm not sure what "looking up entries... in batches of 100 from Riak" devolves into in the Java client, but Riak doesn't have a native multiget. It either does 100 individual get ops or a [search >] MapReduce. That might explain some of your performance issues.
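To make that concrete, here is roughly what each of your batches amounts to against the core client, reusing the FetchOperation/Location calls from your snippet below. This is just a sketch: the bucket and key names are placeholders, and the import paths may differ on your checkout of the develop branch.

    import com.basho.riak.client.core.{RiakCluster, RiakFuture}
    import com.basho.riak.client.core.operations.FetchOperation
    import com.basho.riak.client.query.Location

    // A "multiget" of N keys is just N independent FetchOperations:
    // one operation, and one round trip to the cluster, per key.
    // Nothing is batched on the server side.
    def multiGet(cluster: RiakCluster,
                 keys: Seq[String]): Seq[RiakFuture[FetchOperation.Response]] =
      keys.map { key =>
        val location = new Location("jobseeker-job-view") // placeholder bucket
          .setBucketType("no-siblings")
          .setKey(key)
        cluster.execute(
          new FetchOperation.Builder(location).withTimeout(75).withR(1).build)
      }

So 100 keys per batch times B bolts means 100*B operations in flight at once, all contending for the same per-node connection pools, which is one place I'd look.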
-Alexander
@siculars
http://siculars.posthaven.com

Sent from my iRotaryPhone

> On Apr 14, 2014, at 8:26, Sean Allen <s...@monkeysnatchbanana.com> wrote:
>
> I'm seeing something very odd trying to scale out part of code I'm working on.
>
> It runs inside of Storm and looks up entries from a 10 node Riak cluster.
> I've hit a wall that we can't get past. We are looking up entries (JSON
> representations of jobs) in batches of 100 from Riak. Each batch gets handled
> by a bolt in Storm, but adding more bolts (an instance of the bolt class with
> a dedicated thread) results in no increase in performance. I instrumented the
> code and saw that waiting for all Riak futures to finish takes longer as more
> bolts are added. Thinking that perhaps there was contention around the
> RiakCluster object that we were sharing per JVM, I tried giving each bolt
> instance its own cluster object, and there wasn't any change.
>
> Note that neither changing the thread pool size given to withExecutor nor
> changing the withExecutionAttempts value has any impact.
>
> We're working off of the develop branch for the Java client. We've been using
> d3cc30d, but I also tried with cef7570 and had the same issue.
>
> A simplified version of the Scala code running this:
>
>   // called once upon bolt initialization.
>   def prepare(config: JMap[_, _],
>               context: TopologyContext,
>               collector: OutputCollector): Unit = {
>     ...
>
>     val nodes = RiakNode.Builder.buildNodes(new RiakNode.Builder,
>       (1 to 10).map(n => s"riak-beavis-$n").toList.asJava)
>     riak = new RiakCluster.Builder(nodes)
>       // varying this has made no difference
>       .withExecutionAttempts(1)
>       // nor has varying this
>       .withExecutor(new ScheduledThreadPoolExecutor(200))
>       .build()
>     riak.start
>
>     ...
>   }
>
>   private def get(jobLocationId: String): RiakFuture[FetchOperation.Response] = {
>     val location = new Location("jobseeker-job-view")
>       .setBucketType("no-siblings")
>       .setKey(jobLocationId)
>     val fop = new FetchOperation.Builder(location).withTimeout(75).withR(1).build
>
>     riak.execute(fop)
>   }
>
>   def execute(tuple: Tuple): Unit = {
>     val indexType = tuple.getStringByField("index_type")
>     val indexName = tuple.getStringByField("index_name")
>     val batch = tuple.getValueByField("batch").asInstanceOf[Set[Payload]]
>
>     var lookups: Set[(Payload, RiakFuture[FetchOperation.Response])] = Set.empty
>
>     // this always returns in a standard time based on batch size
>     time("dispatch-calls") {
>       lookups = batch.filter(_.key.isDefined).map {
>         payload => (payload, get(payload.key.get))
>       }
>     }
>
>     val futures = lookups.map(_._2)
>
>     // this is what takes longer and longer when more bolts are added.
>     // it doesn't matter what the sleep time is.
>     time("waiting-on-futures") {
>       while (futures.count(!_.isDone) > 0) {
>         Thread.sleep(25L)
>       }
>     }
>
>     // everything from here to the end returns in a fixed amount of time
>     // and doesn't change with the number of bolts
>     ...
>   }
>
> It seems like we are running into contention somewhere in the Riak Java
> client. My first thought was the LinkedBlockingQueue that serves as the retry
> queue in RiakCluster, but I've tried running with only a single execution
> attempt, as well as a custom client version where I removed all retries from
> the codebase, and still experience the same problem.
>
> I'm still digging through the code looking for possible points of contention.
>
> Any thoughts?
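P.S. One more thought on the waiting-on-futures loop above: polling isDone with a sleep burns up to a full 25 ms quantum per check. You could block on the futures directly instead. A minimal sketch, assuming RiakFuture on the develop branch still extends java.util.concurrent.Future (your code calling isDone suggests it does):

    // Block on each future in turn instead of poll-and-sleep. Total wall
    // time is still bounded by the slowest fetch, but no sleep quantum
    // is wasted, which makes the per-batch wait easier to measure.
    time("waiting-on-futures") {
      futures.foreach(_.get()) // get() blocks; it throws if the fetch failed
    }

That won't fix contention inside the client, but it removes the polling noise from your timing numbers.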
_______________________________________________
riak-users mailing list
riak-users@lists.basho.com
http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com