One thing I would check is this line:
val fetchedRdd = rdd.map(r => (r.getGroup, r))
how many distinct groups do you end up with? If there's just one then I
think you might see the behaviour you observe.
Chris
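A minimal plain-Scala sketch of that point (collections standing in for the RDD; names illustrative): if every record maps to the same key, the grouping collapses everything into a single group, and the flatMap that follows becomes one task.

```scala
// Grouping by a constant key yields exactly one group; grouping by a
// distinct key per record yields one group each. In Spark, groupByKey
// behaves the same way, and one group means one downstream task.
val records = Seq("url1", "url2", "url3", "url4")

val oneKey   = records.groupBy(_ => "seed") // everything keyed "seed"
val manyKeys = records.groupBy(identity)    // distinct key per record

println(oneKey.size)   // 1
println(manyKeys.size) // 4
```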
On Wed, Jun 9, 2021 at 4:17 PM Tom Barber wrote:
> Also just to follow up on that slightly, I did also try off the back of
> another comment:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] =
>     scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
>
> Where I repartitioned that scoredRdd map out of interest, it then triggers
> the FairFetcher function there, instead of in the runJob(), but still on a
> single executor 😞
>
> Tom
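That result matches how repartition works: it redistributes existing records, but after groupByKey on a single key the RDD holds only one (key, values) record, and one record cannot be split. A plain-Scala sketch of that (the round-robin assignment stands in for what repartition roughly does; names are illustrative):

```scala
// After groupByKey with one distinct key, the RDD contains a single
// (key, values) record. Repartitioning round-robins *records* across
// partitions, so 49 of the 50 partitions stay empty.
val afterGroupBy  = Seq(("seed", (1 to 4000).toList)) // one record total
val numPartitions = 50

// Assign each record index to a partition round-robin, then count how
// many partitions actually received a record:
val nonEmpty = afterGroupBy.indices
  .groupBy(_ % numPartitions)
  .size

println(nonEmpty) // 1 non-empty partition, however many partitions exist
```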
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber wrote:
>
>>
>> Okay, so what happens is that the crawler reads a bunch of Solr data
>> (we're not talking GBs, just a list of JSON), and turns that into a bunch
>> of RDDs that end up in that flatMap I linked to first.
>>
>> The FairFetcher is an interface to a pluggable backend that basically
>> takes some of the fields and crawls the websites listed in them, looking
>> for information. We wrote this code six years ago for a DARPA project
>> tracking down criminals on the web. Now I'm reusing it, but trying to
>> force it to scale out a bit more.
>>
>> Say I have 4000 URLs I want to crawl and a 3-node Spark cluster. I want
>> to push down 1 URL to each executor (a few more won't hurt, but crawling
>> 50 URLs in parallel on one node makes my cluster sad), have it run a
>> crawl, then move on and get another one, and so on. That way you're not
>> saturating a node trying to look up all of them, and you could add more
>> nodes for greater capacity pretty quickly. Once the website has been
>> captured you can then "score" it, for want of a better term, to determine
>> its usefulness, which is where the map is being triggered.
>>
>> In answer to your questions, Sean: no action seems to be triggered until
>> you end up in the score block and the sc.runJob(), because that's
>> literally the next line of functionality, as Kafka isn't enabled.
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>   .groupByKey(m)
>>   .flatMap({ case (grp, rs) =>
>>     new FairFetcher(job, rs.iterator, localFetchDelay,
>>       FetchFunction, ParseFunction, OutLinkFilterFunction,
>>       StatusUpdateSolrTransformer).toSeq
>>   })
>>   .persist()
>>
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
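One possible workaround, sketched here in plain Scala with hypothetical names (the Spark equivalent would be salting the key in the map before groupByKey): splitting the single "seed" group into up to n sub-groups lets the fetch work spread over n tasks.

```scala
// Sketch: "salt" a constant key so one logical group becomes up to n groups.
// The Spark analogue would be rdd.map(r => ((r.getGroup, salt(r)), r))
// before groupByKey, so the expensive flatMap can run as n parallel tasks.
val n    = 8
val urls = (1 to 50).map(i => s"http://example.com/page$i")

// Deterministic salt derived from the URL itself:
val salted = urls.groupBy(u => ("seed", math.abs(u.hashCode) % n))

println(s"${salted.size} groups instead of 1")
```

The trade-off is that each FairFetcher instance sees only its sub-group, so if localFetchDelay enforces politeness per group, salting presumably loosens that guarantee.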
>>
>>
>> That if block is disabled so the score function runs. Inside of that:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] =
>>     scoredRdd.map(d => ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>
>>
>>
>> When it's doing stuff I can see in the Spark UI that it's waiting on the
>> sc.runJob() line, so that's the execution point.
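For context, sc.runJob(rdd, func) applies func to each partition and is itself the action that forces the lazy pipeline above it. A conceptual plain-Scala sketch (nested sequences stand in for partitions; this is an illustration, not Spark's implementation):

```scala
// Conceptual model of sc.runJob: apply a function to every partition's
// iterator and collect the per-partition results. Everything upstream
// (map, groupByKey, flatMap) is computed only when this runs.
def runJobSketch[T, U](partitions: Seq[Seq[T]], func: Iterator[T] => U): Seq[U] =
  partitions.map(p => func(p.iterator))

val parts = Seq(Seq(1, 2), Seq(3, 4), Seq(5))
val sums  = runJobSketch(parts, (it: Iterator[Int]) => it.sum)

println(sums) // List(3, 7, 5)
```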
>>
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen wrote:
>>
>>> persist() doesn't even persist by itself; it just marks the RDD to be
>>> persisted when it's executed.
>>> The key doesn't matter here, nor does partitioning, if this code is
>>> inadvertently trying to run things on the driver.
>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>> running some supplied functions very directly and at a low level with
>>> sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action? What happens after persist()?
>>>
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber wrote:
>>>
Thanks Mich,
The key on the first iteration is just a string that says "seed", so on
the first crawl it is indeed the same across all of the groups. Further
iterations would be different, but I'm not there yet. I was under the
impression that a repartition would distribute the tasks. Is that not the
case?
Thanks
Tom
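Repartition does redistribute records, but the grouping itself is driven by a hash of the key: with one distinct key, every value hashes to the same partition no matter how many partitions are requested. A plain-Scala sketch of Spark's HashPartitioner arithmetic (the non-negative-modulo formula matches Spark's; the rest is illustrative):

```scala
// Spark's HashPartitioner assigns partition = nonNegativeMod(key.hashCode, m).
// With a single distinct key, every record maps to the same partition,
// regardless of how large m is.
def partitionFor(key: Any, m: Int): Int = {
  val raw = key.hashCode % m
  if (raw < 0) raw + m else raw
}

val m = 50
val assignments = Seq.fill(100)("seed").map(k => partitionFor(k, m))

println(assignments.distinct.size) // 1 distinct partition used
```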
On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
mich.talebza...@gmail.com> wrote:
> Hi Tom,
>
> Persist() here simply means persist to memory. That is all. You can
> check the Storage tab in the Spark UI
>
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather from your link that the code is stuck in the driver. You
> stated that you tried repartition() but it did not do anything.
>
> Further you stated :
>
> " The key is pretty static in these tests, so I have also tried
> forcing the partition count (50 on a 16-core-per-node cluster) and also
> repartitioning, but every time all the jobs are scheduled t