Re: Spark standalone mode problem: executors added and removed again and again!

2019-07-17 Thread Amit Sharma
Do you have dynamic resource allocation enabled?
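
For reference, a minimal sketch of checking the relevant settings from inside the running application (the keys below are the standard dynamic-allocation properties; the app name is only a placeholder):

import org.apache.spark.sql.SparkSession

// Print the allocation-related settings the running application actually
// resolved, to confirm whether dynamic allocation is on.
val spark = SparkSession.builder().appName("conf-check").getOrCreate()
Seq(
  "spark.dynamicAllocation.enabled",
  "spark.shuffle.service.enabled",
  "spark.executor.instances"
).foreach { key =>
  println(s"$key = ${spark.conf.getOption(key).getOrElse("<not set>")}")
}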


On Wednesday, July 17, 2019, zenglong chen  wrote:

> Hi all,
> My standalone cluster has two slaves. When I submit my job, the
> localhost slave works well, but the second slave keeps adding and removing
> executors! The logs are below:
>2019-07-17 10:51:38,889 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor updated: app-20190717105135-0008/2 is now EXITED (Command exited
> with code 1)
> 2019-07-17 10:51:38,890 INFO cluster.StandaloneSchedulerBackend: Executor
> app-20190717105135-0008/2 removed: Command exited with code 1
> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMasterEndpoint: Trying
> to remove executor 2 from BlockManagerMaster.
> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMaster: Removal of
> executor 2 requested
> 2019-07-17 10:51:38,891 INFO 
> cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Asked to remove non-existent executor 2
> 2019-07-17 10:51:38,892 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor added: app-20190717105135-0008/3 on 
> worker-20190717093045-172.22.9.179-40573
> (172.22.9.179:40573) with 8 core(s)
> 2019-07-17 10:51:38,892 INFO cluster.StandaloneSchedulerBackend: Granted
> executor ID app-20190717105135-0008/3 on hostPort 172.22.9.179:40573 with
> 8 core(s), 12.0 GB RAM
> 2019-07-17 10:51:38,893 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor updated: app-20190717105135-0008/3 is now RUNNING
> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor updated: app-20190717105135-0008/3 is now EXITED (Command exited
> with code 1)
> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Executor
> app-20190717105135-0008/3 removed: Command exited with code 1
> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor added: app-20190717105135-0008/4 on 
> worker-20190717093045-172.22.9.179-40573
> (172.22.9.179:40573) with 8 core(s)
> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMaster: Removal of
> executor 3 requested
> 2019-07-17 10:51:40,521 INFO 
> cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Asked to remove non-existent executor 3
> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMasterEndpoint: Trying
> to remove executor 3 from BlockManagerMaster.
> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Granted
> executor ID app-20190717105135-0008/4 on hostPort 172.22.9.179:40573 with
> 8 core(s), 12.0 GB RAM
> 2019-07-17 10:51:40,523 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor updated: app-20190717105135-0008/4 is now RUNNING
>
>
> And the slave output is below:
>19/07/17 10:47:12 INFO ExecutorRunner: Launch command:
> "/home/ubuntu/data/jdk/jre/bin/java" "-cp" "/home/ubuntu/spark-2.4.3/
> conf/:/home/ubuntu/spark-2.4.3/jars/*" "-Xmx12288M"
> "-Dspark.driver.port=40335" 
> "org.apache.spark.executor.CoarseGrainedExecutorBackend"
> "--driver-url" "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335"
> "--executor-id" "18" "--hostname" "172.22.9.179" "--cores" "8" "--app-id"
> "app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:
> 40573"
> 19/07/17 10:47:13 INFO Worker: Executor app-20190717104645-0007/18
> finished with state EXITED message Command exited with code 1 exitStatus 1
> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Clean up non-shuffle
> files associated with the finished executor 18
> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Executor is not
> registered (appId=app-20190717104645-0007, execId=18)
> 19/07/17 10:47:13 INFO Worker: Asked to launch executor
> app-20190717104645-0007/19 for ph_user_pre_level
> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls to: ubuntu
> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls to: ubuntu
> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls groups to:
> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls groups to:
> 19/07/17 10:47:13 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users  with view permissions: Set(ubuntu);
> groups with view permissions: Set(); users  with modify permissions:
> Set(ubuntu); groups with modify permissions: Set()
> 19/07/17 10:47:14 INFO ExecutorRunner: Launch command:
> "/home/ubuntu/data/jdk/jre/bin/java" "-cp" "/home/ubuntu/spark-2.4.3/
> conf/:/home/ubuntu/spark-2.4.3/jars/*" "-Xmx12288M"
> "-Dspark.driver.port=40335" 
> "org.apache.spark.executor.CoarseGrainedExecutorBackend"
> "--driver-url" "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335"
> "--executor-id" "19" "--hostname" "172.22.9.179" "--cores" "8" "--app-id"
> "app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:
> 40573"
>
> I guess that it may be a "-Dspark.driver.port=40335" problem.
> Any suggestions would help me a lot!
>


RE: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Gautham Acharya
Users can also request random rows in those columns. So a user can request a 
subset of the matrix (N rows and N columns) which would change the value of the 
correlation coefficient.

From: Jerry Vinokurov [mailto:grapesmo...@gmail.com]
Sent: Wednesday, July 17, 2019 1:27 PM
To: user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Maybe I'm not understanding something about this use case, but why is 
precomputation not an option? Is it because the matrices themselves change? 
Because if the matrices are constant, then I think precomputation would work 
for you even if the users request random correlations. You can just store the 
resulting column with the matrix id, row, and column as the key for retrieval.

My general impression is that while you could do this in Spark, it's probably 
not the correct framework for carrying out this kind of operation. This feels 
more like a job for something like OpenMP than for Spark.


On Wed, Jul 17, 2019 at 3:42 PM Gautham Acharya <gauth...@alleninstitute.org> wrote:
As I said in my initial message, precomputing is not an option.

Retrieving only the top/bottom N most correlated is an option – would that 
speed up the results?

Our SLAs are soft – slight variations (+- 15 seconds) will not cause issues.

--gautham
From: Patrick McCarthy 
[mailto:pmccar...@dstillery.com]
Sent: Wednesday, July 17, 2019 12:39 PM
To: Gautham Acharya <gauth...@alleninstitute.org>
Cc: Bobby Evans <reva...@gmail.com>; Steven Stetzler <steven.stetz...@gmail.com>;
user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Do you really need the results of all 3MM computations, or only the top- and 
bottom-most correlation coefficients? Could correlations be computed on a 
sample and from that estimate a distribution of coefficients? Would it make 
sense to precompute offline and instead focus on fast key-value retrieval, like 
ElasticSearch or ScyllaDB?

Spark is a compute framework rather than a serving backend, I don't think it's 
designed with retrieval SLAs in mind and you may find those SLAs difficult to 
maintain.

On Wed, Jul 17, 2019 at 3:14 PM Gautham Acharya <gauth...@alleninstitute.org> wrote:
Thanks for the reply, Bobby.

I’ve received notice that we can probably tolerate response times of up to 30 
seconds. Would this be more manageable? 5 seconds was an initial ask, but 20-30 
seconds is also a reasonable response time for our use case.

With the new SLA, do you think that we can easily perform this computation in 
spark?
--gautham

From: Bobby Evans [mailto:reva...@gmail.com]
Sent: Wednesday, July 17, 2019 7:06 AM
To: Steven Stetzler <steven.stetz...@gmail.com>
Cc: Gautham Acharya <gauth...@alleninstitute.org>;
user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Let's do a few quick rules of thumb to get an idea of what kind of processing 
power you will need in general to do what you want.

You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends up 
being about 560 GB that you need to fully process in 5 seconds.

If you are reading this from spinning disks (which average about 80 MB/s) you 
would need at least 1,450 disks to just read the data in 5 seconds (that number 
can vary a lot depending on the storage format and your compression ratio).
If you are reading the data over a network (let's say 10GigE even though in 
practice you cannot get that in the cloud easily) you would need about 90 NICs 
just to read the data in 5 seconds; again, depending on the compression ratio
this may be lower.
If you assume you have a cluster where it all fits in main memory and have 
cached all of the data in memory (which in practice I have seen on most modern 
systems at somewhere between 12 and 16 GB/sec) you would need between 7 and 10 
machines just to read through the data once in 5 seconds.  Spark also stores 
cached data compressed so you might need less as well.

All the numbers fit with things that spark should be able to handle, but a 5 
second SLA is very tight for this amount of data.

Can you 

Re: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Jerry Vinokurov
Maybe I'm not understanding something about this use case, but why is
precomputation not an option? Is it because the matrices themselves change?
Because if the matrices are constant, then I think precomputation would
work for you even if the users request random correlations. You can just
store the resulting column with the matrix id, row, and column as the key
for retrieval.
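
As a minimal sketch of one reading of that layout (the key shape and the in-memory Map are illustrative assumptions; a real deployment would use an external key-value store):

// One possible precomputation layout: every coefficient keyed by
// (matrix id, row, column). The Map is purely for illustration.
case class CorrKey(matrixId: String, row: Long, col: Long)

val precomputed: Map[CorrKey, Double] = Map(
  CorrKey("matrix-001", 0L, 1L) -> 0.83,
  CorrKey("matrix-001", 0L, 2L) -> -0.12
)

// Retrieval is then a constant-time lookup instead of a 3M-element compute.
val coeff = precomputed.get(CorrKey("matrix-001", 0L, 1L))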

My general impression is that while you could do this in Spark, it's
probably not the correct framework for carrying out this kind of operation.
This feels more like a job for something like OpenMP than for Spark.


On Wed, Jul 17, 2019 at 3:42 PM Gautham Acharya 
wrote:

> As I said in my initial message, precomputing is not an option.
>
>
>
> Retrieving only the top/bottom N most correlated is an option – would that
> speed up the results?
>
>
>
> Our SLAs are soft – slight variations (+- 15 seconds) will not cause
> issues.
>
>
>
> --gautham
>
> *From:* Patrick McCarthy [mailto:pmccar...@dstillery.com]
> *Sent:* Wednesday, July 17, 2019 12:39 PM
> *To:* Gautham Acharya 
> *Cc:* Bobby Evans ; Steven Stetzler <
> steven.stetz...@gmail.com>; user@spark.apache.org
> *Subject:* Re: [Beginner] Run compute on large matrices and return the
> result in seconds?
>
>
>
>
> Do you really need the results of all 3MM computations, or only the top-
> and bottom-most correlation coefficients? Could correlations be computed on
> a sample and from that estimate a distribution of coefficients? Would it
> make sense to precompute offline and instead focus on fast key-value
> retrieval, like ElasticSearch or ScyllaDB?
>
>
>
> Spark is a compute framework rather than a serving backend, I don't think
> it's designed with retrieval SLAs in mind and you may find those SLAs
> difficult to maintain.
>
>
>
> On Wed, Jul 17, 2019 at 3:14 PM Gautham Acharya <
> gauth...@alleninstitute.org> wrote:
>
> Thanks for the reply, Bobby.
>
>
>
> I’ve received notice that we can probably tolerate response times of up to
> 30 seconds. Would this be more manageable? 5 seconds was an initial ask,
> but 20-30 seconds is also a reasonable response time for our use case.
>
>
>
> With the new SLA, do you think that we can easily perform this computation
> in spark?
>
> --gautham
>
>
>
> *From:* Bobby Evans [mailto:reva...@gmail.com]
> *Sent:* Wednesday, July 17, 2019 7:06 AM
> *To:* Steven Stetzler 
> *Cc:* Gautham Acharya ; user@spark.apache.org
> *Subject:* Re: [Beginner] Run compute on large matrices and return the
> result in seconds?
>
>
>
>
> Let's do a few quick rules of thumb to get an idea of what kind of
> processing power you will need in general to do what you want.
>
>
>
> You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends
> up being about 560 GB that you need to fully process in 5 seconds.
>
>
>
> If you are reading this from spinning disks (which average about 80 MB/s)
> you would need at least 1,450 disks to just read the data in 5 seconds
> (that number can vary a lot depending on the storage format and your
> compression ratio).
>
> If you are reading the data over a network (let's say 10GigE even though
> in practice you cannot get that in the cloud easily) you would need about
> 90 NICs just to read the data in 5 seconds, again depending on the
> compression ratio this may be lower.
>
> If you assume you have a cluster where it all fits in main memory and have
> cached all of the data in memory (which in practice I have seen on most
> modern systems at somewhere between 12 and 16 GB/sec) you would need
> between 7 and 10 machines just to read through the data once in 5 seconds.
> Spark also stores cached data compressed so you might need less as well.
>
>
>
> All the numbers fit with things that spark should be able to handle, but a
> 5 second SLA is very tight for this amount of data.
>
>
>
> Can you make this work with Spark? Probably. Does Spark have something
> built in that will make this fast and simple for you? I doubt it; you have
> some very tight requirements and will likely have to write something custom
> to make it work the way you want.
>
>
>
>
>
> On Thu, Jul 11, 2019 at 4:12 PM Steven Stetzler 
> wrote:
>
> Hi Gautham,
>
>
>
> I am a beginner spark user too and I may not have a complete understanding
> of your question, but I thought I would start a discussion anyway. Have you
> looked into using Spark's built in Correlation function? (
> https://spark.apache.org/docs/latest/ml-statistics.html
> 

RE: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Gautham Acharya
As I said in my initial message, precomputing is not an option.

Retrieving only the top/bottom N most correlated is an option – would that 
speed up the results?

Our SLAs are soft – slight variations (+- 15 seconds) will not cause issues.

--gautham
From: Patrick McCarthy [mailto:pmccar...@dstillery.com]
Sent: Wednesday, July 17, 2019 12:39 PM
To: Gautham Acharya
Cc: Bobby Evans; Steven Stetzler; user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Do you really need the results of all 3MM computations, or only the top- and 
bottom-most correlation coefficients? Could correlations be computed on a 
sample and from that estimate a distribution of coefficients? Would it make 
sense to precompute offline and instead focus on fast key-value retrieval, like 
ElasticSearch or ScyllaDB?

Spark is a compute framework rather than a serving backend, I don't think it's 
designed with retrieval SLAs in mind and you may find those SLAs difficult to 
maintain.

On Wed, Jul 17, 2019 at 3:14 PM Gautham Acharya <gauth...@alleninstitute.org> wrote:
Thanks for the reply, Bobby.

I’ve received notice that we can probably tolerate response times of up to 30 
seconds. Would this be more manageable? 5 seconds was an initial ask, but 20-30 
seconds is also a reasonable response time for our use case.

With the new SLA, do you think that we can easily perform this computation in 
spark?
--gautham

From: Bobby Evans [mailto:reva...@gmail.com]
Sent: Wednesday, July 17, 2019 7:06 AM
To: Steven Stetzler <steven.stetz...@gmail.com>
Cc: Gautham Acharya <gauth...@alleninstitute.org>; user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Let's do a few quick rules of thumb to get an idea of what kind of processing 
power you will need in general to do what you want.

You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends up 
being about 560 GB that you need to fully process in 5 seconds.

If you are reading this from spinning disks (which average about 80 MB/s) you 
would need at least 1,450 disks to just read the data in 5 seconds (that number 
can vary a lot depending on the storage format and your compression ratio).
If you are reading the data over a network (let's say 10GigE even though in 
practice you cannot get that in the cloud easily) you would need about 90 NICs 
just to read the data in 5 seconds; again, depending on the compression ratio
this may be lower.
If you assume you have a cluster where it all fits in main memory and have 
cached all of the data in memory (which in practice I have seen on most modern 
systems at somewhere between 12 and 16 GB/sec) you would need between 7 and 10 
machines just to read through the data once in 5 seconds.  Spark also stores 
cached data compressed so you might need less as well.

All the numbers fit with things that spark should be able to handle, but a 5 
second SLA is very tight for this amount of data.

Can you make this work with Spark? Probably. Does Spark have something built
in that will make this fast and simple for you? I doubt it; you have some very
tight requirements and will likely have to write something custom to make it
work the way you want.


On Thu, Jul 11, 2019 at 4:12 PM Steven Stetzler <steven.stetz...@gmail.com> wrote:
Hi Gautham,

I am a beginner spark user too and I may not have a complete understanding of 
your question, but I thought I would start a discussion anyway. Have you looked 
into using Spark's built in Correlation function? 
(https://spark.apache.org/docs/latest/ml-statistics.html)
 This might let you get what you want (per-row correlation against the same 
matrix) without having to deal with parallelizing the computation yourself. 
Also, I think the question of how quickly you can get your results is largely a
data-access question rather than a how-fast-is-Spark question. As long as you can exploit
data parallelism (i.e. you can partition up your data), Spark will give you a 
speedup. You can imagine that if you had a large machine with many cores and 
~100 GB of RAM (e.g. a m5.12xlarge EC2 instance), you could fit your problem in 
main 

Re: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Patrick McCarthy
Do you really need the results of all 3MM computations, or only the top-
and bottom-most correlation coefficients? Could correlations be computed on
a sample and from that estimate a distribution of coefficients? Would it
make sense to precompute offline and instead focus on fast key-value
retrieval, like ElasticSearch or ScyllaDB?

Spark is a compute framework rather than a serving backend, I don't think
it's designed with retrieval SLAs in mind and you may find those SLAs
difficult to maintain.

On Wed, Jul 17, 2019 at 3:14 PM Gautham Acharya 
wrote:

> Thanks for the reply, Bobby.
>
>
>
> I’ve received notice that we can probably tolerate response times of up to
> 30 seconds. Would this be more manageable? 5 seconds was an initial ask,
> but 20-30 seconds is also a reasonable response time for our use case.
>
>
>
> With the new SLA, do you think that we can easily perform this computation
> in spark?
>
> --gautham
>
>
>
> *From:* Bobby Evans [mailto:reva...@gmail.com]
> *Sent:* Wednesday, July 17, 2019 7:06 AM
> *To:* Steven Stetzler 
> *Cc:* Gautham Acharya ; user@spark.apache.org
> *Subject:* Re: [Beginner] Run compute on large matrices and return the
> result in seconds?
>
>
>
>
> Let's do a few quick rules of thumb to get an idea of what kind of
> processing power you will need in general to do what you want.
>
>
>
> You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends
> up being about 560 GB that you need to fully process in 5 seconds.
>
>
>
> If you are reading this from spinning disks (which average about 80 MB/s)
> you would need at least 1,450 disks to just read the data in 5 seconds
> (that number can vary a lot depending on the storage format and your
> compression ratio).
>
> If you are reading the data over a network (let's say 10GigE even though
> in practice you cannot get that in the cloud easily) you would need about
> 90 NICs just to read the data in 5 seconds, again depending on the
> compression ratio this may be lower.
>
> If you assume you have a cluster where it all fits in main memory and have
> cached all of the data in memory (which in practice I have seen on most
> modern systems at somewhere between 12 and 16 GB/sec) you would need
> between 7 and 10 machines just to read through the data once in 5 seconds.
> Spark also stores cached data compressed so you might need less as well.
>
>
>
> All the numbers fit with things that spark should be able to handle, but a
> 5 second SLA is very tight for this amount of data.
>
>
>
> Can you make this work with Spark? Probably. Does Spark have something
> built in that will make this fast and simple for you? I doubt it; you have
> some very tight requirements and will likely have to write something custom
> to make it work the way you want.
>
>
>
>
>
> On Thu, Jul 11, 2019 at 4:12 PM Steven Stetzler 
> wrote:
>
> Hi Gautham,
>
>
>
> I am a beginner spark user too and I may not have a complete understanding
> of your question, but I thought I would start a discussion anyway. Have you
> looked into using Spark's built in Correlation function? (
> https://spark.apache.org/docs/latest/ml-statistics.html
> )
> This might let you get what you want (per-row correlation against the same
> matrix) without having to deal with parallelizing the computation yourself.
> Also, I think the question of how quickly you can get your results is largely
> a data-access question rather than a how-fast-is-Spark question. As long as you can
> exploit data parallelism (i.e. you can partition up your data), Spark will
> give you a speedup. You can imagine that if you had a large machine with
> many cores and ~100 GB of RAM (e.g. a m5.12xlarge EC2 instance), you could
> fit your problem in main memory and perform your computation with thread
> based parallelism. This might get your result relatively quickly. For a
> dedicated application with well constrained memory and compute
> requirements, it might not be a bad option to do everything on one machine
> as well. Accessing an external database and distributing work over a large
> number of computers can add overhead that might be out of your control.
>
>
>
> Thanks,
>
> Steven
>
>
>
> On Thu, Jul 11, 2019 at 9:24 AM Gautham Acharya <
> gauth...@alleninstitute.org> wrote:
>
> Ping? I would really appreciate advice on this! Thank you!
>
>
>
> *From:* Gautham Acharya
> *Sent:* Tuesday, July 9, 2019 4:22 PM
> *To:* user@spark.apache.org
> *Subject:* [Beginner] Run compute on large matrices and return the result
> 

RE: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Gautham Acharya
Thanks for the reply, Bobby.

I’ve received notice that we can probably tolerate response times of up to 30 
seconds. Would this be more manageable? 5 seconds was an initial ask, but 20-30 
seconds is also a reasonable response time for our use case.

With the new SLA, do you think that we can easily perform this computation in 
spark?
--gautham

From: Bobby Evans [mailto:reva...@gmail.com]
Sent: Wednesday, July 17, 2019 7:06 AM
To: Steven Stetzler 
Cc: Gautham Acharya ; user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Let's do a few quick rules of thumb to get an idea of what kind of processing 
power you will need in general to do what you want.

You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends up 
being about 560 GB that you need to fully process in 5 seconds.

If you are reading this from spinning disks (which average about 80 MB/s) you 
would need at least 1,450 disks to just read the data in 5 seconds (that number 
can vary a lot depending on the storage format and your compression ratio).
If you are reading the data over a network (let's say 10GigE even though in 
practice you cannot get that in the cloud easily) you would need about 90 NICs 
just to read the data in 5 seconds; again, depending on the compression ratio
this may be lower.
If you assume you have a cluster where it all fits in main memory and have 
cached all of the data in memory (which in practice I have seen on most modern 
systems at somewhere between 12 and 16 GB/sec) you would need between 7 and 10 
machines just to read through the data once in 5 seconds.  Spark also stores 
cached data compressed so you might need less as well.

All the numbers fit with things that spark should be able to handle, but a 5 
second SLA is very tight for this amount of data.

Can you make this work with Spark? Probably. Does Spark have something built
in that will make this fast and simple for you? I doubt it; you have some very
tight requirements and will likely have to write something custom to make it
work the way you want.


On Thu, Jul 11, 2019 at 4:12 PM Steven Stetzler <steven.stetz...@gmail.com> wrote:
Hi Gautham,

I am a beginner spark user too and I may not have a complete understanding of 
your question, but I thought I would start a discussion anyway. Have you looked 
into using Spark's built in Correlation function? 
(https://spark.apache.org/docs/latest/ml-statistics.html)
 This might let you get what you want (per-row correlation against the same 
matrix) without having to deal with parallelizing the computation yourself. 
Also, I think the question of how quickly you can get your results is largely a
data-access question rather than a how-fast-is-Spark question. As long as you can exploit
data parallelism (i.e. you can partition up your data), Spark will give you a 
speedup. You can imagine that if you had a large machine with many cores and 
~100 GB of RAM (e.g. a m5.12xlarge EC2 instance), you could fit your problem in 
main memory and perform your computation with thread based parallelism. This 
might get your result relatively quickly. For a dedicated application with well 
constrained memory and compute requirements, it might not be a bad option to do 
everything on one machine as well. Accessing an external database and 
distributing work over a large number of computers can add overhead that might 
be out of your control.

Thanks,
Steven
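
For what it's worth, a minimal sketch of the built-in Correlation API mentioned above (the toy vectors are illustrative stand-ins for the real matrix rows):

import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Toy data: each vector stands in for one matrix row.
val data = Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 10.0)
)
val df = data.map(Tuple1.apply).toDF("features")

// Pearson correlation matrix over the "features" column.
val Row(corr: Matrix) = Correlation.corr(df, "features").head
println(corr)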

On Thu, Jul 11, 2019 at 9:24 AM Gautham Acharya <gauth...@alleninstitute.org> wrote:
Ping? I would really appreciate advice on this! Thank you!

From: Gautham Acharya
Sent: Tuesday, July 9, 2019 4:22 PM
To: user@spark.apache.org
Subject: [Beginner] Run compute on large matrices and return the result in 
seconds?


This is my first email to this mailing list, so I apologize if I made any 
errors.



My team's going to be building an application and I'm investigating some 
options for distributed compute systems. We want to perform computations on
large matrices.



The requirements are as follows:



1. The matrices can be expected to be up to 50,000 columns x 3 million 
rows. The values are all integers (except for the row/column headers).

2. The application needs to select a specific row, and calculate the 
correlation coefficient ( 

Re: event log directory(spark-history) filled by large .inprogress files for spark streaming applications

2019-07-17 Thread Shahid K. I.
Hi,
With the current design, event logs are not ideal for long-running streaming
applications, so it is better to disable them. There was a proposal for
splitting the event logs by size/job/query for long-running applications; I am
not sure about the follow-up for that.

Regards,
Shahid

On Tue, 16 Jul 2019, 3:38 pm raman gugnani, 
wrote:

> HI ,
>
> I have long running spark streaming jobs.
> Event log directories are getting filled with .inprogress files.
> Is there a fix or workaround for Spark Streaming?
>
> There is also a JIRA raised for the same issue by another reporter:
>
> https://issues.apache.org/jira/browse/SPARK-22783
>
> --
> Raman Gugnani
>
> 8588892293
> Principal Engineer
> *ixigo.com *
>


Re: event log directory(spark-history) filled by large .inprogress files for spark streaming applications

2019-07-17 Thread Artur Sukhenko
Hi.
There is a workaround for that.
You can disable event logs for Spark Streaming applications.
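
For example, a minimal sketch of that workaround, assuming the standard spark.eventLog.enabled switch (it can equally be passed with --conf at submit time):

import org.apache.spark.sql.SparkSession

// Turn event logging off for the long-running streaming application so the
// spark-history directory does not fill up with a huge .inprogress file.
val spark = SparkSession.builder()
  .appName("streaming-job-without-eventlog")
  .config("spark.eventLog.enabled", "false")
  .getOrCreate()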


On Tue, Jul 16, 2019 at 1:08 PM raman gugnani 
wrote:

> HI ,
>
> I have long running spark streaming jobs.
> Event log directories are getting filled with .inprogress files.
> Is there a fix or workaround for Spark Streaming?
>
> There is also a JIRA raised for the same issue by another reporter:
>
> https://issues.apache.org/jira/browse/SPARK-22783
>
> --
> Raman Gugnani
>
> 8588892293
> Principal Engineer
> *ixigo.com *
>


Re: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Bobby Evans
Let's do a few quick rules of thumb to get an idea of what kind of
processing power you will need in general to do what you want.

You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends
up being about 560 GB that you need to fully process in 5 seconds.

If you are reading this from spinning disks (which average about 80 MB/s)
you would need at least 1,450 disks to just read the data in 5 seconds
(that number can vary a lot depending on the storage format and your
compression ratio).
If you are reading the data over a network (let's say 10GigE even though in
practice you cannot get that in the cloud easily) you would need about 90
NICs just to read the data in 5 seconds, again depending on the compression
ratio this may be lower.
If you assume you have a cluster where it all fits in main memory and have
cached all of the data in memory (which in practice I have seen on most
modern systems at somewhere between 12 and 16 GB/sec) you would need
between 7 and 10 machines just to read through the data once in 5 seconds.
Spark also stores cached data compressed so you might need less as well.
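
For reference, a rough sketch of that arithmetic (using the same ballpark throughput assumptions as above, so the exact counts shift a little with rounding):

// Back-of-envelope check of the sizing above.
val bytes       = 3000000L * 50000L * 4L            // ~6e11 bytes, roughly 560 GB
val gbPerSecond = bytes / 1e9 / 5.0                 // throughput needed for a 5 s scan
val disks       = (gbPerSecond / 0.080).ceil.toInt  // spinning disk ~80 MB/s
val nics        = (gbPerSecond / 1.25).ceil.toInt   // 10GigE ~1.25 GB/s
val hostsFast   = (gbPerSecond / 16.0).ceil.toInt   // in-memory scan ~16 GB/s per host
val hostsSlow   = (gbPerSecond / 12.0).ceil.toInt   // in-memory scan ~12 GB/s per host
println(f"$gbPerSecond%.0f GB/s -> $disks disks, $nics NICs, $hostsFast-$hostsSlow hosts")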

All the numbers fit with things that spark should be able to handle, but a
5 second SLA is very tight for this amount of data.

Can you make this work with Spark? Probably. Does Spark have something
built in that will make this fast and simple for you? I doubt it; you have
some very tight requirements and will likely have to write something custom
to make it work the way you want.


On Thu, Jul 11, 2019 at 4:12 PM Steven Stetzler 
wrote:

> Hi Gautham,
>
> I am a beginner spark user too and I may not have a complete understanding
> of your question, but I thought I would start a discussion anyway. Have you
> looked into using Spark's built in Correlation function? (
> https://spark.apache.org/docs/latest/ml-statistics.html) This might let
> you get what you want (per-row correlation against the same matrix) without
> having to deal with parallelizing the computation yourself. Also, I think
> the question of how quickly you can get your results is largely a data-access
> question rather than a how-fast-is-Spark question. As long as you can exploit data
> parallelism (i.e. you can partition up your data), Spark will give you a
> speedup. You can imagine that if you had a large machine with many cores
> and ~100 GB of RAM (e.g. a m5.12xlarge EC2 instance), you could fit your
> problem in main memory and perform your computation with thread based
> parallelism. This might get your result relatively quickly. For a dedicated
> application with well constrained memory and compute requirements, it might
> not be a bad option to do everything on one machine as well. Accessing an
> external database and distributing work over a large number of computers
> can add overhead that might be out of your control.
>
> Thanks,
> Steven
>
> On Thu, Jul 11, 2019 at 9:24 AM Gautham Acharya <
> gauth...@alleninstitute.org> wrote:
>
>> Ping? I would really appreciate advice on this! Thank you!
>>
>>
>>
>> *From:* Gautham Acharya
>> *Sent:* Tuesday, July 9, 2019 4:22 PM
>> *To:* user@spark.apache.org
>> *Subject:* [Beginner] Run compute on large matrices and return the
>> result in seconds?
>>
>>
>>
>> This is my first email to this mailing list, so I apologize if I made any
>> errors.
>>
>>
>>
>> My team's going to be building an application and I'm investigating some
>> options for distributed compute systems. We want to perform computations
>> on large matrices.
>>
>>
>>
>> The requirements are as follows:
>>
>>
>>
>> 1. The matrices can be expected to be up to 50,000 columns x 3
>> million rows. The values are all integers (except for the row/column
>> headers).
>>
>> 2. The application needs to select a specific row, and calculate the
>> correlation coefficient (
>> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.corr.html
>>  )
>> against every other row. This means up to 3 million different calculations.
>>
>> 3. A sorted list of the correlation coefficients and their
>> corresponding row keys need to be returned in under 5 seconds.
>>
>> 4. Users will eventually request random row/column subsets to run
>> calculations on, so precomputing our coefficients is not an option. This
>> needs to be done on request.
>>
>>
>>
>> I've been looking at many compute solutions, but I'd consider Spark first
>> due to the widespread use and community. I currently have my data loaded
>> into Apache HBase for a different scenario (random access of rows/columns).
>> I’ve naively tried loading a dataframe from the CSV using a Spark instance
>> hosted on AWS EMR, but getting the results for even a single correlation
>> takes over 20 seconds.
>>
>>
>>
>> Thank you!
>>
>>
>>
>>
>>
>> --gautham
>>
>>
>>
>


CPUs per task

2019-07-17 Thread Magnus Nilsson
Hello all,

TLDR; Can the number of cores used by a task vary or is it always one core
per task? Is there a UI, metrics or logs I can check to see the number of
cores used by the task?

I have an ETL-pipeline where I do some transformations. In one of the
stages which ought to be quite CPU-heavy there's only a single task running
for a few minutes. I'm trying to determine if this means only one CPU core
is in use or if a single task could use many cores under the covers?

When I read data from an Event Hub the stage includes as many tasks as
there are partitions in the Event Hub, up to the maximum number of cores
available in the cluster. Clearly those tasks use one core each and are
limited in parallelism by the cluster size.
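
For context, a sketch of the setting that governs this, assuming the standard spark.task.cpus property (the default is one core per task; the value below is only an example):

import org.apache.spark.sql.SparkSession

// By default Spark reserves spark.task.cpus = 1 core per task; raising it
// reserves more cores per task (useful if the task itself is multi-threaded).
val spark = SparkSession.builder()
  .appName("cpus-per-task")
  .config("spark.task.cpus", "2")  // example value only, adjust to the workload
  .getOrCreate()
println(spark.conf.get("spark.task.cpus"))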

Regards,

Magnus


Re: Usage of PyArrow in Spark

2019-07-17 Thread Hyukjin Kwon
Regular Python UDFs don't use PyArrow under the hood.
Yes, they can potentially benefit but they can be easily worked around via
Pandas UDFs.

For instance, the two UDFs below are virtually identical.

@udf(...)
def func(col):
    return col

@pandas_udf(...)
def pandas_func(col):
    return col.apply(lambda v: v)

If only a minimal change were needed, I would be positive about adding
Arrow support to regular Python UDFs. Otherwise, I am not sure yet.


On Wed, Jul 17, 2019 at 1:19 PM, Abdeali Kothari wrote:

> Hi,
> In spark 2.3+ I saw that pyarrow was being used in a bunch of places in
> spark. And I was trying to understand the benefit in terms of serialization
> / deserialization it provides.
>
> I understand that the new pandas-udf works only if pyarrow is installed.
> But what about the plain old PythonUDF which can be used in map() kind of
> operations?
> Are they also using pyarrow under the hood to reduce the cost of serde? Or
> do they remain as earlier and no performance gain should be expected in
> those?
>
> If I'm not mistaken, plain old PythonUDFs could also benefit from Arrow as
> the data transfer cost to serialize/deserialize from Java to Python and
> back still exists and could potentially be reduced by using Arrow?
> Is my understanding correct? Are there any plans to implement this?
>
> Pointers to any notes or Jira about this would be appreciated.
>


Spark standalone mode problem: executors added and removed again and again!

2019-07-17 Thread zenglong chen
Hi all,
My standalone cluster has two slaves. When I submit my job, the
localhost slave works well, but the second slave keeps adding and removing
executors! The logs are below:
   2019-07-17 10:51:38,889 INFO
client.StandaloneAppClient$ClientEndpoint: Executor updated:
app-20190717105135-0008/2 is now EXITED (Command exited with code 1)
2019-07-17 10:51:38,890 INFO cluster.StandaloneSchedulerBackend: Executor
app-20190717105135-0008/2 removed: Command exited with code 1
2019-07-17 10:51:38,890 INFO storage.BlockManagerMasterEndpoint: Trying to
remove executor 2 from BlockManagerMaster.
2019-07-17 10:51:38,890 INFO storage.BlockManagerMaster: Removal of
executor 2 requested
2019-07-17 10:51:38,891 INFO
cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove
non-existent executor 2
2019-07-17 10:51:38,892 INFO client.StandaloneAppClient$ClientEndpoint:
Executor added: app-20190717105135-0008/3 on
worker-20190717093045-172.22.9.179-40573 (172.22.9.179:40573) with 8 core(s)
2019-07-17 10:51:38,892 INFO cluster.StandaloneSchedulerBackend: Granted
executor ID app-20190717105135-0008/3 on hostPort 172.22.9.179:40573 with 8
core(s), 12.0 GB RAM
2019-07-17 10:51:38,893 INFO client.StandaloneAppClient$ClientEndpoint:
Executor updated: app-20190717105135-0008/3 is now RUNNING
2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint:
Executor updated: app-20190717105135-0008/3 is now EXITED (Command exited
with code 1)
2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Executor
app-20190717105135-0008/3 removed: Command exited with code 1
2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint:
Executor added: app-20190717105135-0008/4 on
worker-20190717093045-172.22.9.179-40573 (172.22.9.179:40573) with 8 core(s)
2019-07-17 10:51:40,521 INFO storage.BlockManagerMaster: Removal of
executor 3 requested
2019-07-17 10:51:40,521 INFO
cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove
non-existent executor 3
2019-07-17 10:51:40,521 INFO storage.BlockManagerMasterEndpoint: Trying to
remove executor 3 from BlockManagerMaster.
2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Granted
executor ID app-20190717105135-0008/4 on hostPort 172.22.9.179:40573 with 8
core(s), 12.0 GB RAM
2019-07-17 10:51:40,523 INFO client.StandaloneAppClient$ClientEndpoint:
Executor updated: app-20190717105135-0008/4 is now RUNNING


And the slave output is below:
   19/07/17 10:47:12 INFO ExecutorRunner: Launch command:
"/home/ubuntu/data/jdk/jre/bin/java" "-cp"
"/home/ubuntu/spark-2.4.3/conf/:/home/ubuntu/spark-2.4.3/jars/*"
"-Xmx12288M" "-Dspark.driver.port=40335"
"org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url"
"spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335"
"--executor-id" "18" "--hostname" "172.22.9.179" "--cores" "8" "--app-id"
"app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:40573"
19/07/17 10:47:13 INFO Worker: Executor app-20190717104645-0007/18 finished
with state EXITED message Command exited with code 1 exitStatus 1
19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Clean up non-shuffle
files associated with the finished executor 18
19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Executor is not
registered (appId=app-20190717104645-0007, execId=18)
19/07/17 10:47:13 INFO Worker: Asked to launch executor
app-20190717104645-0007/19 for ph_user_pre_level
19/07/17 10:47:13 INFO SecurityManager: Changing view acls to: ubuntu
19/07/17 10:47:13 INFO SecurityManager: Changing modify acls to: ubuntu
19/07/17 10:47:13 INFO SecurityManager: Changing view acls groups to:
19/07/17 10:47:13 INFO SecurityManager: Changing modify acls groups to:
19/07/17 10:47:13 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users  with view permissions: Set(ubuntu);
groups with view permissions: Set(); users  with modify permissions:
Set(ubuntu); groups with modify permissions: Set()
19/07/17 10:47:14 INFO ExecutorRunner: Launch command:
"/home/ubuntu/data/jdk/jre/bin/java" "-cp"
"/home/ubuntu/spark-2.4.3/conf/:/home/ubuntu/spark-2.4.3/jars/*"
"-Xmx12288M" "-Dspark.driver.port=40335"
"org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url"
"spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335"
"--executor-id" "19" "--hostname" "172.22.9.179" "--cores" "8" "--app-id"
"app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:40573"

I guess that it may be a "-Dspark.driver.port=40335" problem.
Any suggestions would help me a lot!
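
If it is indeed a driver-address problem, a minimal sketch of pinning the driver endpoint explicitly (the IP below is an assumption; substitute whatever address the workers can actually reach):

import org.apache.spark.sql.SparkSession

// Pin the driver endpoint to an address/port the remote worker (172.22.9.179)
// can resolve and reach, instead of relying on the driver machine's hostname
// (iZk1a7vdbutmi6eluaskecZ in the launch command above).
val spark = SparkSession.builder()
  .appName("driver-endpoint-check")
  .config("spark.driver.host", "172.22.9.178")    // assumed driver IP, adjust
  .config("spark.driver.port", "40335")           // fixed port, so it can be opened in any firewall
  .config("spark.driver.bindAddress", "0.0.0.0")  // listen on all interfaces
  .getOrCreate()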


Parse RDD[Seq[String]] to DataFrame with types.

2019-07-17 Thread Guillermo Ortiz Fernández
I'm trying to parse an RDD[Seq[String]] into a DataFrame.
Although it's a Seq of Strings, the values could have a more specific type such as Int,
Boolean, Double, String and so on.
For example, a line could be:
"hello", "1", "bye", "1.1"
"hello1", "11", "bye1", "2.1"
...

The first column is always going to be a String, the second an Int, and so on,
and it's always going to be this way. On the other hand, one execution could
have sequences of five elements while in others the sequences could have 2000, so it
depends on the execution, but in each execution I know the types of each
"column" or "element" of the sequence.

To do it, I could have something like this:
// Imports needed for the snippet below.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// I could have a parameter to generate the StructType dynamically.
def getSchema(): StructType = {
  val schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
  schemaArray += StructField("col1", IntegerType, true)
  schemaArray += StructField("col2", StringType, true)
  schemaArray += StructField("col3", DoubleType, true)
  StructType(schemaArray)
}

// A Seq of Any?? It doesn't seem the best option!!
val l1: Seq[Any] = Seq(1, "2", 1.1)
val rdd1 = sc.parallelize(Seq(l1)).map(Row.fromSeq(_))

val schema = getSchema()
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
df.schema

I don't like having a Seq of Any at all, but it's really what I have.
Is there another option?

On the other hand, I was thinking that since I have something similar to a CSV, I
could create one. With Spark there is a library to read a CSV and return a
DataFrame where the types are inferred. Is it possible to call it if I already
have an RDD[String]?
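
On the last question, a sketch of the CSV route, assuming Spark 2.2 or later where the CSV reader accepts a Dataset[String] (values containing commas would need quoting):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Rebuild CSV lines from the RDD[Seq[String]] and let the CSV reader infer
// the column types.
val rdd = spark.sparkContext.parallelize(Seq(
  Seq("hello", "1", "bye", "1.1"),
  Seq("hello1", "11", "bye1", "2.1")
))

val lines = rdd.map(_.mkString(",")).toDS()
val df = spark.read
  .option("inferSchema", "true")
  .csv(lines)
df.printSchema()  // columns come back typed (string, int, string, double)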