Re: Creating remote tables using PySpark

2024-03-07 Thread Tom Barber
Okay, that was a caching issue. There is now a shared mount point between
the machine where the PySpark code is executed and the Spark nodes it runs
on. Hrmph, I was hoping that wouldn't be necessary. Fair enough!
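
Why a shared mount matters with file: paths can be sketched without Spark. The stack trace in this thread shows the job commit (FileOutputCommitter.commitJob) listing the table's _temporary/0 directory through RawLocalFileSystem, while the task output is written by the executors. The toy model below is plain Python under stated assumptions: two temporary directories stand in for the container's and the remote machine's local filesystems, and task_write, driver_commit and commit_succeeds are hypothetical names, not Spark or Hadoop APIs.

```python
import os
import tempfile


def task_write(fs_root, table_dir):
    """Executor side: write task output under <table>/_temporary/0 on ITS filesystem."""
    tmp = os.path.join(fs_root, table_dir.lstrip("/"), "_temporary", "0")
    os.makedirs(tmp, exist_ok=True)
    with open(os.path.join(tmp, "part-00000"), "w") as handle:
        handle.write("1,John Doe\n")
    return tmp


def driver_commit(fs_root, table_dir):
    """Driver side: list <table>/_temporary/0 on the DRIVER's filesystem."""
    tmp = os.path.join(fs_root, table_dir.lstrip("/"), "_temporary", "0")
    return sorted(os.listdir(tmp))  # raises FileNotFoundError if the dir is absent


def commit_succeeds(driver_root, executor_root,
                    table_dir="/data/hive/warehouse/my_table"):
    """True when the driver can see what the executor wrote."""
    task_write(executor_root, table_dir)
    try:
        driver_commit(driver_root, table_dir)
        return True
    except FileNotFoundError:
        return False


with tempfile.TemporaryDirectory() as container, \
        tempfile.TemporaryDirectory() as laptop:
    # Driver on a separate machine: its local disk never saw the task output.
    separate = commit_succeeds(driver_root=laptop, executor_root=container)
    # Driver and executors share one filesystem (same box or a shared mount).
    shared = commit_succeeds(driver_root=container, executor_root=container)

print("separate filesystems:", separate)
print("shared filesystem:", shared)
```

The same reasoning is why a filesystem that every node reaches by the same URI (HDFS, an object store, or a mount at an identical path) avoids the problem.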



Re: Creating remote tables using PySpark

2024-03-07 Thread Tom Barber
Okay interesting, maybe my assumption was incorrect, although I'm still
confused.

I tried mounting a central mount point at the same path on my local
machine and in the container. I get the same error: although I moved the
path to /tmp/hive/data/hive/, when I rerun the test code to save a table
the complaint is still about /data/hive:

Warehouse Dir: file:/tmp/hive/data/hive/warehouse
Metastore URIs: thrift://192.168.1.245:9083
Warehouse Dir: file:/tmp/hive/data/hive/warehouse
Metastore URIs: thrift://192.168.1.245:9083
ERROR FileOutputCommitter: Mkdirs failed to create
file:/data/hive/warehouse/input.db/accounts_20240307_232110_1_0_6_post21_g4fdc321_d20240307/_temporary/0

So what is /data/hive even referring to? When I print out the Spark conf
values, neither of them now refers to /data/hive/.
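
One way to narrow this down is to check mechanically whether the failing path sits under the warehouse directory the session reports. The sketch below is plain Python with a hypothetical helper name, is_under_warehouse. When the check is false, as it is for the paths quoted above, the location is probably not coming from the session's spark.sql.warehouse.dir at all. One common cause, offered here as an assumption rather than a diagnosis of this setup, is that the metastore recorded a location for the input database when it was created; running DESCRIBE DATABASE EXTENDED input in Spark SQL shows the stored location.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def is_under_warehouse(path_uri, warehouse_uri):
    """Return True if path_uri is the warehouse dir itself or lives below it."""
    path = PurePosixPath(urlparse(path_uri).path)
    warehouse = PurePosixPath(urlparse(warehouse_uri).path)
    return path == warehouse or warehouse in path.parents


# Paths copied from the log output in this message.
failing = ("file:/data/hive/warehouse/input.db/"
           "accounts_20240307_232110_1_0_6_post21_g4fdc321_d20240307/_temporary/0")
configured = "file:/tmp/hive/data/hive/warehouse"
original = "file:/data/hive/warehouse"

print(is_under_warehouse(failing, configured))
print(is_under_warehouse(failing, original))
```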



Creating remote tables using PySpark

2024-03-07 Thread Tom Barber
Wonder if anyone can just sort my brain out here as to what's possible or
not.

I have a container running Spark, with Hive and a ThriftServer. I want to
run code against it remotely.

If I take something simple like this:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("ShowDatabases") \
    .master("spark://192.168.1.245:7077") \
    .config("spark.sql.warehouse.dir", "file:/data/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://192.168.1.245:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# Define schema of the DataFrame
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Data to be converted into a DataFrame
data = [(1, "John Doe"), (2, "Jane Doe"), (3, "Mike Johnson")]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show the DataFrame (optional, for verification)
df.show()

# Save the DataFrame to a table named "my_table"
df.write.mode("overwrite").saveAsTable("my_table")

# Stop the SparkSession
spark.stop()

When I run it in the container it runs fine, but when I run it remotely it
says:

: java.io.FileNotFoundException: File file:/data/hive/warehouse/my_table/_temporary/0 does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:597)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
    at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)

My assumption is that it's trying to look on my local machine for
/data/hive/warehouse and failing, because on the remote box I can see those
folders.

So the question is: if you're not backing it with Hadoop or something
similar, do you have to mount the drive in the same place on the computer
running the PySpark code? Or am I missing a config option somewhere?

Thanks!




Re: [EXTERNAL] Re: Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-04 Thread Tom Graves
So I'm not sure I completely follow. Are you asking for a way to change the
limit without having to do the repartition? And your DL software doesn't care
if you got, say, 30 executors instead of 20? Normally I would expect the number
of partitions at that point to be 200 (or whatever you set for your shuffle
partitions) unless you are using the AQE coalesce-partitions functionality, in
which case it could change. Are you using the latter?

> Normally I try to aim for anything between 30s-5m per task (failure-wise),
> depending on the cluster, its stability, etc. But in this case, individual
> tasks can take 30-60 minutes, if not much more. Any failure during this long
> time is pretty expensive.

Are you saying that when you manually do the repartition, your DL tasks take
30-60 minutes? So again, you want something like AQE coalesce partitions to
kick in and attempt to pick partition sizes for you?


Tom

On Thursday, November 3, 2022 at 03:18:07 PM CDT, Shay Elbaz wrote:

This is exactly what we ended up doing! The only drawback I saw with this
approach is that the GPU tasks get pretty big (in terms of data and compute
time), and task failures become expensive. That's why I reached out to the
mailing list in the first place.
Normally I try to aim for anything between 30s-5m per task (failure-wise),
depending on the cluster, its stability, etc. But in this case, individual
tasks can take 30-60 minutes, if not much more. Any failure during this long
time is pretty expensive.

Shay

Re: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of executors when using GPUs

2022-11-03 Thread Tom Graves
Stage level scheduling does not allow you to change configs right now. This is
something we thought about as a follow-on but have never implemented. How many
tasks are you running in the DL stage? The typical case is to run some ETL with
lots of tasks, then do a mapPartitions and run your DL stuff. Before that
mapPartitions you could do a repartition, if necessary, to get to exactly the
number of tasks you want (20). That way, even if maxExecutors=500, you will only
ever need 20 (or whatever you repartition to), and Spark isn't going to ask for
more than that.
Tom
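
The effect of the repartition can be illustrated with a simplified model of how dynamic allocation sizes its request: roughly the number of pending tasks divided by the task slots per executor, capped at maxExecutors. This is only a sketch under assumptions. The function name executors_requested is hypothetical, the DL profile is assumed to give each executor one core and one GPU with one task each, and details such as spark.dynamicAllocation.executorAllocationRatio are ignored.

```python
import math


def executors_requested(pending_tasks, executor_cores, task_cpus, max_executors):
    """Executors needed to run every pending task at once, capped at the maximum."""
    tasks_per_executor = executor_cores // task_cpus
    needed = math.ceil(pending_tasks / tasks_per_executor)
    return min(needed, max_executors)


# DL stage fed by the default 200 shuffle partitions.
without_repartition = executors_requested(200, 1, 1, 500)
# DL stage after an explicit repartition(20).
with_repartition = executors_requested(20, 1, 1, 500)
# ETL stage with many tasks and 2 cores per executor hits the cap.
etl_stage = executors_requested(2000, 2, 1, 500)

print(without_repartition, with_repartition, etl_stage)
```

In this model the task count, not maxExecutors, bounds the GPU stage, which is the point of repartitioning before the mapPartitions.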

On Thursday, November 3, 2022 at 11:10:31 AM CDT, Shay Elbaz wrote:

Thanks again Artemis, I really appreciate it. I have watched the video but did
not find an answer.
Please bear with me just one more iteration.
Maybe I'll be more specific:
Suppose I start the application with maxExecutors=500, executors.cores=2,
because that's the amount of resources needed for the ETL part. But for the DL
part I only need 20 GPUs. The SLS API only allows setting the resources per
executor/task, so Spark would (try to) allocate up to 500 GPUs, assuming I
configure the profile with 1 GPU per executor. So, the question is: how do I
limit the stage resources to 20 GPUs total?
Thanks again,
Shay
From: Artemis User 
Sent: Thursday, November 3, 2022 5:23 PM
To: user@spark.apache.org 
Subject: [EXTERNAL] Re: Re: Stage level scheduling - lower the number of 
executors when using GPUs 




Shay, you may find this video helpful (with some API code samples that you
are looking for): https://www.youtube.com/watch?v=JNQu-226wUc=171s. The
issue here isn't how to limit the number of executors but how to request the
right GPU-enabled executors dynamically. The executors used in pre-GPU
stages should be returned to the resource manager when dynamic resource
allocation is enabled (and with the right DRA policies). Hope this helps.

Unfortunately there aren't a lot of detailed docs for this topic since GPU
acceleration is kind of new in Spark (not straightforward like in TF). I wish
the Spark doc team could provide more details in the next release...

On 11/3/22 2:37 AM, Shay Elbaz wrote:

Thanks Artemis. We are not using Rapids, but rather using GPUs through the
Stage Level Scheduling feature with ResourceProfile. In Kubernetes you have to
turn on shuffle tracking for dynamic allocation anyhow. The question is how we
can limit the number of executors when building a new ResourceProfile, directly
(API) or indirectly (some advanced workaround).
Thanks,
Shay
From: Artemis User
Sent: Thursday, November 3, 2022 1:16 AM
To: user@spark.apache.org 
Subject: [EXTERNAL] Re: Stage level scheduling - lower the number of executors 
when using GPUs 


Are you using Rapids for GPU support in Spark? A couple of options you may
want to try:

   - In addition to turning on dynamic allocation, you may also need to turn on
the external shuffle service.

   - Sounds like you are using Kubernetes. In that case, you may also need to
turn on shuffle tracking.

   - The "stages" are controlled by the APIs. The APIs for dynamic resource
requests (change of stage) do exist, but only for RDDs (e.g. TaskResourceRequest
and ExecutorResourceRequest).

On 11/2/22 11:30 AM, Shay Elbaz wrote:

Hi,
Our typical applications need fewer executors for a GPU stage than for a CPU
stage. We are using dynamic allocation with stage level scheduling, and Spark
tries to maximize the number of executors during the GPU stage as well, causing
a bit of resource chaos in the cluster. This forces us either to use a lower
value for 'maxExecutors' in the first place, at the cost of CPU stage
performance, or to try to solve this at the Kubernetes scheduler level, which
is not straightforward and doesn't feel like the right way to go.
Is there a way to effectively use fewer executors in Stage Level Scheduling? The
API does not seem to include such an option, but maybe there is some more
advanced workaround?
Thanks,
Shay

Re: [Spark Core, PySpark] Separate stage level scheduling for consecutive map functions

2021-08-05 Thread Tom Graves
As Sean mentioned, it's only available at the stage level, but you said you
don't want to shuffle, so splitting into stages doesn't help you. Without more
details, it seems like you could "hack" this by just requesting an executor with
1 GPU (allowing 2 tasks per GPU) and 2 CPUs; one task would use the GPU
and the other could just use the CPU. Perhaps that is too simplistic or brittle,
though.
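
The arithmetic behind that suggestion, as a rough sketch: an executor runs as many concurrent tasks as its scarcest resource allows. The helper name task_slots is hypothetical, and a task GPU amount of 1/2 is an assumption about how "2 tasks per GPU" would be expressed (spark.task.resource.gpu.amount accepts fractional values). Note that the scheduler does not know which of the two tasks actually touches the GPU; that part is up to the application code.

```python
from fractions import Fraction


def task_slots(executor_cores, executor_gpus, task_cpus, task_gpu_amount):
    """Concurrent tasks per executor: the minimum over CPU and GPU slots."""
    cpu_slots = Fraction(executor_cores) / Fraction(task_cpus)
    gpu_slots = Fraction(executor_gpus) / Fraction(task_gpu_amount)
    return int(min(cpu_slots, gpu_slots))


# 2 cores and 1 GPU per executor; each task asks for 1 core and half a GPU.
hack_slots = task_slots(2, 1, 1, Fraction(1, 2))
# Same executor, but each task asks for a whole GPU: one core sits idle.
whole_gpu_slots = task_slots(2, 1, 1, 1)

print(hack_slots, whole_gpu_slots)
```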
Tom

On Saturday, July 31, 2021, 03:56:18 AM CDT, Andreas Kunft wrote:
 
I have a setup with two work-intensive tasks: one map using the GPU, followed
by a map using only the CPU.
Using stage level resource scheduling, I request a GPU node, but would also
like to execute the subsequent CPU map on a different executor so that the GPU
node is not blocked.
However, Spark will always combine the two maps due to the narrow dependency,
and thus I cannot define two different resource requirements.
So the question is: can I force the two map functions onto different executors
without shuffling or, even better, is there a plan to enable this by assigning
different resource requirements?
Best

Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Looks like repartitioning was my friend, seems to be distributed across the
cluster now.

All good. Thanks!



Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Okay so I tried another idea which was to use a real simple class to drive
a mapPartitions... because logic in my head seems to suggest that I want to
map my partitions...

@SerialVersionUID(100L)
class RunCrawl extends Serializable {
  def mapCrawl(x: Iterator[(String, Iterable[Resource])],
               job: SparklerJob): Iterator[CrawlData] = {
    val m = 1000
    x.flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, m,
      FetchFunction, ParseFunction, OutLinkFilterFunction,
      StatusUpdateSolrTransformer) })
  }

  def runCrawl(f: RDD[(String, Iterable[Resource])],
               job: SparklerJob): RDD[CrawlData] = {
    f.mapPartitions(x => mapCrawl(x, job))
  }
}

That is what it looks like. But the task execution window in the cluster
looks the same:

https://pasteboard.co/K7WrBnV.png

1 task on a single node.

I feel like I'm missing something obvious here about one of:

a) how Spark works
b) how it divides up partitions into tasks
c) the fact it's a POJO and not a file of stuff.

Or probably some of all 3.

Tom
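
On point (b), one general property is worth keeping in mind: after a groupByKey each element is a whole (key, values) group, and a flatMap or mapPartitions over it runs once per element, so only partitions that hold a group have anything to do, however many partitions exist. The toy model below is plain Python, not Spark. group_by_key is a hypothetical stand-in that places each key with a stable hash, and the host names are made up. It illustrates the property only and is not a diagnosis of this particular job.

```python
import zlib
from collections import defaultdict


def group_by_key(records, num_partitions):
    """Toy groupByKey: one (key, values) element per key, placed by a stable hash."""
    partitions = [defaultdict(list) for _ in range(num_partitions)]
    for key, value in records:
        index = zlib.crc32(key.encode("utf-8")) % num_partitions
        partitions[index][key].append(value)
    return partitions


# 1000 records, but only 3 distinct groups (hypothetical host names).
records = [(f"host{i % 3}.example", f"/page/{i}") for i in range(1000)]
partitions = group_by_key(records, 50)

# A partition has work only if at least one group landed in it.
busy = [p for p in partitions if p]
print(len(partitions), "partitions,", len(busy), "with work")
```

With few groups, most of the 50 tasks finish immediately and the heavy work lands on at most as many tasks as there are groups.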


Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
(I should point out that I'm diagnosing this by looking at the active tasks
https://pasteboard.co/K7VryDJ.png, if I'm reading it incorrectly, let me
know)

On Wed, Jun 23, 2021 at 11:38 AM Tom Barber  wrote:

> Uff hello fine people.
>
> So the cause of the above issue was, unsurprisingly, human error. I found
> a local[*] Spark master config which gazumped my own one, so mystery
> solved. But I have another question, which is still the crux of this problem.
>
> Here's a bit of trimmed code that I'm currently testing with. I
> deliberately stuck in a repartition(50), just to force it to do what I
> believed would chunk it up and distribute it. Which is all good.
>
> override def run(): Unit = {
>   ...
>
>   val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
>   val f = rdd.map(r => (r.getGroup, r))
>     .groupByKey().repartition(50)
>
>   val c = f.getNumPartitions
>   val fetchedRdd = f.flatMap({ case (grp, rs) => new FairFetcher(job,
>     rs.iterator, localFetchDelay,
>     FetchFunction, ParseFunction, OutLinkFilterFunction,
>     StatusUpdateSolrTransformer) })
>     .persist()
>
>   val d = fetchedRdd.getNumPartitions
>
>   ...
>
>   val scoredRdd = score(fetchedRdd)
>
>   ...
>
> }
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d =>
>     ScoreUpdateSolrTransformer(d))
>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>
> }
>
>
> Basically, for anyone new to this, the business logic lives inside the
> FairFetcher and I need that distributed over all the nodes in the Spark cluster.
>
> Here's a quick illustration of what I'm seeing:
> https://pasteboard.co/K7VovBO.png
>
> It chunks up the code and distributes the tasks across the cluster, but
> that occurs _prior_ to the business logic in the flatMap being executed.
>
> So specifically, has anyone got any ideas about how to split that flatMap
> operation up so the RDD processing runs across the nodes, not limited to a
> single node?
>
> Thanks for all your help so far,
>
> Tom
>
> On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:
>
>> Ah no sorry, so in the load image, the crawl has just kicked off on the
>> driver node which is why its flagged red and the load is spiking.
>> https://pasteboard.co/K5QHOJN.png here's the cluster now its been
>> running a while. The red node is still (and is always every time I tested
>> it) the driver node.
>>
>> Tom
>>
>>
>>
>> On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:
>>
>>> Where do you see that ... I see 3 executors busy at first. If that's the
>>> crawl then ?
>>>
>>> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>>>
>>>> Yeah :)
>>>>
>>>> But it's all running through the same node. So I can run multiple tasks
>>>> of the same type on the same node(the driver), but I can't run multiple
>>>> tasks on multiple nodes.
>>>>
>>>> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>>>>
>>>>> Wait. Isn't that what you were trying to parallelize in the first
>>>>> place?
>>>>>
>>>>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>>>>
>>>>>> Yeah but that something else is the crawl being run, which is
>>>>>> triggered from inside the RDDs, because the log output is slowly 
>>>>>> outputting
>>>>>> crawl data.
>>>>>>
>>>>>>
>>>> Spicule Limited is registered in England & Wales. Company Number:
>>>> 09954122. Registered office: First Floor, Telecom House, 125-135 Preston
>>>> Road, Brighton, England, BN1 6AF. VAT No. 251478891.
>>>>
>>>>
>>>> All engagements are subject to Spicule Terms and Conditions of
>>>> Business. This email and its contents are intended solely for the
>>>> individual to whom it is addressed and may contain information that is
>>>> confidential, privileged or otherwise protected from disclosure,
>>>> distributing or copying. Any views or opinions presented in this email are
>>>> solely those of the author and do not necessarily represent those of
>>>> Spicule Limited. The company accepts no liability for any damage caused by
>>>> any virus transmitted b

Re: Distributing a FlatMap across a Spark Cluster

2021-06-23 Thread Tom Barber
Uff hello fine people.

So the cause of the above issue was, unsurprisingly, human error. I found a
local[*] Spark master config which gazumped my own one, so that mystery is
solved. But I have another question, which is still the crux of this problem:

Here's a bit of trimmed code that I'm currently testing with. I
deliberately stuck in a repartition(50) just to force it to do what I
believed would chunk the data up and distribute it, which is all good.

override def run(): Unit = {
  ...

  val rdd = new MemexCrawlDbRDD(sc, job, maxGroups = topG, topN = topN)
  val f = rdd.map(r => (r.getGroup, r))
    .groupByKey().repartition(50)

  val c = f.getNumPartitions
  val fetchedRdd = f.flatMap({ case (grp, rs) =>
      new FairFetcher(job, rs.iterator, localFetchDelay,
        FetchFunction, ParseFunction, OutLinkFilterFunction,
        StatusUpdateSolrTransformer) })
    .persist()

  val d = fetchedRdd.getNumPartitions

  ...

  val scoredRdd = score(fetchedRdd)

  ...

}

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] =
    scoredRdd.map(d => ScoreUpdateSolrTransformer(d))
  val scoreUpdateFunc = new SolrStatusUpdate(job)
  sc.runJob(scoreUpdateRdd, scoreUpdateFunc)

}


Basically, for anyone new to this, the business logic lives inside the
FairFetcher and I need that distributed over all the nodes in the Spark
cluster.

Here's a quick illustration of what I'm seeing:
https://pasteboard.co/K7VovBO.png

It chunks up the code and distributes the tasks across the cluster, but that
occurs _prior_ to the business logic in the flatMap being executed.

So specifically, has anyone got any ideas about how to split that flatMap
operation up so the RDD processing runs across the nodes, not limited to a
single node?

Thanks for all your help so far,

Tom
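
One way to check where the flatMap body really runs, rather than inferring it from load graphs, is to have each task report the host it executed on. The following is a minimal sketch, not part of the original code: the object and method names (WhereDidItRun, partitionHosts, recordsPerHost) are made up for illustration, and it assumes only the standard RDD API.

```scala
import java.net.InetAddress
import org.apache.spark.rdd.RDD

object WhereDidItRun {
  // Returns one (hostname, partition index, record count) row per partition.
  // The hostname is resolved inside the task, so it names the machine that
  // processed the partition, not the driver that submitted the job.
  def partitionHosts[T](rdd: RDD[T]): Seq[(String, Int, Int)] =
    rdd.mapPartitionsWithIndex { (idx, records) =>
      val host = InetAddress.getLocalHost.getHostName
      Iterator((host, idx, records.size))
    }.collect().toSeq

  // Collapses the per-partition rows into a record count per host.
  def recordsPerHost(rows: Seq[(String, Int, Int)]): Map[String, Int] =
    rows.groupBy(_._1).map { case (host, rs) => host -> rs.map(_._3).sum }
}
```

Calling WhereDidItRun.recordsPerHost(WhereDidItRun.partitionHosts(f)) on the grouped RDD should list every worker if the 50 partitions are really spread out. Note that this is itself an action: running it on an RDD that has not been persisted yet executes that RDD's whole lineage, which for fetchedRdd would include the crawl.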

On Wed, Jun 9, 2021 at 8:08 PM Tom Barber  wrote:

> Ah no sorry, so in the load image, the crawl has just kicked off on the
> driver node which is why its flagged red and the load is spiking.
> https://pasteboard.co/K5QHOJN.png here's the cluster now its been running
> a while. The red node is still (and is always every time I tested it) the
> driver node.
>
> Tom
>
>
>
> On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:
>
>> Where do you see that ... I see 3 executors busy at first. If that's the
>> crawl then ?
>>
>> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>>
>>> Yeah :)
>>>
>>> But it's all running through the same node. So I can run multiple tasks
>>> of the same type on the same node(the driver), but I can't run multiple
>>> tasks on multiple nodes.
>>>
>>> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>>>
>>>> Wait. Isn't that what you were trying to parallelize in the first place?
>>>>
>>>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>>>
>>>>> Yeah but that something else is the crawl being run, which is
>>>>> triggered from inside the RDDs, because the log output is slowly 
>>>>> outputting
>>>>> crawl data.
>>>>>
>>>>>


Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Ah no, sorry. In the load image, the crawl has just kicked off on the
driver node, which is why it's flagged red and the load is spiking.
Here's the cluster now that it's been running a while:
https://pasteboard.co/K5QHOJN.png
The red node is still (and is always, every time I tested it) the
driver node.

Tom



On Wed, Jun 9, 2021 at 8:03 PM Sean Owen  wrote:

> Where do you see that ... I see 3 executors busy at first. If that's the
> crawl then ?
>
> On Wed, Jun 9, 2021 at 1:59 PM Tom Barber  wrote:
>
>> Yeah :)
>>
>> But it's all running through the same node. So I can run multiple tasks
>> of the same type on the same node(the driver), but I can't run multiple
>> tasks on multiple nodes.
>>
>> On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:
>>
>>> Wait. Isn't that what you were trying to parallelize in the first place?
>>>
>>> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>>>
>>>> Yeah but that something else is the crawl being run, which is triggered
>>>> from inside the RDDs, because the log output is slowly outputting crawl
>>>> data.
>>>>
>>>>



Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Yeah :)

But it's all running through the same node. So I can run multiple tasks of
the same type on the same node (the driver), but I can't run multiple tasks
on multiple nodes.
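
Every task landing on the driver node is what a local master would produce, and the 2021-06-23 follow-up in this thread reports finding exactly that (a stray local[*] master setting). A small check like the one below, logged at job start, would surface it early. This is a sketch under assumptions: MasterCheck, isLocalMaster and describe are illustrative names, while sc.master and sc.defaultParallelism are standard SparkContext members.

```scala
import org.apache.spark.SparkContext

object MasterCheck {
  // True when the master URL means "run everything inside this one JVM",
  // i.e. "local", "local[N]" or "local[*]".
  def isLocalMaster(master: String): Boolean =
    master == "local" || master.startsWith("local[")

  // Describes the master the running context actually resolved to, which can
  // differ from what the code asked for if another config source overrides it.
  def describe(sc: SparkContext): String = {
    val mode = if (isLocalMaster(sc.master)) "LOCAL (single JVM)" else "cluster"
    s"master=${sc.master} mode=$mode defaultParallelism=${sc.defaultParallelism}"
  }
}
```

Logging MasterCheck.describe(sc) right after the context is created shows the effective master rather than the intended one. SparkContext also exposes isLocal, which could replace the string test here.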

On Wed, Jun 9, 2021 at 7:57 PM Sean Owen  wrote:

> Wait. Isn't that what you were trying to parallelize in the first place?
>
> On Wed, Jun 9, 2021 at 1:49 PM Tom Barber  wrote:
>
>> Yeah but that something else is the crawl being run, which is triggered
>> from inside the RDDs, because the log output is slowly outputting crawl
>> data.
>>
>>



Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Yeah but that something else is the crawl being run, which is triggered
from inside the RDDs, because the log output is slowly outputting crawl
data.

On Wed, 9 Jun 2021, 19:47 Sean Owen,  wrote:

> That looks like you did some work on the cluster, and now it's stuck doing
> something else on the driver - not doing everything on 1 machine.
>
> On Wed, Jun 9, 2021 at 12:43 PM Tom Barber  wrote:
>
>> And also as this morning: https://pasteboard.co/K5Q9aEf.png
>>
>> Removing the cpu pins gives me more tasks but as you can see here:
>>
>> https://pasteboard.co/K5Q9GO0.png
>>
>> It just loads up a single server.
>>
>>



Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
No, this is an on-demand Databricks cluster.

On Wed, Jun 9, 2021 at 6:54 PM Mich Talebzadeh 
wrote:

>
>
> Are you running this in Managed Instance Group (MIG)?
>
> https://cloud.google.com/compute/docs/instance-groups
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 9 Jun 2021 at 18:43, Tom Barber  wrote:
>
>> And also as this morning: https://pasteboard.co/K5Q9aEf.png
>>
>> Removing the cpu pins gives me more tasks but as you can see here:
>>
>> https://pasteboard.co/K5Q9GO0.png
>>
>> It just loads up a single server.
>>
>> On Wed, Jun 9, 2021 at 6:32 PM Tom Barber  wrote:
>>
>>> Thanks Chris
>>>
>>> All the code I have on both sides is as modern as it allows. Running
>>> Spark 3.1.1 and Scala 2.12.
>>>
>>> I stuck some logging in to check reality:
>>>
>>> LOG.info("GROUP COUNT: " + fetchedgrp.count())
>>> val cgrp = fetchedgrp.collect()
>>> cgrp.foreach(f => {
>>>   LOG.info("Out1 :" + f._1)
>>>   f._2.foreach(u => {
>>> LOG.info("ID:" + u.getId)
>>> LOG.info("GROUP:" + u.getGroup)
>>>   })
>>> })
>>> LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
>>> val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) => new 
>>> FairFetcher(job, rs.iterator, localFetchDelay,
>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer) })
>>>   .persist()
>>>
>>> LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
>>> LOG.info("CoUNT: " + fetchedRdd.count())
>>>
>>>
>>> It says I have 5000 groups, which makes sense as its defined in my
>>> command line and both sides claim to have 50 partitions which also makes
>>> sense as I define that in my code as well.
>>>
>>> Then it starts the crawl at the final count line as I guess it needs to
>>> materialize things and so at that point I don't know what the count would
>>> return, but everything else checks out.
>>>
>>> I'll poke around in the other hints you suggested later, thanks for the
>>> help.
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 5:49 PM Chris Martin 
>>> wrote:
>>>
>>>> Hmm then my guesses are (in order of decreasing probability):
>>>>
>>>> * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
>>>> compatible with the latest Spark release.
>>>> * You've got 16 threads per task on a 16 core machine.  Should be fine,
>>>> but I wonder if it's confusing things as you don't also set
>>>> spark.executor.cores and Databricks might also default that to 1.
>>>> * There's some custom partitioner in play which is causing everything
>>>> to go to the same partition.
>>>> * The group keys are all hashing to the same value (it's difficult to
>>>> see how this would be the case if the group keys are genuinely different,
>>>> but maybe there's something else going on).
>>>>
>>>> My hints:
>>>>
>>>> 1. Make sure you're using a recent version of sparkler
>>>> 2. Try repartition with a custom partitioner that you know will send
>>>> things to different partitions
>>>> 3. Try either removing "spark.task.cpus":"16"  or setting
>>>> spark.executor.cores to 1.
>>>> 4. print out the group keys and see if there's any weird pattern to
>>>> them.
>>>> 5. See if the same thing happens in spark local.
>>>>
>>>> If you have a reproducible example you can post publicly then I'm
>>>> happy to take a look.
>>>>
>>>> Chris
>>>>
>>>> On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:
>>>>
>>>>> Yeah to test that I just set the group key to the ID in the record
>>>>> which is a solr supplied UUID, which means effectively you end up with 
>>>>> 4000
>>>>> groups now.
>

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
And also, as of this morning: https://pasteboard.co/K5Q9aEf.png

Removing the CPU pins gives me more tasks, but as you can see here:

https://pasteboard.co/K5Q9GO0.png

It just loads up a single server.

On Wed, Jun 9, 2021 at 6:32 PM Tom Barber  wrote:

> Thanks Chris
>
> All the code I have on both sides is as modern as it allows. Running Spark
> 3.1.1 and Scala 2.12.
>
> I stuck some logging in to check reality:
>
> LOG.info("GROUP COUNT: " + fetchedgrp.count())
> val cgrp = fetchedgrp.collect()
> cgrp.foreach(f => {
>   LOG.info("Out1 :" + f._1)
>   f._2.foreach(u => {
> LOG.info("ID:" + u.getId)
> LOG.info("GROUP:" + u.getGroup)
>   })
> })
> LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
> val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
> FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer) })
>   .persist()
>
> LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
> LOG.info("CoUNT: " + fetchedRdd.count())
>
>
> It says I have 5000 groups, which makes sense as its defined in my command
> line and both sides claim to have 50 partitions which also makes sense as I
> define that in my code as well.
>
> Then it starts the crawl at the final count line as I guess it needs to
> materialize things and so at that point I don't know what the count would
> return, but everything else checks out.
>
> I'll poke around in the other hints you suggested later, thanks for the
> help.
>
> Tom
>
> On Wed, Jun 9, 2021 at 5:49 PM Chris Martin  wrote:
>
>> Hmm then my guesses are (in order of decreasing probability):
>>
>> * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
>> compatible with the latest Spark release.
>> * You've got 16 threads per task on a 16 core machine.  Should be fine,
>> but I wonder if it's confusing things as you don't also set
>> spark.executor.cores and Databricks might also default that to 1.
>> * There's some custom partitioner in play which is causing everything to
>> go to the same partition.
>> * The group keys are all hashing to the same value (it's difficult to see
>> how this would be the case if the group keys are genuinely different, but
>> maybe there's something else going on).
>>
>> My hints:
>>
>> 1. Make sure you're using a recent version of sparkler
>> 2. Try repartition with a custom partitioner that you know will send
>> things to different partitions
>> 3. Try either removing "spark.task.cpus":"16"  or setting
>> spark.executor.cores to 1.
>> 4. print out the group keys and see if there's any weird pattern to them.
>> 5. See if the same thing happens in spark local.
>>
>> If you have a reproducible example you can post publicly then I'm happy
>> to take a look.
>>
>> Chris
>>
>> On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:
>>
>>> Yeah to test that I just set the group key to the ID in the record which
>>> is a solr supplied UUID, which means effectively you end up with 4000
>>> groups now.
>>>
>>> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin 
>>> wrote:
>>>
>>>> One thing I would check is this line:
>>>>
>>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>
>>>> how many distinct groups do you ended up with?  If there's just one
>>>> then I think you might see the behaviour you observe.
>>>>
>>>> Chris
>>>>
>>>>
>>>> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>>>>
>>>>> Also just to follow up on that slightly, I did also try off the back
>>>>> of another comment:
>>>>>
>>>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>>>
>>>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>>>
>>>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>>>
>>>>>
>>>>> Where I repartitioned that scoredRdd map out of interest, it then
>>>>> triggers the FairFetcher function there, instead of in the runJob(), but
>>>>> still on a single executor 
>>>>>
>>>>> Tom
>>>>>
>>>>> On Wed, Jun 9, 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Thanks Chris

All the code I have on both sides is as modern as it allows. Running Spark
3.1.1 and Scala 2.12.

I stuck some logging in to check reality:

LOG.info("GROUP COUNT: " + fetchedgrp.count())
val cgrp = fetchedgrp.collect()
cgrp.foreach(f => {
  LOG.info("Out1 :" + f._1)
  f._2.foreach(u => {
    LOG.info("ID:" + u.getId)
    LOG.info("GROUP:" + u.getGroup)
  })
})
LOG.info("PARTITION COUNT:" + fetchedgrp.getNumPartitions)
val fetchedRdd = fetchedgrp.flatMap({ case (grp, rs) =>
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction,
      StatusUpdateSolrTransformer) })
  .persist()

LOG.info("FETCHED PARTITIONS: " + fetchedRdd.getNumPartitions)
LOG.info("CoUNT: " + fetchedRdd.count())


It says I have 5000 groups, which makes sense as it's defined on my command
line, and both sides claim to have 50 partitions, which also makes sense as I
define that in my code as well.

Then it starts the crawl at the final count line, as I guess it needs to
materialize things, so at that point I don't know what the count would
return, but everything else checks out.
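
The crawl starting at the final count line is consistent with Spark's lazy evaluation: flatMap, persist and getNumPartitions only describe the computation, and the first action runs it. The sketch below demonstrates this with an accumulator standing in for the crawl. It is illustrative only: LazyDemo and readings are made-up names, and the accumulator count assumes no task retries.

```scala
import org.apache.spark.SparkContext

object LazyDemo {
  // Returns the accumulator reading before any action, after the first
  // count() and after the second count().
  def readings(sc: SparkContext): (Long, Long, Long) = {
    val calls = sc.longAccumulator("flatMapCalls")

    val expanded = sc.parallelize(1 to 100, numSlices = 4)
      .flatMap { i => calls.add(1L); Seq(i, i) } // stand-in for the crawl
      .persist()                                 // only marks the RDD for caching

    expanded.getNumPartitions                    // metadata only, runs no tasks
    val before = calls.value.longValue           // 0: nothing has executed yet

    expanded.count()                             // first action runs the flatMap
    val afterFirst = calls.value.longValue

    expanded.count()                             // served from the cached blocks
    val afterSecond = calls.value.longValue

    expanded.unpersist()
    (before, afterFirst, afterSecond)
  }
}
```

If the second reading equals the third, the persisted data was reused and the expensive function did not run again, which is the behaviour the persist() call on fetchedRdd is relying on.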

I'll poke around in the other hints you suggested later, thanks for the
help.

Tom

On Wed, Jun 9, 2021 at 5:49 PM Chris Martin  wrote:

> Hmm then my guesses are (in order of decreasing probability):
>
> * Whatever class makes up fetchedRdd (MemexDeepCrawlDbRDD?) isn't
> compatible with the latest Spark release.
> * You've got 16 threads per task on a 16 core machine.  Should be fine,
> but I wonder if it's confusing things as you don't also set
> spark.executor.cores and Databricks might also default that to 1.
> * There's some custom partitioner in play which is causing everything to
> go to the same partition.
> * The group keys are all hashing to the same value (it's difficult to see
> how this would be the case if the group keys are genuinely different, but
> maybe there's something else going on).
>
> My hints:
>
> 1. Make sure you're using a recent version of sparkler
> 2. Try repartition with a custom partitioner that you know will send things
> to different partitions
> 3. Try either removing "spark.task.cpus":"16"  or setting
> spark.executor.cores to 1.
> 4. print out the group keys and see if there's any weird pattern to them.
> 5. See if the same thing happens in spark local.
>
> If you have a reproducible example you can post publicly then I'm happy
> to take a look.
>
> Chris
>
> On Wed, Jun 9, 2021 at 5:17 PM Tom Barber  wrote:
>
>> Yeah to test that I just set the group key to the ID in the record which
>> is a solr supplied UUID, which means effectively you end up with 4000
>> groups now.
>>
>> On Wed, Jun 9, 2021 at 5:13 PM Chris Martin 
>> wrote:
>>
>>> One thing I would check is this line:
>>>
>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>
>>> how many distinct groups do you ended up with?  If there's just one then
>>> I think you might see the behaviour you observe.
>>>
>>> Chris
>>>
>>>
>>> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>>>
>>>> Also just to follow up on that slightly, I did also try off the back of
>>>> another comment:
>>>>
>>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>>
>>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>>
>>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>>
>>>>
>>>> Where I repartitioned that scoredRdd map out of interest, it then
>>>> triggers the FairFetcher function there, instead of in the runJob(), but
>>>> still on a single executor 
>>>>
>>>> Tom
>>>>
>>>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>>>
>>>>>
>>>>> Okay so what happens is that the crawler reads a bunch of solr data,
>>>>> we're not talking GB's just a list of JSON and turns that into a bunch of
>>>>> RDD's that end up in that flatmap that I linked to first.
>>>>>
>>>>> The fair fetcher is an interface to a pluggable backend that basically
>>>>> takes some of the fields and goes and crawls websites listed in them
>>>>> looking for information. We wrote this code 6 years ago for a DARPA 
>>>>> project
>>>>> tracking down criminals on the web. Now I'm reusing 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Yeah, to test that I just set the group key to the ID in the record, which is
a Solr-supplied UUID, which means you effectively end up with 4000 groups
now.
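
To see why UUID keys rule out the "everything hashes to one partition" theory, the sketch below applies the rule that Spark's HashPartitioner uses (key.hashCode modulo the partition count, kept non-negative) to 4000 random UUIDs and, for contrast, to 4000 copies of a single key. It is plain Scala with no Spark dependency. KeySpread, partitionOf and spread are illustrative names, and it assumes the groupByKey() here falls back to the default hash partitioning.

```scala
import java.util.UUID

object KeySpread {
  // Mirrors HashPartitioner's rule for non-null keys: hashCode modulo the
  // number of partitions, shifted into the range [0, numPartitions).
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Maps each partition index that receives at least one key to its key count.
  def spread(keys: Seq[String], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => partitionOf(k, numPartitions))
      .map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    val byUuid = spread(Seq.fill(4000)(UUID.randomUUID().toString), 50)
    val bySingle = spread(Seq.fill(4000)("same-group"), 50)
    println(s"UUID keys: ${byUuid.size} partitions used, largest holds ${byUuid.values.max}")
    println(s"single key: ${bySingle.size} partition used, largest holds ${bySingle.values.max}")
  }
}
```

A single repeated key always lands in one partition, while distinct UUIDs spread over many. The later repartition(50) does not hash by key at all, so this only models the groupByKey() step.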

On Wed, Jun 9, 2021 at 5:13 PM Chris Martin  wrote:

> One thing I would check is this line:
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>
> how many distinct groups do you ended up with?  If there's just one then I
> think you might see the behaviour you observe.
>
> Chris
>
>
> On Wed, Jun 9, 2021 at 4:17 PM Tom Barber  wrote:
>
>> Also just to follow up on that slightly, I did also try off the back of
>> another comment:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>
>>
>> Where I repartitioned that scoredRdd map out of interest, it then
>> triggers the FairFetcher function there, instead of in the runJob(), but
>> still on a single executor 
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>
>>>
>>> Okay so what happens is that the crawler reads a bunch of solr data,
>>> we're not talking GB's just a list of JSON and turns that into a bunch of
>>> RDD's that end up in that flatmap that I linked to first.
>>>
>>> The fair fetcher is an interface to a pluggable backend that basically
>>> takes some of the fields and goes and crawls websites listed in them
>>> looking for information. We wrote this code 6 years ago for a DARPA project
>>> tracking down criminals on the web. Now I'm reusing it but trying to force
>>> it to scale out a bit more.
>>>
>>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>>> on one node makes my cluster sad) to each executor and have it run a crawl,
>>> then move on and get another one and so on. That way you're not saturating
>>> a node trying to look up all of them and you could add more nodes for
>>> greater capacity pretty quickly. Once the website has been captured, you
>>> can then "score" it for want of a better term to determine its usefulness,
>>> which is where the map is being triggered.
>>>
>>> In answer to your questions Sean, no action seems triggered until you
>>> end up in the score block and the sc.runJob() because thats literally the
>>> next line of functionality as Kafka isn't enabled.
>>>
>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>>> rs.iterator, localFetchDelay,
>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer).toSeq })
>>>   .persist()
>>>
>>> if (kafkaEnable) {
>>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>>> }
>>> val scoredRdd = score(fetchedRdd)
>>>
>>>
>>> That if block is disabled so the score function runs. Inside of that:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>>> ScoreUpdateSolrTransformer(d))
>>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>> 
>>>
>>>
>>> When its doing stuff in the SparkUI I can see that its waiting on the
>>> sc.runJob() line, so thats the execution point.
>>>
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>>
>>>> persist() doesn't even persist by itself - just sets it to be persisted
>>>> when it's executed.
>>>> key doesn't matter here, nor partitioning, if this code is trying to
>>>> run things on the driver inadvertently.
>>>> I don't quite grok what the OSS code you linked to is doing, but it's
>>>> running some supplied functions very directly and at a low-level with
>>>> sc.runJob, which might be part of how this can do something unusual.
>>>> How do you trigger any action? what happens after persist()
>>>>
>>>&

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
@sam:

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
  val m = 50
  val repRdd = scoredRdd.repartition(m).cache()
  repRdd.take(1)

  val scoreUpdateRdd: RDD[SolrInputDocument] =
    repRdd.map(d => ScoreUpdateSolrTransformer(d))

I did that, but the crawl is executed in that repartition executor (which I
should have pointed out I already know).
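
A possible reading of this result: repartition introduces a shuffle, and everything upstream of it (the flatMap that crawls, then the scoring map) runs in the stage that writes the shuffle, with one task per upstream partition. take(1) only reads one output partition, but that upstream stage still has to finish first. Printing the lineage shows where the boundary falls. The sketch below uses a toy pipeline of the same shape; LineageDemo and lineage are illustrative names, and toDebugString is the standard RDD method.

```scala
import org.apache.spark.SparkContext

object LineageDemo {
  // Builds a small pipeline shaped like the snippet above and returns its
  // lineage; indentation changes in the output mark shuffle boundaries.
  def lineage(sc: SparkContext): String = {
    val fetched = sc.parallelize(1 to 100, numSlices = 4)
      .flatMap(i => Seq(i, i))              // stand-in for the FairFetcher flatMap
    val scored = fetched.map(_ * 2)         // stand-in for ScoreFunction
    val rep = scored.repartition(50).cache()
    rep.toDebugString                       // describes the plan, runs nothing
  }
}
```

In the printed lineage, the RDDs listed below the ShuffledRDD entry run before the shuffle, so their parallelism is set by the partition count of the RDD feeding the flatMap and not by the 50 requested afterwards.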

Tom

On Wed, Jun 9, 2021 at 4:37 PM Tom Barber  wrote:

> Sorry Sam, I missed that earlier, I'll give it a spin.
>
>
> To everyone involved, this code is old, and not written by me. If you all
> go "oooh, you want to distribute the crawls over the cluster, you don't
> want to do it like that, you should look at XYZ instead" feel free to punt
> different ways of doing this across, I'm happy to refactor the code to
> modernize it/follow better practices.
>
> On Wed, Jun 9, 2021 at 4:25 PM Sam  wrote:
>
>> Like I said In my previous email, can you try this and let me know how
>> many tasks you see?
>>
>> val repRdd = scoredRdd.repartition(50).cache()
>> repRdd.take(1)
>> Then map operation on repRdd here.
>>
>> I’ve done similar map operations in the past and this works.
>>
>> Thanks.
>>
>> On Wed, Jun 9, 2021 at 11:17 AM Tom Barber  wrote:
>>
>>> Also just to follow up on that slightly, I did also try off the back of
>>> another comment:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>>
>>>
>>> Where I repartitioned that scoredRdd map out of interest, it then
>>> triggers the FairFetcher function there, instead of in the runJob(), but
>>> still on a single executor 
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>>
>>>>
>>>> Okay so what happens is that the crawler reads a bunch of solr data,
>>>> we're not talking GB's just a list of JSON and turns that into a bunch of
>>>> RDD's that end up in that flatmap that I linked to first.
>>>>
>>>> The fair fetcher is an interface to a pluggable backend that basically
>>>> takes some of the fields and goes and crawls websites listed in them
>>>> looking for information. We wrote this code 6 years ago for a DARPA project
>>>> tracking down criminals on the web. Now I'm reusing it but trying to force
>>>> it to scale out a bit more.
>>>>
>>>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>>>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>>>> on one node makes my cluster sad) to each executor and have it run a crawl,
>>>> then move on and get another one and so on. That way you're not saturating
>>>> a node trying to look up all of them and you could add more nodes for
>>>> greater capacity pretty quickly. Once the website has been captured, you
>>>> can then "score" it for want of a better term to determine its usefulness,
>>>> which is where the map is being triggered.
>>>>
>>>> In answer to your questions Sean, no action seems triggered until you
>>>> end up in the score block and the sc.runJob() because thats literally the
>>>> next line of functionality as Kafka isn't enabled.
>>>>
>>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>>>> rs.iterator, localFetchDelay,
>>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>>> StatusUpdateSolrTransformer).toSeq })
>>>>   .persist()
>>>>
>>>> if (kafkaEnable) {
>>>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>>>> }
>>>> val scoredRdd = score(fetchedRdd)
>>>>
>>>>
>>>> That if block is disabled so the score function runs. Inside of that:
>>>>
>>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>>
>>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>>
>>>>   val scoreUpdate

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Sorry Sam, I missed that earlier, I'll give it a spin.


To everyone involved: this code is old and was not written by me. If you all
go "oooh, you want to distribute the crawls over the cluster, you don't
want to do it like that, you should look at XYZ instead", feel free to punt
different ways of doing this across. I'm happy to refactor the code to
modernize it and follow better practices.

On Wed, Jun 9, 2021 at 4:25 PM Sam  wrote:

> Like I said In my previous email, can you try this and let me know how
> many tasks you see?
>
> val repRdd = scoredRdd.repartition(50).cache()
> repRdd.take(1)
> Then map operation on repRdd here.
>
> I’ve done similar map operations in the past and this works.
>
> Thanks.
>
> On Wed, Jun 9, 2021 at 11:17 AM Tom Barber  wrote:
>
>> Also just to follow up on that slightly, I did also try off the back of
>> another comment:
>>
>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>   val job = this.job.asInstanceOf[SparklerJob]
>>
>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>
>>   val scoreUpdateRdd: RDD[SolrInputDocument] = 
>> scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))
>>
>>
>> Where I repartitioned that scoredRdd map out of interest, it then
>> triggers the FairFetcher function there, instead of in the runJob(), but
>> still on a single executor 
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:
>>
>>>
>>> Okay so what happens is that the crawler reads a bunch of solr data,
>>> we're not talking GB's just a list of JSON and turns that into a bunch of
>>> RDD's that end up in that flatmap that I linked to first.
>>>
>>> The fair fetcher is an interface to a pluggable backend that basically
>>> takes some of the fields and goes and crawls websites listed in them
>>> looking for information. We wrote this code 6 years ago for a DARPA project
>>> tracking down criminals on the web. Now I'm reusing it but trying to force
>>> it to scale out a bit more.
>>>
>>> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want
>>> to push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel
>>> on one node makes my cluster sad) to each executor and have it run a crawl,
>>> then move on and get another one and so on. That way you're not saturating
>>> a node trying to look up all of them and you could add more nodes for
>>> greater capacity pretty quickly. Once the website has been captured, you
>>> can then "score" it for want of a better term to determine its usefulness,
>>> which is where the map is being triggered.
>>>
>>> In answer to your questions Sean, no action seems triggered until you
>>> end up in the score block and the sc.runJob() because thats literally the
>>> next line of functionality as Kafka isn't enabled.
>>>
>>> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
>>> rs.iterator, localFetchDelay,
>>> FetchFunction, ParseFunction, OutLinkFilterFunction, 
>>> StatusUpdateSolrTransformer).toSeq })
>>>   .persist()
>>>
>>> if (kafkaEnable) {
>>>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
>>> }
>>> val scoredRdd = score(fetchedRdd)
>>>
>>>
>>> That if block is disabled so the score function runs. Inside of that:
>>>
>>> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>>>   val job = this.job.asInstanceOf[SparklerJob]
>>>
>>>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>>>
>>>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
>>> ScoreUpdateSolrTransformer(d))
>>>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>>>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
>>> 
>>>
>>>
>>> When its doing stuff in the SparkUI I can see that its waiting on the
>>> sc.runJob() line, so thats the execution point.
>>>
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>>>
>>>> persist() doesn't even persist by itself - just sets it to be persisted
>>>> when it's executed.
>>>> key doesn't matter here, nor partitioning, if this code is trying to
>>>> run things on the driver inadvertently.
>>>> I don't quite grok what the OSS code you linked to is doing, but 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Also just to follow up on that slightly, I did also try off the back of
another comment:

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] =
    scoredRdd.repartition(50).map(d => ScoreUpdateSolrTransformer(d))


Here I repartitioned scoredRdd before the map, out of interest. It then
triggers the FairFetcher function there, instead of in the runJob(), but
still on a single executor.

Tom

On Wed, Jun 9, 2021 at 4:11 PM Tom Barber  wrote:

>
> Okay so what happens is that the crawler reads a bunch of solr data, we're
> not talking GB's just a list of JSON and turns that into a bunch of RDD's
> that end up in that flatmap that I linked to first.
>
> The fair fetcher is an interface to a pluggable backend that basically
> takes some of the fields and goes and crawls websites listed in them
> looking for information. We wrote this code 6 years ago for a DARPA project
> tracking down criminals on the web. Now I'm reusing it but trying to force
> it to scale out a bit more.
>
> Say I have 4000 urls I want to crawl and a 3 node Spark cluster, I want to
> push down 1 URL (a few more wont hurt, but crawling 50 urls in parallel on
> one node makes my cluster sad) to each executor and have it run a crawl,
> then move on and get another one and so on. That way you're not saturating
> a node trying to look up all of them and you could add more nodes for
> greater capacity pretty quickly. Once the website has been captured, you
> can then "score" it for want of a better term to determine its usefulness,
> which is where the map is being triggered.
>
> In answer to your questions Sean, no action seems triggered until you end
> up in the score block and the sc.runJob() because thats literally the next
> line of functionality as Kafka isn't enabled.
>
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
>   .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
> FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer).toSeq })
>   .persist()
>
> if (kafkaEnable) {
>   storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
> }
> val scoredRdd = score(fetchedRdd)
>
>
> That if block is disabled so the score function runs. Inside of that:
>
> def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
>   val job = this.job.asInstanceOf[SparklerJob]
>
>   val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))
>
>   val scoreUpdateRdd: RDD[SolrInputDocument] = scoredRdd.map(d => 
> ScoreUpdateSolrTransformer(d))
>   val scoreUpdateFunc = new SolrStatusUpdate(job)
>   sc.runJob(scoreUpdateRdd, scoreUpdateFunc)
> 
>
>
> When its doing stuff in the SparkUI I can see that its waiting on the
> sc.runJob() line, so thats the execution point.
>
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:
>
>> persist() doesn't even persist by itself - just sets it to be persisted
>> when it's executed.
>> key doesn't matter here, nor partitioning, if this code is trying to run
>> things on the driver inadvertently.
>> I don't quite grok what the OSS code you linked to is doing, but it's
>> running some supplied functions very directly and at a low-level with
>> sc.runJob, which might be part of how this can do something unusual.
>> How do you trigger any action? what happens after persist()
>>
>> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>>
>>> Thanks Mich,
>>>
>>> The key on the first iteration is just a string that says "seed", so it
>>> is indeed on the first crawl the same across all of the groups. Further
>>> iterations would be different, but I'm not there yet. I was under the
>>> impression that a repartition would distribute the tasks. Is that not the
>>> case?
>>>
>>> Thanks
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi Tom,
>>>>
>>>> Persist() here simply means persist to memory). That is all. You can
>>>> check UI tab on storage
>>>>
>>>>
>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>>>
>>>> So I gather the code is stuck from your link in the driver. You stated
>>>> that you tried repartition() but it did not do anything,
>>>>
>>>> Further you stated :
>>>>
>>>> " The key

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Okay so what happens is that the crawler reads a bunch of Solr data (we're
not talking GBs, just a list of JSON) and turns that into a bunch of RDDs
that end up in that flatMap that I linked to first.

The fair fetcher is an interface to a pluggable backend that basically
takes some of the fields and goes and crawls websites listed in them
looking for information. We wrote this code 6 years ago for a DARPA project
tracking down criminals on the web. Now I'm reusing it but trying to force
it to scale out a bit more.

Say I have 4000 URLs I want to crawl and a 3 node Spark cluster. I want to
push down 1 URL (a few more won't hurt, but crawling 50 URLs in parallel on
one node makes my cluster sad) to each executor and have it run a crawl,
then move on and get another one and so on. That way you're not saturating
a node trying to look up all of them, and you could add more nodes for
greater capacity pretty quickly. Once the website has been captured, you
can then "score" it, for want of a better term, to determine its usefulness,
which is where the map is being triggered.
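
One way to picture the "one URL per task" idea above is to cut the URL list
into small batches and treat each batch as its own unit of work. The sketch
below is plain Scala with no Spark dependency; CrawlBatches, batches and
perTask are hypothetical names, not part of Sparkler. In Spark, each batch
could then become one partition (for example with
sc.parallelize(batches, batches.size)), so that a free executor picks up the
next batch when it finishes. Whether that fits Sparkler's pipeline is an
assumption.

```scala
// Plain-Scala sketch: split a URL list into small batches, one per task.
// CrawlBatches, batches and perTask are illustrative names only.
object CrawlBatches {
  // Each inner Seq is the work for a single task; perTask = 1 means one URL per task.
  def batches(urls: Seq[String], perTask: Int): Vector[Seq[String]] = {
    require(perTask > 0, "perTask must be positive")
    urls.grouped(perTask).toVector
  }

  def main(args: Array[String]): Unit = {
    // Placeholder URLs standing in for the 4000 seeds mentioned above.
    val urls = (1 to 4000).map(i => s"http://example.com/page/$i")
    println(s"tasks with 1 URL each: ${batches(urls, 1).size}")
    println(s"tasks with 3 URLs each: ${batches(urls, 3).size}")
  }
}
```

Smaller batches give the scheduler more tasks to hand out, at the cost of
more per-task overhead.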

In answer to your questions Sean, no action seems to be triggered until you
end up in the score block and the sc.runJob(), because that's literally the
next line of functionality as Kafka isn't enabled.

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m)
  .flatMap({ case (grp, rs) =>
    new FairFetcher(job, rs.iterator, localFetchDelay,
      FetchFunction, ParseFunction, OutLinkFilterFunction,
      StatusUpdateSolrTransformer).toSeq })
  .persist()

if (kafkaEnable) {
  storeContentKafka(kafkaListeners, kafkaTopic.format(jobId), fetchedRdd)
}
val scoredRdd = score(fetchedRdd)


That if block is disabled so the score function runs. Inside of that:

def score(fetchedRdd: RDD[CrawlData]): RDD[CrawlData] = {
  val job = this.job.asInstanceOf[SparklerJob]

  val scoredRdd = fetchedRdd.map(d => ScoreFunction(job, d))

  val scoreUpdateRdd: RDD[SolrInputDocument] =
    scoredRdd.map(d => ScoreUpdateSolrTransformer(d))
  val scoreUpdateFunc = new SolrStatusUpdate(job)
  sc.runJob(scoreUpdateRdd, scoreUpdateFunc)



When it's doing stuff, I can see in the Spark UI that it's waiting on the
sc.runJob() line, so that's the execution point.


Tom

On Wed, Jun 9, 2021 at 3:59 PM Sean Owen  wrote:

> persist() doesn't even persist by itself - just sets it to be persisted
> when it's executed.
> key doesn't matter here, nor partitioning, if this code is trying to run
> things on the driver inadvertently.
> I don't quite grok what the OSS code you linked to is doing, but it's
> running some supplied functions very directly and at a low-level with
> sc.runJob, which might be part of how this can do something unusual.
> How do you trigger any action? what happens after persist()
>
> On Wed, Jun 9, 2021 at 9:48 AM Tom Barber  wrote:
>
>> Thanks Mich,
>>
>> The key on the first iteration is just a string that says "seed", so it
>> is indeed on the first crawl the same across all of the groups. Further
>> iterations would be different, but I'm not there yet. I was under the
>> impression that a repartition would distribute the tasks. Is that not the
>> case?
>>
>> Thanks
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi Tom,
>>>
>>> Persist() here simply means persist to memory). That is all. You can
>>> check UI tab on storage
>>>
>>>
>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>>>
>>> So I gather the code is stuck from your link in the driver. You stated
>>> that you tried repartition() but it did not do anything,
>>>
>>> Further you stated :
>>>
>>> " The key is pretty static in these tests, so I have also tried forcing
>>> the partition count (50 on a 16 core per node cluster) and also
>>> repartitioning, but every time all the jobs are scheduled to run on one
>>> node."
>>>
>>>
>>> What is the key?
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 9 Jun 2021 at 15:23, Tom Barber  wrote:
>>>
>>>>

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Thanks Mich,

The key on the first iteration is just a string that says "seed", so on the
first crawl it is indeed the same across all of the groups. Further
iterations would be different, but I'm not there yet. I was under the
impression that a repartition would distribute the tasks. Is that not the
case?
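
A minimal sketch of why a single static key matters, in plain Scala with no
Spark dependency. Spark's HashPartitioner places a key by taking a
non-negative key.hashCode modulo the partition count, so every record keyed
"seed" lands in the same partition however many partitions are requested,
and groupByKey then hands the whole group to one flatMap call. A repartition
placed after the flatMap only redistributes its output. The salted key shown
here (group plus a small bucket number) is a hypothetical workaround, not
something Sparkler does; SaltedKeys and its methods are illustrative names.

```scala
// Plain-Scala sketch of hash partitioning with a static key versus a salted key.
// SaltedKeys, partitionOf, saltedKeys and partitionsUsed are illustrative names.
object SaltedKeys {
  // Same idea as Spark's HashPartitioner: non-negative hashCode modulo n.
  def partitionOf(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Hypothetical helper: key each URL by (group, salt) instead of group alone.
  def saltedKeys(group: String, urls: Seq[String], buckets: Int): Seq[((String, Int), String)] =
    urls.zipWithIndex.map { case (url, i) => ((group, i % buckets), url) }

  // Number of distinct partitions that a collection of keys would occupy.
  def partitionsUsed(keys: Seq[Any], numPartitions: Int): Int =
    keys.map(partitionOf(_, numPartitions)).distinct.size

  def main(args: Array[String]): Unit = {
    val urls = (1 to 4000).map(i => s"http://example.com/page/$i")
    val staticKeys = urls.map(_ => "seed")
    val salted = saltedKeys("seed", urls, 50).map(_._1)
    println(s"partitions used with static key: ${partitionsUsed(staticKeys, 50)}")
    println(s"partitions used with salted key: ${partitionsUsed(salted, 50)}")
  }
}
```

With salting, each (group, salt) pair forms its own group, so any per-group
logic such as fetch delays would apply per bucket rather than per original
group.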

Thanks

Tom

On Wed, Jun 9, 2021 at 3:44 PM Mich Talebzadeh 
wrote:

> Hi Tom,
>
> Persist() here simply means persist to memory). That is all. You can check
> UI tab on storage
>
>
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
>
> So I gather the code is stuck from your link in the driver. You stated
> that you tried repartition() but it did not do anything,
>
> Further you stated :
>
> " The key is pretty static in these tests, so I have also tried forcing
> the partition count (50 on a 16 core per node cluster) and also
> repartitioning, but every time all the jobs are scheduled to run on one
> node."
>
>
> What is the key?
>
>
> HTH
>
>
> On Wed, 9 Jun 2021 at 15:23, Tom Barber  wrote:
>
>> Interesting Sean thanks for that insight, I wasn't aware of that fact, I
>> assume the .persist() at the end of that line doesn't do it?
>>
>> I believe, looking at the output in the SparkUI, it gets to
>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
>> and calls the context runJob.
>>
>> On Wed, Jun 9, 2021 at 2:07 PM Sean Owen  wrote:
>>
>>> All these configurations don't matter at all if this is executing on the
>>> driver.
>>> Returning an Iterator in flatMap is fine though it 'delays' execution
>>> until that iterator is evaluated by something, which is normally fine.
>>> Does creating this FairFetcher do anything by itself? you're just
>>> returning an iterator that creates them here.
>>> How do you actually trigger an action here? the code snippet itself
>>> doesn't trigger anything.
>>> I think we need more info about what else is happening in the code.
>>>
>>> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:
>>>
>>>> Yeah so if I update the FairFetcher to return a seq it makes no real
>>>> difference.
>>>>
>>>> Here's an image of what I'm seeing just for reference:
>>>> https://pasteboard.co/K5NFrz7.png
>>>>
>>>> Because this is databricks I don't have an actual spark submit command
>>>> but it looks like this:
>>>>
>>>> curl  -d
>>>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>>>> "spark.task.cpus":"16"},
>>>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>>>> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
>>>> "--executor-memory", "10g",
>>>> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
>>>> "-tn", "5000", "-co",
>>>> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>>>
>>>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
>>>> driver trying to run all the tasks in parallel on the one node, but again
>>>> I've got 5

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Interesting, Sean, thanks for that insight. I wasn't aware of that fact. I
assume the .persist() at the end of that line doesn't do it?

I believe, looking at the output in the SparkUI, it gets to
https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L254
and calls the context runJob.
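
As a rough analogy for the laziness being discussed, here is a plain-Scala
sketch with no Spark dependency. Building a pipeline on an Iterator runs
nothing until something consumes it, much as RDD transformations (and
persist(), which only marks the RDD for caching) do no work until an action
such as sc.runJob() is called. LazyPipeline, Counter and build are
hypothetical names used only for illustration.

```scala
// Plain-Scala sketch: a lazy pipeline does no work until it is consumed.
// LazyPipeline, Counter and build are illustrative names only.
object LazyPipeline {
  // Counts how many elements have actually been processed.
  final class Counter { var n: Int = 0 }

  // Describes the work; the mapping function runs only when elements are pulled.
  def build(urls: Seq[String], counter: Counter): Iterator[String] =
    urls.iterator.map { u => counter.n += 1; s"fetched:$u" }

  def main(args: Array[String]): Unit = {
    val counter = new Counter
    val pipeline = build(Seq("a", "b", "c"), counter)
    println(s"processed before consuming: ${counter.n}")
    val results = pipeline.toList // consuming the iterator plays the role of the action
    println(s"processed after consuming: ${counter.n}, results: $results")
  }
}
```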

On Wed, Jun 9, 2021 at 2:07 PM Sean Owen  wrote:

> All these configurations don't matter at all if this is executing on the
> driver.
> Returning an Iterator in flatMap is fine though it 'delays' execution
> until that iterator is evaluated by something, which is normally fine.
> Does creating this FairFetcher do anything by itself? you're just
> returning an iterator that creates them here.
> How do you actually trigger an action here? the code snippet itself
> doesn't trigger anything.
> I think we need more info about what else is happening in the code.
>
> On Wed, Jun 9, 2021 at 6:30 AM Tom Barber  wrote:
>
>> Yeah so if I update the FairFetcher to return a seq it makes no real
>> difference.
>>
>> Here's an image of what I'm seeing just for reference:
>> https://pasteboard.co/K5NFrz7.png
>>
>> Because this is databricks I don't have an actual spark submit command
>> but it looks like this:
>>
>> curl  -d
>> '{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
>> "spark.task.cpus":"16"},
>> "spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
>> "-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
>> "--executor-memory", "10g",
>> "--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
>> "-tn", "5000", "-co",
>> "{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'
>>
>> I deliberately pinned spark.task.cpus to 16 to stop it swamping the
>> driver trying to run all the tasks in parallel on the one node, but again
>> I've got 50 tasks queued up all running on the single node.
>>
>> On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:
>>
>>> I've not run it yet, but I've stuck a toSeq on the end, but in reality a
>>> Seq just inherits Iterator, right?
>>>
>>> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>>>
>>> Tom
>>>
>>> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:
>>>
>>>> Interesting Jayesh, thanks, I will test.
>>>>
>>>> All this code is inherited and it runs, but I don't think its been
>>>> tested in a distributed context for about 5 years, but yeah I need to get
>>>> this pushed down, so I'm happy to try anything! :)
>>>>
>>>> Tom
>>>>
>>>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
>>>> wrote:
>>>>
>>>>> flatMap is supposed to return Seq, not Iterator. You are returning a
>>>>> class that implements Iterator. I have a hunch that's what's causing the
>>>>> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
>>>>> you intend it to be RDD[CrawlData]? You might want to call toSeq on
>>>>> FairFetcher.
>>>>>
>>>>> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>>>>>
>>>>> CAUTION: This email originated from outside of the organization.
>>>>> Do not click links or open attachments unless you can confirm the sender
>>>>> and know the content is safe.
>>>>>
>>>>>
>>>>>
>>>>> For anyone interested here's the execution logs up until the point
>>>>> where it actually kicks off the workload in question:
>>>>> https://gist.github.c

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Yeah, so if I update the FairFetcher to return a Seq it makes no real
difference.

Here's an image of what I'm seeing just for reference:
https://pasteboard.co/K5NFrz7.png

Because this is Databricks I don't have an actual spark-submit command, but
it looks like this:

curl  -d
'{"new_cluster":{"spark_conf":{"spark.executor.extraJavaOptions":"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/",
"spark.task.cpus":"16"},
"spark_version":"8.3.x-scala2.12","aws_attributes":{"availability":"SPOT_WITH_FALLBACK","first_on_demand":1,"zone_id":"us-west-2c"},"node_type_id":"c5d.4xlarge","init_scripts":[{"dbfs":{"destination":"dbfs:/FileStore/crawlinit.sh"}}],"num_workers":3},"spark_submit_task":{"parameters":["--driver-java-options",
"-Dpf4j.pluginsDir=/dbfs/FileStore/bcf/plugins/", "--driver-memory", "10g",
"--executor-memory", "10g",
"--class","edu.usc.irds.sparkler.Main","dbfs:/FileStore/bcf/sparkler7.jar","crawl","-id","mytestcrawl11",
"-tn", "5000", "-co",
"{\"plugins.active\":[\"urlfilter-regex\",\"urlfilter-samehost\",\"fetcher-chrome\"],\"plugins\":{\"fetcher.chrome\":{\"chrome.dns\":\"local\"}}}"]},"run_name":"testsubmi3t"}'

I deliberately pinned spark.task.cpus to 16 to stop it swamping the driver
trying to run all the tasks in parallel on the one node, but again I've got
50 tasks queued up all running on the single node.
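
The effect of spark.task.cpus can be sketched as simple arithmetic, assuming
each executor has 16 cores (matching the 16 cores per node mentioned in this
thread). An executor can run executor cores divided by spark.task.cpus tasks
at once, so pinning spark.task.cpus to 16 leaves one slot per executor.
TaskSlots and its methods are hypothetical names for illustration.

```scala
// Plain-Scala sketch of task-slot arithmetic for spark.task.cpus.
// TaskSlots, slotsPerExecutor and waves are illustrative names only.
object TaskSlots {
  // Concurrent tasks a single executor can run.
  def slotsPerExecutor(executorCores: Int, taskCpus: Int): Int =
    executorCores / taskCpus

  // Number of sequential "waves" needed to run all tasks on the given executors.
  def waves(tasks: Int, executors: Int, executorCores: Int, taskCpus: Int): Int = {
    val slots = executors * slotsPerExecutor(executorCores, taskCpus)
    require(slots > 0, "no task slots available with these settings")
    (tasks + slots - 1) / slots // ceiling division
  }

  def main(args: Array[String]): Unit = {
    println(s"slots per executor: ${slotsPerExecutor(16, 16)}")
    println(s"waves for 50 tasks on 3 executors: ${waves(50, 3, 16, 16)}")
    println(s"waves for 50 tasks on 1 executor: ${waves(50, 1, 16, 16)}")
  }
}
```

If only one executor ever receives tasks, the 50 tasks run one after another
regardless of how many workers are idle.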

On Wed, Jun 9, 2021 at 12:01 PM Tom Barber  wrote:

> I've not run it yet, but I've stuck a toSeq on the end, but in reality a
> Seq just inherits Iterator, right?
>
> Flatmap does return a RDD[CrawlData] unless my IDE is lying to me.
>
> Tom
>
> On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:
>
>> Interesting Jayesh, thanks, I will test.
>>
>> All this code is inherited and it runs, but I don't think its been tested
>> in a distributed context for about 5 years, but yeah I need to get this
>> pushed down, so I'm happy to try anything! :)
>>
>> Tom
>>
>> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
>> wrote:
>>
>>> flatMap is supposed to return Seq, not Iterator. You are returning a
>>> class that implements Iterator. I have a hunch that's what's causing the
>>> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
>>> you intend it to be RDD[CrawlData]? You might want to call toSeq on
>>> FairFetcher.
>>>
>>> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>>>
>>>
>>> For anyone interested here's the execution logs up until the point
>>> where it actually kicks off the workload in question:
>>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>>
>>> On 2021/06/09 01:52:39, Tom Barber  wrote:
>>> > ExecutorID says driver, and looking at the IP addresses its
>>> running on its not any of the worker ip's.
>>> >
>>> > I forcibly told it to create 50, but they'd all end up running in
>>> the same place.
>>> >
>>> > Working on some other ideas, I set spark.task.cpus to 16 to match
>>> the nodes whilst still forcing it to 50 partitions
>>> >
>>> > val m = 50
>>> >
>>> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>>> > .groupByKey(m).flatMap({ case (grp, rs) => new
>>> FairFetcher(job, rs.iterator, localFetchDelay,
>>> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
>>> StatusUpdateSolrTransformer) })
>>> > .persist()
>>> >
>>> > that sort of thing. But still the tasks are pinned to the driver
>>> executor and none of the workers, so I no longer saturate the master node,
>>> but I also have 3 workers just sat there doing nothing.
>>> >
>>> > On 2021/06/09 01:26:50, Sean Owen  wrote:
>>> > > Are you sure it's on the driver? or just 1 executor?
>>> > > how many partitions does the groupByKey produce? that woul

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
I've not run it yet, but I've stuck a toSeq on the end. In reality, though,
a Seq just inherits Iterator, right?

flatMap does return an RDD[CrawlData] unless my IDE is lying to me.
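
On the Seq versus Iterator question, a small plain-Scala sketch (no Spark
dependency) may help. In the Scala collections a Seq is an Iterable rather
than an Iterator: it hands out a fresh Iterator on each call to .iterator,
whereas an Iterator is single-pass. As far as the RDD API goes, flatMap
accepts a function returning a TraversableOnce, which covers both, so the
element type of the result is the same either way. SeqVsIterator and its
methods are hypothetical names used only for illustration.

```scala
// Plain-Scala sketch contrasting a re-traversable Seq with a single-pass Iterator.
// SeqVsIterator, traverseTwice and drainTwice are illustrative names only.
object SeqVsIterator {
  // A Seq hands out a fresh Iterator every time, so both passes see all elements.
  def traverseTwice[A](xs: Seq[A]): (List[A], List[A]) =
    (xs.iterator.toList, xs.iterator.toList)

  // An Iterator is consumed by the first pass; the second pass finds nothing left.
  def drainTwice[A](it: Iterator[A]): (List[A], List[A]) =
    (it.toList, it.toList)

  def main(args: Array[String]): Unit = {
    println(traverseTwice(Seq(1, 2, 3)))
    println(drainTwice(List(1, 2, 3).iterator))
  }
}
```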

Tom

On Wed, Jun 9, 2021 at 10:54 AM Tom Barber  wrote:

> Interesting Jayesh, thanks, I will test.
>
> All this code is inherited and it runs, but I don't think its been tested
> in a distributed context for about 5 years, but yeah I need to get this
> pushed down, so I'm happy to try anything! :)
>
> Tom
>
> On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh 
> wrote:
>
>> flatMap is supposed to return Seq, not Iterator. You are returning a
>> class that implements Iterator. I have a hunch that's what's causing the
>> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
>> you intend it to be RDD[CrawlData]? You might want to call toSeq on
>> FairFetcher.
>>
>> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>>
>>
>> For anyone interested here's the execution logs up until the point
>> where it actually kicks off the workload in question:
>> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>>
>> On 2021/06/09 01:52:39, Tom Barber  wrote:
>> > ExecutorID says driver, and looking at the IP addresses its running
>> on its not any of the worker ip's.
>> >
>> > I forcibly told it to create 50, but they'd all end up running in
>> the same place.
>> >
>> > Working on some other ideas, I set spark.task.cpus to 16 to match
>> the nodes whilst still forcing it to 50 partitions
>> >
>> > val m = 50
>> >
>> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>> > .groupByKey(m).flatMap({ case (grp, rs) => new
>> FairFetcher(job, rs.iterator, localFetchDelay,
>> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
>> StatusUpdateSolrTransformer) })
>> > .persist()
>> >
>> > that sort of thing. But still the tasks are pinned to the driver
>> executor and none of the workers, so I no longer saturate the master node,
>> but I also have 3 workers just sat there doing nothing.
>> >
>> > On 2021/06/09 01:26:50, Sean Owen  wrote:
>> > > Are you sure it's on the driver? or just 1 executor?
>> > > how many partitions does the groupByKey produce? that would limit
>> your
>> > > parallelism no matter what if it's a small number.
>> > >
>> > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber <
>> magicaltr...@apache.org> wrote:
>> > >
>> > > > Hi folks,
>> > > >
>> > > > Hopefully someone with more Spark experience than me can
>> explain this a
>> > > > bit.
>> > > >
>> > > > I don't know if this is possible, impossible or just an old design
>> > > > that could be better.
>> > > >
>> > > > I'm running Sparkler as a spark-submit job on a databricks
>> spark cluster
>> > > > and its getting to this point in the code(
>> > > >
>> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
>> > > > )
>> > > >
>> > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
>> > > > .groupByKey()
>> > > > .flatMap({ case (grp, rs) => new FairFetcher(job,
>> rs.iterator,
>> > > > localFetchDelay,
>> > > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
>> > > > StatusUpdateSolrTransformer) })
>> > > > .persist()
>> > > >
>> > > > This basically takes the RDD and then runs a web based crawl
>> over each RDD
>> > > > and returns the results. But when Spark executes it, it runs
>> all the crawls
>> > > > on the driver node and doesn't distribute them.
>> > > >
>> > > > The key is pretty static in these tests, so I have also tried
>> forcing the
>>

Re: Distributing a FlatMap across a Spark Cluster

2021-06-09 Thread Tom Barber
Interesting Jayesh, thanks, I will test.

All this code is inherited and it runs, but I don't think it's been tested
in a distributed context for about 5 years. I need to get this pushed down,
though, so I'm happy to try anything! :)

Tom

On Wed, Jun 9, 2021 at 3:37 AM Lalwani, Jayesh  wrote:

> flatMap is supposed to return Seq, not Iterator. You are returning a class
> that implements Iterator. I have a hunch that's what's causing the
> confusion. flatMap is returning a RDD[FairFetcher] not RDD[CrawlData]. Do
> you intend it to be RDD[CrawlData]? You might want to call toSeq on
> FairFetcher.
>
> On 6/8/21, 10:10 PM, "Tom Barber"  wrote:
>
>
> For anyone interested here's the execution logs up until the point
> where it actually kicks off the workload in question:
> https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473
>
> On 2021/06/09 01:52:39, Tom Barber  wrote:
> > ExecutorID says driver, and looking at the IP addresses its running
> on its not any of the worker ip's.
> >
> > I forcibly told it to create 50, but they'd all end up running in
> the same place.
> >
> > Working on some other ideas, I set spark.task.cpus to 16 to match
> the nodes whilst still forcing it to 50 partitions
> >
> > val m = 50
> >
> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > .groupByKey(m).flatMap({ case (grp, rs) => new
> FairFetcher(job, rs.iterator, localFetchDelay,
> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> StatusUpdateSolrTransformer) })
> > .persist()
> >
> > that sort of thing. But still the tasks are pinned to the driver
> executor and none of the workers, so I no longer saturate the master node,
> but I also have 3 workers just sat there doing nothing.
> >
> > On 2021/06/09 01:26:50, Sean Owen  wrote:
> > > Are you sure it's on the driver? or just 1 executor?
> > > how many partitions does the groupByKey produce? that would limit
> your
> > > parallelism no matter what if it's a small number.
> > >
> > > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber 
> wrote:
> > >
> > > > Hi folks,
> > > >
> > > > Hopefully someone with more Spark experience than me can explain
> this a
> > > > bit.
> > > >
> > > > I dont' know if this is possible, impossible or just an old
> design that
> > > > could be better.
> > > >
> > > > I'm running Sparkler as a spark-submit job on a databricks spark
> cluster
> > > > and its getting to this point in the code(
> > > >
> https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > > )
> > > >
> > > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > > .groupByKey()
> > > > .flatMap({ case (grp, rs) => new FairFetcher(job,
> rs.iterator,
> > > > localFetchDelay,
> > > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > > StatusUpdateSolrTransformer) })
> > > > .persist()
> > > >
> > > > This basically takes the RDD and then runs a web based crawl
> over each RDD
> > > > and returns the results. But when Spark executes it, it runs all
> the crawls
> > > > on the driver node and doesn't distribute them.
> > > >
> > > > The key is pretty static in these tests, so I have also tried
> forcing the
> > > > partition count (50 on a 16 core per node cluster) and also
> repartitioning,
> > > > but every time all the jobs are scheduled to run on one node.
> > > >
> > > > What can I do better to distribute the tasks? Because the
> processing of
> > > > the data in the RDD isn't the bottleneck, the fetching of the
> crawl data is
> > > > the bottleneck, but that happens after the code has been
> assigned to a node.
> > > >
> > > > Thanks
> > > >
> > > > Tom
> > > >
> > > >
> > > >
> 

Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
For anyone interested here's the execution logs up until the point where it 
actually kicks off the workload in question: 
https://gist.github.com/buggtb/a9e0445f24182bc8eedfe26c0f07a473

On 2021/06/09 01:52:39, Tom Barber  wrote: 
> ExecutorID says driver, and looking at the IP addresses its running on its 
> not any of the worker ip's.
> 
> I forcibly told it to create 50, but they'd all end up running in the same 
> place. 
> 
> Working on some other ideas, I set spark.task.cpus to 16 to match the nodes 
> whilst still forcing it to 50 partitions
> 
> val m = 50
> 
> val fetchedRdd = rdd.map(r => (r.getGroup, r))
> .groupByKey(m).flatMap({ case (grp, rs) => new FairFetcher(job, 
> rs.iterator, localFetchDelay,
>   FetchFunction, ParseFunction, OutLinkFilterFunction, 
> StatusUpdateSolrTransformer) })
> .persist()
> 
> that sort of thing. But still the tasks are pinned to the driver executor and 
> none of the workers, so I no longer saturate the master node, but I also have 
> 3 workers just sat there doing nothing.
> 
> On 2021/06/09 01:26:50, Sean Owen  wrote: 
> > Are you sure it's on the driver? or just 1 executor?
> > how many partitions does the groupByKey produce? that would limit your
> > parallelism no matter what if it's a small number.
> > 
> > On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:
> > 
> > > Hi folks,
> > >
> > > Hopefully someone with more Spark experience than me can explain this a
> > > bit.
> > >
> > > I dont' know if this is possible, impossible or just an old design that
> > > could be better.
> > >
> > > I'm running Sparkler as a spark-submit job on a databricks spark cluster
> > > and its getting to this point in the code(
> > > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > > )
> > >
> > > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > > .groupByKey()
> > > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > > localFetchDelay,
> > >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > > StatusUpdateSolrTransformer) })
> > > .persist()
> > >
> > > This basically takes the RDD and then runs a web based crawl over each RDD
> > > and returns the results. But when Spark executes it, it runs all the 
> > > crawls
> > > on the driver node and doesn't distribute them.
> > >
> > > The key is pretty static in these tests, so I have also tried forcing the
> > > partition count (50 on a 16 core per node cluster) and also 
> > > repartitioning,
> > > but every time all the jobs are scheduled to run on one node.
> > >
> > > What can I do better to distribute the tasks? Because the processing of
> > > the data in the RDD isn't the bottleneck, the fetching of the crawl data 
> > > is
> > > the bottleneck, but that happens after the code has been assigned to a 
> > > node.
> > >
> > > Thanks
> > >
> > > Tom
> > >
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> > 
> 



Re: Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
ExecutorID says driver, and looking at the IP address it's running on, it's not
any of the worker IPs.

I forcibly told it to create 50 partitions, but they'd all end up running in the
same place.

Working on some other ideas, I set spark.task.cpus to 16 to match the nodes
whilst still forcing it to 50 partitions:

val m = 50

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey(m)
  .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()

That sort of thing. But the tasks are still pinned to the driver executor and
not to any of the workers, so I no longer saturate the master node, but I also
have 3 workers just sat there doing nothing.

On 2021/06/09 01:26:50, Sean Owen  wrote: 
> Are you sure it's on the driver? or just 1 executor?
> how many partitions does the groupByKey produce? that would limit your
> parallelism no matter what if it's a small number.
> 
> On Tue, Jun 8, 2021 at 8:07 PM Tom Barber  wrote:
> 
> > Hi folks,
> >
> > Hopefully someone with more Spark experience than me can explain this a
> > bit.
> >
> > I dont' know if this is possible, impossible or just an old design that
> > could be better.
> >
> > I'm running Sparkler as a spark-submit job on a databricks spark cluster
> > and its getting to this point in the code(
> > https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226
> > )
> >
> > val fetchedRdd = rdd.map(r => (r.getGroup, r))
> > .groupByKey()
> > .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator,
> > localFetchDelay,
> >   FetchFunction, ParseFunction, OutLinkFilterFunction,
> > StatusUpdateSolrTransformer) })
> > .persist()
> >
> > This basically takes the RDD and then runs a web based crawl over each RDD
> > and returns the results. But when Spark executes it, it runs all the crawls
> > on the driver node and doesn't distribute them.
> >
> > The key is pretty static in these tests, so I have also tried forcing the
> > partition count (50 on a 16 core per node cluster) and also repartitioning,
> > but every time all the jobs are scheduled to run on one node.
> >
> > What can I do better to distribute the tasks? Because the processing of
> > the data in the RDD isn't the bottleneck, the fetching of the crawl data is
> > the bottleneck, but that happens after the code has been assigned to a node.
> >
> > Thanks
> >
> > Tom
> >
> >
> 



Distributing a FlatMap across a Spark Cluster

2021-06-08 Thread Tom Barber
Hi folks, 

Hopefully someone with more Spark experience than me can explain this a bit.

I don't know if this is possible, impossible or just an old design that could
be better.

I'm running Sparkler as a spark-submit job on a Databricks Spark cluster and
it's getting to this point in the code:
https://github.com/USCDataScience/sparkler/blob/master/sparkler-core/sparkler-app/src/main/scala/edu/usc/irds/sparkler/pipeline/Crawler.scala#L222-L226

val fetchedRdd = rdd.map(r => (r.getGroup, r))
  .groupByKey()
  .flatMap({ case (grp, rs) => new FairFetcher(job, rs.iterator, localFetchDelay,
    FetchFunction, ParseFunction, OutLinkFilterFunction, StatusUpdateSolrTransformer) })
  .persist()

This basically takes the RDD and then runs a web-based crawl over each group in
it and returns the results. But when Spark executes it, it runs all the crawls
on the driver node and doesn't distribute them.

The key is pretty static in these tests, so I have also tried forcing the 
partition count (50 on a 16 core per node cluster) and also repartitioning, but 
every time all the jobs are scheduled to run on one node.
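
As a rough illustration of why a near-static key limits parallelism whatever
partition count is requested: Spark's default HashPartitioner places a record
by the non-negative modulo of its key's hashCode, so every record that shares a
key lands in the same partition. The sketch below reproduces that arithmetic in
plain Scala, without Spark. The object name, helper names and host names are
made up for illustration, and this alone would not explain tasks showing up on
the driver.

```scala
// Plain-Scala sketch of hash partitioning (no Spark dependency).
// All names here are hypothetical and only serve the illustration.
object StaticKeyPartitioning {

  // Same idea as the modulo used by hash partitioning: never negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Partition index a key would be assigned to.
  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)

  // How many partitions receive at least one of the given keys.
  def occupiedPartitions(keys: Seq[String], numPartitions: Int): Int =
    keys.map(partitionFor(_, numPartitions)).distinct.size

  def main(args: Array[String]): Unit = {
    // One crawl group repeated 1000 times, as with a static key.
    val staticKeys = Seq.fill(1000)("example.com")
    // Many distinct groups, for comparison.
    val variedKeys = (1 to 1000).map(i => s"host$i.example.com")

    println(occupiedPartitions(staticKeys, 50)) // 1: a single key fills a single partition
    println(occupiedPartitions(variedKeys, 50)) // distinct keys spread over more partitions
  }
}
```

With a single key, 49 of the 50 requested partitions stay empty, so only one
task has real work to do however many cores are available.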

What can I do better to distribute the tasks? Because the processing of the 
data in the RDD isn't the bottleneck, the fetching of the crawl data is the 
bottleneck, but that happens after the code has been assigned to a node.

Thanks

Tom




Re: GPU job in Spark 3

2021-04-09 Thread Tom Graves
Hey Martin,

I would encourage you to file issues in the spark-rapids repo for questions
with that plugin: https://github.com/NVIDIA/spark-rapids/issues

I'm assuming the query ran and you looked at the SQL UI or the .explain()
output and it was on the CPU and not the GPU? I am also assuming you have the
CUDA 11.0 runtime installed (look in /usr/local). You printed the driver
version, which is 11.2, but the runtimes can be different. You are using the
CUDA 11.0 version of the cudf library. If that didn't match the runtime, though,
it would have failed and not run anything.

The easiest way to tell why it didn't run on the GPU is to enable the config:
spark.rapids.sql.explain=NOT_ON_GPU
It will print logs to your console explaining why different operators don't run
on the GPU.

Again, feel free to open a question issue in the spark-rapids repo and we
can discuss more there.

Tom
On Friday, April 9, 2021, 11:19:05 AM CDT, Martin Somers 
 wrote:  
 
 
Hi Everyone !!

I'm trying to get an on-premise GPU instance of Spark 3 running on my Ubuntu
box, and I am following:
https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-on-prem.html#example-join-operation

Does anyone have any insight into why a Spark job isn't being run on the GPU?
It appears to run entirely on the CPU. The Hadoop binary is installed and
appears to be functioning fine:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

Here is my setup on Ubuntu 20.10:


▶ nvidia-smi

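+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.39       Driver Version: 460.39       CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  GeForce RTX 3090    Off  | :21:00.0  On |                  N/A |
|  0%   38C    P8    19W / 370W |    478MiB / 24265MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+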

/opt/sparkRapidsPlugin
▶ ls
cudf-0.18.1-cuda11.jar  getGpusResources.sh  rapids-4-spark_2.12-0.4.1.jar

▶ scalac --version
Scala compiler version 2.13.0 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, 
Inc.


▶ spark-shell --version
2021-04-09 17:05:36,158 WARN util.Utils: Your hostname, studio resolves to a 
loopback address: 127.0.1.1; using 192.168.0.221 instead (on interface wlp71s0)
2021-04-09 17:05:36,159 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind 
to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform 
(file:/opt/spark/jars/spark-unsafe_2.12-3.1.1.jar) to constructor 
java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of 
org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.10
Branch HEAD
Compiled by user ubuntu on 2021-02-22T01:04:02Z
Revision 1d550c4e90275ab418b9161925049239227f3dc9
Url https://github.com/apache/spark
Type --help for more information.


Here is how I am calling Spark prior to adding the test job:

$SPARK_HOME/bin/spark-shell \
       --master local \
       --num-executors 1 \
       --conf spark.executor.cores=16 \
       --conf spark.rapids.sql.concurrentGpuTasks=1 \
       --driver-memory 10g \
       --conf spark.executor.extraClassPath=${SPARK_CUDF_JAR}:${SPARK_RAPIDS_PLUGIN_JAR} \
       --conf spark.rapids.memory.pinnedPool.size=16G \
       --conf spark.locality.wait=0s \
       --conf spark.sql.files.maxPartitionBytes=512m \
       --conf spark.sql.shuffle.partitions=10 \
       --conf spark.plugins=com.nvidia.spark.SQLPlugin \
       --files $SPARK_RAPIDS_DIR/getGpusResources.sh \
       --jars ${SPARK_CUDF_JAR},${SPARK_RAPIDS_PLUGIN_JAR}


Test job is from the example join-operation 

val df = sc.makeRDD(1 to 1000, 6).toDF
val df2 = sc.makeRDD(1 to 1000, 6).toDF
df.select($"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count


I just noticed that the scala versions are out of sync - 

Re: [Spark Core] makeRDD() preferredLocations do not appear to be considered

2020-09-12 Thread Tom Scott
It turned out the issue was with my environment, not Spark. In case anyone
else is experiencing this, the problem was that the Spark workers did not use
the machine hostname by default. Setting the following environment variable
on each worker rectified it: SPARK_LOCAL_HOSTNAME: "worker1" etc.
<https://stackoverflow.com/users/14147688/tom-scott>

On Tue, Sep 8, 2020 at 10:11 PM Tom Scott  wrote:

> Hi Guys,
>
>   I asked this in stack overflow here:
> https://stackoverflow.com/questions/63535720/why-would-preferredlocations-not-be-enforced-on-an-empty-spark-cluster
> but am hoping there is further help here.
>
>   I have a 4 node standalone cluster with workers named worker1, worker2
> and worker3 and a master on which I am running spark-shell. Given the
> following example:
>
> -
> import scala.collection.mutable
>
> val someData = mutable.ArrayBuffer[(String, Seq[String])]()
>
> someData += ("1" -> Seq("worker1"))
> someData += ("2" -> Seq("worker2"))
> someData += ("3" -> Seq("worker3"))
>
> val someRdd = sc.makeRDD(someData)
>
> someRdd.map(i=>i + ":" +
> java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)
>
> -
>
> The cluster is completely clean with nothing else executing so I would
> expect to see output:
>
> 1:worker1
> 2:worker2
> 3:worker3
>
> but in fact the output is undefined and i see things like:
>
> scala> someRdd.map(i=>i + ":" +
> java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)
> 1:worker3
> 2:worker1
> 3:worker2
>
> scala> someRdd.map(i=>i + ":" +
> java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)
> 1:worker2
> 2:worker3
> 3:worker1
>
> Am I doing this wrong or is this expected behaviour?
>
> Thanks
>
>   Tom
>
>


[Spark Core] makeRDD() preferredLocations do not appear to be considered

2020-09-08 Thread Tom Scott
Hi Guys,

  I asked this in stack overflow here:
https://stackoverflow.com/questions/63535720/why-would-preferredlocations-not-be-enforced-on-an-empty-spark-cluster
but am hoping there is further help here.

  I have a 4 node standalone cluster with workers named worker1, worker2
and worker3 and a master on which I am running spark-shell. Given the
following example:
-
import scala.collection.mutable

val someData = mutable.ArrayBuffer[(String, Seq[String])]()

someData += ("1" -> Seq("worker1"))
someData += ("2" -> Seq("worker2"))
someData += ("3" -> Seq("worker3"))

val someRdd = sc.makeRDD(someData)

someRdd.map(i=>i + ":" +
java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)
-

The cluster is completely clean with nothing else executing so I would
expect to see output:

1:worker1
2:worker2
3:worker3

but in fact the output is undefined and I see things like:

scala> someRdd.map(i=>i + ":" +
java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)
1:worker3
2:worker1
3:worker2

scala> someRdd.map(i=>i + ":" +
java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)
1:worker2
2:worker3
3:worker1

Am I doing this wrong or is this expected behaviour?

Thanks

  Tom


Re: Build customized resource manager

2019-11-08 Thread Tom Graves
 I don't know if it all works but some work was done to make cluster manager 
pluggable, see SPARK-13904.
Tom
On Wednesday, November 6, 2019, 07:22:59 PM CST, Klaus Ma 
 wrote:  
 
 Any suggestions?

- Klaus

On Mon, Nov 4, 2019 at 5:04 PM Klaus Ma  wrote:

Hi team,
AFAIK, we built k8s/yarn/mesos in as resource managers, but I'd like to make
some enhancements to them, e.g. integrate with Volcano in k8s. Is it possible
to do that without forking the whole Spark project? For example, enable a
customized resource manager through configuration, e.g. replace
`org.apache.spark.deploy.k8s.submit.KubernetesClientApplication` with
`MyK8SClient`, so that I only have to maintain the resource manager instead of
the whole project.
-- Klaus
  

[ANNOUNCE] Apache Spark 2.2.2

2018-07-10 Thread Tom Graves
We are happy to announce the availability of Spark 2.2.2!
Apache Spark 2.2.2 is a maintenance release, based on the branch-2.2 
maintenance branch of Spark. We strongly recommend all 2.2.x users to upgrade 
to this stable release. The release notes are available at 
http://spark.apache.org/releases/spark-release-2-2-2.html

To download Apache Spark 2.2.2 visit http://spark.apache.org/downloads.html. 
This version of Spark is also available on Maven and PyPI.
We would like to acknowledge all community members for contributing patches to 
this release.



Re: Streaming - lookup against reference data

2016-09-15 Thread Tom Davis
Thanks Jörn, sounds like there's nothing obvious I'm missing, which is
encouraging.

I've not used Redis, but it does seem that for most of my current and
likely future use-cases it would be the best fit (nice compromise of scale
and easy setup / access).

Thanks,

Tom

On Wed, Sep 14, 2016 at 10:09 PM Jörn Franke <jornfra...@gmail.com> wrote:

> Hmm is it just a lookup and the values are small? I do not think that in
> this case redis needs to be installed on each worker node. Redis has a
> rather efficient protocol. Hence one or a few dedicated redis nodes
> are probably more than enough for your purpose. Just try to reuse connections
> and do not establish a new one for each lookup from the same node.
>
> Additionally Redis has a lot of interesting data structures such as
> hyperloglogs.
>
> Hbase - you can design here where to store which part of the reference
> data set and partition in Spark accordingly. Depends on the data and is
> tricky.
>
> About the other options I am a bit skeptical - especially since you need
> to include updated data, might have side effects.
>
> Nevertheless, you mention all the options that are possible. I guess for a
> true evaluation you have to check your use case, the envisioned future
> architecture for other use cases, required performance, maintainability etc.
>
> On 14 Sep 2016, at 20:44, Tom Davis <mailinglists...@gmail.com> wrote:
>
> Hi all,
>
> Interested in patterns people use in the wild for lookup against reference
> data sets from a Spark streaming job. The reference dataset will be updated
> during the life of the job (although being 30mins out of date wouldn't be
> an issue, for example).
>
> So far I have come up with a few options, all of which have advantages and
> disadvantages:
>
> 1. For small reference datasets, distribute the data as an in memory Map()
> from the driver, refreshing it inside the foreachRDD() loop.
>
> Obviously the limitation here is size.
>
> 2. Run a Redis (or similar) cache on each worker node, perform lookups
> against this.
>
> There's some complexity to managing this, probably outside of the Spark
> job.
>
> 3. Load the reference data into an RDD, again inside the foreachRDD() loop
> on the driver. Perform a join of the reference and stream batch RDDs.
> Perhaps keep the reference RDD in memory.
>
> I suspect that this will scale, but I also suspect there's going to be the
> potential for a lot of data shuffling across the network which will slow
> things down.
>
> 4. Similar to the Redis option, but use Hbase. Scales well and makes data
> available to other services but is a call out over the network, albeit
> within the cluster.
>
> I guess there's no solution that fits all, but interested in other
> people's experience and whether I've missed anything obvious.
>
> Thanks,
>
> Tom
>
>
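
The advice above about reusing connections rather than opening one per lookup
can be sketched in plain Scala as a lazily initialised holder: a `lazy val` on
an object is created once per JVM, so every partition processed in that JVM
shares it. `FakeConnection`, `ConnectionHolder` and `enrichPartition` are
hypothetical stand-ins, not a real Redis client or a Spark API, and
`enrichPartition` only mimics the function one might pass to mapPartitions.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a real client connection.
final class FakeConnection(val id: Int) {
  def lookup(key: String): String = s"value-for-$key"
}

object ConnectionHolder {
  // Counts how many connections were actually opened.
  val opened = new AtomicInteger(0)

  // lazy val: initialised on first access, then reused within this JVM.
  lazy val connection: FakeConnection =
    new FakeConnection(opened.incrementAndGet())
}

object ReuseDemo {
  // Mimics a per-partition function: fetch the shared connection once,
  // then use it for every record in the partition.
  def enrichPartition(keys: Iterator[String]): Iterator[(String, String)] = {
    val conn = ConnectionHolder.connection
    keys.map(k => (k, conn.lookup(k)))
  }

  def main(args: Array[String]): Unit = {
    // Three "partitions" of two keys each.
    val results = (1 to 3).flatMap(_ => enrichPartition(Iterator("a", "b"))).toList
    println(results.size)                // 6
    println(ConnectionHolder.opened.get) // 1: one connection served all lookups
  }
}
```

A real client would also need closing and error handling, which this sketch
leaves out.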


Streaming - lookup against reference data

2016-09-14 Thread Tom Davis
Hi all,

Interested in patterns people use in the wild for lookup against reference
data sets from a Spark streaming job. The reference dataset will be updated
during the life of the job (although being 30mins out of date wouldn't be
an issue, for example).

So far I have come up with a few options, all of which have advantages and
disadvantages:

1. For small reference datasets, distribute the data as an in memory Map()
from the driver, refreshing it inside the foreachRDD() loop.

Obviously the limitation here is size.

2. Run a Redis (or similar) cache on each worker node, perform lookups
against this.

There's some complexity to managing this, probably outside of the Spark job.

3. Load the reference data into an RDD, again inside the foreachRDD() loop
on the driver. Perform a join of the reference and stream batch RDDs.
Perhaps keep the reference RDD in memory.

I suspect that this will scale, but I also suspect there's going to be the
potential for a lot of data shuffling across the network which will slow
things down.

4. Similar to the Redis option, but use Hbase. Scales well and makes data
available to other services but is a call out over the network, albeit
within the cluster.

I guess there's no solution that fits all, but interested in other people's
experience and whether I've missed anything obvious.
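
A minimal sketch of option 1, assuming the reference data fits in memory and
that being up to 30 minutes stale is acceptable: a small holder that reloads
the map once it is older than a maximum age. `RefreshingLookup`, its `load`
function and the injectable clock are hypothetical names for illustration and
not part of Spark.

```scala
// Hypothetical helper: an in-memory lookup table that reloads itself
// once it is older than maxAgeMillis.
final class RefreshingLookup[K, V](
    load: () => Map[K, V],   // assumed loader, e.g. a database or file read
    maxAgeMillis: Long,
    now: () => Long = () => System.currentTimeMillis()) {

  private var data: Map[K, V] = load()
  private var loadedAt: Long = now()

  // Return the current table, reloading first if it has gone stale.
  def current: Map[K, V] = synchronized {
    if (now() - loadedAt >= maxAgeMillis) {
      data = load()
      loadedAt = now()
    }
    data
  }

  def get(key: K): Option[V] = current.get(key)
}

object RefreshingLookupDemo {
  def main(args: Array[String]): Unit = {
    var version = 0
    var clock = 0L // fake clock so the demo does not have to wait

    val lookup = new RefreshingLookup[String, Int](
      () => { version += 1; Map("rate" -> version) },
      maxAgeMillis = 30L * 60 * 1000,
      now = () => clock)

    println(lookup.get("rate")) // Some(1): loaded at construction
    clock += 31L * 60 * 1000    // pretend 31 minutes have passed
    println(lookup.get("rate")) // Some(2): stale, so reloaded
  }
}
```

In a streaming job the driver could call `current` inside the foreachRDD()
loop and ship the resulting map to the executors, for example as a broadcast
variable; that wiring is left out here.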

Thanks,

Tom


Unresponsive Spark Streaming UI in YARN cluster mode - 1.5.2

2016-07-08 Thread Ellis, Tom (Financial Markets IT)
(StreamingJobProgressListener.scala:93)
org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:41)
org.apache.spark.streaming.scheduler.StreamingListenerBus.onPostEvent(StreamingListenerBus.scala:26)
org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:56)
org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:79)
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)




Cheers,

Tom Ellis
Consultant Developer - Excelian
Data Lake | Financial Markets IT
LLOYDS BANK COMMERCIAL BANKING


E: tom.el...@lloydsbanking.com
Website: www.lloydsbankcommercial.com
, , ,
Reduce printing. Lloyds Banking Group is helping to build the low carbon 
economy.
Corporate Responsibility Report: 
www.lloydsbankinggroup-cr.com/downloads<http://www.lloydsbankinggroup-cr.com/downloads>



Lloyds Banking Group plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC95000. Telephone: 0131 225 4555. Lloyds Bank plc. 
Registered Office: 25 Gresham Street, London EC2V 7HN. Registered in England 
and Wales no. 2065. Telephone 0207626 1500. Bank of Scotland plc. Registered 
Office: The Mound, Edinburgh EH1 1YZ. Registered in Scotland no. SC327000. 
Telephone: 03457 801 801. Cheltenham & Gloucester plc. Registered Office: 
Barnett Way, Gloucester GL4 3RL. Registered in England and Wales 2299428. 
Telephone: 0345 603 1637

Lloyds Bank plc, Bank of Scotland plc are authorised by the Prudential 
Regulation Authority and regulated by the Financial Conduct Authority and 
Prudential Regulation Authority.

Cheltenham & Gloucester plc is authorised and regulated by the Financial 
Conduct Authority.

Halifax is a division of Bank of Scotland plc. Cheltenham & Gloucester Savings 
is a division of Lloyds Bank plc.

HBOS plc. Registered Office: The Mound, Edinburgh EH1 1YZ. Registered in 
Scotland no. SC218813.

This e-mail (including any attachments) is private and confidential and may 
contain privileged material. If you have received this e-mail in error, please 
notify the sender and delete it (including any attachments) immediately. You 
must not copy, distribute, disclose or use any of the information in it or any 
attachments. Telephone calls may be monitored or recorded.


In yarn-cluster mode, provide system prop to the client jvm

2016-06-16 Thread Ellis, Tom (Financial Markets IT)
Hi,

I was wondering if it is possible to pass a Java system property, for instance
-Dlog4j.configuration, to the JVM that does the submission of a yarn-cluster
application. I believe it will default to using the SPARK_CONF_DIR's
log4j.properties. Is it possible to override this, as I do not have access to it?

Cheers,

Tom Ellis
Consultant Developer - Excelian
Data Lake | Financial Markets IT
LLOYDS BANK COMMERCIAL BANKING


E: tom.el...@lloydsbanking.com
Website: www.lloydsbankcommercial.com


RE: HBase / Spark Kerberos problem

2016-05-19 Thread Ellis, Tom (Financial Markets IT)
Yeah we ran into this issue. Key part is to have the hbase jars and 
hbase-site.xml config on the classpath of the spark submitter.

We did it slightly differently from Y Bodnar, where we set the required jars 
and config on the env var SPARK_DIST_CLASSPATH in our spark env file (rather 
than SPARK_CLASSPATH which is deprecated).

With this and --principal/--keytab, if you turn on DEBUG logging for
org.apache.spark.deploy.yarn you should see “Added HBase security token to
credentials.”

Otherwise you should at least hopefully see the error where it fails to add the 
HBase tokens.

Check out the source of Client [1] and YarnSparkHadoopUtil  [2] – you’ll see 
how obtainTokenForHBase is being done.

It’s a bit confusing as to why it says you haven’t kinited even when you do 
loginUserFromKeytab – I haven’t quite worked through the reason for that yet.

Cheers,

Tom Ellis
telli...@gmail.com

[1] 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
[2] 
https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala


From: John Trengrove [mailto:john.trengr...@servian.com.au]
Sent: 19 May 2016 08:09
To: philipp.meyerhoe...@thomsonreuters.com
Cc: user
Subject: Re: HBase / Spark Kerberos problem

Have you had a look at this issue?

https://issues.apache.org/jira/browse/SPARK-12279

There is a comment by Y Bodnar on how they successfully got Kerberos and HBase 
working.

2016-05-18 18:13 GMT+10:00 <philipp.meyerhoe...@thomsonreuters.com>:
Hi all,

I have been puzzling over a Kerberos problem for a while now and wondered if 
anyone can help.

For spark-submit, I specify --keytab x --principal y, which creates my 
SparkContext fine.
Connections to Zookeeper Quorum to find the HBase master work well too.
But when it comes to a .count() action on the RDD, I am always presented with 
the stack trace at the end of this mail.

We are using CDH5.5.2 (spark 1.5.0), and com.cloudera.spark.hbase.HBaseContext 
is a wrapper around TableInputFormat/hadoopRDD (see 
https://github.com/cloudera-labs/SparkOnHBase), as you can see in the stack 
trace.

Am I doing something obvious wrong here?
A similar flow, inside test code, works well, only going via spark-submit 
exposes this issue.

Code snippet (I have tried using the commented-out lines in various 
combinations, without success):

// Imports assumed from context; HBaseContext is the SparkOnHBase class named above.
// startMillis, endMillis, MY_TABLE, resultToMatch and log are defined elsewhere.
import org.apache.hadoop.hbase.client.Scan
import org.apache.spark.{SparkConf, SparkContext}
import com.cloudera.spark.hbase.HBaseContext

val conf = new SparkConf().
  set("spark.shuffle.consolidateFiles", "true").
  set("spark.kryo.registrationRequired", "false").
  set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  set("spark.kryoserializer.buffer", "30m")
val sc = new SparkContext(conf)
val cfg = sc.hadoopConfiguration
//cfg.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"))
//UserGroupInformation.getCurrentUser.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS)
//cfg.set("hbase.security.authentication", "kerberos")
val hc = new HBaseContext(sc, cfg)
val scan = new Scan
scan.setTimeRange(startMillis, endMillis)
val matchesInRange = hc.hbaseRDD(MY_TABLE, scan, resultToMatch)
val cnt = matchesInRange.count()
log.info(s"matches in range $cnt")

Stack trace / log:

16/05/17 17:04:47 INFO SparkContext: Starting job: count at Analysis.scala:93
16/05/17 17:04:47 INFO DAGScheduler: Got job 0 (count at Analysis.scala:93) 
with 1 output partitions
16/05/17 17:04:47 INFO DAGScheduler: Final stage: ResultStage 0(count at 
Analysis.scala:93)
16/05/17 17:04:47 INFO DAGScheduler: Parents of final stage: List()
16/05/17 17:04:47 INFO DAGScheduler: Missing parents: List()
16/05/17 17:04:47 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[1] at map at HBaseContext.scala:580), which has no missing 
parents
16/05/17 17:04:47 INFO MemoryStore: ensureFreeSpace(3248) called with 
curMem=428022, maxMem=244187136
16/05/17 17:04:47 INFO MemoryStore: Block broadcast_3 stored as values in 
memory (estimated size 3.2 KB, free 232.5 MB)
16/05/17 17:04:47 INFO MemoryStore: ensureFreeSpace(2022) called with 
curMem=431270, maxMem=244187136
16/05/17 17:04:47 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in 
memory (estimated size 2022.0 B, free 232.5 MB)
16/05/17 17:04:47 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 
10.6.164.40:33563 (size: 2022.0 B, free: 232.8 MB)
16/05/17 17:04:47 INFO SparkContext: Created broadcast 3 from broadcast at 
DAGScheduler.scala:861
16/05/17 17:04:47 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 0 (MapPartitionsRDD[1] at map at HBaseContext.scala:580)
16/

Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Tom Godden
I corrected the type to RDD<ArrayList[]>, but it's still giving
me the error.
I believe I have found the reason though. The vals variable is created
using the map procedure on some other RDD. Although it is declared as a
JavaRDD<ArrayList>, the classTag it returns is Object. I think
that because of this, the RDD returned from sliding() only accepts
Object as a type.
I have no idea how to fix this though.
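
To make the shape of the result concrete ("An RDD of T produces an RDD of
T[]"), here is a minimal sketch in plain Python. It is not Spark code and it
ignores partitioning; the function name and sample data are made up for
illustration. Each inner list stands in for one ArrayList of Integers, and
each window is a fixed-size block of those lists, not an RDD.

```python
def sliding(items, window_size, step=1):
    """Return every full window of `window_size` consecutive items.

    Plain-Python stand-in for the shape of the result only; it is not
    Spark's implementation and it ignores partitioning.
    """
    items = list(items)
    last_start = len(items) - window_size
    return [items[i:i + window_size] for i in range(0, last_start + 1, step)]


# Each element plays the role of one ArrayList of Integers (the "T").
vals = [[1, 2], [3], [4, 5], [6]]
windows = sliding(vals, window_size=3)

# Every window is a fixed-size block of three "T" values.
print(windows)
```

If that picture is right, the element type to aim for on the Java side is an
array of the original element type, which matches the advice quoted below.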

On 13-05-16 13:12, Sean Owen wrote:
> The Java docs won't help since they only show "Object", yes. Have a
> look at the Scala docs:
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions
>
> An RDD of T produces an RDD of T[].
>
> On Fri, May 13, 2016 at 12:10 PM, Tom Godden <tgod...@vub.ac.be> wrote:
>> I assumed the "fixed size blocks" mentioned in the documentation
>> (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html#sliding%28int,%20int%29)
>> were RDDs, but I guess they're arrays? Even when I change the RDD to
>> arrays (so it looks like RDD<ArrayList[]>), it doesn't work.
>> I'm passing an RDD of ArrayLists of Integers to the sliding functions,
>> so that's where the ArrayList comes from.
>> I can't seem to find up to date example code, could you maybe give an
>> example?
>>
>> On 13-05-16 12:53, Sean Owen wrote:
>>> I'm not sure what you're trying there. The return type is an RDD of
>>> arrays, not of RDDs or of ArrayLists. There may be another catch but
>>> that is not it.
>>>
>>> On Fri, May 13, 2016 at 11:50 AM, Tom Godden <tgod...@vub.ac.be> wrote:
>>>> I believe it's an illegal cast. This is the line of code:
>>>>> RDD<RDD<ArrayList>> windowed =
>>>>> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
>>>> with vals being a JavaRDD<ArrayList>.  Explicitly casting
>>>> doesn't work either:
>>>>> RDD<RDD<ArrayList>> windowed = (RDD<RDD<ArrayList>>)
>>>>> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
>>>> Did I miss something?
>>>>
>>>> On 13-05-16 09:44, Sean Owen wrote:
>>>>> The problem is there's no Java-friendly version of this, and the Scala
>>>>> API return type actually has no analog in Java (an array of any type,
>>>>> not just of objects) so it becomes Object. You can just cast it to the
>>>>> type you know it will be -- RDD<String[]> or RDD<long[]> or whatever.
>>>>>
>>>>> On Fri, May 13, 2016 at 8:40 AM, tgodden <tgod...@vub.ac.be> wrote:
>>>>>> Hello,
>>>>>>
>>>>>> We're trying to use PrefixSpan on sequential data, by passing a sliding
>>>>>> window over it. Spark Streaming is not an option.
>>>>>> RDDFunctions.sliding() returns an item of class RDD,
>>>>>> regardless of the original type of the RDD. Because of this, the
>>>>>> returned item seems to be pretty much worthless.
>>>>>> Is this a bug/nyi? Is there a way to circumvent this somehow?
>>>>>>
>>>>>> Official docs:
>>>>>> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> 
>>>>>> View this message in context: Java: Return type of 
>>>>>> RDDFunctions.sliding(int,
>>>>>> int)
>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Tom Godden
I assumed the "fixed size blocks" mentioned in the documentation
(https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html#sliding%28int,%20int%29)
were RDDs, but I guess they're arrays? Even when I change the RDD to
arrays (so it looks like RDD<ArrayList[]>), it doesn't work.
I'm passing an RDD of ArrayLists of Integers to the sliding functions,
so that's where the ArrayList comes from.
I can't seem to find up to date example code, could you maybe give an
example?

On 13-05-16 12:53, Sean Owen wrote:
> I'm not sure what you're trying there. The return type is an RDD of
> arrays, not of RDDs or of ArrayLists. There may be another catch but
> that is not it.
>
> On Fri, May 13, 2016 at 11:50 AM, Tom Godden <tgod...@vub.ac.be> wrote:
>> I believe it's an illegal cast. This is the line of code:
>>> RDD<RDD<ArrayList>> windowed =
>>> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
>> with vals being a JavaRDD<ArrayList>.  Explicitly casting
>> doesn't work either:
>>> RDD<RDD<ArrayList>> windowed = (RDD<RDD<ArrayList>>)
>>> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
>> Did I miss something?
>>
>> On 13-05-16 09:44, Sean Owen wrote:
>>> The problem is there's no Java-friendly version of this, and the Scala
>>> API return type actually has no analog in Java (an array of any type,
>>> not just of objects) so it becomes Object. You can just cast it to the
>>> type you know it will be -- RDD<String[]> or RDD<long[]> or whatever.
>>>
>>> On Fri, May 13, 2016 at 8:40 AM, tgodden <tgod...@vub.ac.be> wrote:
>>>> Hello,
>>>>
>>>> We're trying to use PrefixSpan on sequential data, by passing a sliding
>>>> window over it. Spark Streaming is not an option.
>>>> RDDFunctions.sliding() returns an item of class RDD,
>>>> regardless of the original type of the RDD. Because of this, the
>>>> returned item seems to be pretty much worthless.
>>>> Is this a bug/nyi? Is there a way to circumvent this somehow?
>>>>
>>>> Official docs:
>>>> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
>>>>
>>>> Thanks
>>>>
>>>> 
>>>> View this message in context: Java: Return type of 
>>>> RDDFunctions.sliding(int,
>>>> int)
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Tom Godden
I believe it's an illegal cast. This is the line of code:
> RDD<RDD<ArrayList>> windowed =
> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
with vals being a JavaRDD<ArrayList>.  Explicitly casting
doesn't work either:
> RDD<RDD<ArrayList>> windowed = (RDD<RDD<ArrayList>>)
> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
Did I miss something?

On 13-05-16 09:44, Sean Owen wrote:
> The problem is there's no Java-friendly version of this, and the Scala
> API return type actually has no analog in Java (an array of any type,
> not just of objects) so it becomes Object. You can just cast it to the
> type you know it will be -- RDD<String[]> or RDD<long[]> or whatever.
>
> On Fri, May 13, 2016 at 8:40 AM, tgodden  wrote:
>> Hello,
>>
>> We're trying to use PrefixSpan on sequential data, by passing a sliding
>> window over it. Spark Streaming is not an option.
>> RDDFunctions.sliding() returns an item of class RDD,
>> regardless of the original type of the RDD. Because of this, the
>> returned item seems to be pretty much worthless.
>> Is this a bug/nyi? Is there a way to circumvent this somehow?
>>
>> Official docs:
>> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
>>
>> Thanks
>>
>> 
>> View this message in context: Java: Return type of RDDFunctions.sliding(int,
>> int)
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: My notes on Spark Performance & Tuning Guide

2016-05-12 Thread Tom Ellis
I would like a copy too, Mich. Please send it through, thanks!

On Thu, 12 May 2016 at 15:14 Alonso Isidoro  wrote:

> Me too, send me the guide.
>
> Sent from my iPhone
>
> On 12 May 2016, at 12:11, Ashok Kumar wrote:
>
> Hi Dr Mich,
>
> I will be very keen to have a look at it and review if possible.
>
> Please forward me a copy
>
> Thanking you warmly
>
>
> On Thursday, 12 May 2016, 11:08, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Hi All,
>
>
> Following the threads in the Spark forum, I decided to write up notes on the
> configuration of Spark, including the allocation of resources, the
> configuration of driver, executors and threads, the execution of Spark apps,
> and general troubleshooting that takes into account the resources allocated
> to Spark applications and the OS tools at one's disposal.
>
> Since the most widespread configuration as I notice is with "Spark
> Standalone Mode", I have decided to write these notes starting with
> Standalone and later on moving to Yarn
>
>
> - *Standalone* – a simple cluster manager included with Spark that
>   makes it easy to set up a cluster.
> - *YARN* – the resource manager in Hadoop 2.
>
>
> I would appreciate it if anyone interested in reading and commenting would
> get in touch with me directly on mich.talebza...@gmail.com so I can send the
> write-up for their review and comments.
>
> Just to be clear this is not meant to be any commercial proposition or
> anything like that. As I seem to get involved with members troubleshooting
> issues and threads on this topic, I thought it is worthwhile writing a note
> about it to summarise the findings for the benefit of the community.
>
> Regards.
>
> Dr Mich Talebzadeh
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
>
>
>


Re: Using spark.memory.useLegacyMode true does not yield expected behavior

2016-04-11 Thread Tom Hubregtsen
Solved:
Call spark-submit with

--driver-memory 512m --driver-java-options
"-Dspark.memory.useLegacyMode=true -Dspark.shuffle.memoryFraction=0.2
-Dspark.storage.memoryFraction=0.6 -Dspark.storage.unrollFraction=0.2"

Thanks to:
https://issues.apache.org/jira/browse/SPARK-14367
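
For anyone wondering why --driver-memory matters here: the two MemoryStore
capacities reported in the original question (46.1 MB on 1.3.1, 92.2 MB on
1.6.0) are consistent with a simple product of usable heap,
spark.storage.memoryFraction and the legacy storage safety fraction (0.9 by
default). The sketch below only works that arithmetic through. It assumes the
JVM reports the full configured heap as its maximum, which varies by JVM, and
the function name is illustrative rather than anything from Spark.

```python
def legacy_storage_capacity_mb(heap_mb, storage_fraction, safety_fraction=0.9):
    """Approximate legacy MemoryStore capacity in MB.

    Assumption: the JVM exposes the whole configured heap (heap_mb) as its
    maximum memory. Real JVMs often report somewhat less.
    """
    return heap_mb * storage_fraction * safety_fraction


# spark.storage.memoryFraction=0.1 in both runs from the original question.
capacity_512 = legacy_storage_capacity_mb(512, 0.1)
capacity_1024 = legacy_storage_capacity_mb(1024, 0.1)

print(round(capacity_512, 1), round(capacity_1024, 1))
```

Under that assumption a 512 MB heap reproduces the Spark 1.3.1 figure and a
1024 MB heap reproduces the Spark 1.6.0 figure, which would fit with the 2x
gap disappearing once the driver heap is pinned to 512m.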



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-spark-memory-useLegacyMode-true-does-not-yield-expected-behavior-tp26631p26750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Using spark.memory.useLegacyMode true does not yield expected behavior

2016-03-29 Thread Tom Hubregtsen
Hi,

I am trying to get the same memory behavior in Spark 1.6 as I had in Spark
1.3 with default settings. 

I set 
--driver-java-options "-Dspark.memory.useLegacyMode=true
-Dspark.shuffle.memoryFraction=0.2 -Dspark.storage.memoryFraction=0.6
-Dspark.storage.unrollFraction=0.2"
in Spark 1.6.

But the numbers don't add up. For instance:
--driver-java-options "-Dspark.shuffle.memoryFraction=0.1
-Dspark.storage.memoryFraction=0.1"
in Spark 1.3.1 leads to:
16/03/29 14:47:36 INFO MemoryStore: MemoryStore started with capacity 46.1
MB
The same in Spark 1.6.0 with -Dspark.memory.useLegacyMode=true
-Dspark.shuffle.memoryFraction=0.1 -Dspark.storage.memoryFraction=0.1 leads to:
16/03/29 14:50:55 INFO MemoryStore: MemoryStore started with capacity 92.2
MB

If I then increase both fractions to 0.2, the numbers of the MemoryStore
both double (as one would expect), but that means there is still a 2x
difference in allocated memory between Spark 1.3 and Spark 1.6. So my
question:

What do I need to do to get the default memory behavior of Spark 1.3.1 in
Spark 1.6.0?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-spark-memory-useLegacyMode-true-does-not-yield-expected-behavior-tp26631.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




What is the relationship between reduceByKey and spark.driver.maxResultSize?

2015-12-11 Thread Tom Seddon
I have a job that is running into intermittent errors with [SparkDriver]
java.lang.OutOfMemoryError: Java heap space.  Before I was getting this
error I was getting errors saying the result size exceeded
spark.driver.maxResultSize.
This does not make any sense to me, as there are no actions in my job that
send data to the driver - just a pull of data from S3, a map and
reduceByKey and then conversion to dataframe and saveAsTable action that
puts the results back on S3.

I've found a few references to reduceByKey and spark.driver.maxResultSize
having some importance, but cannot fathom how this setting could be related.

Would greatly appreciate any advice.

Thanks in advance,

Tom


Shuffle FileNotFound Exception

2015-11-18 Thread Tom Arnfeld
Hey,

I’m wondering if anyone has run into issues with Spark 1.5 and a FileNotFound 
exception with shuffle.index files? It’s been cropping up with very large joins 
and aggregations, and causing all of our jobs to fail towards the end. The 
memory limit for the executors (we’re running on mesos) is touching 60GB+ with 
~10 cores per executor, which is way oversubscribed.

We’re running spark inside containers, and have configured 
“spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the 
container for performance/disk reasons, and since then the issue started to 
arise. I’m wondering if there’s a bug with the way spark looks for shuffle 
files, and one of the implementations isn’t obeying the path properly?

I don’t want to set “spark.local.dir” because that requires the driver to also 
have this directory set up, which is not the case.

Has anyone seen this issue before?



15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get 
block(s) from XXX:50777
java.lang.RuntimeException: java.io.FileNotFoundException: 
/mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
 (No such file or directory)
   at java.io.FileInputStream.open(Native Method)
   at java.io.FileInputStream.<init>(FileInputStream.java:146)
   at 
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
   at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
   at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
   at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
   at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
   at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
   at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
   at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
   at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
   at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
   at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
   at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
   at java.lang.Thread.run(Thread.java:745)

   at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
   at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
   at 

Re: Shuffle FileNotFound Exception

2015-11-18 Thread Tom Arnfeld
Hi Romi,

Thanks! Could you give me an indication of how much to increase the partitions 
by? We’ll take a stab in the dark; the input data is around 5M records (though 
each record is fairly small). We’ve had trouble both with DataFrames and RDDs.

Tom.

> On 18 Nov 2015, at 12:04, Romi Kuntsman <r...@totango.com> wrote:
> 
> I had many issues with shuffles (but not this one exactly), and what 
> eventually solved it was to repartition the input into more parts. Have you 
> tried that?
> 
> P.S. not sure if related, but there's a memory leak in the shuffle mechanism
> https://issues.apache.org/jira/browse/SPARK-11293
> 
> Romi Kuntsman, Big Data Engineer
> http://www.totango.com
> 
> On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld <t...@duedil.com> wrote:
> Hey,
> 
> I’m wondering if anyone has run into issues with Spark 1.5 and a FileNotFound 
> exception with shuffle.index files? It’s been cropping up with very large 
> joins and aggregations, and causing all of our jobs to fail towards the end. 
> The memory limit for the executors (we’re running on mesos) is touching 60GB+ 
> with ~10 cores per executor, which is way oversubscribed.
> 
> We’re running spark inside containers, and have configured 
> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the 
> container for performance/disk reasons, and since then the issue started to 
> arise. I’m wondering if there’s a bug with the way spark looks for shuffle 
> files, and one of the implementations isn’t obeying the path properly?
> 
> I don’t want to set "spark.local.dir” because that requires the driver also 
> have this directory set up, which is not the case.
> 
> Has anyone seen this issue before?
> 
> 
> 
> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to get 
> block(s) from XXX:50777
> java.lang.RuntimeException: java.io.FileNotFoundException: 
> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
>  (No such file or directory)
>at java.io.FileInputStream.open(Native Method)
>at java.io.FileInputStream.<init>(FileInputStream.java:146)
>at 
> org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
>at 
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
>at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>at 
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>at 
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>at 
> io.netty.channel

Re: anyone using netlib-java with sparkR on yarn spark1.6?

2015-11-11 Thread Tom Graves
Is there anything other than the spark assembly that needs to be in the 
classpath?  I verified the assembly was built right and it's in the classpath 
(else nothing would work).
Thanks,
Tom


 On Tuesday, November 10, 2015 8:29 PM, Shivaram Venkataraman 
<shiva...@eecs.berkeley.edu> wrote:
   

 I think this is happening in the driver. Could you check the classpath
of the JVM that gets started ? If you use spark-submit on yarn the
classpath is setup before R gets launched, so it should match the
behavior of Scala / Python.

Thanks
Shivaram

On Fri, Nov 6, 2015 at 1:39 PM, Tom Graves <tgraves...@yahoo.com.invalid> wrote:
> I'm trying to use the netlib-java stuff with mllib and sparkR on yarn. I've
> compiled with -Pnetlib-lgpl, see the necessary things in the spark assembly
> jar.  The nodes have  /usr/lib64/liblapack.so.3, /usr/lib64/libblas.so.3,
> and /usr/lib/libgfortran.so.3.
>
>
> Running:
> data <- read.df(sqlContext, 'data.csv', 'com.databricks.spark.csv')
> mdl = glm(C2~., data, family="gaussian")
>
> But I get the error:
> 15/11/06 21:17:27 WARN LAPACK: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemLAPACK
> 15/11/06 21:17:27 WARN LAPACK: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefLAPACK
> 15/11/06 21:17:27 ERROR RBackendHandler: fitRModelFormula on
> org.apache.spark.ml.api.r.SparkRWrappers failed
> Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
>  java.lang.AssertionError: assertion failed: lapack.dpotrs returned 18.
>        at scala.Predef$.assert(Predef.scala:179)
>        at
> org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:40)
>        at
> org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:114)
>        at
> org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:166)
>        at
> org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:65)
>        at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
>        at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
>        at
> org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:138)
>        at
> org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:134)
>
> Anyone have this working?
>
> Thanks,
> Tom




  

anyone using netlib-java with sparkR on yarn spark1.6?

2015-11-06 Thread Tom Graves
I'm trying to use the netlib-java stuff with mllib and sparkR on yarn. I've 
compiled with -Pnetlib-lgpl, see the necessary things in the spark assembly 
jar.  The nodes have  /usr/lib64/liblapack.so.3, /usr/lib64/libblas.so.3, and 
/usr/lib/libgfortran.so.3.

Running:
data <- read.df(sqlContext, 'data.csv', 'com.databricks.spark.csv')
mdl = glm(C2~., data, family="gaussian")

But I get the error:
15/11/06 21:17:27 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemLAPACK
15/11/06 21:17:27 WARN LAPACK: Failed to load implementation from:
com.github.fommil.netlib.NativeRefLAPACK
15/11/06 21:17:27 ERROR RBackendHandler: fitRModelFormula on
org.apache.spark.ml.api.r.SparkRWrappers failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  java.lang.AssertionError: assertion failed: lapack.dpotrs returned 18.
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:40)
        at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:114)
        at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:166)
        at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:65)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:138)
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:134)

Anyone have this working?

Thanks,
Tom

sparkR 1.5.1 batch yarn-client mode failing on daemon.R not found

2015-10-30 Thread Tom Stewart
I have the following script in a file named test.R:

library(SparkR)
sc <- sparkR.init(master="yarn-client")
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, faithful)
showDF(df)
sparkR.stop()
q(save="no")

If I submit this with "sparkR test.R" or "R  CMD BATCH test.R" or "Rscript 
test.R" it fails with this error:
15/10/29 08:08:49 INFO r.BufferedStreamThread: Fatal error: cannot open file 
'/mnt/hdfs9/yarn/nm-local-dir/usercache/hadoop/appcache/application_1446058618330_0171/container_e805_1446058618330_0171_01_05/sparkr/SparkR/worker/daemon.R':
 No such file or directory
15/10/29 08:08:59 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 
(TID 1)
java.net.SocketTimeoutException: Accept timed out


However, if I launch just an interactive sparkR shell and cut/paste those 
commands - it runs fine.
It also runs fine on the same Hadoop cluster with Spark 1.4.1.
And, it runs fine from batch mode if I just use sparkR.init() and not 
sparkR.init(master="yarn-client") 



Spark 1.5.1 Dynamic Resource Allocation

2015-10-30 Thread Tom Stewart
I am running the following command on a Hadoop cluster to launch Spark shell 
with DRA:
spark-shell \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=4 \
  --conf spark.dynamicAllocation.maxExecutors=12 \
  --conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=120 \
  --conf spark.dynamicAllocation.schedulerBacklogTimeout=300 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60 \
  --executor-memory 512m \
  --master yarn-client \
  --queue default
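
For reference, here is a rough sketch of how I understand the documented
ramp-up policy to play out with these settings: the first extra request fires
after the backlog has lasted schedulerBacklogTimeout seconds, later requests
fire every sustainedSchedulerBacklogTimeout seconds, and the number added
doubles each round up to maxExecutors. This is plain Python, not Spark code.
The function name is made up, and starting from minExecutors at time zero is
an assumption. It only models the ramp-up and says nothing about why 1.5.1
fails to obtain any executors.

```python
def ramp_up_schedule(min_executors, max_executors,
                     backlog_timeout_s, sustained_timeout_s):
    """Return (seconds_of_backlog, target_executors) pairs.

    Simplified model of the documented policy: add 1, then 2, 4, 8, ...
    executors per round while tasks stay backlogged, capped at the maximum.
    Assumption: the target starts at min_executors at t=0.
    """
    schedule = [(0, min_executors)]
    target = min_executors
    to_add = 1
    t = backlog_timeout_s
    while target < max_executors:
        target = min(target + to_add, max_executors)
        schedule.append((t, target))
        to_add *= 2
        t += sustained_timeout_s
    return schedule


# Values taken from the spark-shell command above.
schedule = ramp_up_schedule(4, 12, 300, 120)
for seconds, executors in schedule:
    print(seconds, executors)
```

With a 300 second first timeout, the target would not grow beyond the
minimum for the first five minutes of backlog under this model.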

This is the code I'm running within the Spark Shell - just demo stuff from the 
web site.

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("hdfs://ns/public/sample/kmeans_data.txt")

val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

This works fine on Spark 1.4.1 but is failing on Spark 1.5.1. Did something 
change that I need to do differently for DRA on 1.5.1?

This is the error I am getting:
15/10/29 21:44:19 WARN YarnScheduler: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have 
sufficient resources
15/10/29 21:44:34 WARN YarnScheduler: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have 
sufficient resources
15/10/29 21:44:49 WARN YarnScheduler: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have 
sufficient resources

That happens to be the same error you get if you haven't followed the steps to 
enable DRA. However, I have done those and, as I said, if I just flip to Spark 
1.4.1 on the same cluster it works with my YARN config.



Changing application log level in standalone cluster

2015-10-13 Thread Tom Graves
I would like to change the logging level for my application running on a 
standalone Spark cluster.  Is there an easy way to do that  without changing 
the log4j.properties on each individual node?
Thanks,
Tom

Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-11 Thread Tom Waterhouse (tomwater)
Tim,

Thank you for the explanation.  You are correct, my Mesos experience is very 
light, and I haven’t deployed anything via Marathon yet.  What you have stated 
here makes sense; I will look into doing this.

Adding this info to the docs would be great.  Is the appropriate action to 
create an issue regarding improvement of the docs?  For those of us who are 
still gaining experience, having such a pointer is very helpful.

Tom

From: Tim Chen <t...@mesosphere.io>
Date: Thursday, September 10, 2015 at 10:25 AM
To: Tom Waterhouse <tomwa...@cisco.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Spark on Mesos with Jobs in Cluster Mode Documentation

Hi Tom,

Sorry the documentation isn't very rich; it probably assumes that users 
understand how Mesos and frameworks work.

First I need to explain the rationale for creating the dispatcher. If you're 
not familiar with Mesos yet: each node in your datacenter has a Mesos slave 
installed, which is responsible for publishing resources and running/watching 
tasks, and the Mesos master is responsible for taking the aggregated resources 
and scheduling them among frameworks.

Frameworks are not managed by Mesos: the Mesos master and slaves don't launch 
or maintain frameworks, but assume they are launched and kept running on their 
own. The existing frameworks in the ecosystem therefore all have their own 
ways to deploy, provide HA and persist state (e.g. Aurora, Marathon, etc.).

Therefore, to introduce cluster mode with Mesos, we needed a long-running 
framework that runs in your datacenter and can handle launching Spark drivers 
on demand, handle HA, etc. This is what the dispatcher is all about.

So the idea is that you should launch the dispatcher not on the client, but on 
a machine in your datacenter. In Mesosphere's DCOS we launch all frameworks and 
long running services with Marathon, and you can use Marathon to launch the 
Spark dispatcher.

Then all clients, instead of specifying the Mesos master URL (e.g. 
mesos://mesos.master:2181), just talk to the dispatcher 
(mesos://spark-dispatcher.mesos:7077), and the dispatcher will then start and 
watch the driver for you.

Tim
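
To make the last step concrete, here is a minimal Python sketch of how a
client might assemble the spark-submit invocation once a dispatcher is
running. The dispatcher URL is the one from the message above; the main class
and jar URL are hypothetical placeholders. --master, --deploy-mode and --class
are standard spark-submit options. The sketch only builds the argument list
and does not run anything.

```python
def build_cluster_submit_args(dispatcher_url, main_class, app_jar, app_args=()):
    """Build the argv for a cluster-mode spark-submit aimed at a dispatcher.

    Nothing is executed here; the caller decides how to run the command.
    """
    if not dispatcher_url.startswith("mesos://"):
        raise ValueError("expected a mesos:// URL, got %r" % dispatcher_url)
    return [
        "spark-submit",
        "--master", dispatcher_url,    # the dispatcher, not the Mesos master
        "--deploy-mode", "cluster",    # the driver is launched inside the cluster
        "--class", main_class,
        app_jar,
    ] + list(app_args)


args = build_cluster_submit_args(
    "mesos://spark-dispatcher.mesos:7077",
    "org.example.SimpleApp",               # hypothetical main class
    "http://repo.example/simpleapp.jar",   # hypothetical jar location
)
print(" ".join(args))
```

Because the driver starts on a machine other than the client in cluster mode,
the jar presumably has to live somewhere the cluster can fetch it, which is
why the placeholder is a URL rather than a local path.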



On Thu, Sep 10, 2015 at 10:13 AM, Tom Waterhouse (tomwater) 
<tomwa...@cisco.com> wrote:
After spending most of yesterday scouring the Internet for documentation on 
submitting Spark jobs in cluster mode to a Spark cluster managed by Mesos, I 
was able to do just that, but I am not convinced that how I have things set up 
is correct.

I used the Mesos published instructions 
(https://open.mesosphere.com/getting-started/datacenter/install/) for setting 
up my Mesos cluster.  I have three Zookeeper instances, three Mesos master 
instances, and three Mesos slave instances.  This is all running in Openstack.

The documentation on the Spark documentation site states that “To use cluster 
mode, you must start the MesosClusterDispatcher in your cluster via the 
sbin/start-mesos-dispatcher.sh script, passing in the Mesos master url (e.g: 
mesos://host:5050).”  That is it, no more information than that.  So that is 
what I did: I have one machine that I use as the Spark client for submitting 
jobs.  I started the Mesos dispatcher with the script as described, and 
submitted the job using the client machine’s IP address and port as the 
target.

The job is currently running in Mesos as expected.  This is not, however, how 
I would have expected to configure the system.  As it stands, there is one 
instance of the Spark Mesos dispatcher running outside of Mesos, so it is not 
part of the sphere of Mesos resource management.

I used the following Stack Overflow posts as guidelines:
http://stackoverflow.com/questions/31164725/spark-mesos-dispatcher
http://stackoverflow.com/questions/31294515/start-spark-via-mesos

There must be better documentation on how to deploy Spark in Mesos with jobs 
able to be deployed in cluster mode.

I can follow up with more specific information regarding my deployment if 
necessary.

Tom



Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-10 Thread Tom Waterhouse (tomwater)
After spending most of yesterday scouring the Internet for documentation on 
submitting Spark jobs in cluster mode to a Spark cluster managed by Mesos, I 
was able to do just that, but I am not convinced that how I have things set up 
is correct.

I used the Mesos published instructions 
(https://open.mesosphere.com/getting-started/datacenter/install/) for setting 
up my Mesos cluster.  I have three Zookeeper instances, three Mesos master 
instances, and three Mesos slave instances.  This is all running in Openstack.

The documentation on the Spark documentation site states that “To use cluster 
mode, you must start the MesosClusterDispatcher in your cluster via the 
sbin/start-mesos-dispatcher.sh script, passing in the Mesos master url (e.g: 
mesos://host:5050).”  That is it, no more information than that.  So that is 
what I did: I have one machine that I use as the Spark client for submitting 
jobs.  I started the Mesos dispatcher with the script as described, and 
submitted the job using the client machine’s IP address and port as the 
target.

The job is currently running in Mesos as expected.  This is not, however, how 
I would have expected to configure the system.  As it stands, there is one 
instance of the Spark Mesos dispatcher running outside of Mesos, so it is not 
part of the sphere of Mesos resource management.

I used the following Stack Overflow posts as guidelines:
http://stackoverflow.com/questions/31164725/spark-mesos-dispatcher
http://stackoverflow.com/questions/31294515/start-spark-via-mesos

There must be better documentation on how to deploy Spark in Mesos with jobs 
able to be deployed in cluster mode.

I can follow up with more specific information regarding my deployment if 
necessary.

Tom


java.lang.NoSuchMethodError and yarn-client mode

2015-09-09 Thread Tom Seddon
Hi,

I have a problem trying to get a fairly simple app working which makes use
of native avro libraries.  The app runs fine on my local machine and in
yarn-cluster mode, but when I try to run it on EMR yarn-client mode I get
the error below.  I'm aware this is a version problem, as EMR runs an
earlier version of avro, and I am trying to use avro-1.7.7.

What's confusing me a great deal is the fact that this runs fine in
yarn-cluster mode.

What is it about yarn-cluster mode that means the application has access to
the correct version of the avro library?  I need to run in yarn-client mode
as I will be caching data to the driver machine in between batches.  I
think in yarn-cluster mode the driver can run on any machine in the cluster
so this would not work.

Grateful for any advice as I'm really stuck on this.  AWS support are
trying but they don't seem to know why this is happening either!

Just to note, I'm aware of Databricks spark-avro project and have used it.
This is an investigation to see if I can use RDDs instead of dataframes.

java.lang.NoSuchMethodError:
org.apache.avro.Schema$Parser.parse(Ljava/lang/String;[Ljava/lang/String;)Lorg/apache/avro/Schema;
at ophan.thrift.event.Event.(Event.java:10)
at SimpleApp$.main(SimpleApp.scala:25)
at SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,

Tom


Help getting Spark JDBC metadata

2015-09-09 Thread Tom Barber
Hi guys

Hopefully someone can help me, or at least explain stuff to me.

I use a tool that requires JDBC metadata (tables/columns etc)

So using spark 1.3.1 I try stuff like:

registerTempTable()
or saveAsTable()

on my parquet file.

The former doesn't show any table metadata for JDBC connections, but you
can query the table, which is annoying.
The latter shows a table, but the column metadata is a single column of type
array; again, I can query the table.

What I found I can do though is create a standard SQL table in beeline with
all its columns defined, and then insert into that table the contents of my
invisible parquet table, but I assume that removes the data from parquet
and stores it in hive, and I'd prefer to stick with parquet.

Ideally I'd like to be able to run

CREATE TEMPORARY TABLE XYZ
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/user/ubuntu/file_with_id.par"
  -- define my table columns here
)

Is something like that possible, does that make any sense?

Thanks

Tom


Re: java.lang.NoSuchMethodError and yarn-client mode

2015-09-09 Thread Tom Seddon
Thanks for your reply Aniket.

Ok I've done this and I'm still confused.  Output from running locally
shows:

file:/home/tom/spark-avro/target/scala-2.10/simpleapp.jar
file:/home/tom/spark-1.4.0-bin-hadoop2.4/conf/
file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar
file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar
file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar
file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunjce_provider.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/zipfs.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/localedata.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/dnsns.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunec.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunpkcs11.jar
saving text file...
done!

In yarn-client mode:

file:/home/hadoop/simpleapp.jar
file:/usr/lib/hadoop/hadoop-auth-2.6.0-amzn-0.jar
...
*file:/usr/lib/hadoop-mapreduce/avro-1.7.4.jar*
...

And in yarn-cluster mode:
file:/mnt/yarn/usercache/hadoop/appcache/application_1441787021820_0004/container_1441787021820_0004_01_01/__app__.jar
...
*file:/usr/lib/hadoop/lib/avro-1.7.4.jar*
...
saving text file...
done!

In yarn-cluster mode it doesn't appear to see the fat jar (simpleapp), yet
it can see avro-1.7.4 and still runs fine!
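
When comparing listings like the ones above, it can help to pull out every
version of a given library mechanically. The following is a small Python
sketch under the assumption that jars follow the usual
<library>-<version>.jar naming; the function name is made up for
illustration, and the entries are copied from the yarn-client listing above
(without the emphasis asterisks).

```python
import re


def find_library_versions(classpath_entries, library):
    """Return (version, entry) pairs for jars named <library>-<version>.jar."""
    pattern = re.compile(
        r"(?:^|/)" + re.escape(library) + r"-(\d+(?:\.\d+)*)\.jar$"
    )
    matches = []
    for entry in classpath_entries:
        cleaned = entry.strip()
        found = pattern.search(cleaned)
        if found:
            matches.append((found.group(1), cleaned))
    return matches


yarn_client_classpath = [
    "file:/home/hadoop/simpleapp.jar",
    "file:/usr/lib/hadoop/hadoop-auth-2.6.0-amzn-0.jar",
    "file:/usr/lib/hadoop-mapreduce/avro-1.7.4.jar",
]
print(find_library_versions(yarn_client_classpath, "avro"))
```

Note that this only inspects jar file names, so classes bundled inside a fat
jar such as simpleapp.jar will not show up.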

Thanks,

Tom


On Wed, Sep 9, 2015 at 9:49 AM Aniket Bhatnagar <aniket.bhatna...@gmail.com>
wrote:

> Hi Tom
>
> There has to be a difference in classpaths in yarn-client and yarn-cluster
> mode. Perhaps a good starting point would be to print classpath as a first
> thing in SimpleApp.main. It should give clues around why it works in
> yarn-cluster mode.
>
> Thanks,
> Aniket
>
> On Wed, Sep 9, 2015, 2:11 PM Tom Seddon <mr.tom.sed...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a problem trying to get a fairly simple app working which makes
>> use of native avro libraries.  The app runs fine on my local machine and in
>> yarn-cluster mode, but when I try to run it on EMR yarn-client mode I get
>> the error below.  I'm aware this is a version problem, as EMR runs an
>> earlier version of avro, and I am trying to use avro-1.7.7.
>>
>> What's confusing me a great deal is the fact that this runs fine in
>> yarn-cluster mode.
>>
>> What is it about yarn-cluster mode that means the application has access
>> to the correct version of the avro library?  I need to run in yarn-client
>> mode as I will be caching data to the driver machine in between batches.  I
>> think in yarn-cluster mode the driver can run on any machine in the cluster
>> so this would not work.
>>
>> Grateful for any advice as I'm really stuck on this.  AWS support are
>> trying but they don't seem to know why this is happening either!
>>
>> Just to note, I'm aware of Databricks spark-avro project and have used
>> it.  This is an investigation to see if I can use RDDs instead of
>> dataframes.
>>
>> java.lang.NoSuchMethodError:
>> org.apache.avro.Schema$Parser.parse(Ljava/lang/String;[Ljava/lang/String;)Lorg/apache/avro/Schema;
>> at ophan.thrift.event.Event.(Event.java:10)
>> at SimpleApp$.main(SimpleApp.scala:25)
>> at SimpleApp.main(SimpleApp.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Thanks,
>>
>> Tom
>>
>>
>>


50% performance decrease when using local file vs hdfs

2015-07-24 Thread Tom Hubregtsen
Hi,

When running two experiments with the same application, we see a 50%
performance difference between using HDFS and files on disk, both using the
textFile/saveAsTextFile call. Almost all performance loss is in Stage 1. 

Input (in Stage 0):
The file is read in using val input = sc.textFile(inputFile). The total
input size is 500GB. The files on disk are partitioned into 128 MB files;
HDFS is set to a block size of 128MB. When looking at the number of tasks,
we see 4x more tasks for the files on disk. We have seen this before, and it
seems that this is because Spark breaks up the local files into 32MB splits.
This is not the case in HDFS.
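
The 4x figure follows directly from the split sizes. As a quick sanity
check, here is a Python sketch of the arithmetic, assuming binary units and
one task per input split (both are assumptions for illustration, not
measured values):

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def split_count(total_bytes, split_bytes):
    """Number of input splits (and hence read tasks) for a given split size."""
    return math.ceil(total_bytes / split_bytes)


total_input = 500 * GB
hdfs_tasks = split_count(total_input, 128 * MB)    # one task per 128 MB block
local_tasks = split_count(total_input, 32 * MB)    # one task per 32 MB split

print(hdfs_tasks, local_tasks, local_tasks / hdfs_tasks)
```

Under these assumptions the HDFS run reads with 4000 tasks and the local-file
run with 16000, which matches the factor of four we observe.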

Output (in Stage 1):
The file is written using saveAsTextFile(outputFile). The total output size
is 500GB. Because we use a custom partitioner, we always have 9025 tasks in
this stage. This is the stage where we see most performance loss. 

Questions:
* What is the cause of the performance loss?
- Possible answers: 
Because of the block size (e.g. 128MB vs 33 MB) the write is less efficient
(more/less data being transferred at once)
or
Because of the block size we need to open 4x as many files, leading to a
performance loss
* How can we solve this? (We would like to not use HDFS)
* Bonus question: Should I use a different API to get a better performance?

Thanks for any responses!

Tom Hubregtsen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/50-performance-decrease-when-using-local-file-vs-hdfs-tp23987.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Info from the event timeline appears to contradict dstat info

2015-07-15 Thread Tom Hubregtsen
I am trying to analyze my program, in particular to see what the bottleneck
is (IO, CPU, network), and started using the event timeline for this. 

When looking at my Job 0, Stage 0 (the sampler function taking up 5.6
minutes of my 40 minute program), I see in the event timeline that all time
is spent in Executor Computing Time. I am not quite sure what this means.
I first thought that because of this metric, I could immediately assume that
I was CPU bound, but this does not line up with my dstat log. When looking
at dstat, I see that I spend 65% in CPU wait, 17% in CPU system and only 18%
in CPU user, together with disk IO being fully utilized for the entire
duration of the stage. From this data, I would assume I am actually disk
bound.

My question based on this is: How do I interpret the label Executor
Computing Time, and what conclusions can I draw from it? 
As I do not see read input/write output as one of the 7 labels, is IO meant
to be part of the Executor Computing Time (even though shuffle IO seems to
be separate)? Can I use information from event timeline as a basis for any
conclusions on my bottleneck (IO, CPU or network)? Is network included in
any of these 7 labels?

Thanks in advance,

Tom Hubregtsen




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Info-from-the-event-timeline-appears-to-contradict-dstat-info-tp23862.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Info from the event timeline appears to contradict dstat info

2015-07-15 Thread Tom Hubregtsen
I think I found my answer at https://github.com/kayousterhout/trace-analysis:

One thing to keep in mind is that Spark does not currently include
instrumentation to measure the time spent reading input data from disk or
writing job output to disk (the "Output write wait" shown in the waterfall
is time to write shuffle output to disk, which Spark does have
instrumentation for); as a result, the time shown as "Compute" may include
time using the disk. We have a custom Hadoop branch that measures the time
Hadoop spends transferring data to/from disk, and we are hopeful that
similar timing metrics will someday be included in the Hadoop FileStatistics
API. In the meantime, it is not currently possible to understand how much of
a Spark task's time is spent reading from disk via HDFS.

That said, this might be posted as a footnote at the event timeline to avoid
confusion :)

Best regards,

Tom Hubregtsen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Info-from-the-event-timeline-appears-to-contradict-dstat-info-tp23862p23865.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Un-persist RDD in a loop

2015-06-23 Thread Tom Hubregtsen
I believe that as you are not persisting anything into the memory space
defined by
spark.storage.memoryFraction
you also have nothing to clear from this area using unpersist. 

FYI: The data will be kept in the OS buffer/on disk at the point of the
reduce (as this involves a wide dependency, i.e. a shuffle of the data), but
you cannot clear this through the API.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Un-persist-RDD-in-a-loop-tp23414p23460.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




PartitionBy/Partitioner for dataFrames?

2015-06-21 Thread Tom Hubregtsen
Hi,

I am trying to rewrite my program to use dataFrames, and I see that I can
perform a mapPartitions and a foreachPartition, but can I perform a
partitionBy/set a partitioner? Or is there some other way to make my data
land in the right partition for *Partition to use? (I see that partitionBy
is only available on pair RDDs; this might have something to do with it.)

I am using the spark master branch. The error:
[error]
/home/th/spark-1.5.0/spark/IBM_ARL_teraSort_v4-01/src/main/scala/IBM_ARL_teraSort.scala:107:
value partitionBy is not a member of org.apache.spark.sql.DataFrame

Thanks,

Tom Hubregtsen





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PartitionBy-Partitioner-for-dataFrames-tp23420.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




DataFrames for non-SQL computation?

2015-06-11 Thread Tom Hubregtsen
I've looked a bit into what DataFrames are, and it seems that most posts on
the subject are related to SQL, but it does seem to be very efficient. My
main question is: Are DataFrames also beneficial for non-SQL computations? 

For instance I want to:
- sort k/v pairs (in particular, is the naive versus efficient
- perform some arbitrary map-reduce instructions

I am wondering this, as I read about the *naive vs cache aware layout*, and
also read the following on the databricks blog:
The first pieces will land in Spark 1.4, which includes explicitly managed
memory for aggregation operations *in Spark’s DataFrame API* as well as
customized serializers. Expanded coverage of binary memory management and
cache-aware data structures will appear in Spark 1.5.
This leads me to believe that the cache aware layout that also seems
beneficial for regular computation/sort is (currently?) only implemented in
dataFrames (?) and makes me wonder if I then should just use dataFrames in
my regular computation.

Thanks in advance,

Tom

P.S. I am currently using the master branch from GitHub



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrames-for-non-SQL-computation-tp23281.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SparkSQL DF.explode with Nulls

2015-06-05 Thread Tom Seddon
I figured it out.  Needed a block style map and a check for null.  The case
class is just to name the transformed columns.

case class Component(name: String, loadTimeMs: Long)

avroFile.filter($"lazyComponents.components".isNotNull)
  .explode($"lazyComponents.components") {
    case Row(lazyComponents: Seq[Row]) =>
      lazyComponents.map { x =>
        val name = x.getString(0)
        // Substitute 0 when loadTimeMs is null; getLong fails on a null slot
        val loadTimeMs = if (x.isNullAt(1)) 0L else x.getLong(1)
        Component(name, loadTimeMs)
      }
  }
  .select('pageViewId, 'name, 'loadTimeMs)
  .take(20).foreach(println)
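
For anyone who wants to see the null handling without the Spark machinery,
here is the same logic as a plain Python sketch. The record layout mirrors
the schema from my original question, but the function name and the sample
data are made up for illustration.

```python
def explode_components(records, default_load_time_ms=0):
    """Flatten nested components into (pageViewId, name, loadTimeMs) rows."""
    rows = []
    for record in records:
        components = record.get("components")
        if components is None:            # same role as the isNotNull filter
            continue
        for name, load_time_ms in components:
            if load_time_ms is None:      # same role as the isNullAt(1) check
                load_time_ms = default_load_time_ms
            rows.append((record["pageViewId"], name, load_time_ms))
    return rows


# Hypothetical sample data: one null loadTimeMs and one null components array.
sample = [
    {"pageViewId": "pv-1", "components": [("header", 120), ("footer", None)]},
    {"pageViewId": "pv-2", "components": None},
]
for row in explode_components(sample):
    print(row)
```

The key point is the same in both versions: check for null before reading
the primitive value, rather than trying to patch the column afterwards.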

On Thu, Jun 4, 2015 at 12:05 PM Tom Seddon <mr.tom.sed...@gmail.com> wrote:

 Hi,

 I've worked out how to use explode on my input avro dataset with the
 following structure
 root
  |-- pageViewId: string (nullable = false)
  |-- components: array (nullable = true)
  ||-- element: struct (containsNull = false)
  |||-- name: string (nullable = false)
  |||-- loadTimeMs: long (nullable = true)


 I'm trying to turn this into this layout with repeated pageViewIds for
 each row of my components:
 root
  |-- pageViewId: string (nullable = false)
  |-- name: string (nullable = false)
  |-- loadTimeMs: long (nullable = true)

 Explode works fine for the first 10 records using this bit of code, but my
 big problem is that loadTimeMs has nulls in it, which I think is causing
 the error.  Any ideas how I can trap those nulls?  Perhaps by converting to
 zeros and then I can deal with them later?  I tried writing a udf which
 just takes the loadTimeMs column and swaps nulls for zeros, but this
 separates the struct and then I don't know how to use explode.

 avroFile.filter($"lazyComponents.components".isNotNull)
 .explode($"lazyComponents.components")
 { case Row(lazyComponents: Seq[Row]) => lazyComponents
 .map(x => x.getString(0) -> x.getLong(1))}
 .select('pageViewId, '_1, '_2)
 .take(10).foreach(println)


SparkSQL DF.explode with Nulls

2015-06-04 Thread Tom Seddon
Hi,

I've worked out how to use explode on my input avro dataset with the
following structure
root
 |-- pageViewId: string (nullable = false)
 |-- components: array (nullable = true)
 ||-- element: struct (containsNull = false)
 |||-- name: string (nullable = false)
 |||-- loadTimeMs: long (nullable = true)


I'm trying to turn this into this layout with repeated pageViewIds for each
row of my components:
root
 |-- pageViewId: string (nullable = false)
 |-- name: string (nullable = false)
 |-- loadTimeMs: long (nullable = true)

Explode works fine for the first 10 records using this bit of code, but my
big problem is that loadTimeMs has nulls in it, which I think is causing
the error.  Any ideas how I can trap those nulls?  Perhaps by converting to
zeros and then I can deal with them later?  I tried writing a udf which
just takes the loadTimeMs column and swaps nulls for zeros, but this
separates the struct and then I don't know how to use explode.

avroFile.filter($"lazyComponents.components".isNotNull)
.explode($"lazyComponents.components")
{ case Row(lazyComponents: Seq[Row]) => lazyComponents
.map(x => x.getString(0) -> x.getLong(1))}
.select('pageViewId, '_1, '_2)
.take(10).foreach(println)

15/06/04 12:01:21 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID
65)
java.lang.RuntimeException: Failed to check null bit for primitive long
value.
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:87)
at
$line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(console:33)
at
$line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(console:33)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
$line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:33)
at
$line127.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:33)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at
org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:89)
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:71)
at
org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:122)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


Re: Extra stage that executes before triggering computation with an action

2015-04-29 Thread Tom Hubregtsen
Thanks for the responses.

> Try removing toDebugString and see what happens.

The toDebugString is performed after [d] (the action), as [e]. By then all
stages are already executed.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22712.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Extra stage that executes before triggering computation with an action

2015-04-29 Thread Tom Hubregtsen
Hi, I am trying to see exactly what happens underneath the hood of Spark when
performing a simple sortByKey. So far I've already discovered the
fetch-files and both the temp-shuffle and shuffle files being written to
disk, but there is still an extra stage that keeps on puzzling me.

This is the code that I execute 1 by 1 in the spark-shell (I will refer to
them as [letter])
val input = sc.textFile(path, 2) [a]
val inputRDD = input.map(some lambda to create key_value) [b]
val result = inputRDD.sortByKey(true, 2) [c]
result.saveAsTextFile [d]

As there is only one shuffle, I expect to see two stages. This is confirmed
by result.toDebugString:
(2) ShuffledRDD[5] at sortByKey at console:25 [] - [c]
 +-(2) MapPartitionsRDD[2] at map at console:23 [] - [b]
|  ./Sort/input-10-records-2-parts/ MapPartitionsRDD[1] at textFile at
console:21 [] - [a] 
|  ./Sort/input-10-records-2-parts/ HadoopRDD[0] at textFile at
console:21 [] - [a]
As there is one indentation, there should be 2 stages. There is an extra RDD
(MapPartitionsRDD[6]) that is created by [d], but is not a parent of my
result RDD, so not listed in this trace.

Now when I run these commands one by one in the spark-shell, I see the
following execution:
[a]
[b]
[c] (// no action performed yet)
INFO SparkContext: Starting job: sortByKey at console:25
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at sortByKey at console:25), which has no missing parents
INFO DAGScheduler: ResultStage 0 (sortByKey at console:25) finished in 0.109 s
INFO DAGScheduler: Job 0 finished: sortByKey at console:25,
[d] (// Here I trigger the computation with an actual action)
INFO SparkContext: Starting job: saveAsTextFile at console:28
INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[2] at map at console:23), which has no missing parents
INFO DAGScheduler: ShuffleMapStage 1 (map at console:23) finished
INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[6] at saveAsTextFile at console:28)
INFO DAGScheduler: ResultStage 2 (saveAsTextFile at console:28) finished
INFO DAGScheduler: Job 1 finished: saveAsTextFile at console:28

Job 1 with stages 1 and 2 seems logical to me: it computes everything
before and after the shuffle (wide dependency), respectively. What I find
interesting and puzzling is Job 0 with stage 0. It executes and finishes
before I perform an action (in [d]), and with a larger input set it can take
a noticeable amount of time. Does anybody have any idea what is running in
this job/stage 0?
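
One plausible explanation for Job 0, offered as a sketch rather than a confirmed diagnosis: sortByKey needs a range partitioner, and choosing the range boundaries requires looking at a sample of the keys. That sampling pass has to run when the partitioner is built, that is, when sortByKey is called and before any action. The plain-Python model below is hypothetical (`LazyDataset` and `build_range_bounds` are illustrative names, not Spark APIs); it only shows why defining a range-partitioned sort can trigger an eager pass over the input.

```python
import random


class LazyDataset:
    """Toy stand-in for an RDD: records are only produced when scanned."""

    def __init__(self, records):
        self._records = list(records)
        self.scans = 0  # counts how many full passes were triggered

    def scan(self):
        self.scans += 1
        return iter(self._records)


def build_range_bounds(dataset, num_partitions, sample_size=100, seed=0):
    """Sample keys eagerly and pick num_partitions - 1 split points."""
    keys = [key for key, _ in dataset.scan()]  # the eager pass (the "Job 0" of this model)
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    if not sample:
        return []
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]


# Hypothetical input: six (key, value) records, two target partitions.
data = LazyDataset((k, str(k)) for k in [4, 2, 3, 6, 5, 7])
bounds = build_range_bounds(data, num_partitions=2)
print(data.scans, bounds)
```

In this model the scan counter is already 1 after the boundaries are built, even though nothing has been written or collected yet, which mirrors a stage that grows with the input size.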

Thanks,

Tom Hubregtsen




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707.html



Re: Extra stage that executes before triggering computation with an action

2015-04-29 Thread Tom Hubregtsen
"I'm not sure, but I wonder if because you are using the Spark REPL that it
may not be representing what a normal runtime execution would look like and
is possibly eagerly running a partial DAG once you define an operation that
would cause a shuffle.

What happens if you set up your same set of commands [a-e] in a file and use
the Spark REPL's `load` or `paste` command to load them all at once?"
(from Richard)

I have also packaged it in a jar file (without [e], the debug string), and
still see the extra stage before the other two that I would expect. Even
when I remove [d], the action, I still see stage 0 being executed (and do
not see stage 1 and 2). 

Again, a shortened log of stage 0:
INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at sortByKey), which has no missing parents
INFO DAGScheduler: ResultStage 0 (sortByKey) finished in 0.192 s




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22713.html



Re: Spark TeraSort source request

2015-04-13 Thread Tom Hubregtsen
Thank you for your response, Ewan. I had a quick look yesterday and it was
there, but when I tried to open it again today at work to start working on
it, it appeared to have been removed. Is this correct?

Thanks,

Tom

On 12 April 2015 at 06:58, Ewan Higgs ewan.hi...@ugent.be wrote:

  Hi all.
 The code is linked from my repo:

 https://github.com/ehiggs/spark-terasort
 
 This is an example Spark program for running TeraSort benchmarks. It is
 based on work from Reynold Xin's branch
 https://github.com/rxin/spark/tree/terasort, but it is not the same
 TeraSort program that currently holds the record
 http://sortbenchmark.org/. That program is here
 https://github.com/rxin/spark/tree/sort-benchmark/core/src/main/scala/org/apache/spark/sort
 .
 

 I've been working on other projects at the moment so I haven't returned to
 the spark-terasort stuff. If you have any pull requests, I would be very
 grateful.

 Yours,
 Ewan


 On 08/04/15 03:26, Pramod Biligiri wrote:

 +1. I would love to have the code for this as well.

  Pramod







sortByKey with multiple partitions

2015-04-08 Thread Tom
Hi,

If I perform a sortByKey(true, 2).saveAsTextFile(filename) on a cluster,
will the data be sorted per partition, or in total? (And is this
guaranteed?)

Example:
Input 4,2,3,6,5,7

Sorted per partition:
part-0: 2,3,7
part-1: 4,5,6

Sorted in total:
part-0: 2,3,4 
part-1: 5,6,7

Thanks,

Tom

P.S. (I know that the data might not end up being uniformly distributed,
example: 4 elements in part-0 and 2 in part-1)
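
As far as I know, sortByKey range-partitions the keys before sorting within each partition, so reading part-0, part-1, ... in order yields a total order (the "sorted in total" case above). The pure-Python sketch below models that behaviour for the input from the question; `range_partition_sort` and the hand-picked boundary are hypothetical and are not Spark code.

```python
from bisect import bisect_left


def range_partition_sort(keys, bounds):
    """Assign each key to a range partition, then sort within each partition.

    bounds holds the inclusive upper boundary of every partition but the last.
    """
    parts = [[] for _ in range(len(bounds) + 1)]
    for key in keys:
        parts[bisect_left(bounds, key)].append(key)
    return [sorted(part) for part in parts]


# Input from the question, with an assumed boundary of 4 between the partitions.
parts = range_partition_sort([4, 2, 3, 6, 5, 7], bounds=[4])
for index, part in enumerate(parts):
    print(f"part-{index}: {part}")
```

Because the boundaries come from a sample in a real cluster, the partitions need not be the same size, which matches the P.S. above.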



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-with-multiple-partitions-tp22426.html



Spark TeraSort source request

2015-04-03 Thread Tom
Hi all,

As we all know, Spark has set the record for sorting data, as published on:
https://databricks.com/blog/2014/10/10/spark-petabyte-sort.html.

Here at our group, we would love to verify these results and compare
machines using this benchmark. We've spent quite some time trying to find the
TeraSort source code that was used, but cannot find it anywhere.

We did find two candidates:

A version posted by Reynold [1], the poster of the message above. This
version is stuck at "// TODO: Add partition-local (external) sorting
using TeraSortRecordOrdering", and only generates data.

Here, Ewan noticed that it didn't appear to be similar to Hadoop TeraSort
[2]. After this he created a version of his own [3]. With this version, we
noticed problems with TeraValidate on datasets above ~10G (as mentioned by
others at [4]). When examining the raw input and output files, it actually
appears that the input data is sorted and the output data unsorted in both
cases.

Because of this, we believe we have not yet found the source code that was
actually used. I've searched the Spark user forum archives and saw requests
from other people, indicating a demand, but did not succeed in finding the
actual source code.

My question:
Could you guys please make the source code of the used TeraSort program,
preferably with settings, available? If not, what are the reasons that this
seems to be withheld?

Thanks for any help,

Tom Hubregtsen 

[1]
https://github.com/rxin/spark/commit/adcae69145905162fa3b6932f70be2c932f95f87
[2]
http://mail-archives.apache.org/mod_mbox/spark-dev/201411.mbox/%3c5462092c.1060...@ugent.be%3E
[3] https://github.com/ehiggs/spark-terasort
[4]
http://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAPszQwgap4o1inZkTwcwV=7scwoqtr5yxfnsqo5p2kgp1bn...@mail.gmail.com%3E



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-TeraSort-source-request-tp22371.html



Did anybody run Spark-perf on powerpc?

2015-03-31 Thread Tom
We verified it runs on x86, and are now trying to run it on powerPC. We
currently run into dependency trouble with sbt. I tried installing sbt by
hand and resolving all dependencies by hand, but must have made an error, as
I still get errors.

Original error:
Getting org.scala-sbt sbt 0.13.6 ...

:: problems summary ::
 WARNINGS
module not found: org.scala-sbt#sbt;0.13.6

 local: tried

  /home/th/.ivy2/local/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

  -- artifact org.scala-sbt#sbt;0.13.6!sbt.jar:

  /home/th/.ivy2/local/org.scala-sbt/sbt/0.13.6/jars/sbt.jar

 typesafe-ivy-releases: tried

 
https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

 Maven Central: tried

  https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.pom

  -- artifact org.scala-sbt#sbt;0.13.6!sbt.jar:

  https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.jar

::

::  UNRESOLVED DEPENDENCIES ::

::

:: org.scala-sbt#sbt;0.13.6: not found

::


 ERRORS
Server access Error: Received fatal alert: decrypt_error
url=https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.6/ivys/ivy.xml

Server access Error: com.ibm.jsse2.util.h: PKIX path validation failed:
java.security.cert.CertPathValidatorException: The certificate issued by
CN=DigiCert Global Root CA, OU=www.digicert.com, O=DigiCert Inc, C=US is not
trusted; internal cause is: 
java.security.cert.CertPathValidatorException: Certificate chaining 
error
url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.pom

Server access Error: com.ibm.jsse2.util.h: PKIX path validation failed:
java.security.cert.CertPathValidatorException: The certificate issued by
CN=DigiCert Global Root CA, OU=www.digicert.com, O=DigiCert Inc, C=US is not
trusted; internal cause is: 
java.security.cert.CertPathValidatorException: Certificate chaining 
error
url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.6/sbt-0.13.6.jar




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Did-anybody-run-Spark-perf-on-powerpc-tp22329.html



Re: Spark-events does not exist error, while it does with all the req. rights

2015-03-30 Thread Tom Hubregtsen
I updated spark-defaults and spark-env; now I get:
Log directory /home/hduser/spark/spark-events does not exist.
(It also did not work with the default /tmp/spark-events.)

On 30 March 2015 at 18:03, Marcelo Vanzin van...@cloudera.com wrote:

 Are those config values in spark-defaults.conf? I don't think you can
 use ~ there - IIRC it does not do any kind of variable expansion.
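
To illustrate the point about `~`: the tilde is normally expanded by the shell, not by the file system, so a path string that still contains a literal `~` generally does not resolve to the home directory. The sketch below uses only the Python standard library. The directory name `spark/spark-events` mirrors the thread, while pointing `HOME` at a temporary directory is an assumption made purely for the demonstration (POSIX only).

```python
import os
import tempfile

# Use a throwaway HOME so the example does not touch the real home directory.
home = tempfile.mkdtemp()
os.environ["HOME"] = home
os.makedirs(os.path.join(home, "spark", "spark-events"))

literal = "~/spark/spark-events"        # the value as written in the config file
expanded = os.path.expanduser(literal)  # what a shell would have substituted

# The literal string is treated as a relative path starting with a directory named "~".
literal_exists = os.path.isdir(literal)
expanded_exists = os.path.isdir(expanded)
print(literal_exists, expanded_exists)
```

Writing the absolute path in the configuration file sidesteps the question of who performs the expansion.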




 --
 Marcelo



Spark-events does not exist error, while it does with all the req. rights

2015-03-30 Thread Tom
I have set 
spark.eventLog.enabled true
as I try to preserve log files. When I run, I get 
Log directory /tmp/spark-events does not exist.
I set 
spark.local.dir ~/spark
spark.eventLog.dir ~/spark/spark-events
and
SPARK_LOCAL_DIRS=~/spark
Now I get:
Log directory ~/spark/spark-events does not exist.
I am running Spark as hduser, which I also use on the command line (as
verified in the stdout: Set(hduser); users with modify permissions:
Set(hduser)). I am
able to cd into this directory. I can also create, view and delete files in
this directory, logged in as hduser. I checked the folder, it is owned by
hduser. I even performed chmod 777, but Spark keeps on crashing when I run
with spark.eventLog.enabled. It works without. Any hints?

Thanks,

Tom



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-events-does-not-exist-error-while-it-does-with-all-the-req-rights-tp22308.html



Re: Spark-events does not exist error, while it does with all the req. rights

2015-03-30 Thread Tom Hubregtsen
I run Spark in local mode.

Command line (added some debug info):
hduser@hadoop7:~/spark-terasort$ ./bin/run-example SparkPi 10
Jar:
/home/hduser/spark-terasort/examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop2.4.0.jar
/home/hduser/spark-terasort/bin/spark-submit --master local[*] --class
org.apache.spark.examples.SparkPi
/home/hduser/spark-terasort/examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop2.4.0.jar
10
15/03/30 17:37:28 INFO spark.SparkContext: Running Spark version
1.3.0-SNAPSHOT
15/03/30 17:37:28 WARN spark.SparkConf: In Spark 1.0 and later
spark.local.dir will be overridden by the value set by the cluster manager
(via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
15/03/30 17:37:28 INFO spark.SparkContext: Spark configuration:
spark.app.name=Spark Pi
spark.default.parallelism=8
spark.driver.extraJavaOptions=-Dos.arch=ppc64le
spark.driver.memory=4G
spark.eventLog.dir=/home/hduser/spark/spark-events
spark.eventLog.enabled=true
spark.executor.extraJavaOptions=-Dos.arch=ppc64le
spark.executor.memory=32G
spark.jars=file:/home/hduser/spark-terasort/examples/target/scala-2.10/spark-examples-1.3.0-SNAPSHOT-hadoop2.4.0.jar
spark.local.dir=/home/hduser/spark
spark.logConf=true
spark.master=local[*]

Stack trace:
15/03/30 17:37:30 INFO storage.BlockManagerMaster: Registered BlockManager
Exception in thread "main" java.lang.IllegalArgumentException: Log directory ~/spark/spark-events does not exist.
at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:90)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:363)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:28)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:495)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:365)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



On 30 March 2015 at 18:28, Marcelo Vanzin van...@cloudera.com wrote:

 Are you running Spark in cluster mode by any chance?

 (It always helps to show the command line you're actually running, and
 if there's an exception, the first few frames of the stack trace.)




 --
 Marcelo



Re: Spark-events does not exist error, while it does with all the req. rights

2015-03-30 Thread Tom Hubregtsen
The stack trace for the first scenario and for your suggested improvement is
similar, the only difference being the first line (sorry for not including
this):
Log directory /home/hduser/spark/spark-events does not exist.

To verify your premises, I cd'ed into the directory by copy-pasting the
path listed in the error message (i, ii), created a text file, closed it and
viewed it, and deleted it (iii). My findings were reconfirmed by my
colleague. Any other ideas?

Thanks,

Tom


On 30 March 2015 at 19:19, Marcelo Vanzin van...@cloudera.com wrote:

 So, the error below is still showing the invalid configuration.

 You mentioned in the other e-mails that you also changed the
 configuration, and that the directory really, really exists. Given the
 exception below, the only ways you'd get the error with a valid
 configuration would be if (i) the directory didn't exist, (ii) it
 existed but the user could not navigate to it or (iii) it existed but
 was not actually a directory.

 So please double-check all that.
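
The three conditions can be checked mechanically. The helper below is a hypothetical diagnostic in plain Python; it is not part of Spark, and Spark's own check in EventLoggingListener may differ. It reports which of (i), (ii) or (iii) fails for a given path.

```python
import os
import tempfile


def diagnose_log_dir(path):
    """Return a short verdict for the three failure modes listed above."""
    if not os.path.exists(path):
        return "(i) path does not exist"
    if not os.path.isdir(path):
        return "(iii) path exists but is not a directory"
    if not os.access(path, os.X_OK):
        return "(ii) directory exists but cannot be entered"
    return "ok"


# Exercise the helper on a missing path, a plain file and a real directory.
with tempfile.TemporaryDirectory() as base:
    missing = os.path.join(base, "spark-events")
    plain_file = os.path.join(base, "not-a-dir")
    open(plain_file, "w").close()
    results = [diagnose_log_dir(p) for p in (missing, plain_file, base)]

print(results)
```

For the result to be meaningful, such a check would need to run as the same user and on the same host as the Spark driver. Note that a parent directory the user cannot traverse also makes `os.path.exists` return False, so (i) and (ii) can look alike.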

 On Mon, Mar 30, 2015 at 5:11 PM, Tom Hubregtsen thubregt...@gmail.com
 wrote:
  Stack trace:
  15/03/30 17:37:30 INFO storage.BlockManagerMaster: Registered
 BlockManager
  Exception in thread main java.lang.IllegalArgumentException: Log
 directory
  ~/spark/spark-events does not exist.


 --
 Marcelo



Re: saveAsTable with path not working as expected (pyspark + Scala)

2015-03-27 Thread Tom Walwyn
We can set a path; see the unit tests. For example:
df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append",
path=tmpPath)
https://github.com/apache/spark/blob/master/python/pyspark/sql/tests.py

Investigating some more, I found that the table is being created at the
specified location, but the error is still being thrown, and the table has
not been stored. This is the code that I ran:

>>> a = [Row(key=k, value=str(k)) for k in range(100)]
>>> df = sc.parallelize(a).toDF()
>>> df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append",
... path="/tmp/test10")
15/03/27 10:45:13 ERROR RetryingHMSHandler:
MetaException(message:file:/user/hive/warehouse/savedjsontable is not a
directory or unable to create one)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
...
>>> sqlCtx.tables()
DataFrame[tableName: string, isTemporary: boolean]
>>> exit()
~ cat /tmp/test10/part-0
{"key":0,"value":"0"}
{"key":1,"value":"1"}
{"key":2,"value":"2"}
{"key":3,"value":"3"}
{"key":4,"value":"4"}
{"key":5,"value":"5"}

Kind Regards,
Tom







On 27 March 2015 at 10:33, Yanbo Liang yblia...@gmail.com wrote:

 saveAsTable will use the default data source configured by
 spark.sql.sources.default.

 def saveAsTable(tableName: String): Unit = {
   saveAsTable(tableName, SaveMode.ErrorIfExists)
 }

 It cannot set a path, if I understand correctly.






saveAsTable with path not working as expected (pyspark + Scala)

2015-03-27 Thread Tom Walwyn
Hi,

The behaviour is the same for me in Scala and Python, so posting here in
Python. When I use DataFrame.saveAsTable with the path option, I expect an
external Hive table to be created at the specified path. Specifically, when
I call:

  df.saveAsTable(..., path="/tmp/test")

I expect an external Hive table to be created pointing to /tmp/test which
would contain the data in df.

However, running locally on my Mac, I get an error indicating that Spark
tried to create a managed table in the location of the Hive warehouse:

ERROR RetryingHMSHandler:
MetaException(message:file:/user/hive/warehouse/savetable is not a
directory or unable to create one)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)

Am I wrong to expect Spark to create an external table in this case? What
is the expected behaviour of saveAsTable with the path option?

Setup: running spark locally with spark 1.3.0.

Kind Regards,
Tom


Re: saveAsTable with path not working as expected (pyspark + Scala)

2015-03-27 Thread Tom Walwyn
Another follow-up: saveAsTable works as expected when running on a Hadoop
cluster with Hive installed. It is only locally that I'm getting this
strange behaviour. Any ideas why this is happening?

Kind Regards.
Tom







Which strategy is used for broadcast variables?

2015-03-11 Thread Tom
In "Performance and Scalability of Broadcast in Spark" by Mosharaf Chowdhury
I read that Spark uses HDFS for its broadcast variables. This seems highly
inefficient. In the same paper alternatives are proposed, among which
BitTorrent Broadcast (BTB). While studying "Learning Spark", page 105,
second paragraph about Broadcast Variables, I read: "The value is sent to
each node only once, using an efficient, BitTorrent-like communication
mechanism."

- Is the book talking about the proposed BTB from the paper? 

- Is this currently the default? 

- If not, what is?

Thanks,

Tom



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Which-strategy-is-used-for-broadcast-variables-tp22004.html



Re: Which strategy is used for broadcast variables?

2015-03-11 Thread Tom Hubregtsen
Those results look very good for the larger workloads (100MB and 1GB). Were
you also able to run experiments for smaller amounts of data? For instance
broadcasting a single variable to the entire cluster? In the paper you
state that HDFS-based mechanisms performed well only for small amounts of
data. Do you have an approximation for the trade-off point when HDFS-based
becomes more favorable, and BitTorrent-like performs worse? I also read
that the minimum size transmitted using a broadcast variable is 4MB. Maybe
I should look for a different way of sharing this constant?

Use case: I am looking for the most efficient way to perform a
transformation involving a constant (of which the value is determined at
runtime) for a large input file.

Scala example:
// The actual value (2 in this case) would be the result of a different
// function, computed at runtime.
val constant1 = sc.broadcast(2)
val result = input.map(x => x + constant1.value)

On 11 March 2015 at 21:13, Mosharaf Chowdhury mosharafka...@gmail.com
wrote:

 The current broadcast algorithm in Spark approximates the one described
 in Section 5 of this paper
 http://www.mosharaf.com/wp-content/uploads/orchestra-sigcomm11.pdf.
 It is expected to scale sub-linearly; i.e., O(log N), where N is the
 number of machines in your cluster.
 We evaluated up to 100 machines, and it does follow O(log N) scaling.
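
To give a feel for what O(log N) means here, consider an idealized model, which is an assumption for illustration and not the actual Spark implementation: in every round, each machine that already holds the data forwards it to one machine that does not, so the number of holders doubles per round. The sketch counts the rounds needed to reach N machines; `broadcast_rounds` is a hypothetical helper.

```python
def broadcast_rounds(num_machines):
    """Rounds until all machines hold the data when holders double each round."""
    holders, rounds = 1, 0  # start with a single source machine
    while holders < num_machines:
        holders *= 2
        rounds += 1
    return rounds


# Cluster sizes chosen for illustration only.
rounds_by_size = {n: broadcast_rounds(n) for n in (1, 2, 10, 100, 1000)}
print(rounds_by_size)
```

In this model, growing the cluster from 100 to 1000 machines adds only three rounds, which is the practical meaning of logarithmic scaling.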

 --
 Mosharaf Chowdhury
 http://www.mosharaf.com/







Re: Which strategy is used for broadcast variables?

2015-03-11 Thread Tom Hubregtsen
Thanks Mosharaf, for the quick response! Can you maybe give me some
pointers to an explanation of this strategy? Or elaborate a bit more on it?
Which parts are involved in which way? Where are the time penalties and how
scalable is this implementation?

Thanks again,

Tom

On 11 March 2015 at 16:01, Mosharaf Chowdhury mosharafka...@gmail.com
wrote:

 Hi Tom,

 That's an outdated document from 4/5 years ago.

 Spark currently uses a BitTorrent-like mechanism that's been tuned for
 datacenter environments.

 Mosharaf
 --
 From: Tom thubregt...@gmail.com
 Sent: 3/11/2015 4:58 PM
 To: user@spark.apache.org
 Subject: Which strategy is used for broadcast variables?

 In "Performance and Scalability of Broadcast in Spark" by Mosharaf
 Chowdhury, I read that Spark uses HDFS for its broadcast variables. This
 seems highly inefficient. In the same paper, alternatives are proposed,
 among which BitTorrent Broadcast (BTB). While studying "Learning Spark",
 page 105, second paragraph about Broadcast Variables, I read: "The value is
 sent to each node only once, using an efficient, BitTorrent-like
 communication mechanism."

 - Is the book talking about the proposed BTB from the paper?

 - Is this currently the default?

 - If not, what is?

 Thanks,

 Tom



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Which-strategy-is-used-for-broadcast-variables-tp22004.html




Error when running the terasort branche in a cluster

2015-02-25 Thread Tom
Not sure if this is the place to ask, but I am using the terasort branch of
Spark for benchmarking, as found on
https://github.com/ehiggs/spark/tree/terasort, and I get the error below
when running on two machines (one machine works just fine). When looking at
the code, listed below the error message, I see
while (read < TeraInputFormat.RECORD_LEN) {
- Is it possible that this restricts the branch from running on a cluster?
- Did anybody manage to run this branch on a cluster?

Thanks,

Tom

15/02/25 17:55:42 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1,
arlab152): org.apache.hadoop.fs.ChecksumException: Checksum error:
file:/home/th/terasort_in/part-r-0 at 4872 exp: 1592400191 got:
-1117747586
at 
org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:322)
at
org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:278)
at org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:213)
at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:231)
at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:195)
at java.io.DataInputStream.read(DataInputStream.java:161)
at
org.apache.spark.examples.terasort.TeraInputFormat$TeraRecordReader.nextKeyValue(TeraInputFormat.scala:91)

Code:
override def nextKeyValue() : Boolean = {
  if (offset >= length) {
    return false
  }
  var read : Int = 0
  while (read < TeraInputFormat.RECORD_LEN) {
    var newRead : Int = in.read(buffer, read, TeraInputFormat.RECORD_LEN - read)
    if (newRead == -1) {
      if (read == 0) false
      else throw new EOFException("read past eof")
    }
    read += newRead
  }
  if (key == null) {
    key = new Array[Byte](TeraInputFormat.KEY_LEN)
  }
  if (value == null) {
    value = new Array[Byte](TeraInputFormat.VALUE_LEN)
  }
  buffer.copyToArray(key, 0, TeraInputFormat.KEY_LEN)
  buffer.takeRight(TeraInputFormat.VALUE_LEN).copyToArray(value, 0,
    TeraInputFormat.VALUE_LEN)
  offset += TeraInputFormat.RECORD_LEN
  true
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-running-the-terasort-branche-in-a-cluster-tp21808.html



Re: How to send user variables from Spark client to custom InputFormat or RecordReader ?

2015-02-22 Thread Tom Vacek
The SparkConf doesn't allow you to set arbitrary variables.  You can use
SparkContext's HadoopRDD and create a JobConf (with whatever variables you
want), and then grab them out of the JobConf in your RecordReader.
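
For PySpark users, a rough analogue of this idea might look like the sketch below. It assumes `sc` is a pyspark.SparkContext and relies on the `conf` argument of `newAPIHadoopFile`, which takes a dict that is turned into Hadoop configuration entries; the helper name and the class names in the usage note are hypothetical.

```python
def read_with_user_vars(sc, path, input_format, key_class, value_class, user_vars):
    """Read `path` through a custom InputFormat, passing `user_vars` along.

    `sc` is expected to be a pyspark.SparkContext (an assumption of this
    sketch). The dict given as `conf` becomes Hadoop configuration entries,
    which the InputFormat / RecordReader can read back from its Configuration.
    """
    # Hadoop configuration values are strings, so normalise keys and values.
    hadoop_conf = {str(key): str(value) for key, value in user_vars.items()}
    return sc.newAPIHadoopFile(
        path,
        input_format,
        key_class,
        value_class,
        conf=hadoop_conf,
    )
```

A call could then look like `read_with_user_vars(sc, "/data/in", "com.example.MyInputFormat", "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text", {"developer": "MyName"})`, where the InputFormat class name is a placeholder.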

On Sun, Feb 22, 2015 at 4:28 PM, hnahak harihar1...@gmail.com wrote:

 Hi,

 I have written a custom InputFormat and RecordReader for Spark, and I need
 to use user variables from the Spark client program.

 I added them in SparkConf

  val sparkConf = new SparkConf().setAppName(args(0)).set("developer","MyName")

 *and in InputFormat class*

 protected boolean isSplitable(JobContext context, Path filename) {
     System.out.println("# Developer "
         + context.getConfiguration().get("developer"));
     return false;
 }

 but it returns *null*. Is there any way I can pass user variables to my
 custom code?

 Thanks !!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-send-user-variables-from-Spark-client-to-custom-InputFormat-or-RecordReader-tp21755.html




Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-18 Thread Tom Walwyn
Thanks Imran, I'll try your suggestions.

I eventually got this to run by 'checkpointing' the joined RDD (according
to Akhil's suggestion) before performing the reduceBy, and then
checkpointing it again afterward. i.e.

 val rdd2 = rdd.join(rdd, numPartitions=1000)
  .map(fp => ((fp._2._1, fp._2._2), 1))
  .persist(MEMORY_AND_DISK_SER)
 val rdd3 = rdd2.reduceByKey((x,y) => x+y).persist(MEMORY_AND_DISK_SER)
 rdd3.count()

It takes a while, but at least it runs. So, I'll be sure to try your
suggestions for further speed-up.

Thanks again for your help.


On 18 February 2015 at 18:47, Imran Rashid iras...@cloudera.com wrote:

 Hi Tom,

 there are a couple of things you can do here to make this more efficient.
  first, I think you can replace your self-join with a groupByKey. on your
 example data set, this would give you

 (1, Iterable(2,3))
 (4, Iterable(3))

 this reduces the amount of data that needs to be shuffled, and that way
 you can produce all of your pairs just from the Iterable(2,3).

 second, if you expect the same pairs to appear many times in your dataset,
 you might first want to replace them with a count.  eg., if you start with

 (1,2)
 (1,2)
 (1,2)
 ...
 (1,2)
 (1,3)
 (1,3)
 (4,3)
 ...

 you might want to first convert that to get a count of each pair

 val pairCounts = rdd.map{x => (x,1)}.reduceByKey{_ + _}

 to give you something like:

 ((1,2), 145)
 ((1,3), 2)
 ((4,3), 982)
 ...

 and then with a little more massaging you can group by key and also keep
 the counts of each item:

 val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] =
 pairCounts.map{case((key, value), counts) =>
   key -> (value,counts)
 }.groupByKey

 which would give you something like

 (1, Iterable((2,145), (3,2)))
 (4, Iterable((3,982)))
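
 The two steps above (count each pair first, then group by key) can be tried out without a cluster. The following is a plain-Python sketch of the same logic, not Spark code; the function name is made up, and the assumption is that the final goal is the co-occurrence count the self-join was producing, duplicates and reflections included.

```python
from collections import Counter, defaultdict
from itertools import product


def cooccurrence_counts(pairs):
    """Count how often two values share a key, without a full self-join."""
    # Step 1: count identical (key, value) pairs, like map + reduceByKey.
    pair_counts = Counter(pairs)

    # Step 2: group (value, count) by key, like the groupByKey step.
    grouped = defaultdict(list)
    for (key, value), count in pair_counts.items():
        grouped[key].append((value, count))

    # Step 3: every ordered combination of values under one key co-occurs
    # count_left * count_right times in the equivalent self-join.
    result = Counter()
    for values in grouped.values():
        for (left, left_count), (right, right_count) in product(values, repeat=2):
            result[(left, right)] += left_count * right_count
    return result


if __name__ == "__main__":
    print(cooccurrence_counts([(1, 2), (1, 3), (4, 3)]))
```

 Because only one entry per distinct pair is kept until the last step, far fewer records need to be moved around than with the full join.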


 hope this helps
 Imran

 On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

 Thanks for the reply, I'll try your suggestions.

 Apologies, in my previous post I was mistaken. rdd is actually a PairRDD
 of (Int, Int). I'm doing the self-join so I can count two things. First, I
 can count the number of times a value appears in the data set. Second, I can
 count the number of times values occur with the same key. For example, if I
 have the following:

 (1,2)
 (1,3)
 (4,3)

 Then joining with itself I get:

 (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
 (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
 (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
 (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
 (4,(3,3)) -> map -> ((3,3),1) _|

 Note that I want to keep the duplicates (2,2) and reflections.

 Rgds

 On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything up in memory.

 - Set your default Serializer to Kryo (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 - Enable rdd compression (.set("spark.rdd.compress","true"))


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions=1000)
  .map(fp => ((fp._2._1, fp._2._2), 1))
  .reduceByKey((x,y) => x+y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate

Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Tom Walwyn
Thanks for the reply, I'll try your suggestions.

Apologies, in my previous post I was mistaken. rdd is actually a PairRDD
of (Int, Int). I'm doing the self-join so I can count two things. First, I
can count the number of times a value appears in the data set. Second, I can
count the number of times values occur with the same key. For example, if I
have the following:

(1,2)
(1,3)
(4,3)

Then joining with itself I get:

(1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
(1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
(1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
(1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
(4,(3,3)) -> map -> ((3,3),1) _|

Note that I want to keep the duplicates (2,2) and reflections.
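
To make the intended result concrete, here is the same self-join-and-count written as a small plain-Python sketch (not Spark code; the function name is made up for illustration). It keeps duplicates such as (2,2) and reflections such as (2,3)/(3,2), as described above.

```python
from collections import Counter, defaultdict


def self_join_counts(pairs):
    """Mimic rdd.join(rdd) followed by counting the joined value pairs."""
    # Collect every value seen under each key.
    by_key = defaultdict(list)
    for key, value in pairs:
        by_key[key].append(value)

    # The self-join pairs every value with every value under the same key,
    # including itself, so both (a, a) and the mirrored (a, b)/(b, a) appear.
    counts = Counter()
    for values in by_key.values():
        for left in values:
            for right in values:
                counts[(left, right)] += 1
    return counts


if __name__ == "__main__":
    print(self_join_counts([(1, 2), (1, 3), (4, 3)]))
```

The nested loop is where the blow-up comes from: a key with n values contributes n * n joined records.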

Rgds

On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything up in memory.

 - Set your default Serializer to Kryo (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 - Enable rdd compression (.set("spark.rdd.compress","true"))
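
 As a small sketch, the two configuration settings from the list above can be kept together as key/value pairs. This is plain Python with a made-up helper name; with PySpark installed, the pairs could be handed to `SparkConf().setAll(...)`, while the storage level is chosen separately when an RDD is persisted.

```python
def memory_friendly_settings():
    """Return the serializer and compression settings as (key, value) pairs."""
    return [
        # Use Kryo instead of the default Java serialization.
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        # Compress serialized RDD partitions.
        ("spark.rdd.compress", "true"),
    ]


if __name__ == "__main__":
    for key, value in memory_friendly_settings():
        print(key, "=", value)
```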


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions=1000)
  .map(fp => ((fp._2._1, fp._2._2), 1))
  .reduceByKey((x,y) => x+y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas





OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Tom Walwyn
Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster
resources I am using are feasible for my use-case. The following is a
snippet of code that is causing an OOM exception in the executor after about
125/1000 tasks during the map stage.

 val rdd2 = rdd.join(rdd, numPartitions=1000)
 .map(fp => ((fp._2._1, fp._2._2), 1))
 .reduceByKey((x,y) => x+y)
 rdd2.count()

Which errors with a stack trace like:

 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
stage 2.0 (TID 498)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
 at
scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
 at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
 at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
 at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
co-occurring values by key in the dataset, i.e. 'These two numbers occurred
with the same key n times'. I intentionally don't want to filter out
duplicates and reflections. rdd is about 3.6 million records, which has a
size in memory of about 120MB, and results in a 'joined' RDD (before the
reduceByKey stage) of around 460 million records, with a size in memory of
about 35GB.

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores
and about 7.5GB of memory. I'm running Spark on YARN. The driver and
executors are allowed 1280m each and the job has 5 executors and 1 driver.
Additionally, I have set spark.storage.memoryFraction to 0.06, and
spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
the issue. I've also tried increasing the number of partitions after the
join dramatically (up to 15000). Nothing has been effective. Thus, I'm
beginning to suspect I don't have enough resources for the job.
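
As a back-of-the-envelope check on the numbers above, the short script below works out the join blow-up and how the joined data compares with the combined executor memory. It is only arithmetic on the figures quoted in this message, and it assumes that "35GB" and "1280m" mean binary units (GiB and MiB). The joined RDD does not have to sit in memory all at once, so the ratio is an indication rather than a hard requirement.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2

input_records = 3_600_000       # records in rdd
joined_records = 460_000_000    # records after the self-join
joined_bytes = 35 * GIB         # reported in-memory size of the joined RDD
executors = 5
executor_bytes = 1280 * MIB     # memory allowed per executor

blow_up = joined_records / input_records
bytes_per_record = joined_bytes / joined_records
total_executor_bytes = executors * executor_bytes
ratio = joined_bytes / total_executor_bytes

print(f"join blow-up factor:      {blow_up:.0f}x")
print(f"bytes per joined record:  {bytes_per_record:.1f}")
print(f"total executor memory:    {total_executor_bytes / GIB:.2f} GiB")
print(f"joined data vs. executor: {ratio:.1f}x")
```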

Does anyone have a feeling about what the resource requirements would be
for a use-case like this? I could scale the cluster up if necessary, but
would like to avoid it. I'm willing to accept longer computation times if
that is an option.

Warm Regards,
Thomas


PySpark saveAsTextFile gzip

2015-01-15 Thread Tom Seddon
Hi,

I've searched but can't seem to find a PySpark example.  How do I write
compressed text file output to S3 using PySpark saveAsTextFile?

Thanks,

Tom


Efficient way to split an input data set into different output files

2014-11-19 Thread Tom Seddon
I'm trying to set up a PySpark ETL job that takes in JSON log files and
spits out fact table files for upload to Redshift.  Is there an efficient
way to send different event types to different outputs without having to
just read the same cached RDD twice?  I have my first RDD which is just a
json parsed version of the input data, and I need to create a flattened
page views dataset off this based on eventType = 'INITIAL', and then a page
events dataset from the same RDD based on eventType  = 'ADDITIONAL'.
Ideally I'd like the output files for both these tables to be written at
the same time, so I'm picturing a function with one input RDD in and two
RDDs out, or a function utilising two CSV writers.  I'm using mapPartitions
at the moment to write to files like this:

def write_records(records):
    output = StringIO.StringIO()
    writer = vlad.CsvUnicodeWriter(output, dialect='excel')
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

and I use this in the call to write the file as follows (pageviews and
events get created off the same json parsed RDD by filtering on INITIAL or
ADDITIONAL respectively):

pageviews.mapPartitions(write_records).saveAsTextFile('s3n://output/pageviews/')
events.mapPartitions(write_records).saveAsTextFile('s3n://output/events/')

Is there a way to change this so that both are written in the same process?
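
To illustrate the "two CSV writers" idea mentioned above, here is a rough sketch for a single batch of records in plain Python 3. It uses the standard csv module rather than vlad.CsvUnicodeWriter, and it assumes each record is a dict with an `eventType` field and a `fields` list, which is a made-up layout. It only shows the single-pass routing; it does not by itself make Spark write two outputs from one job.

```python
import csv
import io


def split_records(records):
    """Route each record to one of two CSV buffers in a single pass."""
    pageviews_out = io.StringIO()
    events_out = io.StringIO()
    # One writer per event type; records of any other type are skipped.
    writers = {
        "INITIAL": csv.writer(pageviews_out, dialect="excel"),
        "ADDITIONAL": csv.writer(events_out, dialect="excel"),
    }
    for record in records:
        writer = writers.get(record["eventType"])
        if writer is not None:
            writer.writerow(record["fields"])
    return pageviews_out.getvalue(), events_out.getvalue()


if __name__ == "__main__":
    sample = [
        {"eventType": "INITIAL", "fields": ["page-1", 10]},
        {"eventType": "ADDITIONAL", "fields": ["click", 11]},
    ]
    pageviews_csv, events_csv = split_records(sample)
    print(pageviews_csv, events_csv, sep="")
```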


Re: Broadcast failure with variable size of ~ 500mb with key already cancelled ?

2014-11-11 Thread Tom Seddon
Hi,

Just wondering if anyone has any advice about this issue, as I am
experiencing the same thing.  I'm working with multiple broadcast variables
in PySpark, most of which are small, but one of around 4.5GB, using 10
workers at 31GB memory each and driver with same spec.  It's not running
out of memory as far as I can see, but definitely only happens when I add
the large broadcast.  Would be most grateful for advice.

I tried playing around with the last 3 conf settings below, but no luck:

SparkConf().set("spark.master.memory", 26)
.set("spark.executor.memory", 26)
.set("spark.worker.memory", 26)
.set("spark.driver.memory", 26)
.set("spark.storage.memoryFraction", 1)
.set("spark.core.connection.ack.wait.timeout", 6000)
.set("spark.akka.frameSize", 50)

Thanks,

Tom


On 24 October 2014 12:31, htailor hemant.tai...@live.co.uk wrote:

 Hi All,

 I am relatively new to Spark and am currently having trouble broadcasting
 large variables ~500mb in size. The broadcast fails with an error shown
 below, and the memory usage on the hosts also blows up.

 Our hardware consists of 8 hosts (1 x 64gb (driver) and 7 x 32gb (workers))
 and we are using Spark 1.1 (Python) via Cloudera CDH 5.2.

 We have managed to replicate the error using a test script shown below. I
 would be interested to know if anyone has seen this before with broadcasting
 or knows of a fix.

 === ERROR ==

 14/10/24 08:20:04 INFO BlockManager: Found block rdd_11_31 locally
 14/10/24 08:20:08 INFO ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@fbc6caf
 14/10/24 08:20:08 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon3.ldn.ebs.io,55316)
 14/10/24 08:20:08 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@fbc6caf
 java.nio.channels.CancelledKeyException
 at
 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at

 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
 14/10/24 08:20:13 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:13 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon7.ldn.ebs.io,52524)
 14/10/24 08:20:15 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: Key not valid ?
 sun.nio.ch.SelectionKeyImpl@3ecfdb7e
 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(toppigeon.ldn.ebs.io,34370)
 14/10/24 08:20:15 INFO ConnectionManager: key already cancelled ?
 sun.nio.ch.SelectionKeyImpl@3ecfdb7e
 java.nio.channels.CancelledKeyException
 at
 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
 at

 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 14/10/24 08:20:19 INFO ConnectionManager: Removing ReceivingConnection to
 ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 14/10/24 08:20:19 INFO ConnectionManager: Removing SendingConnection to
 ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 14/10/24 08:20:27 ERROR SendingConnection: Exception while reading
 SendingConnection to ConnectionManagerId(pigeon8.ldn.ebs.io,48628)
 java.nio.channels.ClosedChannelException
 at
 sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
 at
 org.apache.spark.network.SendingConnection.read(Connection.scala:390)
 at

 org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 14/10/24 08:20:27 ERROR SendingConnection: Exception while reading
 SendingConnection to ConnectionManagerId(pigeon6.ldn.ebs.io,44996)
 java.nio.channels.ClosedChannelException
 at
 sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295

Re: ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId

2014-11-11 Thread Tom Seddon
Yes please can you share.  I am getting this error after expanding my
application to include a large broadcast variable. Would be good to know if
it can be fixed with configuration.

On 23 October 2014 18:04, Michael Campbell michael.campb...@gmail.com
wrote:

 Can you list what your fix was so others can benefit?

 On Wed, Oct 22, 2014 at 8:15 PM, arthur.hk.c...@gmail.com 
 arthur.hk.c...@gmail.com wrote:

 Hi,

 I have managed to resolve it; it was caused by a wrong setting. Please ignore this.

 Regards
 Arthur

 On 23 Oct, 2014, at 5:14 am, arthur.hk.c...@gmail.com 
 arthur.hk.c...@gmail.com wrote:


 14/10/23 05:09:04 WARN ConnectionManager: All connections not cleaned up






java.library.path

2014-10-05 Thread Tom
Hi,

I am trying to call some C code, let's say the compiled file is /path/code,
and it has chmod +x. When I call it directly, it works. Now I want to call
it from Spark 1.1. My problem is not building it into Spark, but making sure
Spark can find it.

I have tried:
SPARK_DAEMON_JAVA_OPTS=-Djava.library.path=/path
SPARK_DAEMON_JAVA_OPTS=-Djava.library.path=/path/code
SPARK_CLASSPATH=-Djava.library.path=/path
SPARK_JAVA_OPTS=-Djava.library.path=/path
SPARK_LIBRARY_PATH=/path

All with and without quotation marks. Every single time I get
Exception in thread "main" java.lang.UnsatisfiedLinkError: code (Not found
in java.library.path)

Any advice? Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-library-path-tp15766.html



Question about addFiles()

2014-10-03 Thread Tom Weber
Just getting started with Spark, so hopefully this is all there and I just
haven't found it yet.

I have a driver program on my client machine, and I can use addFiles to
distribute files to the remote worker nodes of the cluster. They are there to
be found by my code running in the executors, so all is good. But ...

1)  it also makes a copy on the local machine

-  is there a way to indicate that this isn't needed? I only need it on the
cluster.

-  if I send a .tar file, it unzips it for me, which is nice, but again,
that is extra work on the client machine when I'm not using it there.

2)  it copies the files to the spark_installdir/work/.

-  that's fine, I suppose, though is there any way to designate a location?

3)  they don't get cleaned up

-  I don't see anything ever getting removed from the work/. location; it
just keeps adding up

-  there was a cleanFiles() call, but I don't know that it cleaned up rather
than just stopped copying anymore (that is how it was documented). But this is
deprecated now, so it's moot anyway.

-  is there a removeFiles() call to clean up? What's the expected use case?
How does my code clean up manually; are there permission issues if I try?

Again, I searched the archives but didn't see any of this, but I'm just
getting started so may very well be missing this somewhere.

Thanks!
Tom


Re: Retrieve dataset of Big Data Benchmark

2014-09-27 Thread Tom
Hi,

I was able to download the dataset this way (and just reconfirmed it by
doing so again):
//Following before starting spark
export AWS_ACCESS_KEY_ID=*key_id* 
export AWS_SECRET_ACCESS_KEY=*access_key*
//Start spark
./spark-shell
//In the spark shell
val dataset = sc.textFile("s3n://big-data-benchmark/pavlo/text/tiny/crawl")
dataset.saveAsTextFile("/home/tom/hadoop/bigDataBenchmark/test/crawl3.txt")

If you want to do this more often, or use it directly from the cloud instead
of from local (which will be slower), you can add these keys to
./conf/spark-env.sh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-dataset-of-Big-Data-Benchmark-tp9821p15278.html



Reduce Tuple2<Integer, Integer> to Tuple2<Integer, List<Integer>>

2014-09-16 Thread Tom
From my map function I create Tuple2<Integer, Integer> pairs. Now I want to
reduce them, and get something like Tuple2<Integer, List<Integer>>.

The only way I found to do this was by treating all variables as String, and
in the reduceByKey do
return a._2 + "," + b._2 //in which both are numeric values saved in a
String
After which I do an Arrays.asList(string.split(",")) in mapValues. This
leaves me with <String, List<Integer>>. So now I am looking for either
- A function with which I can transform <String, List<Integer>> to
<Integer, List<Integer>>
or
- A way to reduce Tuple2<Integer, Integer> into a Tuple2<Integer,
List<Integer>> in the reduceByKey function so that I can use Integers all
the way

Of course, option two would be preferred.
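
To show what the second option amounts to, here is a plain-Python sketch of the idea (not the Spark Java API; the function name is made up): wrap every value in a one-element list first, then merge lists per key, so the values stay integers all the way.

```python
def values_by_key(pairs):
    """Collect integer values per key by merging one-element lists."""
    # Like a mapValues step: turn each value into a singleton list.
    singletons = [(key, [value]) for key, value in pairs]

    # Like a reduceByKey step whose function concatenates two lists.
    merged = {}
    for key, values in singletons:
        if key in merged:
            merged[key] = merged[key] + values
        else:
            merged[key] = values
    return merged


if __name__ == "__main__":
    print(values_by_key([(1, 2), (1, 3), (4, 3)]))
```

The point is that the reduce function then takes two lists and returns a list, so its input and output types match and no detour through String is needed.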

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reduce-Tuple2-Integer-Integer-to-Tuple2-Integer-List-Integer-tp14361.html



JavaPairRDD<String, Integer> to JavaPairRDD<String, String> based on key

2014-09-10 Thread Tom
Is it possible to generate a JavaPairRDD<String, Integer> from a
JavaPairRDD<String, String>, where I can also use the key values? I have
looked at, for instance, mapToPair, but this generates a new K/V pair based on
the original value, and does not give me information about the key.

I need this in the initialization phase, where I have two RDDs with similar
keys, but with different types of values. Generating these is computationally
intensive, and if I could use the first list to generate the second, it
would save me a big map/reduce phase.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JavaPairRDD-String-Integer-to-JavaPairRDD-String-String-based-on-key-tp13875.html



Return multiple [K,V] pairs from a Java Function

2014-08-24 Thread Tom
Hi,

I would like to create multiple key-value pairs, where all keys still can be
reduced. For instance, I have the following 2 lines:
A,B,C
B,D

I would like to return the following pairs for the first line:
A,B
A,C
B,A
B,C
C,A
C,B
And for the second
B,D
D,B

After a reduce by key, I want to end up with
A: B,C
B: A,C,D
C: A,B
D: B

In Hadoop, I used a list and a for-loop to write multiple times like below
context.write(new Text(local[i]), new Text(local[j]));

In Spark I was thinking of the mapToPair with a JavaPairRDD, but this only
returns 1 Tuple2. I know I can return a <key, list<value>>, but then I
could only reduce on the A|B|C, not on all.
JavaPairRDD<String, String> tuples = actors.mapToPair(
  new PairFunction<String, String, String>() {
    public Tuple2<String, String> call(String w) {
      return new Tuple2<String, String>(w, "1");
    }
});

Thanks!

P.S. No need to fill in the function, just interested in the return type
P.S.2 I'm using Java 7, so I can't use lambdas :)
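
For reference, the pair generation and grouping described above can be sketched in plain Python as follows (this is not the Spark Java API, and the function names are made up). Each input line yields several (key, value) pairs, which is the behaviour a flat-map style operation such as flatMapToPair provides on the Spark side.

```python
from collections import defaultdict
from itertools import permutations


def emit_pairs(line):
    """Return every ordered pair of distinct items on one comma-separated line."""
    return list(permutations(line.split(","), 2))


def group_pairs(lines):
    """Emit all pairs for every line, then collect the values per key."""
    grouped = defaultdict(list)
    for line in lines:
        for key, value in emit_pairs(line):
            grouped[key].append(value)
    return dict(grouped)


if __name__ == "__main__":
    print(emit_pairs("A,B,C"))
    print(group_pairs(["A,B,C", "B,D"]))
```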



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Return-multiple-K-V-pairs-from-a-Java-Function-tp12720.html



Trying to make sense of the actual executed code

2014-08-06 Thread Tom
Hi,

I am trying to look at, for instance, the following SQL query in Spark 1.1:
SELECT table.key, table.value, table2.value FROM table2 JOIN table WHERE
table2.key = table.key
When I look at the output, I see that there are several stages, and several
tasks per stage. The tasks have a TID; I do not see such a thing for a
stage. I see the input split of the files and start, running and finished
messages for the tasks. But what I really want to know is the following:
which maps, shuffles and reduces are performed in which order, and where can
I see the actual executed code per task/stage? Intermediate files/RDDs would
be a bonus!

Thanks in advance,

Tom



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-make-sense-of-the-actual-executed-code-tp11594.html


