persist() doesn't even persist by itself - it just marks the RDD to be persisted when it's actually executed. The key doesn't matter here, nor does partitioning, if this code is inadvertently running things on the driver. I don't quite grok what the OSS code you linked to is doing, but it's running some supplied functions very directly and at a low level with sc.runJob, which might be part of how this can do something unusual. How do you trigger any action? What happens after persist()?
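[For illustration only - a minimal sketch, not Sparkler's code, of the laziness Sean describes: persist() merely marks the RDD, and nothing runs or appears in the Storage tab until an action such as count() fires.]

import org.apache.spark.sql.SparkSession

object PersistLazinessDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("persist-demo").getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(1 to 100, 4)
      .map { i => println(s"computing $i"); i * 2 } // side effect shows when work actually happens
      .persist()                                    // only *marks* the RDD for caching

    // At this point no tasks have run and nothing shows in the Storage tab.
    rdd.count() // first action: runs the tasks and materializes the cache
    rdd.count() // second action: served from the persisted partitions
    spark.stop()
  }
}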
On Wed, Jun 9, 2021 at 9:48 AM Tom Barber <t...@spicule.co.uk> wrote:

> Thanks Mich,
>
> The key on the first iteration is just a string that says "seed", so on the first crawl it is indeed the same across all of the groups. Further iterations would be different, but I'm not there yet. I was under the impression that a repartition would distribute the tasks. Is that not the case?
>
> Thanks
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hi Tom,
>>
>> persist() here simply means persist to memory. That is all. You can check the Storage tab in the UI:
>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>
>> So I gather from your link that the code is stuck in the driver. You stated that you tried repartition() but it did not do anything.
>>
>> Further, you stated:
>>
>> "The key is pretty static in these tests, so I have also tried forcing the partition count (50 on a 16 core per node cluster) and also repartitioning, but every time all the jobs are scheduled to run on one node."
>>
>> What is the key?
>>
>> HTH
>>
>> On Wed, 9 Jun 2021 at 15:23, Tom Barber <t...@spicule.co.uk> wrote:
>>
>>> Interesting, Sean, thanks for that insight - I wasn't aware of that fact. I assume the .persist() at the end of that line doesn't do it?
>>>
>>> I believe, looking at the output in the Spark UI, it gets to
>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
>>> and calls the context's runJob.
>>>
>>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen <sro...@gmail.com> wrote:
>>>
>>>> All these configurations don't matter at all if this is executing on the driver.
>>>> Returning an Iterator in flatMap is fine, though it 'delays' execution until that iterator is evaluated by something, which is normally fine.
>>>> Does creating this FairFetcher do anything by itself? You're just returning an iterator that creates them here.
>>>> How do you actually trigger an action here? The code snippet itself doesn't trigger anything.
>>>> I think we need more info about what else is happening in the code.
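[An illustrative aside with hypothetical data, not Sparkler's code: with a single static key like "seed", groupByKey produces exactly one non-empty group, so however many partitions you request, all the values land in one task.]

import org.apache.spark.sql.SparkSession

object SingleKeyDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder.appName("single-key-demo").getOrCreate().sparkContext

    // Every record shares the key "seed", as on the first crawl iteration.
    val urls = sc.parallelize(1 to 1000).map(i => ("seed", s"http://example.com/page$i"))

    // Ask for 50 partitions: 49 come back empty, one holds every value.
    val grouped = urls.groupByKey(50)
    val nonEmpty = grouped.mapPartitions(it => Iterator(it.size)).collect().count(_ > 0)
    println(nonEmpty) // prints 1 - all the downstream work runs in a single task
  }
}

[Repartitioning doesn't help in this situation: before the groupByKey it is undone by the shuffle, and after it there is only a single (key, values) record to move around.]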
>>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber <t...@spicule.co.uk> wrote:
>>>>
>>>>> Yeah, so if I update the FairFetcher to return a Seq it makes no real difference.
>>>>>
>>>>> Here's an image of what I'm seeing, just for reference:
>>>>> https://pasteboard.co/K5NFrz7.png
>>>>>
>>>>> Because this is Databricks I don't have an actual spark-submit command, but it looks like this:
>>>>>
>>>>> curl xxxx -d '{
>>>>>   "new_cluster": {
>>>>>     "spark_conf": {
>>>>>       "spark.executor.extraJavaOptions": "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>>>       "spark.task.cpus": "16"
>>>>>     },
>>>>>     "spark_version": "8.3.x-scala2.12",
>>>>>     "aws_attributes": {"availability": "SPOT_WITH_FALLBACK", "first_on_demand": 1, "zone_id": "us-west-2c"},
>>>>>     "node_type_id": "c5d.4xlarge",
>>>>>     "init_scripts": [{"dbfs": {"destination": "dbfs:/FileStore/crawlinit.sh"}}],
>>>>>     "num_workers": 3
>>>>>   },
>>>>>   "spark_submit_task": {
>>>>>     "parameters": ["--driver-java-options", "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>>>       "--driver-memory", "10g", "--executor-memory", "10g",
>>>>>       "--class", "edu.usc.irds.sparkler.Main", "dbfs:/FileStore/bcf/sparkler7.jar",
>>>>>       "crawl", "-id", "mytestcrawl11", "-tn", "5000", "-co",
>>>>>       "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]
>>>>>   },
>>>>>   "run_name": "testsubmi3t"
>>>>> }'
>>>>>
>>>>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the driver by trying to run all the tasks in parallel on the one node, but again I've got 50 tasks queued up, all running on the single node.
>>>>>
>>>>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>
>>>>>> I've not run it yet, but I've stuck a toSeq on the end - though in reality a Seq just inherits Iterator, right?
>>>>>>
>>>>>> flatMap does return an RDD[CrawlData], unless my IDE is lying to me.
>>>>>>
>>>>>> Tom
>>>>>>
>>>>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>>
>>>>>>> Interesting, Jayesh, thanks - I will test.
>>>>>>>
>>>>>>> All this code is inherited and it runs, but I don't think it's been tested in a distributed context for about 5 years. I need to get this pushed down, so I'm happy to try anything! :)
>>>>>>>
>>>>>>> Tom
>>>>>>>
>>>>>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh <jlalw...@amazon.com> wrote:
>>>>>>>
>>>>>>>> flatMap is supposed to return Seq, not Iterator. You are returning a class that implements Iterator; I have a hunch that's what's causing the confusion. flatMap is returning an RDD[FairFetcher], not RDD[CrawlData]. Do you intend it to be RDD[CrawlData]? You might want to call toSeq on FairFetcher.
>>>>>>>>
>>>>>>>> On 6/8/21, 10:10 PM, "Tom Barber" <magicaltr...@apache.org> wrote:
>>>>>>>>
>>>>>>>> For anyone interested, here are the execution logs up until the point where it actually kicks off the workload in question:
>>>>>>>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
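[A sketch of the Iterator-vs-Seq point; the FairFetcher stand-in below is hypothetical, not Sparkler's real class. RDD.flatMap accepts T => TraversableOnce[U], and Iterator is a TraversableOnce, so both forms compile and both yield RDD[CrawlData]; the difference is only when the per-element work happens.]

case class CrawlData(url: String, html: String)

// A FairFetcher-like iterator: each next() does the expensive fetch lazily.
class FairFetcherLike(urls: Iterator[String]) extends Iterator[CrawlData] {
  def hasNext: Boolean = urls.hasNext
  def next(): CrawlData = {
    val u = urls.next()
    CrawlData(u, fetch(u)) // work happens here, element by element, inside the task
  }
  private def fetch(u: String): String = s"<html>$u</html>" // placeholder for the real HTTP fetch
}

// Both lines below produce RDD[CrawlData]; toSeq merely forces the whole
// group eagerly (and buffers it in memory) before Spark consumes it:
//   rdd.groupByKey(m).flatMap { case (_, rs) => new FairFetcherLike(rs.iterator) }
//   rdd.groupByKey(m).flatMap { case (_, rs) => new FairFetcherLike(rs.iterator).toSeq }

[Either way the iterator is evaluated inside whatever task Spark schedules it in, so this alone doesn't change where the work runs.]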
>>>>>>>> On 2021/06/09 01:52:39, Tom Barber <magicaltr...@apache.org> wrote:
>>>>>>>> > ExecutorID says "driver", and looking at the IP addresses it's running on, it's not any of the worker IPs.
>>>>>>>> >
>>>>>>>> > I forcibly told it to create 50, but they'd all end up running in the same place.
>>>>>>>> >
>>>>>>>> > Working on some other ideas, I set spark.task.cpus to 16 to match the nodes whilst still forcing it to 50 partitions:
>>>>>>>> >
>>>>>>>> > val m = 50
>>>>>>>> >
>>>>>>>> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>> >   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>> >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>> >   .persist()
>>>>>>>> >
>>>>>>>> > That sort of thing. But the tasks are still pinned to the driver executor and none of the workers, so I no longer saturate the master node, but I also have 3 workers just sat there doing nothing.
>>>>>>>> >
>>>>>>>> > On 2021/06/09 01:26:50, Sean Owen <sro...@gmail.com> wrote:
>>>>>>>> > > Are you sure it's on the driver, or just one executor?
>>>>>>>> > > How many partitions does the groupByKey produce? That would limit your parallelism no matter what, if it's a small number.
>>>>>>>> > >
>>>>>>>> > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <magicaltr...@apache.org> wrote:
>>>>>>>> > >
>>>>>>>> > > > Hi folks,
>>>>>>>> > > >
>>>>>>>> > > > Hopefully someone with more Spark experience than me can explain this a bit.
>>>>>>>> > > >
>>>>>>>> > > > I don't know if this is possible, impossible, or just an old design that could be better.
>>>>>>>> > > >
>>>>>>>> > > > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster and it gets to this point in the code
>>>>>>>> > > > (https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226):
>>>>>>>> > > >
>>>>>>>> > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>> > > >   .groupByKey()
>>>>>>>> > > >   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>> > > >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>> > > >   .persist()
>>>>>>>> > > >
>>>>>>>> > > > This basically takes the RDD and then runs a web-based crawl over each group and returns the results. But when Spark executes it, it runs all the crawls on the driver node and doesn't distribute them.
>>>>>>>> > > >
>>>>>>>> > > > The key is pretty static in these tests, so I have also tried forcing the partition count (50 on a 16-core-per-node cluster) and also repartitioning, but every time all the jobs are scheduled to run on one node.
>>>>>>>> > > >
>>>>>>>> > > > What can I do better to distribute the tasks? The processing of the data in the RDD isn't the bottleneck; the fetching of the crawl data is the bottleneck, but that happens after the code has been assigned to a node.
>>>>>>>> > > >
>>>>>>>> > > > Thanks
>>>>>>>> > > >
>>>>>>>> > > > Tom
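[One possible direction, as an untested sketch rather than a confirmed fix: FairFetcher, job, localFetchDelay, and the function objects are Sparkler's names from the snippet above; the key salting is an addition of mine. Since the first iteration has only the single "seed" key, grouping by key alone cannot spread the fetches, but salting the key into m sub-keys gives groupByKey m non-empty groups to schedule across executors. An explicit action is still needed, because persist() alone triggers nothing.]

import scala.util.Random

val m = 50
val fetchedRdd = rdd
  .map(r => ((r.getGroup, Random.nextInt(m)), r)) // salt the static key into m sub-keys
  .groupByKey(m)                                  // now up to m non-empty groups
  .flatMap { case (_, rs) =>
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer)
  }
  .persist()

fetchedRdd.count() // an action - without one, nothing above executes

[Caveat: if the group key exists to enforce per-host politeness delays, salting it weakens that guarantee, so the salt may need to incorporate the host.]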