@Silvio: the mapPartitions instantiates an HttpSolrServer, then for each query string in the partition, sends the query to Solr using SolrJ and gets back the top N results. It then reformats the result data into one long string and returns the key-value pair as (query string, result string).
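For concreteness, here is a minimal sketch (not the actual code) of that per-record step, assuming SolrJ's HttpSolrClient; the field name "id" and the row count are illustrative:

```scala
import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.HttpSolrClient
import scala.collection.JavaConverters._

def process(solr: HttpSolrClient, keyValue: (String, Array[String])): String = {
  val query = new SolrQuery(keyValue._1)
  query.setRows(10)                        // top N results
  val response = solr.query(query)         // blocking HTTP call via SolrJ
  response.getResults.asScala              // SolrDocumentList -> Scala collection
    .map(_.getFieldValue("id").toString)   // pull one field per document
    .mkString("||")                        // reformat into one long result string
}
```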
@Igor: Thanks for the parameter suggestions. I will check --num-executors and whether there is a way to set the number of cores per executor with my Databricks admin and update here if I find it, but from the Databricks console it appears that the number of executors per box is 1. This seems normal, though, per the diagram on this page: http://spark.apache.org/docs/latest/cluster-overview.html where there is 1 executor per box, and each executor can spawn multiple threads to take care of multiple tasks (see bullet #1, copied below).

> Each application gets its own executor processes, which stay up for the
> duration of the whole application and run tasks in multiple threads. This
> has the benefit of isolating applications from each other, on both the
> scheduling side (each driver schedules its own tasks) and executor side
> (tasks from different applications run in different JVMs).

Regarding hitting the max number of requests, thanks for the link. I am using the default client. I just peeked at the Solr code, and the default (if no HttpClient instance is supplied in the ctor) is to use DefaultHttpClient (from HttpComponents), whose settings are as follows:

> - Version: HttpVersion.HTTP_1_1
> - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
> - NoTcpDelay: true
> - SocketBufferSize: 8192
> - UserAgent: Apache-HttpClient/release (java 1.5)

In addition, the Solr code sets the following additional config parameters on the DefaultHttpClient:

> params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
> params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
> params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);

Since all my connections are coming out of 2 worker boxes, it looks like I could get 32 x 2 = 64 clients hitting Solr, right?

@Steve: Thanks for the link to the HttpClient config.
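If that 32-connections-per-host default ever becomes the bottleneck, SolrJ accepts a caller-supplied HttpClient in the ctor. A hedged sketch, assuming the same HttpClientUtil API quoted above (the URL and limits here are illustrative, not from the original post):

```scala
import org.apache.solr.client.solrj.impl.{HttpClientUtil, HttpSolrClient}
import org.apache.solr.common.params.ModifiableSolrParams

val params = new ModifiableSolrParams()
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 256)          // overall cap (default 128)
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 64)  // per-host cap (default 32)
val httpClient = HttpClientUtil.createClient(params)

// Passing the client in the ctor stops SolrJ from building its own defaults.
val solr = new HttpSolrClient("http://solrhost:8983/solr/collection1", httpClient)
```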
I was thinking about using a thread pool (or, better, a PoolingHttpClientConnectionManager per the docs), but it probably won't help since it's still being fed one request at a time.

@Abhishek: my observations agree with what you said. In the past I have had success with repartition to reduce the partition size, especially when groupBy operations were involved. But I believe an executor should be able to handle multiple tasks in parallel, from what I understand about Akka, on which Spark is built - the worker is essentially an ActorSystem which can contain multiple Actors, and each actor works on a queue of tasks. Within an Actor everything is sequential, but the ActorSystem is responsible for farming out the tasks it gets to each of its Actors. Although it is possible I could be generalizing incorrectly from my limited experience with Akka.

Thanks again for all your help. Please let me know if something jumps out and/or if there is some configuration I should check.

-sujit

On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh <abhis...@tetrationanalytics.com> wrote:

> I don't know if (your assertion/expectation that) workers will process
> things (multiple partitions) in parallel is really valid. Or if having more
> partitions than workers will necessarily help (unless you are memory bound
> - so partitions is essentially helping your work size rather than execution
> parallelism).
>
> [Disclaimer: I am no authority on Spark, but wanted to throw my spin based
> on my own understanding].
>
> Nothing official about it :)
>
> -abhishek-
>
> On Jul 31, 2015, at 1:03 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>
> Hello,
>
> I am trying to run a Spark job that hits an external webservice to get
> back some information. The cluster is 1 master + 4 workers; each worker has
> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
> and is accessed using code similar to that shown below.
>> def getResults(keyValues: Iterator[(String, Array[String])]):
>>     Iterator[(String, String)] = {
>>   val solr = new HttpSolrClient()
>>   initializeSolrParameters(solr)
>>   keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>> }
>>
>> myRDD.repartition(10)
>>      .mapPartitions(keyValues => getResults(keyValues))
>
> The mapPartitions does some initialization of the SolrJ client per
> partition and then hits it for each record in the partition via the
> getResults() call.
>
> I repartitioned in the hope that this would result in 10 clients hitting
> Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
> clients if I can). However, I counted the number of open connections using
> 'netstat -anp | grep ":8983.*ESTABLISHED"' in a loop on the Solr box and
> observed that Solr has a constant 4 clients (ie, equal to the number of
> workers) over the lifetime of the run.
>
> My observation leads me to believe that each worker processes a single
> stream of work sequentially. However, from what I understand about how
> Spark works, each worker should be able to process a number of tasks in
> parallel, and repartition() is a hint for it to do so.
>
> Is there some SparkConf environment variable I should set to increase
> parallelism in these workers, or should I just configure a cluster with
> multiple workers per machine? Or is there something I am doing wrong?
>
> Thank you in advance for any pointers you can provide.
>
> -sujit
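One way to keep more than one request in flight per task, along the lines of the thread-pool idea mentioned earlier in the thread, is to wrap each call in a Future inside the partition. This is a sketch under stated assumptions, not the original code: fetch() is a placeholder for the Solr call, and the fan-out width of 8 is illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Placeholder for the real SolrJ query; returns a canned result string.
def fetch(query: String): String = s"results-for-$query"

// Fan out up to `width` concurrent requests per batch within one partition.
def getResultsParallel(keyValues: Iterator[(String, Array[String])],
                       width: Int = 8): Iterator[(String, String)] = {
  keyValues.grouped(width).flatMap { batch =>
    val futures = batch.map(kv => Future((kv._1, fetch(kv._1))))
    Await.result(Future.sequence(futures), 60.seconds)  // preserves batch order
  }
}
```

Note this does not change how many tasks Spark schedules per worker; it only lets each task hold several Solr connections at once. Task-level parallelism itself is governed by the cores the executor is given (e.g. spark.executor.cores, or SPARK_WORKER_CORES in standalone mode).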