Re: Creating remote tables using PySpark
Okay, that was some caching issue. Now there is a shared mount point between the machine where the PySpark code is executed and the Spark nodes that run it. Hrmph, I was hoping that wouldn't be the case. Fair enough!
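For the record, the cleaner fix I was hoping for is to point the warehouse at storage every node can reach, so no matching mounts are needed at all. A minimal sketch; the hdfs://namenode:8020 host below is a hypothetical stand-in for any URI (HDFS, S3, etc.) that the driver and all executors resolve identically, unlike file:/ paths which resolve per machine:

    from pyspark.sql import SparkSession

    # Sketch only: replace the hypothetical HDFS URI with any shared-storage
    # URI that means the same thing on the client and on every Spark node.
    spark = (SparkSession.builder
        .appName("SharedWarehouse")
        .master("spark://192.168.1.245:7077")
        .config("spark.sql.warehouse.dir", "hdfs://namenode:8020/user/hive/warehouse")
        .config("hive.metastore.uris", "thrift://192.168.1.245:9083")
        .enableHiveSupport()
        .getOrCreate())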
Re: Creating remote tables using PySpark
Okay interesting, maybe my assumption was incorrect, although I'm still confused.

I tried to mount a central mount point that would be the same on my local machine and the container. Same error, although I moved the path to /tmp/hive/data/hive/. When I rerun the test code to save a table, the complaint is still:

    Warehouse Dir: file:/tmp/hive/data/hive/warehouse
    Metastore URIs: thrift://192.168.1.245:9083
    Warehouse Dir: file:/tmp/hive/data/hive/warehouse
    Metastore URIs: thrift://192.168.1.245:9083
    ERROR FileOutputCommitter: Mkdirs failed to create file:/data/hive/warehouse/input.db/accounts_20240307_232110_1_0_6_post21_g4fdc321_d20240307/_temporary/0

So what is /data/hive even referring to, when I print out the Spark conf values and neither now refers to /data/hive/?
Creating remote tables using PySpark
Wonder if anyone can just sort my brain out here as to what's possible or not.

I have a container running Spark, with Hive and a Thrift server. I want to run code against it remotely.

If I take something simple like this:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("ShowDatabases") \
        .master("spark://192.168.1.245:7077") \
        .config("spark.sql.warehouse.dir", "file:/data/hive/warehouse") \
        .config("hive.metastore.uris", "thrift://192.168.1.245:9083") \
        .enableHiveSupport() \
        .getOrCreate()

    # Define the schema of the DataFrame
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])

    # Data to be converted into a DataFrame
    data = [(1, "John Doe"), (2, "Jane Doe"), (3, "Mike Johnson")]

    # Create the DataFrame
    df = spark.createDataFrame(data, schema)

    # Show the DataFrame (optional, for verification)
    df.show()

    # Save the DataFrame to a table named "my_table"
    df.write.mode("overwrite").saveAsTable("my_table")

    # Stop the SparkSession
    spark.stop()

When I run it in the container it runs fine, but when I run it remotely it says:

    : java.io.FileNotFoundException: File file:/data/hive/warehouse/my_table/_temporary/0 does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:597)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
        at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)

My assumption is that it's trying to look on my local machine for /data/hive/warehouse and failing, because on the remote box I can see those folders.

So the question is: if you're not backing it with Hadoop or something, do you have to mount the drive in the same place on the computer running the PySpark? Or am I missing a config option somewhere?

Thanks!
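For anyone landing here later, a hedged sketch of one workaround: give the table an explicit path on storage all nodes share, via the standard "path" write option, so the committer never touches a per-machine file:/ directory. The s3a bucket below is hypothetical, and hadoop-aws would need to be on the classpath:

    # Sketch: writes an external table whose data lives at a URI every node
    # resolves identically (hypothetical bucket; requires hadoop-aws).
    df.write.mode("overwrite") \
        .option("path", "s3a://my-bucket/warehouse/my_table") \
        .saveAsTable("my_table")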
Re: Distributing a FlatMap across a Spark Cluster
Looks like repartitioning was my friend; it seems to be distributed across the cluster now. All good. Thanks!
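For anyone skimming the archive, a minimal PySpark sketch of the pattern that fixed it: repartition after the groupByKey so the expensive flatMap runs as many tasks spread over the executors, rather than however few partitions the data arrived in. The crawl function is a hypothetical stand-in for the FairFetcher work:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # Hypothetical stand-in for the expensive per-resource crawl.
    def crawl(url):
        return (url, len(url))

    pairs = sc.parallelize([("grp-%d" % i, "http://site-%d" % i) for i in range(4000)])

    # repartition(50) before the heavy flatMap => 50 tasks across the cluster
    fetched = (pairs.groupByKey()
                    .repartition(50)
                    .flatMap(lambda kv: [crawl(u) for u in kv[1]]))

    print(fetched.getNumPartitions())   # 50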
Re: Distributing a FlatMap across a Spark Cluster
Okay, so I tried another idea, which was to use a really simple class to drive a mapPartitions... because the logic in my head seems to suggest that I want to map my partitions...

    @SerialVersionUID(100L)
    class RunCrawl extends Serializable {
      def mapCrawl(x: Iterator[(String, Iterable[Resource])], job: SparklerJob): Iterator[CrawlData] = {
        val m = 1000
        x.flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, m,
          FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
      }

      def runCrawl(f: RDD[(String, Iterable[Resource])], job: SparklerJob): RDD[CrawlData] = {
        f.mapPartitions(x => mapCrawl(x, job))
      }
    }

That is what it looks like. But the task execution window in the cluster looks the same:

https://pasteboard.co/K7WrBnV.png

1 task on a single node.

I feel like I'm missing something obvious here about either

a) how Spark works
b) how it divides partitions up into tasks
c) the fact it's a POJO and not a file of stuff.

Or probably some of all 3.

Tom
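If it helps anyone else puzzling over this: mapPartitions doesn't add parallelism by itself; you get exactly one task per partition, wrapper class or not. A tiny PySpark sketch of the effect:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # An RDD that arrives in one partition runs any mapPartitions as one task.
    rdd = sc.parallelize(range(1000), numSlices=1)
    print(rdd.getNumPartitions())                  # 1  -> 1 task
    print(rdd.repartition(50).getNumPartitions())  # 50 -> 50 tasks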
Re: Distributing a FlatMap across a Spark Cluster
(I should point out that I'm diagnosing this by looking at the active tasks, https://pasteboard.co/K7VryDJ.png; if I'm reading it incorrectly, let me know.)
Re: Distributing a FlatMap across a Spark Cluster
Uff, hello fine people.

So the cause of the above issue was, unsurprisingly, human error. I found a local[*] Spark master config which gazumped my own one, so mystery solved. But I have another question that is still the crux of this problem.

Here's a bit of trimmed code that I'm currently testing with. I deliberately stuck in a repartition(50) just to force it to chunk the work up and distribute it, or so I believed. Which is all good.

    override def run(): Unit = {
      ...

      val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
      val f = rdd.map(r => (r.getGroup, r))
        .groupByKey().repartition(50)

      val c = f.getNumPartitions
      val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
          FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
        .persist()

      val d = fetchedRdd.getNumPartitions

      ...

      val scoredRdd = score(fetchedRdd)

      ...
    }

    def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
      val job = this.job.asInstanceOf[SparklerJob]

      val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

      val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => ScoreUpdateSolrTransformer(d))
      val scoreUpdateFunc = new SolrStatusUpdate(job)
      sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
    }

Basically, for anyone new to this: the business logic lives inside the FairFetcher, and I need that distributed over all the nodes in the Spark cluster.

Here's a quick illustration of what I'm seeing: https://pasteboard.co/K7VovBO.png

It chunks up the code and distributes the tasks across the cluster, but that occurs _prior_ to the business logic in the FlatMap being executed.

So specifically, has anyone got any ideas about how to split that flatMap operation up so the RDD processing runs across the nodes, not limited to a single node?

Thanks for all your help so far,

Tom
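A quick diagnostic sketch (PySpark for brevity) to confirm whether the repartition(50) really spread the groups out; glom() exposes the per-partition record counts:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    # Stand-in for the grouped, repartitioned RDD above.
    f = (sc.parallelize(range(5000))
           .map(lambda i: ("grp-%d" % i, i))
           .groupByKey()
           .repartition(50))

    # Roughly even counts mean the groups really are spread over 50 tasks.
    print(f.glom().map(len).collect())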
Re: Distributing a FlatMap across a Spark Cluster
Ah no, sorry: in the load image the crawl has just kicked off on the driver node, which is why it's flagged red and the load is spiking. https://pasteboard.co/K5QHOJN.png is the cluster now it's been running a while. The red node is still (and is always, every time I tested it) the driver node.

Tom

On Wed, Jun 9, 2021 at 8:03 PM Sean Owen wrote:

> Where do you see that ... I see 3 executors busy at first. If that's the crawl then ?
Re: Distributing a FlatMap across a Spark Cluster
Yeah :)

But it's all running through the same node. So I can run multiple tasks of the same type on the same node (the driver), but I can't run multiple tasks on multiple nodes.

On Wed, Jun 9, 2021 at 7:57 PM Sean Owen wrote:

> Wait. Isn't that what you were trying to parallelize in the first place?
Re: Distributing a FlatMap across a Spark Cluster
Yeah, but that something else is the crawl being run, which is triggered from inside the RDDs, because the log output is slowly outputting crawl data.

On Wed, 9 Jun 2021, 19:47 Sean Owen wrote:

> That looks like you did some work on the cluster, and now it's stuck doing something else on the driver - not doing everything on 1 machine.
Re: Distributing a FlatMap across a Spark Cluster
No, this is an on-demand Databricks cluster.

On Wed, Jun 9, 2021 at 6:54 PM Mich Talebzadeh wrote:

> Are you running this in a Managed Instance Group (MIG)?
>
> https://cloud.google.com/compute/docs/instance-groups
Re: Distributing a FlatMap across a Spark Cluster
And also, as this morning: https://pasteboard.co/K5Q9aEf.png

Removing the CPU pins gives me more tasks, but as you can see here:

https://pasteboard.co/K5Q9GO0.png

it just loads up a single server.
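For reference, the "CPU pins" here are confs along the lines of the sketch below; spark.task.cpus reserves cores per task, so 16-cpu tasks on a 16-core executor means one task per machine at a time. A hedged example of the two knobs (the values are illustrative, not a recommendation):

    from pyspark.sql import SparkSession

    # Illustrative values only: with executor cores = 16 and task cpus = 16,
    # each executor runs exactly one task at a time; dropping task cpus to 1
    # would allow 16 concurrent tasks per executor.
    spark = (SparkSession.builder
        .config("spark.executor.cores", "16")
        .config("spark.task.cpus", "1")
        .getOrCreate())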
Re: Distributing a FlatMap across a Spark Cluster
Thanks Chris,

All the code I have on both sides is as modern as it allows. Running Spark 3.1.1 and Scala 2.12.

I stuck some logging in to check reality:

    LOG.info("GROUP COUNT: " + fetchedgrp.count())
    val cgrp = fetchedgrp.collect()
    cgrp.foreach(f => {
      LOG.info("Out1 :" + f._1)
      f._2.foreach(u => {
        LOG.info("ID:" + u.getId)
        LOG.info("GROUP:" + u.getGroup)
      })
    })
    LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
    val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
        FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
      .persist()

    LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
    LOG.info("CoUNT: " + fetchedRdd.count())

It says I have 5000 groups, which makes sense as it's defined in my command line, and both sides claim to have 50 partitions, which also makes sense as I define that in my code as well.

Then it starts the crawl at the final count line, as I guess it needs to materialize things, and so at that point I don't know what the count would return; but everything else checks out.

I'll poke around in the other hints you suggested later, thanks for the help.

Tom

On Wed, Jun 9, 2021 at 5:49 PM Chris Martin wrote:

> Hmm, then my guesses are (in order of decreasing probability):
>
> * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't compatible with the latest Spark release.
> * You've got 16 threads per task on a 16-core machine. Should be fine, but I wonder if it's confusing things, as you don't also set spark.executor.cores and Databricks might also default that to 1.
> * There's some custom partitioner in play which is causing everything to go to the same partition.
> * The group keys are all hashing to the same value (it's difficult to see how this would be the case if the group keys are genuinely different, but maybe there's something else going on).
>
> My hints:
>
> 1. Make sure you're using a recent version of sparkler.
> 2. Try repartition with a custom partitioner that you know will send things to different partitions.
> 3. Try either removing "spark.task.cpus":"16" or setting spark.executor.cores to 1.
> 4. Print out the group keys and see if there's any weird pattern to them.
> 5. See if the same thing happens in Spark local.
>
> If you have a reproducible example you can post publicly then I'm happy to take a look.
>
> Chris
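On Chris's hint 2, a minimal PySpark sketch of a partitioner that provably spreads records out: round-robin by position rather than by key hash, so it works even if every key hashed to the same value. Everything here is illustrative:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    pairs = sc.parallelize([("grp-%d" % i, i) for i in range(5000)])

    # Tag each record with its position, partition round-robin on that index,
    # then drop the index; the keys' hashes never enter into it.
    spread = (pairs.zipWithIndex()
                   .map(lambda kv_i: (kv_i[1], kv_i[0]))
                   .partitionBy(50, lambda i: i % 50)
                   .values())

    print(spread.glom().map(len).collect())   # ~100 records per partition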
Re: Distributing a FlatMap across a Spark Cluster
Yeah, to test that I just set the group key to the ID in the record, which is a Solr-supplied UUID, which means effectively you end up with 4000 groups now.

On Wed, Jun 9, 2021 at 5:13 PM Chris Martin wrote:

> One thing I would check is this line:
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>
> How many distinct groups do you end up with? If there's just one, then I think you might see the behaviour you observe.
>
> Chris
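On Chris's question, a small PySpark diagnostic sketch for counting distinct groups and eyeballing how they spread over 50 hash buckets; the keys below are hypothetical stand-ins for the Solr-supplied UUIDs:

    from pyspark import SparkContext
    import uuid

    sc = SparkContext.getOrCreate()

    # Hypothetical stand-ins for the Solr-supplied UUID group keys.
    keys = sc.parallelize([str(uuid.uuid4()) for _ in range(4000)])

    print(keys.distinct().count())   # expect 4000 distinct groups

    # A roughly even spread over buckets 0..49 rules out "all keys hash the
    # same". (PySpark needs PYTHONHASHSEED pinned across workers for Python's
    # hash() to agree between executors.)
    print(keys.map(lambda k: hash(k) % 50).countByValue())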
Re: Distributing a FlatMap across a Spark Cluster
@sam:

    def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
      val job = this.job.asInstanceOf[SparklerJob]

      val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

      val m = 50
      val repRdd = scoredRdd.repartition(m).cache()
      repRdd.take(1)
      val scoreUpdateRdd: RDD[SolrInputDocument] = repRdd.map(d => ScoreUpdateSolrTransformer(d))

I did that, but the crawl is executed in that repartition executor (which, I should have pointed out, I already know).

Tom
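For the archive, a sketch of the shape Sam suggested, with one caveat worth flagging: take(1) only computes (and caches) as many partitions as it needs, so count() is the action to use if you want every partition materialized before the downstream map. The names below are stand-ins:

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    scored = sc.parallelize(range(5000)).map(lambda d: {"id": d})  # stand-in

    rep = scored.repartition(50).cache()
    rep.count()   # touches every partition, so all 50 are computed and cached
                  # (take(1) would only compute the first partition's lineage)

    updates = rep.map(lambda d: ("doc", d))  # stand-in for ScoreUpdateSolrTransformer
    print(updates.getNumPartitions())        # 50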
Re: Distributing a FlatMap across a Spark Cluster
Sorry Sam, I missed that earlier, I'll give it a spin. To everyone involved, this code is old, and not written by me. If you all go "oooh, you want to distribute the crawls over the cluster, you don't want to do it like that, you should look at XYZ instead" feel free to punt different ways of doing this across, I'm happy to refactor the code to modernize it/follow better practices. On Wed, Jun 9, 2021 at 4:25 PM Sam wrote: > Like I said In my previous email, can you try this and let me know how > many tasks you see? > > val repRdd = scoredRdd.repartition(50).cache() > repRdd.take(1) > Then map operation on repRdd here. > > I’ve done similar map operations in the past and this works. > > Thanks. > > On Wed, Jun 9, 2021 at 11:17 AM Tom Barber wrote: > >> Also just to follow up on that slightly, I did also try off the back of >> another comment: >> >> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = { >> val job = this.job.asInstanceOf[SparklerJob] >> >> val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d)) >> >> val scoreUpdateRdd: RDD[SolrInputDocument] = >> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d)) >> >> >> Where I repartitioned that scoredRdd map out of interest, it then >> triggers the FairFetcher function there, instead of in the runJob(), but >> still on a single executor >> >> Tom >> >> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber wrote: >> >>> >>> Okay so what happens is that the crawler reads a bunch of solr data, >>> we're not talking GB's just a list of JSON and turns that into a bunch of >>> RDD's that end up in that flatmap that I linked to first. >>> >>> The fair fetcher is an interface to a pluggable backend that basically >>> takes some of the fields and goes and crawls websites listed in them >>> looking for information. We wrote this code 6 years ago for a DARPA project >>> tracking down criminals on the web. Now I'm reusing it but trying to force >>> it to scale out a bit more. >>> >>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want >>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel >>> on one node makes my cluster sad) to each executor and have it run a crawl, >>> then move on and get another one and so on. That way you're not saturating >>> a node trying to look up all of them and you could add more nodes for >>> greater capacity pretty quickly. Once the website has been captured, you >>> can then "score" it for want of a better term to determine its usefulness, >>> which is where the map is being triggered. >>> >>> In answer to your questions Sean, no action seems triggered until you >>> end up in the score block and the sc.runJob() because thats literally the >>> next line of functionality as Kafka isn't enabled. >>> >>> val fetchedRdd = rdd.map(r => (r.getGroup, r)) >>> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, >>> rs.iterator, localFetchDelay, >>> FetchFunction, ParseFunction, OutLinkFilterFunction, >>> StatusUpdateSolrTransformer).toSeq }) >>> .persist() >>> >>> if (kafkaEnable) { >>> storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd) >>> } >>> val scoredRdd = score(fetchedRdd) >>> >>> >>> That if block is disabled so the score function runs. 
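For reference, a minimal sketch of the pattern Sam describes, assuming scoredRdd is the RDD[CrawlData] from the snippets above. One caveat: take(1) may only compute the first partition, so a count() is the surer way to materialize the whole cache.

    // Redistribute into 50 partitions and mark the result for caching.
    val repRdd = scoredRdd.repartition(50).cache()
    // A cheap action forces the shuffle so the 50 partitions exist before the map.
    repRdd.take(1)
    // Subsequent work now runs as one task per partition, spread across executors.
    val updated = repRdd.map(d => ScoreUpdateSolrTransformer(d))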
Re: Distributing a FlatMap across a Spark Cluster
Also just to follow up on that slightly, I did also try, off the back of another comment:

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))

where I repartitioned that scoredRdd map out of interest. It then triggers the FairFetcher function there, instead of in the runJob(), but still on a single executor.

Tom
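A self-contained sketch of why the repartition moved the work: flatMap may return a lazy iterator, and nothing inside it runs until a downstream stage consumes it. A repartition() adds a shuffle, and the shuffle write is the first consumer, so the FairFetcher work surfaces in whichever stage writes that shuffle. The names below are illustrative, not from the Sparkler code.

    val urls = sc.parallelize(Seq("a", "b", "c"))
    val fetched = urls.flatMap { u =>
      // Lazy: these elements are only computed when something pulls on the iterator.
      Iterator.fill(2) {
        println(s"fetching $u on ${java.net.InetAddress.getLocalHost.getHostName}")
        u
      }
    }
    // The "fetch" happens inside the stage that writes the shuffle for repartition.
    fetched.repartition(4).count()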
Re: Distributing a FlatMap across a Spark Cluster
Okay, so what happens is that the crawler reads a bunch of Solr data (we're not talking GBs, just a list of JSON) and turns that into a bunch of RDDs that end up in that flatMap that I linked to first.

The FairFetcher is an interface to a pluggable backend that basically takes some of the fields and goes and crawls the websites listed in them, looking for information. We wrote this code 6 years ago for a DARPA project tracking down criminals on the web. Now I'm reusing it, but trying to force it to scale out a bit more.

Say I have 4000 URLs I want to crawl and a 3-node Spark cluster: I want to push down 1 URL (a few more won't hurt, but crawling 50 URLs in parallel on one node makes my cluster sad) to each executor and have it run a crawl, then move on and get another one, and so on. That way you're not saturating a node trying to look up all of them, and you could add more nodes for greater capacity pretty quickly. Once the website has been captured, you can then "score" it, for want of a better term, to determine its usefulness, which is where the map is being triggered.

In answer to your questions Sean, no action seems triggered until you end up in the score block and the sc.runJob(), because that's literally the next line of functionality as Kafka isn't enabled.

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer).toSeq })
  .persist()

if (kafkaEnable) {
  storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
}
val scoredRdd = score(fetchedRdd)

That if block is disabled so the score function runs. Inside of that:

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => ScoreUpdateSolrTransformer(d))
  val scoreUpdateFunc = new SolrStatusUpdate(job)
  sc.runJob(scoreUpdateRdd, scoreUpdateFunc)

When it's doing stuff in the Spark UI I can see that it's waiting on the sc.runJob() line, so that's the execution point.

Tom

On Wed, Jun 9, 2021 at 3:59 PM Sean Owen wrote:

> persist() doesn't even persist by itself - just sets it to be persisted
> when it's executed.
> key doesn't matter here, nor partitioning, if this code is trying to
> run things on the driver inadvertently.
> I don't quite grok what the OSS code you linked to is doing, but it's
> running some supplied functions very directly and at a low level with
> sc.runJob, which might be part of how this can do something unusual.
> How do you trigger any action? what happens after persist()
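For what it's worth, the sc.runJob call above is roughly a low-level spelling of foreachPartition: it ships a function to each partition of the RDD and runs it as one job, and it is the action that finally triggers the whole lineage, FairFetcher flatMap included. Assuming (as the runJob call suggests) that SolrStatusUpdate behaves as a function over a partition's Iterator[SolrInputDocument], a more conventional equivalent would look something like this sketch:

    // Hypothetical equivalent of sc.runJob(scoreUpdateRdd, scoreUpdateFunc):
    scoreUpdateRdd.foreachPartition { docs =>
      val update = new SolrStatusUpdate(job) // one updater per partition, on the executor
      update(docs)                           // assumed to apply to Iterator[SolrInputDocument]
    }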
Re: Distributing a FlatMap across a Spark Cluster
Thanks Mich,

The key on the first iteration is just a string that says "seed", so on the first crawl it is indeed the same across all of the groups. Further iterations would be different, but I'm not there yet. I was under the impression that a repartition would distribute the tasks. Is that not the case?

Thanks

Tom

On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh wrote:

> Hi Tom,
>
> Persist() here simply means persist to memory. That is all. You can
> check the Storage tab in the UI:
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather the code is stuck, from your link, in the driver. You stated
> that you tried repartition() but it did not do anything.
>
> Further you stated:
>
> "The key is pretty static in these tests, so I have also tried forcing
> the partition count (50 on a 16 core per node cluster) and also
> repartitioning, but every time all the jobs are scheduled to run on one
> node."
>
> What is the key?
>
> HTH
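To make Mich's point concrete, a small sketch: persist() with no arguments is MEMORY_ONLY for an RDD, and it is only a hint until the first action computes the RDD, after which the Storage tab shows what was actually cached. Swapping the bare .persist() above for an explicit level would look like:

    import org.apache.spark.storage.StorageLevel

    val cached = fetchedRdd.persist(StorageLevel.MEMORY_AND_DISK) // spill rather than recompute
    println(cached.getStorageLevel.description)                   // what was requested
    cached.count()                                                // first action actually populates the cache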
Re: Distributing a FlatMap across a Spark Cluster
Interesting, Sean, thanks for that insight. I wasn't aware of that fact; I assume the .persist() at the end of that line doesn't do it?

I believe, looking at the output in the Spark UI, it gets to https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254 and calls the context runJob.

On Wed, Jun 9, 2021 at 2:07 PM Sean Owen wrote:

> All these configurations don't matter at all if this is executing on the
> driver.
> Returning an Iterator in flatMap is fine, though it 'delays' execution
> until that iterator is evaluated by something, which is normally fine.
> Does creating this FairFetcher do anything by itself? You're just
> returning an iterator that creates them here.
> How do you actually trigger an action here? The code snippet itself
> doesn't trigger anything.
> I think we need more info about what else is happening in the code.
Re: Distributing a FlatMap across a Spark Cluster
Yeah, so if I update the FairFetcher to return a Seq it makes no real difference.

Here's an image of what I'm seeing, just for reference: https://pasteboard.co/K5NFrz7.png

Because this is Databricks I don't have an actual spark-submit command, but it looks like this:

curl -d '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "spark.task.cpus":"16"}, "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options", "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g", "--executor-memory", "10g", "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11", "-tn", "5000", "-co", "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'

I deliberately pinned spark.task.cpus to 16 to stop it swamping the driver trying to run all the tasks in parallel on the one node, but again I've got 50 tasks queued up, all running on the single node.
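A quick sanity check on the arithmetic here, since a c5d.4xlarge has 16 vCPUs:

    tasks per executor  = executor cores / spark.task.cpus = 16 / 16 = 1
    max concurrent tasks = 3 workers * 1 = 3

So with this configuration the cluster should cap out at 3 concurrent crawls, one per worker. If 50 tasks still queue up on a single node, the limit being hit isn't cores; it's that only one executor (the driver) is being offered the work at all.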
Re: Distributing a FlatMap across a Spark Cluster
I've not run it yet, but I've stuck a toSeq on the end. In reality a Seq just inherits Iterator, right?

flatMap does return an RDD[CrawlData], unless my IDE is lying to me.

Tom
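One small correction to the Seq/Iterator question above: Seq does not inherit Iterator. In the Scala 2.12 used here they only share TraversableOnce, which is all that RDD.flatMap requires:

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

so returning the FairFetcher iterator type-checks either way. Note also that in 2.12, Iterator.toSeq is backed by a lazily evaluated Stream, so the toSeq doesn't even force the crawl eagerly, which is consistent with the change making "no real difference". Wherever the iterator is finally consumed, the work still runs on whichever executor got the task.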
Re: Distributing a FlatMap across a Spark Cluster
Interesting, Jayesh, thanks, I will test.

All this code is inherited and it runs, but I don't think it's been tested in a distributed context for about 5 years. But yeah, I need to get this pushed down, so I'm happy to try anything! :)

Tom

On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh wrote:

> flatMap is supposed to return Seq, not Iterator. You are returning a
> class that implements Iterator. I have a hunch that's what's causing the
> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
> you intend it to be RDD[CrawlData]? You might want to call toSeq on
> FairFetcher.
Re: Distributing a FlatMap across a Spark Cluster
For anyone interested, here are the execution logs up until the point where it actually kicks off the workload in question: https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
Re: Distributing a FlatMap across a Spark Cluster
ExecutorID says driver, and looking at the IP addresses it's running on, it's not any of the worker IPs.

I forcibly told it to create 50, but they'd all end up running in the same place.

Working on some other ideas, I set spark.task.cpus to 16 to match the nodes, whilst still forcing it to 50 partitions:

val m = 50

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()

that sort of thing. But still the tasks are pinned to the driver executor and none of the workers, so I no longer saturate the master node, but I also have 3 workers just sat there doing nothing.

On 2021/06/09 01:26:50, Sean Owen wrote:

> Are you sure it's on the driver? Or just 1 executor?
> How many partitions does the groupByKey produce? That would limit your
> parallelism no matter what if it's a small number.
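Sean's partition question has a subtle twist given the static key: groupByKey(50) really does produce 50 partitions, but records are placed by key, and one distinct key means one non-empty partition, with the other 49 empty. A quick sketch of how to check:

    val grouped = rdd.map(r => (r.getGroup, r)).groupByKey(50)
    println(grouped.getNumPartitions) // 50
    // Count how many partitions actually hold a group; 1 here if every record shares the key.
    println(grouped.mapPartitions(it => Iterator(it.size)).collect().count(_ > 0))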
Distributing a FlatMap across a Spark Cluster
Hi folks,

Hopefully someone with more Spark experience than me can explain this a bit.

I don't know if this is possible, impossible or just an old design that could be better.

I'm running Sparkler as a spark-submit job on a Databricks Spark cluster and it's getting to this point in the code (https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226):

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()

This basically takes the RDD and then runs a web-based crawl over each group of records and returns the results. But when Spark executes it, it runs all the crawls on the driver node and doesn't distribute them.

The key is pretty static in these tests, so I have also tried forcing the partition count (50 on a 16-core-per-node cluster) and also repartitioning, but every time all the jobs are scheduled to run on one node.

What can I do better to distribute the tasks? The processing of the data in the RDD isn't the bottleneck; the fetching of the crawl data is the bottleneck, but that happens after the code has been assigned to a node.

Thanks

Tom
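Given the detail later in the thread that the key is the static string "seed", the likely culprit is the grouping itself rather than the partition count: one distinct key means groupByKey emits a single group, which a single task must process no matter how many partitions exist. A hedged sketch of one workaround, salting the key so the records spread across many groups; the salt width of 50 is illustrative, a stable hash of the record would be safer than Random if partitions can be recomputed, and note it splits each group's fetch-delay accounting:

    val salted = rdd.map(r => ((r.getGroup, scala.util.Random.nextInt(50)), r))
    val fetchedRdd = salted
      .groupByKey(50)
      .flatMap({ case ((grp, _), rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
        FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer).toSeq })
      .persist()

With up to 50 distinct salted keys, the crawl groups can be scheduled across all of the executors instead of landing on one.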
Help getting Spark JDBC metadata
Hi guys,

Hopefully someone can help me, or at least explain stuff to me.

I use a tool that requires JDBC metadata (tables/columns etc.), so using Spark 1.3.1 I try stuff like registerTempTable() or saveAsTable() on my Parquet file.

The former doesn't show any table metadata for JDBC connections, but you can query the table, which is annoying. The latter shows a table, but the column metadata is a single column of type array; again, I can query the table.

What I found I can do, though, is create a standard SQL table in beeline with all its columns defined, and then insert into that table the contents of my invisible Parquet table. But I assume that removes the data from Parquet and stores it in Hive, and I'd prefer to stick with Parquet.

Ideally I'd like to be able to run:

CREATE TEMPORARY TABLE XYZ
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/user/ubuntu/file_with_id.par"
  define my table columns
)

Is something like that possible? Does that make any sense?

Thanks

Tom
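For what it's worth, something very close to that syntax exists in the Spark 1.3 data sources API, and the schema is read from the Parquet footer, so the columns don't have to be declared at all. A sketch, using the path from the message above:

    CREATE TEMPORARY TABLE xyz
    USING org.apache.spark.sql.parquet
    OPTIONS (
      path "/user/ubuntu/file_with_id.par"
    )

The caveat for the JDBC metadata use case is that a temporary table only lives in the SQLContext that registered it, so for beeline to list it, the statement has to be run in the Thrift server's own session rather than from a separate Spark application.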