Executors idle, driver heap exploding and maxing only 1 cpu core

2019-05-23 Thread Ashic Mahtab
Hi,
We have a fairly long-winded Spark application we inherited, with many stages. 
When we run it on our Spark cluster, things start off well enough: the workers are 
busy, lots of progress is made, and so on. However, about 30 minutes into 
processing, we see CPU usage on the workers drop drastically. At the same time, 
the driver is maxing out exactly one core (though we've given it more than 
one), and its RAM usage is creeping up. During this period there are no logs 
coming out of the driver. Everything seems to stop, and then it suddenly starts 
working, and the workers start working again. The driver's RAM usage doesn't go 
down, but flatlines. A few minutes later, the same thing happens again: the world 
seems to stop. However, the driver soon crashes with an out-of-memory exception.

What could be causing this sort of behaviour on the driver? We don't have any 
collect() or similar functions in the code. We're reading in from Azure blobs, 
processing, and writing back to Azure blobs. Where should we start in trying to 
get to the bottom of this? We're running Spark 2.4.1 in a stand-alone cluster.
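
One low-cost place to start, offered only as a sketch, is to log the driver's own heap usage on a timer so that the stalls can be lined up against heap growth. The snippet uses only the JVM's standard management API. The object name `DriverHeapLogger`, the helper `heapUsedMb`, and the 30-second period are illustrative choices, not anything taken from the application described above.

```scala
import java.lang.management.ManagementFactory
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object DriverHeapLogger {
  /** Heap currently in use by this JVM, in whole megabytes. */
  def heapUsedMb(): Long =
    ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed / (1024L * 1024L)

  /** Prints heap usage every `periodSeconds` on a daemon thread; call once on the driver. */
  def start(periodSeconds: Long = 30L): ScheduledExecutorService = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "driver-heap-logger")
        t.setDaemon(true) // never keep the driver alive just for logging
        t
      }
    }
    val scheduler = Executors.newSingleThreadScheduledExecutor(factory)
    val task = new Runnable {
      override def run(): Unit = println(s"driver heap used: ${heapUsedMb()} MB")
    }
    scheduler.scheduleAtFixedRate(task, 0L, periodSeconds, TimeUnit.SECONDS)
    scheduler
  }
}
```

Calling `DriverHeapLogger.start()` right after the SparkSession is created would give a timeline to compare against the stage boundaries in the Spark UI.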

Thanks,
Ashic.


Re: Unable to broadcast a very large variable

2019-04-10 Thread Ashic Mahtab
The default is 10 MB. The right value depends on the memory available and on 
what the network transfer effects are going to be. You can set 
spark.sql.autoBroadcastJoinThreshold to increase the threshold in the case of 
Spark SQL. But you definitely shouldn't be broadcasting gigabytes.
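
For Spark SQL, raising the threshold might look like the sketch below. `spark.sql.autoBroadcastJoinThreshold` is the setting named above and takes a size in bytes; the 100 MB value, the application name, and the local master are placeholders chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastThresholdExample {
  // Illustrative value only: 100 MB, expressed in bytes.
  val thresholdBytes: Long = 100L * 1024 * 1024

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-threshold-sketch") // placeholder name
      .master("local[*]")                    // placeholder master for a local try-out
      .getOrCreate()

    // Tables whose estimated size is below this many bytes become
    // candidates for automatic broadcast in joins.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", thresholdBytes)
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

    spark.stop()
  }
}
```

Setting the value to -1 disables automatic broadcasting altogether.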

From: V0lleyBallJunki3 
Sent: 10 April 2019 10:06
To: user@spark.apache.org
Subject: Unable to broadcast a very large variable

Hello,
   I have a 110-node cluster in which each executor has 50 GB of memory and
each machine has 244 GB of memory, and I want to broadcast a 70 GB variable.
I am having difficulty doing that. At what size does it become unwise to
broadcast a variable? Is there a general rule of thumb?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark and STS tokens (Federation Tokens)

2018-09-26 Thread Ashic Mahtab
Hi,
I'm looking to have spark jobs access S3 with temporary credentials. I've seen 
some examples around AssumeRole, but I have a scenario where the temp 
credentials are provided by GetFederationToken. Is there anything that can 
help, or do I need to use boto to execute GetFederationToken, and then pass the 
temp credentials as config params?
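
Passing the temporary credentials as config params could look roughly like this. It is a sketch under several assumptions: the S3A connector is in use, the Hadoop version on the cluster supports session tokens through `TemporaryAWSCredentialsProvider`, and the three credential values have already been obtained elsewhere (for example from a GetFederationToken call) and exported as environment variables. The bucket path is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object S3TempCredentials {
  /** Hadoop S3A settings for one set of temporary (session) credentials. */
  def s3aSettings(accessKey: String, secretKey: String, sessionToken: String): Map[String, String] =
    Map(
      "fs.s3a.aws.credentials.provider" ->
        "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
      "fs.s3a.access.key" -> accessKey,
      "fs.s3a.secret.key" -> secretKey,
      "fs.s3a.session.token" -> sessionToken
    )

  def main(args: Array[String]): Unit = {
    // Assumption: credentials were fetched before the job started.
    val settings = s3aSettings(
      sys.env("AWS_ACCESS_KEY_ID"),
      sys.env("AWS_SECRET_ACCESS_KEY"),
      sys.env("AWS_SESSION_TOKEN")
    )

    val spark = SparkSession.builder().appName("s3a-temp-credentials-sketch").getOrCreate()
    settings.foreach { case (k, v) => spark.sparkContext.hadoopConfiguration.set(k, v) }

    // Hypothetical input location.
    spark.read.textFile("s3a://example-bucket/input/").show(5)
    spark.stop()
  }
}
```

Credentials set this way are static for the lifetime of the job, so this does nothing for the refresh question.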

Also, for both GetFederationToken and AssumeRole, is there a valid way of 
refreshing the tokens once the job is executing? Temp credentials from AssumeRole 
are quite limited in lifetime, and even with GetFederationToken, the maximum 
lifetime of a set of temp credentials is limited to 36 hours. Is there a callback 
or similar that we can give to Spark, to be called when credentials are 
about to expire (or have expired)?

Thanks,
Ashic.


Easily creating custom encoders

2017-03-21 Thread Ashic Mahtab
I'm trying to easily create custom encoders for case classes having 
"unfriendly" fields. I could just kryo the whole thing, but would like to at 
least have a few fields in the schema instead of one binary blob. For example,


case class MyClass(id: UUID, items: Map[String, Double], name: String)


Is there a way to create an Encoder[MyClass] by kryo-ing the things that don't 
work, and not the ones that do, while retaining distinct columns? In the 
previous example, I'd at least want (binary, binary, String).


Is this possible?
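
One possible shape for this, sketched under assumptions rather than offered as a confirmed answer: convert each `MyClass` to a tuple and build the tuple's encoder from per-column encoders with `Encoders.tuple`, using `Encoders.kryo` only for the awkward fields. The object name and the `toColumns` / `fromColumns` helpers are made up for the example, and the conversion back to `MyClass` happens outside the Dataset.

```scala
import java.util.UUID
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

case class MyClass(id: UUID, items: Map[String, Double], name: String)

object PartialKryoEncoding {
  // One encoder per column: kryo for the two awkward fields,
  // the built-in string encoder for `name`.
  def columnEncoder: Encoder[(UUID, Map[String, Double], String)] =
    Encoders.tuple(
      Encoders.kryo[UUID],
      Encoders.kryo[Map[String, Double]],
      Encoders.STRING
    )

  def toColumns(m: MyClass): (UUID, Map[String, Double], String) = (m.id, m.items, m.name)

  def fromColumns(t: (UUID, Map[String, Double], String)): MyClass = MyClass(t._1, t._2, t._3)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partial-kryo-sketch")
      .master("local[*]") // placeholder master
      .getOrCreate()

    val data = Seq(MyClass(UUID.randomUUID(), Map("a" -> 1.0), "first"))
    val ds = spark.createDataset(data.map(toColumns))(columnEncoder)

    // The schema is expected to have two binary columns and one string column.
    ds.printSchema()

    val restored = ds.collect().map(fromColumns)
    println(restored.mkString(", "))
    spark.stop()
  }
}
```

The string column stays queryable by Spark SQL, while the two kryo columns remain opaque blobs.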


-Ashic.


Re: How does predicate push down really help?

2016-11-16 Thread Ashic Mahtab
Consider a data source that has data in 500 MB files and doesn't support 
predicate push down. Spark will have to load all the data into memory before it 
can apply filtering, select "columns", etc. Each 500 MB file will at some point 
have to be loaded entirely into memory. Now consider a data source that does 
support predicate push down, like MySQL. Spark will only need to retrieve the 
rows and columns it needs, as the database provides an interface for it to do so. 
If the underlying data source supports predicate push down, and the corresponding 
connector supports it, then filtering, projection, etc. are pushed down to the 
storage level. If not, the full dataset needs to be loaded into memory, and 
filtering, projection, etc. happen there.
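
A rough way to see this with a JDBC source is to apply a filter and a projection and then look at the physical plan. The connection details, the `id` column, and the object name below are all hypothetical, and a MySQL JDBC driver is assumed to be on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object JdbcPushdownSketch {
  // Hypothetical connection details.
  val jdbcOptions: Map[String, String] = Map(
    "url" -> "jdbc:mysql://db.example.com:3306/app",
    "dbtable" -> "users",
    "user" -> "reader",
    "password" -> sys.env.getOrElse("DB_PASSWORD", "")
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-pushdown-sketch")
      .master("local[*]") // placeholder master
      .getOrCreate()

    val users = spark.read.format("jdbc").options(jdbcOptions).load()

    // Filter and projection are expressed on the DataFrame...
    val adults = users.filter("age > 30").select("id", "age")

    // ...and the scan node in the physical plan typically lists what was
    // handed to the database (pushed filters and the pruned column list).
    adults.explain()

    spark.stop()
  }
}
```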




On Thu, Nov 17, 2016 at 7:50 AM, "kant kodali" wrote:

Hi Assaf,

I am still trying to understand the merits of predicate push down from the 
examples you pointed out.

Example 1: Say we don't have a predicate push down feature. Why does Spark need 
to pull all the rows and filter them in memory? Why not simply issue a select 
statement with a "where" clause to do the filtering via JDBC or something?

Example 2: Same argument as Example 1, except that when we don't have a predicate 
push down feature we could simply do it using JOIN and WHERE operators in the 
SQL statement, right?

I feel like I am missing something to understand the merits of predicate push 
down.

Thanks,
kant




On Wed, Nov 16, 2016 at 11:34 PM, Mendelson, Assaf wrote:
Actually, both of your examples translate to the same plan.
When you do sql("some code") or filter, Spark doesn't actually run the query. 
Instead, it is translated to a plan (the parsed plan), which transforms everything 
into standard Spark expressions. Then Spark analyzes it to fill in the blanks 
(what the users table is, for example) and attempts to optimize it. Predicate 
pushdown happens in the optimization portion.
For example, let's say that users is actually backed by a table or an SQL 
query in MySQL.
Without predicate pushdown, Spark would first pull the entire users table from 
MySQL and only then do the filtering. Predicate pushdown means the 
filtering is done as part of the original SQL query.

Another (probably better) example would be having two tables, A 
and B, which are joined on some common key, with a filter then applied on the key. 
Moving the filter before the join would probably make everything faster, 
as a filter is a cheaper operation than a join.
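
The "same plan" point can be checked on a toy table by printing the plans for both forms of the query. This is only a sketch: the in-memory `users` data, the object name, and the local master are stand-ins, not part of the original question.

```scala
import org.apache.spark.sql.SparkSession

object SamePlanSketch {
  val predicate = "age > 30"
  val sqlQuery = s"select * from users where $predicate"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("same-plan-sketch")
      .master("local[*]") // placeholder master
      .getOrCreate()
    import spark.implicits._

    // A tiny in-memory stand-in for the `users` table.
    Seq(("ann", 25), ("bob", 41)).toDF("name", "age").createOrReplaceTempView("users")

    val viaSql = spark.sql(sqlQuery)
    val viaFilter = spark.sql("select * from users").filter(predicate)

    // explain(true) prints the parsed, analyzed, optimized and physical plans,
    // so the two queries can be compared stage by stage.
    viaSql.explain(true)
    viaFilter.explain(true)

    spark.stop()
  }
}
```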

Assaf.

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Thursday, November 17, 2016 8:03 AM
To: user @spark
Subject: How does predicate push down really help?

How does predicate push down really help in the following cases?

val df1 = spark.sql("select * from users where age > 30")

 vs

val df1 = spark.sql("select * from users")
df1.filter("age > 30")





Streaming - Stop when there's no more data

2016-11-15 Thread Ashic Mahtab
I'm using Spark Streaming to process a large number of files (tens of millions) 
from a single directory in S3. Using sparkContext.textFile or wholeTextFiles 
takes ages and doesn't appear to do anything. Pointing Structured Streaming at 
that location seems to work, but after processing all the input, it waits for more. 
Is there a way to terminate the streaming app once all input has been exhausted?
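
Later Spark releases (2.2 onwards, so newer than what was current when this was posted) have a one-shot trigger that fits this description: process whatever is available, then stop. A minimal sketch follows; the bucket paths and object name are hypothetical, and the Parquet sink is just an example output.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object DrainThenStop {
  // Hypothetical locations.
  val input = "s3a://example-bucket/input/"
  val output = "s3a://example-bucket/output/"
  val checkpoint = "s3a://example-bucket/checkpoints/drain/"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("drain-then-stop-sketch").getOrCreate()

    // The text source yields a single string column named `value`.
    val lines = spark.readStream.text(input)

    val query = lines.writeStream
      .format("parquet")
      .option("path", output)
      .option("checkpointLocation", checkpoint)
      .trigger(Trigger.Once()) // process what is available now, then stop
      .start()

    // Returns once the single batch has finished.
    query.awaitTermination()
    spark.stop()
  }
}
```

Because progress is recorded in the checkpoint, rerunning the same job later should pick up only files that arrived since the previous run.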


RE: Does Spark SQL support indexes?

2016-08-15 Thread Ashic Mahtab
Guess the good people in the Cassandra world are stuck in the past making 
indexes, materialized views, etc. better with every release :)

From: mich.talebza...@gmail.com
Date: Mon, 15 Aug 2016 11:11:03 +0100
Subject: Re: Does Spark SQL support indexes?
To: gourav.sengu...@gmail.com
CC: charles.up...@gmail.com; user@spark.apache.org

Are you sure about that Gourav :)



Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com
Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction
of data or any other property which may arise from relying on this email's 
technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from such
loss, damage or destruction.  



On 15 August 2016 at 10:59, Gourav Sengupta  wrote:
The world has moved on from indexes, materialized views, and other single 
processor non-distributed system algorithms. Nice that you are not asking 
questions regarding hierarchical file systems.

Regards,
Gourav 
On Sun, Aug 14, 2016 at 4:03 AM, Taotao.Li  wrote:
Hi guys, does Spark SQL support indexes? If so, how can I create an index on 
my temp table? If not, how can I handle some specific queries on a very large 
table? It would iterate over the whole table even though all I want is just a small 
piece of that table.
Many thanks, 

Quant | Engineer | Boy
blog: http://litaotao.github.io
github: www.github.com/litaotao






  

RE: Simulate serialization when running local

2016-08-15 Thread Ashic Mahtab
Thanks Miguel...will have a read.
Thanks Jacek...that looks incredibly useful.

:)

Subject: Re: Simulate serialization when running local
From: mig...@zero-x.co
Date: Sun, 14 Aug 2016 21:07:41 -0700
CC: as...@live.com; user@spark.apache.org
To: ja...@japila.pl

Hi Ashic,
Absolutely.  Serialization errors can be caught locally by taking a test driven 
approach.  I have written a blog post about this as I believe it's important to 
develop spark applications this way.  
If you're interested you can find my post at 
https://medium.com/@therevoltingx/test-driven-development-w-apache-spark-746082b44941#.egnvmicyb
Thanks 

On Aug 14, 2016, at 6:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:

Hi Ashic,

Yes, there is one - local-cluster[N, cores, memory] - that you can use
for simulating a Spark cluster of [N, cores, memory] locally.
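
As a sketch of how that master URL might be used: the three numbers are the worker count, cores per worker, and memory per worker in MB. The values and names below are illustrative. This mode launches executors as separate JVMs, is mainly used in Spark's own test suite, and may need a local Spark installation and the application classes on the executor classpath, so treat the snippet as a starting point rather than a guaranteed recipe.

```scala
import org.apache.spark.sql.SparkSession

object LocalClusterSketch {
  /** Builds a local-cluster master URL: N workers, cores per worker, MB of memory per worker. */
  def masterUrl(workers: Int, coresPerWorker: Int, memoryMb: Int): String =
    s"local-cluster[$workers,$coresPerWorker,$memoryMb]"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("local-cluster-sketch")
      .master(masterUrl(2, 1, 1024)) // illustrative sizes
      .getOrCreate()

    // Tasks and their results now cross process boundaries, so they go
    // through real serialization rather than staying in one JVM.
    val total = spark.sparkContext.parallelize(1 to 100, 4).map(_ * 2).sum()
    println(total)

    spark.stop()
  }
}
```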

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L2478

Regards,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Wed, Aug 10, 2016 at 10:24 AM, Ashic Mahtab <as...@live.com> wrote:
Hi,
Is there a way to simulate "networked" spark when running local (i.e.
master=local[4])? Ideally, some setting that'll ensure any "Task not
serializable" errors are caught during local testing? I seem to vaguely
remember something, but am having trouble pinpointing it.

Cheers,
Ashic.


  

RE: Spark join and large temp files

2016-08-12 Thread Ashic Mahtab
Hi Gourav,
Thanks for your input. As mentioned previously, we've tried the 
broadcast. We've failed to broadcast 1.5GB...perhaps some tuning can help. We 
see CPU go up to 100%, and then workers die during the broadcast. I'm not sure 
if it's a good idea to broadcast that much, as Spark's automatic broadcast by 
default uses a threshold of just 10MB to decide whether to broadcast or not.

As for redis, we don't need a separate redis cluster or anything. We're 
using embedded redis on the driver that lives for the duration of the job. It's 
essentially a way to have some memory on the driver that can accommodate 1.5GB 
and allows access over the network. https://github.com/kstyrc/embedded-redis 
makes this trivial to do.

I don't know if this is a 2011 way of solving this problem or not, but 
http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
 seems to suggest that a good approach to joining a huge dataset with one that 
can't be made smaller is using a database. We've gone by that, and it seems to 
be working. We've tried all the other recommendations (broadcast the dataframe 
as part of the join, explicitly broadcast a hashmap from the driver, register 
temp tables, etc.) - and nothing else has worked. The parquet dataframe doesn't 
have a partitioner when loaded, and any sort of operation requiring a network 
shuffle causes the temp disk to fill up. Within these constraints, the database 
approach turned out to be the only thing we could get working (without paying 
double / treble for nodes that have more disk space to hold the temp files).
Regards,
Ashic.

From: gourav.sengu...@gmail.com
Date: Thu, 11 Aug 2016 21:52:07 +0100
Subject: Re: Spark join and large temp files
To: bteeu...@gmail.com
CC: user@spark.apache.org

The point is that if you have skewed data then one single reducer will end up 
taking a very long time. You do not even need to try this; just search 
Google, and you will see that skewed data is a known problem in joins, even in Spark.
Therefore, instead of using a join, if the use case permits, just write a 
UDF which works as a lookup. Using broadcast is the Spark way, and 
someone mentioned here the use of Redis, which I remember used to be the way 
around this in 2011, in the initial days of MR.
Regards,
Gourav
On Thu, Aug 11, 2016 at 9:24 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
Hmm, hashing will probably send all of the records with the same key to the 
same partition / machine. I'd try it out, and hope that if you have a few 
superlarge keys bigger than the available RAM of one node, they spill to disk. 
Maybe play with persist() and a different storage level.
On Aug 11, 2016, at 9:48 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
Hi Ben,
and that will take care of skewed data?
Gourav 
On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
When you read both 'a' and 'b', can you try repartitioning both by column 'id'? 
If you .cache() and .count() to force a shuffle, it'll push the records that 
will be joined to the same executors. 
So:
a = spark.read.parquet('path_to_table_a').repartition('id').cache()
a.count()
b = spark.read.parquet('path_to_table_b').repartition('id').cache()
b.count()
And then join.

On Aug 8, 2016, at 8:17 PM, Ashic Mahtab <as...@live.com> wrote:
Hello,
We have two parquet inputs of the following form:
a: id:String, Name:String  (1.5TB)
b: id:String, Number:Int  (1.3GB)
We need to join these two to get (id, Number, Name). We've tried two approaches:
a.join(b, Seq("id"), "right_outer")
where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two data sets without 
well...crashing?
Note, the ids are unique, and there's a one to one mapping between the two 
datasets. 
Any help would be appreciated.
-Ashic. 




  

RE: Spark join and large temp files

2016-08-11 Thread Ashic Mahtab
Hi Ben,
Already tried that. The thing is that any form of shuffle on the big 
dataset (which repartition will cause) puts a node's chunk into /tmp, and that 
fills up the disk. I solved the problem by storing the 1.5GB dataset in an embedded 
redis instance on the driver, and doing a straight flatMap over the big dataset 
(doing lookups in redis). This avoids shuffling, and prevents the /tmp fill-up 
issue.
-Ashic. 




  

Simulate serialization when running local

2016-08-10 Thread Ashic Mahtab
Hi,
Is there a way to simulate "networked" spark when running local (i.e. 
master=local[4])? Ideally, some setting that'll ensure any "Task not 
serializable" errors are caught during local testing? I seem to vaguely 
remember something, but am having trouble pinpointing it.
Cheers,
Ashic. 

RE: Spark join and large temp files

2016-08-10 Thread Ashic Mahtab
Already tried that. The CPU hits 100% on the collectAsMap (I even tried 
foreach-ing into a Java ConcurrentHashMap), and it eventually finishes, but the 
broadcast itself takes a while, and at some point there's a timeout and the 
worker is killed. The driver (and workers) have more than enough RAM (1.5GB of 
parquet expands to about 4.5GB, and the nodes have 64GB RAM). Filtering is also 
not an option, as every entry of the "smaller" dataset exists in the large one.
As mentioned in another reply, I managed to get it working by using embedded 
Redis on the driver, loading the smaller dataset into it, and then doing a 
straight map on the larger dataset via a foreachPartition, and doing lookups to 
the driver's Redis. Since there's no network shuffle, the temp folder is barely 
touched, and it seems to work quite well.
-Ashic.

From: zouz...@gmail.com
Date: Wed, 10 Aug 2016 08:22:24 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,
I think this approach should solve your problem, i.e., broadcasting the 
small RDD. However, you should do it properly.
IMO, you should try
import org.apache.spark.broadcast.Broadcast
val smallRDDBroadcasted: Broadcast[scala.collection.Map[Int, YourTypeValue]] =
  sc.broadcast(smallRDD.collectAsMap())
bigRDD.mapPartitions { elems =>
  // Here, manually join using the broadcast map
  elems.flatMap { case (key, value) =>
    smallRDDBroadcasted.value.get(key).map(x => (key, (value, x)))
  }
}
Ensure that your driver has enough memory to store the above Map. If you run 
out of memory on the driver, increase its memory.
Speaking of which, a filtering step might also help here, i.e., filter 
the bigRDD with the keys of the Map before joining.
Hope this helps,
Anastasios 

On Tue, Aug 9, 2016 at 4:46 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Sam,
Yup. It seems it stalls when broadcasting. CPU goes to 100%, but there's 
no progress. The spark UI doesn't even show up.
-Ashic. 

From: samkiller@gmail.com
Date: Tue, 9 Aug 2016 16:21:27 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: deepakmc...@gmail.com; user@spark.apache.org

Have you tried to broadcast your small table in order to perform your 
join?
joined = bigDF.join(broadcast(smallDF), ...)
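
Spelled out in Scala, the broadcast hint comes from `org.apache.spark.sql.functions.broadcast`. The sketch below uses tiny in-memory stand-ins for the two Parquet inputs, with column names following the schema described elsewhere in this thread; the object and helper names are made up.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {
  /** Joins on `id`, hinting that `small` should be shipped whole to every executor. */
  def joinWithBroadcast(big: DataFrame, small: DataFrame): DataFrame =
    big.join(broadcast(small), Seq("id"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-join-sketch")
      .master("local[*]") // placeholder master
      .getOrCreate()
    import spark.implicits._

    // Tiny stand-ins for the two Parquet inputs.
    val bigDF = Seq(("1", "alice"), ("2", "bob")).toDF("id", "Name")
    val smallDF = Seq(("1", 10), ("2", 20)).toDF("id", "Number")

    val joined = joinWithBroadcast(bigDF, smallDF)
    joined.explain() // the plan should show a broadcast join rather than a shuffle-based one
    joined.show()

    spark.stop()
  }
}
```

The hint only asks for a broadcast; it does not make a table that is too large for the driver or executors any smaller.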

On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Deepak,
No...not really. Upping the disk size is a solution, but a more 
expensive one, as you can't easily attach EBS volumes to EMR clusters configured 
with data pipelines (which is what we're doing). I've tried collecting the 1.5G 
dataset in a hashmap, and broadcasting. Timeouts seem to prevent that (even 
after upping the max driver result size). Increasing partition counts didn't 
help (the shuffle used up the temp space). I'm now looking at some form of 
clever broadcasting, or maybe falling back to chunking up the input, producing 
interim output, and unioning them for the final output. Might even try using 
Spark Streaming pointing to the parquet and seeing if that helps. 
-Ashic. 

From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 17:31:19 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,
Did you find the resolution to this issue? Just curious to know 
what helped in this scenario.
Thanks,
Deepak


On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab <as...@live.com> wrote:



Hi Deepak,
Thanks for the response. 
Registering the temp tables didn't help. Here's what I have:
val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")
a.registerTempTable("a")
b.registerTempTable("b")
val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y on x.id=y.id")
results.write.parquet(...)
Is there something I'm missing?
Cheers,
Ashic.
From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register your dataframes as temp tables and then try the join on the temp 
tables. This should resolve your issue.
Thanks,
Deepak

  

RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Can you give some outline as to what you mean? Should I broadcast a dataframe, 
and register the broadcasted df as a temp table? And then use a lookup UDF in a 
SELECT query?  
I've managed to get it working by loading the 1.5GB dataset into an embedded 
redis instance on the driver, and using mapPartitions on the big dataframe to 
map it to the required triples by doing the lookup from redis. It took around 
13 minutes to load the data into redis using 4 cores, and the subsequent map on 
the main dataset was quite fast. 
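
The approach described above might be sketched as follows. This is a reconstruction under assumptions, not the code that was actually run: it assumes the embedded-redis library mentioned elsewhere in this thread (`redis.embedded.RedisServer`) and the Jedis client are on the classpath, that executors can reach the driver host on the chosen port, and that the Parquet paths are placeholders. The `lookup` helper and object name are made up for the example.

```scala
import org.apache.spark.sql.SparkSession
import redis.clients.jedis.Jedis
import redis.embedded.RedisServer

object RedisLookupJoin {
  /** Pure lookup step: emit (id, Number, Name) only when the store has a value for `id`. */
  def lookup(get: String => String)(id: String, name: String): Option[(String, Int, String)] =
    Option(get(id)).map(n => (id, n.toInt, name))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("redis-lookup-join-sketch").getOrCreate()
    import spark.implicits._

    val port = 6379                                      // assumed free on the driver
    val driverHost = spark.conf.get("spark.driver.host") // executors connect back here
    val server = new RedisServer(port)
    server.start()
    try {
      // Load the small dataset into Redis in parallel from the executors.
      spark.read.parquet("path/to/b").select("id", "Number").as[(String, Int)].rdd
        .foreachPartition { rows =>
          val client = new Jedis(driverHost, port)
          try rows.foreach { case (id, n) => client.set(id, n.toString) }
          finally client.close()
        }

      // Map over the big dataset with per-partition lookups; no shuffle is involved.
      val joined = spark.read.parquet("path/to/a").select("id", "Name").as[(String, String)].rdd
        .mapPartitions { rows =>
          val client = new Jedis(driverHost, port)
          // Materialise each partition's output so the connection can be closed before returning.
          try rows.flatMap { case (id, name) => lookup(k => client.get(k))(id, name) }.toVector.iterator
          finally client.close()
        }

      joined.toDF("id", "Number", "Name").write.parquet("path/to/output")
    } finally {
      server.stop()
      spark.stop()
    }
  }
}
```

Materialising each partition keeps connection handling simple at the cost of holding one partition's output in memory; a streaming iterator with a completion hook would avoid that.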

From: gourav.sengu...@gmail.com
Date: Tue, 9 Aug 2016 21:13:51 +0100
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: mich.talebza...@gmail.com; samkiller@gmail.com; deepakmc...@gmail.com; 
user@spark.apache.org

In case of skewed data the joins will mess things up. Try to write a UDF with 
the lookup on broadcast variable and then let me know the results. It should 
not take more than 40 mins in a 32 GB RAM system with 6 core processors.

Gourav
On Tue, Aug 9, 2016 at 6:02 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Mich,
Hardware: AWS EMR cluster with 15 nodes with Rx3.2xlarge (CPU, RAM 
fine, disk a couple of hundred gig).
When we do:
onePointFiveTB.join(onePointFiveGig.cache(), "id")
we're seeing that the temp directory fills up fast, until a node gets 
killed. And then everything dies. 
-Ashic. 

From: mich.talebza...@gmail.com
Date: Tue, 9 Aug 2016 17:25:23 +0100
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: samkiller@gmail.com; deepakmc...@gmail.com; user@spark.apache.org

Hi Sam,
What is your Spark hardware spec: number of nodes, RAM per node, and disks, please?
I don't understand this; it should not really be an issue. Under the bonnet it 
is a hash join. The small table, I gather, can be cached, and the big table will 
do multiple passes using the temp space.
HTH


RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Hi Mich,
Hardware: AWS EMR cluster with 15 nodes with Rx3.2xlarge (CPU, RAM 
fine, disk a couple of hundred gig).
When we do:
onePointFiveTB.join(onePointFiveGig.cache(), "id")
we're seeing that the temp directory fills up fast, until a node gets 
killed. And then everything dies. 
-Ashic. 


RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Hi Sam,
Yup. It seems it stalls when broadcasting. CPU goes to 100%, but there's 
no progress. The spark UI doesn't even show up.
-Ashic. 

On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab <as...@live.com> wrote:



Hello,We have two parquet inputs of the following form:
a: id:String, Name:String  (1.5TB)b: id:String, Number:Int  (1.3GB)
We need to join these two to get (id, Number, Name). We've tried two approaches:
a.join(b, Seq("id"), "right_outer")
where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two data sets without 
well...crashing?
Note, the ids are unique, and there's a one to one mapping between the two 
datasets. 
Any help would be appreciated.
-Ashic. 



  


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
  


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
  

  

RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Hi Deepak,
No...not really. Upping the disk size is a solution, but a more expensive 
one, as you can't easily attach EBS volumes to EMR clusters configured with 
data pipelines (which is what we're doing). I've tried collecting the 1.5G 
dataset in a hashmap and broadcasting it. Timeouts seem to prevent that (even 
after upping the max driver result size). Increasing partition counts didn't 
help (the shuffle used up the temp space). I'm now looking at some form of 
clever broadcasting, or maybe falling back to chunking up the input, producing 
interim output, and unioning them for the final output. Might even try using 
Spark Streaming pointing to the parquet and seeing if that helps.
-Ashic.
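
The "chunk up the input, produce interim output, union" fallback mentioned above can be sketched in plain Scala. This is a minimal model under stated assumptions, not the code that was eventually run: both inputs are split into the same number of buckets by a hash of the id, each bucket pair is joined on its own, and the interim results are concatenated. The names `bucketOf` and `chunkedJoin` are hypothetical.

```scala
object ChunkedJoinSketch {
  // Assign an id to one of n buckets. The same function must be used for both
  // inputs so that matching ids always land in the same bucket.
  def bucketOf(id: String, buckets: Int): Int =
    Math.floorMod(id.hashCode, buckets)

  // Join one bucket at a time and concatenate (union) the interim outputs.
  def chunkedJoin(
      a: Seq[(String, String)], // (id, name)
      b: Seq[(String, Int)],    // (id, number)
      buckets: Int
  ): Seq[(String, Int, String)] =
    (0 until buckets).flatMap { k =>
      // Only this bucket's slice of the smaller input is held as a lookup.
      val bChunk = b.filter { case (id, _) => bucketOf(id, buckets) == k }.toMap
      a.filter { case (id, _) => bucketOf(id, buckets) == k }
        .flatMap { case (id, name) => bChunk.get(id).map(n => (id, n, name)) }
        .toList
    }
}
```

Each pass only needs temp space for one bucket, at the cost of scanning the inputs once per bucket.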

From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 17:31:19 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,
Did you find a resolution to this issue? Just curious to know what helped in 
this scenario.
Thanks,
Deepak


RE: Spark join and large temp files

2016-08-08 Thread Ashic Mahtab
Hi Deepak,
Thanks for the response.
Registering the temp tables didn't help. Here's what I have:
val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")
a.registerTempTable("a")
b.registerTempTable("b")
val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y on x.id=y.id")
results.write.parquet(...)
Is there something I'm missing?
Cheers,
Ashic.
From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register your dataframes as temp tables and then try the join on the temp 
tables. This should resolve your issue.
Thanks,
Deepak
Spark join and large temp files

2016-08-08 Thread Ashic Mahtab
Hello,
We have two parquet inputs of the following form:
a: id:String, Name:String  (1.5TB)
b: id:String, Number:Int  (1.3GB)
We need to join these two to get (id, Number, Name). We've tried two approaches:
a.join(b, Seq("id"), "right_outer")
where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage increases during the join stage and fills up our disks, causing 
the job to crash. Is there a way to join these two data sets without, 
well...crashing?
Note, the ids are unique, and there's a one-to-one mapping between the two 
datasets.
Any help would be appreciated.
-Ashic.



  

RE: Cluster mode deployment from jar in S3

2016-07-04 Thread Ashic Mahtab
I've found a workaround. I set up an HTTP server serving the jar, and pointed 
spark-submit at the HTTP URL.
Which brings me to ask: would it be a good option to allow spark-submit to 
upload a local jar to the master, which the master can then serve via an HTTP 
interface? The master already runs a web UI, so I imagine we could allow it to 
receive jars, and serve them as well. Perhaps an additional flag could be used 
to signify that the local jar should be uploaded in this manner? I'd be happy 
to take a stab at it...but thoughts?
-Ashic.
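
The message does not say which HTTP server was used for the workaround. As one possible illustration, the sketch below serves a single jar with the HTTP server that ships with the JDK (`com.sun.net.httpserver`). The object name `JarHttpServer`, the URL path and the port are assumptions made for the example.

```scala
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.InetSocketAddress
import java.nio.file.{Files, Path}

object JarHttpServer {
  // Serve one file at the given URL path. Passing port 0 asks the OS for any
  // free port; the chosen port is available via server.getAddress.getPort.
  def start(jar: Path, urlPath: String, port: Int): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress(port), 0)
    server.createContext(urlPath, new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        // Reads the whole file into memory, which is fine for a sketch but
        // would be streamed for very large jars.
        val bytes = Files.readAllBytes(jar)
        exchange.sendResponseHeaders(200, bytes.length.toLong)
        val body = exchange.getResponseBody
        try body.write(bytes) finally body.close()
      }
    })
    server.start()
    server
  }
}
```

With something like this running on a host the cluster can reach, the jar argument to spark-submit becomes the http:// URL of the file instead of the s3:// one.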

RE: Cluster mode deployment from jar in S3

2016-07-04 Thread Ashic Mahtab
Hi Lohith,
Thanks for the response.
The S3 bucket does have access restrictions, but the instances on which the 
Spark master and workers run have an IAM role policy that allows them access to 
it. As such, we don't really configure the CLI with credentials...the IAM roles 
take care of that. Is there a way to make Spark work the same way? Or should I 
get temporary credentials somehow (like 
http://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html
), and use them to somehow submit the job? I guess I'll have to set them via 
environment variables; I can't put them in application code, as the issue is in 
downloading the jar from S3.
-Ashic.

From: lohith.sam...@mphasis.com
To: as...@live.com; user@spark.apache.org
Subject: RE: Cluster mode deployment from jar in S3
Date: Mon, 4 Jul 2016 09:50:50 +









Hi,
The aws CLI already has your access key ID and secret access key from when you 
initially configured it.
Is your S3 bucket without any access restrictions?
 
 

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga
 

 


  

RE: Cluster mode deployment from jar in S3

2016-07-04 Thread Ashic Mahtab
Sorry to do this...but... *bump*
  

Cluster mode deployment from jar in S3

2016-07-01 Thread Ashic Mahtab
Hello,
I've got a Spark stand-alone cluster using EC2 instances. I can submit jobs 
using "--deploy-mode client", however using "--deploy-mode cluster" is proving 
to be a challenge. I've tried this:
spark-submit --class foo --master spark://master-ip:7077 --deploy-mode cluster s3://bucket/dir/foo.jar
When I do this, I get:

16/07/01 16:23:16 ERROR ClientEndpoint: Exception from cluster was: 
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key 
must be specified as the username or password (respectively) of a s3 URL, or by 
setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties 
(respectively).
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key 
must be specified as the username or password (respectively) of a s3 URL, or by 
setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties 
(respectively).
        at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
        at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:82)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)

Now I'm not using any S3 or hadoop stuff within my code (it's just an 
sc.parallelize(1 to 100)). So, I imagine it's the driver trying to fetch the 
jar. I haven't set the AWS Access Key Id and Secret as mentioned, but the role 
the machines are in allows them to copy the jar. In other words, this works:
aws s3 cp s3://bucket/dir/foo.jar /tmp/foo.jar
I'm using Spark 1.6.2, and can't really think of what I can do so that I can 
submit the jar from s3 using cluster deploy mode. I've also tried simply 
downloading the jar onto a node, and spark-submitting that... that works in 
client mode, but I get a not found error when using cluster mode.
Any help will be appreciated.
Thanks,
Ashic.

RE: Spark + HDFS

2016-04-19 Thread Ashic Mahtab
Spark acts as an HDFS client. In other words, it'll contact the master (the 
NameNode) of the HDFS cluster, which returns the block info, and then 
the data is fetched from the data nodes.
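
The two-step lookup described above can be modelled in a few lines of plain Scala. This is a toy model only: `NameNode`, `DataNodes` and `BlockLocation` here are hypothetical stand-ins defined for the example, not the Hadoop classes, and real HDFS adds replication policy, locality preferences and streaming on top.

```scala
object HdfsReadSketch {
  // Toy stand-in: which hosts hold a replica of a given block.
  final case class BlockLocation(blockId: Long, hosts: Seq[String])

  // The "NameNode" only knows metadata: file path -> ordered block locations.
  final class NameNode(metadata: Map[String, Seq[BlockLocation]]) {
    def blockLocations(path: String): Seq[BlockLocation] =
      metadata.getOrElse(path, Seq.empty)
  }

  // The "DataNodes" hold the actual bytes, keyed by (host, blockId).
  final class DataNodes(blocks: Map[(String, Long), String]) {
    def read(host: String, blockId: Long): Option[String] = blocks.get((host, blockId))
  }

  // Client-side read: ask the NameNode where the blocks live, then fetch each
  // block from the first listed host that actually has it.
  def readFile(path: String, nn: NameNode, dns: DataNodes): String =
    nn.blockLocations(path).map { loc =>
      loc.hosts
        .flatMap(h => dns.read(h, loc.blockId))
        .headOption
        .getOrElse(sys.error(s"no replica found for block ${loc.blockId}"))
    }.mkString
}
```

The point of the model is that the master never serves file contents; it only tells the client where to go.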

Date: Tue, 19 Apr 2016 14:00:31 +0530
Subject: Spark + HDFS
From: chaturvedich...@gmail.com
To: user@spark.apache.org

When I use Spark and HDFS on two different clusters, how do the Spark workers 
know which block of data is available on which HDFS node? Who basically caters 
to this?
Can someone throw light on this?

RE: Logging in executors

2016-04-18 Thread Ashic Mahtab
I spent ages on this recently, and here's what I found:
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///local/file/on.executor.properties"
works. Alternatively, you can also do:
--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=filename.properties" --files="path/to/filename.properties"
log4j.properties files packaged with the application don't seem to have any 
effect. This is likely because log4j gets initialised before your application 
code is loaded. You can also reinitialise log4j logging as part of your 
application code. That also worked for us, but we went the extraJavaOptions 
route as it was less invasive on the application side.
-Ashic.

Date: Mon, 18 Apr 2016 10:32:03 -0300
Subject: Re: Logging in executors
From: cma...@despegar.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org

Thanks Ted, I already checked it, but it is not the same. I'm working with 
standalone Spark; the examples refer to HDFS paths, therefore I assume the 
Hadoop 2 Resource Manager is used. I've tried all possible flavours. The only 
one that worked was changing spark-defaults.conf on every machine. I'll go 
with this for now, but the extra Java opts for the executor are definitely not 
working, at least for logging configuration.

Thanks,-carlos.
On Fri, Apr 15, 2016 at 3:28 PM, Ted Yu  wrote:
See this thread: http://search-hadoop.com/m/q3RTtsFrd61q291j1
On Fri, Apr 15, 2016 at 5:38 AM, Carlos Rojas Matas  wrote:
Hi guys,
any clue on this? Clearly the 
spark.executor.extraJavaOpts=-Dlog4j.configuration is not working on the 
executors.
Thanks,-carlos.
On Wed, Apr 13, 2016 at 2:48 PM, Carlos Rojas Matas  wrote:
Hi Yong,
thanks for your response. As I said in my first email, I've tried both the 
reference to the classpath resource (env/dev/log4j-executor.properties) as the 
file:// protocol. Also, the driver logging is working fine and I'm using the 
same kind of reference.
Below the content of my classpath:


Plus this is the content of the exploded fat jar assembled with sbt assembly 
plugin:



This folder is at the root level of the classpath.
Thanks,-carlos.
On Wed, Apr 13, 2016 at 2:35 PM, Yong Zhang  wrote:



Is the env/dev/log4j-executor.properties file within your jar file? Does the 
path match what you specified as env/dev/log4j-executor.properties?
If you read the log4j document here: 
https://logging.apache.org/log4j/1.2/manual.html
When you specify log4j.configuration=my_custom.properties, you have 2 
options:
1) my_custom.properties has to be in the jar (or on the classpath). In your 
case, since you specify the package path, you need to make sure it matches 
the path in your jar file.
2) Use something like log4j.configuration=file:///tmp/my_custom.properties. 
In this case, you need to make sure the file my_custom.properties exists in 
the /tmp folder on ALL of your worker nodes.
Yong

Date: Wed, 13 Apr 2016 14:18:24 -0300
Subject: Re: Logging in executors
From: cma...@despegar.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org

Thanks for your response Ted. You're right, there was a typo. I changed it, now 
I'm executing:
bin/spark-submit --master spark://localhost:7077 --conf 
"spark.driver.extraJavaOptions=-Dlog4j.configuration=env/dev/log4j-driver.properties"
 --conf 
"spark.executor.extraJavaOptions=-Dlog4j.configuration=env/dev/log4j-executor.properties"
 --class
The content of this file is:
# Set everything to be logged to the console
log4j.rootCategory=INFO, FILE
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.appender.FILE=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.File=/tmp/executor.log
log4j.appender.FILE.ImmediateFlush=true
log4j.appender.FILE.Threshold=debug
log4j.appender.FILE.Append=true
log4j.appender.FILE.MaxFileSize=100MB
log4j.appender.FILE.MaxBackupIndex=5
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.com.despegar.p13n=DEBUG

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

Finally, the code on which I'm using logging in the executor is:
def groupAndCount(keys: DStream[(String, List[String])])(handler: 

RE: ML Random Forest Classifier

2016-04-13 Thread Ashic Mahtab
It looks like all of that is building up to spark 2.0 (for random forests / 
gbts / etc.). Ah well...thanks for your help. Was interesting digging into the 
depths.

Date: Wed, 13 Apr 2016 09:48:32 +0100
Subject: Re: ML Random Forest Classifier
From: ja...@gluru.co
To: as...@live.com
CC: user@spark.apache.org

Hi Ashic,
Unfortunately I don't know how to work around that - I suggested this line as 
it looked promising (I had considered it once before deciding to use a 
different algorithm) but I never actually tried it. 
Regards,
James
On 13 April 2016 at 02:29, Ashic Mahtab <as...@live.com> wrote:



It looks like the issue is around impurity stats. After converting an rf model 
to old, and back to new (without disk storage or anything), and specifying the 
same num of features, same categorical features map, etc., 
DecisionTreeClassifier::predictRaw throws a null pointer exception here:
  override protected def predictRaw(features: Vector): Vector = {
    Vectors.dense(rootNode.predictImpl(features).impurityStats.stats.clone())
  }
It appears impurityStats is always null (even though impurity does have a 
value). Any known workarounds? It's looking like I'll have to revert to using 
mllib instead :(
-Ashic.
From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Wed, 13 Apr 2016 02:20:53 +0100




I managed to get to the map using MetadataUtils (it's a private ml package). 
There are still some issues, around feature names, etc. Trying to pin them down.

From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Wed, 13 Apr 2016 00:50:31 +0100




Hi James,
Following on from the previous email, is there a way to get the 
categoricalFeatures of a Spark ML Random Forest? Essentially something I can 
pass to
RandomForestClassificationModel.fromOld(oldModel, parent, categoricalFeatures, numClasses, numFeatures)
I could construct it by hand, but I was hoping for a more automated way of 
getting the map. Since the trained model already knows about the value, perhaps 
it's possible to grab it for storage?
Thanks,
Ashic.
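
For the "construct it by hand" route: in MLlib's tree APIs the categorical features map goes from feature index to the number of categories for that feature. A minimal sketch of deriving it from training rows follows. It assumes categorical values are already encoded as 0.0, 1.0, ... (as a string indexer would produce) and that every category appears at least once in the rows; the object and method names are hypothetical.

```scala
object CategoricalFeaturesSketch {
  // For each listed categorical column, take the number of categories to be
  // (largest observed index + 1). Requires a non-empty row set.
  def arities(rows: Seq[Array[Double]], categoricalIndices: Set[Int]): Map[Int, Int] =
    categoricalIndices.map { i =>
      i -> (rows.map(_(i)).max.toInt + 1)
    }.toMap
}
```

The resulting `Map[Int, Int]` can be stored alongside the old-style model and passed back as the `categoricalFeatures` argument when converting.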

From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Mon, 11 Apr 2016 23:21:53 +0100




Thanks, James. That looks promising. 

Date: Mon, 11 Apr 2016 10:41:07 +0100
Subject: Re: ML Random Forest Classifier
From: ja...@gluru.co
To: as...@live.com
CC: user@spark.apache.org

To add a bit more detail perhaps something like this might work:








package org.apache.spark.ml

import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.mllib.tree.model.{ RandomForestModel => OldRandomForestModel }
import org.apache.spark.ml.classification.RandomForestClassifier

object RandomForestModelConverter {

  def fromOld(oldModel: OldRandomForestModel, parent: RandomForestClassifier = null,
    categoricalFeatures: Map[Int, Int], numClasses: Int, numFeatures: Int = -1): RandomForestClassificationModel = {
    RandomForestClassificationModel.fromOld(oldModel, parent, categoricalFeatures, numClasses, numFeatures)
  }

  def toOld(newModel: RandomForestClassificationModel): OldRandomForestModel = {
    newModel.toOld
  }
}

Regards,
James 
On 11 April 2016 at 10:36, James Hammerton <ja...@gluru.co> wrote:
There are methods for converting the dataframe based random forest models to 
the old RDD based models and vice versa. Perhaps using these will help given 
that the old models can be saved and loaded?
In order to use them however you will need to write code in the 
org.apache.spark.ml package.
I've not actually tried doing this myself but it looks as if it might work.
Regards,
James








On 11 April 2016 at 10:29, Ashic Mahtab <as...@live.com> wrote:



Hello,
I'm trying to save a pipeline with a random forest classifier. If I try 
to save the pipeline, it complains that the classifier is not Writable, and 
indeed the classifier itself doesn't have a write function. There's a pull 
request that's been merged that enables this for Spark 2.0 (any dates around 
when that'll release?). I am, however, using the Spark Cassandra Connector, 
which doesn't seem to be able to create a CqlContext with Spark 2.0 snapshot 
builds. Seeing that MLlib's random forest classifier supports storing and 
loading models, is there a way to create a Spark ML pipeline in Spark 1.6 with 
a random forest classifier that'll allow me to store and load the model? The 
model takes a significant amount of time to train, and I really don't want to 
have to train it every time my application launches.
Thanks,
Ashic.





  

  

RE: ML Random Forest Classifier

2016-04-12 Thread Ashic Mahtab
It looks like the issue is around impurity stats. After converting an rf model 
to old, and back to new (without disk storage or anything), and specifying the 
same number of features, same categorical features map, etc., 
DecisionTreeClassificationModel.predictRaw throws a null pointer exception 
here:
  override protected def predictRaw(features: Vector): Vector = {
    Vectors.dense(rootNode.predictImpl(features).impurityStats.stats.clone())
  }
It appears impurityStats is always null (even though impurity does have a 
value). Any known workarounds? It's looking like I'll have to revert to using 
mllib instead :(
-Ashic.
From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Wed, 13 Apr 2016 02:20:53 +0100




I managed to get to the map using MetadataUtils (it's a private utility in the 
ml package). There are still some issues around feature names, etc. I'm trying 
to pin them down.

From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Wed, 13 Apr 2016 00:50:31 +0100




Hi James,
Following on from the previous email, is there a way to get the 
categoricalFeatures of a Spark ML Random Forest? Essentially something I can 
pass to
  RandomForestClassificationModel.fromOld(oldModel, parent, categoricalFeatures, 
  numClasses, numFeatures)
I could construct it by hand, but I was hoping for a more automated way of 
getting the map. Since the trained model already knows about the value, perhaps 
it's possible to grab it for storage?
Thanks,
Ashic.

From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Mon, 11 Apr 2016 23:21:53 +0100




Thanks, James. That looks promising. 


RE: ML Random Forest Classifier

2016-04-12 Thread Ashic Mahtab
I managed to get to the map using MetadataUtils (it's a private utility in the 
ml package). There are still some issues around feature names, etc. I'm trying 
to pin them down.
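
For anyone following along, a minimal sketch of that lookup. It assumes 
Spark 1.6-era internals, where MetadataUtils.getCategoricalFeatures takes the 
StructField of the features column; the object and method names below 
(CategoricalFeatureLookup, categoricalFeatures) are made up for illustration 
and are not part of Spark:

```scala
// Assumption: this has to be compiled into a package under org.apache.spark,
// because MetadataUtils is not public.
package org.apache.spark.ml

import org.apache.spark.ml.util.MetadataUtils
import org.apache.spark.sql.types.StructType

// Hypothetical helper object; the name is illustrative only.
object CategoricalFeatureLookup {
  // Returns featureIndex -> number of categories, read from the ML attribute
  // metadata attached to the features column of the given schema.
  def categoricalFeatures(schema: StructType,
                          featuresCol: String = "features"): Map[Int, Int] =
    MetadataUtils.getCategoricalFeatures(schema(featuresCol))
}
```

Called as CategoricalFeatureLookup.categoricalFeatures(trainingDf.schema), 
this should give a map that can be handed to fromOld. The map is only 
populated when the features column carries attribute metadata (for example 
when it was produced by VectorIndexer); otherwise it comes back empty.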

From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Wed, 13 Apr 2016 00:50:31 +0100




Hi James,
Following on from the previous email, is there a way to get the 
categoricalFeatures of a Spark ML Random Forest? Essentially something I can 
pass to
  RandomForestClassificationModel.fromOld(oldModel, parent, categoricalFeatures, 
  numClasses, numFeatures)
I could construct it by hand, but I was hoping for a more automated way of 
getting the map. Since the trained model already knows about the value, perhaps 
it's possible to grab it for storage?
Thanks,
Ashic.

From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Mon, 11 Apr 2016 23:21:53 +0100




Thanks, James. That looks promising. 


RE: ML Random Forest Classifier

2016-04-12 Thread Ashic Mahtab
Hi James,
Following on from the previous email, is there a way to get the 
categoricalFeatures of a Spark ML Random Forest? Essentially something I can 
pass to
  RandomForestClassificationModel.fromOld(oldModel, parent, categoricalFeatures, 
  numClasses, numFeatures)
I could construct it by hand, but I was hoping for a more automated way of 
getting the map. Since the trained model already knows about the value, perhaps 
it's possible to grab it for storage?
Thanks,
Ashic.

From: as...@live.com
To: ja...@gluru.co
CC: user@spark.apache.org
Subject: RE: ML Random Forest Classifier
Date: Mon, 11 Apr 2016 23:21:53 +0100




Thanks, James. That looks promising. 


RE: ML Random Forest Classifier

2016-04-11 Thread Ashic Mahtab
Thanks, James. That looks promising. 

Date: Mon, 11 Apr 2016 10:41:07 +0100
Subject: Re: ML Random Forest Classifier
From: ja...@gluru.co
To: as...@live.com
CC: user@spark.apache.org

To add a bit more detail perhaps something like this might work:

package org.apache.spark.ml

import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.mllib.tree.model.{ RandomForestModel => OldRandomForestModel }
import org.apache.spark.ml.classification.RandomForestClassifier

object RandomForestModelConverter {

  def fromOld(oldModel: OldRandomForestModel, parent: RandomForestClassifier = null,
      categoricalFeatures: Map[Int, Int], numClasses: Int,
      numFeatures: Int = -1): RandomForestClassificationModel = {
    RandomForestClassificationModel.fromOld(oldModel, parent,
      categoricalFeatures, numClasses, numFeatures)
  }

  def toOld(newModel: RandomForestClassificationModel): OldRandomForestModel = {
    newModel.toOld
  }

}

Regards,
James 
On 11 April 2016 at 10:36, James Hammerton <ja...@gluru.co> wrote:
There are methods for converting the dataframe based random forest models to 
the old RDD based models and vice versa. Perhaps using these will help given 
that the old models can be saved and loaded?
In order to use them however you will need to write code in the 
org.apache.spark.ml package.
I've not actually tried doing this myself but it looks as if it might work.
Regards,
James








On 11 April 2016 at 10:29, Ashic Mahtab <as...@live.com> wrote:



Hello, I'm trying to save a pipeline with a random forest classifier. If I try 
to save the pipeline, it complains that the classifier is not Writable, and 
indeed the classifier itself doesn't have a write function. There's a pull 
request that's been merged that enables this for Spark 2.0 (any dates around 
when that'll release?). I am, however, using the Spark Cassandra Connector, 
which doesn't seem to be able to create a CqlContext with Spark 2.0 snapshot 
builds. Seeing that MLlib's random forest classifier supports storing and 
loading models, is there a way to create a Spark ML pipeline in Spark 1.6 with 
a random forest classifier that'll allow me to store and load the model? The 
model takes a significant amount of time to train, and I really don't want to 
have to train it every time my application launches.
Thanks,
Ashic.



  

ML Random Forest Classifier

2016-04-11 Thread Ashic Mahtab
Hello, I'm trying to save a pipeline with a random forest classifier. If I try 
to save the pipeline, it complains that the classifier is not Writable, and 
indeed the classifier itself doesn't have a write function. There's a pull 
request that's been merged that enables this for Spark 2.0 (any dates around 
when that'll release?). I am, however, using the Spark Cassandra Connector, 
which doesn't seem to be able to create a CqlContext with Spark 2.0 snapshot 
builds. Seeing that MLlib's random forest classifier supports storing and 
loading models, is there a way to create a Spark ML pipeline in Spark 1.6 with 
a random forest classifier that'll allow me to store and load the model? The 
model takes a significant amount of time to train, and I really don't want to 
have to train it every time my application launches.
Thanks,
Ashic.

RE: Spark on Mobile platforms

2016-04-07 Thread Ashic Mahtab
Spark may not be the right tool for this. Working on just the mobile device, 
you won't be scaling out stuff, and as such most of the benefits of Spark would 
be nullified. Moreover, it'd likely run slower than things that are meant to 
work in a single process. Spark is also quite large, which is another drawback 
in terms of mobile apps.
Perhaps check out TensorFlow, which may be better suited for this particular 
requirement.
-Ashic.

> Date: Thu, 7 Apr 2016 04:50:18 -0700
> From: sbarbhuiy...@qub.ac.uk
> To: user@spark.apache.org
> Subject: Spark on Mobile platforms
> 
> Hi all,
> 
> I have been trying to find if Spark can be run on a mobile device platform
> (Android preferably) to analyse mobile log data for some performance
> analysis. So, basically the idea is to collect and process the mobile log
> data within the mobile device using the Spark framework to allow real-time
> log data analysis, without offloading the data to remote server/cloud.
> 
> Does anybody have any idea about whether running Spark on a mobile platform
> is supported by the existing Spark framework or is there any other option
> for this?
> 
> Many thanks.
> Sakil
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Mobile-platforms-tp26699.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Additional classpaths / java options

2016-03-22 Thread Ashic Mahtab
Hello,
Is it possible to specify additional class paths / Java options "in addition 
to" those specified in spark-defaults.conf? I see that if I specify 
spark.executor.extraJavaOptions or spark.executor.extraClassPath in defaults, 
and then specify
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///some.properties"
or
  --conf "spark.executor.extraClassPath=/usr/lib/something/*"
when spark-submitting, then the value passed to spark-submit completely 
replaces anything in defaults. Is there a way to add some Java options and 
class paths by default, and have the ones passed to spark-submit be used in 
addition to the defaults?
Thanks,
Ashic.

RE: log4j pains

2016-03-10 Thread Ashic Mahtab
src/main/resources/log4j.properties

Subject: Re: log4j pains
From: st...@memeticlabs.org
Date: Thu, 10 Mar 2016 11:08:46 -0600
CC: user@spark.apache.org
To: as...@live.com

Where in the jar is the log4j.properties file?
On Mar 10, 2016, at 9:40 AM, Ashic Mahtab <as...@live.com> wrote:
1. Fat jar with logging dependencies included. log4j.properties in fat jar. 
Spark doesn't pick up the properties file, so uses its defaults.
  

log4j pains

2016-03-10 Thread Ashic Mahtab
Hello,
I'm trying to use a custom log4j appender, with things specified in a 
log4j.properties file. Very little seems to work in this regard. Here's what 
I've tried:
1. Fat jar with logging dependencies included. log4j.properties in fat jar. 
Spark doesn't pick up the properties file, so uses its defaults.
2. Fat jar with logging dependencies included. Submitted the properties file 
like this:
  spark-submit --files ./log4j.properties --conf 'spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties' fat.jar
Just uses defaults. Doesn't use the submitted file.
3. Fat jar with logging dependencies included. Submitted the properties file 
like this:
  spark-submit --files ./log4j.properties --conf 'spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties' fat.jar
Crashes. Says log4j.properties file not found.
4. Fat jar with logging dependencies included. Submitted the properties file 
like this:
  spark-submit --files ./log4j.properties --conf 'spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/localfolder/log4j.properties' fat.jar
Crashes. Says it could not find the appender stuff (JSonEventLayoutV1). 
That's something that's provided in the fat jar.
5. Fat jar with logging dependencies included. Submitted the properties file 
and logging jars like this:
  spark-submit --files ./log4j.properties --jars 'comma,separated,list' --conf 'spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/localfolder/log4j.properties' fat.jar
Crashes. Says it could not find the appender stuff (JSonEventLayoutV1). 
That's something that's provided in the fat jar.

It seems that only including the jars in the extra class path works. There 
seems to be a somewhat related issue here: 
https://issues.apache.org/jira/browse/SPARK-9826 , but that seems to have been 
resolved in 1.4.1. I'm using 1.5.2. Is this a regression? Or am I missing 
something?
Any help will be appreciated.
Thanks,
Ashic.

RE: Specify log4j properties file

2016-03-09 Thread Ashic Mahtab
Found it. 
You can pass in the jvm parameter log4j.configuration. The following works:
-Dlog4j.configuration=file:path/to/log4j.properties
It doesn't work without the file: prefix though. Tested in 1.6.0.
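
If the option is assembled in code, a small helper along these lines might 
save the odd mistake with the prefix. This is only a sketch: Log4jOption and 
log4jOption are hypothetical names, not anything Spark provides, and the 
helper just builds the string shown above.

```scala
// Hypothetical helper (not a Spark API): builds the JVM option shown above.
object Log4jOption {
  private val Prefix = "file:"

  // Returns -Dlog4j.configuration=file:<path>, adding the "file:" prefix
  // only when the caller has not already supplied it.
  def log4jOption(path: String): String = {
    val url = if (path.startsWith(Prefix)) path else Prefix + path
    s"-Dlog4j.configuration=$url"
  }
}
```

The resulting string is what would be passed as the value of the relevant 
extraJavaOptions setting.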
Cheers,
Ashic.

From: as...@live.com
To: user@spark.apache.org
Subject: Specify log4j properties file
Date: Wed, 9 Mar 2016 17:57:00 +




Hello,
Is it possible to provide a log4j properties file when submitting jobs to 
a cluster? I know that by default Spark looks for a log4j.properties file in 
the conf directory. I'm looking for a way to specify a different 
log4j.properties file (external to the application) without pointing to a 
completely different conf directory. Is there a way to achieve this?
Thanks,
Ashic.
  

Specify log4j properties file

2016-03-09 Thread Ashic Mahtab
Hello,
Is it possible to provide a log4j properties file when submitting jobs to 
a cluster? I know that by default Spark looks for a log4j.properties file in 
the conf directory. I'm looking for a way to specify a different 
log4j.properties file (external to the application) without pointing to a 
completely different conf directory. Is there a way to achieve this?
Thanks,
Ashic.

RE: Batch together RDDs for Streaming output, without delaying execution of map or transform functions

2015-12-31 Thread Ashic Mahtab
Hi Ewan,
Transforms are definitions of what needs to be done - they don't 
execute until an action is triggered. For what you want, I think you might 
need to have an action that writes out RDDs to some sort of buffered writer. 
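
As a rough sketch of what such a buffered writer could look like (plain Scala, 
no Spark dependency; BufferedBatchWriter and its parameters are hypothetical 
names, and it assumes each batch is small enough to be handed to the driver):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: collects records from many small batches and hands
// them to `write` once `batchesPerFlush` batches have been added.
class BufferedBatchWriter[T](batchesPerFlush: Int, write: Seq[T] => Unit) {
  require(batchesPerFlush > 0, "batchesPerFlush must be positive")

  private val buffer = ArrayBuffer.empty[T]
  private var batchesSeen = 0

  // Call once per micro-batch with that batch's records.
  def add(batch: Seq[T]): Unit = synchronized {
    buffer ++= batch
    batchesSeen += 1
    if (batchesSeen >= batchesPerFlush) flush()
  }

  // Writes out whatever is buffered (if anything) and resets the counters.
  def flush(): Unit = synchronized {
    if (buffer.nonEmpty) write(buffer.toList)
    buffer.clear()
    batchesSeen = 0
  }
}
```

With 1 second batches and batchesPerFlush = 60, the transform still runs every 
second, while the write happens roughly once a minute. In a streaming job the 
add call would sit inside foreachRDD, e.g. writer.add(rdd.collect().toSeq).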
-Ashic.

From: ewan.le...@realitymine.com
To: user@spark.apache.org
Subject: Batch together RDDs for Streaming output, without delaying execution 
of map or transform functions
Date: Thu, 31 Dec 2015 11:35:37 +









Hi all,
 
I’m sure this must have been solved already, but I can’t see anything obvious.
 
Using Spark Streaming, I’m trying to execute a transform function on a DStream 
at short batch intervals (e.g. 1 second), but only write the resulting data to 
disk using saveAsTextFiles in a larger batch after a longer delay (say 60 
seconds).
 
I thought the ReceiverInputDStream window function might be a good help here, 
but instead, applying it to a transformed DStream causes the transform function 
to only execute at the end of the window too.
 
Has anyone got a solution to this?
 
Thanks,
Ewan
 
 
 
  

Working offline with spark-core and sbt

2015-12-30 Thread Ashic Mahtab
Hello,
I'm trying to work offline with spark-core. I've got an empty project 
with the following:

name := "sbtSand"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "joda-time" % "joda-time" % "2.9.1",
  "org.apache.spark" %% "spark-core" % "1.5.2"
)

I can "sbt compile" this. But if I go offline, and "sbt clean", then "sbt 
compile", then it fails. If I remove the spark-core dependency (but leave 
joda-time in), then "sbt compile" succeeds, and the package from the ivy2 cache 
is used. I also added scalatest, and that works offline (assuming the package 
is in ~/.ivy2/cache). However, I can't find a way to work offline with 
spark-core. Is there a simple way to get this working?

Crossposted here from 
http://stackoverflow.com/questions/34537886/sbt-ivy-offline-work-and-weirdness 
as I'm wondering if somebody working with Spark has found a solution.
Thanks,
Ashic.

RE: Working offline with spark-core and sbt

2015-12-30 Thread Ashic Mahtab
To answer my own question, it appears certain things (like parents, etc.) 
caused the issue. I was using sbt 0.13.8. Using 0.13.9 works fine.


RE: Spark Broadcasting large dataset

2015-07-10 Thread Ashic Mahtab
When you say tasks, do you mean different applications, or different tasks in 
the same application? If it's the same program, they should be able to share 
the broadcasted value. But given you're asking the question, I imagine they're 
separate.

And in that case, afaik, the answer is no. You might look into putting the data 
into a fast store like Cassandra - that might help depending on your use case. 

Cheers,
Ashic.

Date: Fri, 10 Jul 2015 15:52:42 +0200
From: huan...@cebitec.uni-bielefeld.de
To: user@spark.apache.org
Subject: Spark Broadcasting large dataset


Hey, Guys!
I am using spark for NGS data application.
In my case I have to broadcast a very big dataset to each task.  
However there are several tasks (say 48 tasks) running on cpus (also 48 cores) 
in the same node. These tasks, who run on the same node, could share the same 
dataset. But spark broadcast them 48 times (if I understand correctly). Is 
there a way to broadcast just one copy for each node and share by all tasks 
running on such nodes? 
Much appreciated!
best!


huanglr   

RE: JDBC Streams

2015-07-05 Thread Ashic Mahtab
Hi Ayan,
How continuous is your workload? As Akhil points out, with streaming, 
you'll give up at least one core for receiving, and will need at least one more 
core for processing. Unless you're running on something like Mesos, this means 
that those cores are dedicated to your app, and can't be leveraged by other 
apps / jobs.
If it's something periodic (once an hour, once every 15 minutes, etc.), then 
I'd simply write a normal Spark application, and trigger it periodically. 
There are many things that can take care of that - sometimes a simple cronjob 
is enough!

Date: Sun, 5 Jul 2015 22:48:37 +1000
Subject: Re: JDBC Streams
From: guha.a...@gmail.com
To: ak...@sigmoidanalytics.com
CC: user@spark.apache.org

Thanks Akhil. In case I go with spark streaming, I guess I have to implement a 
custom receiver and spark streaming will call this receiver every batch 
interval, is that correct? Any gotcha you see in this plan? TIA...
Best, Ayan

On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
If you want a long running application, then go with spark streaming (which 
kind of blocks your resources). On the other hand, if you use job server then 
you can actually use the resources (CPUs) for other jobs also when your dbjob 
is not using them.
Thanks
Best Regards

On Sun, Jul 5, 2015 at 5:28 AM, ayan guha guha.a...@gmail.com wrote:
Hi All

I have a requirement to connect to a DB every few minutes and bring data to 
HBase. Can anyone suggest if spark streaming would be appropriate for this 
scenario or I should look into jobserver?

Thanks in advance
-- 
Best Regards,
Ayan Guha






-- 
Best Regards,
Ayan Guha

  

RE: JDBC Streams

2015-07-05 Thread Ashic Mahtab
If it is indeed a reactive use case, then Spark Streaming would be a good 
choice. 
One approach worth considering: is it possible to receive a message via Kafka 
(or some other queue)? That wouldn't need any polling, and you could use 
standard consumers. If polling isn't an issue, then writing a custom receiver 
will work fine. The way a receiver works is this:
* Your receiver has a receive() function, where you'd typically start a loop. 
  In your loop, you'd fetch items, and call store(entry).
* You control everything in the receiver. If you're listening on a queue, you 
  receive messages, store() and ack your queue. If you're polling, it's up to 
  you to ensure delays between db calls.
* The things you store() go on to make up the RDDs in your DStream. So, 
  intervals, windowing, etc. apply to those. The receiver is the boundary 
  between your data source and the DStream RDDs. In other words, if your 
  interval is 15 seconds with no windowing, then the things that went to 
  store() every 15 seconds are bunched up into an RDD of your DStream. That's 
  kind of a simplification, but should give you the idea that your db polling 
  interval and streaming interval are not tied together.
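
To make the points above a bit more concrete, here's a rough sketch of a 
polling receiver built on Spark Streaming's Receiver class. PollingReceiver, 
fetch and pollIntervalMs are hypothetical names for illustration; fetch stands 
in for whatever db call you'd make, and it needs to be serializable since the 
receiver is shipped to an executor.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: polls a source via `fetch` and stores each item.
class PollingReceiver(fetch: () => Seq[String], pollIntervalMs: Long)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Called by Spark when the receiver starts; must not block, so the
  // polling loop runs on its own thread.
  def onStart(): Unit = {
    new Thread("polling-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to clean up here: the loop below checks isStopped() itself.
  def onStop(): Unit = ()

  // The loop described above: fetch items, store() each one, then wait.
  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        fetch().foreach(item => store(item))
        Thread.sleep(pollIntervalMs) // delay between db calls is up to us
      }
    } catch {
      case t: Throwable => restart("Error polling source", t)
    }
  }
}
```

It would be wired in with something like 
ssc.receiverStream(new PollingReceiver(fetchFn, 60000L)). Note that 
pollIntervalMs is independent of the batch interval given to the 
StreamingContext.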
-Ashic.

Date: Mon, 6 Jul 2015 01:12:34 +1000
Subject: Re: JDBC Streams
From: guha.a...@gmail.com
To: as...@live.com
CC: ak...@sigmoidanalytics.com; user@spark.apache.org

Hi

Thanks for the reply. Here is my situation: I have a DB which enables 
synchronous CDC; think of this as a DB trigger which writes to a table with 
changed values as soon as something changes in the production table. My job 
will need to pick up the data as soon as it arrives, which can be at every 
1 min interval. Ideally it will pick up the changes, transform them into JSON 
and put it to Kinesis. In short, I am emulating a Kinesis producer with a DB 
source (don't even ask why, let's say these are the constraints :) )

Please advise (a) is Spark a good choice here (b) what's your suggestion either 
way.

I understand I can easily do it using a simple java/python app but I am a 
little worried about managing scaling/fault tolerance and that's where my 
concern is.

TIA
Ayan



-- 
Best Regards,
Ayan Guha

  

RE: .NET on Apache Spark?

2015-07-05 Thread Ashic Mahtab
Unfortunately, afaik that project is long dead.
It'd be an interesting project to create an intermediary protocol, perhaps 
using something that nearly everything these days understands (unfortunately 
[!] that might be JavaScript). For example, instead of pickling language 
constructs, it might be interesting to translate rdd operations to some json 
structure, and have a single thing server side processing the instructions. 
There's also mbrace (http://www.m-brace.net/)... mbrace-spark integration would 
be quite interesting indeed. Though the difference in approach might be quite a 
challenge.
Another approach could be using IKVM to host the JVM, much like how pyspark 
executes.
Microsoft research published some very early work in OneNet: 
http://research.microsoft.com/en-us/um/people/jinl/redesign/research/onenet_executive_summary.pdf
 - their careers page seems to be recruiting for the project.
Again, these are all future things, most of which would need to be community 
driven. If you need something right now, then there really isn't good 
integration between spark and .NET. However, given your requirements, mbrace 
might be something that you might find useful.
-Ashic.

Date: Sun, 5 Jul 2015 11:05:30 -0600
Subject: Re: .NET on Apache Spark?
From: dautkha...@gmail.com
To: ski.rodrig...@gmail.com
CC: user@spark.apache.org

Scala used to run on .NET: http://www.scala-lang.org/old/node/10299

-- 
Ruslan Dautkhanov


On Thu, Jul 2, 2015 at 1:26 PM, pedro ski.rodrig...@gmail.com wrote:
You might try using .pipe() and installing your .NET program as a binary
across the cluster (or using addFile). It's not ideal to pipe things in/out
along with the overhead, but it would work.

I don't know much about IronPython, but perhaps changing the default python
by changing your path might work?

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NET-on-Apache-Spark-tp23578p23594.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

RE: What does Spark is not just MapReduce mean? Isn't every Spark job a form of MapReduce?

2015-06-28 Thread Ashic Mahtab
Spark comes with quite a few components. At its core is...surprise...Spark 
core. This provides the core things required to run Spark jobs. Spark provides 
a lot of operators out of the box...take a look at 
https://spark.apache.org/docs/latest/programming-guide.html#transformations
https://spark.apache.org/docs/latest/programming-guide.html#actions
While all of them can be implemented with variations of rdd.map().reduce(), 
there are optimisations to be gained in terms of data locality, etc., and the 
additional operators simply make life simpler.
In addition to the core stuff, Spark also brings things like Spark Streaming, 
Spark SQL and data frames, MLlib, GraphX, etc. Spark Streaming gives you 
microbatches of RDDs at periodic intervals. Think "give me the last 15 seconds 
of events every 5 seconds". You can then program towards the small collection, 
and the job will run in a fault tolerant manner on your cluster. Spark SQL 
provides Hive-like functionality that works nicely with various data sources 
and RDDs. MLlib provides a lot of out-of-the-box machine learning algorithms, 
and the new Spark ML project provides a nice elegant pipeline API to take care 
of a lot of common machine learning tasks. GraphX allows you to represent data 
in graphs, and run graph algorithms on it, e.g. you can represent your data as 
RDDs of vertices and edges, and run PageRank on a distributed cluster.
And there's more...so, yeah...Spark is definitely not just MapReduce. :)

 Date: Sun, 28 Jun 2015 09:13:18 -0700
 From: jonrgr...@gmail.com
 To: user@spark.apache.org
 Subject: What does Spark is not just MapReduce mean?  Isn't every Spark job 
 a form of MapReduce?
 
 I've heard "Spark is not just MapReduce" mentioned during Spark talks, but it
 seems like every method that Spark has is really doing something like (Map
 -> Reduce) or (Map -> Map -> Map -> Reduce) etc behind the scenes, with the
 performance benefit of keeping RDDs in memory between stages.
 
 Am I wrong about that?  Is Spark doing anything more efficiently than a
 series of Maps followed by a Reduce in memory?  What methods does Spark have
 that can't easily be mapped (with somewhat similar efficiency) to Map and
 Reduce in memory?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/What-does-Spark-is-not-just-MapReduce-mean-Isn-t-every-Spark-job-a-form-of-MapReduce-tp23518.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
  

RE: Recent spark sc.textFile needs hadoop for folders?!?

2015-06-26 Thread Ashic Mahtab
Thanks for the replies, guys.
Is this a permanent change as of 1.3, or will it go away at some point? Also, 
does it require an entire Hadoop installation, or just WinUtils.exe?
Thanks,
Ashic.

Date: Fri, 26 Jun 2015 18:22:03 +1000
Subject: Re: Recent spark sc.textFile needs hadoop for folders?!?
From: guha.a...@gmail.com
To: as...@live.com
CC: user@spark.apache.org

It's a problem since 1.3 I think

RE: Recent spark sc.textFile needs hadoop for folders?!?

2015-06-26 Thread Ashic Mahtab
Thanks for the awesome response, Steve.
As you say, it's not ideal, but the clarification greatly helps.
Cheers, everyone :)
-Ashic.

Subject: Re: Recent spark sc.textFile needs hadoop for folders?!?
From: ste...@hortonworks.com
To: as...@live.com
CC: guha.a...@gmail.com; user@spark.apache.org
Date: Fri, 26 Jun 2015 08:54:31 +

On 26 Jun 2015, at 09:29, Ashic Mahtab as...@live.com wrote:

Thanks for the replies, guys.

Is this a permanent change as of 1.3, or will it go away at some point?

Don't blame the spark team, complain to the hadoop team for being slow to 
embrace the java 1.7 APIs for low-level filesystem IO.

Also, does it require an entire Hadoop installation, or just WinUtils.exe?

Thanks,
Ashic.

you really only need a HADOOP_HOME dir with a bin/ subdir containing the DLLs 
and exes needed to work with the specific Hadoop JARs you are running with

This should be all you need for the Hadoop 2.6
https://github.com/steveloughran/clusterconfigs/tree/master/clusters/morzine/hadoop_home

I know it's a pain, we really do need to fix it. 

-Steve

Recent spark sc.textFile needs hadoop for folders?!?

2015-06-25 Thread Ashic Mahtab
Hello,
Just trying out spark 1.4 (we're using 1.1 at present). On Windows, I've 
noticed the following:
* On 1.4, sc.textFile("D:\\folder\\").collect() fails from both spark-shell.cmd 
  and when running a scala application referencing the spark-core package from 
  maven.
* sc.textFile("D:\\folder\\file.txt").collect() succeeds.
* On 1.1, both succeed.
* When referencing the binaries in the scala application, this is the error:

15/06/25 18:30:13 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
    at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
    at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:978)
    at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:978)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)

This seems quite strange...is this a known issue? Worse, is this a feature? I 
don't have to be using hadoop at all... just want to process some files and 
data in Cassandra.
Regards,
Ashic.
  

RE: Spark SQL odbc on Windows

2015-02-22 Thread Ashic Mahtab
Hi Francisco,
While I haven't tried this, have a look at the contents of 
start-thriftserver.sh - all it's doing is setting up a few variables and 
calling:
/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
and passing some additional parameters. Perhaps doing the same would work?
I also believe that this hosts a JDBC server (not ODBC), but there's a free 
ODBC connector from Databricks built by Simba, with which I've been able to 
connect to a Spark cluster hosted on Linux.
-Ashic.

To: user@spark.apache.org
From: forch...@gmail.com
Subject: Spark SQL odbc on Windows
Date: Sun, 22 Feb 2015 09:45:03 +0100

Hello, 
I work at an MS consulting company and we are evaluating including Spark in our 
Big Data offer. We are particularly interested in testing Spark as a ROLAP engine 
for SSAS, but we cannot find a way to activate the ODBC server (thrift) on a 
Windows cluster. There is no start-thriftserver.sh command available for 
Windows. 

Does somebody know if there is a way to make this work? 

Thanks in advance!!
Francisco 

Hive, Spark, Cassandra, Tableau, BI, etc.

2015-02-17 Thread Ashic Mahtab
Hi,
I've seen a few articles where they use CqlStorageHandler to create Hive tables 
referencing Cassandra data using the thriftserver. Is there a secret to getting 
this to work? I've basically got Spark built with Hive, and a Cassandra 
cluster. Is there a way to get the Hive server to talk to Cassandra? I've seen 
Calliope from Tuplejump, and it looks good, but was wondering if there's a more 
direct approach. The version updates of various things mean that if I could go 
direct even with a bit of hassle, I'd prefer that.
I've also managed to connect the Simba ODBC driver for Spark. Is there a way 
to get ODBC clients to use that to get to Cassandra data? Would I need to build 
a Spark application and pull in Cassandra data (or define RDDs) for ODBC 
clients to make use of it? I can't really use the Simba ODBC connector for 
Cassandra as it's missing a few things currently (like understanding UDTs + 
collection columns). 
What I'm really after is somehow letting Tableau + Excel users access 
Cassandra data. I'd love to be able to use DataStax Enterprise, but as of now, 
the client is firmly in Apache Cassandra + Apache Spark territory. 
Thanks,
Ashic. 

Cleanup Questions

2015-02-17 Thread Ashic Mahtab
Two questions regarding worker cleanup:
1) Is the best place to enable worker cleanup setting
export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=30"
in conf/spark-env.sh for each worker? Or is there a better place?
2) I see this has a default TTL of 7 days. I've got some Spark Streaming jobs 
that run for more than 7 days. Would the Spark cleanup honour currently running 
applications and not discard the data, or will it obliterate everything that's 
older than 7 days? If the latter, what's a good approach to clean up, 
considering my streaming app will be running for > TTL period? 
Thanks,
Ashic. 
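
For reference, the setting from question 1 could be spelled out in conf/spark-env.sh roughly as below. This is only a sketch: the three property names are the documented standalone worker cleanup settings, but the values are illustrative (the 30 second interval is the one from the question, and the TTL shown is simply the 7 day default written out in seconds).

```sh
# conf/spark-env.sh on each standalone worker (sketch; values are illustrative).
# The properties are read by the worker JVM, so they go in SPARK_WORKER_OPTS.
SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true"
# How often the worker scans for old application directories, in seconds.
SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.worker.cleanup.interval=30"
# How long application directories are retained, in seconds (7 days here).
SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.worker.cleanup.appDataTtl=$((7 * 24 * 3600))"
export SPARK_WORKER_OPTS
```

Building the string up in steps keeps the quoting intact, which matters because the value contains spaces.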

Check if spark was built with hive

2015-02-09 Thread Ashic Mahtab
Is there an easy way to check if a spark binary release was built with Hive 
support? Are any of the prebuilt binaries on the spark website built with hive 
support?
Thanks,
Ashic. 
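
One possible way to check a particular binary is to look inside its assembly jar for a Hive class. The sketch below assumes the class org.apache.spark.sql.hive.HiveContext is only present in Hive-enabled builds; the helper name listing_has_hive and the jar path in the usage comment are made up for illustration and will differ between releases.

```sh
# Reads a jar listing on stdin and succeeds if it mentions the HiveContext class.
# listing_has_hive is a hypothetical helper name, not part of Spark.
listing_has_hive() {
    grep -q 'org/apache/spark/sql/hive/HiveContext\.class'
}

# Hypothetical usage (adjust the jar path to your distribution):
#   unzip -l "$SPARK_HOME"/lib/spark-assembly-*.jar | listing_has_hive \
#       && echo "built with Hive"
```

Taking the listing on stdin keeps the check independent of whichever tool (unzip -l, jar tf) produces it.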

RE: Check if spark was built with hive

2015-02-09 Thread Ashic Mahtab
Awesome...thanks Sean.

 From: so...@cloudera.com
 Date: Mon, 9 Feb 2015 22:43:45 +
 Subject: Re: Check if spark was built with hive
 To: as...@live.com
 CC: user@spark.apache.org
 
 https://github.com/apache/spark/blob/master/dev/create-release/create-release.sh#L217
 
 Yes all releases are built with -Phive except the 'without-hive' build.
 
 On Mon, Feb 9, 2015 at 10:41 PM, Ashic Mahtab as...@live.com wrote:
  Is there an easy way to check if a spark binary release was built with Hive
  support? Are any of the prebuilt binaries on the spark website built with
  hive support?
 
  Thanks,
  Ashic.
 
 
  

RE: Full per node replication level (architecture question)

2015-01-24 Thread Ashic Mahtab
You could look at using Cassandra for storage. Spark integrates nicely with 
Cassandra, and a combination of Spark + Cassandra would give you fast access to 
structured data in Cassandra, while enabling analytic scenarios via Spark. 
Cassandra would take care of the replication, as it's one of the core features 
of the database.

Date: Sat, 24 Jan 2015 23:34:15 +0200
Subject: Full per node replication level (architecture question)
From: dev.ma...@gmail.com
To: u...@spark.incubator.apache.org

Hi,
I wonder whether any of the file systems supported by Spark, may well support a 
replication level whereby each node has a full copy of the data. I realize this 
was not the main intended scenario of spark/hadoop, but may be a good fit for a 
compute cluster that needs to be very fast over its input data, and that has 
data only in the amount of few terabytes in total (which fit nicely on any 
commodity disk and soon on any SSD).
It would be nice to use Spark map-reduce over the data, and enjoy automatic 
replication.
It would be also nice to assume Spark can seamlessly manage a job's workflow 
across such cluster...
Thanks!
Matan

RE: Starting a spark streaming app in init.d

2015-01-24 Thread Ashic Mahtab
Cool. I was thinking of waiting a second and doing ps aux | grep java | grep 
jarname.jar, and I guess checking 4040 would work as well. Thanks for the 
input.
Regards,
Ashic.
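
The "start it, wait, then check it is still there" idea discussed in this thread could be sketched as follows. This is an assumption-laden outline rather than anything spark-submit provides: DAEMON, PIDFILE and STARTUP_WAIT are placeholder variables, and the liveness test only confirms the launched process has not exited, not that the streaming job is healthy. Probing the application UI port, as suggested in the thread, would be a stricter check.

```sh
#!/bin/sh
# Sketch of an init.d-style start(): launch in the background, record the PID,
# wait briefly, then report success only if the process is still alive.
# DAEMON, PIDFILE and STARTUP_WAIT are placeholders, not spark-submit options.
DAEMON=${DAEMON:-"spark-submit --class Streamer --executor-memory 500M --total-executor-cores 4 /path/to/assembly.jar"}
PIDFILE=${PIDFILE:-/var/run/my_assembly.pid}
STARTUP_WAIT=${STARTUP_WAIT:-5}

start() {
    # $DAEMON is intentionally unquoted so it splits into command + arguments.
    $DAEMON >/dev/null 2>&1 &
    pid=$!
    echo "$pid" > "$PIDFILE"
    sleep "$STARTUP_WAIT"
    # kill -0 sends no signal; it only tests whether the PID still exists.
    if kill -0 "$pid" 2>/dev/null; then
        echo OK
        return 0
    fi
    echo FAILED
    rm -f "$PIDFILE"
    return 1
}
```

A short STARTUP_WAIT only catches immediate failures such as a bad jar path or an unreachable master; anything that fails later still needs monitoring.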

Date: Sat, 24 Jan 2015 13:00:13 +0530
Subject: Re: Starting a spark streaming app in init.d
From: ak...@sigmoidanalytics.com
To: as...@live.com
CC: user@spark.apache.org

I'd do the same but add an extra condition to check whether the job has 
successfully started, by checking the application UI (checking port 4040 
would do; if you want something more complex, write a parser for it) 
after putting the main script to sleep for some time (say 2 minutes).
Thanks
Best Regards


Starting a spark streaming app in init.d

2015-01-23 Thread Ashic Mahtab
Hello,
I'm trying to kick off a Spark Streaming job to a standalone master using 
spark-submit inside of init.d. This is what I have:


DAEMON="spark-submit --class Streamer --executor-memory 500M --total-executor-cores 4 /path/to/assembly.jar"

start() {
    $DAEMON -p /var/run/my_assembly.pid
    echo OK
    return 0
}

However, this will return 0 even if spark-submit fails. Is there a way to run 
spark-submit in the background and return 0 only if it successfully starts up? 
Or better yet, is there something in spark-submit that will allow me to do 
this, perhaps via a command line argument?

Thanks,
Ashic.
  

RE: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Ashic Mahtab
Hi Gerard,
Thanks for the response.

The messages get deserialised from msgpack format, and one of the strings is 
deserialised to json. Certain fields are checked to decide if further processing 
is required. If so, it goes through a series of in-memory filters to check if more 
processing is required. If so, only then does the heavy work start. That 
consists of a few db queries, and potential updates to the db + a message on the 
message queue. The majority of messages don't need processing. The messages 
needing processing at peak are about three every other second. 

One possible thing that might be happening is the session initialisation and 
prepared statement initialisation for each partition. I can resort to some 
tricks, but I think I'll try increasing the batch interval to 15 seconds. I'll 
report back with findings.

Thanks,
Ashic.

From: gerard.m...@gmail.com
Date: Thu, 22 Jan 2015 12:30:08 +0100
Subject: Re: Are these numbers abnormal for spark streaming?
To: tathagata.das1...@gmail.com
CC: as...@live.com; t...@databricks.com; user@spark.apache.org

and post the code (if possible).
In a nutshell, your processing time > batch interval, resulting in an 
ever-increasing delay that will end up in a crash.
3 secs to process 14 messages looks like a lot. Curious what the job logic is.
-kr, Gerard.
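
The nine-hour figure can be roughly reproduced from the numbers in the UI dump quoted in this thread. The sketch below is back-of-the-envelope arithmetic only; it assumes batches are generated every batch interval from the start and processed strictly one after another, which is a simplification of what the scheduler does.

```sh
# Numbers copied from the streaming UI dump in this thread.
elapsed=$(( 18*3600 + 24*60 + 34 ))   # time since start: 18 h 24 min 34 s
batch_interval=2                      # seconds
processed=16482                       # processed batches

# Stream time covered by the batches processed so far.
covered=$(( processed * batch_interval ))
# Whatever has elapsed beyond that is backlog still waiting to be scheduled.
delay=$(( elapsed - covered ))

printf '%dh %dm %ds\n' $(( delay / 3600 )) $(( delay % 3600 / 60 )) $(( delay % 60 ))
# prints: 9h 15m 10s
```

That lands within a few seconds of the scheduling delay the UI reports for the last batch (9 hours 15 minutes 4 seconds), which is what you would expect when each 2 second batch takes about 4 seconds to process.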
On Thu, Jan 22, 2015 at 12:15 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
This is not normal. It's a huge scheduling delay!! Can you tell me more about 
the application? Cluster setup, number of receivers, what the computation is, etc.
On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab as...@live.com wrote:



Hate to do this...but...erm...bump? Would really appreciate input from others 
using Streaming. Or at least some docs that would tell me if these are expected 
or not.

From: as...@live.com
To: user@spark.apache.org
Subject: Are these numbers abnormal for spark streaming?
Date: Wed, 21 Jan 2015 11:26:31 +




Hi Guys,
I've got Spark Streaming set up for a low data rate system (using spark's 
features for analysis, rather than high throughput). Messages are coming in 
throughout the day, at around 1-20 per second (finger in the air estimate...not 
analysed yet).  In the spark streaming UI for the application, I'm getting the 
following after 17 hours.

Streaming
Started at: Tue Jan 20 16:58:43 GMT 2015
Time since start: 18 hours 24 minutes 34 seconds
Network receivers: 2
Batch interval: 2 seconds
Processed batches: 16482
Waiting batches: 1

Statistics over last 100 processed batches

Receiver Statistics
Receiver | Status | Location | Records in last batch [2015/01/21 11:23:18] | Minimum rate [records/sec] | Median rate [records/sec] | Maximum rate [records/sec] | Last Error
RmqReceiver-0 | ACTIVE | F | 14 | 4 | 7 | 27 | -
RmqReceiver-1 | ACTIVE | BR | 12 | 4 | 7 | 26 | -

Batch Processing Statistics
Metric | Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum
Processing Time | 3 seconds 994 ms | 157 ms | 4 seconds 16 ms | 4 seconds 961 ms | 5 seconds 3 ms | 5 seconds 171 ms
Scheduling Delay | 9 hours 15 minutes 4 seconds | 9 hours 10 minutes 54 seconds | 9 hours 11 minutes 56 seconds | 9 hours 12 minutes 57 seconds | 9 hours 14 minutes 5 seconds | 9 hours 15 minutes 4 seconds
Total Delay | 9 hours 15 minutes 8 seconds | 9 hours 10 minutes 58 seconds | 9 hours 12 minutes | 9 hours 13 minutes 2 seconds | 9 hours 14 minutes 10 seconds | 9 hours 15 minutes 8 seconds

Are these normal? I was wondering what the scheduling delay and total delay 
terms are, and if it's normal for them to be 9 hours.

I've got a standalone spark master and 4 spark nodes. The streaming app has 
been given 4 cores, and it's using 1 core per worker node. The streaming app is 
submitted from a 5th machine, and that machine has nothing but the driver 
running. The worker nodes are running alongside Cassandra (and reading and 
writing to it).

Any insights would be appreciated.

Regards,
Ashic.

  



  

RE: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Ashic Mahtab
Hate to do this...but...erm...bump? Would really appreciate input from others 
using Streaming. Or at least some docs that would tell me if these are expected 
or not.


RE: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Ashic Mahtab
Hi TD,
Here's some information:

1. Cluster has one standalone master, 4 workers. Workers are co-hosted with 
Apache Cassandra. Master is set up with external Zookeeper.
2. Each machine has 2 cores and 4GB of ram. This is for testing. All machines 
are vmware vms. Spark has 2GB dedicated to it on each node.
3. In addition to the streaming details, the master details as of now are given 
below. Only the streaming app is running.
4. I'm listening to two rabbitmq queues using a rabbitmq receiver (code: 
https://gist.github.com/ashic/b5edc7cfdc85aa60b066 ). Notifier code is here 
https://gist.github.com/ashic/9abd352c691eafc8c9f3 
5. The receivers are initialised with the following code:
val ssc = new StreamingContext(sc, Seconds(2))
val messages1 = ssc.receiverStream(new RmqReceiver("abc", "abc", "/", 
  "vdclog03", "abc_input", None))
val messages2 = ssc.receiverStream(new RmqReceiver("abc", "abc", "/", 
  "vdclog04", "abc_input", None))
val messages = messages1.union(messages2)
val notifier = new RabbitMQEventNotifier("vdclog03", "abc", 
  "abc_output_events", "abc", "abc", "/")

6. Usage:

messages.map(x => ScalaMessagePack.read[RadioMessage](x))
  .flatMap(InputMessageParser.parse(_).getEvents())
  .foreachRDD(x => {
    x.foreachPartition(x => {
      // A Cassandra session and the storage helpers are set up per partition.
      cassandraConnector.withSessionDo(session => {
        val graphStorage = new CassandraGraphStorage(session)
        val notificationStorage = new CassandraNotificationStorage(session)
        val savingNotifier = new SavingNotifier(notifier, notificationStorage)

        x.foreach(eventWrapper => eventWrapper.event match {
          // do some queries.
          // save some stuff if needed to cassandra
          // raise a message to a separate queue with a msg => Unit() operation.

7. The algorithm is simple: listen to messages from two separate rmq queues. 
union them. for each message, check message properties. 
if needed, query cassandra for additional details (graph search..but done in 
0.5-3 seconds...and rare..shouldn't overwhelm with low input rate).
If needed, save some info back into cassandra (1-2ms), and raise an event to 
the notifier.

I'm probably missing something basic, just wondering what. It has been running 
fine for about 42 hours now, but the numbers are a tad worrying.

Cheers,
Ashic.


Workers: 4
Cores: 8 Total, 4 Used
Memory: 8.0 GB Total, 2000.0 MB Used
Applications: 1 Running, 0 Completed
Drivers: 0 Running, 0 Completed
Status: ALIVE

Workers
Id | Address | State | Cores | Memory
worker-20141208131918-VDCAPP50.AAA.local-44476 | VDCAPP50.AAA.local:44476 | ALIVE | 2 (1 Used) | 2.0 GB (500.0 MB Used)
worker-20141208132012-VDCAPP52.AAA.local-34349 | VDCAPP52.AAA.local:34349 | ALIVE | 2 (1 Used) | 2.0 GB (500.0 MB Used)
worker-20141208132136-VDCAPP53.AAA.local-54000 | VDCAPP53.AAA.local:54000 | ALIVE | 2 (1 Used) | 2.0 GB (500.0 MB Used)
worker-2014121627-VDCAPP49.AAA.local-57899 | VDCAPP49.AAA.local:57899 | ALIVE | 2 (1 Used) | 2.0 GB (500.0 MB Used)

Running Applications
ID | Name | Cores | Memory per Node | Submitted Time | User | State | Duration
app-20150120165844-0005 | App1 | 4 | 500.0 MB | 2015/01/20 16:58:44 | root | WAITING | 42.4 h

From: tathagata.das1...@gmail.com
Date: Thu, 22 Jan 2015 03:15:58 -0800
Subject: Re: Are these numbers abnormal for spark streaming?
To: as...@live.com; t...@databricks.com
CC: user@spark.apache.org

This is not normal. It's a huge scheduling delay!! Can you tell me more about 
the application? Cluster setup, number of receivers, what the computation is, etc.

RE: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Ashic Mahtab
Another quick question... I've got 4 nodes with 2 cores each. I've assigned the 
streaming app 4 cores. It seems to be using one per node. I imagine forwarding 
from the receivers to the executors is causing unnecessary processing. Is 
there a way to specify that I want 2 cores from the same machines to be 
involved (even better if this can be specified during spark-submit)?

Thanks,
Ashic.

From: as...@live.com
To: gerard.m...@gmail.com; asudipta.baner...@gmail.com
CC: user@spark.apache.org; tathagata.das1...@gmail.com
Subject: RE: Are these numbers abnormal for spark streaming?
Date: Thu, 22 Jan 2015 15:40:17 +




Yup...looks like it. I can do some tricks to reduce setup costs further, but 
this is much better than where I was yesterday. Thanks for your awesome input :)

-Ashic.

From: gerard.m...@gmail.com
Date: Thu, 22 Jan 2015 16:34:38 +0100
Subject: Re: Are these numbers abnormal for spark streaming?
To: asudipta.baner...@gmail.com
CC: as...@live.com; user@spark.apache.org; tathagata.das1...@gmail.com

Given that the process, and in particular the setup of connections, is bound 
to the number of partitions (in x.foreachPartition{ x => ??? }), I think it would 
be worth trying to reduce them.
Increasing 'spark.streaming.blockInterval' will do the trick (you can read 
the tuning details here: http://www.virdata.com/tuning-spark/#Partitions)
-kr, Gerard.
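
To make the partition arithmetic behind this suggestion concrete: each receiver cuts its input into one block per block interval, and each block becomes a partition of the batch. The sketch below assumes the default block interval of 200 ms and that every block is non-empty, so it gives an upper bound; partitions_per_batch is a made-up helper name.

```sh
# partitions_per_batch BATCH_MS BLOCK_MS RECEIVERS
# Upper bound on partitions per batch: one block per block interval per receiver.
partitions_per_batch() {
    echo $(( $1 / $2 * $3 ))
}

partitions_per_batch 2000 200 2      # 2 s batches, default block interval
partitions_per_batch 15000 200 2     # 15 s batches, default block interval
partitions_per_batch 15000 1000 2    # 15 s batches, block interval raised to 1 s
```

With per-partition session setup as in the code earlier in the thread, fewer and larger partitions mean fewer setups per batch. The property can be set on the SparkConf or passed to spark-submit with --conf.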
On Thu, Jan 22, 2015 at 4:28 PM, Gerard Maas gerard.m...@gmail.com wrote:
So the system has gone from 7 msgs in 4.961 secs (median) to 106 msgs in 4.761 
secs. I think there's evidence that setup costs are quite high in this case 
and increasing the batch interval is helping.
On Thu, Jan 22, 2015 at 4:12 PM, Sudipta Banerjee asudipta.baner...@gmail.com 
wrote:
Hi Ashic Mahtab,

Are Cassandra and Zookeeper installed as part of a YARN architecture, or are 
they installed in a separate layer alongside Apache Spark?

Thanks and Regards,
Sudipta

On Thu, Jan 22, 2015 at 8:13 PM, Ashic Mahtab as...@live.com wrote:



Hi Guys,
So I changed the interval to 15 seconds. There's obviously a lot more messages 
per batch, but (I think) it looks a lot healthier. Can you see any major 
warning signs? I think that with 2 second intervals, the setup / teardown per 
partition was what was causing the delays.

Streaming
Started at: Thu Jan 22 13:23:12 GMT 2015
Time since start: 1 hour 17 minutes 16 seconds
Network receivers: 2
Batch interval: 15 seconds
Processed batches: 309
Waiting batches: 0

Statistics over last 100 processed batches

Receiver Statistics
Receiver | Status | Location | Records in last batch [2015/01/22 14:40:29] | Minimum rate [records/sec] | Median rate [records/sec] | Maximum rate [records/sec] | Last Error
RmqReceiver-0 | ACTIVE | VDCAPP53.foo.local | 2.6 K | 29 | 106 | 295 | -
RmqReceiver-1 | ACTIVE | VDCAPP50.bar.local | 2.6 K | 29 | 107 | 291 | -

Batch Processing Statistics
Metric | Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum
Processing Time | 4 seconds 812 ms | 4 seconds 698 ms | 4 seconds 738 ms | 4 seconds 761 ms | 4 seconds 788 ms | 5 seconds 802 ms
Scheduling Delay | 2 ms | 0 ms | 3 ms | 3 ms | 4 ms | 9 ms
Total Delay | 4 seconds 814 ms | 4 seconds 701 ms | 4 seconds 739 ms | 4 seconds 764 ms | 4 seconds 792 ms | 5 seconds 809 ms

Regards,
Ashic.

RE: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Ashic Mahtab
Hi Sudipta,
Standalone spark master. Separate Zookeeper cluster. 4 worker nodes with 
cassandra + spark on each. No hadoop / hdfs / yarn.

Regards,
Ashic.

Date: Thu, 22 Jan 2015 20:42:43 +0530
Subject: Re: Are these numbers abnormal for spark streaming?
From: asudipta.baner...@gmail.com
To: as...@live.com
CC: gerard.m...@gmail.com; user@spark.apache.org; tathagata.das1...@gmail.com

Hi Ashic Mahtab,

The Cassandra and the Zookeeper are they installed as a part of Yarn 
architecture or are they installed in a separate layer with Apache Spark .

Thanks and Regards,
Sudipta


RE: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Ashic Mahtab
Hi Guys,
So I changed the interval to 15 seconds. There are obviously a lot more messages 
per batch, but (I think) it looks a lot healthier. Can you see any major 
warning signs? I think that with 2 second intervals, the setup / teardown per 
partition was what was causing the delays.

Streaming
Started at: Thu Jan 22 13:23:12 GMT 2015
Time since start: 1 hour 17 minutes 16 seconds
Network receivers: 2
Batch interval: 15 seconds
Processed batches: 309
Waiting batches: 0

Statistics over last 100 processed batches

Receiver Statistics
Receiver | Status | Location | Records in last batch [2015/01/22 14:40:29] | Minimum rate [records/sec] | Median rate [records/sec] | Maximum rate [records/sec] | Last Error
RmqReceiver-0 | ACTIVE | VDCAPP53.foo.local | 2.6 K | 29 | 106 | 295 | -
RmqReceiver-1 | ACTIVE | VDCAPP50.bar.local | 2.6 K | 29 | 107 | 291 | -

Batch Processing Statistics
Metric | Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum
Processing Time | 4 seconds 812 ms | 4 seconds 698 ms | 4 seconds 738 ms | 4 seconds 761 ms | 4 seconds 788 ms | 5 seconds 802 ms
Scheduling Delay | 2 ms | 0 ms | 3 ms | 3 ms | 4 ms | 9 ms
Total Delay | 4 seconds 814 ms | 4 seconds 701 ms | 4 seconds 739 ms | 4 seconds 764 ms | 4 seconds 792 ms | 5 seconds 809 ms

Regards,
Ashic.
From: as...@live.com
To: gerard.m...@gmail.com
CC: user@spark.apache.org
Subject: RE: Are these numbers abnormal for spark streaming?
Date: Thu, 22 Jan 2015 12:32:05 +




Hi Gerard,
Thanks for the response.

The messages get deserialised from msgpack format, and one of the strings is 
deserialised to json. Certain fields are checked to decide if further processing 
is required. If so, it goes through a series of in-memory filters to check if 
more processing is required. If so, only then does the heavy work start. That 
consists of a few db queries, and potential updates to the db + message on 
message queue. The majority of messages don't need processing. The messages 
needing processing at peak are about three every other second. 

One possible thing that might be happening is the session initialisation and 
prepared statement initialisation for each partition. I can resort to some 
tricks, but I think I'll try increasing batch interval to 15 seconds. I'll 
report back with findings.

Thanks,
Ashic.

From: gerard.m...@gmail.com
Date: Thu, 22 Jan 2015 12:30:08 +0100
Subject: Re: Are these numbers abnormal for spark streaming?
To: tathagata.das1...@gmail.com
CC: as...@live.com; t...@databricks.com; user@spark.apache.org

and post the code (if possible). In a nutshell, your processing time > batch 
interval, resulting in an ever-increasing delay that will end up in a crash.
3 secs to process 14 messages looks like a lot. Curious what the job logic is.
-kr, Gerard.
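
To put numbers on the "processing time > batch interval" point, here is a rough sketch under a simplified model (constant processing time, batches processed one at a time). The object and method names are made up for illustration. The inputs come from the original post of this thread: 16482 processed batches, a 2 second batch interval and a scheduling delay of 9 hours 15 minutes 4 seconds (33304 seconds).

```scala
object DelayEstimate {
  /** Delay built up after `batches` batches, in seconds.
    * Each batch adds (processing time - batch interval) when processing is slower. */
  def accumulatedDelay(batches: Long, batchIntervalSec: Double, processingSec: Double): Double =
    math.max(0.0, batches * (processingSec - batchIntervalSec))

  /** Average per-batch overrun implied by an observed delay, in seconds. */
  def impliedOverrun(observedDelaySec: Double, batches: Long): Double =
    observedDelaySec / batches

  def main(args: Array[String]): Unit = {
    // 9 h 15 min 4 s of scheduling delay over 16482 processed batches
    val overrun = impliedOverrun(9 * 3600 + 15 * 60 + 4, 16482)
    println(f"average overrun per batch: $overrun%.2f s")
    // With a 15 s interval and ~4.8 s processing time nothing accumulates
    println(accumulatedDelay(309, 15.0, 4.761))
  }
}
```

The implied overrun is roughly 2 seconds per batch, i.e. about 4 seconds of processing for every 2 second batch, which is in line with the processing times in the posted statistics.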
On Thu, Jan 22, 2015 at 12:15 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
This is not normal. It's a huge scheduling delay!! Can you tell me more about 
the application? - cluster setup, number of receivers, what's the computation, etc.
On Thu, Jan 22, 2015 at 3:11 AM, Ashic Mahtab as...@live.com wrote:



Hate to do this...but...erm...bump? Would really appreciate input from others 
using Streaming. Or at least some docs that would tell me if these are expected 
or not.


RE: Are these numbers abnormal for spark streaming?

2015-01-22 Thread Ashic Mahtab
Yup...looks like it. I can do some tricks to reduce setup costs further, but 
this is much better than where I was yesterday. Thanks for your awesome input :)

-Ashic.

From: gerard.m...@gmail.com
Date: Thu, 22 Jan 2015 16:34:38 +0100
Subject: Re: Are these numbers abnormal for spark streaming?
To: asudipta.baner...@gmail.com
CC: as...@live.com; user@spark.apache.org; tathagata.das1...@gmail.com

Given that the process, and in particular, the setup of connections, is bound 
to the number of partitions (in x.foreachPartition{ x => ??? }), I think it 
would be worth trying to reduce them.
Increasing 'spark.streaming.blockInterval' will do the trick (you can read 
the tuning details here: http://www.virdata.com/tuning-spark/#Partitions)
-kr, Gerard.
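
As a rough sketch of why the block interval matters here: for receiver-based streams, each receiver produces approximately (batch interval / block interval) blocks per batch, and each block becomes one partition. The helper name below is made up for illustration, and the 200 ms figure is the documented default for spark.streaming.blockInterval, which I am assuming was in effect in this setup.

```scala
object PartitionEstimate {
  /** Approximate partitions (tasks) per batch for receiver-based streams. */
  def partitionsPerBatch(receivers: Int, batchIntervalMs: Long, blockIntervalMs: Long): Long =
    receivers * (batchIntervalMs / blockIntervalMs)

  def main(args: Array[String]): Unit = {
    // 2 receivers, 15 s batches, assumed default 200 ms block interval
    println(partitionsPerBatch(2, 15000L, 200L))
    // Same setup with the block interval raised to 3 s
    println(partitionsPerBatch(2, 15000L, 3000L))
  }
}
```

Since the session setup is paid once per partition inside foreachPartition, going from about 150 partitions per batch to about 10 cuts the number of setups per batch by the same factor.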
On Thu, Jan 22, 2015 at 4:28 PM, Gerard Maas gerard.m...@gmail.com wrote:
So the system has gone from 7 msgs in 4.961 secs (median) to 106 msgs in 4.761 
seconds. I think there's evidence that setup costs are quite high in this case 
and increasing the batch interval is helping.

Are these numbers abnormal for spark streaming?

2015-01-21 Thread Ashic Mahtab
Hi Guys,
I've got Spark Streaming set up for a low data rate system (using spark's 
features for analysis, rather than high throughput). Messages are coming in 
throughout the day, at around 1-20 per second (finger in the air estimate...not 
analysed yet).  In the spark streaming UI for the application, I'm getting the 
following after 17 hours.

Streaming
Started at: Tue Jan 20 16:58:43 GMT 2015
Time since start: 18 hours 24 minutes 34 seconds
Network receivers: 2
Batch interval: 2 seconds
Processed batches: 16482
Waiting batches: 1

Statistics over last 100 processed batches

Receiver Statistics
Receiver | Status | Location | Records in last batch [2015/01/21 11:23:18] | Minimum rate [records/sec] | Median rate [records/sec] | Maximum rate [records/sec] | Last Error
RmqReceiver-0 | ACTIVE | F | 14 | 4 | 7 | 27 | -
RmqReceiver-1 | ACTIVE | BR | 12 | 4 | 7 | 26 | -

Batch Processing Statistics
Metric | Last batch | Minimum | 25th percentile | Median | 75th percentile | Maximum
Processing Time | 3 seconds 994 ms | 157 ms | 4 seconds 16 ms | 4 seconds 961 ms | 5 seconds 3 ms | 5 seconds 171 ms
Scheduling Delay | 9 hours 15 minutes 4 seconds | 9 hours 10 minutes 54 seconds | 9 hours 11 minutes 56 seconds | 9 hours 12 minutes 57 seconds | 9 hours 14 minutes 5 seconds | 9 hours 15 minutes 4 seconds
Total Delay | 9 hours 15 minutes 8 seconds | 9 hours 10 minutes 58 seconds | 9 hours 12 minutes | 9 hours 13 minutes 2 seconds | 9 hours 14 minutes 10 seconds | 9 hours 15 minutes 8 seconds

Are these normal? I was wondering what the scheduling delay and total delay 
terms are, and if it's normal for them to be 9 hours.

I've got a standalone spark master and 4 spark nodes. The streaming app has 
been given 4 cores, and it's using 1 core per worker node. The streaming app is 
submitted from a 5th machine, and that machine has nothing but the driver 
running. The worker nodes are running alongside Cassandra (and reading and 
writing to it).

Any insights would be appreciated.

Regards,
Ashic.
  

Can multiple streaming apps use the same checkpoint directory?

2015-01-20 Thread Ashic Mahtab
Hi,
For client mode spark submits of applications, we can do the following:

def createStreamingContext(): StreamingContext = {
 ...
 val sc = new SparkContext(conf)
 // Create a StreamingContext with a 1 second batch size
 val ssc = new StreamingContext(sc, Seconds(1))
 ssc // return the context so that getOrCreate can use it
}
...
val ssc = StreamingContext.getOrCreate(checkPointdir, createStreamingContext _)

If the driver goes down, and we restart it, it'll pick up where it left off. My 
question is: if multiple streaming apps are submitted through the same machine, 
can they share the same checkpoint directory, or does each have to have its own?

Thanks,
Ashic.
  

Using more cores on machines

2014-12-22 Thread Ashic Mahtab
Hi,
Say we have 4 nodes with 2 cores each in stand alone mode. I'd like to dedicate 
4 cores to a streaming application. I can do this via spark submit by:

spark-submit  --total-executor-cores 4

However, this assigns one core per machine. I would like to use 2 cores on 2 
machines instead, leaving the other two machines untouched. Is this possible? 
Is there a downside to doing this? My thinking is that I should be able to 
reduce quite a bit of network traffic if all machines are not involved.


Thanks,
Ashic.
  

RE: Using more cores on machines

2014-12-22 Thread Ashic Mahtab
Hi Sean,
Thanks for the response. 

It seems --num-executors is ignored. Specifying --num-executors 2 
--executor-cores 2 is giving the app all 8 cores across 4 machines.

-Ashic.

 From: so...@cloudera.com
 Date: Mon, 22 Dec 2014 10:57:31 +
 Subject: Re: Using more cores on machines
 To: as...@live.com
 CC: user@spark.apache.org
 
 I think you want:
 
 --num-executors 2 --executor-cores 2
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
  

RE: Using more cores on machines

2014-12-22 Thread Ashic Mahtab
Hi Josh,
I'm not looking to change the 1:1 ratio.

What I'm trying to do is get both cores on two machines working, rather than 
one core on all four machines. With --total-executor-cores 4, I have 1 core per 
machine working for an app. I'm looking for something that'll let me use 2 
cores per machine on 2 machines (so 4 cores in total) while not using the other 
two machines.

Regards,
Ashic.

 From: j...@soundcloud.com
 Date: Mon, 22 Dec 2014 17:36:26 +0100
 Subject: Re: Using more cores on machines
 To: as...@live.com
 CC: so...@cloudera.com; user@spark.apache.org
 
 AFAIK, `--num-executors` is not available for standalone clusters. In
 standalone mode, you must start new workers on your node as it is a
 1:1 ratio of workers to executors.
 
 
 
  

How to run an action and get output?

2014-12-19 Thread Ashic Mahtab
Hi,
Say we have an operation that writes something to an external resource and 
gets some output. For example:

def doSomething(entry: SomeEntry, session: Session): SomeOutput = {
  val result = session.SomeOp(entry)
  SomeOutput(entry.Key, result.SomeProp)
}

I could use a transformation for rdd.map, but in case of failures, the map 
would run on another executor for the same rdd. I could do rdd.foreach, but 
that returns unit. Is there something like a foreach that can return values?
Thanks,
Ashic.

RE: How to run an action and get output?

2014-12-19 Thread Ashic Mahtab
Thanks Sean. That's kind of what I figured. Luckily, for my use case writes are 
idempotent, so map works.

 From: so...@cloudera.com
 Date: Fri, 19 Dec 2014 11:06:51 +
 Subject: Re: How to run an action and get output?
 To: as...@live.com
 CC: user@spark.apache.org
 
 To really be correct, I think you may have to use the foreach action
 to persist your data, since this isn't idempotent, and then read it
 again in a new RDD. You might get away with map as long as you can
 ensure that your write process is idempotent.
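
A small sketch of the idempotency point, in plain Scala with no Spark involved. Store, upsert and append are hypothetical stand-ins for the external resource, and the second pass over the entries simulates a task being re-executed after a failure.

```scala
import scala.collection.mutable

final case class SomeEntry(key: String, value: Int)
final case class SomeOutput(key: String, stored: Int)

// Hypothetical in-memory stand-in for the external store.
final class Store {
  val byKey = mutable.Map.empty[String, Int]      // keyed writes: idempotent
  val log = mutable.ArrayBuffer.empty[SomeEntry]  // appends: not idempotent

  def upsert(e: SomeEntry): SomeOutput = {
    byKey(e.key) = e.value
    SomeOutput(e.key, e.value)
  }

  def append(e: SomeEntry): Unit = {
    log += e
    ()
  }
}

object IdempotencyDemo {
  def main(args: Array[String]): Unit = {
    val store = new Store
    val entries = Seq(SomeEntry("a", 1), SomeEntry("b", 2))

    // First attempt, then a simulated re-run of the same task.
    val first = entries.map(store.upsert)
    val retry = entries.map(store.upsert)
    entries.foreach(store.append)
    entries.foreach(store.append)

    println(s"same output on retry: ${first == retry}")
    println(s"upserted rows: ${store.byKey.size}, appended rows: ${store.log.size}")
  }
}
```

With keyed writes the re-run leaves the store and the returned values unchanged, which is why map is tolerable in that case. With appends the re-run duplicates rows, which is the situation where foreach followed by a fresh read is the safer route.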
 
 
  

Scala Lazy values and partitions

2014-12-19 Thread Ashic Mahtab
Hi Guys,
Are scala lazy values instantiated once per executor, or once per partition? 
For example, if I have:

object Something {
  lazy val context = create()

  def foo(item: Item) = context.doSomething(item)  // Item: the RDD's element type
}

and I do

someRdd.foreach(Something.foo)

then will context get instantiated once per executor, or once per partition?

Thanks,
Ashic.

  

RE: Scala Lazy values and partitions

2014-12-19 Thread Ashic Mahtab
Just to confirm, once per VM means that it'll be the same instance across all 
applications in a particular JVM instance (i.e. executor). So even if the spark 
application is terminated, the instance will live on, correct? I think that's 
what Sean said, and it seems logical.

From: gerard.m...@gmail.com
Date: Fri, 19 Dec 2014 12:52:23 +0100
Subject: Re: Scala Lazy values and partitions
To: as...@live.com
CC: user@spark.apache.org

It will be instantiated once per VM, which translates to once per executor.
-kr, Gerard.
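
A minimal sketch of the "once per VM" behaviour, in plain Scala with no Spark involved. Context here is a hypothetical stand-in for an expensive resource, and the nested Seq plays the role of several partitions being processed in one executor JVM.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an expensive resource such as a DB session.
final class Context(val id: Int) {
  def doSomething(item: Int): Int = item * 2
}

object Something {
  // Counts how many times the expensive resource gets built.
  val created = new AtomicInteger(0)

  // A lazy val in an object is initialised at most once per JVM,
  // on first access, and the initialisation is thread-safe.
  lazy val context: Context = new Context(created.incrementAndGet())

  def foo(item: Int): Int = context.doSomething(item)
}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    // Three "partitions" handled in the same JVM, as tasks on one executor would be.
    val partitions = Seq(Seq(1, 2), Seq(3, 4), Seq(5))
    val results = partitions.flatMap(_.map(Something.foo))
    println(s"results = $results, contexts created = ${Something.created.get}")
  }
}
```

However many partitions go through Something.foo, the counter stays at one for that JVM. On a cluster each executor JVM loads its own copy of the object, so the count would be one per executor rather than one overall.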

  
  

Are lazy values created once per node or once per partition?

2014-12-17 Thread Ashic Mahtab
Hello,
Say, I have the following code:

val something = Something()

someRdd.foreachRDD(something.someMethod)

And in something, I have a lazy member variable that gets created in 
something.someMethod.

Would that lazy be created once per node, or once per partition? 

Thanks,
Ashic.
  

RE: Session for connections?

2014-12-13 Thread Ashic Mahtab
Thanks for the response. The fact that they'll get killed when the sc is closed 
is quite useful in this case. I'm looking at a cluster of four workers trying 
to send messages to rabbitmq, which can have many sessions open without much 
penalty. For other stores (like say SQL) and larger clusters, the idle 
connections would be a bigger issue. One last question...if I do leave them 
open till the end of the job, does that mean one per worker or one per rdd 
partition? I'd imagine the former, but wanted to confirm.
Regards,
Ashic.

 From: tathagata.das1...@gmail.com
 Date: Sat, 13 Dec 2014 15:16:46 +0800
 Subject: Re: Session for connections?
 To: as...@live.com
 CC: user@spark.apache.org
 
 That is your call. If you think it is not a problem to have large
 number of open but idle connections to your data store, then it is
 probably okay to let them hang around until the executor is killed
 (when the sparkContext is closed).
 
 TD
 
 
  

RE: Session for connections?

2014-12-12 Thread Ashic Mahtab
Looks like the way to go. 

Quick question regarding the connection pool approach - if I have a connection 
that gets lazily instantiated, will it automatically die if I kill the driver 
application? In my scenario, I can keep a connection open for the duration of 
the app, and I'm not that concerned about having idle connections as long as 
the app is running. For this specific scenario, do I still need to think of the 
timeout, or would it be shut down when the driver stops? (Using a stand alone 
cluster btw).

Regards,
Ashic.

 From: tathagata.das1...@gmail.com
 Date: Thu, 11 Dec 2014 06:33:49 -0800
 Subject: Re: Session for connections?
 To: as...@live.com
 CC: user@spark.apache.org
 
 Also, this is covered in the streaming programming guide in bits and pieces.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
 
 On Thu, Dec 11, 2014 at 4:55 AM, Ashic Mahtab as...@live.com wrote:
  That makes sense. I'll try that.
 
  Thanks :)
 
  From: tathagata.das1...@gmail.com
  Date: Thu, 11 Dec 2014 04:53:01 -0800
  Subject: Re: Session for connections?
  To: as...@live.com
  CC: user@spark.apache.org
 
 
  You could create a lazily initialized singleton factory and connection
  pool. Whenever an executor starts running the first task that needs to
  push out data, it will create the connection pool as a singleton. And
  subsequent tasks running on the executor are going to use the
  connection pool. You will also have to intelligently shut down the
  connections because there is not an obvious way to shut them down. You
  could have a usage timeout - shut down the connection after not being used
  for 10 x batch interval.
 
  TD
 
  On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab as...@live.com wrote:
   Hi,
   I was wondering if there's any way of having long running session type
   behaviour in spark. For example, let's say we're using Spark Streaming
   to
   listen to a stream of events. Upon receiving an event, we process it,
   and if
   certain conditions are met, we wish to send a message to rabbitmq. Now,
   rabbit clients have the concept of a connection factory, from which you
   create a connection, from which you create a channel. You use the
   channel to
   get a queue, and finally the queue is what you publish messages on.
  
   Currently, what I'm doing can be summarised as :
  
   dstream.foreachRDD(x => x.foreachPartition(y => {
     val factory = ..
     val connection = ...
     val channel = ...
     val queue = channel.declareQueue(...);
  
     y.foreach(z => Processor.Process(z, queue));
  
     // clean up the queue stuff.
   }));
  
   I'm doing the same thing for using Cassandra, etc. Now in these cases,
   the
   session initiation is expensive, so doing it per message is not a good
   idea.
   However, I can't find a way to say hey...do this per worker once and
   only
   once.
  
   Is there a better pattern to do this?
  
   Regards,
   Ashic.
 
 
  

Session for connections?

2014-12-11 Thread Ashic Mahtab
Hi,
I was wondering if there's any way of having long running session type 
behaviour in spark. For example, let's say we're using Spark Streaming to 
listen to a stream of events. Upon receiving an event, we process it, and if 
certain conditions are met, we wish to send a message to rabbitmq. Now, rabbit 
clients have the concept of a connection factory, from which you create a 
connection, from which you create a channel. You use the channel to get a 
queue, and finally the queue is what you publish messages on.

Currently, what I'm doing can be summarised as :

dstream.foreachRDD(x => x.foreachPartition(y => {
   // set up once per partition
   val factory = ..
   val connection = ...
   val channel = ...
   val queue = channel.declareQueue(...);

   // publish every record in the partition through the same queue
   y.foreach(z => Processor.Process(z, queue));
   
   // clean up the queue stuff.
}));

I'm doing the same thing for using Cassandra, etc. Now in these cases, the 
session initiation is expensive, so doing it per message is not a good idea. 
However, I can't find a way to say hey...do this per worker once and only 
once.

Is there a better pattern to do this?

Regards,
Ashic.
  

RE: Session for connections?

2014-12-11 Thread Ashic Mahtab
That makes sense. I'll try that.

Thanks :)

 From: tathagata.das1...@gmail.com
 Date: Thu, 11 Dec 2014 04:53:01 -0800
 Subject: Re: Session for connections?
 To: as...@live.com
 CC: user@spark.apache.org
 
 You could create a lazily initialized singleton factory and connection
 pool. Whenever an executor starts running the first task that needs to
 push out data, it will create the connection pool as a singleton. And
 subsequent tasks running on the executor are going to use the
 connection pool. You will also have to intelligently shut down the
 connections because there is not an obvious way to shut them down. You
 could have a usage timeout - shut down a connection after it has not been
 used for 10 x batch interval.
 
 TD
 
 On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab as...@live.com wrote:
  Hi,
  I was wondering if there's any way of having long running session type
  behaviour in spark. For example, let's say we're using Spark Streaming to
  listen to a stream of events. Upon receiving an event, we process it, and if
  certain conditions are met, we wish to send a message to rabbitmq. Now,
  rabbit clients have the concept of a connection factory, from which you
  create a connection, from which you create a channel. You use the channel to
  get a queue, and finally the queue is what you publish messages on.
 
  Currently, what I'm doing can be summarised as :
 
  dstream.foreachRDD(x => x.foreachPartition(y => {
    val factory = ..
    val connection = ...
    val channel = ...
    val queue = channel.declareQueue(...);
 
    y.foreach(z => Processor.Process(z, queue));
 
    // clean up the queue stuff.
  }));
 
  I'm doing the same thing for using Cassandra, etc. Now in these cases, the
  session initiation is expensive, so doing it per message is not a good idea.
  However, I can't find a way to say hey...do this per worker once and only
  once.
 
  Is there a better pattern to do this?
 
  Regards,
  Ashic.
 
 
  

RE: Is there a way to force spark to use specific ips?

2014-12-07 Thread Ashic Mahtab
Hi Matt,
That's what I'm seeing too. I've reverted to creating a fact in the 
Vagrantfile and adding the host in Puppet. That saves having to have the Vagrant 
plugin installed. vagrant-hosts looks interesting for scenarios where I control 
all the machines.
Cheers,
Ashic.

Subject: Re: Is there a way to force spark to use specific ips?
From: matt.narr...@gmail.com
Date: Sat, 6 Dec 2014 16:34:13 -0700
CC: user@spark.apache.org
To: as...@live.com

It's much easier if you access your nodes by name. If you’re using Vagrant, use 
the hosts provisioner: https://github.com/adrienthebo/vagrant-hosts
mn

On Dec 6, 2014, at 8:37 AM, Ashic Mahtab as...@live.com wrote:

Hi,
It appears that Spark always attempts to use the driver's hostname to connect / 
broadcast. This is usually fine, except when the cluster doesn't have DNS 
configured, for example in a Vagrant cluster with a private network. The 
workers and masters, and the host (where the driver runs from) can all see each 
other by IP. I can also specify --conf spark.driver.host=192.168.40.1, and 
that results in the workers being able to connect to the driver. However, when 
trying to broadcast anything, it's still trying to use the hostname of the 
host. Now, I can set up a host entry in /etc/hosts, but was wondering if there's 
a way to avoid the hassle. Is there any way I can force Spark to always 
use IPs and not hostnames?
Thanks,
Ashic.
  

Is there a way to force spark to use specific ips?

2014-12-06 Thread Ashic Mahtab
Hi,
It appears that Spark always attempts to use the driver's hostname to 
connect / broadcast. This is usually fine, except when the cluster doesn't have 
DNS configured, for example in a Vagrant cluster with a private network. The 
workers and masters, and the host (where the driver runs from) can all see each 
other by IP. I can also specify --conf spark.driver.host=192.168.40.1, and 
that results in the workers being able to connect to the driver. However, when 
trying to broadcast anything, it's still trying to use the hostname of the 
host. Now, I can set up a host entry in /etc/hosts, but was wondering if there's 
a way to avoid the hassle. Is there any way I can force Spark to always 
use IPs and not hostnames?
Thanks,
Ashic.

  

RE: Adding Spark Cassandra dependency breaks Spark Streaming?

2014-12-06 Thread Ashic Mahtab
Update:
It seems the following combo causes things in spark streaming to go missing:
spark-core 1.1.0
spark-streaming 1.1.0
spark-cassandra-connector 1.1.0
The moment I add the three together, things like StreamingContext and Seconds 
are unavailable. sbt assembly fails saying those aren't there. Sbt clean / 
deleting .ivy2 and .m2 doesn't resolve the issue.
I've also set up a 1.1.1 Spark cluster, and created a jar with the following 
dependencies:
spark-core 1.1.1
spark-streaming 1.1.1
spark-sql 1.1.1
spark-cassandra-connector 1.1.0
Everything runs perfectly.
I'll be upgrading my clusters to 1.1.1 anyway, but I am intrigued...I'm fairly 
new to sbt, scala and the jvm in general. Any idea how having spark streaming 
1.1.0 and spark cassandra connector 1.1.0 together would cause classes in spark 
streaming to go missing?
Here's the full sbt file if anybody is interested:
import sbt._
import Keys._

name := "untitled19"

version := "1.0"

scalaVersion := "2.10.4"

val sparkCore = "org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"
val sparkSql = "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided"
val sparkCassandra = "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()

libraryDependencies ++= Seq(
  sparkCore,
  sparkSql,
  sparkStreaming,
  sparkCassandra
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) =>
    (xs map (_.toLowerCase)) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
      case _ => MergeStrategy.discard
    }
  case _ => MergeStrategy.first
}
Regards,
Ashic.
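
For reference, the combination reported as working in this message (Spark 1.1.1 artifacts with connector 1.1.0) might be declared along these lines. This is only a sketch: it reuses the coordinates from the build file above, changes nothing but the three Spark version strings, and sparkVersion is a name introduced here for illustration. It has not been checked against any other setup.

```scala
// build.sbt fragment (sketch). Only the Spark versions differ from the file above.
scalaVersion := "2.10.4"

// Assumption: this matches the Spark version installed on the cluster.
val sparkVersion = "1.1.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()
)
```

Keeping the Spark version in one val at least rules out accidentally mixing Spark module versions, though it does not explain why the 1.1.0 combination fails.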

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Sat, 6 Dec 2014 03:54:19 +




Getting this on the home machine as well. Not referencing the spark cassandra 
connector in libraryDependencies compiles. 
I've recently updated IntelliJ to 14. Could that be causing an issue? 

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Fri, 5 Dec 2014 19:24:46 +




Sorry...really don't have enough maven know how to do this quickly. I tried the 
pom below, and IntelliJ could find org.apache.spark.streaming.StreamingContext 
and org.apache.spark.streaming.Seconds, but not 
org.apache.spark.streaming.receiver.Receiver. Is there something specific I can 
try? I'll try sbt on the home machine in about a couple of hours.

?xml version=1.0 encoding=UTF-8?
project xmlns=http://maven.apache.org/POM/4.0.0;
 xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation=http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
modelVersion4.0.0/modelVersion

groupIduntitled100/groupId
artifactIduntiled100/artifactId
version1.0-SNAPSHOT/version

dependencies
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-core_2.10/artifactId
version1.1.0/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-streaming_2.10/artifactId
version1.1.0/version
/dependency
dependency
groupIdcom.datastax.spark/groupId
artifactIdspark-cassandra-connector_2.10/artifactId
version1.1.0/version
/dependency
/dependencies

/project




Date: Fri, 5 Dec 2014 10:58:51 -0800
Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming?
From: yuzhih...@gmail.com
To: as...@live.com
CC: user@spark.apache.org

Can you try with maven ?
diff --git a/streaming/pom.xml b/streaming/pom.xmlindex b8b8f2e..6cc8102 
100644--- a/streaming/pom.xml+++ b/streaming/pom.xml@@ -68,6 +68,11 @@   
artifactIdjunit-interface/artifactId   scopetest/scope 
/dependency+dependency+  groupIdcom.datastax.spark/groupId+ 
 artifactIdspark-cassandra-connector_2.10/artifactId+  
version1.1.0/version+/dependency   /dependencies   build 
outputDirectorytarget/scala-${scala.binary.version}/classes/outputDirectory
You can use the following command:mvn -pl core,streaming package -DskipTests

Cheers
On Fri, Dec 5, 2014 at 9:35 AM, Ashic Mahtab as...@live.com wrote:



Hi,
Seems adding the cassandra connector and spark streaming causes issues. I've 
added by build and code file. Running sbt compile gives weird errors like 
Seconds is not part of org.apache.spark.streaming and object Receiver is not a 
member of package org.apache.spark.streaming.receiver. If I take out 
cassandraConnector from the list of dependencies, sbt compile succeeds.
How is adding the dependency removing things from spark streaming packages? Is 
there something I can do (perhaps in sbt) to not have this break

RE: Adding Spark Cassandra dependency breaks Spark Streaming?

2014-12-06 Thread Ashic Mahtab
Hi,
Just checked: cassandra connector 1.1.0-beta1 runs fine. The issue seems 
to be with 1.1.0 for spark streaming and the 1.1.0 cassandra connector (final).
Regards,
Ashic.

Date: Sat, 6 Dec 2014 13:52:20 -0500
Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming?
From: jayunit100.apa...@gmail.com
To: as...@live.com

This is working for me as a dependency set for spark streaming app w/ cassandra.

https://github.com/jayunit100/SparkBlueprint/blob/master/build.sbt

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-beta1" withSources() withJavadoc()

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

libraryDependencies += "org.scalatest" % "scalatest_2.10.0-M4" % "1.9-2.10.0-M4-B1"

libraryDependencies += "junit" % "junit" % "4.8.1" % "test"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.1.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.1.0"


On Sat, Dec 6, 2014 at 12:29 PM, Ashic Mahtab as...@live.com wrote:



Update:
It seems the following combo causes things in spark streaming to go missing:
spark-core 1.1.0spark-streaming 1.1.0spark-cassandra-connector 1.1.0
The moment I add the three together, things like StreamingContext and Seconds 
are unavailable. sbt assembly fails saying those aren't there. Sbt clean / 
deleting .ivy2 and .m2 doesn't resolve the issue.
I've also set up an 1.1.1 spark cluster, and created a jar with the following 
dependencies:
spark-core 1.1.1spark-streaming 1.1.1spark-sql 1.1.1spark-cassandra-connector 
1.1.0
Everything runs perfectly.
I'll be upgrading my clusters to 1.1.1 anyway, but I am intrigued...I'm fairly 
new to sbt, scala and the jvm in general. Any idea how having spark streaming 
1.1.0 and spark cassandra connector 1.1.0 together would cause classes in spark 
streaming to go missing?
Here's the full sbt file if anybody is interested:
import sbt._
import Keys._



name := untitled19

version := 1.0

scalaVersion := 2.10.4

val sparkCore = org.apache.spark %% spark-core % 1.1.0 % provided
val sparkStreaming = org.apache.spark %% spark-streaming % 1.1.0 % 
provided
val sparkSql = org.apache.spark %% spark-sql % 1.1.0 % provided
val sparkCassandra = com.datastax.spark %% spark-cassandra-connector % 
1.1.0 withSources() withJavadoc()

libraryDependencies ++= Seq(
  sparkCore,
  sparkSql,
  sparkStreaming,
  sparkCassandra
)

resolvers += Akka Repository at http://repo.akka.io/releases/;

assemblyMergeStrategy in assembly := {
  case PathList(META-INF, xs@_*) =
(xs map (_.toLowerCase)) match {
  case (manifest.mf :: Nil) | (index.list :: Nil) | (dependencies :: 
Nil) = MergeStrategy.discard
  case _ = MergeStrategy.discard
}
  case _ = MergeStrategy.first
} 
Regards,Ashic.

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Sat, 6 Dec 2014 03:54:19 +




Getting this on the home machine as well. Not referencing the spark cassandra 
connector in libraryDependencies compiles. 
I've recently updated IntelliJ to 14. Could that be causing an issue? 

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Fri, 5 Dec 2014 19:24:46 +




Sorry...really don't have enough maven know how to do this quickly. I tried the 
pom below, and IntelliJ could find org.apache.spark.streaming.StreamingContext 
and org.apache.spark.streaming.Seconds, but not 
org.apache.spark.streaming.receiver.Receiver. Is there something specific I can 
try? I'll try sbt on the home machine in about a couple of hours.

?xml version=1.0 encoding=UTF-8?
project xmlns=http://maven.apache.org/POM/4.0.0;
 xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation=http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
modelVersion4.0.0/modelVersion

groupIduntitled100/groupId
artifactIduntiled100/artifactId
version1.0-SNAPSHOT/version

dependencies
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-core_2.10/artifactId
version1.1.0/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-streaming_2.10/artifactId
version1.1.0/version
/dependency
dependency

Adding Spark Cassandra dependency breaks Spark Streaming?

2014-12-05 Thread Ashic Mahtab
Hi,
Seems adding the cassandra connector and spark streaming causes issues. I've 
added my build and code files. Running sbt compile gives weird errors like 
Seconds is not part of org.apache.spark.streaming and object Receiver is not a 
member of package org.apache.spark.streaming.receiver. If I take out 
cassandraConnector from the list of dependencies, sbt compile succeeds.
How is adding the dependency removing things from spark streaming packages? Is 
there something I can do (perhaps in sbt) to not have this break?

Here's my build file:

import sbt.Keys._
import sbt._

name := "untitled99"

version := "1.0"

scalaVersion := "2.10.4"

val spark = "org.apache.spark" %% "spark-core" % "1.1.0"
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "1.1.0"
val cassandraConnector = "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()

libraryDependencies ++= Seq(
  cassandraConnector,
  spark,
  sparkStreaming
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

And here's my code:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

object Foo {
  def main(args: Array[String]) {
    val context = new SparkContext()
    val ssc = new StreamingContext(context, Seconds(2))
  }
}

class Bar extends Receiver[Int] {
  override def onStart(): Unit = ???
  override def onStop(): Unit = ???
}
  

RE: Spark Streaming Reusing JDBC Connections

2014-12-05 Thread Ashic Mahtab
I've done this:

1. foreachPartition
2. Open connection.
3. foreach inside the partition.
4. close the connection.

Slightly crufty, but works. Would love to see a better approach.

Regards,
Ashic.
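
A rough sketch of the four steps above, in plain Scala so it runs without Spark or a database. FakeJdbcConnection and PerPartitionWriter are made-up names, and the fake connection only records what was written; in a real job the rows iterator would be the one handed to foreachPartition and the connection would come from your JDBC driver.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a JDBC connection; it only records what was written.
final class FakeJdbcConnection {
  var closed = false
  val written = ArrayBuffer.empty[String]

  def write(row: String): Unit = {
    require(!closed, "connection is closed")
    written += row
  }

  def close(): Unit = { closed = true }
}

object PerPartitionWriter {
  // Steps 2-4: open once, write every record in the partition, always close.
  // The connection is returned only so the caller can inspect it afterwards.
  def writePartition(rows: Iterator[String], open: () => FakeJdbcConnection): FakeJdbcConnection = {
    val conn = open()
    try {
      rows.foreach(row => conn.write(row))
    } finally {
      // Runs even if a write throws, so the connection is not leaked.
      conn.close()
    }
    conn
  }
}
```

This still opens one connection per partition per batch, which is the crufty part; it just makes sure the connection is closed if a write fails.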

Date: Fri, 5 Dec 2014 12:32:24 -0500
Subject: Spark Streaming Reusing JDBC Connections
From: asimja...@gmail.com
To: user@spark.apache.org

Is there a way I can have a JDBC connection open through a streaming job. I 
have a foreach which is running once per batch. However, I don’t want to open 
the connection for each batch but would rather have a persistent connection 
that I can reuse. How can I do this?

Thanks.
Asim  

RE: Adding Spark Cassandra dependency breaks Spark Streaming?

2014-12-05 Thread Ashic Mahtab
Sorry...I really don't have enough Maven know-how to do this quickly. I tried the 
pom below, and IntelliJ could find org.apache.spark.streaming.StreamingContext 
and org.apache.spark.streaming.Seconds, but not 
org.apache.spark.streaming.receiver.Receiver. Is there something specific I can 
try? I'll try sbt on the home machine in a couple of hours.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>untitled100</groupId>
    <artifactId>untiled100</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

</project>




Date: Fri, 5 Dec 2014 10:58:51 -0800
Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming?
From: yuzhih...@gmail.com
To: as...@live.com
CC: user@spark.apache.org

Can you try with maven?

diff --git a/streaming/pom.xml b/streaming/pom.xml
index b8b8f2e..6cc8102 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.datastax.spark</groupId>
+      <artifactId>spark-cassandra-connector_2.10</artifactId>
+      <version>1.1.0</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

You can use the following command:
mvn -pl core,streaming package -DskipTests

Cheers
On Fri, Dec 5, 2014 at 9:35 AM, Ashic Mahtab as...@live.com wrote:



Hi,
Seems adding the cassandra connector and spark streaming causes issues. I've 
added by build and code file. Running sbt compile gives weird errors like 
Seconds is not part of org.apache.spark.streaming and object Receiver is not a 
member of package org.apache.spark.streaming.receiver. If I take out 
cassandraConnector from the list of dependencies, sbt compile succeeds.
How is adding the dependency removing things from spark streaming packages? Is 
there something I can do (perhaps in sbt) to not have this break?

Here's my build file:
import sbt.Keys._
import sbt._
name := untitled99
version := 1.0
scalaVersion := 2.10.4
val spark = org.apache.spark %% spark-core % 1.1.0
val sparkStreaming = org.apache.spark %% spark-streaming % 1.1.0
val cassandraConnector = com.datastax.spark %% spark-cassandra-connector % 
1.1.0 withSources() withJavadoc()
libraryDependencies ++= Seq(
cassandraConnector,
spark,
sparkStreaming
)
resolvers += Akka Repository at http://repo.akka.io/releases/;
And here's my code:
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiverobject Foo {
def main(args: Array[String]) {
val context = new SparkContext()
val ssc = new StreamingContext(context, Seconds(2))
}
}class Bar extends Receiver[Int]{
override def onStart(): Unit = ???override def onStop(): Unit = ???
}
  

  

RE: Adding Spark Cassandra dependency breaks Spark Streaming?

2014-12-05 Thread Ashic Mahtab
Getting this on the home machine as well. Not referencing the spark cassandra 
connector in libraryDependencies compiles. 
I've recently updated IntelliJ to 14. Could that be causing an issue? 

From: as...@live.com
To: yuzhih...@gmail.com
CC: user@spark.apache.org
Subject: RE: Adding Spark Cassandra dependency breaks Spark Streaming?
Date: Fri, 5 Dec 2014 19:24:46 +




Sorry...really don't have enough maven know how to do this quickly. I tried the 
pom below, and IntelliJ could find org.apache.spark.streaming.StreamingContext 
and org.apache.spark.streaming.Seconds, but not 
org.apache.spark.streaming.receiver.Receiver. Is there something specific I can 
try? I'll try sbt on the home machine in about a couple of hours.

?xml version=1.0 encoding=UTF-8?
project xmlns=http://maven.apache.org/POM/4.0.0;
 xmlns:xsi=http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation=http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
modelVersion4.0.0/modelVersion

groupIduntitled100/groupId
artifactIduntiled100/artifactId
version1.0-SNAPSHOT/version

dependencies
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-core_2.10/artifactId
version1.1.0/version
/dependency
dependency
groupIdorg.apache.spark/groupId
artifactIdspark-streaming_2.10/artifactId
version1.1.0/version
/dependency
dependency
groupIdcom.datastax.spark/groupId
artifactIdspark-cassandra-connector_2.10/artifactId
version1.1.0/version
/dependency
/dependencies

/project




Date: Fri, 5 Dec 2014 10:58:51 -0800
Subject: Re: Adding Spark Cassandra dependency breaks Spark Streaming?
From: yuzhih...@gmail.com
To: as...@live.com
CC: user@spark.apache.org

Can you try with maven ?
diff --git a/streaming/pom.xml b/streaming/pom.xmlindex b8b8f2e..6cc8102 
100644--- a/streaming/pom.xml+++ b/streaming/pom.xml@@ -68,6 +68,11 @@   
artifactIdjunit-interface/artifactId   scopetest/scope 
/dependency+dependency+  groupIdcom.datastax.spark/groupId+ 
 artifactIdspark-cassandra-connector_2.10/artifactId+  
version1.1.0/version+/dependency   /dependencies   build 
outputDirectorytarget/scala-${scala.binary.version}/classes/outputDirectory
You can use the following command:mvn -pl core,streaming package -DskipTests

Cheers
On Fri, Dec 5, 2014 at 9:35 AM, Ashic Mahtab as...@live.com wrote:



Hi,
Seems adding the cassandra connector and spark streaming causes issues. I've 
added by build and code file. Running sbt compile gives weird errors like 
Seconds is not part of org.apache.spark.streaming and object Receiver is not a 
member of package org.apache.spark.streaming.receiver. If I take out 
cassandraConnector from the list of dependencies, sbt compile succeeds.
How is adding the dependency removing things from spark streaming packages? Is 
there something I can do (perhaps in sbt) to not have this break?

Here's my build file:
import sbt.Keys._
import sbt._
name := untitled99
version := 1.0
scalaVersion := 2.10.4
val spark = org.apache.spark %% spark-core % 1.1.0
val sparkStreaming = org.apache.spark %% spark-streaming % 1.1.0
val cassandraConnector = com.datastax.spark %% spark-cassandra-connector % 
1.1.0 withSources() withJavadoc()
libraryDependencies ++= Seq(
cassandraConnector,
spark,
sparkStreaming
)
resolvers += Akka Repository at http://repo.akka.io/releases/;
And here's my code:
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiverobject Foo {
def main(args: Array[String]) {
val context = new SparkContext()
val ssc = new StreamingContext(context, Seconds(2))
}
}class Bar extends Receiver[Int]{
override def onStart(): Unit = ???override def onStop(): Unit = ???
}
  


  

RE: Kryo exception for CassandraSQLRow

2014-12-01 Thread Ashic Mahtab
Don't know if this'll solve it, but if you're on Spark 1.1, the Cassandra 
Connector version 1.1.0 final fixed the guava back compat issue. Maybe taking 
the guava exclusions might help?

Date: Mon, 1 Dec 2014 10:48:25 +0100
Subject: Kryo exception for CassandraSQLRow
From: shahab.mok...@gmail.com
To: user@spark.apache.org

I am using the Cassandra-Spark connector to pull data from Cassandra, process it, 
and write it back to Cassandra.
Now I am getting the following exception, which apparently comes from Kryo 
serialisation. Does anyone know the reason and how this can be solved?
I also tried to register org.apache.spark.sql.cassandra.CassandraSQLRow with 
kryo.register, but even this did not solve the problem and the exception remains.
WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 7, ip-X-Y-Z): 
com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.spark.sql.cassandra.CassandraSQLRow
Serialization trace:
_2 (org.apache.spark.util.MutablePair)
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1171)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1218)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)


I am using  Spark 1.1.0 with cassandra-spark connector 1.1.0 , here is the 
build:







"org.apache.spark" % "spark-mllib_2.10" % "1.1.0" exclude("com.google.guava", "guava"),
"com.google.guava" % "guava" % "16.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" exclude("com.google.guava", "guava") withSources() withJavadoc(),
"org.apache.cassandra" % "cassandra-all" % "2.1.1" exclude("com.google.guava", "guava"),
"org.apache.cassandra" % "cassandra-thrift" % "2.1.1" exclude("com.google.guava", "guava"),
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.2" exclude("com.google.guava", "guava"),
"org.apache.spark" %% "spark-core" % "1.1.0" % "provided" exclude("com.google.guava", "guava") exclude("org.apache.hadoop", "hadoop-core"),
"org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided" exclude("com.google.guava", "guava"),
"org.apache.spark" %% "spark-catalyst" % "1.1.0" % "provided" exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),
"org.apache.spark" %% "spark-sql" % "1.1.0" % "provided" exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),
"org.apache.spark" %% "spark-hive" % "1.1.0" % "provided" exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),
"org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",

best,/Shahab
  

Best way to do a lookup in Spark

2014-11-27 Thread Ashic Mahtab
Hi,
I'm looking to do an iterative algorithm implementation with data coming in 
from Cassandra. This might be a use case for GraphX, however the ids are 
non-integral, and I would like to avoid a mapping (for now). I'm doing a simple 
hubs and authorities HITS implementation, and the current implementation does a 
lot of db access. It's fine (one half of a full iteration is done in 25 minutes 
on 3M+ vertices), and use of Spark's cache() has achieved that. However, each 
full iteration is 50 minutes, and I would like to improve that.

A high level overview of what I'm trying to do is:

1) Vertex structure (id, in, out, aScore, hScore).
2) Load all the vertices into memory (simple enough).
3) Have a lookup vertexid -> (aScore, hScore) in memory (currently, this is 
where I need to do a lot of Cassandra queries, which are very fast, but which 
I'm hoping to avoid).
4) Iterate n times in 2 stages:
In the Hub Stage:
a) Foreach vertex, get the sum of aScores for vertices it points to. Cache 
this.
b) From the cache, get the max score. Divide each score in the cache by the 
max.
c) Get rid of the cache.
d) Update the lookup (in (3)) with the new hScores.
   
In the Authority Stage:
a) Foreach vertex, get the sum of hScores for vertices that point to it. 
Cache this.
b) From the cache, get the max score. Divide each score in the cache by the 
max.
c) Get rid of the cache.
d) Update the lookup (in (3)) with the new aScores.
 
5) Update the final aScores and hScores from memory to Cassandra.

The one bit that I don't have now is the in memory lookup (i.e. to get the 
hScores and aScores of neighbours in (4-a ). As such, I have to query cassandra 
for each vertex x times where x is the number of neighbours. And as those 
values are used in the next iteration, I also have to update cassandra for each 
run. Is it possibly to have this as an in memory distributed lookup so that I 
can deal with the data store at the start and end?

One option is to identify clusters and run HITS for each cluster entirely in 
memory, however if there's a simpler way I'd prefer that.
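
To make the steps above concrete, here is a minimal single-machine sketch of
the iteration in plain Python. It is not Spark code and not my current
implementation; the names (hits, out_links, a_score, h_score) and the tiny
example graph are made up for illustration, and the two dictionaries stand in
for the in-memory lookup from step 3. The ids are strings on purpose, since
mine are non-integral.

```python
def hits(out_links, iterations=10):
    """Hypothetical helper: out_links maps a vertex id to the ids it points to."""
    # Collect every vertex id, including ones that only appear as targets.
    vertices = set(out_links)
    for targets in out_links.values():
        vertices.update(targets)

    # Reverse adjacency: vertex id -> ids that point to it.
    in_links = {v: [] for v in vertices}
    for src, targets in out_links.items():
        for dst in targets:
            in_links[dst].append(src)

    # The in-memory lookup from step 3, split into two maps: id -> score.
    a_score = {v: 1.0 for v in vertices}
    h_score = {v: 1.0 for v in vertices}

    for _ in range(iterations):
        # Hub stage: sum the aScores of the vertices each vertex points to,
        # then divide by the max.
        raw = {v: sum(a_score[t] for t in out_links.get(v, ())) for v in vertices}
        top = max(raw.values()) or 1.0  # fall back to 1.0 if every sum is zero
        h_score = {v: s / top for v, s in raw.items()}

        # Authority stage: sum the hScores of the vertices pointing to each
        # vertex, then divide by the max.
        raw = {v: sum(h_score[s] for s in in_links[v]) for v in vertices}
        top = max(raw.values()) or 1.0
        a_score = {v: s / top for v, s in raw.items()}

    return a_score, h_score


# Tiny made-up graph: a -> b, a -> c, b -> c.
graph = {"a": ["b", "c"], "b": ["c"], "c": []}
a_score, h_score = hits(graph)
print(a_score)
print(h_score)
```

What I'm after is the distributed equivalent of the two dictionaries, so that
the data store is only touched before the loop and after it.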

Regards,
Ashic.
  

Spark Cassandra Guava version issues

2014-11-24 Thread Ashic Mahtab
I've got a Cassandra 2.1.1 + Spark 1.1.0 cluster running. I'm using
sbt-assembly to create an uber jar to submit to the standalone master. I'm
using the Hadoop 1 prebuilt binaries for Spark. As soon as I try to do
sc.cassandraTable(...) I get an error that's likely to be a Guava versioning
issue. I'm using the Spark Cassandra connector v1.1.0-rc2, which just came out,
though the issue was in rc1 as well. I can't see the Cassandra connector using
Guava directly, so I guess it's a dependency of something else that the
connector is using. Does anybody have a workaround for this?

The sbt file and the exception are given below.

Regards,
Ashic.


sbt file:

import sbt._
import Keys._
import sbtassembly.Plugin._
import AssemblyKeys._

assemblySettings

name := "foo"

version := "0.1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq (
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.1.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-rc2"
    withSources() withJavadoc(),
  "org.specs2" %% "specs2" % "2.4" % "test" withSources()
)

//allow provided for run
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in
  (Compile, run), runner in (Compile, run))

mergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) =>
    (xs map {_.toLowerCase}) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) |
           ("dependencies" :: Nil) => MergeStrategy.discard
      case _ => MergeStrategy.discard
    }
  case _ => MergeStrategy.first
}

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

test in assembly := {}

Exception:
14/11/24 14:20:11 INFO client.AppClient$ClientActor: Executor updated: app-20141124142008-0001/0 is now RUNNING
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
at com.datastax.driver.core.Cluster$ConnectionReaper.<init>(Cluster.java:2065)
at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:1163)
at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:1110)
at com.datastax.driver.core.Cluster.<init>(Cluster.java:118)
at com.datastax.driver.core.Cluster.<init>(Cluster.java:105)
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:174)
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1075)
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:165)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:134)
at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:227)
at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:226)
at com.datastax.spark.connector.rdd.CassandraRDD.verify$lzycompute(CassandraRDD.scala:266)
at com.datastax.spark.connector.rdd.CassandraRDD.verify(CassandraRDD.scala:263)
at com.datastax.spark.connector.rdd.CassandraRDD.getPartitions(CassandraRDD.scala:292)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at 

RE: Spark Cassandra Guava version issues

2014-11-24 Thread Ashic Mahtab
Did the workaround work for you? Doesn't seem to work for me.

Date: Mon, 24 Nov 2014 16:44:17 +0100
Subject: Re: Spark Cassandra Guava version issues
From: shahab.mok...@gmail.com
To: as...@live.com
CC: user@spark.apache.org


I faced the same problem, and a workaround is here:
https://github.com/datastax/spark-cassandra-connector/issues/292
best,/Shahab



RE: Spark or MR, Scala or Java?

2014-11-22 Thread Ashic Mahtab
Spark can do MapReduce and more, and faster.
One area where using MR would make sense is if you're using something (maybe
like Mahout) that doesn't understand Spark yet (Mahout may be Spark compatible
now...just pulled that name out of thin air!).
You *can* use Spark from Java, but you'd have a MUCH better time using Scala.
You don't necessarily need to know heaps of Scala to get stuff done in Spark.
I'm not from a JVM background, having been in the .NET world for most of my
career, and I haven't found Scala at all difficult. And considering the amount
of stuff in Spark that's built on or uses Scala, it'll always be first class.
If you write Spark stuff in Java, you'll a) need a LOT more code, and b) have
to deal with the Spark bridging classes that are provided to overcome
deficiencies in Java.
Hope that helps.
 Date: Sat, 22 Nov 2014 16:34:04 +0100
 Subject: Spark or MR, Scala or Java?
 From: konstt2...@gmail.com
 To: user@spark.apache.org
 
 Hello,
 
 I'm a newbie with Spark but I've been working with Hadoop for a while.
 I have two questions.
 
 Is there any case where MR is better than Spark? I don't know in which
 cases I should use Spark rather than MR. When is MR faster than Spark?
 
 The other question is: I know Java, so is it worth learning Scala for
 programming in Spark, or is it okay just with Java? I have written a little
 piece of code in Java because I feel more confident with it, but it
 seems that I'm missing something.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
  

RE: tableau spark sql cassandra

2014-11-20 Thread Ashic Mahtab
Hi Jerome,
I've been trying to get this working as well...

Where are you specifying cassandra parameters (i.e. seed nodes, consistency 
levels, etc.)?

-Ashic.

 Date: Thu, 20 Nov 2014 10:34:58 -0700
 From: jer...@gmail.com
 To: u...@spark.incubator.apache.org
 Subject: Re: tableau spark sql cassandra
 
 Well, after many attempts I can now successfully run the thrift server using
 root@cdb-01:~/spark# ./sbin/start-thriftserver.sh --master
 spark://10.194.30.2:7077 --hiveconf hive.server2.thrift.bind.host 0.0.0.0
 --hiveconf hive.server2.thrift.port 1
 
 (the command was failing because of the --driver-class-path $CLASSPATH
 parameter which I guess was setting the spark.driver.extraClassPath) and I
 can get the cassandra data using beeline!
 
 However, the table's values are null in Tableau but this is another problem
 ;)
 
 Best,
 Jerome
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/tableau-spark-sql-cassandra-tp19282p19392.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
  

RE: Submitting Python Applications from Remote to Master

2014-11-15 Thread Ashic Mahtab
Hi Ben,
I haven't tried it with Python, but the instructions are the same as for
Scala compiled (jar) apps. What it's saying is that it's not possible to
offload the entire work to the master (à la Hadoop) in a fire-and-forget (or
rather submit-and-forget) manner when running on standalone. There are two
deployment modes: client and cluster. For standalone, only client is
supported. This means that the submitting process will be the driver
process (not to be confused with the master). It should very well be possible
to submit from your laptop to a standalone cluster, but the process running
spark-submit will be alive until the job finishes. If you terminate the process
(via kill -9 or otherwise), then the job will be terminated as well. The driver
process will submit the work to the Spark master, which will do the usual
divvying up of tasks, distribution, fault tolerance, etc., and the results will
get reported back to the driver process.

Often it's not possible to have arbitrary access to the Spark master, and if
jobs take hours to complete, it's not feasible to keep the process running on
the laptop without interruptions, disconnects, etc. As such, a "gateway"
machine closer to the Spark master is used to submit jobs from.
That way, the process on the gateway machine lives for the duration of the job,
and no connection from the laptop, etc. is needed. It's not uncommon to
actually have an API to the gateway machine. For example, Ooyala's job server
https://github.com/ooyala/spark-jobserver provides a RESTful interface to
submit jobs.

Does that help?

Regards,
Ashic.
Date: Fri, 14 Nov 2014 13:40:43 -0600
Subject: Submitting Python Applications from Remote to Master
From: quasi...@gmail.com
To: user@spark.apache.org

Hi All,
I'm not quite clear on whether submitting a python application to spark
standalone on ec2 is possible.

Am I reading this correctly:

"A common deployment strategy is to submit your application from a gateway
machine that is physically co-located with your worker machines (e.g. Master
node in a standalone EC2 cluster). In this setup, client mode is appropriate.
In client mode, the driver is launched directly within the client spark-submit
process, with the input and output of the application attached to the console.
Thus, this mode is especially suitable for applications that involve the REPL
(e.g. Spark shell).

Alternatively, if your application is submitted from a machine far from the
worker machines (e.g. locally on your laptop), it is common to use cluster
mode to minimize network latency between the drivers and the executors. Note
that cluster mode is currently not supported for standalone clusters, Mesos
clusters, or python applications."

So I shouldn't be able to do something like:

./bin/spark-submit --master spark:/x.compute-1.amazonaws.com:7077 examples/src/main/python/pi.py

from a laptop connecting to a previously launched spark cluster using the
default spark-ec2 script, correct?

If I am not mistaken about this, then the docs are slightly confusing -- the
above example is more or less the example here:
https://spark.apache.org/docs/1.1.0/submitting-applications.html

If I am mistaken, apologies; can you help me figure out where I went wrong?
I've also taken to opening port 7077 to 0.0.0.0/0

--Ben


  

RE: Submitting Python Applications from Remote to Master

2014-11-15 Thread Ashic Mahtab
Hi Ognen,
Currently, the docs say:
"Note that cluster mode is currently not supported for standalone clusters,
Mesos clusters, or python applications."
So it seems like YARN + Scala is the only option for fire and forget. It
shouldn't be too hard to create a proxy submitter, but yes, that does involve
another process (potentially a server) on that side. I've heard good things
about Ooyala's server, but haven't got around to trying to set it up. As such,
I can't really comment.
Regards,
Ashic.
 Date: Sat, 15 Nov 2014 09:50:14 -0600
 From: ognen.duzlev...@gmail.com
 To: as...@live.com
 CC: quasi...@gmail.com; user@spark.apache.org
 Subject: Re: Submitting Python Applications from Remote to Master
 
 Ashic,
 
 Thanks for your email.
 
 Two things:
 
 1. I think a whole lot of data scientists and other people would love
 it if they could just fire off jobs from their laptops. It is, in my
 opinion, a common desired use case.
 
 2. Did anyone actually get the Ooyala job server to work? I asked that
 question 6 months ago and never got a straight answer. I ended up
 writing a middle-layer using Scalatra and actors to submit jobs via an
 API and receive results back in JSON. In that I ran into the inability
 to share the SparkContext feature and it took a lot of finagling to
 make things work (but it never felt production ready).
 
 Ognen
 
 
 -- 
 Convictions are more dangerous enemies of truth than lies. - Friedrich 
 Nietzsche

RE: Spark-submit and Windows / Linux mixed network

2014-11-12 Thread Ashic Mahtab
jar not found :(

It seems that if I create a directory symlink so that the share path is the
same on the unix mount point as in Windows, and submit from the drive where the
mount point is, then it works. Granted, that's quite an ugly hack.

Reverting to serving the jar off http (i.e. using a relative path) for the time
being.

Date: Tue, 11 Nov 2014 20:15:17 +0530
Subject: Re: Spark-submit and Windows / Linux mixed network
From: riteshoneinamill...@gmail.com
To: as...@live.com
CC: user@spark.apache.org

Never tried this form but just guessing,
What's the output when you submit this jar:
\\shares\publish\Spark\app1\someJar.jar using spark-submit.cmd?
  

Spark-submit and Windows / Linux mixed network

2014-11-11 Thread Ashic Mahtab
Hi,
I'm trying to submit a Spark application from a network share to the Spark
master. Network shares are configured so that the master and all nodes have
access to the target jar at (say):

\\shares\publish\Spark\app1\someJar.jar


And this is mounted on each linux box (i.e. master and workers) at:


/mnt/spark/app1/someJar.jar


I'm using the following to submit the app from a windows machine:


spark-submit.cmd --class Main --master spark://mastername:7077 
local:/mnt/spark/app1/someJar.jar


However, I get an error saying:


Warning: Local jar \mnt\spark\app1\someJar.jar does not exist, skipping.


Followed by a ClassNotFoundException: Main.


Notice the \'s instead of /s. 


Does anybody have experience getting something similar to work?


Regards,
Ashic.

  

Solidifying Understanding of Standalone Mode

2014-11-10 Thread Ashic Mahtab
Hello,
I'm hoping to understand exactly what happens when a spark compiled app is 
submitted to a spark stand-alone cluster master. Say, our master is A, and 
workers are W1 and W2. Client machine C is submitting an app to the master 
using spark-submit. Here's what I think happens:

* C submits jar (possibly uber jar) to A. A starts execution and sends 
partitions to W1 and W2 to carry out work. Results are sent back to A. Results 
are stored in output files / tables according to the application. W1 and W2 may 
also be reading and writing data to and from sources. The submission from C is 
fire and forget, and the final results aren't sent back to C.

Is this correct?

I noticed something about the submitting processes working as the driver 
application for Spark stand alone. That would mean the above is wrong. Is there 
some information about exactly what happens when I submit an app to the Spark 
master in a stand alone cluster?

Thanks,
Ashic.
  

Redploying a spark streaming application

2014-11-06 Thread Ashic Mahtab
Hello,
I'm trying to find the best way of redeploying a Spark Streaming
application. Ideally, I was thinking of a scenario where a build server
packages up a jar and a deployment step submits it to a Spark master. On the
next successful build, the next version would get deployed, taking down the
previous version. What would be the best way of achieving this?
Thanks,
Ashic.

Standalone Specify mem / cores defaults

2014-11-05 Thread Ashic Mahtab
Hi,
The docs specify that we can control the amount of RAM / cores available via:

-c CORES, --cores CORES   Total CPU cores to allow Spark applications to use
                          on the machine (default: all available); only on
                          worker
-m MEM, --memory MEM      Total amount of memory to allow Spark applications
                          to use on the machine, in a format like 1000M or 2G
                          (default: your machine's total RAM minus 1 GB);
                          only on worker

Omitting these values would cause them to take on defaults. Is there a way of
specifying the default explicitly? Or is omitting the parameters the only way
for them to take on default values? Will "-c default" and "-m default" work?

Thanks,
Ashic.
  

RE: Workers not registering after master restart

2014-11-04 Thread Ashic Mahtab
Hi Nan,
Cool. Thanks.
Regards,
Ashic.
Date: Tue, 4 Nov 2014 18:26:48 -0500
From: zhunanmcg...@gmail.com
To: as...@live.com
CC: user@spark.apache.org
Subject: Re: Workers not registering after master restart



Hi, Ashic, 
this is expected for the latest released version

However, workers should be able to re-register since 1.2, since this patch 
https://github.com/apache/spark/pull/2828 was merged
Best,

-- Nan Zhu

 
On Tuesday, November 4, 2014 at 6:00 PM, Ashic Mahtab wrote:




Hi,
I've set up a standalone Spark master (no failover or file recovery
specified), and brought up a few worker nodes. All of them registered and were
shown in the master web UI. I then stopped and started the master service (the
workers were still running). After the master started up, I checked the web UI
and none of the workers were registered. I then stopped and started each worker
and they registered with the master again.
My question is: is this expected? Is there a timeout after which the workers
would have rejoined the master? Or is the only way to ensure workers rejoin to
run master failover or file-based recovery for the master?
Thanks,
Ashic.

 
 
 
 

 



  

RE: how idf is calculated

2014-10-30 Thread Ashic Mahtab
Hi Andrejs,
The calculations are a bit different to what I've come across in
Mining Massive Datasets (2nd Ed., Ullman et al., Cambridge University Press),
available here: http://www.mmds.org/
Their calculation of IDF is as follows:
IDF_i = log2(N / n_i)
where N is the number of documents and n_i is the number of documents in which
the word appears. This looks different to your IDF function.
For TF, they use
TF_ij = f_ij / max_k f_kj
That is: for document j, the term frequency of the term i in j is the number
of times i appears in j divided by the maximum number of times any term appears
in j. (Stop words are usually excluded when considering the maximum.)
So, in your case:
TF_a1 = 2 / 2 = 1
TF_b1 = 1 / 2 = 0.5
TF_c1 = 1 / 2 = 0.5
TF_m1 = 2 / 2 = 1
...
IDF_a = log2(3 / 2) = 0.585
So, TF_a1 * IDF_a = 0.585
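
If it helps to check the arithmetic, the textbook-style calculation above can
be sketched in a few lines of plain Python. This is only an illustration: the
three documents are my reading of the flattened line in your message
("a a b c m m", "e a c d e e", "d j k l m m c"), and the helper names tf and
idf are made up here rather than taken from any library.

```python
import math
from collections import Counter

# Assumed split of the three documents from the original message.
docs = ["a a b c m m", "e a c d e e", "d j k l m m c"]
counts = [Counter(d.split()) for d in docs]


def tf(term, doc_counts):
    # TF_ij = f_ij / max_k f_kj
    return doc_counts[term] / max(doc_counts.values())


def idf(term):
    # IDF_i = log2(N / n_i), n_i = number of documents containing the term
    n_i = sum(1 for c in counts if term in c)
    return math.log2(len(counts) / n_i)


tf_a1 = tf("a", counts[0])
idf_a = idf("a")
print(tf_a1, round(idf_a, 3), round(tf_a1 * idf_a, 3))
```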
Wikipedia mentions an adjustment to overcome biases for long documents, by
calculating TF_ij = 0.5 + (0.5 * f_ij) / max_k f_kj, but that doesn't change
anything for TF_a1, as the value remains 1.
In other words, my calculations don't agree with yours, and neither seem to
agree with Spark :)
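
That said, here is a guess rather than anything I've confirmed in the Spark
source: the three values you printed are what you get from the formula you
quote, log((m + 1) / (d(t) + 1)), if log is the natural logarithm and d(t)
ranges over 0, 1 and 2. The d(t) = 0 case would come from the hashed feature
slots (1048576 of them) that no term maps to, and d(t) = 3 gives 0, which your
filter drops. Your own figures (0.1249..., 0.3010...) match the same formula
with log base 10. A small sketch to check this, with made-up variable names:

```python
import math

m = 3  # number of documents

# d(t) -> log((m + 1) / (d(t) + 1)) using the natural logarithm
natural = {df: math.log((m + 1) / (df + 1)) for df in range(m + 1)}

# The same formula using log base 10, for comparison
base10 = {df: math.log10((m + 1) / (df + 1)) for df in range(m + 1)}

for df in range(m + 1):
    print(df, natural[df], base10[df])
```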
Regards,
Ashic.
Date: Thu, 30 Oct 2014 22:13:49 +
Subject: how idf is calculated
From: andr...@sindicetech.com
To: u...@spark.incubator.apache.org

Hi,
I'm writing a paper and I need to calculate tf-idf. With your help I managed
to get the results I needed, but the problem is that I need to be able to
explain how each number was obtained. So I tried to understand how idf was
calculated, and the numbers I get don't correspond to those I should get.
I have 3 documents (each line a document):
a a b c m m
e a c d e e
d j k l m m c
When I calculate tf, I get this:
(1048576,[99,100,106,107,108,109],[1.0,1.0,1.0,1.0,1.0,2.0])
(1048576,[97,98,99,109],[2.0,1.0,1.0,2.0])
(1048576,[97,99,100,101],[1.0,1.0,1.0,3.0])
idf is supposedly calculated as idf = log((m + 1) / (d(t) + 1))
m - number of documents (3 in my case).
d(t) - in how many documents the term is present
a: log(4/3) = 0.1249387366
b: log(4/2) = 0.3010299957
c: log(4/4) = 0
d: log(4/3) = 0.1249387366
e: log(4/2) = 0.3010299957
l: log(4/2) = 0.3010299957
m: log(4/3) = 0.1249387366
When I output the idf vector with
` idf.idf.toArray.filter(_ > 0).distinct.foreach(println(_)) `
I get:
1.3862943611198906
0.28768207245178085
0.6931471805599453
I understand why there are only 3 numbers, because only 3 are unique:
log(4/2), log(4/3), log(4/4), but I don't understand how the numbers in idf
were calculated.
Best regards,
Andrejs
  

RE: Which is better? One spark app listening to 10 topics vs. 10 spark apps each listening to 1 topic

2014-10-27 Thread Ashic Mahtab
I'm quite interested in this as well. I remember something about a streaming 
context needing one core. If that's the case, then won't 10 apps require 10 
cores? Seems like a waste unless each topic is quite resource hungry? Would 
love to hear from the experts :)

Date: Mon, 27 Oct 2014 06:35:29 -0400
From: a...@alectenharmsel.com
To: user@spark.apache.org
Subject: Re: Which is better? One spark app listening to 10 topics vs. 10 spark 
apps each listening to 1 topic


  

  
  


On 10/27/2014 05:19 AM, Jianshi Huang wrote:

Any suggestion? :)

Jianshi

On Thu, Oct 23, 2014 at 3:49 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

The Kafka stream has 10 topics and the data rate is quite high (~ 100K/s per
topic).

Which configuration do you recommend?
- 1 Spark app consuming all Kafka topics
- 10 separate Spark apps each consuming one topic

Assuming they have the same resource pool.

Cheers,
-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Do you have time to try and benchmark both? I don't know anything
about Kafka, but I would imagine that the performance of both
options would be similar.

That said, I would recommend having them all run separately; adding
new data streams doesn't require killing a monolithic job, and an
error in one stream would affect a monolithic job much worse than
having them all run separately.

Regards,

Alec
