Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Chris Martin
Hmm, then my guesses are (in order of decreasing probability):

* Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
compatible with the latest Spark release.
* You've got 16 threads per task on a 16-core machine. That should be fine, but
I wonder if it's confusing things, as you don't also set
spark.executor.cores and Databricks might default that to 1.
* There's some custom partitioner in play which is causing everything to go
to the same partition.
* The group keys are all hashing to the same value (it's difficult to see
how this would be the case if the group keys are genuinely different, but
maybe there's something else going on; see the quick check below).
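
For that last guess, a quick way to check, as a rough sketch only: it assumes
the rdd and getGroup shown further down the thread, a String group key and 50
partitions, and it only approximates what Spark's HashPartitioner does.

// Do all the distinct group keys fall into the same bucket under hash partitioning?
val numPartitions = 50
rdd.map(_.getGroup).distinct().collect()
  .groupBy(k => math.abs(k.hashCode) % numPartitions)
  .foreach { case (bucket, keys) => println(s"bucket $bucket <- ${keys.length} keys") }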

My hints:

1. Make sure you're using a recent version of Sparkler.
2. Try repartitioning with a custom partitioner that you know will send things
to different partitions (see the sketch after this list).
3. Try either removing "spark.task.cpus":"16" or setting
spark.executor.cores to 1.
4. Print out the group keys and see if there's any weird pattern to them.
5. See if the same thing happens in Spark local mode.
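
For hint 2, a rough sketch of what I mean (getId here is an assumed stand-in
for any per-record unique field, not necessarily Sparkler's real API):

import org.apache.spark.HashPartitioner

// Key by something known to be unique per record, so partitionBy has to spread
// the data; then check the Spark UI to confirm tasks land on every executor.
val byId   = rdd.map(r => (r.getId, r))
val spread = byId.partitionBy(new HashPartitioner(50))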

If you have a reproducible example you can post publicly then I'm happy
to take a look.

Chris

On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:

> Yeah, to test that I just set the group key to the ID in the record, which
> is a Solr-supplied UUID, which means effectively you end up with 4000
> groups now.
>
> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin  wrote:
>
>> One thing I would check is this line:
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>
>> how many distinct groups do you end up with? If there's just one then
>> I think you might see the behaviour you observe.
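
(A quick way to answer that, assuming getGroup as in the snippet further down;
one distinct group means one shuffle partition receives all the data, so a
single task does all the downstream work:)

val distinctGroups = rdd.map(_.getGroup).distinct().count()
println(s"distinct groups: $distinctGroups")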
>>
>> Chris
>>
>>
>> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>>
>>> Also, just to follow up on that slightly: off the back of another comment, I
>>> did also try this:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>
>>>
>>> I repartitioned that scoredRdd map out of interest; it then triggers the
>>> FairFetcher function there, instead of in the runJob(), but still on a
>>> single executor 😞
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>>
>>>>
>>>> Okay, so what happens is that the crawler reads a bunch of Solr data
>>>> (we're not talking GBs, just a list of JSON) and turns that into a bunch of
>>>> RDDs that end up in that flatMap that I linked to first.
>>>>
>>>> The fair fetcher is an interface to a pluggable backend that basically
>>>> takes some of the fields and goes and crawls websites listed in them
>>>> looking for information. We wrote this code 6 years ago for a DARPA project
>>>> tracking down criminals on the web. Now I'm reusing it but trying to force
>>>> it to scale out a bit more.
>>>>
>>>> Say I have 4000 URLs I want to crawl and a 3-node Spark cluster. I want
>>>> to push down 1 URL (a few more won't hurt, but crawling 50 URLs in parallel
>>>> on one node makes my cluster sad) to each executor and have it run a crawl,
>>>> then move on and get another one, and so on. That way you're not saturating
>>>> a node trying to look up all of them, and you could add more nodes for
>>>> greater capacity pretty quickly. Once the website has been captured, you
>>>> can then "score" it, for want of a better term, to determine its usefulness,
>>>> which is where the map is being triggered.
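
A sketch of that shape, where crawlOne is a hypothetical stand-in for the real
fetch logic rather than actual Sparkler code:

import org.apache.spark.rdd.RDD

// Roughly one URL per partition, so each executor core crawls one site at a
// time and then asks the scheduler for the next one.
def spreadCrawl(urls: RDD[String], crawlOne: String => String): RDD[String] =
  urls
    .repartition(urls.count().toInt.max(1))  // ~1 URL per partition/task
    .map(crawlOne)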
>>>>
>>>> In answer to your questions, Sean: no action seems to be triggered until you
>>>> end up in the score block and the sc.runJob(), because that's literally the
>>>> next line of functionality, as Kafka isn't enabled.
>>>>
>>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>   .groupByKey(m)
>>>>   .flatMap({ case (grp, rs) =>
>>>>     new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>       FetchFunction, ParseFunction, OutLinkFilterFunction,
>>>>       StatusUpdateSolrTransformer).toSeq
>>>>   })
>>>>   .persist()
>>>>
>>>> if (kafkaEnable) {
>>>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>>>> }
>>>> val sc

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Chris Martin
One thing I would check is this line:

val fetchedRdd = rdd.map(r => (r.getGroup, r))

how many distinct groups do you end up with? If there's just one then I
think you might see the behaviour you observe.

Chris


On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:

> Also, just to follow up on that slightly: off the back of another comment, I
> did also try this:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
>
> I repartitioned that scoredRdd map out of interest; it then triggers the
> FairFetcher function there, instead of in the runJob(), but still on a
> single executor 😞
>
> Tom
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>
>>
>> Okay, so what happens is that the crawler reads a bunch of Solr data
>> (we're not talking GBs, just a list of JSON) and turns that into a bunch of
>> RDDs that end up in that flatMap that I linked to first.
>>
>> The fair fetcher is an interface to a pluggable backend that basically
>> takes some of the fields and goes and crawls websites listed in them
>> looking for information. We wrote this code 6 years ago for a DARPA project
>> tracking down criminals on the web. Now I'm reusing it but trying to force
>> it to scale out a bit more.
>>
>> Say I have 4000 URLs I want to crawl and a 3-node Spark cluster. I want
>> to push down 1 URL (a few more won't hurt, but crawling 50 URLs in parallel
>> on one node makes my cluster sad) to each executor and have it run a crawl,
>> then move on and get another one, and so on. That way you're not saturating
>> a node trying to look up all of them, and you could add more nodes for
>> greater capacity pretty quickly. Once the website has been captured, you
>> can then "score" it, for want of a better term, to determine its usefulness,
>> which is where the map is being triggered.
>>
>> In answer to your questions, Sean: no action seems to be triggered until you end
>> up in the score block and the sc.runJob(), because that's literally the next
>> line of functionality, as Kafka isn't enabled.
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>   .groupByKey(m)
>>   .flatMap({ case (grp, rs) =>
>>     new FairFetcher(job, rs.iterator, localFetchDelay,
>>       FetchFunction, ParseFunction, OutLinkFilterFunction,
>>       StatusUpdateSolrTransformer).toSeq
>>   })
>>   .persist()
>>
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
>>
>>
>> That if block is disabled, so the score function runs. Inside of that:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>> ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>> 
>>
>>
>> When it's doing stuff, I can see in the Spark UI that it's waiting on the
>> sc.runJob() line, so that's the execution point.
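
(For context, that sc.runJob(scoreUpdateRdd, scoreUpdateFunc) call is the first
action in the chain, so it is where the whole lazy lineage, from the groupByKey
and the FairFetcher flatMap through persist and the score maps, actually runs
on the executors. Any other action should trigger the same work; roughly, with
scoreUpdateFunc's exact signature assumed rather than taken from Sparkler:)

// An equivalent trigger: any action on scoreUpdateRdd forces the full lineage.
scoreUpdateRdd.foreachPartition { docs =>
  scoreUpdateFunc(docs)  // assumed to be applicable to a partition's iterator
}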
>>
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>
>>> persist() doesn't even persist by itself; it just marks the RDD to be persisted
>>> when it's executed.
>>> The key doesn't matter here, nor does partitioning, if this code is inadvertently
>>> trying to run things on the driver.
>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>> running some supplied functions very directly and at a low level with
>>> sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action? What happens after persist()?
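
(A minimal illustration of that laziness, in plain Spark rather than anything
Sparkler-specific, assuming sc is the SparkContext:)

val marked = sc.parallelize(1 to 10).map { i => println(s"computing $i"); i }
marked.persist()  // only marks the RDD for caching; no tasks run yet
marked.count()    // first action: the map actually executes and the result is cached
marked.count()    // second action: served from the cache, the map does not rerun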
>>>
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>>>
 Thanks Mich,

 The key on the first iteration is just a string that says "seed", so on the
 first crawl it is indeed the same across all of the groups. Further
 iterations would be different, but I'm not there yet. I was under the
 impression that a repartition would distribute the tasks. Is that not the
 case?
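
(For what it's worth: a repartition before the groupByKey can't spread the work
while every record shares the key "seed", because the shuffle sends all values
for a given key to a single partition, so one task still does all the
downstream work. A toy illustration, assuming sc is the SparkContext:)

val seeded  = sc.parallelize(1 to 4000).map(i => ("seed", i))
val grouped = seeded.repartition(50).groupByKey()
grouped.mapPartitionsWithIndex((i, it) => Iterator((i, it.size)))
  .collect()
  .filter(_._2 > 0)
  .foreach { case (i, n) => println(s"partition $i holds $n group(s)") }
// Prints a single line: the lone "seed" group lands in exactly one partition.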

 Thanks

 Tom

 On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Hi Tom,
>
> Persist() here simply means persist to memory. That is all. You can
> check the Storage tab in the UI:
>
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather from your link that the code is stuck in the driver. You stated
> that you tried repartition() but it did not do anything.
>
> Further, you stated:
>
> " The key is pretty static in these tests, so I have also tried
> forcing the partition count (50 on a 16 core per node cluster) and also
> repartitioning, but every time all the jobs are scheduled t