Interesting Jayesh, thanks, I will test.

All this code is inherited and it runs, but I don't think it's been tested
in a distributed context for about 5 years. I do need to get this work
pushed down to the workers though, so I'm happy to try anything! :)
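For anyone following along, my reading of Jayesh's suggestion below is
roughly this (a sketch only, untested; all the names come from the snippet
further down the thread):

```scala
// Sketch: materialise the FairFetcher's Iterator into a Seq inside
// flatMap, so each element of the resulting RDD is a CrawlData record
// rather than the iterator object itself.
val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .flatMap({ case (grp, rs) =>
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction,
      StatusUpdateSolrTransformer).toSeq // Iterator -> Seq[CrawlData]
  })
  .persist()
```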

Tom

On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh <jlalw...@amazon.com> wrote:

> flatMap is supposed to return a Seq, not an Iterator. You are returning a
> class that implements Iterator. I have a hunch that's what's causing the
> confusion: flatMap is returning an RDD[FairFetcher], not RDD[CrawlData]. Do
> you intend it to be RDD[CrawlData]? You might want to call toSeq on the
> FairFetcher.
>
> On 6/8/21, 10:10 PM, "Tom Barber" <magicaltr...@apache.org> wrote:
>
>
>     For anyone interested here's the execution logs up until the point
> where it actually kicks off the workload in question:
> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>
>     On 2021/06/09 01:52:39, Tom Barber <magicaltr...@apache.org> wrote:
>     > ExecutorID says driver, and looking at the IP addresses it's running
> on, it's not any of the worker IPs.
>     >
>     > I forcibly told it to create 50 partitions, but they'd all end up
> running in the same place.
>     >
>     > Working on some other ideas, I set spark.task.cpus to 16 to match
> the nodes, whilst still forcing it to 50 partitions.
>     >
>     > val m = 50
>     >
>     > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>     >   .groupByKey(m)
>     >   .flatMap({ case (grp, rs) =>
>     >     new FairFetcher(job, rs.iterator, localFetchDelay,
>     >       FetchFunction, ParseFunction, OutLinkFilterFunction,
>     >       StatusUpdateSolrTransformer) })
>     >   .persist()
>     >
>     > that sort of thing. But still the tasks are pinned to the driver
> executor and none of the workers, so I no longer saturate the master node,
> but I also have 3 workers just sitting there doing nothing.
>     >
>     > On 2021/06/09 01:26:50, Sean Owen <sro...@gmail.com> wrote:
>     > > Are you sure it's on the driver? or just 1 executor?
>     > > how many partitions does the groupByKey produce? that would limit
> your
>     > > parallelism no matter what if it's a small number.
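One quick way to answer Sean's questions above, before the expensive crawl
runs (a sketch, untested; `rdd` and `r.getGroup` as in the thread):

```scala
// Probe the keyed RDD before the FairFetcher flatMap, so nothing is fetched.
val grouped = rdd.map(r => (r.getGroup, r)).groupByKey()
println(s"groupByKey partitions: ${grouped.getNumPartitions}")
println(s"distinct keys: ${grouped.keys.distinct().count()}")

// Hostname is a rough proxy for where each non-empty partition is scheduled.
grouped.foreachPartition { it =>
  if (it.nonEmpty)
    println(s"non-empty partition on ${java.net.InetAddress.getLocalHost.getHostName}")
}
```

If the distinct key count is tiny, groupByKey concentrates everything into
that many groups, and each group is consumed by a single task, no matter how
many partitions are requested.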
>     > >
>     > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <magicaltr...@apache.org>
> wrote:
>     > >
>     > > > Hi folks,
>     > > >
>     > > > Hopefully someone with more Spark experience than me can explain
> this a
>     > > > bit.
>     > > >
>     > > > I don't know if this is possible, impossible, or just an old
> design that
>     > > > could be better.
>     > > >
>     > > > I'm running Sparkler as a spark-submit job on a Databricks Spark
> cluster
>     > > > and it's getting to this point in the code (
>     > > >
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
>     > > > )
>     > > >
>     > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>     > > >   .groupByKey()
>     > > >   .flatMap({ case (grp, rs) =>
>     > > >     new FairFetcher(job, rs.iterator, localFetchDelay,
>     > > >       FetchFunction, ParseFunction, OutLinkFilterFunction,
>     > > >       StatusUpdateSolrTransformer) })
>     > > >   .persist()
>     > > >
>     > > > This basically takes the RDD and then runs a web-based crawl for
> each group in the RDD and returns the results. But when Spark executes it,
> it runs all the crawls on the driver node and doesn't distribute them.
>     > > >
>     > > > The key is pretty static in these tests, so I have also tried
> forcing the
>     > > > partition count (50 on a 16-core-per-node cluster) and also
> repartitioning,
>     > > > but every time all the jobs are scheduled to run on one node.
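The repartition attempt described above would look something like this
(sketch only, untested; names from the earlier snippet):

```scala
// Sketch: force m partitions at the shuffle, then repartition again.
// Caveat: if nearly every record shares the same key, groupByKey still
// yields one giant group, and one group is consumed by one task, so
// repartitioning alone may not spread the crawl work.
val m = 50
val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m)
  .repartition(m)
  .flatMap({ case (grp, rs) =>
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction,
      StatusUpdateSolrTransformer) })
  .persist()
```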
>     > > >
>     > > > What can I do better to distribute the tasks? The processing of
> the data in the RDD isn't the bottleneck; the fetching of the crawl data is
> the bottleneck, but that happens after the code has been assigned to a node.
>     > > >
>     > > > Thanks
>     > > >
>     > > > Tom
>     > > >
>     > > >
>     > > >
> ---------------------------------------------------------------------
>     > > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>     > > >
>     > > >
>     > >
>     >
>     >
>     >
>
>
>
>
