For anyone interested, here are the execution logs up to the point where it actually kicks off the workload in question: https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
On 2021/06/09 01:52:39, Tom Barber <magicaltr...@apache.org> wrote:
> ExecutorID says driver, and looking at the IP addresses it's running on,
> it's not any of the worker IPs.
>
> I forcibly told it to create 50 partitions, but they'd all end up running
> in the same place.
>
> Working on some other ideas, I set spark.task.cpus to 16 to match the
> nodes whilst still forcing it to 50 partitions:
>
> val m = 50
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>   .groupByKey(m)
>   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
>     localFetchDelay, FetchFunction, ParseFunction, OutLinkFilterFunction,
>     StatusUpdateSolrTransformer) })
>   .persist()
>
> That sort of thing. But the tasks are still pinned to the driver executor
> and none of the workers, so I no longer saturate the master node, but I
> also have 3 workers just sat there doing nothing.
>
> On 2021/06/09 01:26:50, Sean Owen <sro...@gmail.com> wrote:
> > Are you sure it's on the driver, or just one executor?
> > How many partitions does the groupByKey produce? That would limit your
> > parallelism no matter what if it's a small number.
> >
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <magicaltr...@apache.org> wrote:
> > >
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this
> > > a bit.
> > >
> > > I don't know if this is possible, impossible or just an old design
> > > that could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a Databricks Spark
> > > cluster and it's getting to this point in the code (
> > > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > ):
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > >   .groupByKey()
> > >   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > >     localFetchDelay, FetchFunction, ParseFunction,
> > >     OutLinkFilterFunction, StatusUpdateSolrTransformer) })
> > >   .persist()
> > >
> > > This basically takes the RDD and then runs a web crawl over each group
> > > of records and returns the results. But when Spark executes it, it
> > > runs all the crawls on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing
> > > the partition count (50 on a 16-core-per-node cluster) and also
> > > repartitioning, but every time all the jobs are scheduled to run on
> > > one node.
> > >
> > > What can I do better to distribute the tasks? The processing of the
> > > data in the RDD isn't the bottleneck; the fetching of the crawl data
> > > is the bottleneck, but that happens after the code has been assigned
> > > to a node.
> > >
> > > Thanks
> > >
> > > Tom

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
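
One quick way to answer Sean's question about how many partitions the
groupByKey actually fills is to count what lands in each one. A rough
sketch, reusing the rdd and getGroup names from the snippets above
(assumed from the thread, not verified against Sparkler):

    // If every record shares one group key, only one partition is
    // non-empty after groupByKey, so only one task does any work,
    // however many partitions are requested.
    val grouped = rdd.map(r => (r.getGroup, r)).groupByKey(50)
    grouped
      .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
      .collect()
      .foreach { case (idx, n) => println(s"partition $idx -> $n groups") }

If only one partition reports a non-zero count, it is the static key,
not the scheduler, that is serialising the crawl.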
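
If that is the case, one possible workaround is to salt the key so a
single logical group is split into several slices, each becoming its
own task. A minimal sketch under that assumption, keeping the
FairFetcher call from the thread (numSlices is a made-up name; job,
localFetchDelay and the function objects are taken from the original
snippet):

    import scala.util.Random

    val numSlices = 50  // hypothetical; tune to total executor cores

    val fetchedRdd = rdd
      // Append a random salt to the static group key so records spread
      // across numSlices partitions instead of collapsing into one.
      .map(r => ((r.getGroup, Random.nextInt(numSlices)), r))
      .groupByKey(numSlices)
      .flatMap({ case ((grp, _), rs) =>
        new FairFetcher(job, rs.iterator, localFetchDelay,
          FetchFunction, ParseFunction, OutLinkFilterFunction,
          StatusUpdateSolrTransformer)
      })
      .persist()

The trade-off is that FairFetcher's per-group fetch delay would then
apply per slice rather than per original group, so several slices of
the same host could fetch concurrently; the politeness delay would need
rethinking before running this against a real crawl.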