Like I said in my previous email, can you try this and let me know how many
tasks you see?

val repRdd = scoredRdd.repartition(50).cache()
Then map operation on repRdd here.
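
To illustrate why I'm asking about the task count, here is a minimal sketch in plain Scala (no Spark needed) of how records spread over 50 partitions. It assumes, as described further down the thread, that every record carries the same group key "seed". hashPartition mirrors the key.hashCode-modulo rule used by Spark's HashPartitioner, which groupByKey(m) uses by default; roundRobin is only a simplified stand-in for what repartition() does (Spark also randomises the starting offset). The object name, the helper names and the example URLs are made up for illustration and are not Sparkler code.

```scala
object PartitionSketch {
  // Same rule Spark's HashPartitioner applies: key.hashCode modulo the
  // partition count, shifted so the result is never negative.
  def hashPartition(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Simplified stand-in for repartition(): deal records out by position,
  // ignoring the key entirely.
  def roundRobin(index: Int, numPartitions: Int): Int = index % numPartitions

  def main(args: Array[String]): Unit = {
    val numPartitions = 50
    // Hypothetical input: 4000 URLs that all carry the group key "seed".
    val records = (0 until 4000).map(i => ("seed", s"http://example.com/page$i"))

    val byKey = records.map { case (k, _) => hashPartition(k, numPartitions) }
    val byIndex = records.indices.map(i => roundRobin(i, numPartitions))

    println(s"non-empty partitions when hashing the key: ${byKey.distinct.size}")
    println(s"non-empty partitions with round-robin: ${byIndex.distinct.size}")
  }
}
```

With a single key, hashing leaves one non-empty partition however large the partition count is, while key-agnostic dealing fills all 50. Whether that is what is happening in your job is something the task count should confirm.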

I’ve done similar map operations in the past and this works.
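
Also bear in mind that cache(), like persist(), only marks the RDD; nothing is computed until an action runs, so the tasks will only show up once the map is followed by an action. The same deferred behaviour can be seen with a plain Scala Iterator. A small sketch, where fetch is a hypothetical stand-in for an expensive crawl and not the real FetchFunction:

```scala
object LazySketch {
  // Returns (fetch calls before the iterator is consumed, calls after).
  def run(): (Int, Int) = {
    var fetched = 0
    // Hypothetical stand-in for an expensive per-URL fetch.
    def fetch(url: String): String = { fetched += 1; s"content of $url" }

    // map on an Iterator only wraps it; fetch has not been called yet.
    val results = Iterator("a", "b", "c").map(fetch)
    val before = fetched

    // Consuming the iterator is what actually runs fetch for each element.
    val consumed = results.toList
    val after = fetched
    require(consumed.size == after)
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"fetch calls before consumption: $before, after: $after")
  }
}
```

The work happens wherever the iterator is finally consumed, which is why it matters which stage the UI shows the time being spent in.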


On Wed, Jun 9, 2021 at 11:17 AM Tom Barber <> wrote:

> Also just to follow up on that slightly, I did also try off the back of
> another comment:
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>   val scoredRdd = => ScoreFunction(job, d))
>   val scoreUpdateRdd: RDD[SolrInputDocument] =
>     scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
> Where I repartitioned that scoredRdd map out of interest, it then triggers
> the FairFetcher function there, instead of in the runJob(), but still on a
> single executor 😞
> Tom
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber <> wrote:
>> Okay so what happens is that the crawler reads a bunch of Solr data,
>> we're not talking GBs, just a list of JSON, and turns that into a bunch of
>> RDDs that end up in that flatMap that I linked to first.
>> The fair fetcher is an interface to a pluggable backend that basically
>> takes some of the fields and goes and crawls websites listed in them
>> looking for information. We wrote this code 6 years ago for a DARPA project
>> tracking down criminals on the web. Now I'm reusing it but trying to force
>> it to scale out a bit more.
>> Say I have 4000 URLs I want to crawl and a 3 node Spark cluster, I want
>> to push down 1 URL (a few more won't hurt, but crawling 50 URLs in parallel
>> on one node makes my cluster sad) to each executor and have it run a crawl,
>> then move on and get another one and so on. That way you're not saturating
>> a node trying to look up all of them and you could add more nodes for
>> greater capacity pretty quickly. Once the website has been captured, you
>> can then "score" it for want of a better term to determine its usefulness,
>> which is where the map is being triggered.
>> In answer to your questions Sean, no action seems triggered until you end
>> up in the score block and the sc.runJob() because that's literally the next
>> line of functionality as Kafka isn't enabled.
>> val fetchedRdd = => (r.getGroup, r))
>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job,
>>     rs.iterator, localFetchDelay,
>>     FetchFunction, ParseFunction, OutLinkFilterFunction,
>>     StatusUpdateSolrTransformer).toSeq })
>>   .persist()
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
>> That if block is disabled so the score function runs. Inside of that:
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>   val scoredRdd = => ScoreFunction(job, d))
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = => 
>> ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>> ....
>> When it's doing stuff in the SparkUI I can see that it's waiting on the
>> sc.runJob() line, so that's the execution point.
>> Tom
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen <> wrote:
>>> persist() doesn't even persist by itself - just sets it to be persisted
>>> when it's executed.
>>> key doesn't matter here, nor partitioning, if this code is trying to run
>>> things on the driver inadvertently.
>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>> running some supplied functions very directly and at a low-level with
>>> sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action? What happens after persist()?
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber <> wrote:
>>>> Thanks Mich,
>>>> The key on the first iteration is just a string that says "seed", so it
>>>> is indeed on the first crawl the same across all of the groups. Further
>>>> iterations would be different, but I'm not there yet. I was under the
>>>> impression that a repartition would distribute the tasks. Is that not the
>>>> case?
>>>> Thanks
>>>> Tom
>>>> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <> wrote:
>>>>> Hi Tom,
>>>>> persist() here simply means persist to memory. That is all. You can
>>>>> check the Storage tab in the UI.
>>>>> So I gather from your link that the code is stuck in the driver. You
>>>>> stated that you tried repartition() but it did not do anything.
>>>>> Further you stated:
>>>>> "The key is pretty static in these tests, so I have also tried
>>>>> forcing the partition count (50 on a 16 core per node cluster) and also
>>>>> repartitioning, but every time all the jobs are scheduled to run on one
>>>>> node."
>>>>> What is the key?
>>>>> HTH
>>>>> On Wed, 9 Jun 2021 at 15:23, Tom Barber <> wrote:
>>>>>> Interesting Sean thanks for that insight, I wasn't aware of that
>>>>>> fact, I assume the .persist() at the end of that line doesn't do it?
>>>>>> I believe, looking at the output in the SparkUI, it gets to
>>>>>> and calls the context runJob.
>>>>>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen <> wrote:
>>>>>>> All these configurations don't matter at all if this is executing on
>>>>>>> the driver.
>>>>>>> Returning an Iterator in flatMap is fine though it 'delays'
>>>>>>> execution until that iterator is evaluated by something, which is
>>>>>>> normally fine.
>>>>>>> Does creating this FairFetcher do anything by itself? You're just
>>>>>>> returning an iterator that creates them here.
>>>>>>> How do you actually trigger an action here? The code snippet itself
>>>>>>> doesn't trigger anything.
>>>>>>> I think we need more info about what else is happening in the code.
>>>>>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber <> wrote:
>>>>>>>> Yeah so if I update the FairFetcher to return a seq it makes no
>>>>>>>> real difference.
>>>>>>>> Here's an image of what I'm seeing just for reference:
>>>>>>>> Because this is Databricks I don't have an actual spark-submit
>>>>>>>> command but it looks like this:
>>>>>>>> curl xxxx -d
>>>>>>>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>>>>>> "spark.task.cpus":"16"},
>>>>>>>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>>>>>>>> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", 
>>>>>>>> "10g",
>>>>>>>> "--executor-memory", "10g",
>>>>>>>> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
>>>>>>>> "-tn", "5000", "-co",
>>>>>>>> "{\"\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>>>>>>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
>>>>>>>> driver trying to run all the tasks in parallel on the one node, but
>>>>>>>> again I've got 50 tasks queued up all running on the single node.
>>>>>>>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber <>
>>>>>>>> wrote:
>>>>>>>>> I've not run it yet, but I've stuck a toSeq on the end, but in
>>>>>>>>> reality a Seq just inherits Iterator, right?
>>>>>>>>> flatMap does return an RDD[CrawlData] unless my IDE is lying to me.
>>>>>>>>> Tom
>>>>>>>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber <>
>>>>>>>>> wrote:
>>>>>>>>>> Interesting Jayesh, thanks, I will test.
>>>>>>>>>> All this code is inherited and it runs, but I don't think it's
>>>>>>>>>> been tested in a distributed context for about 5 years, but yeah I
>>>>>>>>>> need to get this pushed down, so I'm happy to try anything! :)
>>>>>>>>>> Tom
>>>>>>>>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh <> wrote:
>>>>>>>>>>> flatMap is supposed to return Seq, not Iterator. You are
>>>>>>>>>>> returning a class that implements Iterator. I have a hunch that's
>>>>>>>>>>> what's causing the confusion. flatMap is returning an
>>>>>>>>>>> RDD[FairFetcher], not RDD[CrawlData]. Do you intend it to be
>>>>>>>>>>> RDD[CrawlData]? You might want to call toSeq on FairFetcher.
>>>>>>>>>>> On 6/8/21, 10:10 PM, "Tom Barber" <>
>>>>>>>>>>> wrote:
>>>>>>>>>>>     For anyone interested here's the execution logs up until the
>>>>>>>>>>> point where it actually kicks off the workload in question:
>>>>>>>>>>>     On 2021/06/09 01:52:39, Tom Barber <>
>>>>>>>>>>> wrote:
>>>>>>>>>>>     > ExecutorID says driver, and looking at the IP addresses
>>>>>>>>>>>     > it's running on, it's not any of the worker IPs.
>>>>>>>>>>>     >
>>>>>>>>>>>     > I forcibly told it to create 50, but they'd all end up
>>>>>>>>>>>     > running in the same place.
>>>>>>>>>>>     >
>>>>>>>>>>>     > Working on some other ideas, I set spark.task.cpus to 16
>>>>>>>>>>>     > to match the nodes whilst still forcing it to 50 partitions
>>>>>>>>>>>     >
>>>>>>>>>>>     > val m = 50
>>>>>>>>>>>     >
>>>>>>>>>>>     > val fetchedRdd = => (r.getGroup, r))
>>>>>>>>>>>     >         .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>>>>     >           FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>>>>     >         .persist()
>>>>>>>>>>>     >
>>>>>>>>>>>     > that sort of thing. But still the tasks are pinned to the
>>>>>>>>>>>     > driver executor and none of the workers, so I no longer saturate
>>>>>>>>>>>     > the master node, but I also have 3 workers just sat there doing nothing.
>>>>>>>>>>>     >
>>>>>>>>>>>     > On 2021/06/09 01:26:50, Sean Owen <>
>>>>>>>>>>> wrote:
>>>>>>>>>>>     > > Are you sure it's on the driver? Or just 1 executor?
>>>>>>>>>>>     > > How many partitions does the groupByKey produce? That would limit your
>>>>>>>>>>>     > > parallelism no matter what if it's a small number.
>>>>>>>>>>>     > >
>>>>>>>>>>>     > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <> wrote:
>>>>>>>>>>>     > >
>>>>>>>>>>>     > > > Hi folks,
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > Hopefully someone with more Spark experience than me can explain this a
>>>>>>>>>>>     > > > bit.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > I don't know if this is possible, impossible or just an old design that
>>>>>>>>>>>     > > > could be better.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster
>>>>>>>>>>>     > > > and it's getting to this point in the code(
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > )
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > val fetchedRdd = => (r.getGroup, r))
>>>>>>>>>>>     > > >         .groupByKey()
>>>>>>>>>>>     > > >         .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>>>>     > > >           FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>>>>     > > >         .persist()
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > This basically takes the RDD and then runs a web-based crawl over each RDD
>>>>>>>>>>>     > > > and returns the results. But when Spark executes it, it runs all the crawls
>>>>>>>>>>>     > > > on the driver node and doesn't distribute them.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > The key is pretty static in these tests, so I have also tried forcing the
>>>>>>>>>>>     > > > partition count (50 on a 16 core per node cluster) and also repartitioning,
>>>>>>>>>>>     > > > but every time all the jobs are scheduled to run on one node.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > What can I do better to distribute the tasks? Because the processing of
>>>>>>>>>>>     > > > the data in the RDD isn't the bottleneck, the fetching of the crawl data is
>>>>>>>>>>>     > > > the bottleneck, but that happens after the code has been assigned to a node.
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > Thanks
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > > Tom
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > >
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>     > > > To unsubscribe e-mail:
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > > >
>>>>>>>>>>>     > >
>>>>>>>>>>>     >
>>>>>>>>>>>     >
