Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Lalwani, Jayesh
flatMap is supposed to return a Seq, not an Iterator, but you are returning a class that implements Iterator. I have a hunch that's what's causing the confusion. flatMap is returning an RDD[FairFetcher], not RDD[CrawlData]. Do you intend it to be RDD[CrawlData]? You might want to call toSeq on
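Jayesh's point about flatMap's flattening behavior can be illustrated with a minimal pure-Python sketch (the thread's actual code is Scala; the names and data here are purely illustrative, not from Sparkler):

```python
# A minimal pure-Python sketch of flatMap semantics (no Spark required).
# flatMap applies f to every element and flattens the collections f returns,
# so f should yield a Seq of records -- not the wrapper object itself.
def flat_map(f, data):
    return [item for element in data for item in f(element)]

# If f returns a sequence of CrawlData-like records, the result is flat:
records = flat_map(lambda group: ["crawl-%d" % x for x in group], [[1, 2], [3]])
# records == ["crawl-1", "crawl-2", "crawl-3"]
```

If f instead returned a single FairFetcher-like object, each such object would become one element of the result, which matches the RDD[FairFetcher] symptom described above.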

Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Sean Owen
Really weird. flatMap definitely doesn't happen on the driver. My only long-shot theory, which I haven't thought through, is: what is FairFetcher doing with 'job'? It kind of looks like this is submitting a (driver) Job directly or something into its scheduler, which could be .. something but maybe

Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
For anyone interested, here are the execution logs up until the point where it actually kicks off the workload in question: https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
ExecutorID says driver, and looking at the IP addresses it's running on, it's not any of the worker IPs. I forcibly told it to create 50, but they'd all end up running in the same place. Working on some other ideas, I set spark.task.cpus to 16 to match the nodes whilst still forcing it to 50
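The effect Tom is aiming for with spark.task.cpus can be sketched with back-of-the-envelope arithmetic (the 16-core-per-node figure is an assumption taken from the thread):

```python
# Why spark.task.cpus=16 on 16-core workers forces one task per node at a time:
# Spark schedules a task only where task_cpus free cores are available.
cores_per_executor = 16          # assumed node size, per the thread
task_cpus = 16                   # spark.task.cpus
num_partitions = 50              # tasks created for the stage

concurrent_tasks_per_node = cores_per_executor // task_cpus
# Each node can run only one of the 50 tasks at a time, so the tasks have to
# spread across the cluster rather than pile onto a single executor.
```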

Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Sean Owen
Are you sure it's on the driver, or just 1 executor? How many partitions does the groupByKey produce? That would limit your parallelism no matter what if it's a small number.
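Sean's question about the partition count can be illustrated with a minimal Spark-free sketch of hash partitioning (the mechanics are simplified; Spark's actual partitioner and scheduler are more involved):

```python
# Why the partition count caps parallelism: groupByKey hashes each key into
# numPartitions buckets, and each bucket becomes exactly one task.
def partition_keys(keys, num_partitions):
    buckets = [[] for _ in range(num_partitions)]
    for k in keys:
        buckets[hash(k) % num_partitions].append(k)
    return buckets

# With numPartitions=1, every key lands in the same bucket: one task on one
# executor, no matter how many workers the cluster has.
buckets = partition_keys(["a", "b", "c", "d"], 1)
non_empty = [b for b in buckets if b]
```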

Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
Hi folks, Hopefully someone with more Spark experience than me can explain this a bit. I don't know if this is possible, impossible, or just an old design that could be better. I'm running Sparkler as a spark-submit job on a Databricks Spark cluster and it's getting to this point in the

Re: Problem in Restoring ML Pipeline with UDF

2021-06-08 Thread Sean Owen
It's a little bit of a guess, but the class name $line103090609224.$read$FeatureModder looks like something generated by the shell. I think it's your 'real' class name in this case. If you redefined this later and loaded it, you may not find it matches up. Can you declare this in a package?
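The underlying issue Sean describes can be sketched in Python (the thread's transformer is Scala; the class here is a hypothetical stand-in): pipeline persistence records the class's importable path, and only a class declared at the top level of a real package has a stable one.

```python
# A class defined at the top level of a module has a stable, importable name
# that a saved pipeline can resolve later. A class defined in a REPL gets a
# synthetic module (Python's '__main__', or Scala's $lineXXXX.$read wrappers),
# which cannot be found when the pipeline is reloaded in a fresh session.
class FeatureModderLike:          # hypothetical stand-in for FeatureModder
    pass

stable_name = FeatureModderLike.__qualname__
# stable_name == "FeatureModderLike" -- no shell-generated prefix
```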

Problem in Restoring ML Pipeline with UDF

2021-06-08 Thread Artemis User
We have a feature engineering transformer defined as a custom class with a UDF as follows:

    class FeatureModder extends Transformer with DefaultParamsWritable
        with DefaultParamsReadable[FeatureModder] {
      val uid: String = "FeatureModder" + randomUUID
      final val inputCol: Param[String] = new

Re: class KafkaCluster related errors

2021-06-08 Thread Mich Talebzadeh
Hi Kiran, I don't seem to have a reference to handling offsets in my old code. However, in Spark Structured Streaming (SSS) I handle it using a reference to checkpointLocation as below (this is in Python):

    checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
    result =
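The role the checkpointLocation plays can be sketched with a toy offset store (the file layout below is purely illustrative; Spark's real checkpoint directory format is more involved and managed for you):

```python
import json
import os
import tempfile

# Toy sketch of the idea behind checkpointLocation: persist the last processed
# offset so a restarted query resumes where the previous run stopped.
def load_offset(path):
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)["offset"]
    return 0                      # fresh start: no checkpoint yet

def save_offset(path, offset):
    with open(path, "w") as f:
        json.dump({"offset": offset}, f)

ckpt = os.path.join(tempfile.mkdtemp(), "offsets.json")
assert load_offset(ckpt) == 0     # first run starts from the beginning
save_offset(ckpt, 42)             # run processes up to offset 42, then stops
resumed_at = load_offset(ckpt)    # restart resumes from the stored offset
```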