For anyone interested, here are the execution logs up until the point where it 
actually kicks off the workload in question: 
https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

On 2021/06/09 01:52:39, Tom Barber <magicaltr...@apache.org> wrote: 
> ExecutorID says driver, and looking at the IP addresses it's running on, it's 
> not any of the worker IPs.
> 
> I forcibly told it to create 50 partitions, but they'd all end up running in 
> the same place. 
> 
> Working on some other ideas, I set spark.task.cpus to 16 to match the nodes' 
> core count (see the config sketch below), whilst still forcing it to 50 partitions:
> 
> val m = 50
> 
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>   .groupByKey(m)
>   .flatMap { case (grp, rs) =>
>     new FairFetcher(job, rs.iterator, localFetchDelay,
>       FetchFunction, ParseFunction, OutLinkFilterFunction,
>       StatusUpdateSolrTransformer)
>   }
>   .persist()
> 
> That sort of thing. But the tasks are still pinned to the driver executor and 
> none of the workers, so I no longer saturate the master node, but I also have 
> 3 workers just sitting there doing nothing.
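> 
> For reference, a minimal sketch of how that setting can be applied (assuming 
> it is set when the session is built; on a Databricks job it would normally 
> live in the cluster's Spark config, or be passed to spark-submit as 
> --conf spark.task.cpus=16 instead):
> 
> import org.apache.spark.sql.SparkSession
> 
> val spark = SparkSession.builder()
>   .appName("sparkler-crawl")        // hypothetical app name
>   .config("spark.task.cpus", "16")  // each task claims 16 cores, i.e. a whole worker
>   .getOrCreate()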
> 
> On 2021/06/09 01:26:50, Sean Owen <sro...@gmail.com> wrote: 
> > Are you sure it's on the driver, or just one executor?
> > How many partitions does the groupByKey produce? That would limit your
> > parallelism no matter what if it's a small number.
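> > 
> > (A quick way to check that, using the standard RDD API and the fetchedRdd
> > name from the snippets below, would be something like:)
> > 
> > println(s"fetchedRdd partitions: ${fetchedRdd.getNumPartitions}")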
> > 
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <magicaltr...@apache.org> wrote:
> > 
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this a
> > > bit.
> > >
> > > I don't know if this is possible, impossible, or just an old design that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a Databricks Spark cluster,
> > > and it's getting to this point in the code:
> > > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > >   .groupByKey()
> > >   .flatMap { case (grp, rs) =>
> > >     new FairFetcher(job, rs.iterator, localFetchDelay,
> > >       FetchFunction, ParseFunction, OutLinkFilterFunction,
> > >       StatusUpdateSolrTransformer)
> > >   }
> > >   .persist()
> > >
> > > This basically takes the RDD, runs a web-based crawl over each group in the
> > > RDD, and returns the results. But when Spark executes it, it runs all the
> > > crawls on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing the
> > > partition count (50 on a cluster with 16 cores per node) and repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
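> > >
> > > For concreteness, the forced-partition / repartition variant looked roughly
> > > like this (a sketch reusing the identifiers from the snippet above; 50 is
> > > just the count used in these tests):
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > >   .groupByKey(50)     // force the shuffle to 50 partitions
> > >   .repartition(50)    // and/or an explicit repartition afterwards
> > >   .flatMap { case (grp, rs) =>
> > >     new FairFetcher(job, rs.iterator, localFetchDelay,
> > >       FetchFunction, ParseFunction, OutLinkFilterFunction,
> > >       StatusUpdateSolrTransformer)
> > >   }
> > >   .persist()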
> > >
> > > What can I do better to distribute the tasks? The processing of the data in
> > > the RDD isn't the bottleneck; the fetching of the crawl data is, but that
> > > happens after the code has been assigned to a node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > 
> 
