RE: Out of Memory Errors on less number of cores in proportion to Partitions in Data

2015-07-08 Thread Evo Eftimov
This is most likely due to the internal implementation of ALS in MLlib. Probably 
for each parallel unit of execution (partition in Spark terms) the 
implementation allocates and uses a RAM buffer where it keeps interim results 
during the ALS iterations.

 

If we assume that the size of that internal RAM buffer is fixed per Unit of 
Execution, then Total RAM (20 partitions x fixed RAM buffer) < Total RAM (100 
partitions x fixed RAM buffer).

 

From: Aniruddh Sharma [mailto:asharma...@gmail.com] 
Sent: Wednesday, July 8, 2015 12:22 PM
To: user@spark.apache.org
Subject: Out of Memory Errors on less number of cores in proportion to 
Partitions in Data

 

Hi,

I am new to Spark. I have done the following tests and I am confused about the 
conclusions. I have 2 queries.

Following are the details of the test:

Test 1) Used an 11 node cluster where each machine has 64 GB RAM and 4 physical 
cores. I ran an ALS algorithm using MLlib on a 1.6 GB data set. I ran 10 
executors and my Rating data set has 20 partitions. It works. In order to 
increase parallelism, I did 100 partitions instead of 20 and now the program does 
not work and throws an out of memory error.

 

Query a): I had 4 cores on each machine, but the number of partitions is 10 
per executor, so my cores are not sufficient for the partitions. Is it supposed 
to give memory errors with this kind of misconfiguration? If there are not 
sufficient cores and processing cannot be done in parallel, can different 
partitions not be processed sequentially, so that the operation becomes slow 
rather than throwing a memory error?

Query b): If it gives an error, then the error message is not meaningful. Here my DAG 
was very simple and I could trace that lowering the number of partitions 
works, but if it throws an error on a misconfiguration of cores, then how do I debug 
it in complex DAGs, given the error does not say explicitly that the problem could be due 
to a low number of cores? If my understanding is incorrect, then kindly explain 
the reasons for the error in this case.

 

Thanks and Regards

Aniruddh



RE: Out of Memory Errors on less number of cores in proportion to Partitions in Data

2015-07-08 Thread Evo Eftimov
Are you sure you have actually increased the RAM (how exactly did you do that, 
and does it show in the Spark UI)?

 

Also use the Spark UI and the driver console to check the RAM allocated for 
each RDD and RDD partition in each of the scenarios.

 

Re b) the general rule is num of partitions = 2 x num of CPU cores

 

All partitions are operated on in parallel (by independently running JVM Threads); 
however, if you have a substantially higher number of partitions (JVM Threads) than 
number of cores then you will get what happens in any JVM or OS – there will be 
switching between the Threads and some of them will be in a suspended mode 
waiting for a free core (Thread contexts also occupy additional RAM).
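As a rough illustration of that rule of thumb, a Scala sketch (the input path and app name are assumptions):

import org.apache.spark.{SparkConf, SparkContext}

object PartitionSizingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-sizing"))

    // defaultParallelism normally reflects the total cores granted to the app
    val targetPartitions = 2 * sc.defaultParallelism

    val raw   = sc.textFile("hdfs:///some/input")    // hypothetical input path
    val sized = raw.repartition(targetPartitions)    // roughly 2 x cores, per the rule of thumb above

    println(s"partitions = ${sized.partitions.length}")
    sc.stop()
  }
}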

 

From: Aniruddh Sharma [mailto:asharma...@gmail.com] 
Sent: Wednesday, July 8, 2015 12:52 PM
To: Evo Eftimov
Subject: Re: Out of Memory Errors on less number of cores in proportion to 
Partitions in Data

 

Thanks for your reply...

I increased executor memory from 4 GB to 35 GB and the out of memory error 
still happens. So it seems it may not be entirely due to more buffers from more 
partitions.

Query a) Is there a way to debug, at a more granular level and from a user code 
perspective, where things could go wrong?

 

Query b) 

In general my query is: let's suppose it is not ALS (or some iterative 
algorithm). Let's say it is some sample RDD where each 
executor has 50 partitions and each machine has 4 physical cores. So do the 4 
physical cores try to process these 50 partitions in parallel (doing 
multitasking), or will it work in a way that the 4 cores first process the first 4 
partitions, then the next 4 partitions, and so on?

Thanks and Regards

Aniruddh

 

On Wed, Jul 8, 2015 at 5:09 PM, Evo Eftimov evo.efti...@isecc.com wrote:

This is most likely due to the internal implementation of ALS in MLlib. Probably 
for each parallel unit of execution (partition in Spark terms) the 
implementation allocates and uses a RAM buffer where it keeps interim results 
during the ALS iterations.

 

If we assume that the size of that internal RAM buffer is fixed per Unit of 
Execution, then Total RAM (20 partitions x fixed RAM buffer) < Total RAM (100 
partitions x fixed RAM buffer).

 

From: Aniruddh Sharma [mailto:asharma...@gmail.com] 
Sent: Wednesday, July 8, 2015 12:22 PM
To: user@spark.apache.org
Subject: Out of Memory Errors on less number of cores in proportion to 
Partitions in Data

 

Hi,

I am new to Spark. I have done the following tests and I am confused about the 
conclusions. I have 2 queries.

Following are the details of the test:

Test 1) Used an 11 node cluster where each machine has 64 GB RAM and 4 physical 
cores. I ran an ALS algorithm using MLlib on a 1.6 GB data set. I ran 10 
executors and my Rating data set has 20 partitions. It works. In order to 
increase parallelism, I did 100 partitions instead of 20 and now the program does 
not work and throws an out of memory error.

 

Query a): I had 4 cores on each machine, but the number of partitions is 10 
per executor, so my cores are not sufficient for the partitions. Is it supposed 
to give memory errors with this kind of misconfiguration? If there are not 
sufficient cores and processing cannot be done in parallel, can different 
partitions not be processed sequentially, so that the operation becomes slow 
rather than throwing a memory error?

Query b): If it gives an error, then the error message is not meaningful. Here my DAG 
was very simple and I could trace that lowering the number of partitions 
works, but if it throws an error on a misconfiguration of cores, then how do I debug 
it in complex DAGs, given the error does not say explicitly that the problem could be due 
to a low number of cores? If my understanding is incorrect, then kindly explain 
the reasons for the error in this case.

 

Thanks and Regards

Aniruddh

 



RE: Out of Memory Errors on less number of cores in proportion to Partitions in Data

2015-07-08 Thread Evo Eftimov
Also try to increase the number of partitions gradually – not in one big jump 
from 20 to 100 but adding e.g. 10 at a time – and see whether there is a 
correlation with adding more RAM to the executors.

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Wednesday, July 8, 2015 1:26 PM
To: 'Aniruddh Sharma'; 'user@spark.apache.org'
Subject: RE: Out of Memory Errors on less number of cores in proportion to 
Partitions in Data

 

Are you sure you have actually increased the RAM (how exactly did you do that, 
and does it show in the Spark UI)?

 

Also use the Spark UI and the driver console to check the RAM allocated for 
each RDD and RDD partition in each of the scenarios.

 

Re b) the general rule is num of partitions = 2 x num of CPU cores

 

All partitions are operated on in parallel (by independently running JVM Threads); 
however, if you have a substantially higher number of partitions (JVM Threads) than 
number of cores then you will get what happens in any JVM or OS – there will be 
switching between the Threads and some of them will be in a suspended mode 
waiting for a free core (Thread contexts also occupy additional RAM).

 

From: Aniruddh Sharma [mailto:asharma...@gmail.com] 
Sent: Wednesday, July 8, 2015 12:52 PM
To: Evo Eftimov
Subject: Re: Out of Memory Errors on less number of cores in proportion to 
Partitions in Data

 

Thanks for your reply...

I increased executor memory from 4 GB to 35 GB and the out of memory error 
still happens. So it seems it may not be entirely due to more buffers from more 
partitions.

Query a) Is there a way to debug, at a more granular level and from a user code 
perspective, where things could go wrong?

 

Query b) 

In general my query is: let's suppose it is not ALS (or some iterative 
algorithm). Let's say it is some sample RDD where each 
executor has 50 partitions and each machine has 4 physical cores. So do the 4 
physical cores try to process these 50 partitions in parallel (doing 
multitasking), or will it work in a way that the 4 cores first process the first 4 
partitions, then the next 4 partitions, and so on?

Thanks and Regards

Aniruddh

 

On Wed, Jul 8, 2015 at 5:09 PM, Evo Eftimov evo.efti...@isecc.com wrote:

This is most likely due to the internal implementation of ALS in MLlib. Probably 
for each parallel unit of execution (partition in Spark terms) the 
implementation allocates and uses a RAM buffer where it keeps interim results 
during the ALS iterations.

 

If we assume that the size of that internal RAM buffer is fixed per Unit of 
Execution, then Total RAM (20 partitions x fixed RAM buffer) < Total RAM (100 
partitions x fixed RAM buffer).

 

From: Aniruddh Sharma [mailto:asharma...@gmail.com] 
Sent: Wednesday, July 8, 2015 12:22 PM
To: user@spark.apache.org
Subject: Out of Memory Errors on less number of cores in proportion to 
Partitions in Data

 

Hi,

I am new to Spark. I have done the following tests and I am confused about the 
conclusions. I have 2 queries.

Following are the details of the test:

Test 1) Used an 11 node cluster where each machine has 64 GB RAM and 4 physical 
cores. I ran an ALS algorithm using MLlib on a 1.6 GB data set. I ran 10 
executors and my Rating data set has 20 partitions. It works. In order to 
increase parallelism, I did 100 partitions instead of 20 and now the program does 
not work and throws an out of memory error.

 

Query a): I had 4 cores on each machine, but the number of partitions is 10 
per executor, so my cores are not sufficient for the partitions. Is it supposed 
to give memory errors with this kind of misconfiguration? If there are not 
sufficient cores and processing cannot be done in parallel, can different 
partitions not be processed sequentially, so that the operation becomes slow 
rather than throwing a memory error?

Query b): If it gives an error, then the error message is not meaningful. Here my DAG 
was very simple and I could trace that lowering the number of partitions 
works, but if it throws an error on a misconfiguration of cores, then how do I debug 
it in complex DAGs, given the error does not say explicitly that the problem could be due 
to a low number of cores? If my understanding is incorrect, then kindly explain 
the reasons for the error in this case.

 

Thanks and Regards

Aniruddh

 



RE: foreachRDD vs. forearchPartition ?

2015-07-08 Thread Evo Eftimov
That was a) fuzzy b) insufficient – one can certainly use foreach (only) on 
DStream RDDs – it works, as an empirical observation.

 

As another empirical observation:

 

foreachPartition results in having one instance of the lambda/closure per 
partition when e.g. publishing to output systems like message brokers, 
databases and file systems – that increases the level of parallelism of your 
output processing.
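A minimal sketch of that pattern in Scala (the Sink trait and openSink() are hypothetical stand-ins for a real client such as a JDBC connection or a message producer):

import org.apache.spark.streaming.dstream.DStream

object ForeachPartitionSketch {
  // Hypothetical stand-in for a real client (JDBC connection, message producer, ...)
  trait Sink { def write(s: String): Unit; def close(): Unit }
  def openSink(): Sink = new Sink {
    def write(s: String): Unit = println(s)
    def close(): Unit = ()
  }

  def writeOut(dstream: DStream[String]): Unit = {
    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val sink = openSink()            // one sink/closure instance per partition, on the executor
        try records.foreach(sink.write)  // stream the partition's records through it
        finally sink.close()
      }
    }
  }
}

The point of the per-partition handle is that the (often non-serializable) client is created on the executor once per partition instead of once per record.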

 

As an architect I deal with gazillions of products and don’t have time to read 
the source code of all of them to make up for documentation deficiencies. On 
the other hand I believe you have been involved in writing some of the code so 
be a good boy and either answer this question properly or enhance the product 
documentation of that area of the system 

 

From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Wednesday, July 8, 2015 2:52 PM
To: dgoldenberg; user@spark.apache.org
Subject: Re: foreachRDD vs. forearchPartition ?

 

These are quite different operations. One operates on RDDs in a DStream and one 
operates on partitions of an RDD. They are not alternatives. 

 

On Wed, Jul 8, 2015, 2:43 PM dgoldenberg dgoldenberg...@gmail.com wrote:

Is there a set of best practices for when to use foreachPartition vs.
foreachRDD?

Is it generally true that using foreachPartition avoids some of the
over-network data shuffling overhead?

When would I definitely want to use one method vs. the other?

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/foreachRDD-vs-forearchPartition-tp23714.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE:

2015-07-07 Thread Evo Eftimov
spark.streaming.unpersist = false // in order for Spark Streaming to not drop the 
raw RDD data

spark.cleaner.ttl = some reasonable value in seconds

 

Why is the above suggested, given the persist/cache operation on the 
constantly unionized Batch RDD will have to be invoked anyway (after every 
union with a DStream RDD)? Besides, it will result in DStream RDDs accumulating 
in RAM unnecessarily for the duration of the TTL.

 

re 

 

“A more reliable way would be to do dstream.window(...) for the length of time 
you want to keep the data and then union that data with your RDD for further 
processing using transform.”

 

I think the actual requirement here is picking up and adding Specific Messages 
from EVERY DStream RDD to the Batch RDD, rather than “preserving” messages from 
a specific sliding window and adding them to the Batch RDD

 

This should be defined as the Frequency of Updates to the Batch RDD and then 
using dstream.window() equal to that frequency 

 

Can you also elaborate on why you consider the dstream.window approach more 
“reliable”?

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Tuesday, July 7, 2015 12:56 PM
To: Anand Nalya
Cc: spark users
Subject: Re:

 

Anand,

 

AFAIK, you will need to change two settings:

 

spark.streaming.unpersist = false // in order for Spark Streaming to not drop the 
raw RDD data

spark.cleaner.ttl = some reasonable value in seconds

 

Also be aware that the lineage of your union RDD will grow with each batch 
interval. You will need to break lineage often with cache(), and rely on the 
ttl for clean up.

You will probably be in some tricky ground with this approach.

 

A more reliable way would be to do dstream.window(...) for the length of time 
you want to keep the data and then union that data with your RDD for further 
processing using transform.

Something like:

dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union 
otherRdd)...

 

If you need an unbounded number of dstream batch intervals, consider writing 
the data to secondary storage instead.

 

-kr, Gerard.

 

 

 

On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote:

Hi,

 

Suppose I have an RDD that is loaded from some file and then I also have a 
DStream that has data coming from some stream. I want to keep unioning some of the 
tuples from the DStream into my RDD. For this I can use something like this:

 

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD { rdd =>
    myRDD = myRDD.union(rdd.filter(myfilter))
  }

 

My question is: for how long will Spark keep the RDDs underlying the dstream 
around? Is there some configuration knob that can control that?

 

Regards,

Anand

 



RE:

2015-07-07 Thread Evo Eftimov
Requirements – then see my abstracted interpretation – what else do you need in 
terms of Requirements …:

 

“Suppose I have an RDD that is loaded from some file and then I also have a 
DStream that has data coming from some stream. I want to keep unioning some of the 
tuples from the DStream into my RDD. For this I can use something like this:”

A formal requirements spec derived from the above - I think the actual 
requirement here is picking up and adding Specific (filtered) Messages from 
EVERY DStream RDD to the Batch RDD rather than “preserving” (on top of all 
that) messages from a sliding window and adding them to the Batch RDD. Such a 
requirement should be defined as the Frequency of Updates to the Batch RDD and 
what these updates are, e.g. specific filtered messages, and then 
dstream.window() can be made equal to that frequency

Essentially the update frequency can range from the filtered messages of Every 
Single DStream RDD to the filtered messages of a SLIDING WINDOW.

 

Secondly, what do you call “mutable unioning”?

 

That was his initial code

 

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD { rdd =>
    myRDD = myRDD.union(rdd.filter(myfilter))
  }

 

 

Here is how it looks when Persisting the result from every union – supposed to 
produce a NEW PERSISTENT IMMUTABLE Batch RDD – why is that supposed to be less 
“stable/reliable”, and what are the exact technical reasons for that?

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD { rdd =>
    myRDD = myRDD.union(rdd.filter(myfilter)).cache()
  }

 

 

 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Tuesday, July 7, 2015 1:55 PM
To: Evo Eftimov
Cc: Anand Nalya; spark users
Subject: Re:

 

Evo,

 

I'd let the OP clarify the question. I'm not in a position to clarify his 
requirements beyond what's written in the question.

 

Regarding window vs mutable union: window is a well-supported feature that 
accumulates messages over time. The mutable unioning of RDDs is bound to cause 
operational trouble, as there are no guarantees tied to data preservation and 
it's unclear how one can produce 'cuts' of that union ready to be served for 
some process/computation. Intuitively, it will 'explode' at some point.

 

-kr, Gerard.

 

 

 

On Tue, Jul 7, 2015 at 2:06 PM, Evo Eftimov evo.efti...@isecc.com wrote:

spark.streaming.unpersist = false // in order for Spark Streaming to not drop the 
raw RDD data

spark.cleaner.ttl = some reasonable value in seconds

 

Why is the above suggested, given the persist/cache operation on the 
constantly unionized Batch RDD will have to be invoked anyway (after every 
union with a DStream RDD)? Besides, it will result in DStream RDDs accumulating 
in RAM unnecessarily for the duration of the TTL.

 

re 

 

“A more reliable way would be to do dstream.window(...) for the length of time 
you want to keep the data and then union that data with your RDD for further 
processing using transform.”

 

I think the actual requirement here is picking up and adding Specific Messages 
from EVERY DStream RDD to the Batch RDD, rather than “preserving” messages from 
a specific sliding window and adding them to the Batch RDD

 

This should be defined as the Frequency of Updates to the Batch RDD and then 
using dstream.window() equal to that frequency 

 

Can you also elaborate on why you consider the dstream.window approach more 
“reliable”?

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Tuesday, July 7, 2015 12:56 PM
To: Anand Nalya
Cc: spark users
Subject: Re:

 

Anand,

 

AFAIK, you will need to change two settings:

 

spark.streaming.unpersist = false // in order for Spark Streaming to not drop the 
raw RDD data

spark.cleaner.ttl = some reasonable value in seconds

 

Also be aware that the lineage of your union RDD will grow with each batch 
interval. You will need to break lineage often with cache(), and rely on the 
ttl for clean up.

You will probably be in some tricky ground with this approach.

 

A more reliable way would be to do dstream.window(...) for the length of time 
you want to keep the data and then union that data with your RDD for further 
processing using transform.

Something like:

dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union 
otherRdd)...

 

If you need an unbounded number of dstream batch intervals, consider writing 
the data to secondary storage instead.

 

-kr, Gerard.

 

 

 

On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote:

Hi,

 

Suppose I have an RDD that is loaded from some file and then I also have a 
DStream that has data coming from some stream. I want to keep unioning some of the 
tuples from the DStream into my RDD. For this I can use something like this:

 

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD { rdd =>
    myRDD = myRDD.union(rdd.filter(myfilter))
  }

 

My question is: for how long will Spark keep the RDDs underlying the dstream 
around

R on spark

2015-06-27 Thread Evo Eftimov
I had a look at the new R on Spark API / Feature in Spark 1.4.0

For those skilled in the art (of R and distributed computing) it will be
immediately clear that "ON" is a marketing ploy and that what it actually is is
"TO", i.e. Spark 1.4.0 offers an INTERFACE from R TO DATA stored in Spark in
distributed fashion, plus some distributed queries which can be initiated FROM
R and run on that data within Spark - these are essentially certain types of
SQL-style queries.

In order to deserve the "ON" label, SparkR has to be able to run ON Spark
most of the Statistical Analysis and Machine Learning Algos found in the
R engine. This is absolutely not the case at the moment.

As an example of what type of Solution/Architecture I am referring to, you
can review Revolution Analytics (recently acquired by Microsoft) and some
other open source frameworks for running R ON distributed clusters.

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/R-on-spark-tp23512.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark Streaming: limit number of nodes

2015-06-24 Thread Evo Eftimov
Ok so you are running Spark in a Standalone Mode then 

 

Then for every Worker process on every node (you can run more than one Worker 
per node) you will have an Executor waiting for jobs ….

 

As far as I am concerned I think there are only two ways to achieve what  you 
need:

 

1.   Simply shut down the Spark worker processes / daemons on the nodes you 
want to keep free from Spark workloads, OR run two separate Spark clusters, one 
with e.g. 2 workers and one with e.g. 5 workers – small jobs go to cluster 1 
and big jobs to cluster 2

2.   Try to set spark.executor.cores, BUT that limits the number of cores 
per Executor rather than the total cores for the job and hence will probably 
not yield the effect you need (see the sketch below)
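A minimal sketch of option 2 (the app name and property values shown are illustrative assumptions):

import org.apache.spark.SparkConf

// Cap the resources of each Executor: this limits cores per Executor/JVM,
// not the total number of cores the application can take across the cluster.
val conf = new SparkConf()
  .setAppName("small-streaming-app")     // assumed app name
  .set("spark.executor.cores", "2")      // cores per executor
  .set("spark.executor.memory", "2g")    // heap per executor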

 

From: Wojciech Pituła [mailto:w.pit...@gmail.com] 
Sent: Wednesday, June 24, 2015 10:49 AM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Spark Streaming: limit number of nodes

 

Ok, thanks. I have 1 worker process on each machine but I would like to run my 
app on only 3 of them. Is it possible?

 

śr., 24.06.2015 o 11:44 użytkownik Evo Eftimov evo.efti...@isecc.com napisał:

There is no direct one to one mapping between Executor and Node

 

Executor is simply the spark framework term for JVM instance with some spark 
framework system code running in it 

 

A node is a physical server machine 

 

You can have more than one JVM per node 

 

And vice versa you can have Nodes without any JVM running on them. How? BY 
specifying the number of executors to be less than the number of nodes  

 

So if you specify number of executors to be 1 and you have 5 nodes,  ONE 
executor will run on only one of them 

 

The above is valid for Spark on YARN 

 

For spark in standalone mode the number of executors is equal to the number of 
spark worker processes (daemons) running on each node

 

From: Wojciech Pituła [mailto:w.pit...@gmail.com] 
Sent: Tuesday, June 23, 2015 12:38 PM
To: user@spark.apache.org
Subject: Spark Streaming: limit number of nodes

 

I have set up a small standalone cluster: 5 nodes, every node has 5 GB of memory 
and 8 cores. As you can see, a node doesn't have much RAM.

 

I have 2 streaming apps; the first one is configured to use 3GB of memory per node 
and the second one uses 2GB per node.

 

My problem is that the smaller app could easily run on 2 or 3 nodes instead of 5, 
so I could launch a third app. 

 

Is it possible to limit the number of nodes (executors) that an app will get from 
the standalone cluster?



RE: Spark Streaming: limit number of nodes

2015-06-24 Thread Evo Eftimov
There is no direct one to one mapping between Executor and Node

 

Executor is simply the spark framework term for JVM instance with some spark 
framework system code running in it 

 

A node is a physical server machine 

 

You can have more than one JVM per node 

 

And vice versa you can have Nodes without any JVM running on them. How? BY 
specifying the number of executors to be less than the number of nodes  

 

So if you specify number of executors to be 1 and you have 5 nodes,  ONE 
executor will run on only one of them 

 

The above is valid for Spark on YARN 

 

For spark in standalone mode the number of executors is equal to the number of 
spark worker processes (daemons) running on each node

 

From: Wojciech Pituła [mailto:w.pit...@gmail.com] 
Sent: Tuesday, June 23, 2015 12:38 PM
To: user@spark.apache.org
Subject: Spark Streaming: limit number of nodes

 

I have set up a small standalone cluster: 5 nodes, every node has 5 GB of memory 
and 8 cores. As you can see, a node doesn't have much RAM.

 

I have 2 streaming apps; the first one is configured to use 3GB of memory per node 
and the second one uses 2GB per node.

 

My problem is that the smaller app could easily run on 2 or 3 nodes instead of 5, 
so I could launch a third app. 

 

Is it possible to limit the number of nodes (executors) that an app will get from 
the standalone cluster?



RE: Web UI vs History Server Bugs

2015-06-23 Thread Evo Eftimov
Probably your application has crashed or was terminated without invoking the
stop method of the Spark context - in such cases it doesn't create the empty
flag file which apparently tells the history server that it can safely show
the log data - simply go to some of the other dirs of the history server to
see what the name of the flag file is and then create it manually in the
dirs of the missing apps - then they will appear in the history server UI

 

From: Steve Loughran [mailto:ste...@hortonworks.com] 
Sent: Monday, June 22, 2015 7:22 PM
To: Jonathon Cai
Cc: user@spark.apache.org
Subject: Re: Web UI vs History Server Bugs

 

well, I'm afraid you've reached the limits of my knowledge ... hopefully
someone else can answer 

 

On 22 Jun 2015, at 16:37, Jonathon Cai jonathon@yale.edu wrote:

 

No, what I'm seeing is that while the cluster is running, I can't see the
app info after the app is completed. That is to say, when I click on the
application name on master:8080, no info is shown. However, when I examine
the same file on the History Server, the application information opens fine.

 

On Sat, Jun 20, 2015 at 6:47 AM, Steve Loughran ste...@hortonworks.com
wrote:


 On 17 Jun 2015, at 19:10, jcai jonathon@yale.edu wrote:

 Hi,

 I am running this on Spark stand-alone mode. I find that when I examine
the
 web UI, a couple bugs arise:

 1. There is a discrepancy between the number denoting the duration of the
 application when I run the history server and the number given by the web
UI
 (default address is master:8080). I checked more specific details,
including
 task and stage durations (when clicking on the application), and these
 appear to be the same for both avenues.

 2. Sometimes the web UI on master:8080 is unable to display more specific
 information for an application that has finished (when clicking on the
 application), even when there is a log file in the appropriate directory.
 But when the history server is opened, it is able to read this file and
 output information.


There's a JIRA open on the history server caching incomplete work...if you
click on the link to a job while it's in progress, you don't get any updates
later.

does this sound like what you are seeing?

 

 



Spark Streaming 1.3.0 ERROR LiveListenerBus

2015-06-19 Thread Evo Eftimov
Spark Streaming 1.3.0 on YARN during Job Execution keeps generating the
following error while the application is running:

ERROR LiveListenerBus: Listener EventLoggingListener threw an exception 
java.lang.reflect.InvocationTargetException
etc
etc
Caused by: java.io.IOException: Filesystem closed
   at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)


This exception does NOT make the streaming job fail.

Secondly, the streaming job does NOT perform ANY operations with HDFS and is
really a basic streaming job to test a new Spark 1.3.0 deployment on CDH 5.4

The above exception is thrown by the Spark framework itself. I have seen
some posts related to that exception which are about Spark Batch and
confirmed as a bug in Spark, which however is not critical.

And btw this error appears well before any  attempted stop of the spark
streaming context and hence is not related to trying to stop the context 

Does anybody know how to get rid of this in Spark Streaming 1.3.0 on YARN
on CDH 5.4?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-1-3-0-ERROR-LiveListenerBus-tp23411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Machine Learning on GraphX

2015-06-18 Thread Evo Eftimov
What is GraphX:

 

-  It can be viewed as a kind of Distributed, Parallel, Graph Database

-  It can be viewed as Graph Data Structure (Data Structures 101 from 
your CS course)

-  It features some off-the-shelf algos for Graph Processing and 
Navigation (Algos and Data Structures 101), and the implementation of these 
takes advantage of the distributed parallel nature of GraphX

 

Any of the MLlib algos can be applied to ANY data structure, from time series to 
graph to matrix/tabular etc – it is up to your needs and imagination 

 

As an example – Clustering – you can apply it to a Graph Data Structure, BUT you 
may also leverage the Graph's inherent connection/clustering properties and Graph 
algos taking advantage of that, instead of e.g. the run-of-the-mill K-Means 
which is OK for e.g. time series, matrix etc data structures

 

From: Timothée Rebours [mailto:t.rebo...@gmail.com] 
Sent: Thursday, June 18, 2015 10:44 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Machine Learning on GraphX

 

Thanks for the quick answer.
I've already followed this tutorial but it doesn't use GraphX at all. My goal 
would be to work directly on the graph, and not extract edges and vertices 
from the graph as standard RDDs and then work on those with the standard MLlib 
ALS, which would be of no interest. That's why I tried with the other implementation, 
but it's not optimized at all.

I might have gone in the wrong direction with the ALS, but I'd like to see 
what's possible to do with MLlib on GraphX. Any idea ?

 

2015-06-18 11:19 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:

This might give you a good start 
http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html
 it's a bit old though.




Thanks

Best Regards

 

On Thu, Jun 18, 2015 at 2:33 PM, texol t.rebo...@gmail.com wrote:

Hi,

I'm new to GraphX and I'd like to use Machine Learning algorithms on top of
it. I wanted to write a simple program implementing MLlib's ALS on a
bipartite graph (a simple movie recommendation), but didn't succeed. I found
an implementation on Spark 1.1.x
(https://github.com/ankurdave/spark/blob/GraphXALS/graphx/src/main/scala/org/apache/spark/graphx/lib/ALS.scala)
of ALS on GraphX, but it is painfully slow compared to the standard
implementation, and uses the deprecated (in the current version)
PregelVertex class.
Do we expect a new implementation ? Is there a smarter solution to do so ?

Thanks,
Regards,
Timothée Rebours.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Machine-Learning-on-GraphX-tp23388.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 





 

-- 

Timothée Rebours
13, rue Georges Bizet
78380 BOUGIVAL



RE: Spark or Storm

2015-06-17 Thread Evo Eftimov
The only thing which doesn't make much sense in Spark Streaming (and I am
not saying it is done better in Storm) is the iterative and redundant
shipping of essentially the same tasks (closures/lambdas/functions) to
the cluster nodes AND re-launching them there again and again 

 

This is a legacy from Spark Batch where such approach DOES make sense 

 

So to recap - in Spark Streaming, the driver keeps serializing and
transmitting the same Tasks (comprising a Job) for every new DStream RDD,
which then get re-launched, i.e. new JVM Threads are launched within each Executor
(JVM), and then the tasks report their final execution status to the driver
(only the last step has real functional purpose)

 

An optimization (had Spark Streaming been implemented from scratch)
could be to launch the Stages/Tasks of a Streaming Job as constantly running
Threads (Daemons/Agents) within the Executors and let the DStream RDDs
stream through them, so that only the final execution status for each DStream
RDD and some periodic heartbeats (of the Agents) are reported to the
driver   

 

Essentially this gives you a Pipeline Architecture (of stringed Agents),
which is a well-known Parallel Programming Pattern especially suitable for
streaming data 

 

From: Matei Zaharia [mailto:matei.zaha...@gmail.com] 
Sent: Wednesday, June 17, 2015 7:14 PM
To: Enno Shioji
Cc: Ashish Soni; ayan guha; Sabarish Sasidharan; Spark Enthusiast; Will
Briggs; user; Sateesh Kavuri
Subject: Re: Spark or Storm

 

This documentation is only for writes to an external system, but all the
counting you do within your streaming app (e.g. if you use
reduceByKeyAndWindow to keep track of a running count) is exactly-once. When
you write to a storage system, no matter which streaming framework you use,
you'll have to make sure the writes are idempotent, because the storage
system can't know whether you meant to write the same data again or not. But
the place where Spark Streaming helps over Storm, etc is for tracking state
within your computation. Without that facility, you'd not only have to make
sure that writes are idempotent, but you'd have to make sure that updates to
your own internal state (e.g. reduceByKeyAndWindow) are exactly-once too.

 

Matei

 

On Jun 17, 2015, at 8:26 AM, Enno Shioji eshi...@gmail.com wrote:

 

The thing is, even with that improvement, you still have to make updates
idempotent or transactional yourself. If you read
http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics

 

that refers to the latest version, it says:

 


Semantics of output operations


Output operations (like foreachRDD) have at-least once semantics, that is,
the transformed data may get written to an external entity more than once in
the event of a worker failure. While this is acceptable for saving to file
systems using the saveAs***Files operations (as the file will simply get
overwritten with the same data), additional effort may be necessary to
achieve exactly-once semantics. There are two approaches.

- Idempotent updates: Multiple attempts always write the same data.
For example, saveAs***Files always writes the same data to the generated
files.

- Transactional updates: All updates are made transactionally so
that updates are made exactly once atomically. One way to do this would be
the following.

  - Use the batch time (available in foreachRDD) and the partition index of
the transformed RDD to create an identifier. This identifier uniquely
identifies a blob of data in the streaming application.

  - Update the external system with this blob transactionally (that is, exactly
once, atomically) using the identifier. That is, if the identifier is not
already committed, commit the partition data and the identifier atomically.
Else if this was already committed, skip the update.

 

So either you make the update idempotent, or you have to make it
transactional yourself, and the suggested mechanism is very similar to what
Storm does.
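For illustration, a rough Scala sketch of the transactional recipe quoted above (alreadyCommitted() and commit() are hypothetical stubs standing in for your own external store):

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object ExactlyOnceOutputSketch {
  // Hypothetical stubs standing in for an idempotency check against your own external store
  def alreadyCommitted(blobId: String): Boolean = false
  def commit(blobId: String, data: Seq[String]): Unit = ()

  def writeExactlyOnce(dstream: DStream[String]): Unit = {
    dstream.foreachRDD { (rdd: RDD[String], batchTime: Time) =>
      rdd.foreachPartition { records =>
        // batch time + partition index uniquely identify this blob of output data
        val blobId = s"${batchTime.milliseconds}-${TaskContext.get.partitionId}"
        if (!alreadyCommitted(blobId))
          commit(blobId, records.toSeq)   // commit the data and the identifier atomically
        // else: the blob was written by an earlier attempt of this task, so skip it
      }
    }
  }
}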

 

 

 

 

On Wed, Jun 17, 2015 at 3:51 PM, Ashish Soni asoni.le...@gmail.com wrote:

@Enno 

As per the latest version and documentation, Spark Streaming does offer
exactly-once semantics using the improved Kafka integration. Note: I have not
tested it yet.

 

Any feedback will be helpful if anyone is tried the same.

 

http://koeninger.github.io/kafka-exactly-once/#7

 

https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html

 

 

 

On Wed, Jun 17, 2015 at 10:33 AM, Enno Shioji eshi...@gmail.com wrote:

AFAIK KCL is *supposed* to provide fault tolerance and load balancing (plus
additionally, elastic scaling unlike Storm), Kinesis providing the
coordination. My understanding is that it's like a naked Storm worker
process that can consequently only do map.

 

I haven't really used it tho, so can't really comment how it compares to
Spark/Storm. Maybe somebody else will be able to comment.

 

 

 

On Wed, Jun 17, 2015 at 3:13 PM, ayan guha 

RE: stop streaming context of job failure

2015-06-16 Thread Evo Eftimov
 

https://spark.apache.org/docs/latest/monitoring.html

 

also subscribe to various Listeners for various Metrics Types, e.g. Job 
Stats/Statuses - this will allow you (in the driver) to decide when to stop 
the context gracefully (the listening and stopping can be done from a 
completely separate thread in the driver)
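A rough sketch of that idea (the listener name and retry budget are assumptions; the actual stop happens from a separate driver-side thread, as suggested):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd}
import org.apache.spark.streaming.StreamingContext

// Count failed jobs and stop the StreamingContext once a retry budget is exhausted.
class StopOnFailuresListener(ssc: StreamingContext, maxFailures: Int) extends SparkListener {
  private val failures = new AtomicInteger(0)

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
    case JobFailed(_) if failures.incrementAndGet() >= maxFailures =>
      // do the actual stop off the listener/event thread
      new Thread(new Runnable {
        def run(): Unit = ssc.stop(stopSparkContext = true, stopGracefully = true)
      }).start()
    case _ => // job succeeded, or still within the retry budget
  }
}

// registration in the driver:
//   ssc.sparkContext.addSparkListener(new StopOnFailuresListener(ssc, 10))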

 

https://spark.apache.org/docs/latest/api/java/

 

org.apache.spark.ui.jobs

Class JobProgressListener
(extends Object; implements SparkListener, Logging)
https://spark.apache.org/docs/latest/api/java/org/apache/spark/scheduler/SparkListener.html
https://spark.apache.org/docs/latest/api/java/org/apache/spark/Logging.html

:: DeveloperApi :: Tracks task-level information to be displayed in the UI.

All access to the data structures in this class must be synchronized on the class, since the UI thread and the EventBus loop may otherwise be reading and updating the internal data structures concurrently.

 

 

From: Krot Viacheslav [mailto:krot.vyaches...@gmail.com] 
Sent: Tuesday, June 16, 2015 2:35 PM
To: user@spark.apache.org
Subject: stop streaming context of job failure

 

Hi all,

Is there a way to stop streaming context when some batch processing failed?

I want to set a reasonable retries count, say 10, and if it still fails - stop the context 
completely.

Is that possible?



RE: How does one decide no of executors/cores/memory allocation?

2015-06-16 Thread Evo Eftimov
Best is by measuring and recording how the performance of your solution
scales as the workload scales - recording as in collecting data points - and
then you can do some time series stats analysis and visualizations 

For example you can start with a single box with e.g. 8 CPU cores 

Use e.g. one or two partitions and 1 executor, which would correspond to 1 CPU
core (JVM Thread) processing your workload - scale the workload and see how
the performance scales and record all data points. 
Then repeat the same for more CPU cores, RAM and boxes - you get the idea?

Then analyze your performance datasets in the way explained.

Basically this stuff is known as Performance Engineering and has nothing to
do with a specific product - read something on PE as well.

-Original Message-
From: shreesh [mailto:shreesh.la...@mail.com] 
Sent: Tuesday, June 16, 2015 4:22 PM
To: user@spark.apache.org
Subject: Re: How does one decide no of executors/cores/memory allocation?

I realize that there are a lot of ways to configure my application in Spark.
The part that is not clear is how I decide, say for example, into how
many partitions I should divide my data, how much RAM I should have, or how
many workers one should initialize.




--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-one-decide-no-o
f-executors-cores-memory-allocation-tp23326p23339.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Join between DStream and Periodically-Changing-RDD

2015-06-15 Thread Evo Eftimov
“turn (keep turning) your HDFS file (Batch RDD) into a stream of messages 
(outside spark streaming)” – what I meant by that was “turn the Updates to your 
HDFS dataset into Messages” and send them as such to spark streaming 

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, June 15, 2015 8:38 AM
To: 'Ilove Data'; 'Tathagata Das'
Cc: 'Akhil Das'; 'user'
Subject: RE: Join between DStream and Periodically-Changing-RDD

 

Then go for the second option I suggested - simply turn (keep turning) your 
HDFS file (Batch RDD) into a stream of messages (outside spark streaming) – 
then spark streaming consumes and aggregates the messages FOR THE RUNTIME 
LIFETIME of your application in some of the following ways:

 

1.   Continuous Union of DStream RDDs while you also Persist the result (so it 
doesn't get discarded, which is what happens to DStream RDDs by default in 
spark streaming)   

2.   Apply one of the Window operations, e.g. aggregation, on the DStream RDD 
– with the window being the runtime lifetime of the app 

 

And at the same time you join the DStream RDDs of your actual Streaming Data 
with the above continuously updated DStream RDD representing your HDFS file 
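A rough sketch of option 1 in Scala (element types, stream names and the unpersist step are assumptions, and the growing-lineage caveat from the earlier thread still applies):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object ContinuousUnionSketch {
  // Assumed element type (String, Long); updatesDstream carries the "HDFS update" messages
  def run(ssc: StreamingContext,
          updatesDstream: DStream[(String, Long)],
          liveDstream: DStream[(String, Long)]): DStream[(String, (Long, Long))] = {

    var accumulated: RDD[(String, Long)] = ssc.sparkContext.emptyRDD[(String, Long)]

    updatesDstream.foreachRDD { rdd =>
      val previous = accumulated
      accumulated = previous.union(rdd).cache()   // persist the new immutable union
      previous.unpersist(blocking = false)        // release the superseded RDD
    }

    // transform's function runs on the driver each batch, so it picks up the latest `accumulated`
    liveDstream.transform(rdd => rdd.join(accumulated))
  }
}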

 

From: Ilove Data [mailto:data4...@gmail.com] 
Sent: Monday, June 15, 2015 5:19 AM
To: Tathagata Das
Cc: Evo Eftimov; Akhil Das; user
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

@Akhil Das

Joining two DStreams might not be an option since I want to join the stream with 
historical data in an HDFS folder.

 

@Tathagata Das  @Evo Eftimov

The Batch RDD to be reloaded is considerably huge compared to the DStream data since it 
is historical data. To be more specific, most of the joins from the RDD stream to the HDFS 
folder (~90% of the RDD stream's data) will hit recent data (last 1-2 days of data) 
in the HDFS folder. So it is important to get the most updated data.

 

Is there a workaround for that specific case? Since RDDs are not mutable, do I 
need a K-V database for this join with historical data?

 

On Fri, Jun 12, 2015 at 8:14 AM, Tathagata Das t...@databricks.com wrote:

Another approach not mentioned is to use a function to get the RDD that is to 
be joined. Something like this.

 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  val rdd = getOrUpdateRDD(params...)

  rdd.join(kvFile)

})

The getOrUpdateRDD() function that you implement will get called every batch 
interval. And you can decide to return the same RDD or an updated RDD when you 
want to. Once updated, if the RDD is going to be used in multiple batch 
intervals, you should cache it. Furthermore, if you are going to join it, you 
should partition it by a partitioner, then cache it and make sure that the 
same partitioner is used for joining. That would be more efficient, as the RDD 
will stay partitioned in memory, minimizing the cost of the join. 
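For illustration, a sketch along those lines (key/value types, partition count and the HDFS path are assumptions; a refresh of the side RDD would go inside the transform, re-partitioned and re-cached the same way):

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object PartitionedJoinSketch {
  def joinAgainstSide(sc: SparkContext,
                      kvDstream: DStream[(String, String)]): DStream[(String, (String, String))] = {
    val partitioner = new HashPartitioner(32)             // assumed partition count

    // Pre-partition and cache the "side" data once, so the join does not reshuffle it every batch
    val sideRdd: RDD[(String, String)] =
      sc.textFile("hdfs:///historical/data")              // hypothetical path
        .map(line => (line.split(",")(0), line))
        .partitionBy(partitioner)
        .cache()

    // Reuse the same partitioner for the per-batch join
    kvDstream.transform(streamRdd => streamRdd.join(sideRdd, partitioner))
  }
}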

 

 

On Wed, Jun 10, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote:

It depends on how big the Batch RDD requiring reloading is 

 

Reloading it for EVERY single DStream RDD would slow down the stream processing 
in line with the total time required to reload the Batch RDD …..

 

But if the Batch RDD is not that big then that might not be an issue, 
especially in the context of the latency requirements for your streaming app

 

Another more efficient and real-time approach may be to represent your Batch 
RDD as DStream RDDs and keep aggregating them over the lifetime of the spark 
streaming app instance and keep joining with the actual DStream RDDs 

 

You can feed your HDFS file into a Message Broker topic and consume it from 
there in the form of DStream RDDs which you keep aggregating over the lifetime 
of the spark streaming app instance 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Wednesday, June 10, 2015 8:36 AM
To: Ilove Data
Cc: user@spark.apache.org
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

RDD's are immutable, why not join two DStreams? 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  val file = ssc.sparkContext.textFile("/sigmoid/")
  val kvFile = file.map(x => (x.split(",")(0), x))

  rdd.join(kvFile)

})

 




Thanks

Best Regards

 

On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data data4...@gmail.com wrote:

Hi,

 

I'm trying to join a DStream with an interval of let's say 20s with an RDD loaded from 
an HDFS folder which is changing periodically, let's say a new file is coming to the 
folder every 10 minutes.

 

How should it be done, considering the HDFS files in the folder are periodically 
changing / new files are being added? Does the RDD automatically detect changes in the HDFS folder 
as the RDD source and automatically reload?

 

Thanks!

Rendy

 

 

 



RE: Join between DStream and Periodically-Changing-RDD

2015-06-15 Thread Evo Eftimov
Then go for the second option I suggested - simply turn (keep turning) your 
HDFS file (Batch RDD) into a stream of messages (outside spark streaming) – 
then spark streaming consumes and aggregates the messages FOR THE RUNTIME 
LIFETIME of your application in some of the following ways:

 

1.   Continuous Union of DStream RDDs while you also Persist the result (so it 
doesn't get discarded, which is what happens to DStream RDDs by default in 
spark streaming)   

2.   Apply one of the Window operations, e.g. aggregation, on the DStream RDD 
– with the window being the runtime lifetime of the app 

 

And at the same time you join the DStream RDDs of your actual Streaming Data 
with the above continuously updated DStream RDD representing your HDFS file 

 

From: Ilove Data [mailto:data4...@gmail.com] 
Sent: Monday, June 15, 2015 5:19 AM
To: Tathagata Das
Cc: Evo Eftimov; Akhil Das; user
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

@Akhil Das

Joining two DStreams might not be an option since I want to join the stream with 
historical data in an HDFS folder.

 

@Tathagata Das  @Evo Eftimov

The Batch RDD to be reloaded is considerably huge compared to the DStream data since it 
is historical data. To be more specific, most of the joins from the RDD stream to the HDFS 
folder (~90% of the RDD stream's data) will hit recent data (last 1-2 days of data) 
in the HDFS folder. So it is important to get the most updated data.

 

Is there a workaround for that specific case? Since RDDs are not mutable, do I 
need a K-V database for this join with historical data?

 

On Fri, Jun 12, 2015 at 8:14 AM, Tathagata Das t...@databricks.com wrote:

Another approach not mentioned is to use a function to get the RDD that is to 
be joined. Something like this.

 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  val rdd = getOrUpdateRDD(params...)

  rdd.join(kvFile)

})

The getOrUpdateRDD() function that you implement will get called every batch 
interval. And you can decide to return the same RDD or an updated RDD when you 
want to. Once updated, if the RDD is going to be used in multiple batch 
intervals, you should cache it. Furthermore, if you are going to join it, you 
should partition it by a partitioner, then cache it and make sure that the 
same partitioner is used for joining. That would be more efficient, as the RDD 
will stay partitioned in memory, minimizing the cost of the join. 

 

 

On Wed, Jun 10, 2015 at 9:08 AM, Evo Eftimov evo.efti...@isecc.com wrote:

It depends on how big the Batch RDD requiring reloading is 

 

Reloading it for EVERY single DStream RDD would slow down the stream processing 
in line with the total time required to reload the Batch RDD …..

 

But if the Batch RDD is not that big then that might not be an issue, 
especially in the context of the latency requirements for your streaming app

 

Another more efficient and real-time approach may be to represent your Batch 
RDD as DStream RDDs and keep aggregating them over the lifetime of the spark 
streaming app instance and keep joining with the actual DStream RDDs 

 

You can feed your HDFS file into a Message Broker topic and consume it from 
there in the form of DStream RDDs which you keep aggregating over the lifetime 
of the spark streaming app instance 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Wednesday, June 10, 2015 8:36 AM
To: Ilove Data
Cc: user@spark.apache.org
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

RDD's are immutable, why not join two DStreams? 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  val file = ssc.sparkContext.textFile("/sigmoid/")
  val kvFile = file.map(x => (x.split(",")(0), x))

  rdd.join(kvFile)

})

 




Thanks

Best Regards

 

On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data data4...@gmail.com wrote:

Hi,

 

I'm trying to join a DStream with an interval of let's say 20s with an RDD loaded from 
an HDFS folder which is changing periodically, let's say a new file is coming to the 
folder every 10 minutes.

 

How should it be done, considering the HDFS files in the folder are periodically 
changing / new files are being added? Does the RDD automatically detect changes in the HDFS folder 
as the RDD source and automatically reload?

 

Thanks!

Rendy

 

 

 



RE: Join between DStream and Periodically-Changing-RDD

2015-06-10 Thread Evo Eftimov
It depends on how big the Batch RDD requiring reloading is 

 

Reloading it for EVERY single DStream RDD would slow down the stream processing 
in line with the total time required to reload the Batch RDD …..

 

But if the Batch RDD is not that big then that might not be an issue, 
especially in the context of the latency requirements for your streaming app

 

Another more efficient and real-time approach may be to represent your Batch 
RDD as DStream RDDs and keep aggregating them over the lifetime of the spark 
streaming app instance and keep joining with the actual DStream RDDs 

 

You can feed your HDFS file into a Message Broker topic and consume it from 
there in the form of DStream RDDs which you keep aggregating over the lifetime 
of the spark streaming app instance 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Wednesday, June 10, 2015 8:36 AM
To: Ilove Data
Cc: user@spark.apache.org
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

RDD's are immutable, why not join two DStreams? 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  val file = ssc.sparkContext.textFile("/sigmoid/")
  val kvFile = file.map(x => (x.split(",")(0), x))

  rdd.join(kvFile)

})

 




Thanks

Best Regards

 

On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data data4...@gmail.com wrote:

Hi,

 

I'm trying to join a DStream with an interval of let's say 20s with an RDD loaded from 
an HDFS folder which is changing periodically, let's say a new file is coming to the 
folder every 10 minutes.

 

How should it be done, considering the HDFS files in the folder are periodically 
changing / new files are being added? Does the RDD automatically detect changes in the HDFS folder 
as the RDD source and automatically reload?

 

Thanks!

Rendy

 



Re: Determining number of executors within RDD

2015-06-10 Thread Evo Eftimov
Yes, I think it is ONE worker ONE executor, as an executor is nothing but a JVM 
instance spawned by the worker. 

To run more executors, i.e. JVM instances, on the same physical cluster node you 
need to run more than one worker on that node and then allocate only part of 
the sys resources to that worker/executor.


Sent from Samsung Mobile

 Original message 
From: maxdml max...@cs.duke.edu
Date: 2015/06/10 19:56 (GMT+00:00)
To: user@spark.apache.org
Subject: Re: Determining number of executors within RDD

Actually this is somehow confusing for two reasons:

- First, the option 'spark.executor.instances', which seems to be only dealt
with in the case of YARN in the source code of SparkSubmit.scala, is also
present in the conf/spark-env.sh file under the standalone section, which
would indicate that it is also available for this mode

- Second, a post from Andrew Or states that this property defines the
number of workers in the cluster, not the number of executors on a given
worker.
(http://apache-spark-user-list.1001560.n3.nabble.com/clarification-for-some-spark-on-yarn-configuration-options-td13692.html)

Could anyone clarify this? :-)

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Determining-number-of-executors-within-RDD-tp15554p23262.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How to share large resources like dictionaries while processing data with Spark ?

2015-06-05 Thread Evo Eftimov
Oops, @Yiannis, sorry to be a party pooper but the Job Server is for Spark 
Batch Jobs (besides, anyone can put something like that together in 5 min), while I am 
under the impression that Dmitry is working on a Spark Streaming app 

 

Besides the Job Server is essentially for sharing the Spark Context between 
multiple threads 

 

Re Dmitry's initial question – you can load large data sets as a Batch (Static) RDD 
from any Spark Streaming App and then join DStream RDDs against them to 
emulate “lookups”; you can also try the “Lookup RDD” – there is a GitHub 
project

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Friday, June 5, 2015 12:12 AM
To: Yiannis Gkoufas
Cc: Olivier Girardot; user@spark.apache.org
Subject: Re: How to share large resources like dictionaries while processing 
data with Spark ?

 

Thanks so much, Yiannis, Olivier, Huang!

 

On Thu, Jun 4, 2015 at 6:44 PM, Yiannis Gkoufas johngou...@gmail.com wrote:

Hi there,

 

I would recommend checking out 
https://github.com/spark-jobserver/spark-jobserver which I think gives the 
functionality you are looking for.

I haven't tested it though.

 

BR

 

On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote:

You can use it as a broadcast variable, but if it's too large (more than 1 GB 
I guess), you may need to share it by joining it, using some kind of key, with the 
other RDDs.

But this is the kind of thing broadcast variables were designed for.
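A minimal broadcast-variable sketch (the dictionary layout and path are assumptions):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object BroadcastDictionarySketch {
  // Assumed: a two-column CSV of term,translation at a hypothetical path
  def enrich(sc: SparkContext, terms: RDD[String]): RDD[(String, String)] = {
    val dictionary: Map[String, String] =
      sc.textFile("hdfs:///dictionaries/terms.csv")
        .map(_.split(","))
        .map(cols => cols(0) -> cols(1))
        .collectAsMap()
        .toMap

    val dictBc = sc.broadcast(dictionary)   // shipped to each executor once, not once per task

    terms.map(t => (t, dictBc.value.getOrElse(t, "UNKNOWN")))   // read-only lookup on the executors
  }
}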

 

Regards, 

 

Olivier.

 

Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a écrit :

We have some pipelines defined where sometimes we need to load potentially
large resources such as dictionaries.

What would be the best strategy for sharing such resources among the
transformations/actions within a consumer?  Can they be shared somehow
across the RDD's?

I'm looking for a way to load such a resource once into the cluster memory
and have it be available throughout the lifecycle of a consumer...

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 

 



RE: How to share large resources like dictionaries while processing data with Spark ?

2015-06-05 Thread Evo Eftimov
And RDD.lookup() cannot be invoked from Transformations, e.g. maps

 

Lookup() is an action which can be invoked only from the driver – if you want 
functionality like that from within Transformations executed on the cluster 
nodes try Indexed RDD

 

Other options: load a Batch / Static RDD once in your Spark Streaming App 
and then keep joining, and then e.g. filtering, every incoming DStream RDD against 
the (big static) Batch RDD
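A rough sketch of that join/filter pattern (element types, the reference path and the filter predicate are assumptions):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object StaticJoinFilterSketch {
  def tagAgainstReference(ssc: StreamingContext,
                          kvDstream: DStream[(String, String)]): DStream[(String, String)] = {
    val staticRdd = ssc.sparkContext
      .textFile("hdfs:///reference/data")                 // hypothetical reference data set
      .map(line => (line.split(",")(0), line))
      .cache()                                            // loaded once for the app's lifetime

    kvDstream.transform { streamRdd =>
      streamRdd.join(staticRdd)                           // "lookup" emulated as a join
               .filter { case (_, (_, ref)) => ref.nonEmpty }   // assumed filter predicate
               .map { case (key, (value, _)) => (key, value) }  // keep only entries with a reference match
    }
  }
}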

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, June 5, 2015 3:27 PM
To: 'Dmitry Goldenberg'
Cc: 'Yiannis Gkoufas'; 'Olivier Girardot'; 'user@spark.apache.org'
Subject: RE: How to share large resources like dictionaries while processing 
data with Spark ?

 

It is called Indexed RDD https://github.com/amplab/spark-indexedrdd 

 


 



RE: How to share large resources like dictionaries while processing data with Spark ?

2015-06-05 Thread Evo Eftimov
It is called Indexed RDD https://github.com/amplab/spark-indexedrdd 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Friday, June 5, 2015 3:15 PM
To: Evo Eftimov
Cc: Yiannis Gkoufas; Olivier Girardot; user@spark.apache.org
Subject: Re: How to share large resources like dictionaries while processing 
data with Spark ?

 

Thanks everyone. Evo, could you provide a link to the Lookup RDD project? I 
can't seem to locate it exactly on Github. (Yes, to your point, our project is 
Spark streaming based). Thank you.

 




RE: Spark Streaming for Each RDD - Exception on Empty

2015-06-05 Thread Evo Eftimov
The foreachPartition callback is provided with an Iterator by the Spark Framework – 
while iterator.hasNext() ……

 

Also check whether this is not some sort of Python Spark API bug – Python seems 
to be the foster child here – Scala and Java are the darlings

 

From: John Omernik [mailto:j...@omernik.com] 
Sent: Friday, June 5, 2015 4:08 PM
To: user
Subject: Spark Streaming for Each RDD - Exception on Empty

 

Is there a pythonic/sparkonic way to test for an empty RDD before using 
foreachRDD?  Basically I am using the Python example 
https://spark.apache.org/docs/latest/streaming-programming-guide.html to put 
records somewhere. When I have data, it works fine; when I don't, I get an 
exception. I am not sure about the performance implications of just throwing an 
exception every time there is no data, but can I just test before sending it?

 

I did see one post mentioning look for take(1) from the stream to test for 
data, but I am not sure where I put that in this example... Is that in the 
lambda function? or somewhere else? Looking for pointers!

Thanks!

 

 

 

mydstream.foreachRDD(lambda rdd: rdd.foreachPartition(parseRDD))

 

 

Using this example code from the link above:

 

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
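
Following up on the take(1) suggestion above, here is a minimal sketch of such a 
guard applied to this example; sendPartition and mydstream are the names already 
used in this thread, and on recent releases rdd.isEmpty() can be used instead of 
the take(1) test:

def sendIfNotEmpty(rdd):
    # take(1) returns an empty list when the RDD has no elements,
    # so the partitions are only processed when there is data
    if rdd.take(1):
        rdd.foreachPartition(sendPartition)

mydstream.foreachRDD(sendIfNotEmpty)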


RE: How to increase the number of tasks

2015-06-05 Thread Evo Eftimov
It may be that your system runs out of resources (ie 174 is the ceiling) due to 
the following 

 

1.   RDD Partition = (Spark) Task

2.   RDD Partition != (Spark) Executor

3.   (Spark) Task != (Spark) Executor

4.   (Spark) Task = JVM Thread

5.   (Spark) Executor = JVM instance 

 

From: ÐΞ€ρ@Ҝ (๏̯͡๏) [mailto:deepuj...@gmail.com] 
Sent: Friday, June 5, 2015 10:48 AM
To: user
Subject: How to increase the number of tasks

 

I have a  stage that spawns 174 tasks when i run repartition on avro data. 

Tasks read between 512/317/316/214/173  MB of data. Even if i increase number 
of executors/ number of partitions (when calling repartition) the number of 
tasks launched remains fixed to 174.

 

1) I want to speed up this task. How do i do it ?

2) Few tasks finish in 20 mins, few in 15 and few in less than 10. Why is this 
behavior ?

Since this is a repartition stage, it should not depend on the nature of data.

 

Its taking more than 30 mins and i want to speed it up by throwing more 
executors at it.

 

Please suggest

 

Deepak

 



RE: How to increase the number of tasks

2015-06-05 Thread Evo Eftimov
The param is for “Default number of partitions in RDDs returned by 
transformations like join, reduceByKey, and parallelize when NOT set by user.”

 

While Deepak is setting the number of partitions EXPLICITLY 
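
For illustration, a minimal PySpark sketch of the two knobs being discussed here, 
the default used when nothing is specified versus an explicit partition count, with 
purely illustrative values:

from pyspark import SparkConf, SparkContext

# Default number of partitions used by join, reduceByKey, parallelize, etc.
# when no explicit value is given
conf = SparkConf().set("spark.default.parallelism", "200")
sc = SparkContext(conf=conf)

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

# Explicit partition counts override the default for that operation
repartitioned = pairs.repartition(400)                            # 400 partitions -> 400 tasks in that stage
counts = pairs.reduceByKey(lambda x, y: x + y, numPartitions=50)  # 50 partitions for the result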

 

From: 李铖 [mailto:lidali...@gmail.com] 
Sent: Friday, June 5, 2015 11:08 AM
To: ÐΞ€ρ@Ҝ (๏̯͡๏)
Cc: Evo Eftimov; user
Subject: Re: How to increase the number of tasks

 

Just multiply 2-4 by the CPU core count of the node.

 

2015-06-05 18:04 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

I did not change spark.default.parallelism,

What is recommended value for it. 

 

On Fri, Jun 5, 2015 at 3:31 PM, 李铖 lidali...@gmail.com wrote:

Did you change the value of 'spark.default.parallelism'? Try a bigger 
number.

 

2015-06-05 17:56 GMT+08:00 Evo Eftimov evo.efti...@isecc.com:

It may be that your system runs out of resources (ie 174 is the ceiling) due to 
the following 

 

1.   RDD Partition = (Spark) Task

2.   RDD Partition != (Spark) Executor

3.   (Spark) Task != (Spark) Executor

4.   (Spark) Task = JVM Thread

5.   (Spark) Executor = JVM instance 

 

From: ÐΞ€ρ@Ҝ (๏̯͡๏) [mailto:deepuj...@gmail.com] 
Sent: Friday, June 5, 2015 10:48 AM
To: user
Subject: How to increase the number of tasks

 

I have a  stage that spawns 174 tasks when i run repartition on avro data. 

Tasks read between 512/317/316/214/173  MB of data. Even if i increase number 
of executors/ number of partitions (when calling repartition) the number of 
tasks launched remains fixed to 174.

 

1) I want to speed up this task. How do i do it ?

2) Few tasks finish in 20 mins, few in 15 and few in less than 10. Why is this 
behavior ?

Since this is a repartition stage, it should not depend on the nature of data.

 

Its taking more than 30 mins and i want to speed it up by throwing more 
executors at it.

 

Please suggest

 

Deepak

 

 





 

-- 

Deepak

 

 



RE: How to share large resources like dictionaries while processing data with Spark ?

2015-06-05 Thread Evo Eftimov
Spark uses Tachyon internally, i.e. all SERIALIZED IN-MEMORY RDDs are kept there – 
so if you have a BATCH RDD which is SERIALIZED IN-MEMORY then you are using 
Tachyon implicitly – the only difference is that if you are using Tachyon 
explicitly, i.e. as a distributed, in-memory file system, you can share data 
between Jobs, while an RDD is ALWAYS visible only within Jobs using the same Spark 
Context 

 

From: Charles Earl [mailto:charles.ce...@gmail.com] 
Sent: Friday, June 5, 2015 12:10 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; Yiannis Gkoufas; Olivier Girardot; user@spark.apache.org
Subject: Re: How to share large resources like dictionaries while processing 
data with Spark ?

 

Would tachyon be appropriate here?




-- 
- Charles



RE: Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread Evo Eftimov
Dmitry was concerned about the “serialization cost”, NOT the “memory footprint” – 
hence option a) is still viable since a Broadcast is performed only ONCE for 
the lifetime of the Driver instance 
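
For reference, a minimal sketch of option a) in PySpark (the same pattern applies 
in Java); sc, the params dictionary and the handle() function are placeholders 
rather than code from this thread, while messageBodies is the DStream name used below:

# Hypothetical, picklable parameter object, broadcast ONCE from the driver
params = {"dictionary_path": "/data/dict.txt", "threshold": 0.8}
bc_params = sc.broadcast(params)

def processPartition(records):
    p = bc_params.value            # read the broadcast value on the executor
    for r in records:
        handle(r, p)               # placeholder per-record processing

messageBodies.foreachRDD(lambda rdd: rdd.foreachPartition(processPartition))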

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: Wednesday, June 3, 2015 2:44 PM
To: Evo Eftimov
Cc: dgoldenberg; user
Subject: Re: Objects serialized before foreachRDD/foreachPartition ?

 

Considering memory footprint of param as mentioned by Dmitry, option b seems 
better.

 

Cheers

 

On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov evo.efti...@isecc.com wrote:

Hmmm a spark streaming app code doesn't execute in the linear fashion
assumed in your previous code snippet - to achieve your objectives you
should do something like the following

in terms of your second objective - saving the initialization and
serialization of the params you can:

a) broadcast them
b) have them as a Singleton (initialized from e.g. params in a file on HDFS)
on each Executor

messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {

    Param param = new Param();
    param.initialize();

    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(param);
        rdd.foreachPartition(func);
        return null;
    }

});

//put this in e.g. the object destructor
param.deinitialize();


-Original Message-
From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Wednesday, June 3, 2015 1:56 PM
To: user@spark.apache.org
Subject: Objects serialized before foreachRDD/foreachPartition ?

I'm looking at https://spark.apache.org/docs/latest/tuning.html.  Basically
the takeaway is that all objects passed into the code processing RDD's must
be serializable. So if I've got a few objects that I'd rather initialize
once and deinitialize once outside of the logic processing the RDD's, I'd
need to think twice about the costs of serializing such objects, it would
seem.

In the below, does the Spark serialization happen before calling foreachRDD
or before calling foreachPartition?

Param param = new Param();
param.initialize();
messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
    ProcessPartitionFunction func = new ProcessPartitionFunction(param);
    rdd.foreachPartition(func);
    return null;
  }
});
param.deinitialize();

If param gets initialized to a significant memory footprint, are we better
off creating/initializing it before calling new ProcessPartitionFunction()
or perhaps in the 'call' method within that function?

I'm trying to avoid calling expensive init()/deinit() methods while
balancing against the serialization costs. Thanks.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



RE: Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread Evo Eftimov
Hmmm a spark streaming app code doesn't execute in the linear fashion
assumed in your previous code snippet - to achieve your objectives you
should do something like the following 

in terms of your second objective - saving the initialization and
serialization of the params you can:

a) broadcast them
b) have them as a Singleton (initialized from e.g. params in a file on HDFS)
on each Executor  

messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {

    Param param = new Param();
    param.initialize();

    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        ProcessPartitionFunction func = new ProcessPartitionFunction(param);
        rdd.foreachPartition(func);
        return null;
    }

});

//put this in e.g. the object destructor
param.deinitialize();

-Original Message-
From: dgoldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 1:56 PM
To: user@spark.apache.org
Subject: Objects serialized before foreachRDD/foreachPartition ?

I'm looking at https://spark.apache.org/docs/latest/tuning.html.  Basically
the takeaway is that all objects passed into the code processing RDD's must
be serializable. So if I've got a few objects that I'd rather initialize
once and deinitialize once outside of the logic processing the RDD's, I'd
need to think twice about the costs of serializing such objects, it would
seem.

In the below, does the Spark serialization happen before calling foreachRDD
or before calling foreachPartition?

Param param = new Param();
param.initialize();
messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
    ProcessPartitionFunction func = new ProcessPartitionFunction(param);
    rdd.foreachPartition(func);
    return null;
  }
});
param.deinitialize();

If param gets initialized to a significant memory footprint, are we better
off creating/initializing it before calling new ProcessPartitionFunction()
or perhaps in the 'call' method within that function?

I'm trying to avoid calling expensive init()/deinit() methods while
balancing against the serialization costs. Thanks.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-befor
e-foreachRDD-foreachPartition-tp23134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Evo Eftimov
Makes sense, especially if you have a cloud with “infinite” resources / nodes 
which allows you to double, triple etc. the resources of the currently running 
cluster in the background / in parallel 

 

I was thinking more about the scenario where you have e.g. 100 boxes and want 
to / can add e.g. 20 more 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:46 PM
To: Evo Eftimov
Cc: Cody Koeninger; Andrew Or; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo,

 

One of the ideas is to shadow the current cluster. This way there's no extra 
latency incurred due to shutting down of the consumers. If two sets of 
consumers are running, potentially processing the same data, that is OK. We 
phase out the older cluster and gradually flip over to the new one, insuring no 
downtime or extra latency.  Thoughts?

 

On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov evo.efti...@isecc.com wrote:

You should monitor vital performance / job clogging stats of the Spark 
Streaming Runtime not “kafka topics”

 

You should be able to bring new worker nodes online and make them contact and 
register with the Master without bringing down the Master (or any of the 
currently running worker nodes) 

 

Then just shutdown your currently running spark streaming job/app and restart 
it with new params to take advantage of the larger cluster 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:14 PM
To: Cody Koeninger
Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Would it be possible to implement Spark autoscaling somewhat along these lines? 
--

 

1. If we sense that a new machine is needed, by watching the data load in Kafka 
topic(s), then

2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and 
get a machine);

3. Create a shadow/mirror Spark master running alongside the initial version 
which talks to N machines. The new mirror version is aware of N+1 machines (or 
N+M if we had decided we needed M new boxes).

4. The previous version of the Spark runtime is acquiesced/decommissioned.  We 
possibly get both clusters working on the same data which may actually be OK 
(at least for our specific use-cases).

5. Now the new Spark cluster is running.

 

Similarly, the decommissioning of M unused boxes would happen, via this notion 
of a mirror Spark runtime.  How feasible would it be for such a mirrorlike 
setup to be created, especially created programmatically?  Especially point #3.

 

The other idea we'd entertained was to bring in a new machine, acquiesce down 
all currently running workers by telling them to process their current batch 
then shut down, then restart the consumers now that Spark is aware of a 
modified cluster.  This has the drawback of a downtime that may not be 
tolerable in terms of latency, by the system's clients waiting for their 
responses in a synchronous fashion.

 

Thanks.

 

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote:

I'm not sure that points 1 and 2 really apply to the kafka direct stream.  
There are no receivers, and you know at the driver how big each of your batches 
is.

 

On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

Hi all,

 

As the author of the dynamic allocation feature I can offer a few insights here.

 

Gerard's explanation was both correct and concise: dynamic allocation is not 
intended to be used in Spark streaming at the moment (1.4 or before). This is 
because of two things:

 

(1) Number of receivers is necessarily fixed, and these are started in 
executors. Since we need a receiver for each InputDStream, if we kill these 
receivers we essentially stop the stream, which is not what we want. It makes 
little sense to close and restart a stream the same way we kill and relaunch 
executors.

 

(2) Records come in every batch, and when there is data to process your 
executors are not idle. If your idle timeout is less than the batch duration, 
then you'll end up having to constantly kill and restart executors. If your 
idle timeout is greater than the batch duration, then you'll never kill 
executors.

 

Long answer short, with Spark streaming there is currently no straightforward 
way to scale the size of your cluster. I had a long discussion with TD (Spark 
streaming lead) about what needs to be done to provide some semblance of 
dynamic scaling to streaming applications, e.g. take into account the batch 
queue instead. We came up with a few ideas that I will not detail here, but we 
are looking into this and do intend to support it in the near future.

 

-Andrew

 

 

 

2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com:

 

Probably you should ALWAYS keep the RDD storage policy to MEMORY

RE: Value for SPARK_EXECUTOR_CORES

2015-05-28 Thread Evo Eftimov
I don’t think the number of CPU cores controls the “number of parallel tasks”. 
The number of Tasks corresponds first and foremost to the number of (Dstream) 
RDD Partitions  

 

The Spark documentation doesn’t mention what is meant by “Task” in terms of 
Standard Multithreading Terminology, i.e. a Thread or a Process, so your point is 
a good one 

 

Ps: time and time again every product and dev team and company invent their own 
terminology so 50% of the time using the product is spent on deciphering and 
reinventing the wheel 

 

From: Mulugeta Mammo [mailto:mulugeta.abe...@gmail.com] 
Sent: Thursday, May 28, 2015 7:24 PM
To: Ruslan Dautkhanov
Cc: user
Subject: Re: Value for SPARK_EXECUTOR_CORES

 

Thanks for the valuable information. The blog states:

 

The cores property controls the number of concurrent tasks an executor can 
run. --executor-cores 5 means that each executor can run a maximum of five 
tasks at the same time.  

 

So, I guess the max number of executor-cores I can assign is the CPU count 
(which includes the number of threads per core), not just the number of cores. 
I just want to be sure the cores term Spark is using.

 

Thanks

 

On Thu, May 28, 2015 at 11:16 AM, Ruslan Dautkhanov dautkha...@gmail.com 
wrote:

It's not only about cores. Keep in mind spark.executor.cores also affects the 
available memory for each task:

 

From 
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

 

The memory available to each task is (spark.executor.memory * 
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores. 
Memory fraction and safety fraction default to 0.2 and 0.8 respectively.
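
As a quick worked example of that formula (the numbers are illustrative, not taken 
from this thread):

executor_memory_gb = 8          # spark.executor.memory
shuffle_memory_fraction = 0.2   # spark.shuffle.memoryFraction (default)
shuffle_safety_fraction = 0.8   # spark.shuffle.safetyFraction (default)
executor_cores = 4              # spark.executor.cores

per_task_shuffle_gb = (executor_memory_gb * shuffle_memory_fraction
                       * shuffle_safety_fraction) / executor_cores
print(per_task_shuffle_gb)      # 0.32 GB of shuffle memory per concurrent task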




I'd test spark.executor.cores with 2,4,8 and 16 and see what makes your job run 
faster..

 


-- 
Ruslan Dautkhanov

 

On Wed, May 27, 2015 at 6:46 PM, Mulugeta Mammo mulugeta.abe...@gmail.com 
wrote:

My executor has the following spec (lscpu):

 

CPU(s): 16

Core(s) per socket: 4

Socket(s): 2

Thread(s) per core: 2

 

The CPU count is obviously 4*2*2 = 16. My question is what value is Spark 
expecting in SPARK_EXECUTOR_CORES ? The CPU count (16) or total # of cores (4 * 
2 = 8) ?

 

Thanks

 

 



RE: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate – if it touches 
a predetermined RAM threshold that should be your third metric 

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you 
can implement one on your own without waiting for Jiras and new features 
whenever they might be implemented by the Spark dev team – moreover you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning – BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable – it is not about scaling your system/solution 

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handing Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated with 
each DStream instantiated and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed, therefore the 
number of Kafka receivers is fixed for the lifetime of your DStream. 

What we do is to calculate the cluster capacity and use that as a fixed upper 
bound (with a margin) for the receiver throughput.

 

There's work in progress to add a reactive model to the receiver, where 
backpressure can be applied to handle overload conditions. See 
https://issues.apache.org/jira/browse/SPARK-7398

 

Once the data is received, it will be processed in a 'classical' Spark 
pipeline, so previous posts on spark resource scheduling might apply.

 

Regarding metrics, the standard metrics subsystem of spark will report 
streaming job performance. Check the driver's metrics endpoint to peruse the 
available metrics:

 

driver:ui-port/metrics/json
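
For illustration, a minimal sketch of polling that endpoint from a script, assuming 
the default MetricsServlet is enabled and the driver UI is on port 4040; the 
"StreamingMetrics" name filter is an assumption about how the streaming metrics 
source names its gauges:

import json
import urllib2   # Python 2, matching the era of this thread

def streaming_gauges(driver_host, ui_port=4040):
    url = "http://%s:%d/metrics/json" % (driver_host, ui_port)
    metrics = json.load(urllib2.urlopen(url))
    # Keep only the gauges published by the Spark Streaming metrics source
    return dict((name, gauge["value"])
                for name, gauge in metrics.get("gauges", {}).items()
                if "StreamingMetrics" in name)

print(streaming_gauges("localhost"))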

 

-kr, Gerard.

 

 

(*) Spark is a project that moves so fast that statements might be invalidated 
by new work every minute.

 

On Thu, May 28, 2015 at 1:21 AM, dgoldenberg dgoldenberg...@gmail.com wrote:

Hi,

I'm trying to understand if there are design patterns for autoscaling Spark
(add/remove slave machines to the cluster) based on the throughput.

Assuming we can throttle Spark consumers, the respective Kafka topics we
stream data from would start growing.  What are some of the ways to generate
the metrics on the number of new messages and the rate they are piling up?
This perhaps is more of a Kafka question; I see a pretty sparse javadoc with
the Metric interface and not much else...

What are some of the ways to expand/contract the Spark cluster? Someone has
mentioned Mesos...

I see some info on Spark metrics in  the Spark monitoring guide
https://spark.apache.org/docs/latest/monitoring.html  .  Do we want to
perhaps implement a custom sink that would help us autoscale up or down
based on the throughput?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: 

Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

Kafka offsets are managed automatically for you by the Kafka clients, which keep 
them in ZooKeeper, so don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary


Sent from Samsung Mobile

 Original message 
From: Dmitry Goldenberg dgoldenberg...@gmail.com 
Date: 2015/05/28 13:16 (GMT+00:00) 
To: Evo Eftimov evo.efti...@isecc.com 
Cc: Gerard Maas gerard.m...@gmail.com, spark users user@spark.apache.org 
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

Thanks, Evo.  Per the last part of your comment, it sounds like we will 
need to implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

Sounds like a great candidate for a set of enhancements in Spark...

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote:
@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described in previous postings submitted by me

 

You can even go one step further and periodically issue “get/check free memory” 
to see whether it is decreasing relentlessly at a constant rate – if it touches 
a predetermined RAM threshold that should be your third metric

 

Re the “back pressure” mechanism – this is a Feedback Loop mechanism and you 
can implement one on your own without waiting for Jiras and new features 
whenever they might be implemented by the Spark dev team – moreover you can 
avoid using slow mechanisms such as ZooKeeper and even incorporate some Machine 
Learning in your Feedback Loop to make it handle the message consumption rate 
more intelligently and benefit from ongoing online learning – BUT this is STILL 
about voluntarily sacrificing your performance in the name of keeping your 
system stable – it is not about scaling your system/solution

 

In terms of how to scale the Spark Framework Dynamically – even though this is 
not supported at the moment out of the box I guess you can have a sys 
management framework spin dynamically a few more boxes (spark worker nodes), 
stop dynamically your currently running Spark Streaming Job, relaunch it with 
new params e.g. more Receivers, larger number of Partitions (hence tasks), more 
RAM per executor etc. Obviously this will cause some temporary delay in fact 
interruption in your processing but if the business use case can tolerate that 
then go for it

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, May 28, 2015 12:36 PM
To: dgoldenberg
Cc: spark users
Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics?

 

Hi,

 

tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark 
streaming processes is not supported. 

 

 

Longer version.

 

I assume that you are talking about Spark Streaming as the discussion is about 
handing Kafka streaming data.

 

Then you have two things to consider: the Streaming receivers and the Spark 
processing cluster.

 

Currently, the receiving topology is static. One receiver is allocated with 
each DStream instantiated and it will use 1 core in the cluster. Once the 
StreamingContext is started, this topology cannot be changed, therefore the 
number of Kafka receivers

RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Evo Eftimov
Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK – it 
will be your insurance policy against sys crashes due to memory leaks. While 
there is free RAM, spark streaming (spark) will NOT resort to disk – it will of 
course resort to disk from time to time (ie when there is no free RAM) and take 
a performance hit from that, BUT only while there is no free RAM 
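
A minimal PySpark sketch of setting that storage policy on a DStream; the 
socketTextStream source and the 3-second batch interval are just illustrative 
placeholders:

from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="MemoryAndDiskExample")
ssc = StreamingContext(sc, 3)   # 3-second micro-batches

# Placeholder input source; any DStream is persisted the same way
lines = ssc.socketTextStream("localhost", 9999)

# Spill RDD blocks to disk instead of dropping them when executor memory runs out
lines.persist(StorageLevel.MEMORY_AND_DISK)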

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Thursday, May 28, 2015 2:34 PM
To: Evo Eftimov
Cc: Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo, good points.

 

On the dynamic resource allocation, I'm surmising this only works within a 
particular cluster setup.  So it improves the usage of current cluster 
resources but it doesn't make the cluster itself elastic. At least, that's my 
understanding.

 

Memory + disk would be good and hopefully it'd take *huge* load on the system 
to start exhausting the disk space too.  I'd guess that falling onto disk will 
make things significantly slower due to the extra I/O.

 

Perhaps we'll really want all of these elements eventually.  I think we'd want 
to start with memory only, keeping maxRate low enough not to overwhelm the 
consumers; implement the cluster autoscaling.  We might experiment with dynamic 
resource allocation before we get to implement the cluster autoscale.

 

 

 

On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com wrote:

You can also try Dynamic Resource Allocation

 

https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
 

 

Also re the Feedback Loop for automatic message consumption rate adjustment – 
there is a “dumb” solution option – simply set the storage policy for the 
DStream RDDs to MEMORY AND DISK – when the memory gets exhausted spark 
streaming will resort to keeping new RDDs on disk which will prevent it from 
crashing and hence losing them. Then some memory will get freed and it will 
resort back to RAM and so on and so forth  

 

 

Sent from Samsung Mobile

 Original message 

From: Evo Eftimov 

Date:2015/05/28 13:22 (GMT+00:00) 

To: Dmitry Goldenberg 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

You can always spin new boxes in the background and bring them into the cluster 
fold when fully operational and time that with job relaunch and param change

 

Kafka offsets are managed automatically for you by the Kafka clients, which keep 
them in ZooKeeper, so don't worry about that as long as you shut down your job 
gracefully. Besides, managing the offsets explicitly is not a big deal if 
necessary

 

 

Sent from Samsung Mobile

 

 Original message 

From: Dmitry Goldenberg 

Date:2015/05/28 13:16 (GMT+00:00) 

To: Evo Eftimov 

Cc: Gerard Maas ,spark users 

Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in 
Kafka or Spark's metrics? 

 

Thanks, Evo.  Per the last part of your comment, it sounds like we will need to 
implement a job manager which will be in control of starting the jobs, 
monitoring the status of the Kafka topic(s), shutting jobs down and marking 
them as ones to relaunch, scaling the cluster up/down by adding/removing 
machines, and relaunching the 'suspended' (shut down) jobs.

 

I suspect that relaunching the jobs may be tricky since that means keeping 
track of the starter offsets in Kafka topic(s) from which the jobs started 
working on.

 

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching of 
jobs, coupled with the wait for the new machines to come online may turn out 
quite time-consuming which will make for lengthy request times, and our 
requests are not asynchronous.  Ideally, the currently running jobs would 
continue to run on the machines currently available in the cluster.

 

In the scale-down case, the job manager would want to signal to Spark's job 
scheduler not to send work to the node being taken out, find out when the last 
job has finished running on the node, then take the node out.

 

This is somewhat like changing the number of cylinders in a car engine while 
the car is running...

 

Sounds like a great candidate for a set of enhancements in Spark...

 

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov evo.efti...@isecc.com wrote:

@DG; The key metrics should be

 

-  Scheduling delay – its ideal state is to remain constant over time 
and ideally be less than the time of the microbatch window 

-  The average job processing time should remain less than the 
micro-batch window

-  Number of Lost Jobs – even if there is a single Job lost that means 
that you have lost all messages for the DStream RDD processed by that job due 
to the previously described spark streaming memory leak condition and 
subsequent crash – described

RE: How does spark manage the memory of executor with multiple tasks

2015-05-26 Thread Evo Eftimov
An Executor is a JVM instance spawned and running on a Cluster Node (Server 
machine). Task is essentially a JVM Thread – you can have as many Threads as 
you want per JVM. You will also hear about “Executor Slots” – these are 
essentially the CPU Cores available on the machine and granted for use to the 
Executor 
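
For illustration, a minimal sketch of how those resources are granted per Executor 
through the standard configuration keys (the values are illustrative only):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.memory", "4g")   # heap of each Executor JVM
        .set("spark.executor.cores", "4"))    # "executor slots": concurrent task threads per Executor
sc = SparkContext(conf=conf)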

 

Ps: what creates ongoing confusion here is that the Spark folks have “invented” 
their own terms to describe the design of what is essentially a 
Distributed OO Framework facilitating Parallel Programming and Data Management 
in a Distributed Environment, BUT have not provided a clear 
dictionary/explanation linking these “inventions” with standard concepts 
familiar to every Java, Scala etc developer  

 

From: canan chen [mailto:ccn...@gmail.com] 
Sent: Tuesday, May 26, 2015 9:02 AM
To: user@spark.apache.org
Subject: How does spark manage the memory of executor with multiple tasks

 

Since spark can run multiple tasks in one executor, so I am curious to know how 
does spark manage memory across these tasks. Say if one executor takes 1GB 
memory, then if this executor can run 10 tasks simultaneously, then each task 
can consume 100MB on average. Do I understand it correctly ? It doesn't make 
sense to me that spark run multiple tasks in one executor. 



RE: How does spark manage the memory of executor with multiple tasks

2015-05-26 Thread Evo Eftimov
This is the first time I hear that “one can specify the RAM per task” – the RAM 
is granted per Executor (JVM). On the other hand each Task operates on ONE RDD 
Partition – so you can say that this is “the RAM allocated to the Task to 
process” – but it is still within the boundaries allocated to the Executor 
(JVM) within which the Task is running. Also while running, any Task like any 
JVM Thread can request as much additional RAM e.g. for new Object instances  as 
there is available in the Executor aka JVM Heap  

 

From: canan chen [mailto:ccn...@gmail.com] 
Sent: Tuesday, May 26, 2015 9:30 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: How does spark manage the memory of executor with multiple tasks

 

Yes, I know that one task represents a JVM thread. This is what confused me. 
Usually users want to specify the memory at task level, so how can I do that if 
a task is thread level and multiple tasks run in the same executor? And I don't 
even know how many threads there will be. Besides that, if one task causes OOM, 
it would cause other tasks in the same executor to fail too. There's no isolation 
between tasks.  

 

On Tue, May 26, 2015 at 4:15 PM, Evo Eftimov evo.efti...@isecc.com wrote:

An Executor is a JVM instance spawned and running on a Cluster Node (Server 
machine). Task is essentially a JVM Thread – you can have as many Threads as 
you want per JVM. You will also hear about “Executor Slots” – these are 
essentially the CPU Cores available on the machine and granted for use to the 
Executor 

 

Ps: what creates ongoing confusion here is that the Spark folks have “invented” 
their own terms to describe the design of their what is essentially a 
Distributed OO Framework facilitating Parallel Programming and Data Management 
in a Distributed Environment, BUT have not provided clear 
dictionary/explanations linking these “inventions” with standard concepts 
familiar to every Java, Scala etc developer  

 

From: canan chen [mailto:ccn...@gmail.com] 
Sent: Tuesday, May 26, 2015 9:02 AM
To: user@spark.apache.org
Subject: How does spark manage the memory of executor with multiple tasks

 

Since spark can run multiple tasks in one executor, so I am curious to know how 
does spark manage memory across these tasks. Say if one executor takes 1GB 
memory, then if this executor can run 10 tasks simultaneously, then each task 
can consume 100MB on average. Do I understand it correctly ? It doesn't make 
sense to me that spark run multiple tasks in one executor. 

 



Re: How does spark manage the memory of executor with multiple tasks

2015-05-26 Thread Evo Eftimov
 the link you sent says multiple executors per node

Worker is just a daemon process launching Executors / JVMs so it can execute tasks 
- it does that by cooperating with the master and the driver 

There is a one-to-one mapping between Executor and JVM 


Sent from Samsung Mobile

 Original message 
From: Arush Kharbanda ar...@sigmoidanalytics.com 
Date: 2015/05/26 10:55 (GMT+00:00) 
To: canan chen ccn...@gmail.com 
Cc: Evo Eftimov evo.efti...@isecc.com, user@spark.apache.org 
Subject: Re: How does spark manage the memory of executor with multiple tasks 

Hi Evo,

Worker is the JVM and an executor runs on the JVM. And after Spark 1.4 you 
would be able to run multiple executors on the same JVM/worker.

https://issues.apache.org/jira/browse/SPARK-1706.

Thanks
Arush

On Tue, May 26, 2015 at 2:54 PM, canan chen ccn...@gmail.com wrote:
I think the concept of task in spark should be on the same level of task in MR. 
Usually in MR, we need to specify the memory the each mapper/reducer task. And 
I believe executor is not a user-facing concept, it's a spark internal concept. 
For spark users they don't need to know the concept of executor, but need to 
know the concept of task. 

On Tue, May 26, 2015 at 5:09 PM, Evo Eftimov evo.efti...@isecc.com wrote:
This is the first time I hear that “one can specify the RAM per task” – the RAM 
is granted per Executor (JVM). On the other hand each Task operates on ONE RDD 
Partition – so you can say that this is “the RAM allocated to the Task to 
process” – but it is still within the boundaries allocated to the Executor 
(JVM) within which the Task is running. Also while running, any Task like any 
JVM Thread can request as much additional RAM e.g. for new Object instances  as 
there is available in the Executor aka JVM Heap  

 

From: canan chen [mailto:ccn...@gmail.com] 
Sent: Tuesday, May 26, 2015 9:30 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: How does spark manage the memory of executor with multiple tasks

 

Yes, I know that one task represent a JVM thread. This is what I confused. 
Usually users want to specify the memory on task level, so how can I do it if 
task if thread level and multiple tasks runs in the same executor. And even I 
don't know how many threads there will be. Besides that, if one task cause OOM, 
it would cause other tasks in the same executor fail too. There's no isolation 
between tasks.  

 

On Tue, May 26, 2015 at 4:15 PM, Evo Eftimov evo.efti...@isecc.com wrote:

An Executor is a JVM instance spawned and running on a Cluster Node (Server 
machine). Task is essentially a JVM Thread – you can have as many Threads as 
you want per JVM. You will also hear about “Executor Slots” – these are 
essentially the CPU Cores available on the machine and granted for use to the 
Executor

 

Ps: what creates ongoing confusion here is that the Spark folks have “invented” 
their own terms to describe the design of their what is essentially a 
Distributed OO Framework facilitating Parallel Programming and Data Management 
in a Distributed Environment, BUT have not provided clear 
dictionary/explanations linking these “inventions” with standard concepts 
familiar to every Java, Scala etc developer  

 

From: canan chen [mailto:ccn...@gmail.com] 
Sent: Tuesday, May 26, 2015 9:02 AM
To: user@spark.apache.org
Subject: How does spark manage the memory of executor with multiple tasks

 

Since spark can run multiple tasks in one executor, so I am curious to know how 
does spark manage memory across these tasks. Say if one executor takes 1GB 
memory, then if this executor can run 10 tasks simultaneously, then each task 
can consume 100MB on average. Do I understand it correctly ? It doesn't make 
sense to me that spark run multiple tasks in one executor. 

 





-- 


Arush Kharbanda || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com

RE: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Evo Eftimov
If the message consumption rate is higher than the time required to process ALL 
data for a micro batch (ie the next RDD produced for your stream)   the 
following  happens – lets say that e.g. your micro batch time is 3 sec:

 

1.   Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.   However the work performed on the RDD by your streaming job takes more 
than 3 sec

3.   In the meantime the next RDD comes in and occupies another 500MB, and so 
on and so forth until the current iteration of the job crashes due to what is 
essentially memory exhaustion (no more free RAM for the next RDD), i.e. 
essentially a memory leak  

 

The above can be called a design flaw because Spark Streaming seems to rely 
on the default behavior of Spark Batch, which is to remove In-Memory-Only RDDs 
when there is no more free memory in the system; however, in a batch context 
Spark Batch can always recreate a removed RDD from e.g. the file system, while 
in a streaming context the data is gone forever 

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many – each lost job 
is a lost RDD, which means lost messages 

 

The above can be overcome by using one of the following measures:

 

1.   Set the Receiver rate to a level which will allow your job to complete 
within the time for the micro-batch (obviously you are voluntarily limiting your 
performance in this way) – see the config sketch after this list

2.   Throw more boxes/cores/ram at the problem and also  improve the 
performance of your tasks performing the work on the messages (e.g. review and 
refactor the code)

3.   Set the Storage Mode of the RDDs to “Memory AND Disk” – this will keep 
using the RAM until there is free space and then switch to disk rather than 
crashing miserably and losing the affected job iteration and all its messages – 
obviously every time it has to resort to the disk your performance will get hit 
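
A minimal sketch of measure 1 above, using the standard receiver rate-limit 
property; the value is illustrative and has to be tuned against your actual batch 
processing time:

from pyspark import SparkConf

# Cap each receiver at e.g. 10,000 records/second so that a micro-batch can be
# fully processed within its batch window
conf = SparkConf().set("spark.streaming.receiver.maxRate", "10000")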

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) do not get blocked 
(unless a rate limit has been set) due to processing load. The receiver will 
continue to receive data and store it in memory until it is processed. So I 
am still not sure how the data loss is happening, unless you are sending data 
at a faster rate than the receiver can handle (that is, more than the max rate at 
which the receiver can save data in memory and replicate to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can try using TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately if you try 
sending data faster than the receiver can handle (independent of whether the 
processing can keep up), then you will lose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate in 
the receiver, without dropping data.

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote:

I am receiving data at UDP port 8060, processing it using Spark, and 
storing the output in Neo4j.

But the data I'm receiving and the data that is getting stored doesn't match 
probably because Neo4j API takes too long to push the data into database. 
Meanwhile, Spark is unable to receive data probably because the process is 
blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote:

That is completely alright, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop data loss? 

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com wrote:

If you cannot push data as fast as you are generating it, then async isnt going 
to help either. The work is just going to keep piling up as many many async 
jobs even though your batch processing times will be low as that processing 
time is not going to reflect how much of overall work is pending in the system.

 

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj gautam1...@gmail.com wrote:

Hi,

 

From my understanding of Spark Streaming, I created a spark entry point, for 
continuous UDP data, using:

 

SparkConf conf = new 

RE: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Evo Eftimov
… and measure 4 is to implement a custom Feedback Loop to e.g. monitor the 
amount of free RAM and number of queued jobs and automatically decrease the 
message consumption rate of the Receiver until the number of clogged RDDs and 
Jobs subsides (again, here you artificially decrease your performance in the 
name of the reliability/integrity of your system, ie not losing messages)

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 9:39 PM
To: 'Tathagata Das'; 'Gautam Bajaj'
Cc: 'user'
Subject: RE: Storing spark processed output to Database asynchronously.

 

If the message consumption rate is higher than the time required to process ALL 
data for a micro batch (ie the next RDD produced for your stream)   the 
following  happens – lets say that e.g. your micro batch time is 3 sec:

 

1.   Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.   However the work performed on the RDD by your streaming job takes more 
than 3 sec

3.   In the meantime the next RDD comes in and occupies another 500MB, and so 
on and so forth until the current iteration of the job crashes due to what is 
essentially memory exhaustion (no more free RAM for the next RDD), i.e. 
essentially a memory leak  

 

The above can be called a design flaw because Spark Streaming seems to rely 
on the default behavior of Spark Batch, which is to remove In Memory Only RDDs 
when there is no more free memory in the system; however in a batch context 
Spark Batch can always recreate a removed RDD from e.g. the file system, while 
in a streaming context the data is gone forever 

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many – each lost job 
is a lost RDD, which is lost messages 

 

The above can be overcome by using one of the following measures:

 

1.   Set the Receiver rate to a level which will allow your job to complete 
within the time of the micro-batch (obviously you are voluntarily limiting your 
performance in this way)

2.   Throw more boxes/cores/RAM at the problem and also improve the 
performance of your tasks performing the work on the messages (e.g. review and 
refactor the code)

3.   Set the Storage Mode of the RDDs to “Memory AND Disk” – this will keep 
using the RAM until there is no free space left and then switch to disk rather than 
crashing miserably and losing the affected job iteration and all its messages – 
obviously every time it has to resort to the disk your performance takes a hit 
(a minimal sketch of this setting follows below)
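
To make measure 3 concrete, a minimal Scala sketch (socketTextStream is used 
purely as an example source; the app name, host and port are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("memory-and-disk-stream")
val ssc  = new StreamingContext(conf, Seconds(3))

// the receiver spills blocks to disk instead of failing when executor RAM runs out
val lines = ssc.socketTextStream("collector-host", 9999, StorageLevel.MEMORY_AND_DISK_SER)

// an already-created DStream can be given the same storage level via persist:
// lines.persist(StorageLevel.MEMORY_AND_DISK_SER)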

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) does not get blocked 
(unless rate limit has been set) due to processing load. The receiver will 
continue to receive data and store it in memory and until it is processed. So I 
am still not sure how the data loss is happening. Unless you are sending data 
at a faster rate than the receiver can handle (that is, more than the max rate at 
which the receiver can save data in memory and replicate it to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can use TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately, if you try 
sending data faster than the receiver can handle (independent of whether the 
processing can handle it), then you will lose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate in 
the receiver, without dropping data.

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj gautam1...@gmail.com wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj gautam1...@gmail.com wrote:

I am receiving data at UDP port 8060, processing it using Spark and storing the 
output in Neo4j.

But the data I'm receiving and the data that is getting stored don't match, 
probably because the Neo4j API takes too long to push the data into the database. 
Meanwhile, Spark is unable to receive data, probably because the process is 
blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das t...@databricks.com wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj gautam1...@gmail.com wrote:

That is completely alright, as the system will make sure the work gets done.

My major concern is, the data drop. Will using async stop data loss? 

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das t...@databricks.com

Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Evo Eftimov
A receiver occupies a CPU core; an executor is simply a JVM instance and as 
such it can be granted any number of cores and any amount of RAM

So check how many cores you have per executor
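
A hedged sketch of the usual follow-up once the receiver cores are accounted for 
(Scala; unionedStream stands for the DStream obtained from the two Kinesis 
receivers, and 3 partitions – one per worker – is an assumption for this cluster):

// move the received blocks off the receiver's node before the write stage
val rebalanced = unionedStream.repartition(3)
rebalanced.foreachRDD { (rdd, time) =>
  write(rdd, time)   // the write/doSomething pair from the original post
}
// where the hardware allows, give each executor more than one core
// (spark-submit --executor-cores) so a receiver does not monopolise it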


Sent from Samsung Mobile

Original message 
From: Mike Trienis mike.trie...@orcsol.com 
Date: 2015/05/22 21:51 (GMT+00:00) 
To: user@spark.apache.org 
Subject: Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb) 

I guess each receiver occupies an executor. So there was only one executor 
available for processing the job. 

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis mike.trie...@orcsol.com wrote:
Hi All,

I have a cluster of four nodes (three workers and one master, with one core each) 
which consumes data from Kinesis at 15 second intervals using two streams (i.e. 
receivers). The job simply grabs the latest batch and pushes it to MongoDB. I 
believe that the problem is that all tasks are executed on a single worker node 
and never distributed to the others. This is true even after I set the number 
of concurrentJobs to 3. Overall, I would really like to increase throughput 
(i.e. more than 500 records / second) and understand why all executors are not 
being utilized. 

Here are some parameters I have set: 
spark.streaming.blockInterval       200
spark.locality.wait 500
spark.streaming.concurrentJobs      3
This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time) : Unit = {
    // `connection` is assumed to be a MongoDB client available on the executor
    val result = doSomething(rdd, time)
    result.foreachPartition { i =>
        i.foreach(record => connection.insert(record))
    }
}

def doSomething(rdd: RDD[Data], time: Time) : RDD[MyObject] = {
    rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput?

Thanks, Mike. 



RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
OR you can run Drools in a Central Server Mode, i.e. as a common/shared service, 
but that would slow down your Spark Streaming job due to the remote network call 
which will have to be made for every single message 

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:22 AM
To: 'Evo Eftimov'; 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

The only “tricky” bit would be when you want to manage/update the Rule Base in 
your Drools Engines already running as Singletons in Executor JVMs on Worker 
Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already 
loaded in Drools is not a problem.  

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:20 AM
To: 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a 
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this 
information; during this processing I would like to do some event processing, 
for example:

the log appender writes information about all transactions in my trading platforms,

and if a platform user sells more than he buys during a week I need to receive an alert 
on an event dashboard.

How can I realize this? Is it possible with Drools?

Thanks so much

 



RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a 
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this 
information; during this processing I would like to do some event processing, 
for example:

the log appender writes information about all transactions in my trading platforms,

and if a platform user sells more than he buys during a week I need to receive an alert 
on an event dashboard.

How can I realize this? Is it possible with Drools?

Thanks so much

 



RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make a decision on how to transform/filter an event/message 
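
A minimal Scala sketch of the singleton-per-executor pattern (assumptions: the 
Drools 6 KIE API is on the executor classpath with a default stateless ksession 
declared in kmodule.xml, and `events` is the incoming DStream; none of these 
names come from this thread):

import org.kie.api.KieServices
import org.kie.api.runtime.StatelessKieSession

// one rule session per executor JVM, created lazily on first use
object RuleEngine {
  lazy val session: StatelessKieSession =
    KieServices.Factory.get().getKieClasspathContainer.newStatelessKieSession()
}

// inside the streaming job: evaluate every event against the rule base
val evaluated = events.mapPartitions { iter =>
  iter.map { event =>
    RuleEngine.session.execute(event)   // rules may flag/mutate the event object
    event
  }
}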

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a 
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this 
information; during this processing I would like to do some event processing, 
for example:

the log appender writes information about all transactions in my trading platforms,

and if a platform user sells more than he buys during a week I need to receive an alert 
on an event dashboard.

How can I realize this? Is it possible with Drools?

Thanks so much



RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
The only “tricky” bit would be when you want to manage/update the Rule Base in 
your Drools Engines already running as Singletons in Executor JVMs on Worker 
Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already 
loaded in Drools is not a problem.  

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:20 AM
To: 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a 
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this 
information; during this processing I would like to do some event processing, 
for example:

the log appender writes information about all transactions in my trading platforms,

and if a platform user sells more than he buys during a week I need to receive an alert 
on an event dashboard.

How can I realize this? Is it possible with Drools?

Thanks so much

 



RE: Intermittent difficulties for Worker to contact Master on same machine in standalone

2015-05-20 Thread Evo Eftimov
Check whether the name can be resolved in the /etc/hosts file (or DNS) of the 
worker 

 

(the same btw applies for the Node where you run the driver app – all other 
nodes must be able to resolve its name)

 

From: Stephen Boesch [mailto:java...@gmail.com] 
Sent: Wednesday, May 20, 2015 10:07 AM
To: user
Subject: Intermittent difficulties for Worker to contact Master on same machine 
in standalone

 

 

What conditions would cause the following delays / failure for a standalone 
machine/cluster to have the Worker contact the Master?

 

15/05/20 02:02:53 INFO WorkerWebUI: Started WorkerWebUI at http://10.0.0.3:8081

15/05/20 02:02:53 INFO Worker: Connecting to master 
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...

15/05/20 02:02:53 WARN Remoting: Tried to associate with unreachable remote 
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 
5000 ms, all messages to this address will be delivered to dead letters. 
Reason: Connection refused: mellyrn.local/10.0.0.3:7077

15/05/20 02:03:04 INFO Worker: Retrying connection to master (attempt # 1)

..

..

15/05/20 02:03:26 INFO Worker: Retrying connection to master (attempt # 3)

15/05/20 02:03:26 INFO Worker: Connecting to master 
akka.tcp://sparkMaster@mellyrn.local:7077/user/Master...

15/05/20 02:03:26 WARN Remoting: Tried to associate with unreachable remote 
address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 
5000 ms, all messages to this address will be delivered to dead letters. 
Reason: Connection refused: mellyrn.local/10.0.0.3:7077



RE: Spark 1.3.1 Performance Tuning/Patterns for Data Generation Heavy/Throughput Jobs

2015-05-19 Thread Evo Eftimov
Is that a Spark or a Spark Streaming application? 

 

Re the map transformation which is required you can also try flatMap

 

Finally, an Executor is essentially a JVM spawned by a Spark Worker Node or YARN – 
giving 60 GB of RAM to a single JVM will certainly result in “off the charts” GC. I 
would suggest experimenting with the following two things:

 

1.   Give less RAM to each Executor but have more Executors, including more 
than one Executor per Node, especially if the RAM-to-CPU-core ratio is favorable

2.   Use Memory Serialized RDDs – this will store them still in RAM but in 
Java Object Serialized form, which greatly reduces GC pressure; Spark can also 
store RDDs off the JVM Heap altogether via Tachyon – a distributed In Memory 
File System – which avoids GC entirely (see the sketch below)
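
For point 2, a hedged one-liner of the storage levels involved (Scala; `expanded` 
stands for the RDD produced by the job – note that MEMORY_ONLY_SER keeps 
serialized objects on the JVM heap, whereas the Tachyon-backed option in Spark 1.x 
is OFF_HEAP):

import org.apache.spark.storage.StorageLevel

// serialized in RAM, on-heap: far fewer live objects for the GC to track
expanded.persist(StorageLevel.MEMORY_ONLY_SER)

// or entirely off the JVM heap (backed by Tachyon in Spark 1.x)
// expanded.persist(StorageLevel.OFF_HEAP)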

 

From: Night Wolf [mailto:nightwolf...@gmail.com] 
Sent: Tuesday, May 19, 2015 9:36 AM
To: user@spark.apache.org
Subject: Spark 1.3.1 Performance Tuning/Patterns for Data Generation 
Heavy/Throughput Jobs

 

Hi all,

 

I have a job that, for every row, creates about 20 new objects (i.e. an RDD of 100 
rows in becomes an RDD of 2,000 rows out). The reason for this is that each row is 
tagged with a list of the 'buckets' or 'windows' it belongs to. 

 

The actual data is about 10 billion rows. Each executor has 60GB of memory.

 

Currently I have a mapPartitions task that is doing this object creation in a 
Scala Map and then returning the HashMap as an iterator via .toIterator. 

 

Is there a more efficient way to do this (assuming I can't use something like 
flatMap)? 
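
A hedged sketch of what Evo's flatMap suggestion could look like while staying 
inside mapPartitions – the expansion stays a lazy iterator instead of being 
materialised in a HashMap first (`rows` and `bucketsFor` are illustrative 
assumptions, not names from this thread):

// each input row is expanded into (bucket, row) pairs lazily, so only the rows
// currently being processed are alive on the heap at any one time
val tagged = rows.mapPartitions { iter =>
  iter.flatMap { row =>
    bucketsFor(row).iterator.map(bucket => (bucket, row))
  }
}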

 

The job runs (assuming each task size is small enough). But the GC time is 
understandably off the charts. 

 

I've reduced the spark cache memory percentage to 0.05 (as I just need space 
for a few broadcasts and this is a data churn task). I've left the shuffle 
memory percent unchanged. 

 

What kinds of settings should I be tuning with regards to GC? 

 

Looking at 
https://spark-summit.org/2014/wp-content/uploads/2015/03/SparkSummitEast2015-AdvDevOps-StudentSlides.pdf
 (slide 125 recommends some settings but I'm not sure what would be best here). 
I tried using -XX:+UseG1GC but it pretty much causes my job to fail (all the 
executors die). Are there any tips with respect to the ratio of new gen and old 
gen space when creating lots of objects which will live in a data structure 
until the entire partition is processed? 

 

Any tips for tuning these kinds of jobs would be helpful!

 

Thanks,

~N

 



RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
Who are “we” and what is the mysterious “back-pressuring mechanism”, and is it 
part of the Spark Distribution (are you talking about an implementation of the 
custom feedback loop mentioned in my previous emails below)? I am asking these 
because I can assure you that at least as of Spark Streaming 1.2.0, as Evo says, 
Spark Streaming DOES crash in an “unceremonious way” when the free RAM available 
for In Memory Cached RDDs gets exhausted 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:03 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

We fix the receiver's rate at which it should consume at any given point of 
time. Also we have a back-pressuring mechanism attached to the receivers so it 
won't simply crash in the unceremonious way like Evo said. Mesos has some 
sort of auto-scaling (read it somewhere), maybe you can look into that also.




Thanks

Best Regards

 

On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote:

And if you want to genuinely “reduce the latency” (still within the boundaries 
of the micro-batch) THEN you need to design and finely tune the Parallel 
Programming / Execution Model of your application. The objective/metric here is:

 

a)  Consume all data within your selected micro-batch window WITHOUT any 
artificial message rate limits

b)  The above will result in a certain size of Dstream RDD per micro-batch. 

c)   The objective now is to Process that RDD WITHIN the time of the 
micro-batch (and also account for temporary message rate spike etc which may 
further increase the size of the RDD) – this will avoid any clogging up of the 
app and will process your messages at the lowest latency possible in a 
micro-batch architecture 

d)  You achieve the objective stated in c by designing, varying and 
experimenting with various aspects of the Spark Streaming Parallel Programming 
and Execution Model – e.g. number of receivers, number of threads per receiver, 
number of executors, number of cores, RAM allocated to executors, number of RDD 
partitions which correspond to the number of parallel threads operating on the 
RDD etc etc  

 

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when 
the available RAM is exhausted due to high message rate and which crashes your 
(hence clogged up) application the name of the condition is:

 

Loss was due to java.lang.Exception   

java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

 

You can use

 


spark.streaming.receiver.maxRate (default: not set)

Maximum rate (number of records per second) at which each receiver will receive 
data. Effectively, each stream will consume at most this number of records per 
second. Setting this configuration to 0 or a negative number will put no limit 
on the rate. See the deployment guide 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
  in the Spark Streaming programming guide for more details.

 

 

Another way is to implement a feedback loop in your receivers monitoring the 
performance metrics of your application/job and based on that adjusting 
automatically the receiving rate – BUT all these have nothing to do  with 
“reducing the latency” – they simply prevent your application/job from clogging 
up – the nastier effect of which is when Spark Streaming starts removing In 
Memory RDDs from RAM before they are processed by the job – that works fine in 
Spark Batch (ie removing RDDs from RAM based on LRU) but in Spark Streaming 
when done in this “unceremonious way” it simply Crashes the application

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Monday, May 18, 2015 11:46 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

Thanks, Akhil. So what do folks typically do to increase/contract the capacity? 
Do you plug in some cluster auto-scaling solution to make this elastic?

 

Does Spark have any hooks for instrumenting auto-scaling?

In other words, how do you avoid overwhelming the receivers in a scenario where 
your system's input can be unpredictable, based on users' activity?


On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

With receiver based streaming, you can actually specify 
spark.streaming.blockInterval which is the interval at which the receiver will 
fetch data from the source. Default value is 200ms and hence if your batch 
duration is 1 second, it will produce 5 blocks of data. And yes, with 
sparkstreaming when your processing time goes beyond your batch duration and 
you are having a higher data consumption then you will overwhelm the receiver's 
memory and hence

RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
Ooow – that is essentially the custom feedback loop mentioned in my previous 
emails in generic Architecture Terms and what you have done is only one of the 
possible implementations moreover based on Zookeeper – there are other possible 
designs not using things like zookeeper at all and hence achieving much lower 
latency and responsiveness 

 

Can I also give you a friendly advice – there is a long way FROM 
“we = Sigmoid and our custom Sigmoid solution”, TO your earlier statement that 
Spark Streaming does “NOT” crash UNCEREMONIOUSLY – please maintain responsible 
and objective communication and facts 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:28 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

we = Sigmoid

 

back-pressuring mechanism = Stopping the receiver from receiving more messages 
when it's about to exhaust the worker memory. Here's a similar 
https://issues.apache.org/jira/browse/SPARK-7398  kind of proposal if you 
haven't seen already.

 

 




Thanks

Best Regards

 

On Mon, May 18, 2015 at 6:53 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Who are “we” and what is the mysterious “back-pressuring mechanism” and is it 
part of the Spark Distribution (are you talking about implementation of the 
custom feedback loop mentioned in my previous emails below)- asking these 
because I can assure you that at least as of Spark Streaming 1.2.0, as Evo says 
Spark Streaming DOES crash in “unceremonious way” when the free RAM available 
for In Memory Cashed RDDs gets exhausted 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:03 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org


Subject: Re: Spark Streaming and reducing latency

 

We fix the receivers rate at which it should consume at any given point of 
time. Also we have a back-pressuring mechanism attached to the receivers so it 
won't simply crashes in the unceremonious way like Evo said. Mesos has some 
sort of auto-scaling (read it somewhere), may be you can look into that also.




Thanks

Best Regards

 

On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote:

And if you want to genuinely “reduce the latency” (still within the boundaries 
of the micro-batch) THEN you need to design and finely tune the Parallel 
Programming / Execution Model of your application. The objective/metric here is:

 

a)  Consume all data within your selected micro-batch window WITHOUT any 
artificial message rate limits

b)  The above will result in a certain size of Dstream RDD per micro-batch. 

c)   The objective now is to Process that RDD WITHIN the time of the 
micro-batch (and also account for temporary message rate spike etc which may 
further increase the size of the RDD) – this will avoid any clogging up of the 
app and will process your messages at the lowest latency possible in a 
micro-batch architecture 

d)  You achieve the objective stated in c by designing, varying and 
experimenting with various aspects of the Spark Streaming Parallel Programming 
and Execution Model – e.g. number of receivers, number of threads per receiver, 
number of executors, number of cores, RAM allocated to executors, number of RDD 
partitions which correspond to the number of parallel threads operating on the 
RDD etc etc  

 

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when 
the available RAM is exhausted due to high message rate and which crashes your 
(hence clogged up) application the name of the condition is:

 

Loss was due to java.lang.Exception   

java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

 

You can use

 


spark.streaming.receiver.maxRate

not set

Maximum rate (number of records per second) at which each receiver will receive 
data. Effectively, each stream will consume at most this number of records per 
second. Setting this configuration to 0 or a negative number will put no limit 
on the rate. See the deployment guide 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
  in the Spark Streaming programing guide for mode details.

 

 

Another way is to implement a feedback loop in your receivers monitoring the 
performance metrics of your application/job and based on that adjusting 
automatically the receiving rate – BUT all these have nothing to do  with 
“reducing the latency” – they simply prevent your application/job from clogging 
up – the nastier effect of which is when Spark Streaming starts removing In 
Memory RDDs from RAM before they are processed by the job – that works fine in 
Spark Batch (ie removing RDDs from RAM based on LRU

RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
My pleasure young man, I will even go beyond the so-called heads up and send 
you a solution design for a Feedback Loop preventing Spark Streaming app clogging 
and resource depletion and featuring machine-learning-based self-tuning, AND 
which is not Zookeeper based and hence offers lower latency

Ps: ultimately though remember that none of this stuff is part of Spark 
Streaming as of yet


Sent from Samsung Mobile

Original message 
From: Akhil Das ak...@sigmoidanalytics.com 
Date: 2015/05/18 16:56 (GMT+00:00) 
To: Evo Eftimov evo.efti...@isecc.com 
Cc: user@spark.apache.org 
Subject: RE: Spark Streaming and reducing latency 

Thanks for the heads up mate.

On 18 May 2015 19:08, Evo Eftimov evo.efti...@isecc.com wrote:
Ooow – that is essentially the custom feedback loop mentioned in my previous 
emails in generic Architecture Terms and what you have done is only one of the 
possible implementations moreover based on Zookeeper – there are other possible 
designs not using things like zookeeper at all and hence achieving much lower 
latency and responsiveness

 

Can I also give you a friendly advice – there is a long way FROM 
“we=Sigmoid and our custom sigmoid solution”, TO your earlier statement that 
Spark Streaming does “NOT” crash UNCEREMNOUSLY – please maintain responsible 
and objective communication and facts

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:28 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

we = Sigmoid

 

back-pressuring mechanism = Stopping the receiver from receiving more messages 
when it's about to exhaust the worker memory. Here's a similar kind of proposal 
if you haven't seen already.

 

 



Thanks

Best Regards

 

On Mon, May 18, 2015 at 6:53 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Who are “we” and what is the mysterious “back-pressuring mechanism” and is it 
part of the Spark Distribution (are you talking about implementation of the 
custom feedback loop mentioned in my previous emails below)- asking these 
because I can assure you that at least as of Spark Streaming 1.2.0, as Evo says 
Spark Streaming DOES crash in “unceremonious way” when the free RAM available 
for In Memory Cashed RDDs gets exhausted

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, May 18, 2015 2:03 PM
To: Evo Eftimov
Cc: Dmitry Goldenberg; user@spark.apache.org


Subject: Re: Spark Streaming and reducing latency

 

We fix the receivers rate at which it should consume at any given point of 
time. Also we have a back-pressuring mechanism attached to the receivers so it 
won't simply crashes in the unceremonious way like Evo said. Mesos has some 
sort of auto-scaling (read it somewhere), may be you can look into that also.



Thanks

Best Regards

 

On Mon, May 18, 2015 at 5:20 PM, Evo Eftimov evo.efti...@isecc.com wrote:

And if you want to genuinely “reduce the latency” (still within the boundaries 
of the micro-batch) THEN you need to design and finely tune the Parallel 
Programming / Execution Model of your application. The objective/metric here is:

 

a)  Consume all data within your selected micro-batch window WITHOUT any 
artificial message rate limits

b)  The above will result in a certain size of Dstream RDD per micro-batch.

c)   The objective now is to Process that RDD WITHIN the time of the 
micro-batch (and also account for temporary message rate spike etc which may 
further increase the size of the RDD) – this will avoid any clogging up of the 
app and will process your messages at the lowest latency possible in a 
micro-batch architecture

d)  You achieve the objective stated in c by designing, varying and 
experimenting with various aspects of the Spark Streaming Parallel Programming 
and Execution Model – e.g. number of receivers, number of threads per receiver, 
number of executors, number of cores, RAM allocated to executors, number of RDD 
partitions which correspond to the number of parallel threads operating on the 
RDD etc etc  

 

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when 
the available RAM is exhausted due to high message rate and which crashes your 
(hence clogged up) application the name of the condition is:

 

Loss was due to java.lang.Exception  

java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

 

You can use

 

spark.streaming.receiver.maxRate

not set

Maximum rate (number of records per second) at which each receiver will receive 
data. Effectively, each stream will consume at most this number of records per 
second. Setting this configuration to 0 or a negative number

RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
You can use

 


spark.streaming.receiver.maxRate (default: not set)

Maximum rate (number of records per second) at which each receiver will receive 
data. Effectively, each stream will consume at most this number of records per 
second. Setting this configuration to 0 or a negative number will put no limit 
on the rate. See the deployment guide 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
  in the Spark Streaming programming guide for more details.
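
Setting it is a one-liner on the SparkConf (Scala; the app name, rate and batch 
interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("rate-limited-streaming-app")
  .set("spark.streaming.receiver.maxRate", "10000")   // records per second, per receiver

val ssc = new StreamingContext(conf, Seconds(3))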

 

 

Another way is to implement a feedback loop in your receivers monitoring the 
performance metrics of your application/job and based on that adjusting 
automatically the receiving rate – BUT all these have nothing to do  with 
“reducing the latency” – they simply prevent your application/job from clogging 
up – the nastier effect of which is when Spark Streaming starts removing In 
Memory RDDs from RAM before they are processed by the job – that works fine in 
Spark Batch (ie removing RDDs from RAM based on LRU) but in Spark Streaming 
when done in this “unceremonious way” it simply Crashes the application

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Monday, May 18, 2015 11:46 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

Thanks, Akhil. So what do folks typically do to increase/contract the capacity? 
Do you plug in some cluster auto-scaling solution to make this elastic?

 

Does Spark have any hooks for instrumenting auto-scaling?

In other words, how do you avoid overwhelming the receivers in a scenario where 
your system's input can be unpredictable, based on users' activity?


On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

With receiver based streaming, you can actually specify 
spark.streaming.blockInterval which is the interval at which the receiver will 
fetch data from the source. Default value is 200ms and hence if your batch 
duration is 1 second, it will produce 5 blocks of data. And yes, with 
sparkstreaming when your processing time goes beyond your batch duration and 
you are having a higher data consumption then you will overwhelm the receiver's 
memory and hence will throw up block not found exceptions. 




Thanks

Best Regards

 

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote:

I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming.
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with this at the streaming API level?

If latency is of great concern, is it better to look into streaming from
something like Flume where data is pushed to consumers rather than pulled by
them? Are there techniques, in that case, to ensure the consumers don't get
overwhelmed with new data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



RE: Spark Streaming and reducing latency

2015-05-18 Thread Evo Eftimov
And if you want to genuinely “reduce the latency” (still within the boundaries 
of the micro-batch) THEN you need to design and finely tune the Parallel 
Programming / Execution Model of your application. The objective/metric here is:

 

a)  Consume all data within your selected micro-batch window WITHOUT any 
artificial message rate limits

b)  The above will result in a certain size of Dstream RDD per micro-batch. 

c)   The objective now is to Process that RDD WITHIN the time of the 
micro-batch (and also account for temporary message rate spike etc which may 
further increase the size of the RDD) – this will avoid any clogging up of the 
app and will process your messages at the lowest latency possible in a 
micro-batch architecture 

d)  You achieve the objective stated in c by designing, varying and 
experimenting with various aspects of the Spark Streaming Parallel Programming 
and Execution Model – e.g. number of receivers, number of threads per receiver, 
number of executors, number of cores, RAM allocated to executors, number of RDD 
partitions which correspond to the number of parallel threads operating on the 
RDD etc etc  

 

Re the “unceremonious removal of DStream RDDs” from RAM by Spark Streaming when 
the available RAM is exhausted due to high message rate and which crashes your 
(hence clogged up) application the name of the condition is:

 

Loss was due to java.lang.Exception   

java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:13 PM
To: 'Dmitry Goldenberg'; 'Akhil Das'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and reducing latency

 

You can use

 


spark.streaming.receiver.maxRate

not set

Maximum rate (number of records per second) at which each receiver will receive 
data. Effectively, each stream will consume at most this number of records per 
second. Setting this configuration to 0 or a negative number will put no limit 
on the rate. See the deployment guide 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
  in the Spark Streaming programing guide for mode details.

 

 

Another way is to implement a feedback loop in your receivers monitoring the 
performance metrics of your application/job and based on that adjusting 
automatically the receiving rate – BUT all these have nothing to do  with 
“reducing the latency” – they simply prevent your application/job from clogging 
up – the nastier effect of which is when Spark Streaming starts removing In 
Memory RDDs from RAM before they are processed by the job – that works fine in 
Spark Batch (ie removing RDDs from RAM based on LRU) but in Spark Streaming 
when done in this “unceremonious way” it simply Crashes the application

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Monday, May 18, 2015 11:46 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

Thanks, Akhil. So what do folks typically do to increase/contract the capacity? 
Do you plug in some cluster auto-scaling solution to make this elastic?

 

Does Spark have any hooks for instrumenting auto-scaling?

In other words, how do you avoid overwhelming the receivers in a scenario where 
your system's input can be unpredictable, based on users' activity?


On May 17, 2015, at 11:04 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

With receiver based streaming, you can actually specify 
spark.streaming.blockInterval which is the interval at which the receiver will 
fetch data from the source. Default value is 200ms and hence if your batch 
duration is 1 second, it will produce 5 blocks of data. And yes, with 
sparkstreaming when your processing time goes beyond your batch duration and 
you are having a higher data consumption then you will overwhelm the receiver's 
memory and hence will throw up block not found exceptions. 




Thanks

Best Regards

 

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote:

I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming.
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing

RE: Spark Streaming and reducing latency

2015-05-17 Thread Evo Eftimov
This is the nature of Spark Streaming as a System Architecture:

 

1.   It is a batch processing system architecture (Spark Batch) optimized 
for Streaming Data

2.   In terms of sources of Latency in such System Architecture, bear in 
mind that besides “batching”, there is also the Central “Driver” 
function/module, which is essentially a Central Job/Task Manager (ie running on 
a dedicated node, which doesn’t sit on the Path of the Messages), which even in 
a Streaming Data scenario, FOR EACH Streaming BATCH schedules tasks (as per the 
DAG for the streaming job), sends them to the workers, receives the results, 
then schedules and sends more tasks (as per the DAG for the job) and so on and 
so forth

 

In terms of Parallel Programming Patterns/Architecture, the above is known as 
Data Parallel Architecture with Central Job/Task Manager.

 

There are other alternatives for achieving lower latency and in terms of 
Parallel Programming Patterns they are known as Pipelines or Task Parallel 
Architecture – essentially every messages streams individually through an 
assembly line of Tasks. As the tasks can be run on multiple cores of one box or 
in a distributed environment. Storm for example implements this pattern or you 
can just put together your own solution 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:04 PM
To: dgoldenberg
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

With receiver based streaming, you can actually specify 
spark.streaming.blockInterval which is the interval at which the receiver will 
fetch data from the source. Default value is 200ms and hence if your batch 
duration is 1 second, it will produce 5 blocks of data. And yes, with 
sparkstreaming when your processing time goes beyond your batch duration and 
you are having a higher data consumption then you will overwhelm the receiver's 
memory and hence will throw up block not found exceptions. 




Thanks

Best Regards

 

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg dgoldenberg...@gmail.com wrote:

I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming.
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with this at the streaming API level?

If latency is of great concern, is it better to look into streaming from
something like Flume where data is pushed to consumers rather than pulled by
them? Are there techniques, in that case, to ensure the consumers don't get
overwhelmed with new data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-17 Thread Evo Eftimov
You can make ANY standard receiver sleep by implementing a custom Message 
Deserializer class with sleep method inside it. 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:29 PM
To: Haopu Wang
Cc: user
Subject: Re: [SparkStreaming] Is it possible to delay the start of some DStream 
in the application?

 

Why not just trigger your batch job with that event?

 

If you really need streaming, then you can create a custom receiver and make 
the receiver sleep till the event has happened. That will obviously run your 
streaming pipelines without having any data to process.
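
A minimal Scala sketch of such a gated receiver (the readiness check is a 
placeholder for whatever event or reference-data signal applies; this is an 
illustration, not code from this thread):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class GatedReceiver(isReady: () => Boolean) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart(): Unit = {
    new Thread("gated-receiver") {
      override def run(): Unit = {
        // sleep until the external event has happened
        while (!isStopped() && !isReady()) Thread.sleep(1000)
        // only now open the real source and start calling store(...)
      }
    }.start()
  }

  def onStop(): Unit = { }
}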




Thanks

Best Regards

 

On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote:

In my application, I want to start a DStream computation only after an
special event has happened (for example, I want to start the receiver
only after the reference data has been properly initialized).

My question is: it looks like the DStream will be started right after
the StreaminContext has been started. Is it possible to delay the start
of specific DStream?

Thank you very much!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



RE: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

2015-05-15 Thread Evo Eftimov
No pools for the moment – for each of the apps I am using the straightforward way 
of setting the spark conf param for scheduling = FAIR 

 

Spark is running in a Standalone Mode 

 

Are you saying that Configuring Pools is mandatory to get the FAIR scheduling 
working – from the docs it seemed optional to me 

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 15, 2015 6:45 PM
To: Evo Eftimov
Cc: user
Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

 

How are you configuring the fair scheduler pools?

 

On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote:

I have run / submitted a few Spark Streaming apps configured with Fair
scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode.
Is FAIR scheduling supported at all for Spark Streaming apps and from what
release / version - e.g. 1.3.1




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



RE: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

2015-05-15 Thread Evo Eftimov
Ok thanks a lot for clarifying that – btw was your application a Spark 
Streaming App – I am also looking for confirmation that FAIR scheduling is 
supported for Spark Streaming Apps 

 

From: Richard Marscher [mailto:rmarsc...@localytics.com] 
Sent: Friday, May 15, 2015 7:20 PM
To: Evo Eftimov
Cc: Tathagata Das; user
Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

 

The doc is a bit confusing IMO, but at least for my application I had to use a 
fair pool configuration to get my stages to be scheduled with FAIR.
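
For reference, the combination that usually has to be in place (Scala; the pool 
name and file path are assumptions – the allocation-file format is the one 
described in the Spark job-scheduling docs):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduled-app")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // defines the pools

val sc = new SparkContext(conf)

// jobs submitted from this thread go into the named pool defined in the XML file
sc.setLocalProperty("spark.scheduler.pool", "streaming-pool")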

 

On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov evo.efti...@isecc.com wrote:

No pools for the moment – for each of the apps using the straightforward way 
with the spark conf param for scheduling = FAIR 

 

Spark is running in a Standalone Mode 

 

Are you saying that Configuring Pools is mandatory to get the FAIR scheduling 
working – from the docs it seemed optional to me 

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 15, 2015 6:45 PM
To: Evo Eftimov
Cc: user
Subject: Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

 

How are you configuring the fair scheduler pools?

 

On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov evo.efti...@isecc.com wrote:

I have run / submitted a few Spark Streaming apps configured with Fair
scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode.
Is FAIR scheduling supported at all for Spark Streaming apps and from what
release / version - e.g. 1.3.1




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 

 



RE: swap tuple

2015-05-14 Thread Evo Eftimov
Where is the “Tuple” supposed to be in <String, String> – you can refer to a 
“Tuple” if it was e.g. <String, Tuple2<String, String>>

 

From: holden.ka...@gmail.com [mailto:holden.ka...@gmail.com] On Behalf Of 
Holden Karau
Sent: Thursday, May 14, 2015 5:56 PM
To: Yasemin Kaya
Cc: user@spark.apache.org
Subject: Re: swap tuple

 

Can you paste your code? transformations return a new RDD rather than modifying 
an existing one, so if you were to swap the values of the tuple using a map you 
would get back a new RDD and then you would want to try and print this new RDD 
instead of the original one.
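
In other words, something along these lines (Scala shown for brevity – in the 
Java API the same idea goes through mapToPair; `pairs` is an assumed existing 
pair RDD):

// transformations return a NEW RDD; the swapped pairs live in `swapped`, not in `pairs`
val swapped = pairs.map { case (k, v) => (v, k) }
swapped.take(5).foreach(println)   // print the new RDD, not the original one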

On Thursday, May 14, 2015, Yasemin Kaya godo...@gmail.com wrote:

Hi,

 

I have a JavaPairRDD<String, String> and I want to swap tuple._1() with tuple._2(). 
I use tuple.swap() but the JavaPairRDD doesn't actually get changed. When I print 
the JavaPairRDD, the values are the same.

 

Anyone can help me for that?

 

Thank you.

Have nice day.

 

yasemin


 

-- 

hiç ender hiç



-- 

Cell : 425-233-8271

Twitter: https://twitter.com/holdenkarau

Linked In: https://www.linkedin.com/in/holdenkarau

 



RE: SPARKTA: a real-time aggregation engine based on Spark Streaming

2015-05-14 Thread Evo Eftimov
I do not intend to provide comments on the actual “product” since my time is 
engaged elsewhere 

 

My comments were on the “process” for commenting, which looked like 
self-indulgent, self-congratulatory communication (between members of the 
party and its party leader) – that BS used to be inherent to the “commercial” 
vendors, but I can confirm as fact that it is also in effect in the “open source 
movement” (because human nature remains the same)

 

From: David Morales [mailto:dmora...@stratio.com] 
Sent: Thursday, May 14, 2015 4:30 PM
To: Paolo Platter
Cc: Evo Eftimov; Matei Zaharia; user@spark.apache.org
Subject: Re: SPARKTA: a real-time aggregation engine based on Spark Streaming

 

Thank you Paolo. Don't hesitate to contact us.

 

Evo, we will be glad to hear from you and we are happy to see some kind of fast 
feedback from the main thought leader of spark, for sure.

 

 

 

2015-05-14 17:24 GMT+02:00 Paolo Platter paolo.plat...@agilelab.it:

Nice Job!

 

we are developing something very similar… I will contact you to understand if 
we can contribute to you with some piece !

 

Best

 

Paolo 

 

From: Evo Eftimov mailto:evo.efti...@isecc.com 
Sent: Thursday, 14 May 2015 17:21
To: 'David Morales' mailto:dmora...@stratio.com , Matei Zaharia 
mailto:matei.zaha...@gmail.com 
Cc: user@spark.apache.org

 

That has been a really rapid “evaluation” of the “work” and its “direction” 

 

From: David Morales [mailto:dmora...@stratio.com] 
Sent: Thursday, May 14, 2015 4:12 PM
To: Matei Zaharia
Cc: user@spark.apache.org
Subject: Re: SPARKTA: a real-time aggregation engine based on Spark Streaming

 

Thanks for your kind words Matei, happy to see that our work is in the right 
way.

 

 

 

 

2015-05-14 17:10 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:

(Sorry, for non-English people: that means it's a good thing.)

Matei


 On May 14, 2015, at 10:53 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

 ...This is madness!

 On May 14, 2015, at 9:31 AM, dmoralesdf dmora...@stratio.com wrote:

 Hi there,

 We have released our real-time aggregation engine based on Spark Streaming.

 SPARKTA is fully open source (Apache2)


 You can checkout the slides showed up at the Strata past week:

 http://www.slideshare.net/Stratio/strata-sparkta

 Source code:

 https://github.com/Stratio/sparkta

 And documentation

 http://docs.stratio.com/modules/sparkta/development/


 We are open to your ideas and contributors are welcomed.


 Regards.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/SPARKTA-a-real-time-aggregation-engine-based-on-Spark-Streaming-tp22883.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







 

-- 

David Morales de Frías  ::  +34 607 010 411 ::  
https://twitter.com/dmoralesdf @dmoralesdf

 

 http://www.stratio.com/ 
Vía de las dos Castillas, 33, Ática 4, 3ª Planta

28224 Pozuelo de Alarcón, Madrid

Tel: +34 91 828 6473 // www.stratio.com //  https://twitter.com/StratioBD 
@stratiobd





 

-- 

David Morales de Frías  ::  +34 607 010 411 ::  
https://twitter.com/dmoralesdf @dmoralesdf

 

 http://www.stratio.com/ 
Vía de las dos Castillas, 33, Ática 4, 3ª Planta

28224 Pozuelo de Alarcón, Madrid

Tel: +34 91 828 6473 // www.stratio.com //  https://twitter.com/StratioBD 
@stratiobd



RE: SPARKTA: a real-time aggregation engine based on Spark Streaming

2015-05-14 Thread Evo Eftimov
That has been a really rapid “evaluation” of the “work” and its “direction” 

 

From: David Morales [mailto:dmora...@stratio.com] 
Sent: Thursday, May 14, 2015 4:12 PM
To: Matei Zaharia
Cc: user@spark.apache.org
Subject: Re: SPARKTA: a real-time aggregation engine based on Spark Streaming

 

Thanks for your kind words Matei, happy to see that our work is in the right 
way.

 

 

 

 

2015-05-14 17:10 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:

(Sorry, for non-English people: that means it's a good thing.)

Matei


 On May 14, 2015, at 10:53 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

 ...This is madness!

 On May 14, 2015, at 9:31 AM, dmoralesdf dmora...@stratio.com wrote:

 Hi there,

 We have released our real-time aggregation engine based on Spark Streaming.

 SPARKTA is fully open source (Apache2)


 You can checkout the slides showed up at the Strata past week:

 http://www.slideshare.net/Stratio/strata-sparkta

 Source code:

 https://github.com/Stratio/sparkta

 And documentation

 http://docs.stratio.com/modules/sparkta/development/


 We are open to your ideas and contributors are welcomed.


 Regards.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/SPARKTA-a-real-time-aggregation-engine-based-on-Spark-Streaming-tp22883.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







 

-- 

David Morales de Frías  ::  +34 607 010 411 ::  
https://twitter.com/dmoralesdf @dmoralesdf

 

 http://www.stratio.com/ 
Vía de las dos Castillas, 33, Ática 4, 3ª Planta

28224 Pozuelo de Alarcón, Madrid

Tel: +34 91 828 6473 // www.stratio.com //  https://twitter.com/StratioBD 
@stratiobd



RE: DStream Union vs. StreamingContext Union

2015-05-12 Thread Evo Eftimov
I can confirm it does work in Java 

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Tuesday, May 12, 2015 5:53 PM
To: Evo Eftimov
Cc: Saisai Shao; user@spark.apache.org
Subject: Re: DStream Union vs. StreamingContext Union

 

Thanks Evo. I tried chaining Dstream unions like what you have and it didn't 
work for me. But passing

multiple arguments to StreamingContext.union worked fine. Any idea why? I am 
using Python, BTW.

  

  

 

On Tue, May 12, 2015 at 12:45 PM, Evo Eftimov evo.efti...@isecc.com wrote:

You can also union multiple DstreamRDDs in this way 
DstreamRDD1.union(DstreamRDD2).union(DstreamRDD3)  etc etc

 

Ps: the API is not “redundant” – it offers several ways of achieving the same 
thing, as a convenience depending on the situation 

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Tuesday, May 12, 2015 5:37 PM
To: Saisai Shao
Cc: user@spark.apache.org
Subject: Re: DStream Union vs. StreamingContext Union

 

Thanks Saisai. That makes sense. Just seems redundant to have both.

  

 

On Mon, May 11, 2015 at 10:36 PM, Saisai Shao sai.sai.s...@gmail.com wrote:

DStream.union can only union two DStream, one is itself. While 
StreamingContext.union can union an array of DStreams, internally DStream.union 
is a special case of StreamingContext.union:

 

def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))

 

So there's no difference, if you want to union more than two DStreams, just use 
the one in StreamingContext, otherwise, both two APIs are fine.
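
A minimal Scala sketch of both flavours, assuming three DStreams of the same element type:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// both calls return a single DStream containing the elements of all three inputs
def unionAll(ssc: StreamingContext, a: DStream[String], b: DStream[String], c: DStream[String]) = {
  val chained = a.union(b).union(c)        // DStream.union, two streams at a time
  val viaCtx  = ssc.union(Seq(a, b, c))    // StreamingContext.union over a whole Seq
  (chained, viaCtx)
}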

 

 

2015-05-12 6:49 GMT+08:00 Vadim Bichutskiy vadim.bichuts...@gmail.com:

Can someone explain to me the difference between DStream union and 
StreamingContext union? 

When do you use one vs the other?

 

Thanks,

Vadim

  

 

 

 



RE: Spark streaming closes with Cassandra Conector

2015-05-10 Thread Evo Eftimov
And one other suggestion in relation to the connection pool line of enquiry - 
check whether your cassandra service is configured to allow only one session 
per e.g. User

I think the error is generated inside the connection pool when it tries to 
initialize a connection after the first one


Sent from Samsung Mobile

 Original message 
From: Evo Eftimov evo.efti...@isecc.com
Date: 2015/05/10 12:02 (GMT+00:00)
To: 'Gerard Maas' gerard.m...@gmail.com
Cc: 'Sergio Jiménez Barrio' drarse.a...@gmail.com, 'spark users' user@spark.apache.org
Subject: RE: Spark streaming closes with Cassandra Conector

Hmm there is also a Connection Pool involved and such things (especially 
while still rough on the edges) may behave erratically in a distributed 
multithreaded environment
 
Can you try forEachPartition and  foreach  together – this will create a 
slightly different multithreading execution and distribution profile which may 
skip a potential error in the Connection Pool code   
 
From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:56 AM
To: Evo Eftimov
Cc: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector
 
I'm familiar with the TableWriter code and that log only appears if the write 
actually succeeded. (See 
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala)
 
Thinking infrastructure, we see that it's always trying to reach 'localhost'. 
Are you running 1 node test in local mode?  Otherwise, there's something wrong 
with the way you're configuring Cassandra or the connection to it  (always 
tempted to say her :-)  ).
 
-kr, Gerard.
 
On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov evo.efti...@isecc.com wrote:
I think the message that it has written 2 rows is misleading
 
If you look further down you will see that it could not initialize a connection 
pool for Cassandra (presumably while trying to write the previously mentioned 2 
rows)
 
Another confirmation of this hypothesis is the phrase “error during Transport 
Initialization” – so all of this points in the direction of Infrastructure or 
Configuration issues – check your Cassandra service and how you connect to it 
etc mate
 
From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:33 AM
To: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector
 
It successfully writes some data and fails afterwards, like the host or 
connection goes down. Weird.
 
Maybe you should post this question on the Spark-Cassandra connector group:
https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user
 
 
-kr, Gerard.
 
 
On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio drarse.a...@gmail.com 
wrote:
This is:

15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in 0,016 s.
15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host 
127.0.0.1 (datacenter1)
15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042
com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected 
error during transport initialization 
(com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: 
Closed channel)
    at 
com.datastax.driver.core.Connection.initializeTransport(Connection.java:186)
    at com.datastax.driver.core.Connection.init(Connection.java:116)
    at 
com.datastax.driver.core.PooledConnection.init(PooledConnection.java:32)
    at com.datastax.driver.core.Connection$Factory.open(Connection.java:586)
    at 
com.datastax.driver.core.DynamicConnectionPool.init(DynamicConnectionPool.java:74)
    at 
com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33)
    at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231)
    at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at 
com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at 
com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
    at 
com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224)
    at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469)
    at com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144)
    at com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562)
    at 
com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145

RE: Spark streaming closes with Cassandra Conector

2015-05-10 Thread Evo Eftimov
And in case you are running in local mode try giving more cores to Spark with 
e.g. local[5] – a low number could be interfering with the tuning params, which 
you can also try to play with – all this is in the context of how those params 
interact with the Connection Pool and what that pool is doing in terms of 
multithreading

 

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
  

 

Tuning

The following properties set in SparkConf can be used to fine-tune the saving 
process, These values have been set to achieve stability and not performance. 
Changing these values may increase your performance based on your workload:

*   spark.cassandra.output.batch.size.rows: number of rows per single 
batch; default is 'auto' which means the connector will adjust the number of 
rows based on the amount of data in each row
*   spark.cassandra.output.batch.size.bytes: maximum total size of the 
batch in bytes; defaults to 1 kB.
*   spark.cassandra.output.batch.grouping.key: determines how insert 
statements are grouped into batches; available values are: 

*   none: a batch may contain any statements
*   replica_set: a batch may contain only statements to be written to the 
same replica set
*   partition (default): a batch may contain only statements for rows 
sharing the same partition key value

*   spark.cassandra.output.batch.buffer.size: how many batches per single 
Spark task can be stored in memory before sending to Cassandra; default 1000
*   spark.cassandra.output.concurrent.writes: maximum number of batches 
executed in parallel by a single Spark task; defaults to 5
*   spark.cassandra.output.consistency.level: consistency level for 
writing; defaults to LOCAL_ONE.
*   spark.cassandra.output.throughput_mb_per_sec: maximum write throughput 
allowed per single core in MB/s limit this on long (+8 hour) runs to 70% of 
your max throughput as seen on a smaller job for stability
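
These properties go on the SparkConf; a minimal sketch with purely illustrative values:

import org.apache.spark.SparkConf

// illustrative values only - tune them against your own workload and cluster
val conf = new SparkConf()
  .set("spark.cassandra.output.batch.size.bytes", "4096")
  .set("spark.cassandra.output.concurrent.writes", "2")
  .set("spark.cassandra.output.throughput_mb_per_sec", "50")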

 

 

From: Sergio Jiménez Barrio [mailto:drarse.a...@gmail.com] 
Sent: Sunday, May 10, 2015 12:59 PM
To: Evo Eftimov
Subject: Re: Spark streaming closes with Cassandra Conector

 

How Can I see this? Thanks Evo

 

2015-05-10 13:36 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

And one other suggestion in relation to the connection pool line of enquiry - 
check whether your cassandra service is configured to allow only one session 
per e.g. User

 

I think the error is generated inside the connection pool when it tries to 
initialize a connection after the first one

 

 

Sent from Samsung Mobile

 

 Original message 

From: Evo Eftimov 

Date:2015/05/10 12:02 (GMT+00:00) 

To: 'Gerard Maas' 

Cc: 'Sergio Jiménez Barrio' ,'spark users' 

Subject: RE: Spark streaming closes with Cassandra Conector 

 

Hmm there is also a Connection Pool involved and such things (especially while 
still rough on the edges) may behave erratically in a distributed multithreaded 
environment 

 

Can you try forEachPartition and  foreach  together – this will create a 
slightly different multithreading execution and distribution profile which may 
skip a potential error in the Connection Pool code   

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:56 AM
To: Evo Eftimov
Cc: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector

 

I'm familiar with the TableWriter code and that log only appears if the write 
actually succeeded. (See 
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala)

 

Thinking infrastructure, we see that it's always trying to reach 'localhost'. 
Are you running 1 node test in local mode?  Otherwise, there's something wrong 
with the way you're configuring Cassandra or the connection to it  (always 
tempted to say her :-)  ).

 

-kr, Gerard.

 

On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov evo.efti...@isecc.com wrote:

I think the message that it has written 2 rows is misleading 

 

If you look further down you will see that it could not initialize a connection 
pool for Cassandra (presumably while trying to write the previously mentioned 2 
rows)

 

Another confirmation of this hypothesis is the phrase “error during Transport 
Initialization” – so all of this points in the direction of Infrastructure or 
Configuration issues – check your Cassandra service and how you connect to it 
etc mate 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:33 AM
To: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector

 

It successfully writes some data and fails afterwards, like the host or 
connection goes down. Weird.

 

Maybe you should post this question on the Spark-Cassandra connector group:

https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user

 

 

-kr

RE: Spark streaming closes with Cassandra Conector

2015-05-10 Thread Evo Eftimov
I think the message that it has written 2 rows is misleading 

 

If you look further down you will see that it could not initialize a connection 
pool for Cassandra (presumably while trying to write the previously mentioned 2 
rows)

 

Another confirmation of this hypothesis is the phrase “error during Transport 
Initialization” – so all of this points in the direction of Infrastructure or 
Configuration issues – check your Cassandra service and how you connect to it 
etc mate 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:33 AM
To: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector

 

It successfully writes some data and fails afterwards, like the host or 
connection goes down. Weird.

 

Maybe you should post this question on the Spark-Cassandra connector group:

https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user

 

 

-kr, Gerard.

 

 

On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio drarse.a...@gmail.com 
wrote:

This is:


15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in 0,016 s.
15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host 
127.0.0.1 (datacenter1)
15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042
com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected 
error during transport initialization 
(com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: 
Closed channel)
at 
com.datastax.driver.core.Connection.initializeTransport(Connection.java:186)
at com.datastax.driver.core.Connection.init(Connection.java:116)
at 
com.datastax.driver.core.PooledConnection.init(PooledConnection.java:32)
at com.datastax.driver.core.Connection$Factory.open(Connection.java:586)
at 
com.datastax.driver.core.DynamicConnectionPool.init(DynamicConnectionPool.java:74)
at 
com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33)
at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231)
at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at 
com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
at 
com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224)
at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469)
at com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144)
at com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562)
at 
com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error 
writing: Closed channel
at 
com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432)
at 
org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
at 
org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413)
at 
org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:248)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
at 
org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at 
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at 
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
... 3 more
15/05/10 12:20:08 ERROR ControlConnection: [Control connection] Cannot connect 
to any host, scheduling retry in 1000 milliseconds

Thanks!

 

2015-05-10 0:58 GMT+02:00 Gerard Maas gerard.m...@gmail.com:

Hola Sergio,

 

It would help if you added the error message + stack trace.

 

-kr, Gerard.

 

On Sat, May 9, 2015 at 11:32 PM, Sergio Jiménez Barrio 

RE: Spark streaming closes with Cassandra Conector

2015-05-10 Thread Evo Eftimov
Hmm there is also a Connection Pool involved and such things (especially while 
still rough on the edges) may behave erratically in a distributed multithreaded 
environment 

 

Can you try forEachPartition and  foreach  together – this will create a 
slightly different multithreading execution and distribution profile which may 
skip a potential error in the Connection Pool code   

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:56 AM
To: Evo Eftimov
Cc: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector

 

I'm familiar with the TableWriter code and that log only appears if the write 
actually succeeded. (See 
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala)

 

Thinking infrastructure, we see that it's always trying to reach 'localhost'. 
Are you running 1 node test in local mode?  Otherwise, there's something wrong 
with the way you're configuring Cassandra or the connection to it  (always 
tempted to say her :-)  ).

 

-kr, Gerard.

 

On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov evo.efti...@isecc.com wrote:

I think the message that it has written 2 rows is misleading 

 

If you look further down you will see that it could not initialize a connection 
pool for Cassandra (presumably while trying to write the previously mentioned 2 
rows)

 

Another confirmation of this hypothesis is the phrase “error during Transport 
Initialization” – so all of this points in the direction of Infrastructure or 
Configuration issues – check your Cassandra service and how you connect to it 
etc mate 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:33 AM
To: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector

 

It successfully writes some data and fails afterwards, like the host or 
connection goes down. Weird.

 

Maybe you should post this question on the Spark-Cassandra connector group:

https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user

 

 

-kr, Gerard.

 

 

On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio drarse.a...@gmail.com 
wrote:

This is:


15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in 0,016 s.
15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host 
127.0.0.1 (datacenter1)
15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042
com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected 
error during transport initialization 
(com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: 
Closed channel)
at 
com.datastax.driver.core.Connection.initializeTransport(Connection.java:186)
at com.datastax.driver.core.Connection.init(Connection.java:116)
at 
com.datastax.driver.core.PooledConnection.init(PooledConnection.java:32)
at com.datastax.driver.core.Connection$Factory.open(Connection.java:586)
at 
com.datastax.driver.core.DynamicConnectionPool.init(DynamicConnectionPool.java:74)
at 
com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33)
at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231)
at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at 
com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
at 
com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224)
at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469)
at com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144)
at com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562)
at 
com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error 
writing: Closed channel
at 
com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432)
at 
org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
at 
org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413)
at 
org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380

RE: Map one RDD into two RDD

2015-05-07 Thread Evo Eftimov
Scala is a language, Spark is an OO/Functional, Distributed Framework 
facilitating Parallel Programming in a distributed environment 

 

Any “Scala parallelism” occurs within the Parallel Model imposed by the Spark 
OO Framework – ie it is limited in terms of what it can achieve in terms of 
influencing the Spark Framework behavior – that is the nature of programming 
with/for frameworks 

 

When RDD1 and RDD2 are partitioned and different Actions applied to them this 
will result in Parallel Pipelines / DAGs within the Spark Framework

RDD1 = RDD.filter()

RDD2 = RDD.filter()
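
A minimal Scala sketch of that pattern, with a hypothetical predicate standing in for the per-record test; caching the source avoids recomputing it for each branch:

import org.apache.spark.rdd.RDD

// split one source RDD into two - the two filters run as independent, parallel DAGs
def split(source: RDD[String]): (RDD[String], RDD[String]) = {
  val cached = source.cache()                    // avoid recomputing the source twice
  val typeA  = cached.filter(_.startsWith("A"))  // hypothetical test for "type A" records
  val typeB  = cached.filter(!_.startsWith("A"))
  (typeA, typeB)
}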

 

 

From: Bill Q [mailto:bill.q@gmail.com] 
Sent: Thursday, May 7, 2015 4:55 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Map one RDD into two RDD

 

Thanks for the replies. We decided to use concurrency in Scala to do the two 
mappings using the same source RDD in parallel. So far, it seems to be working. 
Any comments?

On Wednesday, May 6, 2015, Evo Eftimov evo.efti...@isecc.com wrote:

RDD1 = RDD.filter()

RDD2 = RDD.filter()

 

From: Bill Q [mailto:bill.q@gmail.com 
javascript:_e(%7B%7D,'cvml','bill.q@gmail.com'); ] 
Sent: Tuesday, May 5, 2015 10:42 PM
To: user@spark.apache.org 
javascript:_e(%7B%7D,'cvml','user@spark.apache.org'); 
Subject: Map one RDD into two RDD

 

Hi all,

I have a large RDD that I map a function to it. Based on the nature of each 
record in the input RDD, I will generate two types of data. I would like to 
save each type into its own RDD. But I can't seem to find an efficient way to 
do it. Any suggestions?

 

Many thanks.

 

 

Bill



-- 

Many thanks.

Bill

 



-- 

Many thanks.



Bill

 



RE: Creating topology in spark streaming

2015-05-06 Thread Evo Eftimov
What is called Bolt in Storm is essentially a combination of 
[Transformation/Action and DStream RDD] in Spark – so to achieve a higher 
parallelism for specific Transformation/Action on specific Dstream RDD simply 
repartition it to the required number of partitions which directly relates to 
the corresponding number of Threads   
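
A minimal Scala sketch of that idea, where parseTweet and analyseSentiment are hypothetical user functions standing in for the BOLT1 / BOLT2 logic:

import org.apache.spark.streaming.dstream.DStream

def pipeline(tweets: DStream[String],
             parseTweet: String => Seq[String],
             analyseSentiment: String => (String, Double)): DStream[(String, Double)] = {
  val parsed = tweets.flatMap(parseTweet)        // "BOLT1": parse / filter by hashtag
  parsed.repartition(48).map(analyseSentiment)   // "BOLT2": 48 partitions => 48 parallel tasks for the heavy stage
}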

 

From: anshu shukla [mailto:anshushuk...@gmail.com] 
Sent: Wednesday, May 6, 2015 9:33 AM
To: ayan guha
Cc: user@spark.apache.org; d...@spark.apache.org
Subject: Re: Creating topology in spark streaming

 

But main problem is how to increase the level of parallelism  for any 
particular bolt logic .

 

suppose i  want  this type of topology .

 

https://storm.apache.org/documentation/images/topology.png

 

How we can manage it .

 

On Wed, May 6, 2015 at 1:36 PM, ayan guha guha.a...@gmail.com wrote:

Every transformation on a dstream will create another dstream. You may want to 
take a look at foreachrdd? Also, kindly share your code so people can help 
better

On 6 May 2015 17:54, anshu shukla anshushuk...@gmail.com wrote:

Please help guys. Even after going through all the examples given I have not 
understood how to pass the D-streams from one bolt/logic to the other (without 
writing it to HDFS etc.), just like the emit function in Storm.

Suppose i have topology with 3  bolts(say) 

 

BOLT1 (parse the tweets and emit tweets using given hashtags) ==> BOLT2 (complex 
logic for sentiment analysis over tweets) ==> BOLT3 (submit tweets to the SQL 
database using Spark SQL)


 

 

Now since sentiment analysis will take most of the time, we have to increase 
its level of parallelism for tuning latency. How to increase the level of 
parallelism, since the logic of the topology is not clear?

 

-- 

Thanks  Regards,
Anshu Shukla

Indian Institute of Sciences





 

-- 

Thanks  Regards,
Anshu Shukla



RE: Map one RDD into two RDD

2015-05-06 Thread Evo Eftimov
RDD1 = RDD.filter()

RDD2 = RDD.filter()

 

From: Bill Q [mailto:bill.q@gmail.com] 
Sent: Tuesday, May 5, 2015 10:42 PM
To: user@spark.apache.org
Subject: Map one RDD into two RDD

 

Hi all,

I have a large RDD that I map a function to it. Based on the nature of each 
record in the input RDD, I will generate two types of data. I would like to 
save each type into its own RDD. But I can't seem to find an efficient way to 
do it. Any suggestions?

 

Many thanks.

 

 

Bill



-- 

Many thanks.



Bill

 



RE: Creating topology in spark streaming

2015-05-06 Thread Evo Eftimov
The “abstraction level” of Storm or shall we call it Architecture, is 
effectively Pipelines of Nodes/Agents – Pipelines is one of the standard 
Parallel Programming Patterns which you can use on multicore CPUs as well as 
Distributed Systems – the chaps  from Storm simply implemented it as a reusable 
framework for distributed systems and offered it for free. Effectively you 
have a set of independent Agents chained in a pipeline, where the output from the 
previous Agent feeds into the input of the next Agent  

 

Spark Streaming (which is essentially Batch Spark but with some optimizations 
for Streaming) on the other hand is more like a Map Reduce framework where you 
always have to have a Central Job/Task Manager scheduling and submitting tasks 
to remote distributed nodes, collecting the results / statuses and then 
scheduling and sending some more tasks and so on 

 

“Map Reduce” is simply another Parallel Programming pattern known as Data 
Parallelism or Data Parallel Programming. Although you can also have Data 
Parallelism without a Central Scheduler 

 

From: Juan Rodríguez Hortalá [mailto:juan.rodriguez.hort...@gmail.com] 
Sent: Wednesday, May 6, 2015 11:20 AM
To: Evo Eftimov
Cc: anshu shukla; ayan guha; user@spark.apache.org
Subject: Re: Creating topology in spark streaming

 

Hi, 

 

I agree with Evo, Spark works at a different abstraction level than Storm, and 
there is not a direct translation from Storm topologies to Spark Streaming 
jobs. I think something remotely close is the notion of lineage of  DStreams or 
RDDs, which is similar to a logical plan of an engine like Apache Pig. Here  
https://github.com/JerryLead/SparkInternals/blob/master/pdf/2-JobLogicalPlan.pdf
 is a diagram of a spark logical plan by a third party. I would suggest you 
reading the book Learning Spark 
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/foreword01.html
 for more on this. But in general I think that Storm has an abstraction level 
closer to MapReduce, and Spark has an abstraction level closer to Pig, so the 
correspondence between Storm and Spark notions cannot be perfect. 

 

Greetings, 

 

Juan 

 

 

 

 

2015-05-06 11:37 GMT+02:00 Evo Eftimov evo.efti...@isecc.com:

What is called Bolt in Storm is essentially a combination of 
[Transformation/Action and DStream RDD] in Spark – so to achieve a higher 
parallelism for specific Transformation/Action on specific Dstream RDD simply 
repartition it to the required number of partitions which directly relates to 
the corresponding number of Threads   

 

From: anshu shukla [mailto:anshushuk...@gmail.com] 
Sent: Wednesday, May 6, 2015 9:33 AM
To: ayan guha
Cc: user@spark.apache.org; d...@spark.apache.org
Subject: Re: Creating topology in spark streaming

 

But main problem is how to increase the level of parallelism  for any 
particular bolt logic .

 

suppose i  want  this type of topology .

 

https://storm.apache.org/documentation/images/topology.png

 

How we can manage it .

 

On Wed, May 6, 2015 at 1:36 PM, ayan guha guha.a...@gmail.com wrote:

Every transformation on a dstream will create another dstream. You may want to 
take a look at foreachrdd? Also, kindly share your code so people can help 
better

On 6 May 2015 17:54, anshu shukla anshushuk...@gmail.com wrote:

Please help guys. Even after going through all the examples given I have not 
understood how to pass the D-streams from one bolt/logic to the other (without 
writing it to HDFS etc.), just like the emit function in Storm.

Suppose i have topology with 3  bolts(say) 

 

BOLT1 (parse the tweets and emit tweets using given hashtags) ==> BOLT2 (complex 
logic for sentiment analysis over tweets) ==> BOLT3 (submit tweets to the SQL 
database using Spark SQL)


 

 

Now since sentiment analysis will take most of the time, we have to increase 
its level of parallelism for tuning latency. How to increase the level of 
parallelism, since the logic of the topology is not clear?

 

-- 

Thanks  Regards,
Anshu Shukla

Indian Institute of Sciences





 

-- 

Thanks  Regards,
Anshu Shukla

 



RE: Receiver Fault Tolerance

2015-05-06 Thread Evo Eftimov
This is about Kafka Receiver  IF you are using Spark Streaming 

 

Ps: that book is now behind the curve in quite a few areas since the release 
of 1.3.1 – read the documentation and forums 

 

From: James King [mailto:jakwebin...@gmail.com] 
Sent: Wednesday, May 6, 2015 1:09 PM
To: user
Subject: Receiver Fault Tolerance

 

In the O'reilly book Learning Spark Chapter 10 section 24/7 Operation 

 

It talks about 'Receiver Fault Tolerance'

 

I'm unsure of what a Receiver is here. From reading it sounds like when you 
submit an application to the cluster in cluster mode, i.e. --deploy-mode cluster, 
the driver program will run on a Worker, and in this case this Worker is seen as a 
Receiver because it is consuming messages from the source.

 

 

Is the above understanding correct? or is there more to it?

 

 



spark filestream problem

2015-05-02 Thread Evo Eftimov
it seems that on Spark Streaming 1.2 the filestream API may have a bug - it
doesn't detect new files when moving or renaming them on HDFS - only when
copying them but that leads to a well known problem with .tmp files which
get removed and make spark streaming filestream throw exception



spark filestream problem

2015-05-02 Thread Evo Eftimov
it seems that on Spark Streaming 1.2 the filestream API may have a bug - it
doesn't detect new files when moving or renaming them on HDFS - only when
copying them but that leads to a well known problem with .tmp files which
get removed and make spark streaming filestream throw exception



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-filestream-problem-tp22742.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark filestream problem

2015-05-02 Thread Evo Eftimov
it seems that on Spark Streaming 1.2 the filestream API may have a bug - it
doesn't detect new files when moving or renaming them on HDFS - only when
copying them but that leads to a well known problem with .tmp files which
get removed and make spark streaming filestream throw exception 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-filestream-problem-tp22743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark filestream problem

2015-05-02 Thread Evo Eftimov
I have figured it out in the meantime - simply, when moving a file on HDFS it
preserves its time stamp, and the spark filestream adapter cares about
timestamps as much as filenames - hence NEW files with OLD time stamps will
NOT be processed - yuk

The hack you can use is to a) copy the required file to a temp location and
then b) move it from there to the dir monitored by spark filestream - this
will ensure it ends up with a recent timestamp
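
A sketch of that hack in Scala via the Hadoop FileSystem API (all paths are
hypothetical; the copy creates a file with a fresh timestamp and the subsequent
rename/move preserves it):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs   = FileSystem.get(conf)
val src  = new Path("/data/landing/events.log")
val tmp  = new Path("/data/staging/events.log")
val dst  = new Path("/data/monitored/events.log")  // the dir watched by fileStream
FileUtil.copy(fs, src, fs, tmp, false, conf)        // copy => new file, recent timestamp
fs.rename(tmp, dst)                                 // move preserves that recent timestamp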

-Original Message-
From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Saturday, May 2, 2015 5:09 PM
To: user@spark.apache.org
Subject: spark filestream problem

it seems that on Spark Streaming 1.2 the filestream API may have a bug - it
doesn't detect new files when moving or renaming them on HDFS - only when
copying them but that leads to a well known problem with .tmp files which
get removed and make spark streaming filestream throw exception 



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/spark-filestream-problem
-tp22743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark filestream problem

2015-05-02 Thread Evo Eftimov
I have figured it out in the meantime - simply, when moving a file on HDFS it
preserves its time stamp, and the spark filestream adapter cares about
timestamps as much as filenames - hence NEW files with OLD time stamps will
NOT be processed - yuk

The hack you can use is to a) copy the required file to a temp location and
then b) move it from there to the dir monitored by spark filestream - this
will ensure it ends up with a recent timestamp

-Original Message-
From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Saturday, May 2, 2015 5:07 PM
To: user@spark.apache.org
Subject: spark filestream problem

it seems that on Spark Streaming 1.2 the filestream API may have a bug - it
doesn't detect new files when moving or renaming them on HDFS - only when
copying them but that leads to a well known problem with .tmp files which
get removed and make spark streaming filestream throw exception



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/spark-filestream-problem
-tp22742.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Tasks run only on one machine

2015-04-24 Thread Evo Eftimov
# of tasks = # of partitions, hence you can provide the desired number of 
partitions to the textFile API, which should result in a) a better spatial 
distribution of the RDD and b) each partition being operated upon by a separate 
task 

You can provide the number of partitions as the second argument of textFile.
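
For example, a minimal sketch (the path glob and the count of 200 are purely
illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc    = new SparkContext(new SparkConf().setAppName("nano-batch-read"))
val files = "hdfs:///nano-batches/part-*"  // hypothetical glob over the thousands of small files
val rdd   = sc.textFile(files, 200)        // 200 partitions => 200 tasks spread across the executors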

-Original Message-
From: Pat Ferrel [mailto:p...@occamsmachete.com] 
Sent: Thursday, April 23, 2015 5:51 PM
To: user@spark.apache.org
Subject: Tasks run only on one machine

Using Spark streaming to create a large volume of small nano-batch input files, 
~4k per file, thousands of ‘part-x’ files.  When reading the nano-batch 
files and doing a distributed calculation my tasks run only on the machine 
where it was launched. I’m launching in “yarn-client” mode. The rdd is created 
using sc.textFile(“list of thousand files”)

What would cause the read to occur only on the machine that launched the 
driver. 

Do I need to do something to the RDD after reading? Has some partition factor 
been applied to all derived rdds?
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Slower performance when bigger memory?

2015-04-24 Thread Evo Eftimov
You can resort to Serialized storage (still in memory) of your RDDs - this
greatly reduces the GC pressure since the RDD elements are stored as serialized
byte buffers rather than as large graphs of live objects (and with OFF_HEAP
storage they can even live outside the JVM heap, e.g. in Tachyon, the
distributed in-memory file system Spark can use)
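
A minimal Scala sketch of serialized caching (Kryo assumed; the input path is
hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("serialized-cache-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val rdd = sc.textFile("hdfs:///benchmark/input")
rdd.persist(StorageLevel.MEMORY_ONLY_SER)  // cache as serialized bytes instead of hydrated objects
rdd.count()                                // materialize the cache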

 

Also review the Object Oriented Model of your RDD to see whether it consists
of too many redundant objects and multiple levels of hierarchy - in high
performance computing and distributed cluster object oriented frameworks
like Spark some of the OO Patterns  represent unnecessary burden .. 

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Thursday, April 23, 2015 6:14 PM
To: user@spark.apache.org
Subject: Slower performance when bigger memory?

 

Hi All,

 

I am running some benchmark on r3*8xlarge instance. I have a cluster with
one master (no executor on it) and one slave (r3*8xlarge).

 

My job has 1000 tasks in stage 0.

 

R3*8xlarge has 244G memory and 32 cores.

 

If I create 4 executors, each has 8 core+50G memory, each task will take
around 320s-380s. And if I only use one big executor with 32 cores and 200G
memory, each task will take 760s-900s.

 

And I check the log, looks like the minor GC takes much longer when using
200G memory:

 

285.242: [GC [PSYoungGen: 29027310K-8646087K(31119872K)]
38810417K-19703013K(135977472K), 11.2509770 secs] [Times: user=38.95
sys=120.65, real=11.25 secs] 

 

And when it uses 50G memory, the minor GC takes only less than 1s.

 

I try to see what is the best way to configure the Spark. For some special
reason, I tempt to use a bigger memory on single executor if no significant
penalty on performance. But now looks like it is?

 

Anyone has any idea?

 

Regards,

 

Shuai



RE: Custom paritioning of DSTream

2015-04-23 Thread Evo Eftimov
You can use transform which yields RDDs from the DStream as on each of the
RDDs you can then apply partitionBy - transform also returns another DSTream
while foreach doesn't 
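
A minimal Scala sketch, assuming a key/value DStream built earlier in the job:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.dstream.DStream

// transform applies partitionBy to every underlying RDD and still returns a DStream
def partitionByKey(pairStream: DStream[(String, Int)]): DStream[(String, Int)] =
  pairStream.transform(rdd => rdd.partitionBy(new HashPartitioner(32)))  // 32 is illustrative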

 

Btw what do you mean re foreach killing the performance by not distributing
the workload  - every function (provided it is not Action) applied to an
RDD within foreach is distributed across the cluster since it gets applied
to an RDD 

 

From: davidkl [via Apache Spark User List]
[mailto:ml-node+s1001560n22630...@n3.nabble.com] 
Sent: Thursday, April 23, 2015 10:13 AM
To: Evo Eftimov
Subject: Re: Custom paritioning of DSTream

 

Hello Evo, Ranjitiyer, 

I am also looking for the same thing. Using foreach is not useful for me as
processing the RDD as a whole won't be distributed across workers and that
would kill performance in my application :-/ 

Let me know if you find a solution for this. 

Regards 

  _  

If you reply to this email, your message will be added to the discussion
below:

http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DS
Tream-tp22574p22630.html 

To unsubscribe from Custom paritioning of DSTream, click here
http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jt
p?macro=unsubscribe_by_codenode=22574code=ZXZvLmVmdGltb3ZAaXNlY2MuY29tfDIy
NTc0fDY0MDQ0NDg5Ng== .
 
http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jt
p?macro=macro_viewerid=instant_html%21nabble%3Aemail.namlbase=nabble.naml.
namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.vi
ew.web.template.NodeNamespacebreadcrumbs=notify_subscribers%21nabble%3Aemai
l.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aem
ail.naml NAML 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22631.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: writing to hdfs on master node much faster

2015-04-20 Thread Evo Eftimov
Check whether your partitioning results in balanced partitions ie partitions 
with similar sizes - one of the reasons for the performance differences 
observed by you may be that after your explicit repartitioning, the partition 
on your master node is much smaller than the RDD partitions on the other 2 
nodes  
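
One way to check this is to count the records per partition; a small sketch where 
rdd stands for the repartitioned RDD produced earlier in the workflow:

import org.apache.spark.rdd.RDD

def printPartitionSizes[T](rdd: RDD[T]): Unit = {
  val counts = rdd.mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }.collect()
  counts.sortBy(_._1).foreach { case (idx, n) => println(s"partition $idx -> $n records") }
}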

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Monday, April 20, 2015 12:57 PM
To: jamborta
Cc: user@spark.apache.org
Subject: Re: writing to hdfs on master node much faster

What machines are HDFS data nodes -- just your master? that would explain it. 
Otherwise, is it actually the write that's slow or is something else you're 
doing much faster on the master for other reasons maybe? like you're actually 
shipping data via the master first in some local computation? so the master's 
executor has the result much faster?

On Mon, Apr 20, 2015 at 12:21 PM, jamborta jambo...@gmail.com wrote:
 Hi all,

 I have a three node cluster with identical hardware. I am trying a 
 workflow where it reads data from hdfs, repartitions it and runs a few 
 map operations then writes the results back to hdfs.

 It looks like that all the computation, including the repartitioning 
 and the maps complete within similar time intervals on all the nodes, 
 except when it writes it back to HDFS when the master node does the 
 job way much faster then the slaves (15s for each block as opposed to 1.2 min 
 for the slaves).

 Any suggestion what the reason might be?

 thanks,



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/writing-to-hdfs-on
 -master-node-much-faster-tp22570.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
 additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Equal number of RDD Blocks

2015-04-20 Thread Evo Eftimov
What is meant by “streams” here:

 

1.   Two different DSTream Receivers producing two different DSTreams 
consuming from two different kafka topics, each with different message rate 

2.   One kafka topic (hence only one message rate to consider) but with two 
different DStream receivers (ie running in parallel) giving a start of two 
different DSTreams 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] 
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

 

Hi,

 

I have two streams of data from kafka. How can I make approx. equal number of 
RDD blocks of on two executors.

Please see the attachement, one worker has 1785 RDD blocks and the other has 
26. 

 

Regards,

Laeeq

 

 



RE: Equal number of RDD Blocks

2015-04-20 Thread Evo Eftimov
And what is the message rate of each topic mate – that was the other part of 
the required clarifications 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] 
Sent: Monday, April 20, 2015 3:38 PM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Equal number of RDD Blocks

 

Hi,

 

I have two different topics and two Kafka receivers, one for each topic.

 

Regards,

Laeeq

 

 

 

On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 

What is meant by “streams” here:

 

1.   Two different DSTream Receivers producing two different DSTreams 
consuming from two different kafka topics, each with different message rate 

2.   One kafka topic (hence only one message rate to consider) but with two 
different DStream receivers (ie running in parallel) giving a start of two 
different DSTreams 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] 
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

 

Hi,

 

I have two streams of data from kafka. How can I make approx. equal number of 
RDD blocks of on two executors.

Please see the attachement, one worker has 1785 RDD blocks and the other has 
26. 

 

Regards,

Laeeq

 

 

 



RE: Equal number of RDD Blocks

2015-04-20 Thread Evo Eftimov
Well spark streaming is supposed to create / distribute the Receivers on 
different cluster nodes. If you are saying that your receivers are actually 
running on the same node, probably that node is getting most of the data to 
minimize the network transfer costs 

 

If you want to distribute your data more evenly you can partition it explicitly 
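
A small sketch of doing that, assuming the two kafka DStreams already exist in the 
job (the partition count of 16 is purely illustrative):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

def balance(ssc: StreamingContext, s1: DStream[String], s2: DStream[String]): DStream[String] =
  ssc.union(Seq(s1, s2)).repartition(16)  // shuffles the received blocks evenly across the executors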

 

Also contact Databricks about why the Receivers are not being distributed on 
different cluster nodes 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] 
Sent: Monday, April 20, 2015 3:57 PM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Equal number of RDD Blocks

 

I also see that its creating both receivers on the same executor and that might 
be the cause of having more RDDs on executor than the other. Can I suggest 
spark to create each receiver on a each executor 

 

Regards,

Laeeq

 

 

On Monday, April 20, 2015 4:51 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 

And what is the message rate of each topic mate – that was the other part of 
the required clarifications 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com] 
Sent: Monday, April 20, 2015 3:38 PM
To: Evo Eftimov; user@spark.apache.org
Subject: Re: Equal number of RDD Blocks

 

Hi,

 

I have two different topics and two Kafka receivers, one for each topic.

 

Regards,

Laeeq

 

 

 

On Monday, April 20, 2015 4:28 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 

What is meant by “streams” here:

 

1.   Two different DSTream Receivers producing two different DSTreams 
consuming from two different kafka topics, each with different message rate 

2.   One kafka topic (hence only one message rate to consider) but with two 
different DStream receivers (ie running in parallel) giving a start of two 
different DSTreams 

 

From: Laeeq Ahmed [mailto:laeeqsp...@yahoo.com.INVALID] 
Sent: Monday, April 20, 2015 3:15 PM
To: user@spark.apache.org
Subject: Equal number of RDD Blocks

 

Hi,

 

I have two streams of data from kafka. How can I make approx. equal number of 
RDD blocks of on two executors.

Please see the attachement, one worker has 1785 RDD blocks and the other has 
26. 

 

Regards,

Laeeq

 

 

 

 



RE: Super slow caching in 1.3?

2015-04-20 Thread Evo Eftimov
Now this is very important:

 

“Normal RDDs” refers to “batch RDDs”. However the default in-memory storage of 
RDDs which are part of a DStream is “Serialized” rather than actual (hydrated) 
Objects. The Spark documentation states that “Serialization” is required for 
space and garbage collection efficiency (but creates higher CPU load) – which 
makes sense considering the large number of RDDs which get discarded in a 
streaming app
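
For reference, a small sketch of overriding that default on a DStream (assuming 
you prefer hydrated objects and can afford the RAM):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

def cacheHydrated[T](stream: DStream[T]): DStream[T] =
  stream.persist(StorageLevel.MEMORY_ONLY)  // hydrated objects: less CPU, more RAM/GC than the serialized default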

 

So what does Databricks actually recommend as the Object Oriented model for RDD 
elements used in Spark Streaming apps – flat or not – and can you provide a 
detailed description / spec of both 

 

From: Michael Armbrust [mailto:mich...@databricks.com] 
Sent: Thursday, April 16, 2015 7:23 PM
To: Evo Eftimov
Cc: Christian Perez; user
Subject: Re: Super slow caching in 1.3?

 

Here are the types that we specialize, other types will be much slower.  This 
is only for Spark SQL, normal RDDs do not serialize data that is cached.  I'll 
also not that until yesterday we were missing FloatType

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala#L154

 

Christian, can you provide the schema of the fast and slow datasets?

 

On Thu, Apr 16, 2015 at 10:14 AM, Evo Eftimov evo.efti...@isecc.com wrote:

Michael what exactly do you mean by flattened version/structure here e.g.:

1. An Object with only primitive data types as attributes
2. An Object with  no more than one level of other Objects as attributes
3. An Array/List of primitive types
4. An Array/List of Objects

This question is in general about RDDs not necessarily RDDs in the context of 
SparkSQL

When answering can you also score how bad the performance of each of the above 
options is


-Original Message-
From: Christian Perez [mailto:christ...@svds.com]
Sent: Thursday, April 16, 2015 6:09 PM
To: Michael Armbrust
Cc: user
Subject: Re: Super slow caching in 1.3?

Hi Michael,

Good question! We checked 1.2 and found that it is also slow cacheing the same 
flat parquet file. Caching other file formats of the same data were faster by 
up to a factor of ~2. Note that the parquet file was created in Impala but the 
other formats were written by Spark SQL.

Cheers,

Christian

On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote:
 Do you think you are seeing a regression from 1.2?  Also, are you
 caching nested data or flat rows?  The in-memory caching is not really
 designed for nested data and so performs pretty slowly here (its just
 falling back to kryo and even then there are some locking issues).

 If so, would it be possible to try caching a flattened version?

 CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

 On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org





--
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



 



Custom paritioning of DSTream

2015-04-20 Thread Evo Eftimov
Is the only way to implement a custom partitioning of DStream via the foreach
approach so to gain access to the actual RDDs comprising the DSTReam and
hence their paritionBy method 

DSTReam has only a repartition method accepting only the number of
partitions, BUT not the method of partitioning 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Can a map function return null

2015-04-19 Thread Evo Eftimov
In fact you can return “NULL” from your initial map and hence not resort to 
Optional<String> at all 

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Sunday, April 19, 2015 9:48 PM
To: 'Steve Lewis'
Cc: 'Olivier Girardot'; 'user@spark.apache.org'
Subject: RE: Can a map function return null

 

Well you can do another map to turn Optional<String> into String, as in the 
cases when the Optional is empty you can store e.g. “NULL” as the value of the RDD 
element 

 

If this is not acceptable (based on the objectives of your architecture) and IF 
when returning plain null instead of Optional does throw Spark exception THEN 
as far as I am concerned, chess-mate 

 

From: Steve Lewis [mailto:lordjoe2...@gmail.com] 
Sent: Sunday, April 19, 2015 8:16 PM
To: Evo Eftimov
Cc: Olivier Girardot; user@spark.apache.org
Subject: Re: Can a map function return null

 

 

So you imagine something like this:

 

 JavaRDD<String> words = ...

 JavaRDD<Optional<String>> wordsFiltered = words.map(new Function<String, Optional<String>>() {
    @Override
    public Optional<String> call(String s) throws Exception {
        if ((s.length()) % 2 == 1) // drop strings of odd length
            return Optional.empty();
        else
            return Optional.of(s);
    }
});
 
That seems to return the wrong type, a JavaRDD<Optional<String>>, which cannot 
be used as a JavaRDD<String>, which is what the next step expects

 

On Sun, Apr 19, 2015 at 12:17 PM, Evo Eftimov evo.efti...@isecc.com wrote:

I am on the move at the moment so i cant try it immediately but from previous 
memory / experience i think if you return plain null you will get a spark 
exception

 

Anyway yiu can try it and see what happens and then ask the question 

 

If you do get exception try Optional instead of plain null

 

 

Sent from Samsung Mobile

 

 Original message 

From: Olivier Girardot 

Date:2015/04/18 22:04 (GMT+00:00) 

To: Steve Lewis ,user@spark.apache.org 

Subject: Re: Can a map function return null 

 

You can return an RDD with null values inside, and afterwards filter on item 
!= null 
In scala (or even in Java 8) you'd rather use Option/Optional, and in Scala 
they're directly usable from Spark. 

Example: 

 sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item) else 
None).collect()

res0: Array[Int] = Array(2, 4, 6, )

Regards, 

Olivier.

 

On Sat, Apr 18, 2015 at 20:44, Steve Lewis lordjoe2...@gmail.com wrote:

I find a number of cases where I have an JavaRDD and I wish to transform the 
data and depending on a test return 0 or one item (don't suggest a filter - the 
real case is more complex). So I currently do something like the following - 
perform a flatmap returning a list with 0 or 1 entry depending on the isUsed 
function.


 

 JavaRDD<Foo> original = ...

  JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
    @Override
    public Iterable<Foo> call(final Foo s) throws Exception {
        List<Foo> ret = new ArrayList<Foo>();
        if (isUsed(s))
            ret.add(transform(s));
        return ret; // contains 0 items if isUsed is false
    }
});

 

My question is can I do a map returning the transformed data and null if 
nothing is to be returned. as shown below - what does a Spark do with a map 
function returning null

 

JavaRDD<Foo> words = original.map(new MapFunction<String, String>() {
    @Override
    Foo call(final Foo s) throws Exception {
        List<Foo> ret = new ArrayList<Foo>();
        if (isUsed(s))
            return transform(s);
        return null; // not used - what happens now
    }
});

 

 

 





 

-- 

Steven M. Lewis PhD

4221 105th Ave NE

Kirkland, WA 98033

206-384-1340 (cell)
Skype lordjoe_com



RE: Can a map function return null

2015-04-19 Thread Evo Eftimov
Well you can do another map to turn Optional<String> into String, as in the 
cases when the Optional is empty you can store e.g. “NULL” as the value of the RDD 
element 

 

If this is not acceptable (based on the objectives of your architecture) and IF 
when returning plain null instead of Optional does throw Spark exception THEN 
as far as I am concerned, chess-mate 

 

From: Steve Lewis [mailto:lordjoe2...@gmail.com] 
Sent: Sunday, April 19, 2015 8:16 PM
To: Evo Eftimov
Cc: Olivier Girardot; user@spark.apache.org
Subject: Re: Can a map function return null

 

 

So you imagine something like this:

 

 JavaRDD<String> words = ...

 JavaRDD<Optional<String>> wordsFiltered = words.map(new Function<String, Optional<String>>() {
    @Override
    public Optional<String> call(String s) throws Exception {
        if ((s.length()) % 2 == 1) // drop strings of odd length
            return Optional.empty();
        else
            return Optional.of(s);
    }
});
 
That seems to return the wrong type, a JavaRDD<Optional<String>>, which cannot 
be used as a JavaRDD<String>, which is what the next step expects

 

On Sun, Apr 19, 2015 at 12:17 PM, Evo Eftimov evo.efti...@isecc.com wrote:

I am on the move at the moment so i cant try it immediately but from previous 
memory / experience i think if you return plain null you will get a spark 
exception

 

Anyway yiu can try it and see what happens and then ask the question 

 

If you do get exception try Optional instead of plain null

 

 

Sent from Samsung Mobile

 

 Original message 

From: Olivier Girardot 

Date:2015/04/18 22:04 (GMT+00:00) 

To: Steve Lewis ,user@spark.apache.org 

Subject: Re: Can a map function return null 

 

You can return an RDD with null values inside, and afterwards filter on item 
!= null 
In scala (or even in Java 8) you'd rather use Option/Optional, and in Scala 
they're directly usable from Spark. 

Example:

 sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item) else None).collect()

res0: Array[Int] = Array(2, 4, 6, ...)

Regards, 

Olivier.

 

On Sat, Apr 18, 2015 at 20:44, Steve Lewis lordjoe2...@gmail.com wrote:

I find a number of cases where I have a JavaRDD and I wish to transform the 
data and, depending on a test, return 0 or 1 items (don't suggest a filter - the 
real case is more complex). So I currently do something like the following - 
perform a flatMap returning a list with 0 or 1 entries depending on the isUsed 
function.


 

 JavaRDD<Foo> original = ...

 JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
     @Override
     public Iterable<Foo> call(final Foo s) throws Exception {
         List<Foo> ret = new ArrayList<Foo>();
         if (isUsed(s))
             ret.add(transform(s));
         return ret; // contains 0 items if isUsed is false
     }
 });

 

My question is: can I do a map returning the transformed data, and null if 
nothing is to be returned, as shown below - what does Spark do with a map 
function returning null?

 

JavaRDD<Foo> words = original.map(new Function<Foo, Foo>() {
    @Override
    public Foo call(final Foo s) throws Exception {
        if (isUsed(s))
            return transform(s);
        return null; // not used - what happens now
    }
});

 

 

 





 

-- 

Steven M. Lewis PhD

4221 105th Ave NE

Kirkland, WA 98033

206-384-1340 (cell)
Skype lordjoe_com



RE: How to do dispatching in Streaming?

2015-04-17 Thread Evo Eftimov
Good use of analogies :)

 

Yep friction (or entropy in general) exists in everything – but hey by adding 
and doing “more work” at the same time (aka more powerful rockets) some people 
have overcome the friction of the air and even got as far as the moon and 
beyond 

 

It is all about the bottom line / the big picture – in some models friction 
can be a huge factor in the equations, in some others it is just part of the 
landscape  

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Friday, April 17, 2015 10:12 AM
To: Evo Eftimov
Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

 

Evo,

 

In Spark there's a fixed scheduling cost for each task, so more tasks mean an 
increased bottom line for the same amount of work being done. The number of 
tasks per batch interval should relate to the CPU resources available for the 
job, following the same 'rule of thumb' as for Spark: 2-3 times the # of 
cores.  

 

In that physical model presented before, I think we could consider this 
scheduling cost as a form of friction.

 

-kr, Gerard.

 

On Thu, Apr 16, 2015 at 11:47 AM, Evo Eftimov evo.efti...@isecc.com wrote:

Ooops – what does “more work” mean in a Parallel Programming paradigm, and does 
it always translate into “inefficiency”? 

 

Here are a few laws of physics in this space:

 

1.   More Work if done AT THE SAME time AND fully utilizes the cluster 
resources is a GOOD thing 

2.   More Work which can not be done at the same time and has to be 
processed sequentially is a BAD thing 

 

So the key is whether it is about 1 or 2 and if it is about 1, whether it leads 
to e.g. Higher Throughput and Lower Latency or not 

 

Regards,

Evo Eftimov 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, April 16, 2015 10:41 AM
To: Evo Eftimov
Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie


Subject: Re: How to do dispatching in Streaming?

 

From experience, I'd recommend using the  dstream.foreachRDD method and doing 
the filtering within that context. Extending the example of TD, something like 
this:

 

dstream.foreachRDD { rdd =>
   rdd.cache()
   messageType.foreach { msgTyp =>
     val selection = rdd.filter(msgTyp.matches(_))
     selection.foreach { ... }
   }
   rdd.unpersist()
}

 

I would discourage the use of:

MessageType1DStream = MainDStream.filter(message type1)

MessageType2DStream = MainDStream.filter(message type2)

MessageType3DStream = MainDStream.filter(message type3)

 

Because it will be a lot more work to process on the Spark side. 

Each DStream will schedule tasks for each partition, resulting in #dstreams x 
#partitions x #stages tasks instead of the #partitions x #stages with the 
approach presented above.

 

 

-kr, Gerard.

 

On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote:

And yet another way is to demultiplex at one point which will yield separate 
DStreams for each message type which you can then process in independent DAG 
pipelines in the following way:

 

MessageType1DStream = MainDStream.filter(message type1)

MessageType2DStream = MainDStream.filter(message type2)

MessageType3DStream = MainDStream.filter(message type3)

 

Then proceed your processing independently with MessageType1DStream, 
MessageType2DStream and MessageType3DStream ie each of them is a starting point 
of a new DAG pipeline running in parallel

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Thursday, April 16, 2015 12:52 AM
To: Jianshi Huang
Cc: user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

 

It may be worthwhile to architect the computation in a different way. 

 

dstream.foreachRDD { rdd =>
   rdd.foreach { record =>
      // do different things for each record based on filters
   }
}

 

TD

 

On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

Hi,

 

I have a Kafka topic that contains dozens of different types of messages. And 
for each one I'll need to create a DStream for it.

 

Currently I have to filter the Kafka stream over and over, which is very 
inefficient.

 

So what's the best way to do dispatching in Spark Streaming? (one DStream - 
multiple DStreams)

 




Thanks,

-- 

Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/

 

 

 



RE: General configurations on CDH5 to achieve maximum Spark Performance

2015-04-17 Thread Evo Eftimov
And btw if you suspect this is a YARN issue you can always launch and use
Spark in a Standalone Mode which uses its own embedded cluster resource
manager - this is possible even when Spark has been deployed on CDH under
YARN by the pre-canned install  scripts of CDH

 

To achieve that:

 

1.   Launch spark in a standalone mode using its shell scripts - you may
get some script errors initially because of some mess in the scripts created
by the pre-canned CDH YARN install - which you can fix by editing the spark
standalone scripts - the error messages will guide you 

2.   Submit a spark job to the standalone spark master rather than YARN
and this is it 

3.   Measure and compare the performance under YARN, Spark Standalone on
Cluster and Spark Standalone on a single machine  

 

Bear in mind that running Spark in  Standalone mode while using YARN for all
other apps would not be very appropriate in production because the two
resource managers will be competing for cluster resources - but you can use
this for performance tests  

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Thursday, April 16, 2015 6:28 PM
To: 'Manish Gupta 8'; 'user@spark.apache.org'
Subject: RE: General configurations on CDH5 to achieve maximum Spark
Performance

 

Essentially, to change the performance yield of a software cluster
infrastructure platform like Spark you play with different permutations of:

 

-  Number of CPU cores used by Spark Executors on every cluster node

-  Amount of RAM allocated for each executor   

 

How disk and network IO is used also plays a role, but that is influenced
more by app algorithmic aspects rather than by YARN / Spark cluster config
(except rack awareness etc) 

 

When Spark runs under the management of YARN the above is controlled /
allocated by YARN 

 

https://spark.apache.org/docs/latest/running-on-yarn.html 

 

From: Manish Gupta 8 [mailto:mgupt...@sapient.com] 
Sent: Thursday, April 16, 2015 6:21 PM
To: Evo Eftimov; user@spark.apache.org
Subject: RE: General configurations on CDH5 to achieve maximum Spark
Performance

 

Thanks Evo. Yes, my concern is only regarding the infrastructure
configurations. Basically, configuring Yarn (Node Manager) + Spark is a must
and the default settings never work. What really happens is we make changes
as and when an issue is faced because of one of the numerous default
configuration settings. And every time, we have to google a lot to decide on
the right values :)

 

Again, my issue is very centric to running Spark on Yarn in CDH5
environment.

 

If you know a link that talks about optimum configuration settings for
running Spark on Yarn (CDH5), please share the same. 

 

Thanks,

Manish

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Thursday, April 16, 2015 10:38 PM
To: Manish Gupta 8; user@spark.apache.org
Subject: RE: General configurations on CDH5 to achieve maximum Spark
Performance

 

Well there are a number of performance tuning guidelines in dedicated
sections of the spark documentation - have you read and applied them 

 

Secondly any performance problem within a distributed cluster environment
has two aspects:

 

1.   Infrastructure 

2.   App Algorithms 

 

You seem to be focusing only on 1, but what you said about the performance
differences between a single laptop and the cluster points to potential
algorithmic inefficiency in your app when e.g. distributing data and performing
parallel processing. On a single laptop data moves instantly between workers
because all worker instances run in the memory of a single machine ...

 

Regards,

Evo Eftimov  

 

From: Manish Gupta 8 [mailto:mgupt...@sapient.com] 
Sent: Thursday, April 16, 2015 6:03 PM
To: user@spark.apache.org
Subject: General configurations on CDH5 to achieve maximum Spark Performance

 

Hi,

 

Is there a document/link that describes the general configuration settings
to achieve maximum Spark Performance while running on CDH5? In our
environment, we did lot of changes (and still doing it) to get decent
performance otherwise our 6 node dev cluster with default configurations,
lags behind a single laptop running Spark.

 

Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM)
would be really great. Any pointers in this regards will be really helpful.

 

We are running Spark 1.2.0 on CDH 5.3.0.

 

Thanks,

 

Manish Gupta

Specialist | Sapient Global Markets

 

Green Boulevard (Tower C)

3rd  4th Floor

Plot No. B-9A, Sector 62

Noida 201 301

Uttar Pradesh, India

 

Tel: +91 (120) 479 5000

Fax: +91 (120) 479 5001

Email: mgupt...@sapient.com

 

sapientglobalmarkets.com

 

The information transmitted is intended only for the person or entity to
which it is addressed and may contain confidential and/or privileged
material. Any review, retransmission, dissemination or other use of, or
taking of any action in reliance upon, this information by persons or
entities other than the intended recipient

RE: How to do dispatching in Streaming?

2015-04-16 Thread Evo Eftimov
And yet another way is to demultiplex at one point which will yield separate 
DStreams for each message type which you can then process in independent DAG 
pipelines in the following way:

 

MessageType1DStream = MainDStream.filter(message type1)

MessageType2DStream = MainDStream.filter(message type2)

MessageType3DStream = MainDStream.filter(message type3)

 

Then proceed your processing independently with MessageType1DStream, 
MessageType2DStream and MessageType3DStream ie each of them is a starting point 
of a new DAG pipeline running in parallel
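
A minimal Scala sketch of this demultiplexing pattern (the source stream, the message-type prefixes and the per-type processing are all invented for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DemuxSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("demux").setMaster("local[4]"), Seconds(5))

    val mainDStream = ssc.socketTextStream("localhost", 9999)   // stand-in source

    // One filter per message type; each filtered DStream heads its own DAG pipeline
    val type1 = mainDStream.filter(_.startsWith("TYPE1|"))
    val type2 = mainDStream.filter(_.startsWith("TYPE2|"))
    val type3 = mainDStream.filter(_.startsWith("TYPE3|"))

    type1.map(_.toUpperCase).print()
    type2.count().print()
    type3.foreachRDD(rdd => println(s"type3 batch size: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}

Each filtered stream schedules its own tasks per batch, which is the extra scheduling cost discussed elsewhere in this thread.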

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Thursday, April 16, 2015 12:52 AM
To: Jianshi Huang
Cc: user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

 

It may be worthwhile to architect the computation in a different way. 

 

dstream.foreachRDD { rdd =>
   rdd.foreach { record =>
      // do different things for each record based on filters
   }
}

 

TD

 

On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

Hi,

 

I have a Kafka topic that contains dozens of different types of messages. And 
for each one I'll need to create a DStream for it.

 

Currently I have to filter the Kafka stream over and over, which is very 
inefficient.

 

So what's the best way to do dispatching in Spark Streaming? (one DStream - 
multiple DStreams)

 




Thanks,

-- 

Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/

 



RE: How to do dispatching in Streaming?

2015-04-16 Thread Evo Eftimov
Ooops – what does “more work” mean in a Parallel Programming paradigm, and does 
it always translate into “inefficiency”? 

 

Here are a few laws of physics in this space:

 

1.   More Work if done AT THE SAME time AND fully utilizes the cluster 
resources is a GOOD thing 

2.   More Work which can not be done at the same time and has to be 
processed sequentially is a BAD thing 

 

So the key is whether it is about 1 or 2 and if it is about 1, whether it leads 
to e.g. Higher Throughput and Lower Latency or not 

 

Regards,

Evo Eftimov 

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Thursday, April 16, 2015 10:41 AM
To: Evo Eftimov
Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

 

From experience, I'd recommend using the  dstream.foreachRDD method and doing 
the filtering within that context. Extending the example of TD, something like 
this:

 

dstream.foreachRDD { rdd =>
   rdd.cache()
   messageType.foreach { msgTyp =>
     val selection = rdd.filter(msgTyp.matches(_))
     selection.foreach { ... }
   }
   rdd.unpersist()
}

 

I would discourage the use of:

MessageType1DStream = MainDStream.filter(message type1)

MessageType2DStream = MainDStream.filter(message type2)

MessageType3DStream = MainDStream.filter(message type3)

 

Because it will be a lot more work to process on the Spark side. 

Each DStream will schedule tasks for each partition, resulting in #dstreams x 
#partitions x #stages tasks instead of the #partitions x #stages with the 
approach presented above.

 

 

-kr, Gerard.

 

On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote:

And yet another way is to demultiplex at one point which will yield separate 
DStreams for each message type which you can then process in independent DAG 
pipelines in the following way:

 

MessageType1DStream = MainDStream.filter(message type1)

MessageType2DStream = MainDStream.filter(message type2)

MessageType3DStream = MainDStream.filter(message type3)

 

Then proceed your processing independently with MessageType1DStream, 
MessageType2DStream and MessageType3DStream ie each of them is a starting point 
of a new DAG pipeline running in parallel

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Thursday, April 16, 2015 12:52 AM
To: Jianshi Huang
Cc: user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

 

It may be worthwhile to architect the computation in a different way. 

 

dstream.foreachRDD { rdd =>
   rdd.foreach { record =>
      // do different things for each record based on filters
   }
}

 

TD

 

On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

Hi,

 

I have a Kafka topic that contains dozens of different types of messages. And 
for each one I'll need to create a DStream for it.

 

Currently I have to filter the Kafka stream over and over, which is very 
inefficient.

 

So what's the best way to do dispatching in Spark Streaming? (one DStream - 
multiple DStreams)

 




Thanks,

-- 

Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/

 

 



RE: How to do dispatching in Streaming?

2015-04-16 Thread Evo Eftimov
Also you can have each message type in a different topic (needs to be arranged 
upstream from your Spark Streaming app ie in the publishing systems and the 
messaging brokers) and then for each topic you can have a dedicated instance of 
InputReceiverDStream which will be the start of a dedicated DAG pipeline 
instance for every message type. Moreover each such DAG pipeline instance will 
run in parallel with the others 
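
A minimal Scala sketch of that layout, assuming the receiver-based Kafka API from the spark-streaming-kafka artifact; the topic names, ZooKeeper quorum and consumer group are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object PerTopicStreamsSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("per-topic"), Seconds(5))
    val zkQuorum = "zk-host:2181"
    val group    = "my-consumer-group"

    // One dedicated receiver DStream per topic, i.e. per message type
    val topics  = Seq("orders", "clicks", "alerts")
    val streams = topics.map { topic =>
      topic -> KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
    }.toMap

    // Each stream is the head of its own, independently running pipeline
    streams("orders").count().print()
    streams("clicks").filter(_.nonEmpty).print()
    streams("alerts").foreachRDD(rdd => println(s"alerts in batch: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}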

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Thursday, April 16, 2015 12:52 AM
To: Jianshi Huang
Cc: user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

 

It may be worthwhile to architect the computation in a different way. 

 

dstream.foreachRDD { rdd =>
   rdd.foreach { record =>
      // do different things for each record based on filters
   }
}

 

TD

 

On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

Hi,

 

I have a Kafka topic that contains dozens of different types of messages. And 
for each one I'll need to create a DStream for it.

 

Currently I have to filter the Kafka stream over and over, which is very 
inefficient.

 

So what's the best way to do dispatching in Spark Streaming? (one DStream - 
multiple DStreams)

 




Thanks,

-- 

Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github  Blog: http://huangjs.github.com/

 



RE: Data partitioning and node tracking in Spark-GraphX

2015-04-16 Thread Evo Eftimov
How do you intend to fetch the required data - from within Spark or using
an app / code / module outside Spark  

-Original Message-
From: mas [mailto:mas.ha...@gmail.com] 
Sent: Thursday, April 16, 2015 4:08 PM
To: user@spark.apache.org
Subject: Data partitioning and node tracking in Spark-GraphX

I have a big data file and I aim to create an index on the data. I want to
partition the data based on a user-defined function in Spark-GraphX (Scala).
Further, I want to keep track of the node to which a particular data partition
is sent and on which it is processed, so I can fetch the required data by
accessing the right node and data partition.
How can I achieve this? 
Any help in this regard will be highly appreciated.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Data-partitioning-and-no
de-tracking-in-Spark-GraphX-tp22527.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How to join RDD keyValuePairs efficiently

2015-04-16 Thread Evo Eftimov
Ningjun, to speed up your current design you can do the following:

1. partition the large doc RDD based on the hash function on the key, i.e. the docid

2. persist the large dataset in memory to be available for subsequent queries 
without reloading and repartitioning for every search query 

3. partition the small doc dataset in the same way - this will result in 
collocated small and large RDD partitions with the same key

4. run the join - the match is not going to be sequential it is based on hash 
of the key moreover RDD elements with the same key will be collocated on the 
same cluster node


OR simply go for Sean's suggestion - under the hood it works in a slightly 
different way - the filter is executed in mappers running in parallel on every 
node, and by passing the small doc IDs to each filter (mapper) you essentially 
replicate them on every node, so each mapper instance has its own copy and runs 
with it when filtering 

And finally you can prototype both options described above and measure and 
compare their performance   
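
A minimal Scala sketch of both options, reusing the names from the thread; Document here is a stand-in case class and the path and partition count are illustrative only:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

case class Document(id: String, text: String)

object DocLookupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("doc-lookup"))
    val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")

    // Option 1: hash-partition the big RDD once, keep it in memory,
    // and join against the small id RDD partitioned the same way
    val partitioner = new HashPartitioner(sc.defaultParallelism * 2)
    val docsById = allDocs.keyBy(_.id).partitionBy(partitioner).persist(StorageLevel.MEMORY_AND_DISK)

    def findByJoin(docids: RDD[String]): RDD[Document] =
      docids.map(id => (id, ())).partitionBy(partitioner).join(docsById).map(_._2._2)

    // Option 2 (Sean's suggestion): broadcast the small id set and filter
    def findByFilter(docids: Set[String]): RDD[Document] = {
      val ids = sc.broadcast(docids)
      docsById.filter { case (id, _) => ids.value.contains(id) }.values
    }
  }
}

For a small handful of ids the broadcast-and-filter route is typically the simpler and faster of the two, in line with Sean's point; the co-partitioned join pays off when the query side itself is large and reused.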

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, April 16, 2015 5:02 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to join RDD keyValuePairs efficiently

This would be much, much faster if your set of IDs was simply a Set, and you 
passed that to a filter() call that just filtered in the docs that matched an 
ID in the set.

On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:
 Does anybody have a solution for this?





 From: Wang, Ningjun (LNG-NPV)
 Sent: Tuesday, April 14, 2015 10:41 AM
 To: user@spark.apache.org
 Subject: How to join RDD keyValuePairs efficiently



 I have an RDD that contains millions of Document objects. Each 
 document has an unique Id that is a string. I need to find the documents by 
 ids quickly.
 Currently I used RDD join as follow



 First I save the RDD as object file



 allDocs : RDD[Document] = getDocs()  // this RDD contains 7 million Document objects

 allDocs.saveAsObjectFile("/temp/allDocs.obj")



 Then I wrote a function to find documents by Ids



 def findDocumentsByIds(docids: RDD[String]) = {
   // docids contains less than 100 items
   val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
   val idAndDocs = allDocs.keyBy(d => d.id)
   docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
 }



 I found that this is very slow. I suspect it scans the entire 7 million 
 Document objects in “/temp/allDocs.obj” sequentially to find the 
 desired document.



 Is there any efficient way to do this?



 One option I am thinking is that instead of storing the RDD[Document] 
 as object file, I store each document in a separate file with filename 
 equal to the docid. This way I can find a document quickly by docid. 
 However this means I need to save the RDD to 7 million small files 
 which will take a very long time to save and may cause IO problems with so 
 many small files.



 Is there any other way?







 Ningjun

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Data partitioning and node tracking in Spark-GraphX

2015-04-16 Thread Evo Eftimov
Well, you can use a [Key, Value] RDD and partition it based on a hash function on 
the Key, and even with a specific number of partitions (and hence cluster nodes). 
This will a) index the data, b) divide it and send it to multiple nodes. Re 
your last requirement - in a cluster programming environment/framework your app 
code should not be concerned with exactly which physical node a partition resides on 
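
A minimal Scala sketch of that idea; the keys and values are made up, and the mapPartitionsWithIndex pass is only there to show which partition each key ends up in:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object HashPartitionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hash-partition").setMaster("local[4]"))

    val data        = sc.parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4))
    val partitioned = data.partitionBy(new HashPartitioner(4))   // deterministic placement by key hash

    partitioned
      .mapPartitionsWithIndex((idx, iter) => iter.map { case (k, _) => s"key $k -> partition $idx" })
      .collect()
      .foreach(println)

    sc.stop()
  }
}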
 

 

Regards

Evo Eftimov

 

From: MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] 
Sent: Thursday, April 16, 2015 4:20 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Data partitioning and node tracking in Spark-GraphX

 

I want to use Spark functions/APIs to do this task. My basic purpose is to 
index the data and divide and send it to multiple nodes. Then at the time of 
accessing i want to reach the right node and data partition. I don't have any 
clue how to do this.

Thanks,

 

On Thu, Apr 16, 2015 at 5:13 PM, Evo Eftimov evo.efti...@isecc.com wrote:

How do you intend to fetch the required data - from within Spark or using
an app / code / module outside Spark

-Original Message-
From: mas [mailto:mas.ha...@gmail.com]
Sent: Thursday, April 16, 2015 4:08 PM
To: user@spark.apache.org
Subject: Data partitioning and node tracking in Spark-GraphX

I have a big data file, i aim to create index on the data. I want to
partition the data based on user defined function in Spark-GraphX (Scala).
Further i want to keep track the node on which a particular data partition
is send and being processed so i could fetch the required data by accessing
the right node and data partition.
How can i achieve this?
Any help in this regard will be highly appreciated.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Data-partitioning-and-node-tracking-in-Spark-GraphX-tp22527.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org







 

-- 

Regards,
Muhammad Aamir


CONFIDENTIALITY:This email is intended solely for the person(s) named and may 
be confidential and/or privileged.If you are not the intended recipient,please 
delete it,notify me and do not copy,use,or disclose its content.



RE: Data partitioning and node tracking in Spark-GraphX

2015-04-16 Thread Evo Eftimov
Well you can have a two level index structure, still without any need for 
physical cluster node awareness

 

Level 1 Index is the previously described partitioned [K,V] RDD – this gets you 
to the value (RDD element) you need on the respective cluster node

 

Level 2 Index – it will be built and reside within the Value of each [K,V] RDD 
element – so after you retrieve the appropriate Element from the appropriate 
cluster node based on Level 1 Index, then you query the Value in the element 
based on Level 2 Index  
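
A minimal Scala sketch of the two levels; the per-key Map stands in for the local octree, and the keys and points are invented:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object TwoLevelIndexSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("two-level-index").setMaster("local[4]"))

    // The Value of each [K, V] element carries its own local (Level 2) index
    val regions = sc.parallelize(Seq(
      "region-1" -> Map("p1" -> (0.1, 0.2, 0.3), "p2" -> (0.4, 0.5, 0.6)),
      "region-2" -> Map("p3" -> (0.7, 0.8, 0.9))
    ))

    // Level 1: hash-partition on the region key and keep it resident
    val indexed = regions.partitionBy(new HashPartitioner(4)).cache()

    // Query: Level 1 locates the element, Level 2 resolves inside its value
    val localIndexes = indexed.lookup("region-1")
    println(localIndexes.headOption.flatMap(_.get("p2")))   // Some((0.4,0.5,0.6))

    sc.stop()
  }
}

An octree (or any other spatial structure) can take the place of the Map, as long as it is serializable.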

 

From: MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] 
Sent: Thursday, April 16, 2015 4:32 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Data partitioning and node tracking in Spark-GraphX

 

Thanks a lot for the reply. Indeed it is useful, but to be more precise I have 
3D data and want to index it using an octree. Thus I aim to build a two-level 
indexing mechanism, i.e. first at the global level I want to partition and send the 
data to the nodes, then at the node level I again want to use an octree to index my 
data locally.

Could you please elaborate the solution in this context ?

 

On Thu, Apr 16, 2015 at 5:23 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Well you can use a [Key, Value] RDD and partition it based on hash function on 
the Key and even a specific number of partitions (and hence cluster nodes). 
This will a) index the data, b) divide it and send it to multiple nodes. Re 
your last requirement - in a cluster programming environment/framework your app 
code should not be bothered on which physical node exactly, a partition resides 
 

 

Regards

Evo Eftimov

 

From: MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] 
Sent: Thursday, April 16, 2015 4:20 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Data partitioning and node tracking in Spark-GraphX

 

I want to use Spark functions/APIs to do this task. My basic purpose is to 
index the data and divide and send it to multiple nodes. Then at the time of 
accessing i want to reach the right node and data partition. I don't have any 
clue how to do this.

Thanks,

 

On Thu, Apr 16, 2015 at 5:13 PM, Evo Eftimov evo.efti...@isecc.com wrote:

How do you intend to fetch the required data - from within Spark or using
an app / code / module outside Spark

-Original Message-
From: mas [mailto:mas.ha...@gmail.com]
Sent: Thursday, April 16, 2015 4:08 PM
To: user@spark.apache.org
Subject: Data partitioning and node tracking in Spark-GraphX

I have a big data file, i aim to create index on the data. I want to
partition the data based on user defined function in Spark-GraphX (Scala).
Further i want to keep track the node on which a particular data partition
is send and being processed so i could fetch the required data by accessing
the right node and data partition.
How can i achieve this?
Any help in this regard will be highly appreciated.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Data-partitioning-and-node-tracking-in-Spark-GraphX-tp22527.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org





 

-- 

Regards,
Muhammad Aamir


CONFIDENTIALITY:This email is intended solely for the person(s) named and may 
be confidential and/or privileged.If you are not the intended recipient,please 
delete it,notify me and do not copy,use,or disclose its content.





 

-- 

Regards,
Muhammad Aamir


CONFIDENTIALITY:This email is intended solely for the person(s) named and may 
be confidential and/or privileged.If you are not the intended recipient,please 
delete it,notify me and do not copy,use,or disclose its content.



RE: General configurations on CDH5 to achieve maximum Spark Performance

2015-04-16 Thread Evo Eftimov
Well there are a number of performance tuning guidelines in dedicated
sections of the spark documentation - have you read and applied them 

 

Secondly any performance problem within a distributed cluster environment
has two aspects:

 

1.   Infrastructure 

2.   App Algorithms 

 

You seem to be focusing only on 1, but what you said about the performance
differences between a single laptop and the cluster points to potential
algorithmic inefficiency in your app when e.g. distributing data and performing
parallel processing. On a single laptop data moves instantly between workers
because all worker instances run in the memory of a single machine ...

 

Regards,

Evo Eftimov  

 

From: Manish Gupta 8 [mailto:mgupt...@sapient.com] 
Sent: Thursday, April 16, 2015 6:03 PM
To: user@spark.apache.org
Subject: General configurations on CDH5 to achieve maximum Spark Performance

 

Hi,

 

Is there a document/link that describes the general configuration settings
to achieve maximum Spark Performance while running on CDH5? In our
environment, we did lot of changes (and still doing it) to get decent
performance otherwise our 6 node dev cluster with default configurations,
lags behind a single laptop running Spark.

 

Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM)
would be really great. Any pointers in this regards will be really helpful.

 

We are running Spark 1.2.0 on CDH 5.3.0.

 

Thanks,

 

Manish Gupta

Specialist | Sapient Global Markets

 

Green Boulevard (Tower C)

3rd  4th Floor

Plot No. B-9A, Sector 62

Noida 201 301

Uttar Pradesh, India

 

Tel: +91 (120) 479 5000

Fax: +91 (120) 479 5001

Email: mgupt...@sapient.com

 

sapientglobalmarkets.com

 

The information transmitted is intended only for the person or entity to
which it is addressed and may contain confidential and/or privileged
material. Any review, retransmission, dissemination or other use of, or
taking of any action in reliance upon, this information by persons or
entities other than the intended recipient is prohibited. If you received
this in error, please contact the sender and delete the material from any
(your) computer.

 

***Please consider the environment before printing this email.***

 



RE: Super slow caching in 1.3?

2015-04-16 Thread Evo Eftimov
Michael what exactly do you mean by flattened version/structure here e.g.:

1. An Object with only primitive data types as attributes
2. An Object with  no more than one level of other Objects as attributes 
3. An Array/List of primitive types 
4. An Array/List of Objects 

This question is in general about RDDs not necessarily RDDs in the context of 
SparkSQL

When answering can you also score how bad the performance of each of the above 
options is  

-Original Message-
From: Christian Perez [mailto:christ...@svds.com] 
Sent: Thursday, April 16, 2015 6:09 PM
To: Michael Armbrust
Cc: user
Subject: Re: Super slow caching in 1.3?

Hi Michael,

Good question! We checked 1.2 and found that it is also slow caching the same 
flat parquet file. Caching other file formats of the same data was faster by 
up to a factor of ~2. Note that the parquet file was created in Impala but the 
other formats were written by Spark SQL.

Cheers,

Christian

On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote:
 Do you think you are seeing a regression from 1.2?  Also, are you 
 caching nested data or flat rows?  The in-memory caching is not really 
 designed for nested data and so performs pretty slowly here (its just 
 falling back to kryo and even then there are some locking issues).

 If so, would it be possible to try caching a flattened version?

 CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable

 On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote:

 Hi all,

 Has anyone else noticed very slow time to cache a Parquet file? It 
 takes 14 s per 235 MB (1 block) uncompressed node local Parquet file 
 on M2 EC2 instances. Or are my expectations way off...

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
 additional commands, e-mail: user-h...@spark.apache.org





--
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: General configurations on CDH5 to achieve maximum Spark Performance

2015-04-16 Thread Evo Eftimov
Essentially, to change the performance yield of a software cluster
infrastructure platform like Spark you play with different permutations of:

 

-  Number of CPU cores used by Spark Executors on every cluster node

-  Amount of RAM allocated for each executor   

 

How disk and network IO is used also plays a role, but that is influenced
more by app algorithmic aspects rather than by YARN / Spark cluster config
(except rack awareness etc) 

 

When Spark runs under the management of YARN the above is controlled /
allocated by YARN 

 

https://spark.apache.org/docs/latest/running-on-yarn.html 

 

From: Manish Gupta 8 [mailto:mgupt...@sapient.com] 
Sent: Thursday, April 16, 2015 6:21 PM
To: Evo Eftimov; user@spark.apache.org
Subject: RE: General configurations on CDH5 to achieve maximum Spark
Performance

 

Thanks Evo. Yes, my concern is only regarding the infrastructure
configurations. Basically, configuring Yarn (Node Manager) + Spark is a must
and the default settings never work. What really happens is we make changes
as and when an issue is faced because of one of the numerous default
configuration settings. And every time, we have to google a lot to decide on
the right values :)

 

Again, my issue is very centric to running Spark on Yarn in CDH5
environment.

 

If you know a link that talks about optimum configuration settings for
running Spark on Yarn (CDH5), please share the same. 

 

Thanks,

Manish

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Thursday, April 16, 2015 10:38 PM
To: Manish Gupta 8; user@spark.apache.org
Subject: RE: General configurations on CDH5 to achieve maximum Spark
Performance

 

Well there are a number of performance tuning guidelines in dedicated
sections of the spark documentation - have you read and applied them 

 

Secondly any performance problem within a distributed cluster environment
has two aspects:

 

1.   Infrastructure 

2.   App Algorithms 

 

You seem to be focusing only on 1, but what you said about the performance
differences between a single laptop and the cluster points to potential
algorithmic inefficiency in your app when e.g. distributing data and performing
parallel processing. On a single laptop data moves instantly between workers
because all worker instances run in the memory of a single machine ...

 

Regards,

Evo Eftimov  

 

From: Manish Gupta 8 [mailto:mgupt...@sapient.com] 
Sent: Thursday, April 16, 2015 6:03 PM
To: user@spark.apache.org
Subject: General configurations on CDH5 to achieve maximum Spark Performance

 

Hi,

 

Is there a document/link that describes the general configuration settings
to achieve maximum Spark Performance while running on CDH5? In our
environment, we did lot of changes (and still doing it) to get decent
performance otherwise our 6 node dev cluster with default configurations,
lags behind a single laptop running Spark.

 

Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM)
would be really great. Any pointers in this regards will be really helpful.

 

We are running Spark 1.2.0 on CDH 5.3.0.

 

Thanks,

 

Manish Gupta

Specialist | Sapient Global Markets

 

Green Boulevard (Tower C)

3rd  4th Floor

Plot No. B-9A, Sector 62

Noida 201 301

Uttar Pradesh, India

 

Tel: +91 (120) 479 5000

Fax: +91 (120) 479 5001

Email: mgupt...@sapient.com

 

sapientglobalmarkets.com

 

The information transmitted is intended only for the person or entity to
which it is addressed and may contain confidential and/or privileged
material. Any review, retransmission, dissemination or other use of, or
taking of any action in reliance upon, this information by persons or
entities other than the intended recipient is prohibited. If you received
this in error, please contact the sender and delete the material from any
(your) computer.

 

***Please consider the environment before printing this email.***

 



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
The reason for this is as follows:

 

1.   You are saving data on HDFS

2.   HDFS as a cluster/server side Service has a Single Writer / Multiple 
Reader multithreading model 

3.   Hence each thread of execution in Spark has to write to a separate 
file in HDFS

4.   Moreover the RDDs are partitioned across cluster nodes and operated 
upon by multiple threads there and on top of that in Spark Streaming you have 
many micro-batch RDDs streaming in all the time as part of a DStream  

 

If you want fine / detailed management of the writing to HDFS you can implement 
your own HDFS adapter and invoke it in forEachRDD and foreach 

 

Regards

Evo Eftimov  

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:33 PM
To: user@spark.apache.org
Subject: saveAsTextFile

 

I am using Spark Streaming where during each micro-batch I output data to S3 
using

saveAsTextFile. Right now each batch of data is put into its own directory 
containing

2 objects, _SUCCESS and part-0.

 

How do I output each batch into a common directory?

 

Thanks,

Vadim

  
 



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Nope Sir, it is possible - check my reply earlier 

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, April 16, 2015 6:35 PM
To: Vadim Bichutskiy
Cc: user@spark.apache.org
Subject: Re: saveAsTextFile

You can't, since that's how it's designed to work. Batches are saved in 
different files, which are really directories containing partitions, as is 
common in Hadoop. You can move them later, or just read them where they are.

On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com 
wrote:
 I am using Spark Streaming where during each micro-batch I output data 
 to S3 using saveAsTextFile. Right now each batch of data is put into 
 its own directory containing
 2 objects, _SUCCESS and part-0.

 How do I output each batch into a common directory?

 Thanks,
 Vadim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Basically you need to unbundle the elements of the RDD and then store them 
wherever you want - use foreachPartition and then foreach 
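
A minimal Scala sketch of the foreachRDD -> foreachPartition -> foreach pattern for the Redshift case, opening one JDBC connection per partition. The JDBC URL, table and column are assumptions; in practice a bulk COPY from S3 staging files is usually preferred for Redshift, this only illustrates the per-partition write pattern:

import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

object PartitionWriterSketch {
  def writeOut(stream: DStream[String], jdbcUrl: String): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val conn = DriverManager.getConnection(jdbcUrl)     // one connection per partition
        try {
          val stmt = conn.prepareStatement("INSERT INTO events (payload) VALUES (?)")
          records.foreach { r =>
            stmt.setString(1, r)
            stmt.executeUpdate()
          }
        } finally {
          conn.close()
        }
      }
    }
}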

-Original Message-
From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:39 PM
To: Sean Owen
Cc: user@spark.apache.org
Subject: Re: saveAsTextFile

Thanks Sean. I want to load each batch into Redshift. What's the best/most 
efficient way to do that?

Vadim


 On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote:
 
 You can't, since that's how it's designed to work. Batches are saved 
 in different files, which are really directories containing 
 partitions, as is common in Hadoop. You can move them later, or just 
 read them where they are.
 
 On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy 
 vadim.bichuts...@gmail.com wrote:
 I am using Spark Streaming where during each micro-batch I output 
 data to S3 using saveAsTextFile. Right now each batch of data is put 
 into its own directory containing
 2 objects, _SUCCESS and part-0.
 
 How do I output each batch into a common directory?
 
 Thanks,
 Vadim

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: saveAsTextFile

2015-04-16 Thread Evo Eftimov
Also, to work around the multithreading models of both Spark and HDFS even further, 
you can publish the data from Spark first to a message broker, e.g. Kafka, from 
where a predetermined number (from 1 to infinity) of parallel consumers will 
retrieve it and store it in HDFS in one or more finely controlled files and 
directories  

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:45 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: saveAsTextFile

 

Thanks Evo for your detailed explanation.


On Apr 16, 2015, at 1:38 PM, Evo Eftimov evo.efti...@isecc.com wrote:

The reason for this is as follows:

 

1.  You are saving data on HDFS

2.  HDFS as a cluster/server side Service has a Single Writer / Multiple 
Reader multithreading model 

3.  Hence each thread of execution in Spark has to write to a separate file 
in HDFS

4.  Moreover the RDDs are partitioned across cluster nodes and operated 
upon by multiple threads there and on top of that in Spark Streaming you have 
many micro-batch RDDs streaming in all the time as part of a DStream  

 

If you want fine / detailed management of the writing to HDFS you can implement 
your own HDFS adapter and invoke it in forEachRDD and foreach 

 

Regards

Evo Eftimov  

 

From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] 
Sent: Thursday, April 16, 2015 6:33 PM
To: user@spark.apache.org
Subject: saveAsTextFile

 

I am using Spark Streaming where during each micro-batch I output data to S3 
using

saveAsTextFile. Right now each batch of data is put into its own directory 
containing

2 objects, _SUCCESS and part-0.

 

How do I output each batch into a common directory?

 

Thanks,

Vadim

  
 



RE: How to join RDD keyValuePairs efficiently

2015-04-16 Thread Evo Eftimov
You can use

def  partitionBy(partitioner: Partitioner): RDD[(K, V)] 
Return a copy of the RDD partitioned using the specified partitioner

The https://github.com/amplab/spark-indexedrdd stuff looks pretty cool and is 
something which adds valuable functionality to Spark, e.g. the point lookups, 
PROVIDED it can be executed from within a function running on worker executors 

Can somebody from Databricks shed more light here?  

-Original Message-
From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com] 
Sent: Thursday, April 16, 2015 9:39 PM
To: user@spark.apache.org
Subject: RE: How to join RDD keyValuePairs efficiently

Evo

 partition the large doc RDD based on the hash function on the key ie 
the docid

What API to use to do this?

By the way, loading the entire dataset into memory causes an OutOfMemory problem 
because it is too large (I only have one machine with 16GB and 4 cores).

I found something called IndexedRDD on the web 
https://github.com/amplab/spark-indexedrdd

Has anybody use it?

Ningjun

-Original Message-
From: Evo Eftimov [mailto:evo.efti...@isecc.com]
Sent: Thursday, April 16, 2015 12:18 PM
To: 'Sean Owen'; Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: RE: How to join RDD keyValuePairs efficiently

Ningjun, to speed up your current design you can do the following:

1.partition the large doc RDD based on the hash function on the key ie the docid

2. persist the large dataset in memory to be available for subsequent queries 
without reloading and repartitioning for every search query 

3. partition the small doc dataset in the same way - this will result in 
collocated small and large RDD partitions with the same key

4. run the join - the match is not going to be sequential it is based on hash 
of the key moreover RDD elements with the same key will be collocated on the 
same cluster node


OR simply go for Sean suggestion - under the hood it works in a slightly 
different way - the filter is executed in mappers running in parallel on every 
node and also by passing the small doc IDs to each filter (mapper) you 
essentially replicate them on every node so each mapper instance has its own 
copy and runs with it when filtering 

And finally you can prototype both options described above and measure and 
compare their performance   

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Thursday, April 16, 2015 5:02 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to join RDD keyValuePairs efficiently

This would be much, much faster if your set of IDs was simply a Set, and you 
passed that to a filter() call that just filtered in the docs that matched an 
ID in the set.

On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:
 Does anybody have a solution for this?





 From: Wang, Ningjun (LNG-NPV)
 Sent: Tuesday, April 14, 2015 10:41 AM
 To: user@spark.apache.org
 Subject: How to join RDD keyValuePairs efficiently



 I have an RDD that contains millions of Document objects. Each 
 document has an unique Id that is a string. I need to find the documents by 
 ids quickly.
 Currently I used RDD join as follow



 First I save the RDD as object file



 allDocs : RDD[Document] = getDocs()  // this RDD contains 7 million Document objects

 allDocs.saveAsObjectFile("/temp/allDocs.obj")



 Then I wrote a function to find documents by Ids



 def findDocumentsByIds(docids: RDD[String]) = {
   // docids contains less than 100 items
   val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
   val idAndDocs = allDocs.keyBy(d => d.id)
   docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
 }



 I found that this is very slow. I suspect it scan the entire 7 million 
 Document objects in “/temp/allDocs.obj” sequentially to find the 
 desired document.



 Is there any efficient way to do this?



 One option I am thinking is that instead of storing the RDD[Document] 
 as object file, I store each document in a separate file with filename 
 equal to the docid. This way I can find a document quickly by docid.
 However this means I need to save the RDD to 7 million small file 
 which will take a very long time to save and may cause IO problems with so 
 many small files.



 Is there any other way?







 Ningjun

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


