As I said in my previous email, can you try this and let me know how many tasks you see?
val repRdd = scoredRdd.repartition(50).cache()
repRdd.take(1)

Then run the map operation on repRdd. I've done similar map operations in the past and this works. Thanks.

On Wed, Jun 9, 2021 at 11:17 AM Tom Barber <t...@spicule.co.uk> wrote:

> Also, just to follow up on that slightly, I did also try, off the back of another comment:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] =
>     scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>
> where I repartitioned that scoredRdd map out of interest. It then triggers the FairFetcher function there, instead of in the runJob(), but still on a single executor 😞
>
> Tom
>
> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber <t...@spicule.co.uk> wrote:
>
>> Okay, so what happens is that the crawler reads a bunch of Solr data (we're not talking GBs, just a list of JSON) and turns that into a bunch of RDDs that end up in that flatMap I linked to first.
>>
>> The FairFetcher is an interface to a pluggable backend that basically takes some of the fields and goes and crawls the websites listed in them, looking for information. We wrote this code 6 years ago for a DARPA project tracking down criminals on the web. Now I'm reusing it, but trying to force it to scale out a bit more.
>>
>> Say I have 4000 URLs I want to crawl and a 3-node Spark cluster. I want to push down 1 URL to each executor (a few more won't hurt, but crawling 50 URLs in parallel on one node makes my cluster sad) and have it run a crawl, then move on and get another one, and so on. That way you're not saturating a node trying to look up all of them, and you could add more nodes for greater capacity pretty quickly.
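[Editor's note] The single-executor behaviour described above is what a constant grouping key produces: if every record shares one key, groupByKey yields exactly one group, so there is only one unit of crawl work to schedule no matter how many partitions are requested. A minimal plain-Scala sketch of that collapse (illustrative only, not Sparkler code; the URL list is made up):

```scala
// Plain-Scala illustration (no Spark needed): grouping 4000 URLs by a
// constant key yields exactly one group. A groupByKey over such a key
// therefore hands Spark a single unit of work to schedule, regardless
// of the requested partition count.
object SingleKeyDemo {
  def main(args: Array[String]): Unit = {
    val urls = (1 to 4000).map(i => s"http://example.com/page$i")

    // Static key, as in the first crawl iteration where every record is "seed"
    val grouped = urls.groupBy(_ => "seed")

    println(grouped.size)                  // 1 group: all 4000 URLs together
    println(grouped("seed").size)          // 4000 values in that one group
  }
}
```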
>> Once the website has been captured you can then "score" it, for want of a better term, to determine its usefulness, which is where the map is being triggered.
>>
>> In answer to your questions, Sean: no action seems to be triggered until you end up in the score block and the sc.runJob(), because that's literally the next line of functionality, as Kafka isn't enabled.
>>
>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job,
>>     rs.iterator, localFetchDelay,
>>     FetchFunction, ParseFunction, OutLinkFilterFunction,
>>     StatusUpdateSolrTransformer).toSeq })
>>   .persist()
>>
>> if (kafkaEnable) {
>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>> }
>> val scoredRdd = score(fetchedRdd)
>>
>> That if block is disabled, so the score function runs. Inside of that:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d =>
>>     ScoreUpdateSolrTransformer(d))
>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>   ....
>>
>> When it's doing stuff in the Spark UI I can see that it's waiting on the sc.runJob() line, so that's the execution point.
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen <sro...@gmail.com> wrote:
>>
>>> persist() doesn't even persist by itself - it just sets the RDD to be persisted when it's executed.
>>> The key doesn't matter here, nor does partitioning, if this code is trying to run things on the driver inadvertently.
>>> I don't quite grok what the OSS code you linked to is doing, but it's running some supplied functions very directly and at a low level with sc.runJob, which might be part of how this can do something unusual.
>>> How do you trigger any action?
>>> what happens after persist()?
>>>
>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber <t...@spicule.co.uk> wrote:
>>>
>>>> Thanks Mich,
>>>>
>>>> The key on the first iteration is just a string that says "seed", so it is indeed the same across all of the groups on the first crawl. Further iterations would be different, but I'm not there yet. I was under the impression that a repartition would distribute the tasks. Is that not the case?
>>>>
>>>> Thanks
>>>>
>>>> Tom
>>>>
>>>> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Tom,
>>>>>
>>>>> persist() here simply means persist to memory. That is all. You can check the Storage tab in the UI:
>>>>>
>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>>>>
>>>>> So I gather from your link that the code is stuck in the driver. You stated that you tried repartition() but it did not do anything.
>>>>>
>>>>> Further, you stated:
>>>>>
>>>>> "The key is pretty static in these tests, so I have also tried forcing the partition count (50 on a 16 core per node cluster) and also repartitioning, but every time all the jobs are scheduled to run on one node."
>>>>>
>>>>> What is the key?
>>>>>
>>>>> HTH
>>>>>
>>>>> view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>>>>>
>>>>> On Wed, 9 Jun 2021 at 15:23, Tom Barber <t...@spicule.co.uk> wrote:
>>>>>
>>>>>> Interesting, Sean, thanks for that insight, I wasn't aware of that fact. I assume the .persist() at the end of that line doesn't do it?
>>>>>>
>>>>>> I believe, looking at the output in the Spark UI, it gets to https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254 and calls the context's runJob.
>>>>>>
>>>>>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen <sro...@gmail.com> wrote:
>>>>>>
>>>>>>> All these configurations don't matter at all if this is executing on the driver.
>>>>>>> Returning an Iterator in flatMap is fine, though it 'delays' execution until that iterator is evaluated by something, which is normally fine.
>>>>>>> Does creating this FairFetcher do anything by itself? You're just returning an iterator that creates them here.
>>>>>>> How do you actually trigger an action here? The code snippet itself doesn't trigger anything.
>>>>>>> I think we need more info about what else is happening in the code.
>>>>>>>
>>>>>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>>>
>>>>>>>> Yeah, so if I update the FairFetcher to return a Seq it makes no real difference.
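[Editor's note] Sean's point about Iterators deferring work can be reproduced without Spark. The sketch below uses a hypothetical `crawl` function as a stand-in for the FairFetcher's fetch/parse work; nothing runs when the iterator is built, which is why the expensive work only shows up wherever the first action (here, runJob) finally consumes it:

```scala
// Illustrates why a flatMap that returns an Iterator does no work up front:
// the (hypothetical) crawl only runs when something consumes the iterator,
// e.g. an action such as runJob, take, or collect.
object LazyIteratorDemo {
  var crawlsRun = 0

  def crawl(url: String): String = { crawlsRun += 1; s"fetched $url" }

  def main(args: Array[String]): Unit = {
    val urls = List("a", "b", "c")

    // Building the mapped iterator triggers nothing...
    val results: Iterator[String] = urls.iterator.map(crawl)
    println(crawlsRun)        // 0: no crawl has happened yet

    // ...the work only happens when the iterator is consumed.
    results.foreach(_ => ())
    println(crawlsRun)        // 3: all crawls ran on consumption
  }
}
```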
>>>>>>>>
>>>>>>>> Here's an image of what I'm seeing, just for reference: https://pasteboard.co/K5NFrz7.png
>>>>>>>>
>>>>>>>> Because this is Databricks I don't have an actual spark-submit command, but it looks like this:
>>>>>>>>
>>>>>>>> curl xxxx -d '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "spark.task.cpus":"16"}, "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options", "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g", "--executor-memory", "10g", "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11", "-tn", "5000", "-co", "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>>>>>>>
>>>>>>>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the driver trying to run all the tasks in parallel on the one node, but again I've got 50 tasks queued up, all running on the single node.
>>>>>>>>
>>>>>>>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>>>>
>>>>>>>>> I've not run it yet, but I've stuck a toSeq on the end. But in reality a Seq just inherits Iterator, right?
>>>>>>>>>
>>>>>>>>> flatMap does return an RDD[CrawlData], unless my IDE is lying to me.
>>>>>>>>>
>>>>>>>>> Tom
>>>>>>>>>
>>>>>>>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>>>>>
>>>>>>>>>> Interesting, Jayesh, thanks, I will test.
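[Editor's note] On the spark.task.cpus pinning mentioned above: on 16-core nodes it caps each executor at one concurrent task, but it does not influence *where* tasks are placed. A hedged sketch of an equivalent plain spark-submit invocation, with values taken from the thread (spark.executor.cores is my assumption, not something the thread sets):

```shell
# Each task claims all 16 cores of an executor, so at most one task runs
# per executor at a time. This throttles per-node concurrency; it does not,
# by itself, move tasks off the driver or onto idle workers.
spark-submit \
  --conf spark.task.cpus=16 \
  --conf spark.executor.cores=16 \
  --driver-memory 10g \
  --executor-memory 10g \
  --class edu.usc.irds.sparkler.Main \
  sparkler7.jar crawl -id mytestcrawl11 -tn 5000
```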
>>>>>>>>>>
>>>>>>>>>> All this code is inherited and it runs, but I don't think it's been tested in a distributed context for about 5 years. Yeah, I need to get this pushed down, so I'm happy to try anything! :)
>>>>>>>>>>
>>>>>>>>>> Tom
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh <jlalw...@amazon.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> flatMap is supposed to return a Seq, not an Iterator. You are returning a class that implements Iterator. I have a hunch that's what's causing the confusion. flatMap is returning an RDD[FairFetcher], not RDD[CrawlData]. Do you intend it to be RDD[CrawlData]? You might want to call toSeq on FairFetcher.
>>>>>>>>>>>
>>>>>>>>>>> On 6/8/21, 10:10 PM, "Tom Barber" <magicaltr...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> For anyone interested, here are the execution logs up until the point where it actually kicks off the workload in question: https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>>>>>>>>>>
>>>>>>>>>>> On 2021/06/09 01:52:39, Tom Barber <magicaltr...@apache.org> wrote:
>>>>>>>>>>> > ExecutorID says driver, and looking at the IP addresses it's running on, it's not any of the worker IPs.
>>>>>>>>>>> >
>>>>>>>>>>> > I forcibly told it to create 50, but they'd all end up running in the same place.
>>>>>>>>>>> >
>>>>>>>>>>> > Working on some other ideas, I set spark.task.cpus to 16 to match the nodes, whilst still forcing it to 50 partitions:
>>>>>>>>>>> >
>>>>>>>>>>> > val m = 50
>>>>>>>>>>> >
>>>>>>>>>>> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>>>> >   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>>>> >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>>>> >   .persist()
>>>>>>>>>>> >
>>>>>>>>>>> > That sort of thing. But still the tasks are pinned to the driver executor and none of the workers, so I no longer saturate the master node, but I also have 3 workers just sat there doing nothing.
>>>>>>>>>>> >
>>>>>>>>>>> > On 2021/06/09 01:26:50, Sean Owen <sro...@gmail.com> wrote:
>>>>>>>>>>> > > Are you sure it's on the driver? Or just 1 executor?
>>>>>>>>>>> > > How many partitions does the groupByKey produce? That would limit your parallelism no matter what, if it's a small number.
>>>>>>>>>>> > >
>>>>>>>>>>> > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <magicaltr...@apache.org> wrote:
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > Hi folks,
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > Hopefully someone with more Spark experience than me can explain this a bit.
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > I don't know if this is possible, impossible or just an old design that could be better.
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster, and it's getting to this point in the code (https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226):
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>>>> > > >   .groupByKey()
>>>>>>>>>>> > > >   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>>>> > > >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>>>> > > >   .persist()
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > This basically takes the RDD and then runs a web-based crawl over each RDD and returns the results. But when Spark executes it, it runs all the crawls on the driver node and doesn't distribute them.
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > The key is pretty static in these tests, so I have also tried forcing the partition count (50 on a 16-core-per-node cluster) and also repartitioning, but every time all the jobs are scheduled to run on one node.
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > What can I do better to distribute the tasks? The processing of the data in the RDD isn't the bottleneck; the fetching of the crawl data is the bottleneck, but that happens after the code has been assigned to a node.
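[Editor's note] One common workaround for a static key (a sketch, not something proposed in this thread) is to salt the key so that groupByKey has multiple groups to distribute; repartitioning after the grouping cannot split a single key's values. The modulus of 50 below is illustrative:

```scala
// With a static key, all records collapse into one group. Salting the key
// with an index spreads the same records over many groups, which is what
// groupByKey needs in order to create more than one task's worth of work.
object SaltedKeyDemo {
  def main(args: Array[String]): Unit = {
    val records = (1 to 4000).map(i => ("seed", s"http://example.com/$i"))

    // Static key: one group, however many partitions you later request.
    println(records.groupBy(_._1).size)    // 1

    // Salted key: ("seed", i % 50) gives 50 groups of ~80 records each.
    val salted = records.zipWithIndex.map { case ((k, v), i) => ((k, i % 50), v) }
    println(salted.groupBy(_._1).size)     // 50
  }
}
```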
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > Thanks
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > Tom
>>>>>>>>>>> > > >
>>>>>>>>>>> > > > ---------------------------------------------------------------------
>>>>>>>>>>> > > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>
>>>>>>>> Spicule Limited is registered in England & Wales. Company Number: 09954122. Registered office: First Floor, Telecom House, 125-135 Preston Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>>>>>>
>>>>>>>> All engagements are subject to Spicule Terms and Conditions of Business. This email and its contents are intended solely for the individual to whom it is addressed and may contain information that is confidential, privileged or otherwise protected from disclosure, distribution or copying. Any views or opinions presented in this email are solely those of the author and do not necessarily represent those of Spicule Limited. The company accepts no liability for any damage caused by any virus transmitted by this email. If you have received this message in error, please notify us immediately by reply email before deleting it from your system. Service of legal notice cannot be effected on Spicule Limited by email.