Cleanup after Spark SQL job with window aggregation takes a long time

2016-08-29 Thread Jestin Ma
After a Spark SQL job that appends a few columns using window aggregation
functions, performs a join, and does some data massaging, I find that the
cleanup after the job finishes saving the result data to disk takes as long
as, if not longer than, the job itself.

I am currently performing window aggregation on a dataset of ~150 GB and
joining it with another dataset of about ~50 GB.

With window aggregation, the job takes about 15 minutes. Without window
aggregation, instead performing a standard groupBy(...).agg(...) and join,
it takes about 19 minutes.
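
For context, here is a minimal sketch of the two shapes of query being
compared (assuming the Spark 2.0 DataFrame API; the data and column names
below are purely illustrative):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._   // assumes an existing SparkSession named `spark`

// Hypothetical data: (category, amount)
val events = Seq(("a", 1.0), ("a", 2.0), ("b", 3.0)).toDF("category", "amount")

// Window aggregation: append an aggregate column without collapsing rows.
val w = Window.partitionBy("category")
val withTotals = events.withColumn("categoryTotal", sum("amount").over(w))

// Roughly equivalent groupBy(...).agg(...) followed by a join back onto the rows.
val totals = events.groupBy("category").agg(sum("amount").as("categoryTotal"))
val joined = events.join(totals, Seq("category"))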

However, when I use window aggregation functions, the driver program spends
more than 15-20 minutes removing broadcast pieces, cleaning accumulators,
and cleaning shuffles.

Can anyone explain what these operations are at a lower level, beyond what I
see on the command line, or why this happens ONLY when I use window
aggregation? And are there any ways to remedy this?

Thank you!
Jestin


Re: Converting DataFrame's int column to Double

2016-08-25 Thread Jestin Ma
How about this:

df.withColumn("doubles", col("ints").cast("double")).drop("ints")

On Thu, Aug 25, 2016 at 2:09 PM, Marco Mistroni  wrote:

> Hi all,
>  I might be stuck in old code, but this is what I am doing to convert a
> DF Int column to Double:
>
> val intToDoubleFunc:(Int => Double) = lbl => lbl.toDouble
> val labelToDblFunc = udf(intToDoubleFunc)
>
>
> val convertedDF = df.withColumn("SurvivedDbl",
> labelToDblFunc(col("Survived")))
>
> Is there a better way to do that?
>
> kr
>
>


Caching broadcasted DataFrames?

2016-08-25 Thread Jestin Ma
I have a DataFrame d1 that I would like to join with two separate
DataFrames.
Since d1 is small enough, I broadcast it.

What I understand about cache vs. broadcast is that caching leads to each
executor storing, in memory, the partitions it is assigned (cluster-wide
in-memory). Broadcasting leads to each node (with multiple executors) storing
a copy of the entire dataset (all partitions) in its own memory.

Since the dataset for d1 is used in two separate joins, should I also
persist it to prevent reading it from disk again? Or would broadcasting the
data already take care of that?
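
For reference, a sketch of the pattern being asked about (d1, big1, big2, the
join key, and the output paths are placeholders); whether the extra persist
actually saves work depends on how each broadcast is built, so it is worth
checking in the UI:

import org.apache.spark.sql.functions.broadcast

val d1Cached = d1.persist()   // keep the small side around between the two joins

// The broadcast hint marks d1 as a broadcast candidate in each join.
val joined1 = big1.join(broadcast(d1Cached), Seq("id"))
val joined2 = big2.join(broadcast(d1Cached), Seq("id"))

joined1.write.parquet("/tmp/out1")   // illustrative actions
joined2.write.parquet("/tmp/out2")

d1Cached.unpersist()          // safe to release once both jobs have run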


Thank you,
Jestin


Re:

2016-08-14 Thread Jestin Ma
Hi Michael, Mich, and Jacek, thank you for providing good suggestions. I
found some ways of getting rid of skew, such as the approaches you have
suggested (filtering, broadcasting, joining, unioning), as well as salting
my 0-value IDs.
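
For reference, a rough sketch of the filter-then-union approach mentioned
above (assuming Spark 2.0; df1 is the large skewed side, df2 the smaller
side, and the join type is only an example):

import org.apache.spark.sql.functions.{broadcast, col}

val skewedPart    = df1.where(col("id") === 0)
val nonSkewedPart = df1.where(col("id") =!= 0)

// After filtering, the matching slice of df2 is small enough to broadcast.
val joinedSkewed    = skewedPart.join(broadcast(df2.where(col("id") === 0)),
                                      Seq("id"), "left_outer")
val joinedNonSkewed = nonSkewedPart.join(df2, Seq("id"), "left_outer")

val result = joinedSkewed.union(joinedNonSkewed)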

Thank you for the help!


On Sun, Aug 14, 2016 at 11:33 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> You can force a broadcast, but with tables that large it's probably not a
> good idea.  However, filtering and then broadcasting one of the joins is
> likely to get you the benefits of broadcasting (no shuffle on the larger
> table that will colocate all the skewed tuples to a single overloaded
> executor) without attempting to broadcast something that's too large.
>
> On Sun, Aug 14, 2016 at 11:02 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi Michael,
>>
>> As I understand broadcast joins, Jestin could also use the broadcast
>> function on a dataset to make it broadcast. Jestin could force the
>> broadcast without the trick, hoping it's going to kick off the broadcast.
>> Correct?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sun, Aug 14, 2016 at 9:51 AM, Michael Armbrust
>> <mich...@databricks.com> wrote:
>> > Have you tried doing the join in two parts (id == 0 and id != 0) and
>> > then doing a union of the results?  It is possible that with this
>> > technique, the join which only contains skewed data would be filtered
>> > enough to allow broadcasting of one side.
>> >
>> > On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com>
>> > wrote:
>> >>
>> >> Hi, I'm currently trying to perform an outer join between two
>> >> DataFrames/Sets, one is ~150GB, one is about ~50 GB on a column, id.
>> >>
>> >> df1.id is skewed in that there are many 0's, the rest being unique
>> IDs.
>> >>
>> >> df2.id is not skewed. If I filter df1.id != 0, then the join works
>> well.
>> >> If I don't, then the join does not complete for a very, very long time.
>> >>
>> >> I have diagnosed this problem due to the hashpartitioning on IDs,
>> >> resulting in one partition containing many values due to data skew. One
>> >> executor ends up reading most of the shuffle data, and writing all of
>> the
>> >> shuffle data, as shown below.
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> Shown above is the task in question assigned to one executor.
>> >>
>> >>
>> >>
>> >> This screenshot comes from one of the executors, showing one single
>> thread
>> >> spilling sort data since the executor cannot hold 90%+ of the ~200 GB
>> result
>> >> in memory.
>> >>
>> >> Moreover, looking at the event timeline, I find that the executor on
>> that
>> >> task spends about 20% time reading shuffle data, 70% computation, and
>> 10%
>> >> writing output data.
>> >>
>> >> I have tried the following:
>> >>
>> >> "Salting" the 0-value keys by monotonically_increasing_id().mod(N)
>> >> - This doesn't seem to have an effect since now I have
>> hundreds/thousands
>> >> of keys with tens of thousands of occurrences.
>> >> - Should I increase N? Is there a way to just do random.mod(N) instead
>> of
>> >> monotonically_increasing_id()?
>> >>
>> >> Repartitioning according to column I know contains unique values
>> >>
>> >> - This is overridden by Spark's sort-based shuffle manager which hash
>> >> repartitions on the skewed column
>> >>
>> >> - Is it possible to change this? Or will the join column need to be
>> hashed
>> >> and partitioned on for joins to work
>> >>
>> >> Broadcasting does not work for my large tables
>> >>
>> >> Increasing/decreasing spark.sql.shuffle.partitions does not remedy the
>> >> skewed data problem as 0-product values are still being hashed to the
>> same
>> >> partition.
>> >>
>> >>
>> >> --
>> >>
>> >> What I am considering currently is doing the join at the RDD level,
>> but is
>> >> there any level of control which can solve my skewed data problem?
>> Other
>> >> than that, see the bolded question.
>> >>
>> >> I would appreciate any suggestions/tips/experience with this. Thank
>> you!
>> >>
>> >
>>
>
>


Re:

2016-08-14 Thread Jestin Ma
Hi Mich, do you mean using the skewed column as a join condition? I tried
repartition(skewed column, unique column) but had no success, possibly
because the join was still hash-partitioning on just the skewed column
after I called repartition.

On Sun, Aug 14, 2016 at 1:49 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Can you make the join more selective by using the skewed column ID +
> another column that has valid unique values ("Repartitioning according to
> column I know contains unique values")?
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 14 August 2016 at 07:17, Jestin Ma <jestinwith.a...@gmail.com> wrote:
>
>> Attached are screenshots mentioned, apologies for that.
>>
>> On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com>
>> wrote:
>>
>>> Hi, I'm currently trying to perform an outer join between two
>>> DataFrames/Sets, one is ~150GB, one is about ~50 GB on a column, id.
>>>
>>> df1.id is skewed in that there are many 0's, the rest being unique IDs.
>>>
>>> df2.id is not skewed. If I filter df1.id != 0, then the join works
>>> well. If I don't, then the join does not complete for a very, very long
>>> time.
>>>
>>> I have diagnosed this problem due to the hashpartitioning on IDs,
>>> resulting in one partition containing many values due to data skew. One
>>> executor ends up reading most of the shuffle data, and writing all of the
>>> shuffle data, as shown below.
>>>
>>>
>>>
>>>
>>>
>>> Shown above is the task in question assigned to one executor.
>>>
>>>
>>>
>>> This screenshot comes from one of the executors, showing one single
>>> thread spilling sort data since the executor cannot hold 90%+ of the ~200
>>> GB result in memory.
>>>
>>> Moreover, looking at the event timeline, I find that the executor on
>>> that task spends about 20% time reading shuffle data, 70% computation, and
>>> 10% writing output data.
>>>
>>> I have tried the following:
>>>
>>>
>>>- "Salting" the 0-value keys by monotonically_increasing_id().mod(N)
>>>- - This doesn't seem to have an effect since now I have
>>>hundreds/thousands of keys with tens of thousands of occurrences.
>>>- - Should I increase N? Is there a way to just do random.mod(N)
>>>instead of monotonically_increasing_id()?
>>>-
>>>- Repartitioning according to column I know contains unique values
>>>-
>>>- - This is overridden by Spark's sort-based shuffle manager which
>>>hash repartitions on the skewed column
>>>-
>>>- - Is it possible to change this? Or will the join column need to
>>>be hashed and partitioned on for joins to work
>>>-
>>>- Broadcasting does not work for my large tables
>>>-
>>>- Increasing/decreasing spark.sql.shuffle.partitions does not remedy
>>>the skewed data problem as 0-product values are still being hashed to the
>>>same partition.
>>>
>>>
>>> --
>>>
>>> What I am considering currently is doing the join at the RDD level, but
>>> is there any level of control which can solve my skewed data problem? Other
>>> than that, see the bolded question.
>>>
>>> I would appreciate any suggestions/tips/experience with this. Thank you!
>>>
>>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>
>


[no subject]

2016-08-14 Thread Jestin Ma
Hi, I'm currently trying to perform an outer join between two
DataFrames/Datasets (one ~150 GB, the other about ~50 GB) on a column, id.

df1.id is skewed in that there are many 0's, the rest being unique IDs.

df2.id is not skewed. If I filter df1.id != 0, then the join works well. If
I don't, then the join does not complete for a very, very long time.

I have diagnosed this problem as being due to the hash partitioning on IDs:
the data skew means one partition ends up containing a very large number of
values. One executor ends up reading most of the shuffle data, and writing
all of the shuffle data, as shown below.

[screenshot omitted from the plain-text archive]

Shown above is the task in question assigned to one executor.

[screenshot omitted from the plain-text archive]

This screenshot comes from one of the executors, showing one single thread
spilling sort data since the executor cannot hold 90%+ of the ~200 GB
result in memory.

Moreover, looking at the event timeline, I find that the executor running
that task spends about 20% of its time reading shuffle data, 70% on
computation, and 10% writing output data.

I have tried the following:


   - "Salting" the 0-value keys by monotonically_increasing_id().mod(N)
   - - This doesn't seem to have an effect since now I have
   hundreds/thousands of keys with tens of thousands of occurrences.
   - - Should I increase N? Is there a way to just do random.mod(N) instead
   of monotonically_increasing_id()?
   -
   - Repartitioning according to column I know contains unique values
   -
   - - This is overridden by Spark's sort-based shuffle manager which hash
   repartitions on the skewed column
   -
   - - Is it possible to change this? Or will the join column need to be
   hashed and partitioned on for joins to work
   -
   - Broadcasting does not work for my large tables
   -
   - Increasing/decreasing spark.sql.shuffle.partitions does not remedy the
   skewed data problem as 0-product values are still being hashed to the same
   partition.
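
For reference, a rough sketch of salting with a random salt instead of
monotonically_increasing_id(), as asked above (assuming Spark 2.0; df1, df2,
the join type, and the salt count are placeholders):

import org.apache.spark.sql.functions._

val numSalts = 32   // number of salt buckets; purely an example value

// Large, skewed side: give the 0-valued IDs a random salt, everything else salt 0.
val df1Salted = df1.withColumn(
  "salt",
  when(col("id") === 0, (rand() * numSalts).cast("int")).otherwise(lit(0)))

// Small side: replicate each id == 0 row once per salt value so every salted
// key still finds its match; all other rows keep salt 0.
val saltValues = (0 until numSalts).map(i => lit(i))
val df2Zero    = df2.where(col("id") === 0).withColumn("salt", explode(array(saltValues: _*)))
val df2NonZero = df2.where(col("id") =!= 0).withColumn("salt", lit(0))

val joined = df1Salted.join(df2Zero.union(df2NonZero), Seq("id", "salt"), "left_outer")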


--

What I am considering currently is doing the join at the RDD level, but is
there any level of control which can solve my skewed data problem? Other
than that, see the questions in the list above.

I would appreciate any suggestions/tips/experience with this. Thank you!


DataFramesWriter saving DataFrames timestamp in weird format

2016-08-11 Thread Jestin Ma
When I load in a timestamp column and try to save it immediately, without
any transformations, the output time is Unix time padded with 0's until
there are 16 digits.

For example,
loading in a time of August 3, 2016, 00:36:25 GMT, which is 1470184585 in
UNIX time, saves as 147018458500.

When I do df.show(), it shows the date format that I pass in (a custom
format), but it saves as I mentioned.
I tried loading the saved file as a timestamp, and as expected it throws an
exception because it cannot recognize the invalid time.

Are there any explanations / workarounds for this?
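
One possible workaround, sketched below under the assumption that the column
is a proper TimestampType and that a formatted string is acceptable in the
output (column and path names are illustrative):

import org.apache.spark.sql.functions.{col, date_format}

// Render the timestamp as a formatted string before writing, so the writer
// does not fall back to a numeric epoch representation.
val out = df
  .withColumn("event_time_str",
    date_format(col("event_time"), "yyyy-MM-dd HH:mm:ss"))
  .drop("event_time")

out.write.option("header", "true").csv("/tmp/output")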

Thank you,
Jestin


Changing Spark configuration midway through application.

2016-08-10 Thread Jestin Ma
If I run an application, for example with 3 joins:

[join 1]
[join 2]
[join 3]

[final join and save to disk]

Could I change Spark properties in between each join?

[join 1]
[change properties]
[join 2]
[change properties]
...

Or would I have to create a separate application with different properties
for each of the three joins and also save each intermediate join result to
disk?
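
For what it's worth, a sketch of changing a runtime SQL setting between steps
(assuming Spark 2.0): settings such as spark.sql.shuffle.partitions can be
changed on the fly via spark.conf.set, while executor-level settings (memory,
cores) are fixed once the application starts. Because DataFrames are lazy, a
change only affects work executed after it, so each step needs its own action
(here, a write) for a different value to apply. The DataFrames and paths
below are placeholders:

spark.conf.set("spark.sql.shuffle.partitions", "2000")
a.join(b, Seq("id")).write.parquet("/tmp/join1")   // executed with 2000 shuffle partitions

spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.read.parquet("/tmp/join1")
  .join(c, Seq("id"))
  .write.parquet("/tmp/join2")                     // executed with 400 shuffle partitions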

Jestin


Spark 2.0.1 / 2.1.0 on Maven

2016-08-09 Thread Jestin Ma
If we want to use versions of Spark beyond the official 2.0.0 release,
specifically on Maven + Java, what steps should we take to upgrade? I can't
find the newer versions on Maven central.

Thank you!
Jestin


Using Kryo for DataFrames (Dataset)?

2016-08-07 Thread Jestin Ma
When using DataFrames (Dataset), there's no option for an Encoder.
Does that mean DataFrames (since they build on top of RDDs) use Java
serialization? Does using Kryo make sense as an optimization here?

If not, what's the difference between Java/Kryo serialization, Tungsten,
and Encoders?
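
For illustration, a spark-shell-style sketch (assuming Spark 2.0) of where
Kryo and Encoders each come in; the Event class is hypothetical:

import org.apache.spark.sql.{Encoders, SparkSession}

case class Event(id: Long, name: String)   // hypothetical record type

val spark = SparkSession.builder()
  .appName("kryo-sketch")
  // Kryo configured here affects RDD/shuffle serialization of plain JVM objects.
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
import spark.implicits._

// A typed Dataset normally uses a Tungsten encoder derived from the case class:
val tungstenDs = Seq(Event(1L, "a")).toDS()

// A Kryo-backed encoder can be requested explicitly; rows then become opaque
// binary blobs, so Catalyst cannot optimize on the individual fields:
val kryoDs = spark.createDataset(Seq(Event(2L, "b")))(Encoders.kryo[Event])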

Thank you!


Re: Tuning level of Parallelism: Increase or decrease?

2016-08-01 Thread Jestin Ma
Hi Nikolay, I'm looking at data locality improvements for Spark, and I have
conflicting sources on using YARN for Spark.

Reynold said that Spark workers automatically take care of data locality
here:
https://www.quora.com/Does-Apache-Spark-take-care-of-data-locality-when-Spark-workers-load-data-from-HDFS

However, I've read elsewhere (
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/yarn/)
that Spark on YARN increases data locality because YARN tries to place
tasks next to HDFS blocks.

Can anyone verify/support one side or the other?

Thank you,
Jestin

On Mon, Aug 1, 2016 at 1:15 AM, Nikolay Zhebet <phpap...@gmail.com> wrote:

> Hi.
> Maybe "data locality" can help you.
> If you use groupBy and joins, then most likely you will see a lot of
> network operations. This can be very slow. You can try to prepare and
> transform your information in a way that minimizes the transfer of
> temporary data between worker nodes.
>
> Try googling "Data locality in Hadoop".
>
>
> 2016-08-01 4:41 GMT+03:00 Jestin Ma <jestinwith.a...@gmail.com>:
>
>> It seems that the number of tasks being this large do not matter. Each
>> task was set default by the HDFS as 128 MB (block size) which I've heard to
>> be ok. I've tried tuning the block (task) size to be larger and smaller to
>> no avail.
>>
>> I tried coalescing to 50 but that introduced large data skew and slowed
>> down my job a lot.
>>
>> On Sun, Jul 31, 2016 at 5:27 PM, Andrew Ehrlich <and...@aehrlich.com>
>> wrote:
>>
>>> 15000 seems like a lot of tasks for that size. Test it out with a
>>> .coalesce(50) placed right after loading the data. It will probably either
>>> run faster or crash with out of memory errors.
>>>
>>> On Jul 29, 2016, at 9:02 AM, Jestin Ma <jestinwith.a...@gmail.com>
>>> wrote:
>>>
>>> I am processing ~2 TB of hdfs data using DataFrames. The size of a task
>>> is equal to the block size specified by hdfs, which happens to be 128 MB,
>>> leading to about 15000 tasks.
>>>
>>> I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.
>>> I'm performing groupBy, count, and an outer-join with another DataFrame
>>> of ~200 MB size (~80 MB cached but I don't need to cache it), then saving
>>> to disk.
>>>
>>> Right now it takes about 55 minutes, and I've been trying to tune it.
>>>
>>> I read on the Spark Tuning guide that:
>>> *In general, we recommend 2-3 tasks per CPU core in your cluster.*
>>>
>>> This means that I should have about 30-50 tasks instead of 15000, and
>>> each task would be much bigger in size. Is my understanding correct, and is
>>> this suggested? I've read from difference sources to decrease or increase
>>> parallelism, or even keep it default.
>>>
>>> Thank you for your help,
>>> Jestin
>>>
>>>
>>>
>>
>


Re: Tuning level of Parallelism: Increase or decrease?

2016-07-31 Thread Jestin Ma
It seems that the number of tasks being this large does not matter. Each task
size was set by default from the HDFS block size of 128 MB, which I've heard
is OK. I've tried tuning the block (task) size to be larger and smaller, to
no avail.

I tried coalescing to 50 but that introduced large data skew and slowed
down my job a lot.
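
For reference, a sketch of the distinction at play (the path is illustrative
and a SparkSession named spark is assumed): coalesce merges existing
partitions without a shuffle, which can leave them uneven, whereas
repartition does a full shuffle and balances them:

val df = spark.read.parquet("/data/input")

// coalesce(50): merges existing partitions without a shuffle, so partitions
// can end up very uneven when the input is skewed.
val coalesced = df.coalesce(50)

// repartition(50): full shuffle, roughly even partitions, at the cost of
// moving the data across the network.
val repartitioned = df.repartition(50)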

On Sun, Jul 31, 2016 at 5:27 PM, Andrew Ehrlich <and...@aehrlich.com> wrote:

> 15000 seems like a lot of tasks for that size. Test it out with a
> .coalesce(50) placed right after loading the data. It will probably either
> run faster or crash with out of memory errors.
>
> On Jul 29, 2016, at 9:02 AM, Jestin Ma <jestinwith.a...@gmail.com> wrote:
>
> I am processing ~2 TB of hdfs data using DataFrames. The size of a task is
> equal to the block size specified by hdfs, which happens to be 128 MB,
> leading to about 15000 tasks.
>
> I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.
> I'm performing groupBy, count, and an outer-join with another DataFrame of
> ~200 MB size (~80 MB cached but I don't need to cache it), then saving to
> disk.
>
> Right now it takes about 55 minutes, and I've been trying to tune it.
>
> I read on the Spark Tuning guide that:
> *In general, we recommend 2-3 tasks per CPU core in your cluster.*
>
> This means that I should have about 30-50 tasks instead of 15000, and each
> task would be much bigger in size. Is my understanding correct, and is this
> suggested? I've read from difference sources to decrease or increase
> parallelism, or even keep it default.
>
> Thank you for your help,
> Jestin
>
>
>


Tuning level of Parallelism: Increase or decrease?

2016-07-29 Thread Jestin Ma
I am processing ~2 TB of HDFS data using DataFrames. The size of a task is
equal to the block size specified by HDFS, which happens to be 128 MB,
leading to about 15000 tasks.

I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.
I'm performing groupBy, count, and an outer-join with another DataFrame of
~200 MB size (~80 MB cached but I don't need to cache it), then saving to
disk.

Right now it takes about 55 minutes, and I've been trying to tune it.

I read on the Spark Tuning guide that:
*In general, we recommend 2-3 tasks per CPU core in your cluster.*

This means that I should have about 30-50 tasks instead of 15000, and each
task would be much bigger in size. Is my understanding correct, and is this
suggested? I've read from different sources to decrease or increase
parallelism, or even keep it at the default.

Thank you for your help,
Jestin


Spark Standalone Cluster: Having a master and worker on the same node

2016-07-27 Thread Jestin Ma
Hi, I'm doing performance testing. I currently have 1 master node and 4
worker nodes, and I am submitting in client mode from a 6th cluster node.

I know we can have a master and a worker on the same node. Speaking in terms
of performance and practicality, is it possible/suggested to have another
worker running on either the 6th node or the master node?

Thank you!


Spark 2.0 SparkSession, SparkConf, SparkContext

2016-07-27 Thread Jestin Ma
I know that SparkSession is replacing SQLContext and HiveContext, but what
about SparkConf and SparkContext? Are those still relevant in our programs?
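
A minimal sketch of how the pieces fit together in 2.0 (configuration values
here are only examples):

import org.apache.spark.sql.SparkSession

// SparkSession is the single entry point in 2.0. Settings that previously
// went through SparkConf can be passed to the builder, and the underlying
// SparkContext is still available for RDD-level work.
val spark = SparkSession.builder()
  .appName("example")
  .config("spark.sql.shuffle.partitions", "200")
  .getOrCreate()

val sc  = spark.sparkContext          // the SparkContext still exists underneath
val rdd = sc.parallelize(1 to 10)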

Thank you!
Jestin


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jestin Ma
I did netstat -apn | grep 4040 on machine 6, and I see

tcp        0      0 :::4040        :::*        LISTEN      30597/java

What does this mean?

On Tue, Jul 26, 2016 at 6:47 AM, Jestin Ma <jestinwith.a...@gmail.com>
wrote:

> I do not deploy using cluster mode and I don't use EC2.
>
> I just read that launching as client mode: "the driver is launched
> directly within the spark-submit process which acts as a *client* to the
> cluster."
>
> My current setup is that I have cluster machines 1, 2, 3, 4, 5, with 1
> being the master.
> I submit from another cluster machine 6 in client mode. So I'm taking that
> the driver is launched in my machine 6.
>
> On Tue, Jul 26, 2016 at 6:38 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
>> figure out where the driver runs and use the machine's IP.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma <jestinwith.a...@gmail.com>
>> wrote:
>> > I tried doing that on my master node.
>> > I got nothing.
>> > However, I grep'd port 8080 and I got the standalone UI.
>> >
>> > On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le <giaosu...@gmail.com> wrote:
>> >>
>> >> You’re running in StandAlone Mode?
>> >> Usually inside active task it will show the address of current job.
>> >> or you can check in master node by using netstat -apn | grep 4040
>> >>
>> >>
>> >>
>> >> > On Jul 26, 2016, at 8:21 AM, Jestin Ma <jestinwith.a...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > Hello, when running spark jobs, I can access the master UI (port 8080
>> >> > one) no problem. However, I'm confused as to how to access the web
>> UI to see
>> >> > jobs/tasks/stages/etc.
>> >> >
>> >> > I can access the master UI at http://:8080. But port
>> 4040
>> >> > gives me a -connection cannot be reached-.
>> >> >
>> >> > Is the web UI http:// with a port of 4040?
>> >> >
>> >> > I'm running my Spark job on a cluster machine and submitting it to a
>> >> > master node part of the cluster. I heard of ssh tunneling; is that
>> relevant
>> >> > here?
>> >> >
>> >> > Thank you!
>> >>
>> >
>>
>
>


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jestin Ma
I do not deploy using cluster mode and I don't use EC2.

I just read that launching as client mode: "the driver is launched directly
within the spark-submit process which acts as a *client* to the cluster."

My current setup is that I have cluster machines 1, 2, 3, 4, 5, with 1
being the master.
I submit from another cluster machine (6) in client mode, so I take it that
the driver is launched on machine 6.

On Tue, Jul 26, 2016 at 6:38 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
> figure out where the driver runs and use the machine's IP.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma <jestinwith.a...@gmail.com>
> wrote:
> > I tried doing that on my master node.
> > I got nothing.
> > However, I grep'd port 8080 and I got the standalone UI.
> >
> > On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le <giaosu...@gmail.com> wrote:
> >>
> >> You’re running in StandAlone Mode?
> >> Usually inside active task it will show the address of current job.
> >> or you can check in master node by using netstat -apn | grep 4040
> >>
> >>
> >>
> >> > On Jul 26, 2016, at 8:21 AM, Jestin Ma <jestinwith.a...@gmail.com>
> >> > wrote:
> >> >
> >> > Hello, when running spark jobs, I can access the master UI (port 8080
> >> > one) no problem. However, I'm confused as to how to access the web UI
> to see
> >> > jobs/tasks/stages/etc.
> >> >
> >> > I can access the master UI at http://:8080. But port
> 4040
> >> > gives me a -connection cannot be reached-.
> >> >
> >> > Is the web UI http:// with a port of 4040?
> >> >
> >> > I'm running my Spark job on a cluster machine and submitting it to a
> >> > master node part of the cluster. I heard of ssh tunneling; is that
> relevant
> >> > here?
> >> >
> >> > Thank you!
> >>
> >
>


Re: Spark Web UI port 4040 not working

2016-07-26 Thread Jestin Ma
I tried doing that on my master node.
I got nothing.
However, I grep'd port 8080 and I got the standalone UI.

On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le <giaosu...@gmail.com> wrote:

> You’re running in StandAlone Mode?
> Usually inside active task it will show the address of current job.
> or you can check in master node by using netstat -apn | grep 4040
>
>
>
> > On Jul 26, 2016, at 8:21 AM, Jestin Ma <jestinwith.a...@gmail.com>
> wrote:
> >
> > Hello, when running spark jobs, I can access the master UI (port 8080
> one) no problem. However, I'm confused as to how to access the web UI to
> see jobs/tasks/stages/etc.
> >
> > I can access the master UI at http://:8080. But port 4040
> gives me a -connection cannot be reached-.
> >
> > Is the web UI http:// with a port of 4040?
> >
> > I'm running my Spark job on a cluster machine and submitting it to a
> master node part of the cluster. I heard of ssh tunneling; is that relevant
> here?
> >
> > Thank you!
>
>


Spark Web UI port 4040 not working

2016-07-25 Thread Jestin Ma
Hello, when running Spark jobs, I can access the master UI (the port 8080
one) with no problem. However, I'm confused as to how to access the web UI
that shows jobs/tasks/stages/etc.

I can access the master UI at http://:8080. But port 4040
gives me a -connection cannot be reached-.

Is the web UI http:// with a port of 4040?

I'm running my Spark job on a cluster machine and submitting it to a master
node that is part of the cluster. I've heard of SSH tunneling; is that
relevant here?

Thank you!


Choosing RDD/DataFrame/DataSet and Cluster Tuning

2016-07-23 Thread Jestin Ma
Hello,
Right now I'm using DataFrames to perform a df1.groupBy(key).count() on one
DataFrame and join with another, df2.

The first, df1, is very large (many gigabytes) compared to df2 (250 Mb).

Right now I'm running this on a cluster of 5 nodes, 16 cores each, 90 GB
RAM each.
It is taking me about 1 hour and 40 minutes to perform the groupBy, count,
and join, which seems very slow to me.

Currently I have set the following in my *spark-defaults.conf*:

spark.executor.instances                24
spark.executor.memory                   10g
spark.executor.cores                    3
spark.driver.memory                     5g
spark.sql.autoBroadcastJoinThreshold    200Mb


I have a couple of questions regarding tuning for performance as a beginner.

   1. Right now I'm running Spark 1.6.0. Would moving to Spark 2.0 DataSet
   (or even DataFrames) be better?
   2. What if I used RDDs instead? I know that reduceByKey is better than
   groupByKey, and DataFrames don't have that method.
   3. I think I can do a broadcast join and have set a threshold. Do I need
   to set it above my second DataFrame's size? Do I need to explicitly call
   broadcast(df2)? (See the sketch after this list.)
   4. What's the point of driver memory?
   5. Can anyone point out something wrong with my tuning numbers, or any
   additional parameters worth checking out?
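
Regarding question 3, a sketch of the explicit broadcast hint (df1, df2, and
the key column are placeholders; the hint works independently of the
autoBroadcastJoinThreshold setting):

import org.apache.spark.sql.functions.broadcast

val counts = df1.groupBy("key").count()
val result = counts.join(broadcast(df2), Seq("key"))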


Thank you a lot!
Sincerely,
Jestin


Can Spark Dataframes preserve order when joining?

2016-06-29 Thread Jestin Ma
If it’s not too much trouble, could I get some pointers/help on this? (see link)
http://stackoverflow.com/questions/38085801/can-dataframe-joins-in-spark-preserve-order
 


Also, as a side question, do DataFrames support easy reordering of columns?

Thank you!
Jestin