@Silvio: the mapPartitions call instantiates an HttpSolrServer, then, for
each query string in the partition, sends the query to Solr using SolrJ and
gets back the top N results. It then reformats the result data into one
long string and returns the key-value pair as (query string, result string).
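
A minimal sketch of that per-partition pattern, assuming a hypothetical
QueryClient trait in place of HttpSolrServer plus the process() helper (this
is not the SolrJ API). Because keyValues.map is lazy, the client is still in
use after getResults returns, so the wrapper closes it only once the last
record has been pulled:

```scala
// Hypothetical stand-in for HttpSolrServer plus the original process() helper.
trait QueryClient extends AutoCloseable {
  def process(keyValue: (String, Array[String])): String
}

object PerPartition {
  // Creates one client per partition and maps records lazily (results are
  // produced only as the caller pulls them). The client is closed after the
  // last record has been consumed.
  def getResults(keyValues: Iterator[(String, Array[String])],
                 newClient: () => QueryClient): Iterator[(String, String)] = {
    val client = newClient()
    val results = keyValues.map(kv => (kv._1, client.process(kv)))
    new Iterator[(String, String)] {
      private var closed = false
      def hasNext: Boolean = {
        val more = results.hasNext
        if (!more && !closed) { client.close(); closed = true }
        more
      }
      def next(): (String, String) = results.next()
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical client that joins the values instead of querying Solr.
    val demoClient = () => new QueryClient {
      def process(keyValue: (String, Array[String])): String = keyValue._2.mkString(",")
      def close(): Unit = println("client closed")
    }
    val out = getResults(Iterator("q1" -> Array("a", "b"), "q2" -> Array("c")), demoClient)
    out.foreach(println) // prints both pairs, then "client closed"
  }
}
```

In Spark this would be wired up as something like
myRDD.mapPartitions(kvs => PerPartition.getResults(kvs, () => makeClient())),
where makeClient is likewise hypothetical.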

@Igor: Thanks for the parameter suggestions. I will check --num-executors,
and whether there is a way to set the number of cores per executor, with my
Databricks admin and update here if I find anything. From the Databricks
console, it appears that there is 1 executor per box. This seems normal,
though, per the diagram on this page:

http://spark.apache.org/docs/latest/cluster-overview.html

which seems to show 1 executor per box, with each executor able to run
multiple tasks in multiple threads (see bullet #1, copied below).

> Each application gets its own executor processes, which stay up for the
> duration of the whole application and run tasks in multiple threads. This
> has the benefit of isolating applications from each other, on both the
> scheduling side (each driver schedules its own tasks) and executor side
> (tasks from different applications run in different JVMs).
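
A toy model of that arrangement, using a plain Java thread pool rather than
Spark's own code: one JVM with a fixed number of task slots, where at most
min(slots, tasks) tasks run at once no matter how many tasks are queued. In
Spark the slot count per executor corresponds roughly to spark.executor.cores
divided by spark.task.cpus; modelling it as a fixed pool is a simplifying
assumption:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Toy model of one executor JVM (not Spark's implementation): a fixed pool
// of `slots` threads, each running one task at a time.
object ExecutorModel {
  // Runs `numTasks` simulated tasks and returns the largest number of tasks
  // observed running at the same moment.
  def peakConcurrency(slots: Int, numTasks: Int, taskMillis: Long = 100L): Int = {
    val pool = Executors.newFixedThreadPool(slots)
    val running = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    try {
      val handles = (1 to numTasks).map { _ =>
        pool.submit(new Runnable {
          def run(): Unit = {
            val now = running.incrementAndGet()
            peak.synchronized { if (now > peak.get()) peak.set(now) }
            Thread.sleep(taskMillis) // stands in for one blocking Solr request
            running.decrementAndGet()
          }
        })
      }
      handles.foreach(_.get()) // wait until every task has finished
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
    peak.get()
  }

  def main(args: Array[String]): Unit = {
    println(peakConcurrency(slots = 1, numTasks = 10)) // one slot: strictly sequential
    println(peakConcurrency(slots = 4, numTasks = 10)) // four slots: up to four at once
  }
}
```

The point of the model is that adding partitions adds queued tasks, not
slots; only the slot count raises the number of tasks running together.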


Regarding hitting the max number of requests, thanks for the link. I am
using the default client. I just peeked at the Solr code: if no HttpClient
instance is supplied in the constructor, it defaults to DefaultHttpClient
(from HttpComponents), whose settings are as follows:

>    - Version: HttpVersion.HTTP_1_1
>    - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
>    - NoTcpDelay: true
>    - SocketBufferSize: 8192
>    - UserAgent: Apache-HttpClient/release (java 1.5)

In addition, the Solr code sets the following config parameters on the
DefaultHttpClient:

      params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
      params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
      params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);

Since all my connections are coming out of 2 worker boxes, it looks like I
could get 32x2 = 64 clients hitting Solr, right?
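
A back-of-the-envelope version of that arithmetic, under assumptions that
are mine and not verified: each running task creates its own client (as
getResults does), the 32/128 caps apply per client instance, and a task
holds at most one connection per request it has in flight. Under those
assumptions, 64 is a ceiling the pool permits, reached only if 32 requests
are actually in flight per box:

```scala
// Back-of-the-envelope model of simultaneous Solr connections. All of it is an
// assumption for illustration, not measured HttpClient or Spark behaviour.
object ConnectionBound {
  // maxPerHost / maxTotal default to the 32 / 128 that HttpSolrServer sets via
  // PROP_MAX_CONNECTIONS_PER_HOST and PROP_MAX_CONNECTIONS.
  def upperBound(
      workers: Int,
      concurrentTasksPerWorker: Int,
      inFlightPerTask: Int,
      maxPerHost: Int = 32,
      maxTotal: Int = 128): Int = {
    // Assumed: each running task owns one client, so a client's pool limits
    // only that task, and a task needs one connection per in-flight request.
    val perClient = math.min(inFlightPerTask, math.min(maxPerHost, maxTotal))
    workers * concurrentTasksPerWorker * perClient
  }

  def main(args: Array[String]): Unit = {
    // Sequential requests, one running task on each of 4 workers.
    println(upperBound(workers = 4, concurrentTasksPerWorker = 1, inFlightPerTask = 1)) // 4
    // 32 x 2 = 64 requires 32 requests in flight from one task on each of 2 boxes.
    println(upperBound(workers = 2, concurrentTasksPerWorker = 1, inFlightPerTask = 32)) // 64
  }
}
```

With one sequential task per worker, the model gives one connection per
worker, the same count as the constant 4 seen with netstat.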

@Steve: Thanks for the link to the HttpClient config. I was thinking about
using a thread pool (or, better, a PoolingHttpClientConnectionManager per
the docs), but it probably won't help, since it's still being fed one
request at a time.

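
If the bottleneck is that each task feeds its client one request at a time,
one option is to issue a partition's requests concurrently from inside
mapPartitions. This sketch assumes `query` is a hypothetical stand-in for
the SolrJ call that is safe to invoke from several threads. It buffers the
whole partition's results in memory and needs the HttpClient pool to allow
at least `parallelism` connections per host (32 with the Solr defaults
quoted above):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConcurrentPartition {
  // Runs `query` for every key of one partition on a private pool of
  // `parallelism` threads, so up to `parallelism` requests are in flight
  // at once. Results come back in the original key order.
  def queryAll(keys: Iterator[String], parallelism: Int)
              (query: String => String): Iterator[(String, String)] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // toList submits every request up front (the partition is buffered in memory).
      val pending = keys.map(k => Future(k -> query(k))).toList
      Await.result(Future.sequence(pending), 10.minutes).iterator
    } finally {
      pool.shutdown() // threads exit once the submitted work is done
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for a Solr request: sleep, then echo the key.
    val fakeSolr = (q: String) => { Thread.sleep(200); s"results for $q" }
    val out = queryAll(Iterator("q1", "q2", "q3", "q4"), parallelism = 4)(fakeSolr).toList
    println(out)
  }
}
```

Bounding the pool per partition keeps the total number of connections
predictable: roughly concurrent tasks times `parallelism`.
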
@Abhishek: My observations agree with what you said. In the past I have had
success using repartition to reduce the partition size, especially when
groupBy operations were involved. But from what I understand about Akka, on
which Spark is built, I believe an executor should be able to handle
multiple tasks in parallel: the worker is essentially an ActorSystem that
can contain multiple Actors, each working on a queue of tasks. Within an
Actor everything is sequential, but the ActorSystem is responsible for
farming out the tasks it gets to its Actors. It is possible, though, that I
am generalizing incorrectly from my limited experience with Akka.

Thanks again for all your help. Please let me know if something jumps out
and/or if there is some configuration I should check.

-sujit



On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh <abhis...@tetrationanalytics.com> wrote:

> I don't know if your assertion (or expectation) that workers will process
> multiple partitions in parallel is really valid, or whether having more
> partitions than workers will necessarily help (unless you are memory bound,
> in which case partitioning is essentially reducing your work size rather
> than increasing execution parallelism).
>
> [Disclaimer: I am no authority on Spark, but wanted to throw in my spin
> based on my own understanding.]
>
> Nothing official about it :)
>
> -abhishek-
>
> On Jul 31, 2015, at 1:03 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>
> Hello,
>
> I am trying to run a Spark job that hits an external webservice to get
> back some information. The cluster is 1 master + 4 workers, each worker has
> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
> and is accessed using code similar to that shown below.
>
> def getResults(keyValues: Iterator[(String, Array[String])]):
>         Iterator[(String, String)] = {
>     val solr = new HttpSolrClient()
>     initializeSolrParameters(solr)
>     keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
> }
>
> myRDD.repartition(10)
>      .mapPartitions(keyValues => getResults(keyValues))
>
> The mapPartitions call initializes the SolrJ client once per partition and
> then queries it for each record in the partition via the getResults()
> call.
>
> I repartitioned in the hope that this would result in 10 clients hitting
> Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
> clients if I can). However, I counted the number of open connections by
> running netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box
> and observed that Solr has a constant 4 clients (i.e., equal to the number
> of workers) over the lifetime of the run.
>
> My observation leads me to believe that each worker processes a single
> stream of work sequentially. However, from what I understand about how
> Spark works, each worker should be able to process a number of tasks in
> parallel, and repartition() is a hint for it to do so.
>
> Is there some SparkConf property or environment variable I should set to
> increase parallelism in these workers, or should I just configure a cluster
> with multiple workers per machine? Or is there something I am doing wrong?
>
> Thank you in advance for any pointers you can provide.
>
> -sujit
>
>
