Re: spark-shell gets stuck in ACCEPTED state forever when ran in YARN client mode.

2018-07-08 Thread yohann jardin
When you run on Yarn, you don't even need to start a Spark cluster (Spark 
master and slaves). Yarn receives a job and then allocates resources for the 
application master and then for its workers.

Check the resources available in the node section of the resource manager UI 
(and is your node actually detected as alive?), as well as the scheduler 
section to check the default queue resources.
If you seem to lack resources for your driver, you can try to reduce the driver 
memory by specifying "--driver-memory 512m" for example, but I'd expect the 
default of 1g to be low enough based on what you showed us.

Yohann Jardin

On 7/8/2018 at 6:11 PM, kant kodali wrote:
@yohann sorry, I am assuming you meant the application master; if so, I believe 
Spark is the one that provides the application master. Is there any way to look 
up how many resources are being requested and how many YARN is allowed to 
provide? I would assume this is a common case, so I am not sure why these 
numbers are not part of the resource manager logs?

On Sun, Jul 8, 2018 at 8:09 AM, kant kodali <kanth...@gmail.com> wrote:
yarn.scheduler.capacity.maximum-am-resource-percent is set to 0.1 by default, 
and I tried changing it to 1.0 and still no luck; the same problem persists. The 
master here is yarn and I am just trying to spawn spark-shell --master yarn 
--deploy-mode client and run a simple word count, so I am not sure why it would 
request more resources?

On Sun, Jul 8, 2018 at 8:02 AM, yohann jardin <yohannjar...@hotmail.com> wrote:

Following the logs from the resource manager:

2018-07-08 07:23:23,382 WARN 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
maximum-am-resource-percent is insufficient to start a single application in 
queue, it is likely set too low. skipping enforcement to allow at least one 
application to start

2018-07-08 07:23:23,382 WARN 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
maximum-am-resource-percent is insufficient to start a single application in 
queue for user, it is likely set too low. skipping enforcement to allow at 
least one application to start

I'd say it has nothing to do with Spark. Your master is just asking for more 
resources than the default Yarn queue is allowed to provide.
You might take a look at 
https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
 and search for maximum-am-resource-percent.

Regards,

Yohann Jardin

On 7/8/2018 at 4:40 PM, kant kodali wrote:
Hi,

It's on a local MacBook Pro machine that has 16GB RAM, a 512GB disk and 8 vCPUs! 
I am not running any code since I can't even spawn spark-shell with yarn as 
master, as described in my previous email. I just want to run a simple word 
count using yarn as master.

Thanks!

Below is the resource manager log once again if that helps


2018-07-08 07:23:23,343 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue: 
Application added - appId: application_1531059242261_0001 user: xxx leaf-queue 
of parent: root #applications: 1

2018-07-08 07:23:23,344 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler:
 Accepted application application_1531059242261_0001 from user: xxx, in queue: 
default

2018-07-08 07:23:23,350 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: 
application_1531059242261_0001 State change from SUBMITTED to ACCEPTED on 
event=APP_ACCEPTED

2018-07-08 07:23:23,370 INFO 
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: 
Registering app attempt : appattempt_1531059242261_0001_01

2018-07-08 07:23:23,370 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: 
appattempt_1531059242261_0001_01 State change from NEW to SUBMITTED

2018-07-08 07:23:23,382 WARN 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
maximum-am-resource-percent is insufficient to start a single application in 
queue, it is likely set too low. skipping enforcement to allow at least one 
application to start

2018-07-08 07:23:23,382 WARN 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
maximum-am-resource-percent is insufficient to start a single application in 
queue for user, it is likely set too low. skipping enforcement to allow at 
least one application to start

2018-07-08 07:23:23,382 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
Application application_1531059242261_0001 from user: xxx activated in queue: 
default

2018-07-08 07:23:23,382 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
Application added - appId: application_1531059242261_0001 user: 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@476750cd,
 leaf-queue: default #user-pending-applications: 0 #user-active-applications: 1 
#queue-pending-applications: 0 #queue-active-applications: 1

Re: spark-shell gets stuck in ACCEPTED state forever when ran in YARN client mode.

2018-07-08 Thread yohann jardin
Following the logs from the resource manager:

2018-07-08 07:23:23,382 WARN 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
maximum-am-resource-percent is insufficient to start a single application in 
queue, it is likely set too low. skipping enforcement to allow at least one 
application to start

2018-07-08 07:23:23,382 WARN 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
maximum-am-resource-percent is insufficient to start a single application in 
queue for user, it is likely set too low. skipping enforcement to allow at 
least one application to start

I'd say it has nothing to do with Spark. Your master is just asking for more 
resources than the default Yarn queue is allowed to provide.
You might take a look at 
https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
 and search for maximum-am-resource-percent.

Regards,

Yohann Jardin

On 7/8/2018 at 4:40 PM, kant kodali wrote:
Hi,

It's on a local MacBook Pro machine that has 16GB RAM, a 512GB disk and 8 vCPUs! 
I am not running any code since I can't even spawn spark-shell with yarn as 
master, as described in my previous email. I just want to run a simple word 
count using yarn as master.

Thanks!

Below is the resource manager log once again if that helps


2018-07-08 07:23:23,343 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue: 
Application added - appId: application_1531059242261_0001 user: xxx leaf-queue 
of parent: root #applications: 1

2018-07-08 07:23:23,344 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler:
 Accepted application application_1531059242261_0001 from user: xxx, in queue: 
default

2018-07-08 07:23:23,350 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: 
application_1531059242261_0001 State change from SUBMITTED to ACCEPTED on 
event=APP_ACCEPTED

2018-07-08 07:23:23,370 INFO 
org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: 
Registering app attempt : appattempt_1531059242261_0001_01

2018-07-08 07:23:23,370 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: 
appattempt_1531059242261_0001_01 State change from NEW to SUBMITTED

2018-07-08 07:23:23,382 WARN 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
maximum-am-resource-percent is insufficient to start a single application in 
queue, it is likely set too low. skipping enforcement to allow at least one 
application to start

2018-07-08 07:23:23,382 WARN 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
maximum-am-resource-percent is insufficient to start a single application in 
queue for user, it is likely set too low. skipping enforcement to allow at 
least one application to start

2018-07-08 07:23:23,382 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
Application application_1531059242261_0001 from user: xxx activated in queue: 
default

2018-07-08 07:23:23,382 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: 
Application added - appId: application_1531059242261_0001 user: 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@476750cd,
 leaf-queue: default #user-pending-applications: 0 #user-active-applications: 1 
#queue-pending-applications: 0 #queue-active-applications: 1

2018-07-08 07:23:23,382 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler:
 Added Application Attempt appattempt_1531059242261_0001_01 to scheduler 
from user xxx in queue default

2018-07-08 07:23:23,386 INFO 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: 
appattempt_1531059242261_0001_01 State change from SUBMITTED to SCHEDULED





Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread yohann jardin
Hey Jason,

It might be related to what is behind your variable ALLUXIO_SPARK_CLIENT and 
where the lib is located (is it on HDFS, on the node that submits the job, or 
locally on all Spark workers?).
There is a great post on SO about it: https://stackoverflow.com/a/37348234

We should also check that you provide the jar correctly based on its location. 
I have found it tricky in some cases.
As a debugging step, if the jar is not on HDFS, you can copy it there and then 
specify the full path in the extraClassPath property.

Regards,

Yohann Jardin

On 4/13/2018 at 5:38 PM, Jason Boorn wrote:
I do, and this is what I will fall back to if nobody has a better idea :)

I was just hoping to get this working as it is much more convenient for my 
testing pipeline.

Thanks again for the help

On Apr 13, 2018, at 11:33 AM, Geoff Von Allmen <ge...@ibleducation.com> wrote:

Ok - `LOCAL` makes sense now.

Do you have the option to still use `spark-submit` in this scenario, but using 
the following options:

```bash
--master "local[*]" \
--deploy-mode "client" \
...
```

I know in the past, I have setup some options using `.config("Option", 
"value")` when creating the spark session, and then other runtime options as 
you describe above with `spark.conf.set`. At this point though I've just moved 
everything out into a `spark-submit` script.
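
To make that distinction concrete, here is a minimal Scala sketch of the two 
styles; the option names, values and jar path are illustrative only:

import org.apache.spark.sql.SparkSession

// Static options (class path entries, executor settings, ...) have to go in before getOrCreate().
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("local-test")
  .config("spark.executor.extraClassPath", "/path/to/extra.jar") // illustrative path
  .getOrCreate()

// Runtime-settable options can still be changed afterwards with spark.conf.set.
spark.conf.set("spark.sql.shuffle.partitions", "8")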

On Fri, Apr 13, 2018 at 8:18 AM, Jason Boorn <jbo...@gmail.com> wrote:
Hi Geoff -

Appreciate the help here - I do understand what you’re saying below.  And I am 
able to get this working when I submit a job to a local cluster.

I think part of the issue here is that there’s ambiguity in the terminology.  
When I say “LOCAL” spark, I mean an instance of spark that is created by my 
driver program, and is not a cluster itself.  It means that my master node is 
“local”, and this mode is primarily used for testing.

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html

While I am able to get alluxio working with spark-submit, I am unable to get it 
working when using local mode.  The mechanisms for setting class paths during 
spark-submit are not available in local mode.  My understanding is that all one 
is able to use is:

spark.conf.set(“”)

To set any runtime properties of the local instance.  Note that it is possible 
(and I am more convinced of this as time goes on) that alluxio simply does not 
work in spark local mode as described above.


On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen <ge...@ibleducation.com> wrote:


I fought with a ClassNotFoundException for quite some time, but it was for 
kafka.

The final configuration that got everything working was running spark-submit 
with the following options:

--jars "/path/to/.ivy2/jars/package.jar" \
--driver-class-path "/path/to/.ivy2/jars/package.jar" \
--conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
--packages org.some.package:package_name:version


While this was needed for me to run in cluster mode, it works equally well for 
client mode as well.

One other note when needing to supplied multiple items to these args - --jars 
and --packages should be comma separated, --driver-class-path and 
extraClassPath should be : separated

HTH

​

On Fri, Apr 13, 2018 at 4:28 AM, jb44 <jbo...@gmail.com> wrote:
Haoyuan -

As I mentioned below, I've been through the documentation already.  It has
not helped me to resolve the issue.

Here is what I have tried so far:

- setting extraClassPath as explained below
- adding fs.alluxio.impl through sparkconf
- adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
this matters in my case)
- compiling the client from source

Do you have any other suggestions on how to get this working?

Thanks



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org








Re: learning Spark

2017-12-04 Thread yohann jardin
Plenty of documentation is available on Spark website itself: 
http://spark.apache.org/docs/latest/#where-to-go-from-here

You’ll find deployment guides, tuning, etc.

Yohann Jardin

On 05-Dec-17 at 1:38 AM, Somasundaram Sekar wrote:
Learning Spark - O'Reilly publication as a starter, and the official docs

On 4 Dec 2017 9:19 am, "Manuel Sopena Ballesteros" <manuel...@garvan.org.au> wrote:
Dear Spark community,

Is there any resource (books, online courses, etc.) available that you know of 
to learn about Spark? I am interested in the sysadmin side of it, like the 
different parts inside Spark, how Spark works internally, the best ways to 
install/deploy/monitor, and how to get the best performance possible.

Any suggestion?

Thank you very much

Manuel Sopena Ballesteros | Systems Engineer
Garvan Institute of Medical Research
The Kinghorn Cancer Centre, 370 Victoria Street, Darlinghurst, NSW 2010
T: + 61 (0)2 9355 5760 | F: +61 (0)2 9295 8507 | E: manuel...@garvan.org.au





Re: DataFrame multiple agg on the same column

2017-10-07 Thread yohann jardin
Hey Somasundaram,

Using a map is only one way to use the function agg. For the complete list: 
https://spark.apache.org/docs/1.5.2/api/java/org/apache/spark/sql/GroupedData.html

Using the first one: agg(Column expr, Column... exprs)
(see https://spark.apache.org/docs/1.5.2/api/java/org/apache/spark/sql/GroupedData.html#agg%28org.apache.spark.sql.Column,%20org.apache.spark.sql.Column...%29)

grouped_txn.agg(count(lit(1)), sum('amount), max('amount), min('created_time), max('created_time)).show
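
For completeness, a slightly fuller sketch of that call, assuming a dataframe 
txns and a hypothetical grouping column user_id:

import org.apache.spark.sql.functions._

val grouped_txn = txns.groupBy(col("user_id"))
grouped_txn.agg(
  count(lit(1)).as("cnt"),
  sum(col("amount")).as("total_amount"),
  max(col("amount")).as("max_amount"),
  min(col("amount")).as("min_amount"),
  min(col("created_time")).as("first_created"),
  max(col("created_time")).as("last_created")
).show()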

Yohann Jardin

On 10/7/2017 at 7:12 PM, Somasundaram Sekar wrote:
Hi,

I have a GroupedData object on which I perform aggregation of a few columns. 
Since GroupedData takes in a map, I cannot perform multiple aggregates on the 
same column; say I want to have both the max and min of amount.

So the below line of code will return only one aggregate per column

grouped_txn.agg({'*' : 'count', 'amount' : 'sum', 'amount' : 'max', 
'created_time' : 'min', 'created_time' : 'max'})

What are the possible alternatives? I can define a new column that is just a 
copy of the original and use that, but that looks ugly. Any suggestions?

Thanks,
Somasundaram S



Re:Fwd: Help needed in Dividing open close dates column into multiple columns in dataframe

2017-09-20 Thread yohann jardin
Hey,

Did you check similar questions on Google first ? Like 
https://stackoverflow.com/questions/32196207/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe
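
For illustration, a minimal sketch of the approach from that answer, splitting 
the single string column into several; the column names and the assumption that 
the day groups are comma-separated are mine, and the irregular values will need 
more parsing work on top of this:

import org.apache.spark.sql.functions._

// Split "Mon-Fri 10am-9pm, Sat 10am-8pm, Sun 12pm-6pm" into one column per group,
// as a starting point before parsing each group into open/close times.
val parts = split(col("open_close_times"), ",\\s*")
val withGroups = df
  .withColumn("group_1", parts.getItem(0))
  .withColumn("group_2", parts.getItem(1))
  .withColumn("group_3", parts.getItem(2))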



Sent from my Redmi Note 2
On 20 Sept 2017 at 21:36, Aakash Basu wrote:

Hi folks,

Any solution to it? PFB.

Thanks,
Aakash.
-- Forwarded message --
From: Aakash Basu <aakash.spark@gmail.com>
Date: Tue, Sep 19, 2017 at 2:32 PM
Subject: Help needed in Dividing open close dates column into multiple columns 
in dataframe
To: user <user@spark.apache.org>


Hi,

I've a csv dataset which has a column with all the details of store open and 
close timings as per dates, but the data is highly variant, as follows -


Mon-Fri 10am-9pm, Sat 10am-8pm, Sun 12pm-6pm
Mon-Sat 10am-8pm, Sun Closed
Mon-Sat 10am-8pm, Sun 10am-6pm
Mon-Friday 9-8 / Saturday 10-7 / Sunday 11-5
Mon-Sat 9am-8pm, Sun 10am-7pm
Mon-Sat 10am-8pm, 11am - 6pm
Mon-Fri 9am-6pm, Sat 10am-5pm, Sun Closed
Mon-Thur 10am-7pm, Fri 10am-5pm, Sat Closed, Sun 10am-5pm
Mon-Sat 10-7 Sun Closed
MON-FRI 10:00-8:00, SAT 10:00-7:00, SUN 12:00-5:00


I have to split the data of this one column into 14 columns, as -

Monday Open Time
Monday Close Time
Tuesday Open Time
Tuesday Close Time
Wednesday Open Time
Wednesday Close Time
Thursday Open Time
Thursday Close Time
Friday Open Time
Friday Close Time
Saturday Open Time
Saturday Close Time
Sunday Open Time
Sunday Close Time


Can someone please let me know if anyone has faced a similar issue and how they 
resolved it with Spark SQL dataframes.

Using: CSV data, Spark 2.1, PySpark, using dataframes. (Tried using case 
statement.)

Thanks,
Aakash.



Re: add arraylist to dataframe

2017-08-29 Thread yohann jardin
Hello Asmath,

Your list exists inside the driver, but you try to add elements to it from 
the executors. They are in different processes, on different nodes; they 
do not communicate just like that.
https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

There is an action called 'collect' that will create the list for 
you. Something like the following should do what you want:

     import scala.collection.JavaConversions._
     val points = df.rdd.map { row =>
         val latitude = com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Latitude))
         val longitude = com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Longitude))
         new Coordinate(latitude, longitude)  // no 'return' here: the last expression is the value of the lambda
     }.collect()

Note that you are retrieving ALL your coordinates in the driver. If you 
have too much data, this will lead to Out Of Memory.

On 8/29/2017 at 8:21 PM, KhajaAsmath Mohammed wrote:
> Hi,
>
> I am initiating an arraylist before iterating through the map method. I 
> am always getting the list size value as zero after map operation.
>
> How do I add values to list inside the map method of dataframe ? any 
> suggestions?
>
>  val points = new 
> java.util.ArrayList[com.vividsolutions.jts.geom.Coordinate]()
>     import scala.collection.JavaConversions._
>     df.rdd.map { row =>
>         val latitude = 
> com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Latitude))
>         val longitude = 
> com.navistar.telematics.datascience.validation.PreValidation.getDefaultDoubleVal(row.getAs[String](Constants.Datapoint.Longitude))
>         points.add(new Coordinate(latitude, longitude))
>     }
> points.size is always zero.
>
>
> Thanks,
> Asmath



Re: How to configure spark on Yarn cluster

2017-07-28 Thread yohann jardin
For yarn, I'm speaking about the file fairscheduler.xml (if you kept the 
default scheduling of Yarn): 
https://hadoop.apache.org/docs/r2.7.3/hadoop-yarn/hadoop-yarn-site/FairScheduler.html#Allocation_file_format


Yohann Jardin

On 7/28/2017 at 8:00 PM, jeff saremi wrote:

The only relevant setting I see in Yarn is this:

  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>120726</value>
  </property>

which is 120GB and we are well below that. I don't see a total limit.

I haven't played with spark.memory.fraction. I'm not sure if it makes a 
difference. Note that there are no errors coming from Spark with respect to 
memory being an issue. Yarn kills the JVM and just prints out one line: Out of 
memory in the stdout of the container. After that Spark complains about the 
ExecutorLostFailure. So the memory fractions are not playing a factor here.
I just looked at the link you included. Thank you. Yes, this is the same 
problem; however, it looks like no one has come up with a solution for it yet.


________
From: yohann jardin <yohannjar...@hotmail.com>
Sent: Friday, July 28, 2017 10:47:40 AM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Not sure that we are OK on one thing: Yarn limitations are for the sum of all 
nodes, while you only specify the memory for a single node through Spark.


By the way, the memory displayed in the UI is only a part of the total memory 
allocation: 
https://spark.apache.org/docs/latest/configuration.html#memory-management

It corresponds to “spark.memory.fraction”, so it will mainly be filled by the 
rdd you’re trying to persist. The memory left by this parameter will be used to 
read the input file and compute. When the failure comes from this, the Out Of 
Memory exception is quite explicit in the driver logs.

Testing sampled files of 1 GB, 1 TB, 10 TB should help investigate what goes 
right and what goes wrong at least a bit.


Also, did you check for similar issues on stackoverflow? Like 
https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic


Regards,

Yohann Jardin

On 7/28/2017 at 6:05 PM, jeff saremi wrote:

Thanks so much Yohann

I checked the Storage/Memory column in Executors status page. Well below where 
I wanted to be.
I will try the suggestion on smaller data sets.
I am also well within the Yarn limitations (128GB). In my last try I asked for 
48+32 (overhead). So somehow I am exceeding that, or I should say Spark is 
exceeding it, since I am trusting it to manage the memory I provided for it.
Is there anything in Shuffle Write Size, Shuffle Spill, or anything in the logs 
that I should be looking for to come up with the recommended memory size or 
partition count?

thanks

________
From: yohann jardin <yohannjar...@hotmail.com>
Sent: Thursday, July 27, 2017 11:15:39 PM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Check the executor page of the Spark UI, to check if your storage level is 
limiting.


Also, instead of starting with 100 TB of data, sample it, make it work, and 
grow it little by little until you reach 100 TB. This will validate the 
workflow and let you see how much data is shuffled, etc.


And just in case, check the limits you set on your Yarn queue. If you try to 
allocate more memory to your job than what is set on the queue, there might be 
cases of failure.
Though there are some limitations, it’s possible to allocate more ram to your 
job than available on your Yarn queue.


Yohann Jardin

On 7/28/2017 at 8:03 AM, jeff saremi wrote:

I have the simplest job which i'm running against 100TB of data. The job keeps 
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.







Re: How to configure spark on Yarn cluster

2017-07-28 Thread yohann jardin
Not sure that we are OK on one thing: Yarn limitations are for the sum of all 
nodes, while you only specify the memory for a single node through Spark.


By the way, the memory displayed in the UI is only a part of the total memory 
allocation: 
https://spark.apache.org/docs/latest/configuration.html#memory-management

It corresponds to “spark.memory.fraction”, so it will mainly be filled by the 
rdd you’re trying to persist. The memory left by this parameter will be used to 
read the input file and compute. When the failure comes from this, the Out Of 
Memory exception is quite explicit in the driver logs.
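
For reference, a minimal sketch of how that setting is passed, since it is a 
static option that goes on the SparkConf before the context starts; the values 
below are illustrative and the defaults are usually fine:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.memory.fraction", "0.6")        // share of the heap used for execution and storage
  .set("spark.memory.storageFraction", "0.5") // share of the above that is protected from eviction

val spark = SparkSession.builder().config(conf).getOrCreate()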

Testing sampled files of 1 GB, 1 TB, 10 TB should help investigate what goes 
right and what goes wrong at least a bit.


Also, did you check for similar issues on stackoverflow? Like 
https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic


Regards,

Yohann Jardin

On 7/28/2017 at 6:05 PM, jeff saremi wrote:

Thanks so much Yohann

I checked the Storage/Memory column in Executors status page. Well below where 
I wanted to be.
I will try the suggestion on smaller data sets.
I am also well within the Yarn limitations (128GB). In my last try I asked for 
48+32 (overhead). So somehow I am exceeding that, or I should say Spark is 
exceeding it, since I am trusting it to manage the memory I provided for it.
Is there anything in Shuffle Write Size, Shuffle Spill, or anything in the logs 
that I should be looking for to come up with the recommended memory size or 
partition count?

thanks


From: yohann jardin <yohannjar...@hotmail.com>
Sent: Thursday, July 27, 2017 11:15:39 PM
To: jeff saremi; user@spark.apache.org
Subject: Re: How to configure spark on Yarn cluster


Check the executor page of the Spark UI, to check if your storage level is 
limiting.


Also, instead of starting with 100 TB of data, sample it, make it work, and 
grow it little by little until you reach 100 TB. This will validate the 
workflow and let you see how much data is shuffled, etc.


And just in case, check the limits you set on your Yarn queue. If you try to 
allocate more memory to your job than what is set on the queue, there might be 
cases of failure.
Though there are some limitations, it’s possible to allocate more ram to your 
job than available on your Yarn queue.


Yohann Jardin

On 7/28/2017 at 8:03 AM, jeff saremi wrote:

I have the simplest job which i'm running against 100TB of data. The job keeps 
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.






Re: How to configure spark on Yarn cluster

2017-07-27 Thread yohann jardin
Check the executor page of the Spark UI, to check if your storage level is 
limiting.


Also, instead of starting with 100 TB of data, sample it, make it work, and 
grow it little by little until you reach 100 TB. This will validate the 
workflow and let you see how much data is shuffled, etc.


And just in case, check the limits you set on your Yarn queue. If you try to 
allocate more memory to your job than what is set on the queue, there might be 
cases of failure.
Though there are some limitations, it’s possible to allocate more ram to your 
job than available on your Yarn queue.


Yohann Jardin

On 7/28/2017 at 8:03 AM, jeff saremi wrote:

I have the simplest job which i'm running against 100TB of data. The job keeps 
failing with ExecutorLostFailure's on containers killed by Yarn for exceeding 
memory limits

I have varied the executor-memory from 32GB to 96GB, the 
spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to 
the number of cores, and driver size. It looks like nothing stops this error 
(running out of memory) from happening. Looking at metrics reported by Spark 
status page, is there anything I can use to configure my job properly? Is 
repartitioning more or less going to help at all? The current number of 
partitions is around 40,000 currently.

Here's the gist of the code:


val input = sc.textFile(path)

val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a)))

t0.persist(StorageLevel.DISK_ONLY)


I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to 
no avail.





RE: Is there a difference between these aggregations

2017-07-24 Thread yohann jardin
Seen directly in the code:


  /**
   * Aggregate function: returns the average of the values in a group.
   * Alias for avg.
   *
   * @group agg_funcs
   * @since 1.4.0
   */
  def mean(e: Column): Column = avg(e)



That's the same when the argument is the column name.

So no difference between mean and avg functions.
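
A quick way to check it yourself, on any dataframe df with a hypothetical 
numeric column mycol:

import org.apache.spark.sql.functions.{avg, mean}

// Both aggregates resolve to the same expression, so the two result columns are identical.
df.agg(mean("mycol"), avg("mycol")).show()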



From: Aseem Bansal
Sent: Monday, July 24, 2017 13:34
To: user
Subject: Is there a difference between these aggregations

If I want to aggregate mean and subtract from my column I can do either of the 
following in Spark 2.1.0 Java API. Is there any difference between these? 
Couldn't find anything from reading the docs.

dataset.select(mean("mycol"))
dataset.agg(mean("mycol"))

dataset.select(avg("mycol"))
dataset.agg(avg("mycol"))


RE: [Spark] Working with JavaPairRDD from Scala

2017-07-22 Thread yohann jardin
Hello Lukasz,


You can just:

val pairRdd = javapairrdd.rdd();


Then pairRdd will be of type RDD[(K, V)], with K being 
com.vividsolutions.jts.geom.Polygon, and V being 
java.util.HashSet[com.vividsolutions.jts.geom.Polygon].
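
For example, once converted you can work with plain Scala tuples; a minimal 
sketch, reusing the intersection logic from the question quoted below 
(javapairrdd is the value from that question):

import scala.collection.JavaConverters._

val pairRdd = javapairrdd.rdd()
val areas = pairRdd.map { case (polygon, hashSet) =>
  (polygon, hashSet.asScala.map(polygon.intersection(_).getArea))
}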



If you really want to continue with Java objects:

val calculateIntersection = new Function2[Polygon, HashSet[Polygon], (Polygon, 
scala.collection.mutable.Set[Double])]() {}

and in the curly braces, overriding the call function.


Another solution would be to use lambda (I do not code much in scala and I'm 
definitely not sure this works, but I expect it to, so you'd have to test it):

javapairrdd.map((polygon: Polygon, hash: HashSet[Polygon]) => (polygon, 
hash.asScala.map(polygon.intersection(_).getArea)))


From: Lukasz Tracewski
Sent: Saturday, July 22, 2017 00:18
To: user@spark.apache.org
Subject: [Spark] Working with JavaPairRDD from Scala


Hi,

I would like to call a method on JavaPairRDD from Scala and I am not sure how 
to write a function for the "map". I am using a third-party library that uses 
Spark for geospatial computations and it happens that it returns some results 
through Java API. I'd welcome a hint how to write a function for 'map' such 
that JavaPairRDD is happy.

Here's a signature:
org.apache.spark.api.java.JavaPairRDD[com.vividsolutions.jts.geom.Polygon,java.util.HashSet[com.vividsolutions.jts.geom.Polygon]]
 = org.apache.spark.api.java.JavaPairRDD

Normally I would write something like this:

def calculate_intersection(polygon: Polygon, hashSet: HashSet[Polygon]) = {
  (polygon, hashSet.asScala.map(polygon.intersection(_).getArea))
}

javapairrdd.map(calculate_intersection)


... but it will complain that it's not a Java Function.

My first thought was to implement the interface, i.e.:


class PairRDDWrapper extends 
org.apache.spark.api.java.function.Function2[Polygon, HashSet[Polygon], (Polygon, scala.collection.mutable.Set[Double])]
{
  override def call(polygon: Polygon, hashSet: HashSet[Polygon]): (Polygon, 
scala.collection.mutable.Set[Double]) = {
(polygon, hashSet.asScala.map(polygon.intersection(_).getArea))
  }
}




I am not sure though how to use it, or if it makes any sense in the first 
place. Should be simple, it's just my Java / Scala is "little rusty".


Cheers,
Lucas


Re: Spark 2.1.1 and Hadoop version 2.2 or 2.7?

2017-06-20 Thread yohann jardin
https://spark.apache.org/docs/2.1.0/building-spark.html#specifying-the-hadoop-version

Hadoop 2.2.0 is only the default build version; other versions can still be 
built. The package you downloaded is prebuilt for Hadoop 2.7, as stated on the 
download page, so don't worry.

Yohann Jardin

On 6/21/2017 at 7:51 AM, N B wrote:
I had downloaded the prebuilt package labeled "Spark 2.1.1 prebuilt with 
Hadoop 2.7 or later" from the direct download link on spark.apache.org.

However, I am seeing compatibility errors running against a deployed HDFS 
2.7.3. (See my earlier message about Flume DStream producing 0 records after 
HDFS node restarted) I have been digging into this issue and have started to 
suspect a version mismatch between the Hadoop server and client. I decided to 
look at Spark 2.1.1's pom.xml. It states hadoop.version as 2.2.0. There seems to 
be some mismatch here, and I am not sure if that's the root cause of the issues 
I have been seeing.

Can someone please confirm if the package mentioned above was indeed compiled 
with Hadoop 2.7? Or should I fall back on an HDFS Server 2.2 instead?

Thanks
N B




Re: Flume DStream produces 0 records after HDFS node killed

2017-06-20 Thread yohann jardin
Which version of Hadoop are you running on?

Yohann Jardin

On 6/21/2017 at 1:06 AM, N B wrote:
Ok some more info about this issue to see if someone can shine a light on what 
could be going on. I turned on debug logging for 
org.apache.spark.streaming.scheduler in the driver process and this is what 
gets thrown in the logs and keeps throwing it even after the downed HDFS node 
is restarted. Using Spark 2.1.1 and HDFS 2.7.3 here.

2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning - 
Exception thrown while writing record: BatchCleanupEvent(ArrayBuffer()) to the 
WriteAheadLog.
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
at 
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:171)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:233)
at 
org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:287)
at 
org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:187)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at 
org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException):
 Missing storageIDs: It is likely that the HDFS client, who made this call, is 
running in an older version of Hadoop which does not support storageIDs. 
datanodeID.length=1, 
src=/vm/spark-checkpoint/receivedBlockMetadata/log-1497997390799-1497997450799, 
fileId=0, blk=BP-1450953312-10.0.0.9-1490120290209:blk_1081472520_7731872, 
clientName=DFSClient_NONMAPREDUCE_-23097586_1, clientMachine=10.0.0.17
at 
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:514)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3353)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:759)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:515)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:352)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:919)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
polygraph-engine 2017-06-20 22:38:11,302 WARN JobGenerator 
ReceivedBlockTracker.logWarning - Failed to acknowledge batch clean up in the 
Write Ahead Log.

Thanks
N B


On Tue, Jun 20, 2017 at 10:24 A

Re: What is the real difference between Kafka streaming and Spark Streaming?

2017-06-11 Thread yohann jardin
Hey,

Kafka can also do streaming on its own: 
https://kafka.apache.org/documentation/streams
I don't know much about it unfortunately. I can only repeat what I heard in 
conferences: that one should give Kafka Streaming a try when the whole pipeline 
is already using Kafka. I have no pros/cons to argue on this topic.

Yohann Jardin

On 6/11/2017 at 7:08 PM, vaquar khan wrote:

Hi Kant,

Kafka is the message broker, used with producers and consumers, and Spark 
Streaming is used for the real-time processing; Kafka and Spark Streaming work 
together, they are not competitors.

Spark Streaming reads data from Kafka and processes it as micro-batches of 
streaming data; in easy terms, it collects data for some time, builds an RDD, 
and then processes these micro-batches.


Please read doc : 
https://spark.apache.org/docs/latest/streaming-programming-guide.html


Spark Streaming is an extension of the core Spark API that enables scalable, 
high-throughput, fault-tolerant stream processing of live data streams. Data 
can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, 
and can be processed using complex algorithms expressed with high-level 
functions like map, reduce, join and window. Finally, processed data can be 
pushed out to filesystems, databases, and live dashboards. In fact, you can 
apply Spark’s machine 
learning<https://spark.apache.org/docs/latest/ml-guide.html> and graph 
processing<https://spark.apache.org/docs/latest/graphx-programming-guide.html> 
algorithms on data streams.


Regards,

Vaquar khan

On Sun, Jun 11, 2017 at 3:12 AM, kant kodali <kanth...@gmail.com> wrote:
Hi All,

I am trying hard to figure out what is the real difference between Kafka 
Streaming vs Spark Streaming other than saying one can be used as part of Micro 
services (since Kafka streaming is just a library) and the other is a 
Standalone framework by itself.

If I can accomplish the same job one way or the other, this is a sort of 
puzzling question for me, so it would be great to know what Spark Streaming can 
do that Kafka Streaming cannot do efficiently, or whatever?

Thanks!




--
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago





Spark SQL, formatting timezone in UTC

2017-06-02 Thread yohann jardin
Hello everyone,


I'm having a hard time with time zones.

I have a Long representing a timestamp: 149636160, I want the output to be 
2017-06-02 00:00:00


Based on 
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html

The only function that helps with formatting a timestamp is from_unixtime, but 
it uses the system timezone for the output... and neither my timezone nor the 
timezone of the server I'm working on is UTC.

I couldn't find any help on Google among the dozens of JIRA, stackoverflow and 
blog articles I found.


In the end I decided to write a udf:

def formatMs(ms: Long): java.lang.String = {
  val formatter = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")  // pattern matching the desired "2017-06-02 00:00:00" output
  formatter.setTimeZone(java.util.TimeZone.getTimeZone("GMT"))
  formatter.format(new java.util.Date(ms))
}

spark.udf.register("formatMs", formatMs)
spark.sql("SELECT formatMs(149636160)").show


But if I really go for that, it will decrease the performance of my 
application, right?

For example, I need to aggregate some data based on such a column. As my 
function is a black box, Spark will apply it first and aggregate on the String 
output of the udf, even though aggregating on the initial Long value and then 
applying the udf would lead to the same result.


I know I can also leave my udf out of the SQL query and instead apply the 
functions withColumn() and withColumnRenamed() on the created dataframe, but 
that is something to benchmark.


Did I miss any possibility to do that within a Spark SQL query using standard 
functions, or something much more performant than what I can think of?


Regards,

Yohann


Re: Recommended cluster parameters

2017-04-30 Thread yohann jardin
It really depends on your needs and your data.


Do you want to store 1 TB, 1 PB or far more? Do you just want to read that 
data, retrieve it and do a little work on it before reading it again, or run a 
complex machine learning pipeline? Depending on the workload, the ratio between 
cores and storage will vary.


First start with a subset of your data and do some tests on your own computer 
or (better) with a little cluster of 3 nodes. This will help you find your 
ratio between storage and cores, and the amount of memory you might expect to 
need when you are no longer using just a subset of your data but the whole 
bunch available to you.


Then, using this information and the indications on the Spark website 
(http://spark.apache.org/docs/latest/hardware-provisioning.html), you will be 
able to specify the hardware of one node, and how many nodes you need (at least 
3).


Yohann Jardin

On 4/30/2017 at 10:26 AM, rakesh sharma wrote:

Hi

I would like to know the details of implementing a cluster.

What kind of machines one would require, how many nodes, number of cores etc.


thanks

rakesh



Writing dataframe to a final path using another temporary path

2017-03-28 Thread yohann jardin
Hello,



I’m using spark 2.1.

Once a job completes, I want to write a Parquet file to, let’s say, the folder 
/user/my_user/final_path/


However, I have other jobs reading files in that specific folder, so I need 
those files to be completely written when they are in that folder.

So while the file is being written, I need it to be written in a temporary 
location like /user/my_user/tmp_path/, the path of my application, or any other 
path that could be temporary. Once fully written, the file can then be moved to 
the real destination folder /user/my_user/final_path/


So I was wondering, is this the default behavior? If not, did I miss an option 
to do so? I looked in the documentation and in 
org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.scala but I 
can’t find any information about this.

Or else, should I save by myself to a temporary location and then move the file 
to the right location?
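
In case I do have to handle it myself, here is a minimal sketch of that 
workaround using the Hadoop FileSystem API; the file name result.parquet is 
just an example:

import org.apache.hadoop.fs.{FileSystem, Path}

// Write to the temporary location first.
df.write.parquet("/user/my_user/tmp_path/result.parquet")

// Then move it to the final folder once it is fully written (a rename within the same HDFS filesystem).
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.rename(new Path("/user/my_user/tmp_path/result.parquet"),
          new Path("/user/my_user/final_path/result.parquet"))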


Any input is greatly appreciated,

Yohann


RE: RE: Fast write datastore...

2017-03-16 Thread yohann jardin
Hello everyone,

I'm also really interested in the answers as I will be facing the same issue 
soon.
Muthu, if you evaluate Apache Ignite again, can you share your results? I also 
noticed Alluxio, which stores Spark results in memory, that you might want to 
investigate.

In my case I want to use them to have a real-time dashboard (or at least wait 
only a few seconds to refine a dashboard), and that use case seems similar to 
your filtering/aggregating of previously computed Spark results.

Regards,
Yohann


From: Rick Moritz
Sent: Thursday, March 16, 2017 10:37
To: user
Subject: Re: RE: Fast write datastore...

If you have enough RAM/SSDs available, maybe tiered HDFS storage and Parquet 
might also be an option. Of course, management-wise it has much more overhead 
than using ES, since you need to manually define partitions and buckets, which 
is suboptimal. On the other hand, for querying, you can probably get some 
decent performance by hooking up Impala or Presto or LLAP-Hive, if Spark were 
too slow/cumbersome.
Depending on your particular access patterns, this may not be very practical, 
but as a general approach it might be one way to get intermediate results 
quicker, and with less of a storage-zoo than some alternatives.

On Thu, Mar 16, 2017 at 7:57 AM, Shiva Ramagopal <tr.s...@gmail.com> wrote:
I do think Kafka is an overkill in this case. There are no streaming use- cases 
that needs a queue to do pub-sub.

On 16-Mar-2017 11:47 AM, "vvshvv" <vvs...@gmail.com> wrote:
Hi,

>> A slightly over-kill solution may be Spark to Kafka to ElasticSearch?

I do not think so, in this case you will be able to process Parquet files as 
usual, but Kafka will allow your Elasticsearch cluster to be stable and survive 
regarding the number of rows.

Regards,
Uladzimir



On Mar 16, 2017 7:52 AM, jasbir.s...@accenture.com wrote:
Hi,

Will MongoDB not fit this solution?



From: Vova Shelgunov [mailto:vvs...@gmail.com]
Sent: Wednesday, March 15, 2017 11:51 PM
To: Muthu Jayakumar <bablo...@gmail.com>
Cc: vincent gromakowski <vincent.gromakow...@gmail.com>; Richard Siebeling 
<rsiebel...@gmail.com>; user <user@spark.apache.org>; Shiva Ramagopal 
<tr.s...@gmail.com>
Subject: Re: Fast write datastore...

Hi Muthu,.

I did not catch from your message, what performance do you expect from 
subsequent queries?

Regards,
Uladzimir

On Mar 15, 2017 9:03 PM, "Muthu Jayakumar" <bablo...@gmail.com> wrote:
Hello Uladzimir / Shiva,

From the ElasticSearch documentation (I have to see the logical plan of a query 
to confirm), the richness of filters (like regex, ...) is pretty good compared 
to Cassandra. As for aggregates, I think Spark Dataframes are quite rich enough 
to tackle them.
Let me know your thoughts.

Thanks,
Muthu


On Wed, Mar 15, 2017 at 10:55 AM, vvshvv <vvs...@gmail.com> wrote:
Hi muthu,

I agree with Shiva, Cassandra also supports SASI indexes, which can partially 
replace Elasticsearch functionality.

Regards,
Uladzimir



Sent from my Mi phone
On Mar 15, 2017 5:57 PM, Shiva Ramagopal <tr.s...@gmail.com> wrote:
Probably Cassandra is a good choice if you are mainly looking for a datastore 
that supports fast writes. You can ingest the data into a table and define one 
or more materialized views on top of it to support your queries. Since you 
mention that your queries are going to be simple you can define your indexes in 
the materialized views according to how you want to query the data.
Thanks,
Shiva


On Wed, Mar 15, 2017 at 7:58 PM, Muthu Jayakumar <bablo...@gmail.com> wrote:
Hello Vincent,

Cassandra may not fit my bill if I need to define my partition and other 
indexes upfront. Is this right?

Hello Richard,

Let me evaluate Apache Ignite. I did evaluate it 3 months back and back then 
the connector to Apache Spark did not support Spark 2.0.

Another drastic thought may be to repartition the result count to 1 (but I have 
to be cautious and make sure I don't run into heap issues if the result is too 
large to fit into an executor) and write to a relational database like mysql / 
postgres. But, I believe I can do the same using ElasticSearch too.

A slightly over-kill solution may be Spark to Kafka to ElasticSearch?

More thoughts welcome please.

Thanks,
Muthu

On Wed, Mar 15, 2017 at 4:53 AM, Richard Siebeling <rsiebel...@gmail.com> wrote:
maybe Apache Ignite does fit your requirements

On 15 March 2017 at 08:44, vincent gromakowski <vincent.gromakow...@gmail.com> wrote:
Hi
If queries are statics and filters are on the same columns, Cassandra is a good 
option.

On March 15, 2017 at 7:04 AM, "muthu" <bablo...@gmail.com> wrote:
Hello there,

I have one or more parquet files to read and perform some aggregate queries
using Spark Dataframe. I would like to find a reasonable fast datastore that
allows me to write the results for subsequ

Re: No main class set in JAR; please specify one with --class and java.lang.ClassNotFoundException

2017-02-25 Thread yohann jardin
You should read (again?) the Spark documentation about submitting an 
application: http://spark.apache.org/docs/latest/submitting-applications.html

Try with the Pi computation example available with Spark.
For example:

./bin/spark-submit --class org.apache.spark.examples.SparkPi 
examples/jars/spark-examples*.jar

After --class you specify the fully qualified name, inside the jar you provide, 
of the main class you want to run. You finish by specifying the jar that 
contains that main class.

Yohann Jardin

On 2/25/2017 at 9:50 PM, Raymond Xie wrote:
I am doing a spark streaming on a hortonworks sandbox and am stuck here now, 
can anyone tell me what's wrong with the following code and the exception it 
causes and how do I fix it? Thank you very much in advance.

spark-submit --jars 
/usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
  /usr/hdp/2.5.0.0-1245/kafka/libs/kafka-streams-0.10.0.2.5.0.0-1245.jar 
/root/hdp/kafka_wordcount.py 192.168.128.119:2181 test

Error:
No main class set in JAR; please specify one with --class


spark-submit --class 
/usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
  /usr/hdp/2.5.0.0-1245/kafka/libs/kafka-streams-0.10.0.2.5.0.0-1245.jar 
/root/hdp/kafka_wordcount.py 192.168.128.119:2181 test

Error:
java.lang.ClassNotFoundException: 
/usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar

spark-submit --class  
/usr/hdp/2.5.0.0-1245/kafka/libs/kafka-streams-0.10.0.2.5.0.0-1245.jar 
/usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
  /root/hdp/kafka_wordcount.py 192.168.128.119:2181 test

Error:
java.lang.ClassNotFoundException: 
/usr/hdp/2.5.0.0-1245/kafka/libs/kafka-streams-0.10.0.2.5.0.0-1245.jar


Sincerely yours,


Raymond



Executor links in Job History

2017-02-22 Thread yohann jardin
Hello,


I'm using Spark 2.1.0 and hadoop 2.2.0.

When I launch jobs on Yarn, I can retrieve their information on the Spark 
History Server, except that the links to the stdout/stderr of the executors are 
wrong: they lead to the URLs that were used while the job was running.


We have the flag 'yarn.log-aggregation-enable' set to true and once a job is 
finished on Yarn, its logs are sent to HDFS.


On the client end, when I launch my job i set 'spark.eventLog.enabled' to true, 
and specify 'spark.eventLog.dir'. I can retrieve the DAG and such afterward on 
Spark History Server.


I checked http://spark.apache.org/docs/latest/running-on-yarn.html and 
http://spark.apache.org/docs/latest/monitoring.html

But I cannot find what I'm missing to let the Spark History Server redirect me 
to the Yarn History Server with a valid link, to see the stdout/stderr logs of 
the executors.



Any idea?


Regards,

Yohann


Issues launching job dynamically in Java

2017-02-08 Thread yohann jardin
Hello everyone,

I'm trying to develop a WebService launching jobs. The WebService is based on 
tomcat, and I'm working with Spark 2.1.0.
The SparkLauncher provides two methods to launch the job. First 
SparkLauncher.launch(), and 
SparkLauncher.startApplication(SparkAppHandle.Listener... listeners). The 
latter is preferred in the document since it provides much more control over 
the application.
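
For reference, a minimal sketch of the two launch paths; the jar path and main 
class are placeholders, and in practice you would use one path or the other:

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

val launcher = new SparkLauncher()
  .setAppResource("/path/to/app.jar")   // placeholder
  .setMainClass("com.example.Main")     // placeholder
  .setMaster("yarn")
  .setDeployMode("client")

// First method: returns a plain java.lang.Process.
val process = launcher.launch()

// Second method: returns a handle and reports state changes through a listener.
val handle = launcher.startApplication(new SparkAppHandle.Listener {
  override def stateChanged(h: SparkAppHandle): Unit = println(s"State: ${h.getState}")
  override def infoChanged(h: SparkAppHandle): Unit = ()
})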

In my case, the first one works fine. However the second one fails when 
creating the SparkContext.
From my investigation, it might be related to the handle. startApplication 
creates a launcher server as the user tomcat on a specific port. Then 
SparkContext tries to create a Socket connected to the just created launcher 
server, but it ends with connection refused and the job is FAILED. The driver 
being logged in as a different user than tomcat, it seems that it cannot 
register the Socket on the given port (I even tried starting the cluster as the 
root user, but no success).

I did some checks before calling "new JavaSparkContext();" to create the 
context. The Socket is created in org.apache.spark.launcher.LauncherBackend:
val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)

I'm able to create a local ServerSocket on a different port and create a Socket 
on that port.
I can't create a local ServerSocket on the same loopback address as Spark, and 
the same port. The message says that there already is a LauncherServer on that 
address / port.

I'm not a socket expert at all, but this situation leads me to believe that 
Spark is unable to manage this use case yet, and that I should stay with the 
less convenient method SparkLauncher.launch().

Did I do something wrong? Or has someone faced a similar issue and found a 
workaround?
Also, I'm quite new to the Spark community. Should I file an issue on JIRA or 
somewhere else?

Yohann