Hi Sujit,


From experimenting with Spark (and reading the documentation), my understanding
is as follows:

1.       Each application consists of one or more Jobs

2.       Each Job has one or more Stages

3.       Each Stage creates one or more Tasks (normally, one Task per
Partition)

4.       The Master allocates one Executor per Application on each Worker
(that holds a Partition)

5.       The Executor stays up for the lifetime of the Application (and
dies when the Application ends)

6.       Each Executor can run multiple Tasks in parallel (normally, the
parallelism depends on the number of cores per Executor).

7.       The Scheduler assigns only one Task from a given Stage to an
Executor at a time.

8.       If there are multiple Stages (from a Job) and these Stages could
be run asynchronously (i.e., in parallel), one Task from each Stage could
be scheduled on the same Executor (thus this Executor runs multiple Tasks
in parallel: see #6 above).

Of course, there could be many exceptions to what I explained above.  I
expect the Spark community to confirm or correct my
observations/understanding.

Now, let’s come back to your situation.  You have a cluster of 4 Workers
with 10 Partitions, distributed among these 4 Workers.  From the
information you provided, your Application has just one Job with two
Stages (repartition and mapPartitions).  The mapPartitions Stage will have
10 Tasks.  Assuming my observations/understanding are correct, by virtue of
#7 above, only 4 Tasks can be executed in parallel.  The remaining Tasks
will have to wait.
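The arithmetic above can be sketched in Scala. Under assumption #7, at most one Task per Executor runs at a time, so with 4 Executors the 10 Tasks of the mapPartitions Stage run in three waves (4, 4, then 2). The second figure is only a hypothetical comparison: if each Executor could instead run one Task per core, the 4 CPUs per worker mentioned in Sujit's first message would give 16 slots and a single wave. `TaskWaves` and `waves` are illustrative names, not Spark APIs.

```scala
object TaskWaves {
  // Number of sequential "waves" needed to run `numTasks` Tasks when at most
  // `slots` of them can execute at the same time (ceiling division).
  def waves(numTasks: Int, slots: Int): Int = {
    require(numTasks >= 0 && slots > 0, "numTasks must be >= 0 and slots > 0")
    (numTasks + slots - 1) / slots
  }

  def main(args: Array[String]): Unit = {
    val tasks   = 10 // one Task per partition after repartition(10)
    val workers = 4  // one Executor per Worker
    val cores   = 4  // CPUs per worker, from the original post

    // Assumption #7: one Task per Stage per Executor, i.e. 4 slots.
    println(s"one Task per Executor: ${waves(tasks, workers)} waves")
    // Hypothetical comparison: one Task per core, i.e. 4 * 4 = 16 slots.
    println(s"one Task per core:     ${waves(tasks, workers * cores)} wave(s)")
  }
}
```

The same helper covers the 10-Worker case: `waves(10, 10)` is 1.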

However, if you had 10 or more Workers, all Tasks would have been executed
in parallel.  BTW, I believe you can have multiple Workers on one physical
node.  So, one solution to your problem would be to increase the number of
Workers.

That said, I believe #7 above is the bottleneck.  If there is no good
reason for keeping this bottleneck, this could be a good area of
improvement (to be addressed by the Spark community).  I will wait for the
community response and, if needed, I will open a JIRA item.

I hope it helps.

Regards,

Ajay

On Mon, Aug 3, 2015 at 1:16 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:

> @Silvio: the mapPartitions call instantiates an HttpSolrServer, then, for
> each query string in the partition, sends the query to Solr using SolrJ and
> gets back the top N results. It then reformats the result data into one
> long string and returns the key-value pair (query string, result string).
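The per-partition pattern described above can be sketched in plain Scala, without Spark or SolrJ. `SearchClient` is a hypothetical stand-in for HttpSolrServer (it is not the SolrJ API), and the `|` separator used to join the top N results into one string is an assumption. The point is that one client is created per partition and reused for every query in it; in a Spark job, `lookupPartition` would be called from inside the function passed to `mapPartitions`.

```scala
object PerPartitionLookup {
  // Hypothetical stand-in for a search client such as SolrJ's HttpSolrServer;
  // not the SolrJ API, just enough to make the pattern runnable.
  trait SearchClient {
    def topN(query: String, n: Int): Seq[String]
  }

  // Meant to be called once per partition (e.g. inside rdd.mapPartitions):
  // a single client is created up front and reused for every query.
  def lookupPartition(queries: Iterator[String],
                      newClient: () => SearchClient,
                      n: Int): Iterator[(String, String)] = {
    val client = newClient()
    // Each query becomes (query string, top-N results joined into one string).
    // The mapping is lazy: requests are sent one at a time, as the caller
    // consumes the returned iterator.
    queries.map(q => (q, client.topN(q, n).mkString("|")))
  }

  def main(args: Array[String]): Unit = {
    // Fake client that "finds" n synthetic documents per query.
    val fake: () => SearchClient = () => new SearchClient {
      def topN(query: String, n: Int): Seq[String] = (1 to n).map(i => s"$query-doc$i")
    }
    lookupPartition(Iterator("spark", "solr"), fake, 2).foreach(println)
  }
}
```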
>
> @Igor: Thanks for the parameter suggestions. I will check --num-executors,
> and whether there is a way to set the number of cores per executor, with my
> Databricks admin and update here if I find anything. From the Databricks
> console, it appears that the number of executors per box is 1. This seems
> normal, though, per the diagram on this page:
>
> http://spark.apache.org/docs/latest/cluster-overview.html
>
> where it seems that there is 1 executor per box, and each executor can
> spawn multiple threads to take care of multiple tasks (see bullet #1 copied
> below).
>
>> Each application gets its own executor processes, which stay up for the
>> duration of the whole application and run tasks in multiple threads. This
>> has the benefit of isolating applications from each other, on both the
>> scheduling side (each driver schedules its own tasks) and executor side
>> (tasks from different applications run in different JVMs).
>
>
> Regarding hitting the max number of requests, thanks for the link. I am
> using the default client. I just peeked at the Solr code: the default (if
> no HttpClient instance is supplied in the constructor) is to use
> DefaultHttpClient (from HttpComponents), whose settings are as follows:
>
>>    - Version: HttpVersion.HTTP_1_1
>>    - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
>>    - NoTcpDelay: true
>>    - SocketBufferSize: 8192
>>    - UserAgent: Apache-HttpClient/release (java 1.5)
>
> In addition, the Solr code sets the following config parameters on the
> DefaultHttpClient:
>
>       params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
>       params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
>       params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);
>
> Since all my connections are coming out of 2 worker boxes, it looks like I
> could get 32x2 = 64 clients hitting Solr, right?
>
> @Steve: Thanks for the link to the HttpClient config. I was thinking about
> using a thread pool (or, better, a PoolingHttpClientManager per the docs),
> but it probably won't help since it's still being fed one request at a
> time.
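A thread pool can still help if each task issues several requests at once rather than one at a time. Below is a minimal sketch using only the Scala standard library: it works through a partition in batches of `parallelism` records and sends each batch concurrently on a fixed-size pool. `query` is a hypothetical stand-in for the blocking SolrJ call; whether the client may be shared across threads, and whether the per-host connection limit quoted above allows that many connections, are assumptions to verify.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConcurrentPartition {
  // Processes one partition's records with up to `parallelism` requests in
  // flight at once. `query` is a hypothetical blocking call (e.g. one Solr
  // request per record); it is not part of any real API.
  def processConcurrently[A, B](records: Iterator[A], parallelism: Int, timeout: FiniteDuration)
                               (query: A => B): Iterator[B] = {
    require(parallelism > 0, "parallelism must be positive")
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      records
        .grouped(parallelism)           // batches of up to `parallelism` records
        .flatMap { batch =>
          val inFlight = batch.map(r => Future(query(r)))  // issued concurrently
          Await.result(Future.sequence(inFlight), timeout)  // keeps input order
        }
        .toList                         // materialize before the pool shuts down
        .iterator
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val results = processConcurrently(Iterator.range(1, 9), parallelism = 4, timeout = 10.seconds) { x =>
      Thread.sleep(50) // simulated request latency
      x * x
    }
    println(results.mkString(", "))
  }
}
```

Batching is the simplest scheme: a slow request holds up its whole batch, so a sliding window would keep the pool busier at the cost of more code. Results are materialized per partition, which assumes partitions fit comfortably in memory.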
>
> @Abhishek: my observations agree with what you said. In the past I have
> had success with repartition to reduce the partition size, especially when
> groupBy operations were involved. But I believe an executor should be able
> to handle multiple tasks in parallel, from what I understand about Akka, on
> which Spark is built: the worker is essentially an ActorSystem which can
> contain multiple Actors, and each Actor works on a queue of tasks. Within an
> Actor everything is sequential, but the ActorSystem is responsible for
> farming out the tasks it gets to each of its Actors. It is possible, though,
> that I am generalizing incorrectly from my limited experience with Akka.
>
> Thanks again for all your help. Please let me know if something jumps out
> and/or if there is some configuration I should check.
>
> -sujit
>
>
>
> On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh <
> abhis...@tetrationanalytics.com> wrote:
>
>> I don't know if your assertion/expectation that workers will process
>> multiple partitions in parallel is really valid, or if having more
>> partitions than workers will necessarily help (unless you are memory
>> bound, in which case more partitions reduce the size of each unit of work
>> rather than increase execution parallelism).
>>
>> [Disclaimer: I am no authority on Spark, but wanted to throw in my spin
>> based on my own understanding.]
>>
>> Nothing official about it :)
>>
>> -abhishek-
>>
>> On Jul 31, 2015, at 1:03 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>>
>> Hello,
>>
>> I am trying to run a Spark job that hits an external webservice to get
>> back some information. The cluster is 1 master + 4 workers, each worker has
>> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
>> and is accessed using code similar to that shown below.
>>
>> def getResults(keyValues: Iterator[(String, Array[String])]):
>>         Iterator[(String, String)] = {
>>     // solrUrl: base URL of the Solr server (placeholder name)
>>     val solr = new HttpSolrClient(solrUrl)
>>     initializeSolrParameters(solr)
>>     keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>> }
>>
>> myRDD.repartition(10)
>>      .mapPartitions(keyValues => getResults(keyValues))
>>
>> The mapPartitions call initializes the SolrJ client once per partition and
>> then queries it for each record in the partition via the getResults()
>> call.
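One detail of the code above: `keyValues.map(...)` is lazy, so the SolrJ client must stay open until Spark has consumed the returned iterator, which means it cannot simply be closed at the end of `getResults`. A minimal sketch of one way to release it, assuming the cleanup action is whatever the client provides (for example its close or shutdown method): wrap the mapped iterator so that a callback runs once the iterator is exhausted. `ClosingIterator` is a hypothetical helper, not a Spark or SolrJ API. The callback does not run if the task fails part-way; Spark's `TaskContext` task-completion listeners are worth checking in the Spark docs for that case.

```scala
object ClosingIterator {
  // Wraps `underlying` so that `onDone` runs exactly once, the first time
  // hasNext reports that no elements are left.
  def closeWhenDone[A](underlying: Iterator[A])(onDone: () => Unit): Iterator[A] =
    new Iterator[A] {
      private var done = false

      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !done) {
          done = true
          onDone()
        }
        more
      }

      def next(): A = underlying.next()
    }

  def main(args: Array[String]): Unit = {
    // Stand-in for a per-partition client: "opened" here, "closed" when done.
    println("client opened")
    val results = closeWhenDone(Iterator("q1", "q2").map(q => (q, q.toUpperCase))) { () =>
      println("client closed")
    }
    results.foreach(println)
  }
}
```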
>>
>> I repartitioned in the hope that this would result in 10 clients hitting
>> Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
>> clients if I can). However, I counted the number of open connections using
>> netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and
>> observed that Solr has a constant 4 clients (i.e., equal to the number of
>> workers) over the lifetime of the run.
>>
>> My observation leads me to believe that each worker processes a single
>> stream of work sequentially. However, from what I understand about how
>> Spark works, each worker should be able to process a number of tasks in
>> parallel, and repartition() is a hint for it to do so.
>>
>> Is there some SparkConf environment variable I should set to increase
>> parallelism in these workers, or should I just configure a cluster with
>> multiple workers per machine? Or is there something I am doing wrong?
>>
>> Thank you in advance for any pointers you can provide.
>>
>> -sujit
>>
>>
>
