No, this is an on demand databricks cluster.

On Wed, Jun 9, 2021 at 6:54 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
>
> Are you running this in Managed Instance Group (MIG)?
>
> https://cloud.google.com/compute/docs/instance-groups
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 9 Jun 2021 at 18:43, Tom Barber <t...@spicule.co.uk> wrote:
>
>> And also as this morning: https://pasteboard.co/K5Q9aEf.png
>>
>> Removing the cpu pins gives me more tasks but as you can see here:
>>
>> https://pasteboard.co/K5Q9GO0.png
>>
>> It just loads up a single server.
>>
>> On Wed, Jun 9, 2021 at 6:32 PM Tom Barber <t...@spicule.co.uk> wrote:
>>
>>> Thanks Chris....
>>>
>>> All the code I have on both sides is as modern as it allows. Running
>>> Spark 3.1.1 and Scala 2.12.
>>>
>>> I stuck some logging in to check reality:
>>>
>>> LOG.info("GROUP COUNT: " + fetchedgrp.count())
>>> val cgrp = fetchedgrp.collect()
>>> cgrp.foreach(f => {
>>>   LOG.info("Out1 :" + f._1)
>>>   f._2.foreach(u => {
>>>     LOG.info("ID:" + u.getId)
>>>     LOG.info("GROUP:" + u.getGroup)
>>>   })
>>> })
>>> LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
>>> val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) => new 
>>> FairFetcher(job, rs.iterator, localFetchDelay,
>>>     FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer) })
>>>   .persist()
>>>
>>> LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
>>> LOG.info("CoUNT: " + fetchedRdd.count())
>>>
>>>
>>> It says I have 5000 groups, which makes sense as its defined in my
>>> command line and both sides claim to have 50 partitions which also makes
>>> sense as I define that in my code as well.
>>>
>>> Then it starts the crawl at the final count line as I guess it needs to
>>> materialize things and so at that point I don't know what the count would
>>> return, but everything else checks out.
>>>
>>> I'll poke around in the other hints you suggested later, thanks for the
>>> help.
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 5:49 PM Chris Martin <ch...@cmartinit.co.uk>
>>> wrote:
>>>
>>>> Hmm then my guesses are (in order of decreasing probability:
>>>>
>>>> * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
>>>> compatible with the lastest spark release.
>>>> * You've got 16 threads per task on a 16 core machine.  Should be fine,
>>>> but I wonder if it's confusing things as you don't also set
>>>> spark.executor.cores and Databricks might also default that to 1.
>>>> * There's some custom partitioner in play which is causing everything
>>>> to go to the same partition.
>>>> * The group keys are all hashing to the same value (it's difficult to
>>>> see how this would be the case if the group keys are genuinely different,
>>>> but maybe there's something else going on).
>>>>
>>>> My hints:
>>>>
>>>> 1. Make sure you're using a recent version of sparkler
>>>> 2. Try repartition with a custom partitioner that you know will end
>>>> things to different partitions
>>>> 3. Try either removing "spark.task.cpus":"16"  or setting
>>>> spark.executor.cores to 1.
>>>> 4. print out the group keys and see if there's any weird pattern to
>>>> them.
>>>> 5. See if the same thing happens in spark local.
>>>>
>>>> If you have a reproducible example you can post publically then I'm
>>>> happy to  take a look.
>>>>
>>>> Chris
>>>>
>>>> On Wed, Jun 9, 2021 at 5:17 PM Tom Barber <t...@spicule.co.uk> wrote:
>>>>
>>>>> Yeah to test that I just set the group key to the ID in the record
>>>>> which is a solr supplied UUID, which means effectively you end up with 
>>>>> 4000
>>>>> groups now.
>>>>>
>>>>> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin <ch...@cmartinit.co.uk>
>>>>> wrote:
>>>>>
>>>>>> One thing I would check is this line:
>>>>>>
>>>>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>
>>>>>> how many distinct groups do you ended up with?  If there's just one
>>>>>> then I think you might see the behaviour you observe.
>>>>>>
>>>>>> Chris
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>>
>>>>>>> Also just to follow up on that slightly, I did also try off the back
>>>>>>> of another comment:
>>>>>>>
>>>>>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>>>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>>>>>
>>>>>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>>>>>
>>>>>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>>>>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>>>>>
>>>>>>>
>>>>>>> Where I repartitioned that scoredRdd map out of interest, it then
>>>>>>> triggers the FairFetcher function there, instead of in the runJob(), but
>>>>>>> still on a single executor 😞
>>>>>>>
>>>>>>> Tom
>>>>>>>
>>>>>>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Okay so what happens is that the crawler reads a bunch of solr
>>>>>>>> data, we're not talking GB's just a list of JSON and turns that into a
>>>>>>>> bunch of RDD's that end up in that flatmap that I linked to first.
>>>>>>>>
>>>>>>>> The fair fetcher is an interface to a pluggable backend that
>>>>>>>> basically takes some of the fields and goes and crawls websites listed 
>>>>>>>> in
>>>>>>>> them looking for information. We wrote this code 6 years ago for a 
>>>>>>>> DARPA
>>>>>>>> project tracking down criminals on the web. Now I'm reusing it but 
>>>>>>>> trying
>>>>>>>> to force it to scale out a bit more.
>>>>>>>>
>>>>>>>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I
>>>>>>>> want to push down 1 URL (a few more wont hurt, but crawling 50 urls in
>>>>>>>> parallel on one node makes my cluster sad) to each executor and have 
>>>>>>>> it run
>>>>>>>> a crawl, then move on and get another one and so on. That way you're 
>>>>>>>> not
>>>>>>>> saturating a node trying to look up all of them and you could add more
>>>>>>>> nodes for greater capacity pretty quickly. Once the website has been
>>>>>>>> captured, you can then "score" it for want of a better term to 
>>>>>>>> determine
>>>>>>>> its usefulness, which is where the map is being triggered.
>>>>>>>>
>>>>>>>> In answer to your questions Sean, no action seems triggered until
>>>>>>>> you end up in the score block and the sc.runJob() because thats 
>>>>>>>> literally
>>>>>>>> the next line of functionality as Kafka isn't enabled.
>>>>>>>>
>>>>>>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>>>>>>>> rs.iterator, localFetchDelay,
>>>>>>>>     FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>>>>>>> StatusUpdateSolrTransformer).toSeq })
>>>>>>>>   .persist()
>>>>>>>>
>>>>>>>> if (kafkaEnable) {
>>>>>>>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), 
>>>>>>>> fetchedRdd)
>>>>>>>> }
>>>>>>>> val scoredRdd = score(fetchedRdd)
>>>>>>>>
>>>>>>>>
>>>>>>>> That if block is disabled so the score function runs. Inside of
>>>>>>>> that:
>>>>>>>>
>>>>>>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>>>>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>>>>>>
>>>>>>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>>>>>>
>>>>>>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>>>>>>>> ScoreUpdateSolrTransformer(d))
>>>>>>>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>>>>>>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>>>>>>> ....
>>>>>>>>
>>>>>>>>
>>>>>>>> When its doing stuff in the SparkUI I can see that its waiting on
>>>>>>>> the sc.runJob() line, so thats the execution point.
>>>>>>>>
>>>>>>>>
>>>>>>>> Tom
>>>>>>>>
>>>>>>>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen <sro...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> persist() doesn't even persist by itself - just sets it to be
>>>>>>>>> persisted when it's executed.
>>>>>>>>> key doesn't matter here, nor partitioning, if this code is trying
>>>>>>>>> to run things on the driver inadvertently.
>>>>>>>>> I don't quite grok what the OSS code you linked to is doing, but
>>>>>>>>> it's running some supplied functions very directly and at a low-level 
>>>>>>>>> with
>>>>>>>>> sc.runJob, which might be part of how this can do something unusual.
>>>>>>>>> How do you trigger any action? what happens after persist()
>>>>>>>>>
>>>>>>>>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber <t...@spicule.co.uk>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Mich,
>>>>>>>>>>
>>>>>>>>>> The key on the first iteration is just a string that says "seed",
>>>>>>>>>> so it is indeed on the first crawl the same across all of the groups.
>>>>>>>>>> Further iterations would be different, but I'm not there yet. I was 
>>>>>>>>>> under
>>>>>>>>>> the impression that a repartition would distribute the tasks. Is 
>>>>>>>>>> that not
>>>>>>>>>> the case?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> Tom
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Tom,
>>>>>>>>>>>
>>>>>>>>>>> Persist() here simply means persist to memory). That is all. You
>>>>>>>>>>> can check UI tab on storage
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>>>>>>>>>>
>>>>>>>>>>> So I gather the code is stuck from your link in the driver. You
>>>>>>>>>>> stated that you tried repartition() but it did not do anything,
>>>>>>>>>>>
>>>>>>>>>>> Further you stated :
>>>>>>>>>>>
>>>>>>>>>>> " The key is pretty static in these tests, so I have also tried
>>>>>>>>>>> forcing the partition count (50 on a 16 core per node cluster) and 
>>>>>>>>>>> also
>>>>>>>>>>> repartitioning, but every time all the jobs are scheduled to run on 
>>>>>>>>>>> one
>>>>>>>>>>> node."
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> What is the key?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> HTH
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    view my Linkedin profile
>>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>>> responsibility for any loss, damage or destruction of data or any 
>>>>>>>>>>> other
>>>>>>>>>>> property which may arise from relying on this email's technical 
>>>>>>>>>>> content is
>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 9 Jun 2021 at 15:23, Tom Barber <t...@spicule.co.uk>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Interesting Sean thanks for that insight, I wasn't aware of
>>>>>>>>>>>> that fact, I assume the .persist() at the end of that line doesn't 
>>>>>>>>>>>> do it?
>>>>>>>>>>>>
>>>>>>>>>>>> I believe, looking at the output in the SparkUI, it gets to
>>>>>>>>>>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
>>>>>>>>>>>> and calls the context runJob.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen <sro...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> All these configurations don't matter at all if this is
>>>>>>>>>>>>> executing on the driver.
>>>>>>>>>>>>> Returning an Iterator in flatMap is fine though it 'delays'
>>>>>>>>>>>>> execution until that iterator is evaluated by something, which is 
>>>>>>>>>>>>> normally
>>>>>>>>>>>>> fine.
>>>>>>>>>>>>> Does creating this FairFetcher do anything by itself? you're
>>>>>>>>>>>>> just returning an iterator that creates them here.
>>>>>>>>>>>>> How do you actually trigger an action here? the code snippet
>>>>>>>>>>>>> itself doesn't trigger anything.
>>>>>>>>>>>>> I think we need more info about what else is happening in the
>>>>>>>>>>>>> code.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber <t...@spicule.co.uk>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yeah so if I update the FairFetcher to return a seq it makes
>>>>>>>>>>>>>> no real difference.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's an image of what I'm seeing just for reference:
>>>>>>>>>>>>>> https://pasteboard.co/K5NFrz7.png
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Because this is databricks I don't have an actual spark
>>>>>>>>>>>>>> submit command but it looks like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> curl xxxx -d
>>>>>>>>>>>>>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>>>>>>>>>>>> "spark.task.cpus":"16"},
>>>>>>>>>>>>>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>>>>>>>>>>>>>> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", 
>>>>>>>>>>>>>> "--driver-memory", "10g",
>>>>>>>>>>>>>> "--executor-memory", "10g",
>>>>>>>>>>>>>> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
>>>>>>>>>>>>>> "-tn", "5000", "-co",
>>>>>>>>>>>>>> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I deliberately pinned spark.task.cpus to 16 to stop it
>>>>>>>>>>>>>> swamping the driver trying to run all the tasks in parallel on 
>>>>>>>>>>>>>> the one
>>>>>>>>>>>>>> node, but again I've got 50 tasks queued up all running on the 
>>>>>>>>>>>>>> single node.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber <t...@spicule.co.uk>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I've not run it yet, but I've stuck a toSeq on the end, but
>>>>>>>>>>>>>>> in reality a Seq just inherits Iterator, right?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Flatmap does return a RDD[CrawlData] unless my IDE is lying
>>>>>>>>>>>>>>> to me.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Tom
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber <
>>>>>>>>>>>>>>> t...@spicule.co.uk> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Interesting Jayesh, thanks, I will test.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> All this code is inherited and it runs, but I don't think
>>>>>>>>>>>>>>>> its been tested in a distributed context for about 5 years, 
>>>>>>>>>>>>>>>> but yeah I need
>>>>>>>>>>>>>>>> to get this pushed down, so I'm happy to try anything! :)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Tom
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh <
>>>>>>>>>>>>>>>> jlalw...@amazon.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> flatMap is supposed to return Seq, not Iterator. You are
>>>>>>>>>>>>>>>>> returning a class that implements Iterator. I have a hunch 
>>>>>>>>>>>>>>>>> that's what's
>>>>>>>>>>>>>>>>> causing the confusion. flatMap is returning a 
>>>>>>>>>>>>>>>>> RDD[FairFetcher] not
>>>>>>>>>>>>>>>>> RDD[CrawlData]. Do you intend it to be RDD[CrawlData]? You 
>>>>>>>>>>>>>>>>> might want to
>>>>>>>>>>>>>>>>> call toSeq on FairFetcher.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 6/8/21, 10:10 PM, "Tom Barber" <
>>>>>>>>>>>>>>>>> magicaltr...@apache.org> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     CAUTION: This email originated from outside of the
>>>>>>>>>>>>>>>>> organization. Do not click links or open attachments unless 
>>>>>>>>>>>>>>>>> you can confirm
>>>>>>>>>>>>>>>>> the sender and know the content is safe.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     For anyone interested here's the execution logs up
>>>>>>>>>>>>>>>>> until the point where it actually kicks off the workload in 
>>>>>>>>>>>>>>>>> question:
>>>>>>>>>>>>>>>>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     On 2021/06/09 01:52:39, Tom Barber <
>>>>>>>>>>>>>>>>> magicaltr...@apache.org> wrote:
>>>>>>>>>>>>>>>>>     > ExecutorID says driver, and looking at the IP
>>>>>>>>>>>>>>>>> addresses its running on its not any of the worker ip's.
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>     > I forcibly told it to create 50, but they'd all end
>>>>>>>>>>>>>>>>> up running in the same place.
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>     > Working on some other ideas, I set spark.task.cpus
>>>>>>>>>>>>>>>>> to 16 to match the nodes whilst still forcing it to 50 
>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>     > val m = 50
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>     > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>>>>>>>>>>     >         .groupByKey(m).flatMap({ case (grp, rs) =>
>>>>>>>>>>>>>>>>> new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>>>>>>>>>>     >           FetchFunction, ParseFunction,
>>>>>>>>>>>>>>>>> OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>>>>>>>>>>     >         .persist()
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>     > that sort of thing. But still the tasks are pinned
>>>>>>>>>>>>>>>>> to the driver executor and none of the workers, so I no 
>>>>>>>>>>>>>>>>> longer saturate the
>>>>>>>>>>>>>>>>> master node, but I also have 3 workers just sat there doing 
>>>>>>>>>>>>>>>>> nothing.
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>     > On 2021/06/09 01:26:50, Sean Owen <sro...@gmail.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>     > > Are you sure it's on the driver? or just 1
>>>>>>>>>>>>>>>>> executor?
>>>>>>>>>>>>>>>>>     > > how many partitions does the groupByKey produce?
>>>>>>>>>>>>>>>>> that would limit your
>>>>>>>>>>>>>>>>>     > > parallelism no matter what if it's a small number.
>>>>>>>>>>>>>>>>>     > >
>>>>>>>>>>>>>>>>>     > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <
>>>>>>>>>>>>>>>>> magicaltr...@apache.org> wrote:
>>>>>>>>>>>>>>>>>     > >
>>>>>>>>>>>>>>>>>     > > > Hi folks,
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > Hopefully someone with more Spark experience
>>>>>>>>>>>>>>>>> than me can explain this a
>>>>>>>>>>>>>>>>>     > > > bit.
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > I dont' know if this is possible, impossible or
>>>>>>>>>>>>>>>>> just an old design that
>>>>>>>>>>>>>>>>>     > > > could be better.
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > I'm running Sparkler as a spark-submit job on a
>>>>>>>>>>>>>>>>> databricks spark cluster
>>>>>>>>>>>>>>>>>     > > > and its getting to this point in the code(
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
>>>>>>>>>>>>>>>>>     > > > )
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>>>>>>>>>>     > > >         .groupByKey()
>>>>>>>>>>>>>>>>>     > > >         .flatMap({ case (grp, rs) => new
>>>>>>>>>>>>>>>>> FairFetcher(job, rs.iterator,
>>>>>>>>>>>>>>>>>     > > > localFetchDelay,
>>>>>>>>>>>>>>>>>     > > >           FetchFunction, ParseFunction,
>>>>>>>>>>>>>>>>> OutLinkFilterFunction,
>>>>>>>>>>>>>>>>>     > > > StatusUpdateSolrTransformer) })
>>>>>>>>>>>>>>>>>     > > >         .persist()
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > This basically takes the RDD and then runs a web
>>>>>>>>>>>>>>>>> based crawl over each RDD
>>>>>>>>>>>>>>>>>     > > > and returns the results. But when Spark executes
>>>>>>>>>>>>>>>>> it, it runs all the crawls
>>>>>>>>>>>>>>>>>     > > > on the driver node and doesn't distribute them.
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > The key is pretty static in these tests, so I
>>>>>>>>>>>>>>>>> have also tried forcing the
>>>>>>>>>>>>>>>>>     > > > partition count (50 on a 16 core per node
>>>>>>>>>>>>>>>>> cluster) and also repartitioning,
>>>>>>>>>>>>>>>>>     > > > but every time all the jobs are scheduled to run
>>>>>>>>>>>>>>>>> on one node.
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > What can I do better to distribute the tasks?
>>>>>>>>>>>>>>>>> Because the processing of
>>>>>>>>>>>>>>>>>     > > > the data in the RDD isn't the bottleneck, the
>>>>>>>>>>>>>>>>> fetching of the crawl data is
>>>>>>>>>>>>>>>>>     > > > the bottleneck, but that happens after the code
>>>>>>>>>>>>>>>>> has been assigned to a node.
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > Thanks
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > > Tom
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>>>>>>>     > > > To unsubscribe e-mail:
>>>>>>>>>>>>>>>>> user-unsubscr...@spark.apache.org
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > > >
>>>>>>>>>>>>>>>>>     > >
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>>>>>>>     > To unsubscribe e-mail:
>>>>>>>>>>>>>>>>> user-unsubscr...@spark.apache.org
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>     >
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>>>>>>>     To unsubscribe e-mail:
>>>>>>>>>>>>>>>>> user-unsubscr...@spark.apache.org
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Spicule Limited is registered in England & Wales. Company
>>>>>>>>>>>>>> Number: 09954122. Registered office: First Floor, Telecom House, 
>>>>>>>>>>>>>> 125-135
>>>>>>>>>>>>>> Preston Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> All engagements are subject to Spicule Terms and Conditions
>>>>>>>>>>>>>> of Business. This email and its contents are intended solely for 
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> individual to whom it is addressed and may contain information 
>>>>>>>>>>>>>> that is
>>>>>>>>>>>>>> confidential, privileged or otherwise protected from disclosure,
>>>>>>>>>>>>>> distributing or copying. Any views or opinions presented in this 
>>>>>>>>>>>>>> email are
>>>>>>>>>>>>>> solely those of the author and do not necessarily represent 
>>>>>>>>>>>>>> those of
>>>>>>>>>>>>>> Spicule Limited. The company accepts no liability for any damage 
>>>>>>>>>>>>>> caused by
>>>>>>>>>>>>>> any virus transmitted by this email. If you have received this 
>>>>>>>>>>>>>> message in
>>>>>>>>>>>>>> error, please notify us immediately by reply email before 
>>>>>>>>>>>>>> deleting it from
>>>>>>>>>>>>>> your system. Service of legal notice cannot be effected on 
>>>>>>>>>>>>>> Spicule Limited
>>>>>>>>>>>>>> by email.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>> Spicule Limited is registered in England & Wales. Company
>>>>>>>>>>>> Number: 09954122. Registered office: First Floor, Telecom House, 
>>>>>>>>>>>> 125-135
>>>>>>>>>>>> Preston Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> All engagements are subject to Spicule Terms and Conditions of
>>>>>>>>>>>> Business. This email and its contents are intended solely for the
>>>>>>>>>>>> individual to whom it is addressed and may contain information 
>>>>>>>>>>>> that is
>>>>>>>>>>>> confidential, privileged or otherwise protected from disclosure,
>>>>>>>>>>>> distributing or copying. Any views or opinions presented in this 
>>>>>>>>>>>> email are
>>>>>>>>>>>> solely those of the author and do not necessarily represent those 
>>>>>>>>>>>> of
>>>>>>>>>>>> Spicule Limited. The company accepts no liability for any damage 
>>>>>>>>>>>> caused by
>>>>>>>>>>>> any virus transmitted by this email. If you have received this 
>>>>>>>>>>>> message in
>>>>>>>>>>>> error, please notify us immediately by reply email before deleting 
>>>>>>>>>>>> it from
>>>>>>>>>>>> your system. Service of legal notice cannot be effected on Spicule 
>>>>>>>>>>>> Limited
>>>>>>>>>>>> by email.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> Spicule Limited is registered in England & Wales. Company Number:
>>>>>>>>>> 09954122. Registered office: First Floor, Telecom House, 125-135 
>>>>>>>>>> Preston
>>>>>>>>>> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> All engagements are subject to Spicule Terms and Conditions of
>>>>>>>>>> Business. This email and its contents are intended solely for the
>>>>>>>>>> individual to whom it is addressed and may contain information that 
>>>>>>>>>> is
>>>>>>>>>> confidential, privileged or otherwise protected from disclosure,
>>>>>>>>>> distributing or copying. Any views or opinions presented in this 
>>>>>>>>>> email are
>>>>>>>>>> solely those of the author and do not necessarily represent those of
>>>>>>>>>> Spicule Limited. The company accepts no liability for any damage 
>>>>>>>>>> caused by
>>>>>>>>>> any virus transmitted by this email. If you have received this 
>>>>>>>>>> message in
>>>>>>>>>> error, please notify us immediately by reply email before deleting 
>>>>>>>>>> it from
>>>>>>>>>> your system. Service of legal notice cannot be effected on Spicule 
>>>>>>>>>> Limited
>>>>>>>>>> by email.
>>>>>>>>>>
>>>>>>>>>
>>>>>>> Spicule Limited is registered in England & Wales. Company Number:
>>>>>>> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
>>>>>>> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>>>>>
>>>>>>>
>>>>>>> All engagements are subject to Spicule Terms and Conditions of
>>>>>>> Business. This email and its contents are intended solely for the
>>>>>>> individual to whom it is addressed and may contain information that is
>>>>>>> confidential, privileged or otherwise protected from disclosure,
>>>>>>> distributing or copying. Any views or opinions presented in this email 
>>>>>>> are
>>>>>>> solely those of the author and do not necessarily represent those of
>>>>>>> Spicule Limited. The company accepts no liability for any damage caused 
>>>>>>> by
>>>>>>> any virus transmitted by this email. If you have received this message 
>>>>>>> in
>>>>>>> error, please notify us immediately by reply email before deleting it 
>>>>>>> from
>>>>>>> your system. Service of legal notice cannot be effected on Spicule 
>>>>>>> Limited
>>>>>>> by email.
>>>>>>>
>>>>>>
>>>>> Spicule Limited is registered in England & Wales. Company Number:
>>>>> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
>>>>> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>>>
>>>>>
>>>>> All engagements are subject to Spicule Terms and Conditions of
>>>>> Business. This email and its contents are intended solely for the
>>>>> individual to whom it is addressed and may contain information that is
>>>>> confidential, privileged or otherwise protected from disclosure,
>>>>> distributing or copying. Any views or opinions presented in this email are
>>>>> solely those of the author and do not necessarily represent those of
>>>>> Spicule Limited. The company accepts no liability for any damage caused by
>>>>> any virus transmitted by this email. If you have received this message in
>>>>> error, please notify us immediately by reply email before deleting it from
>>>>> your system. Service of legal notice cannot be effected on Spicule Limited
>>>>> by email.
>>>>>
>>>>
>> Spicule Limited is registered in England & Wales. Company Number:
>> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
>> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>
>>
>> All engagements are subject to Spicule Terms and Conditions of Business.
>> This email and its contents are intended solely for the individual to whom
>> it is addressed and may contain information that is confidential,
>> privileged or otherwise protected from disclosure, distributing or copying.
>> Any views or opinions presented in this email are solely those of the
>> author and do not necessarily represent those of Spicule Limited. The
>> company accepts no liability for any damage caused by any virus transmitted
>> by this email. If you have received this message in error, please notify us
>> immediately by reply email before deleting it from your system. Service of
>> legal notice cannot be effected on Spicule Limited by email.
>>
>

-- 


Spicule Limited is registered in England & Wales. Company Number: 
09954122. Registered office: First Floor, Telecom House, 125-135 Preston 
Road, Brighton, England, BN1 6AF. VAT No. 251478891.




All engagements 
are subject to Spicule Terms and Conditions of Business. This email and its 
contents are intended solely for the individual to whom it is addressed and 
may contain information that is confidential, privileged or otherwise 
protected from disclosure, distributing or copying. Any views or opinions 
presented in this email are solely those of the author and do not 
necessarily represent those of Spicule Limited. The company accepts no 
liability for any damage caused by any virus transmitted by this email. If 
you have received this message in error, please notify us immediately by 
reply email before deleting it from your system. Service of legal notice 
cannot be effected on Spicule Limited by email.

Reply via email to