persist() doesn't even persist anything by itself - it just marks the RDD to be
persisted when it is eventually executed.
The key doesn't matter here, nor does partitioning, if this code is inadvertently
running things on the driver.
I don't quite grok what the OSS code you linked to is doing, but it's
running some supplied functions very directly and at a low level with
sc.runJob, which might be part of how this can do something unusual.
How do you trigger any action? What happens after persist()?
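
To illustrate the laziness, here's a minimal sketch with a toy RDD (not your crawl
code) showing that nothing runs until an action is called:

val data = sc.parallelize(1 to 10, 4)
val doubled = data.map(_ * 2).persist()  // lazy: only marks the RDD to be cached
doubled.count()                          // first action: runs the map and materializes the cache
doubled.sum()                            // second action: reads the cached partitions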

On Wed, Jun 9, 2021 at 9:48 AM Tom Barber <t...@spicule.co.uk> wrote:

> Thanks Mich,
>
> The key on the first iteration is just a string that says "seed", so on the
> first crawl it is indeed the same across all of the groups. Further
> iterations would be different, but I'm not there yet. I was under the
> impression that a repartition would distribute the tasks. Is that not the
> case?
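>
> To make sure I understand the problem, here's a rough sketch (not tested, just
> my reading of it) of why a single "seed" key collapses everything into one
> group, and a key-salting workaround I could try:
>
> // every record shares the key "seed", so groupByKey(50) leaves 49 empty groups and one huge one
> val grouped = rdd.map(r => (r.getGroup, r)).groupByKey(50)
>
> // salting the key with a random suffix spreads that one group across partitions
> val salted = rdd.map(r => ((r.getGroup, scala.util.Random.nextInt(50)), r)).groupByKey(50)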
>
> Thanks
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi Tom,
>>
>> persist() here simply means persist to memory. That is all. You can
>> check the Storage tab in the Spark UI.
>>
>>
>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
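>>
>> As a small aside, a sketch with a toy RDD (not your code) of the default
>> storage level and how to request a different one:
>>
>> import org.apache.spark.storage.StorageLevel
>>
>> val rdd = sc.parallelize(1 to 100)
>> val cached = rdd.persist()             // RDD default level is MEMORY_ONLY
>> // or, to spill to disk when it does not fit in memory:
>> // val cached = rdd.persist(StorageLevel.MEMORY_AND_DISK)
>> cached.count()                         // nothing is cached until an action runs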
>>
>> So I gather from your link that the code is stuck in the driver. You stated
>> that you tried repartition() but it did not do anything.
>>
>> Further, you stated:
>>
>> " The key is pretty static in these tests, so I have also tried forcing
>> the partition count (50 on a 16 core per node cluster) and also
>> repartitioning, but every time all the jobs are scheduled to run on one
>> node."
>>
>>
>> What is the key?
>>
>>
>> HTH
>>
>>
>> On Wed, 9 Jun 2021 at 15:23, Tom Barber <t...@spicule.co.uk> wrote:
>>
>>> Interesting, Sean, thanks for that insight. I wasn't aware of that fact; I
>>> assume the .persist() at the end of that line doesn't do it?
>>>
>>> I believe, looking at the output in the Spark UI, it gets to
>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
>>> and calls the context runJob.
>>>
>>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen <sro...@gmail.com> wrote:
>>>
>>>> None of these configurations matter at all if this is executing on
>>>> the driver.
>>>> Returning an Iterator in flatMap is fine, though it 'delays' execution
>>>> until that iterator is evaluated by something, which is normally OK.
>>>> Does creating this FairFetcher do anything by itself? You're just
>>>> returning an iterator that creates them here.
>>>> How do you actually trigger an action here? The code snippet itself
>>>> doesn't trigger anything.
>>>> I think we need more info about what else is happening in the code.
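>>>>
>>>> To illustrate the laziness point with a toy example (nothing to do with
>>>> FairFetcher): the flatMap body below does no work until an action consumes
>>>> its result.
>>>>
>>>> val words = sc.parallelize(Seq("a", "b", "c"), 3)
>>>> val expanded = words.flatMap { s =>
>>>>   // returning an Iterator is fine; it is only evaluated when an action runs
>>>>   Iterator(s, s.toUpperCase)
>>>> }
>>>> expanded.count()  // only now does the flatMap body execute, on the executors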
>>>>
>>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber <t...@spicule.co.uk> wrote:
>>>>
>>>>> Yeah, so if I update the FairFetcher to return a Seq it makes no real
>>>>> difference.
>>>>>
>>>>> Here's an image of what I'm seeing just for reference:
>>>>> https://pasteboard.co/K5NFrz7.png
>>>>>
>>>>> Because this is Databricks I don't have an actual spark-submit command,
>>>>> but it looks like this:
>>>>>
>>>>> curl xxxx -d '{
>>>>>   "new_cluster": {
>>>>>     "spark_conf": {
>>>>>       "spark.executor.extraJavaOptions": "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>>>       "spark.task.cpus": "16"
>>>>>     },
>>>>>     "spark_version": "8.3.x-scala2.12",
>>>>>     "aws_attributes": {"availability": "SPOT_WITH_FALLBACK", "first_on_demand": 1, "zone_id": "us-west-2c"},
>>>>>     "node_type_id": "c5d.4xlarge",
>>>>>     "init_scripts": [{"dbfs": {"destination": "dbfs:/FileStore/crawlinit.sh"}}],
>>>>>     "num_workers": 3
>>>>>   },
>>>>>   "spark_submit_task": {
>>>>>     "parameters": ["--driver-java-options", "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>>>       "--driver-memory", "10g", "--executor-memory", "10g",
>>>>>       "--class", "edu.usc.irds.sparkler.Main", "dbfs:/FileStore/bcf/sparkler7.jar",
>>>>>       "crawl", "-id", "mytestcrawl11", "-tn", "5000", "-co",
>>>>>       "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]
>>>>>   },
>>>>>   "run_name": "testsubmi3t"
>>>>> }'
>>>>>
>>>>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
>>>>> driver by trying to run all the tasks in parallel on the one node, but again
>>>>> I've got 50 tasks queued up, all running on the single node.
>>>>>
>>>>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>
>>>>>> I've not run it yet, but I've stuck a toSeq on the end; in reality a
>>>>>> Seq just inherits Iterator, right?
>>>>>>
>>>>>> flatMap does return an RDD[CrawlData] unless my IDE is lying to me.
>>>>>>
>>>>>> Tom
>>>>>>
>>>>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber <t...@spicule.co.uk> wrote:
>>>>>>
>>>>>>> Interesting Jayesh, thanks, I will test.
>>>>>>>
>>>>>>> All this code is inherited and it runs, but I don't think it's been
>>>>>>> tested in a distributed context for about 5 years. But yeah, I need to get
>>>>>>> this pushed down, so I'm happy to try anything! :)
>>>>>>>
>>>>>>> Tom
>>>>>>>
>>>>>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh <jlalw...@amazon.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> flatMap is supposed to return a Seq, not an Iterator. You are returning
>>>>>>>> a class that implements Iterator. I have a hunch that's what's causing the
>>>>>>>> confusion. flatMap is returning an RDD[FairFetcher], not RDD[CrawlData]. Do
>>>>>>>> you intend it to be RDD[CrawlData]? You might want to call toSeq on
>>>>>>>> FairFetcher.
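>>>>>>>>
>>>>>>>> Something along these lines, roughly (my guess at the shape of your code,
>>>>>>>> not tested):
>>>>>>>>
>>>>>>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>   .groupByKey()
>>>>>>>>   .flatMap({ case (grp, rs) =>
>>>>>>>>     new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>       FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer).toSeq
>>>>>>>>   })
>>>>>>>>   .persist()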
>>>>>>>>
>>>>>>>> On 6/8/21, 10:10 PM, "Tom Barber" <magicaltr...@apache.org> wrote:
>>>>>>>>
>>>>>>>>     For anyone interested, here are the execution logs up until the
>>>>>>>> point where it actually kicks off the workload in question:
>>>>>>>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>>>>>>>
>>>>>>>>     On 2021/06/09 01:52:39, Tom Barber <magicaltr...@apache.org>
>>>>>>>> wrote:
>>>>>>>>     > Executor ID says driver, and looking at the IP addresses it's
>>>>>>>>     > running on, it's not any of the worker IPs.
>>>>>>>>     >
>>>>>>>>     > I forcibly told it to create 50 partitions, but they'd all end up
>>>>>>>>     > running in the same place.
>>>>>>>>     >
>>>>>>>>     > Working on some other ideas, I set spark.task.cpus to 16 to
>>>>>>>>     > match the nodes whilst still forcing it to 50 partitions.
>>>>>>>>     >
>>>>>>>>     > val m = 50
>>>>>>>>     >
>>>>>>>>     > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>     >   .groupByKey(m)
>>>>>>>>     >   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>     >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>     >   .persist()
>>>>>>>>     >
>>>>>>>>     > that sort of thing. But still the tasks are pinned to the
>>>>>>>>     > driver executor and none of the workers, so I no longer saturate the
>>>>>>>>     > master node, but I also have 3 workers just sitting there doing nothing.
>>>>>>>>     >
>>>>>>>>     > On 2021/06/09 01:26:50, Sean Owen <sro...@gmail.com> wrote:
>>>>>>>>     > > Are you sure it's on the driver? Or just 1 executor?
>>>>>>>>     > > How many partitions does the groupByKey produce? That would
>>>>>>>>     > > limit your parallelism no matter what if it's a small number.
>>>>>>>>     > >
>>>>>>>>     > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <
>>>>>>>> magicaltr...@apache.org> wrote:
>>>>>>>>     > >
>>>>>>>>     > > > Hi folks,
>>>>>>>>     > > >
>>>>>>>>     > > > Hopefully someone with more Spark experience than me can
>>>>>>>> explain this a
>>>>>>>>     > > > bit.
>>>>>>>>     > > >
>>>>>>>>     > > > I don't know if this is possible, impossible, or just an
>>>>>>>>     > > > old design that could be better.
>>>>>>>>     > > >
>>>>>>>>     > > > I'm running Sparkler as a spark-submit job on a
>>>>>>>>     > > > Databricks Spark cluster and it's getting to this point in the code (
>>>>>>>>     > > >
>>>>>>>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
>>>>>>>>     > > > )
>>>>>>>>     > > >
>>>>>>>>     > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>>>>>     > > >   .groupByKey()
>>>>>>>>     > > >   .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
>>>>>>>>     > > >     FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
>>>>>>>>     > > >   .persist()
>>>>>>>>     > > >
>>>>>>>>     > > > This basically takes the RDD and then runs a web-based
>>>>>>>>     > > > crawl over each group in the RDD and returns the results. But when
>>>>>>>>     > > > Spark executes it, it runs all the crawls on the driver node and
>>>>>>>>     > > > doesn't distribute them.
>>>>>>>>     > > >
>>>>>>>>     > > > The key is pretty static in these tests, so I have also
>>>>>>>> tried forcing the
>>>>>>>>     > > > partition count (50 on a 16 core per node cluster) and
>>>>>>>> also repartitioning,
>>>>>>>>     > > > but every time all the jobs are scheduled to run on one
>>>>>>>> node.
>>>>>>>>     > > >
>>>>>>>>     > > > What can I do better to distribute the tasks? The processing of
>>>>>>>>     > > > the data in the RDD isn't the bottleneck; the fetching of the crawl
>>>>>>>>     > > > data is, but that only happens after the work has been assigned to a node.
>>>>>>>>     > > >
>>>>>>>>     > > > Thanks
>>>>>>>>     > > >
>>>>>>>>     > > > Tom
>>>>>>>>     > > >
>>>>>>>>     > > >
>>>>>>>>     > > >
>>>>>>>>     > > >
>>>>>>>>     > > >
>>>>>>>>     > >
>>>>>>>>     >
>>>>>>>>     >
>>>>>>>>     >
>>>>>>>>     >
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>
>>>>
>>>
>>
>
