Hi Sujit,

Can you spin up a cluster of 4 servers x 4 cores each, i.e. 16 cores in
total, and try to use the same number of partitions? Also have a look at
http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-td23824.html
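For reference, a sketch of the kind of submit command that would give you 16 cores (these are the YARN-style flags, and my-app.jar is a placeholder):

```shell
# Hypothetical sketch: 4 executors x 4 cores = 16 cores in the cluster.
# --num-executors/--executor-cores are the YARN flags; on a standalone
# cluster use --total-executor-cores 16 instead. my-app.jar is a placeholder.
spark-submit \
  --num-executors 4 \
  --executor-cores 4 \
  --conf spark.default.parallelism=16 \
  my-app.jar
```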

On Tue, Aug 4, 2015 at 1:46 AM, Ajay Singal <asinga...@gmail.com> wrote:

> Hi Sujit,
>
>
>
> From experimenting with Spark (and other documentation), my understanding
> is as follows:
>
> 1. Each Application consists of one or more Jobs.
>
> 2. Each Job has one or more Stages.
>
> 3. Each Stage creates one or more Tasks (normally, one Task per
> Partition).
>
> 4. The Master allocates one Executor per Worker (one that holds a
> Partition) per Application.
>
> 5. The Executor stays up for the lifetime of the Application (and dies
> when the Application ends).
>
> 6. Each Executor can run multiple Tasks in parallel (normally, the
> parallelism depends on the number of cores per Executor).
>
> 7. The Scheduler schedules only one Task from each Stage to a given
> Executor.
>
> 8. If a Job has multiple Stages that can run asynchronously (i.e., in
> parallel), one Task from each such Stage could be scheduled on the same
> Executor (so that Executor runs multiple Tasks in parallel; see #6 above).
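The hierarchy in points 1-3 can be sketched as a toy model (plain Scala with my own names; this is not the Spark API):

```scala
// Toy model of the Application > Job > Stage > Task hierarchy above.
// The case-class names are mine, not Spark's.
case class Task(partition: Int)
case class Stage(tasks: Seq[Task])
case class Job(stages: Seq[Stage])

// A Job over 4 partitions with two Stages (e.g. a map, then a
// post-shuffle reduce): one Task per Partition in each Stage.
val partitions = 0 until 4
val job = Job(Seq(
  Stage(partitions.map(p => Task(p))),  // Stage 0: 4 Tasks
  Stage(partitions.map(p => Task(p)))   // Stage 1: 4 Tasks
))
println(job.stages.map(_.tasks.size).sum)  // prints 8
```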
>
>
>
> Of course, there could be many exceptions/exclusions to what I explained
> above.  I expect that the Spark community will confirm or correct my
> observations/understanding.
>
>
>
> Now, let’s come back to your situation.  You have a cluster of 4 Workers
> with 10 Partitions, all of which are distributed among these 4 Workers.
> Also, from the information you provided, your Application has just one Job
> with two Stages (repartition and mapPartitions).  The mapPartitions Stage
> will have 10 Tasks.  Assuming my observations/understanding above are
> correct, by virtue of #7, only 4 Tasks can be executed in parallel; the
> remaining 6 Tasks will have to wait.
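To make the arithmetic explicit (a sketch under assumptions #4 and #7 above, not Spark code):

```scala
// Under #4 and #7: one Executor per Worker, and at most one Task per
// Stage per Executor, so parallelism is capped by the number of Workers.
val numWorkers = 4
val numTasks = 10  // one Task per Partition of the mapPartitions Stage
val tasksInParallel = math.min(numWorkers, numTasks)
println(tasksInParallel)  // prints 4; the remaining 6 Tasks wait
```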
>
>
>
> However, if you had 10 or more Workers, all the Tasks would be executed
> in parallel.  BTW, I believe you can have multiple Workers on one physical
> node, so one solution to your problem would be to increase the number of
> Workers.
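In standalone mode this is done in conf/spark-env.sh (a sketch; the values below are illustrative, not a recommendation for your cluster):

```shell
# conf/spark-env.sh on each node (Spark standalone mode).
# Run 2 Worker processes per physical node, 2 cores each (illustrative values).
export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_CORES=2
```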
>
>
>
> Having said that, I believe #7 above is the bottleneck.  If there is no
> good reason for keeping this bottleneck, this could be a good area of
> improvement (and needs to be addressed by the Spark community).  I will
> wait for the community's response, and if needed, I will open a JIRA item.
>
>
>
> I hope it helps.
>
>
>
> Regards,
>
> Ajay
>
> On Mon, Aug 3, 2015 at 1:16 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>
>> @Silvio: the mapPartitions instantiates an HttpSolrServer, then for each
>> query string in the partition sends the query to Solr using SolrJ and
>> gets back the top N results. It then reformats the result data into one
>> long string and returns the key-value pair as (query string, result string).
>>
>> @Igor: Thanks for the parameter suggestions. I will check --num-executors
>> and whether there is a way to set the number of cores per executor with my
>> Databricks admin, and update here if I find anything. From the Databricks
>> console, though, it appears that the number of executors per box is 1.
>> This seems normal, per the diagram on this page:
>>
>> http://spark.apache.org/docs/latest/cluster-overview.html
>>
>> where it seems that there is 1 executor per box, and each executor can
>> spawn multiple threads to take care of multiple tasks (see bullet #1 copied
>> below).
>>
>>> Each application gets its own executor processes, which stay up for the
>>> duration of the whole application and run tasks in multiple threads. This
>>> has the benefit of isolating applications from each other, on both the
>>> scheduling side (each driver schedules its own tasks) and executor side
>>> (tasks from different applications run in different JVMs).
>>
>>
>> Regarding hitting the max number of requests, thanks for the link. I am
>> using the default client. I just peeked at the Solr code, and the default
>> (if no HttpClient instance is supplied in the constructor) is to use
>> DefaultHttpClient (from HttpComponents), whose settings are as follows:
>>
>>>
>>>    - Version: HttpVersion.HTTP_1_1
>>>    - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
>>>    - NoTcpDelay: true
>>>    - SocketBufferSize: 8192
>>>    - UserAgent: Apache-HttpClient/release (java 1.5)
>>>
>> In addition, the Solr code sets the following config parameters on the
>> DefaultHttpClient:
>>
>>>       params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
>>>       params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
>>>       params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);
>>
>> Since all my connections are coming out of 2 worker boxes, it looks like
>> I could get 32x2 = 64 clients hitting Solr, right?
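The arithmetic being assumed here (one DefaultHttpClient per executor JVM, one executor per worker box, per the observations above):

```scala
// Assumption: one DefaultHttpClient per executor JVM, one executor per box.
val maxConnsPerHost = 32  // PROP_MAX_CONNECTIONS_PER_HOST default quoted above
val workerBoxes = 2       // boxes the connections originate from
println(maxConnsPerHost * workerBoxes)  // prints 64
```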
>>
>> @Steve: Thanks for the link to the HttpClient config. I was thinking
>> about using a thread pool (or, better, a PoolingHttpClientConnectionManager
>> per the docs), but it probably won't help since it's still being fed one
>> request at a time.
>>
>> @Abhishek: my observations agree with what you said. In the past I have
>> had success with repartition to reduce the partition size, especially when
>> groupBy operations were involved. But I believe an executor should be able
>> to handle multiple tasks in parallel, from what I understand about Akka,
>> on which Spark is built: the worker is essentially an ActorSystem which
>> can contain multiple Actors, and each Actor works on a queue of tasks.
>> Within an Actor everything is sequential, but the ActorSystem is
>> responsible for farming out the tasks it gets to its Actors. Although it
>> is possible I am generalizing incorrectly from my limited experience with
>> Akka.
>>
>> Thanks again for all your help. Please let me know if something jumps out
>> and/or if there is some configuration I should check.
>>
>> -sujit
>>
>>
>>
>> On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh <
>> abhis...@tetrationanalytics.com> wrote:
>>
>>> I don't know if (your assertion/expectation that) workers will process
>>> things (multiple partitions) in parallel is really valid, or if having
>>> more partitions than workers will necessarily help (unless you are memory
>>> bound, in which case more partitions is essentially reducing your work
>>> size rather than increasing execution parallelism).
>>>
>>> [Disclaimer: I am no authority on Spark, but wanted to throw in my spin
>>> based on my own understanding.]
>>>
>>> Nothing official about it :)
>>>
>>> -abhishek-
>>>
>>> On Jul 31, 2015, at 1:03 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to run a Spark job that hits an external webservice to get
>>> back some information. The cluster is 1 master + 4 workers, each worker has
>>> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
>>> and is accessed using code similar to that shown below.
>>>
>>>> def getResults(keyValues: Iterator[(String, Array[String])]):
>>>>         Iterator[(String, String)] = {
>>>>     val solr = new HttpSolrClient()
>>>>     initializeSolrParameters(solr)
>>>>     keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>>>> }
>>>>
>>>> myRDD.repartition(10)
>>>>      .mapPartitions(keyValues => getResults(keyValues))
>>>
>>> The mapPartitions does some initialization of the SolrJ client per
>>> partition and then hits it for each record in the partition via the
>>> getResults() call.
>>>
>>> I repartitioned in the hope that this would result in 10 clients hitting
>>> Solr simultaneously (I would like to go up to maybe 30-40 simultaneous
>>> clients if I can). However, I counted the number of open connections using
>>> netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and
>>> observed that Solr has a constant 4 clients (i.e., equal to the number of
>>> workers) over the lifetime of the run.
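The counting loop was essentially the following (a sketch; the helper name is mine, and 8983 is Solr's default port):

```shell
# Sketch of the counting loop described above; count_solr_conns is my
# own helper name. 8983 is Solr's default port.
count_solr_conns() {
  grep ":8983.*ESTABLISHED" | wc -l
}
# On the Solr box, poll every 5 seconds:
# while true; do netstat -anp | count_solr_conns; sleep 5; done
```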
>>>
>>> My observation leads me to believe that each worker processes a single
>>> stream of work sequentially. However, from what I understand about how
>>> Spark works, each worker should be able to process a number of tasks in
>>> parallel, and repartition() is a hint for it to do so.
>>>
>>> Is there some SparkConf environment variable I should set to increase
>>> parallelism in these workers, or should I just configure a cluster with
>>> multiple workers per machine? Or is there something I am doing wrong?
>>>
>>> Thank you in advance for any pointers you can provide.
>>>
>>> -sujit
>>>
>>>
>>
>


-- 
with Regards
Shahid Ashraf
