Optimal Amount of Tasks Per size of data in memory

2016-07-20 Thread Brandon White
What is the best heuristic for setting the number of partitions/tasks on an
RDD based on the size of the RDD in memory?

The Spark docs say that the number of partitions/tasks should be 2-3x the
number of CPU cores, but this does not make sense for all data sizes.
Sometimes this number is way too much and slows down the executors because
of the overhead.
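One common rule of thumb (the 128 MB-per-partition target below is an assumption, not from the Spark docs) is to size partitions by data volume rather than by core count alone; a minimal Scala sketch:

// Rough sketch: pick a partition count from an estimated in-memory size,
// targeting ~128 MB per partition (assumed value; tune per workload).
def suggestedPartitions(estimatedSizeBytes: Long): Int = {
  val targetPartitionBytes = 128L * 1024 * 1024
  math.max(1, math.ceil(estimatedSizeBytes.toDouble / targetPartitionBytes).toInt)
}

// usage: rdd.repartition(suggestedPartitions(estimatedRddSizeBytes))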


R: ML PipelineModel to be scored locally

2016-07-20 Thread Simone
Thanks for your reply. 

I cannot rely on jpmml due to licensing issues.
I can evaluate writing my own prediction code, but I am looking for a more 
general-purpose approach. 

Any other thoughts?
Best
Simone

- Original Message -
From: "Peyman Mohajerian" 
Sent: 20/07/2016 21:55
To: "Simone Miraglia" 
Cc: "User" 
Subject: Re: ML PipelineModel to be scored locally

One option is to save the model in parquet or json format and then build your 
own prediction code. Some also use: 
https://github.com/jpmml/jpmml-sparkml
It depends on the model, e.g. ml vs mllib and other factors, whether this works 
or not. A couple of weeks ago there was a long discussion on this topic.
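A rough illustration of the "build your own prediction code" route, assuming the final pipeline stage is an ml LogisticRegressionModel (the model type, path, and sigmoid scoring below are assumptions, not something stated in this thread):

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel

// Load the fitted pipeline once at service start-up (Spark is still needed for loading).
val pipeline = PipelineModel.load("/shared/models/my-pipeline")   // hypothetical path

// Pull the coefficients out of the (assumed) logistic regression stage...
val lr = pipeline.stages.last.asInstanceOf[LogisticRegressionModel]
val coefficients = lr.coefficients.toArray
val intercept = lr.intercept

// ...and score a single, already-assembled feature vector without touching Spark.
def scoreLocally(features: Array[Double]): Double = {
  val margin = intercept + features.zip(coefficients).map { case (x, w) => x * w }.sum
  1.0 / (1.0 + math.exp(-margin))   // sigmoid -> probability of the positive class
}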


On Wed, Jul 20, 2016 at 7:08 AM, Simone Miraglia  
wrote:

Hi all,


I am working on the following use case involving ML Pipelines.


1. I created a Pipeline composed from a set of stages
2. I called "fit" method on my training set
3. I validated my model by calling "transform" on my test set
4. I stored my fitted Pipeline to a shared folder


Then I have a very low latency interactive application (say a kind of web 
service) that should work as follows:
1. The app receives a request
2. A scoring needs to be made, according to my fitted PipelineModel
3. The app sends the score to the caller, in a synchronous fashion


Is there a way to call the .transform method of the PipelineModel over a single 
Row?


I will definitely not want to parallelize a single record to a DataFrame, nor 
rely on Spark Streaming, due to latency requirements.
I would like to use something similar to the mllib .predict(Vector) method, which 
does not rely on the SparkContext and performs all the computation locally.


Thanks in advance
Best

Ratings in mllib.recommendation

2016-07-20 Thread glen


calculate time difference between consecutive rows

2016-07-20 Thread Divya Gehlot
I have a dataset of time as shown below :
Time1
07:30:23
07:34:34
07:38:23
07:39:12
07:45:20

I need to find the diff between two consecutive rows
I googled and found that the *lag* function in *Spark* helps in finding it,
but it's giving me *null* in the result set.
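A minimal sketch of one way this is usually written with an explicit window (df is the DataFrame holding Time1; the HH:mm:ss format is an assumption and must match the stored values):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, unix_timestamp}

// lag() needs an ordered window (and, on Spark 1.x, a HiveContext); a format string
// that does not match the data also makes unix_timestamp return null for every row.
val w = Window.orderBy("Time1")
val ts = unix_timestamp(col("Time1"), "HH:mm:ss")

val withDiff = df
  .withColumn("prev_ts", lag(ts, 1).over(w))
  .withColumn("diff_seconds", ts - col("prev_ts"))   // the very first row stays null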

Would really appreciate the help.


Thanks,
Divya


Re: Understanding spark concepts cluster, master, slave, job, stage, worker, executor, task

2016-07-20 Thread Sachin Mittal
Hi,
Thanks for the links; is there any English translation for them?

Sachin


On Thu, Jul 21, 2016 at 8:34 AM, Taotao.Li  wrote:

> Hi, Sachin,  here are two posts about the basic concepts about spark:
>
>
>- spark-questions-concepts
>
>- deep-into-spark-execution-model
>
>
>
> And, I fully recommend databrick's post:
> https://databricks.com/blog/2016/06/22/apache-spark-key-terms-explained.html
>
>
> On Thu, Jul 21, 2016 at 1:36 AM, Jean Georges Perrin  wrote:
>
>> Hey,
>>
>> I love when questions are numbered, it's easier :)
>>
>> 1) Yes (but I am not an expert)
>> 2) You don't control... One of my processes goes up to 8k tasks, so...
>> 3) Yes, if you have HT, it doubles. My servers have 12 cores, but with HT
>> it makes 24.
>> 4) From my understanding: the slave is the logical computational unit and
>> the worker is really the one doing the job.
>> 5) Dunnoh
>> 6) Dunnoh
>>
>> On Jul 20, 2016, at 1:30 PM, Sachin Mittal  wrote:
>>
>> Hi,
>> I was able to build and run my spark application via spark submit.
>>
>> I have understood some of the concepts by going through the resources at
>> https://spark.apache.org but a few doubts still remain. I have a few
>> specific questions and would be glad if someone could shed some light on
>> them.
>>
>> So I submitted the application using spark.master=local[*] and I have
>> an 8 core PC.
>>
>> - What I understand is that the application is called a job. Since mine had
>> two stages, it gets divided into 2 stages and each stage had a number of tasks
>> which ran in parallel.
>> Is this understanding correct?
>>
>> - What I notice is that each stage is further divided into 262 tasks. Where
>> did this number 262 come from? Is it configurable? Would increasing
>> this number improve performance?
>>
>> - Also I see that the tasks are run in parallel in sets of 8. Is this
>> because I have an 8 core PC?
>>
>> - What is the difference or relation between a slave and a worker? When I did
>> spark-submit, did it start 8 slaves or worker threads?
>>
>> - I see all worker threads running in one single JVM. Is this because I
>> did not start slaves separately and connect them to a single master cluster
>> manager? If I had done that, then each worker would have run in its own JVM.
>>
>> - What is the relationship between a worker and an executor? Can a worker have
>> more than one executor? If yes, then how do we configure that? Do all
>> executors run in the worker JVM as independent threads?
>>
>> I suppose that is all for now. Would appreciate any response. Will add
>> followup questions if any.
>>
>> Thanks
>> Sachin
>>
>>
>>
>>
>
>
> --
> *___*
> Quant | Engineer | Boy
> *___*
> *blog*:http://litaotao.github.io
> 
> *github*: www.github.com/litaotao
>


Re: the spark job is so slow - almost frozen

2016-07-20 Thread Zhiliang Zhu
Thanks a lot for your kind help.  

On Wednesday, July 20, 2016 11:35 AM, Andrew Ehrlich  
wrote:
 

 Try:
- filtering down the data as soon as possible in the job, dropping columns you 
don’t need
- processing fewer partitions of the hive tables at a time
- caching frequently accessed data, for example dimension tables, lookup tables, 
or other datasets that are repeatedly accessed
- using the Spark UI to identify the bottlenecked resource
- removing features or columns from the output data until it runs, then adding 
them back in one at a time
- creating a static dataset small enough to work, and editing the query, then 
retesting, repeatedly until you cut the execution time by a significant fraction
- using the Spark UI or spark shell to check the skew and make sure partitions 
are evenly distributed
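A small sketch of the caching and plan-inspection suggestions above (table and column names are placeholders):

// Cache a small, frequently reused dimension table before the big joins.
val dim = sqlContext.table("dim_lookup").cache()
dim.count()                                        // materialize the cache once

// Inspect the physical plan of a join to see which join strategy and shuffles Spark chose.
val fact = sqlContext.table("fact_events")
val joined = fact.join(dim, fact("lookup_id") === dim("lookup_id"))
joined.explain(true)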

On Jul 18, 2016, at 3:33 AM, Zhiliang Zhu  wrote:
Thanks a lot for your reply.
In effect, here we tried to run the sql on Kettle, Hive and Spark Hive (by 
HiveContext) respectively; the job seems frozen and never finishes.
From the 6 tables we need to read different columns in the different tables 
for the specific information, then do some simple calculation before output. 
The join operation is used most in the sql.
Best wishes! 

 

On Monday, July 18, 2016 6:24 PM, Chanh Le  wrote:
 

 Hi,
What about the network (bandwidth) between Hive and Spark? Did it run in 
Hive before you moved to Spark? Because it's complex, you can use something 
like the EXPLAIN command to show what is going on.



 
On Jul 18, 2016, at 5:20 PM, Zhiliang Zhu  wrote:
the sql logic in the program is very complex, so I do not describe the 
detailed code here.

On Monday, July 18, 2016 6:04 PM, Zhiliang Zhu 
 wrote:
 

 Hi All,
Here we have one application that needs to extract different columns from 6 hive 
tables and then does some easy calculation; there are around 100,000 rows in each 
table, and finally we need to output another table or file (with a format of 
consistent columns).
 However, after lots of days of trying, the spark hive job is unthinkably slow - 
sometimes almost frozen. There are 5 nodes in the spark cluster. Could anyone 
offer some help? Some idea or clue is also good.
Thanks in advance~
Zhiliang

Re: run spark apps in linux crontab

2016-07-20 Thread Mich Talebzadeh
you should source the environment file before or in the file. for example
this one is ksh type

0,5,10,15,20,25,30,35,40,45,50,55 * * * *
(/home/hduser/dba/bin/send_messages_to_Kafka.ksh >
/var/tmp/send_messages_to_Kafka.err 2>&1)

in that shell it sources the environment file

#
# Main Section
#
ENVFILE=/home/hduser/.kshrc
if [[ -f $ENVFILE ]]
then
. $ENVFILE
else
echo "Abort: $0 failed. No environment file ( $ENVFILE ) found"
exit 1
fi

Or simply in your case say

0,5,10,15,20,25,30,35,40,45,50,55 * * * *
(. /home/hduser/.bashrc; $SPARK_HOME/bin/spark-submit...
> /var/tmp/somefile 2>&1)


HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 21 July 2016 at 04:38, Chanh Le  wrote:

> you should use command.sh | tee file.log
>
> On Jul 21, 2016, at 10:36 AM,  
> wrote:
>
>
> thank you focus, and all.
> this problem solved by adding a line ". /etc/profile" in my shell.
>
>
> 
>
> ThanksBest regards!
> San.Luo
>
> - Original Message -
> From: "focus" 
> To: "luohui20001" , "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: run spark apps in linux crontab
> Date: 2016-07-20 18:11
>
> Hi, I just met this problem, too! The reason is that the crontab runtime doesn't
> have the variables you defined, such as $SPARK_HOME.
> I defined the $SPARK_HOME and other variables in /etc/profile like this:
>
> export MYSCRIPTS=/opt/myscripts
> export SPARK_HOME=/opt/spark
>
> then, in my crontab job script daily_job.sh
>
> #!/bin/sh
>
> . /etc/profile
>
> $SPARK_HOME/bin/spark-submit $MYSCRIPTS/fix_fh_yesterday.py
>
> then, in crontab -e
>
> 0 8 * * * /home/user/daily_job.sh
>
> hope this helps~
>
>
>
>
> -- Original --
> *From:* "luohui20001";
> *Date:* July 20, 2016 (Wednesday) 6:00 PM
> *To:* "user@spark.apache.org";
> *Subject:* run spark apps in linux crontab
>
> hi guys:
>   I added a spark-submit job to my Linux crontab list by the means
> below; however, none of them works. If I change it to a normal shell script,
> it is ok. I don't quite understand why. I checked the 8080 web ui of my
> spark cluster: no job was submitted, and there are no messages in
> /home/hadoop/log.
>   Any idea is welcome.
>
> [hadoop@master ~]$ crontab -e
> 1.
> 22 21 * * * sh /home/hadoop/shellscripts/run4.sh > /home/hadoop/log
>
> and in run4.sh,it wrote:
> $SPARK_HOME/bin/spark-submit --class com.abc.myclass
> --total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar
> $SPARK_HOME/MyJar.jar  > /home/hadoop/log
>
> 2.
> 22 21 * * * $SPARK_HOME/bin/spark-submit --class com.abc.myclass
> --total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar
> $SPARK_HOME/MyJar.jar  > /home/hadoop/log
>
> 3.
> 22 21 * * * /usr/lib/spark/bin/spark-submit --class com.abc.myclass
> --total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar
> /usr/lib/spark/MyJar.jar  > /home/hadoop/log
>
> 4.
> 22 21 * * * hadoop /usr/lib/spark/bin/spark-submit --class com.abc.myclass
> --total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar
> /usr/lib/spark/MyJar.jar  > /home/hadoop/log
>
> 
>
> ThanksBest regards!
> San.Luo
>
>
>


Re: write and call UDF in spark dataframe

2016-07-20 Thread Mich Talebzadeh
something similar

def ChangeToDate (word : String) : Date = {
  // return TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(word,"dd/MM/yyyy"),"yyyy-MM-dd"))
  val d1 = Date.valueOf(ReverseDate(word))
  return d1
}
sqlContext.udf.register("ChangeToDate", ChangeToDate(_:String))
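Once registered, the UDF can be called directly from SQL; a quick usage sketch (table and column names are placeholders):

val converted = sqlContext.sql("SELECT ChangeToDate(settlement_date) AS settlement_dt FROM sales")
converted.show(5)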

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 21 July 2016 at 03:53, Divya Gehlot  wrote:

> Hi ,
> To be very specific, I am looking for the UDF syntax, for example one which takes
> a String as parameter and returns an Integer - how do we define the return
> type?
>
>
> Thanks,
>
> Divya
>
> On 21 July 2016 at 00:24, Andy Davidson 
> wrote:
>
>> Hi Divya
>>
>> In general you will get better performance if you can minimize your use
>> of UDFs. Spark 2.0 / Tungsten does a lot of code generation. It will have to
>> treat your UDF as a black box.
>>
>> Andy
>>
>> From: Rishabh Bhardwaj 
>> Date: Wednesday, July 20, 2016 at 4:22 AM
>> To: Rabin Banerjee 
>> Cc: Divya Gehlot , "user @spark" <
>> user@spark.apache.org>
>> Subject: Re: write and call UDF in spark dataframe
>>
>> Hi Divya,
>>
>> "from_unixtime" already exists in
>> org.apache.spark.sql.functions;
>> Rabin has used that in the sql query. If you want to use it in the
>> dataframe DSL you can try it like this,
>>
>> val new_df = df.select(from_unixtime($"time").as("newtime"))
>>
>>
>> Thanks,
>> Rishabh.
>>
>> On Wed, Jul 20, 2016 at 4:21 PM, Rabin Banerjee <
>> dev.rabin.baner...@gmail.com> wrote:
>>
>>> Hi Divya ,
>>>
>>> Try,
>>>
>>> val df = sqlContext.sql("select from_unixtime(ts,'yyyy-MM-dd') as `ts` from 
>>> mr")
>>>
>>> Regards,
>>> Rabin
>>>
>>> On Wed, Jul 20, 2016 at 12:44 PM, Divya Gehlot 
>>> wrote:
>>>
 Hi,
 Could somebody share an example of writing and calling a udf which converts
 a unix timestamp to a date time?


 Thanks,
 Divya

>>>
>>>
>>
>


getting null when calculating time diff with unix_timestamp + spark 1.6

2016-07-20 Thread Divya Gehlot
Hi,

val lags=sqlContext.sql("select *,(unix_timestamp(time1,'$timeFmt') -
lag(unix_timestamp(time2,'$timeFmt'))) as time_diff  from df_table");

Instead of the time difference in seconds I am getting null.
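For comparison, a sketch of the same query with the string interpolated and an explicit window for lag() (the ordering column and the format below are assumptions):

// Two things to check: the query string needs the s prefix (otherwise '$timeFmt' is
// passed to unix_timestamp literally and every row becomes null), and lag() needs an
// OVER (ORDER BY ...) clause.
val timeFmt = "yyyy-MM-dd HH:mm:ss"
val lags = sqlContext.sql(
  s"""select *,
     |       unix_timestamp(time1, '$timeFmt') -
     |       lag(unix_timestamp(time2, '$timeFmt')) over (order by time1) as time_diff
     |  from df_table""".stripMargin)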

Would really appreciate the help.


Thanks,
Divya


Re: run spark apps in linux crontab

2016-07-20 Thread Chanh Le
you should use command.sh | tee file.log

> On Jul 21, 2016, at 10:36 AM,   
> wrote:
> 
> 
> thank you focus, and all.
> this problem solved by adding a line ". /etc/profile" in my shell.
> 
> 
> 
>  
> ThanksBest regards!
> San.Luo
> 
> - Original Message -
> From: "focus" 
> To: "luohui20001" , "user@spark.apache.org" 
> 
> Subject: Re: run spark apps in linux crontab
> Date: 2016-07-20 18:11
> 
> Hi, I just met this problem, too! The reason is that the crontab runtime doesn't have 
> the variables you defined, such as $SPARK_HOME.
> I defined the $SPARK_HOME and other variables in /etc/profile like this:
> 
> export MYSCRIPTS=/opt/myscripts
> export SPARK_HOME=/opt/spark
> 
> then, in my crontab job script daily_job.sh
> 
> #!/bin/sh
> 
> . /etc/profile
> 
> $SPARK_HOME/bin/spark-submit $MYSCRIPTS/fix_fh_yesterday.py
> 
> then, in crontab -e
> 
> 0 8 * * * /home/user/daily_job.sh
> 
> hope this helps~
> 
> 
> 
> 
> -- Original --
> From: "luohui20001";
> Date: July 20, 2016 (Wednesday) 6:00 PM
> To: "user@spark.apache.org";
> Subject: run spark apps in linux crontab
> 
> hi guys:
>   I added a spark-submit job to my Linux crontab list by the means below; 
> however, none of them works. If I change it to a normal shell script, it is 
> ok. I don't quite understand why. I checked the 8080 web ui of my spark 
> cluster: no job was submitted, and there are no messages in /home/hadoop/log.
>   Any idea is welcome.
> 
> [hadoop@master ~]$ crontab -e
> 1.
> 22 21 * * * sh /home/hadoop/shellscripts/run4.sh > /home/hadoop/log
> 
> and in run4.sh,it wrote:
> $SPARK_HOME/bin/spark-submit --class com.abc.myclass --total-executor-cores 
> 10 --jars $SPARK_HOME/lib/MyDep.jar $SPARK_HOME/MyJar.jar  > /home/hadoop/log
> 
> 2.
> 22 21 * * * $SPARK_HOME/bin/spark-submit --class com.abc.myclass 
> --total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar 
> $SPARK_HOME/MyJar.jar  > /home/hadoop/log
> 
> 3.
> 22 21 * * * /usr/lib/spark/bin/spark-submit --class com.abc.myclass 
> --total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar 
> /usr/lib/spark/MyJar.jar  > /home/hadoop/log
> 
> 4.
> 22 21 * * * hadoop /usr/lib/spark/bin/spark-submit --class com.abc.myclass 
> --total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar 
> /usr/lib/spark/MyJar.jar  > /home/hadoop/log
> 
> 
>  
> ThanksBest regards!
> San.Luo



Re: Re: run spark apps in linux crontab

2016-07-20 Thread luohui20001

thank you focus, and all. This problem was solved by adding the line ". /etc/profile" 
to my shell script.



 

ThanksBest regards!
San.Luo

- Original Message -
From: "focus" 
To: "luohui20001" , "user@spark.apache.org" 

Subject: Re: run spark apps in linux crontab
Date: 2016-07-20 18:11

Hi, I just met this problem, too! The reason is that the crontab runtime doesn't have 
the variables you defined, such as $SPARK_HOME.
I defined $SPARK_HOME and other variables in /etc/profile like this:

export MYSCRIPTS=/opt/myscripts
export SPARK_HOME=/opt/spark

then, in my crontab job script daily_job.sh

#!/bin/sh
. /etc/profile
$SPARK_HOME/bin/spark-submit $MYSCRIPTS/fix_fh_yesterday.py

then, in crontab -e

0 8 * * * /home/user/daily_job.sh
hope this helps~



-- Original --
From: "luohui20001"
Date: July 20, 2016 (Wednesday) 6:00 PM
To: "user@spark.apache.org"
Subject: run spark apps in linux crontab
hi guys:
  I added a spark-submit job to my Linux crontab list by the means below; however, 
none of them works. If I change it to a normal shell script, it is ok. I don't quite 
understand why. I checked the 8080 web ui of my spark cluster: no job was submitted, 
and there are no messages in /home/hadoop/log.
  Any idea is welcome.

[hadoop@master ~]$ crontab -e
1.
22 21 * * * sh /home/hadoop/shellscripts/run4.sh > /home/hadoop/log

and in run4.sh, it wrote:
$SPARK_HOME/bin/spark-submit --class com.abc.myclass --total-executor-cores 10 
--jars $SPARK_HOME/lib/MyDep.jar $SPARK_HOME/MyJar.jar  > /home/hadoop/log

2.
22 21 * * * $SPARK_HOME/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar 
$SPARK_HOME/MyJar.jar  > /home/hadoop/log

3.
22 21 * * * /usr/lib/spark/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar 
/usr/lib/spark/MyJar.jar  > /home/hadoop/log

4.
22 21 * * * hadoop /usr/lib/spark/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar 
/usr/lib/spark/MyJar.jar  > /home/hadoop/log


 

ThanksBest regards!
San.Luo


Re: XLConnect in SparkR

2016-07-20 Thread Felix Cheung
From looking at the XLConnect package, its loadWorkbook() function only 
supports reading from a local file path, so you might need a way to call an HDFS 
command to get the file from HDFS first.

SparkR currently does not support this - you could read it in as a text file (I 
don't think .xlsx is a text format though), collect to get all the data at the 
driver, then save to local path perhaps?





On Wed, Jul 20, 2016 at 3:48 AM -0700, "Rabin Banerjee" 
> wrote:

Hi Yogesh ,

  I have never tried reading XLS files using Spark, but I think you can use 
sc.wholeTextFiles to read the complete xls at once; as xls files are xml 
internally, you need to read them all to parse. Then I think you can use 
Apache POI to read them.

Also, you can copy your XLS data to an MS-Access file to access it via JDBC.

Regards,
Rabin Banerjee

On Wed, Jul 20, 2016 at 2:12 PM, Yogesh Vyas 
> wrote:
Hi,

I am trying to load and read excel sheets from HDFS in sparkR using
XLConnect package.
Can anyone help me in finding out how to read xls files from HDFS in sparkR ?

Regards,
Yogesh

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org




Re: Understanding spark concepts cluster, master, slave, job, stage, worker, executor, task

2016-07-20 Thread Taotao.Li
Hi, Sachin,  here are two posts about the basic concepts about spark:


   - spark-questions-concepts
   
   - deep-into-spark-execution-model
   


And, I fully recommend databrick's post:
https://databricks.com/blog/2016/06/22/apache-spark-key-terms-explained.html


On Thu, Jul 21, 2016 at 1:36 AM, Jean Georges Perrin  wrote:

> Hey,
>
> I love when questions are numbered, it's easier :)
>
> 1) Yes (but I am not an expert)
> 2) You don't control... One of my processes goes up to 8k tasks, so...
> 3) Yes, if you have HT, it doubles. My servers have 12 cores, but with HT it
> makes 24.
> 4) From my understanding: the slave is the logical computational unit and
> the worker is really the one doing the job.
> 5) Dunnoh
> 6) Dunnoh
>
> On Jul 20, 2016, at 1:30 PM, Sachin Mittal  wrote:
>
> Hi,
> I was able to build and run my spark application via spark submit.
>
> I have understood some of the concepts by going through the resources at
> https://spark.apache.org but a few doubts still remain. I have a few specific
> questions and would be glad if someone could shed some light on them.
>
> So I submitted the application using spark.master=local[*] and I have an
> 8 core PC.
>
> - What I understand is that the application is called a job. Since mine had
> two stages, it gets divided into 2 stages and each stage had a number of tasks
> which ran in parallel.
> Is this understanding correct?
>
> - What I notice is that each stage is further divided into 262 tasks. Where
> did this number 262 come from? Is it configurable? Would increasing
> this number improve performance?
>
> - Also I see that the tasks are run in parallel in sets of 8. Is this
> because I have an 8 core PC?
>
> - What is the difference or relation between a slave and a worker? When I did
> spark-submit, did it start 8 slaves or worker threads?
>
> - I see all worker threads running in one single JVM. Is this because I
> did not start slaves separately and connect them to a single master cluster
> manager? If I had done that, then each worker would have run in its own JVM.
>
> - What is the relationship between a worker and an executor? Can a worker have
> more than one executor? If yes, then how do we configure that? Do all
> executors run in the worker JVM as independent threads?
>
> I suppose that is all for now. Would appreciate any response. Will add
> followup questions if any.
>
> Thanks
> Sachin
>
>
>
>


-- 
*___*
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io

*github*: www.github.com/litaotao


Re: write and call UDF in spark dataframe

2016-07-20 Thread Divya Gehlot
Hi ,
To be very specific I am looking for UDFs syntax for example which takes
String as parameter and returns integer .. how do we define the return type
.


Thanks,

Divya
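
A minimal sketch of a UDF that takes a String and returns an Int, usable from both the DataFrame DSL and SQL (the toy data and names are only illustrative):

import org.apache.spark.sql.functions.{col, udf}

// A toy DataFrame just to make the example self-contained.
val df = sqlContext.createDataFrame(Seq(("alice", 1), ("bob", 2))).toDF("name", "id")

// The return type (Int) is inferred from the Scala function; no explicit schema is needed.
val strLength: String => Int = s => if (s == null) 0 else s.length

// DataFrame DSL usage
val strLengthUdf = udf(strLength)
df.withColumn("name_len", strLengthUdf(col("name"))).show()

// SQL usage
sqlContext.udf.register("strLength", strLength)
df.registerTempTable("people")
sqlContext.sql("SELECT name, strLength(name) AS name_len FROM people").show()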

On 21 July 2016 at 00:24, Andy Davidson 
wrote:

> Hi Divya
>
> In general you will get better performance if you can minimize your use of
> UDFs. Spark 2.0 / Tungsten does a lot of code generation. It will have to
> treat your UDF as a black box.
>
> Andy
>
> From: Rishabh Bhardwaj 
> Date: Wednesday, July 20, 2016 at 4:22 AM
> To: Rabin Banerjee 
> Cc: Divya Gehlot , "user @spark" <
> user@spark.apache.org>
> Subject: Re: write and call UDF in spark dataframe
>
> Hi Divya,
>
> "from_unixtime" already exists in org.apache.spark.sql.functions;
> Rabin has used that in the sql query. If you want to use it in the
> dataframe DSL you can try it like this,
>
> val new_df = df.select(from_unixtime($"time").as("newtime"))
>
>
> Thanks,
> Rishabh.
>
> On Wed, Jul 20, 2016 at 4:21 PM, Rabin Banerjee <
> dev.rabin.baner...@gmail.com> wrote:
>
>> Hi Divya ,
>>
>> Try,
>>
>> val df = sqlContext.sql("select from_unixtime(ts,'yyyy-MM-dd') as `ts` from 
>> mr")
>>
>> Regards,
>> Rabin
>>
>> On Wed, Jul 20, 2016 at 12:44 PM, Divya Gehlot 
>> wrote:
>>
>>> Hi,
>>> Could somebody share an example of writing and calling a udf which converts
>>> a unix timestamp to a date time?
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>
>>
>


Re: Role-based S3 access outside of EMR

2016-07-20 Thread Everett Anderson
Thanks, Andy.

I am indeed often doing something similar, now -- copying data locally
rather than dealing with the S3 impl selection and AWS credentials issues.
It'd be nice if it worked a little easier out of the box, though!
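
For reference, a minimal sketch of the in-code credential setup described further down in this thread (the property names come from the quoted message; the local master and the S3A choice are assumptions):

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("s3-local-dev").setMaster("local[*]"))
val creds = new DefaultAWSCredentialsProviderChain().getCredentials
val hadoopConf = sc.hadoopConfiguration

// Install the chain-provided credentials under the s3/s3n/s3a property names...
Seq("fs.s3", "fs.s3n", "fs.s3a").foreach { prefix =>
  hadoopConf.set(s"$prefix.awsAccessKeyId", creds.getAWSAccessKeyId)
  hadoopConf.set(s"$prefix.awsSecretAccessKey", creds.getAWSSecretKey)
}
// ...and point s3:// and s3n:// URIs at the S3A implementation, as in the thread.
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")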


On Tue, Jul 19, 2016 at 2:47 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Everett
>
> I always do my initial data exploration and all our product development in
> my local dev env. I typically select a small data set and copy it to my
> local machine
>
> My main() has an optional command line argument ‘--runLocal’. Normally I
> load data from either hdfs:/// or S3n:// . If the arg is set I read from
> file:///
>
> Sometimes I use a CLI arg ‘--dataFileURL’
>
> So in your case I would log into my data cluster and use “AWS s3 cp" to
> copy the data into my cluster and then use “SCP” to copy the data from the
> data center back to my local env.
>
> Andy
>
> From: Everett Anderson 
> Date: Tuesday, July 19, 2016 at 2:30 PM
> To: "user @spark" 
> Subject: Role-based S3 access outside of EMR
>
> Hi,
>
> When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop
> FileSystem implementation for s3:// URLs and seems to install the
> necessary S3 credentials properties, as well.
>
> Often, it's nice during development to run outside of a cluster even with
> the "local" Spark master, though, which I've found to be more troublesome.
> I'm curious if I'm doing this the right way.
>
> There are two issues -- AWS credentials and finding the right combination
> of compatible AWS SDK and Hadoop S3 FileSystem dependencies.
>
> *Credentials and Hadoop Configuration*
>
> For credentials, some guides recommend setting AWS_SECRET_ACCESS_KEY and
> AWS_ACCESS_KEY_ID environment variables or putting the corresponding
> properties in Hadoop XML config files, but it seems better practice to rely
> on machine roles and not expose these.
>
> What I end up doing is, in code, when not running on EMR, creating a
> DefaultAWSCredentialsProviderChain
> 
> and then installing the following properties in the Hadoop Configuration
> using it:
>
> fs.s3.awsAccessKeyId
> fs.s3n.awsAccessKeyId
> fs.s3a.awsAccessKeyId
> fs.s3.awsSecretAccessKey
> fs.s3n.awsSecretAccessKey
> fs.s3a.awsSecretAccessKey
>
> I also set the fs.s3.impl and fs.s3n.impl properties to
> org.apache.hadoop.fs.s3a.S3AFileSystem to force them to use the S3A
> implementation since people usually use "s3://" URIs.
>
> *SDK and File System Dependencies*
>
> Some special combination of the Hadoop
> version, AWS SDK version, and hadoop-aws is necessary.
>
> One working S3A combination with Spark 1.6.1 + Hadoop 2.7.x for me seems
> to be with
>
> --packages
> com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
>
> Is this generally what people do? Is there a better way?
>
> I realize this isn't entirely a Spark-specific problem, but as so many
> people seem to be using S3 with Spark, I imagine this community's faced the
> problem a lot.
>
> Thanks!
>
> - Everett
>
>


Re: Subquery in having-clause (Spark 1.1.0)

2016-07-20 Thread rickn
Seeing the same results on the current 1.6.2 release ... just wanted to
confirm.
Are there any workarounds? Do I need to wait for 2.0 for support?
https://issues.apache.org/jira/browse/SPARK-12543

Thank you
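
One workaround that is sometimes used until scalar subqueries are supported is to compute the subquery's value first and splice it into the outer query as a literal; a sketch with made-up tables and columns:

// Toy data only to keep the sketch self-contained; "orders" stands in for the real table.
val orders = sqlContext.createDataFrame(Seq((1, 100.0), (1, 50.0), (2, 20.0)))
  .toDF("customer_id", "amount")
orders.registerTempTable("orders")

// 1) compute the scalar the HAVING clause needs...
val avgTotal = sqlContext.sql("SELECT avg(amount) FROM orders").first().getDouble(0)

// 2) ...then use it as a plain literal in the aggregate query.
val result = sqlContext.sql(
  s"""SELECT customer_id, sum(amount) AS total
     |FROM orders
     |GROUP BY customer_id
     |HAVING sum(amount) > $avgTotal""".stripMargin)
result.show()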



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Subquery-in-having-clause-Spark-1-1-0-tp17401p27380.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: HiveThriftServer and spark.sql.hive.thriftServer.singleSession setting

2016-07-20 Thread Chang Lim
It's an issue with the preview build.  Switched to RC5 and all is working.
Thanks to Michael Armbrust.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveThriftServer-and-spark-sql-hive-thriftServer-singleSession-setting-tp27340p27379.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: MultiThreading in Spark 1.6.0

2016-07-20 Thread Maciej Bryński
RK Aduri,
Another idea is to union all the results and then run a single collect.
The question is how big the collected data is.
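
A sketch of that union-then-collect idea, assuming each per-id lookup can be expressed as a filter on the cached DataFrame (the "u_id" column name and a non-empty id list are assumptions):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col

// One Spark job instead of one collect per id: build the per-id lookups lazily,
// union them, and collect once at the end.
def lookupAll(largeDf: DataFrame, ids: Seq[String]): Array[Row] = {
  val perId = ids.map(id => largeDf.filter(col("u_id") === id))
  perId.reduce(_ unionAll _).collect()
}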

2016-07-20 20:32 GMT+02:00 RK Aduri :
> Spark version: 1.6.0
> So, here is the background:
>
> I have a data frame (Large_Row_DataFrame) which I have created from an
> array of row objects and also have another array of unique ids (U_ID) which
> I’m going to use to look up into the Large_Row_DataFrame (which is cached)
> to do a customized function.
> For each lookup for each unique id, I do a collect on the cached
> dataframe Large_Row_DataFrame. This means that there would be a bunch of
> ‘collect’ actions which Spark has to run. Since I’m executing this in a loop
> for each unique id (U_ID), all such collect actions run in sequential
> mode.
>
> Solution that I implemented:
>
> To avoid the sequential wait of each collect, I have created few subsets of
> unique ids with a specific size and run each thread for such a subset. For
> each such subset, I executed a thread which is a spark job that runs
> collects in sequence only for that subset. And, I have created as many
> threads as subsets, each thread handling each subset. Surprisingly, The
> resultant run time is better than the earlier sequential approach.
>
> Now the question:
>
> Is the multithreading a correct approach towards the solution? Or 
> could
> there be a better way of doing this.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/MultiThreading-in-Spark-1-6-0-tp27374.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



-- 
Maciek Bryński

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: PySpark 2.0 Structured Streaming Question

2016-07-20 Thread Tathagata Das
foreachWriter is not currently available in Python. We don't have a
clear plan yet for when foreachWriter will be available in Python.
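
For anyone following along in Scala, a minimal sketch of the 2.0 Scala-side API (the println sink is just a stand-in for a real external sink, and streamingDf is an assumed streaming DataFrame):

import org.apache.spark.sql.{ForeachWriter, Row}

// open/process/close are invoked per partition per trigger.
val writer = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true   // e.g. open a connection here
  def process(value: Row): Unit = println(value)               // write one record to the sink
  def close(errorCause: Throwable): Unit = ()                  // release resources
}

// streamingDf.writeStream.foreach(writer).start()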

On Wed, Jul 20, 2016 at 1:22 PM, A.W. Covert III  wrote:

> Hi All,
>
> I've been digging into spark 2.0, I have some streaming jobs running well
> on YARN, and I'm working on some Spark Structured Streaming jobs now.
>
> I have a couple of jobs I'd like to move to Structured Streaming with the
> `foreachWriter` but it's not available in PySpark yet. Is it just because
> Structured Streaming is still in alpha or is there some technical subtlety
> that I'm overlooking?
>
> Thanks,
>
> Art Covert
>
> http://covertphotography.tumblr.com/
> https://github.com/covertspartan/
>
>
>


Re: Little idea needed

2016-07-20 Thread Aakash Basu
Thanks for the detailed description, buddy. But this will actually be done
through NiFi (end to end), so we need to add the delta logic inside NiFi to
automate the whole process.

That's why we need a good (best) solution to solve this problem, since this
is a classic issue which we can face at any company we work with.
On 20-Jul-2016 1:38 AM, "Mich Talebzadeh"  wrote:

> Well this is a classic.
>
> The initial load can be done through Sqoop (outside of Spark) or through a
> JDBC connection in Spark. 10 million rows is nothing.
>
> Then you have to think of updates and deletes in addition to new rows.
>
> With Sqoop you can load from the last ID in the source table, assuming
> that you have a unique key in your Oracle table.
>
> If you have 10 new rows, I assume you know how to load these rows from
> Oracle.
>
> I suggest that you add two additional columns to your HDFS/target table,
>
> ,op_type int
> ,op_time timestamp
>
> These two columns will specify the row type op_type = 1,2,3
> INSERT/UPDATE/DELETE and op_time = cast(from_unixtime(unix_timestamp())
> AS op_time) when the record was added.
>
> So you will end up with two additional columns in your HDFS table compared
> to Oracle table and that will be your staging table.
>
> Of course you can do real time analytics through Oracle GoldenGate that
> read the redolog of the source table in Oracle or better Sap Replication
> Server (SRS). You will achieve real-time integration between RDBMS tables
> and Big Data.
>
> Once you have the staging table (immutable), the rest is
> pretty easy. You have the full Entity Life History in this case for records
> and you can do your queries on them.
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 19 July 2016 at 20:27, Aakash Basu  wrote:
>
>> Hi all,
>>
>> I'm trying to pull a full table from oracle, which is huge with some 10
>> million records which will be the initial load to HDFS.
>>
>> Then I will do delta loads everyday in the same folder in HDFS.
>>
>> Now, my query here is,
>>
>> DAY 0 - I did the initial load (full dump).
>>
>> DAY 1 - I'll load only that day's data which has suppose 10 records (5
>> old with some column's value altered and 5 new).
>>
>> Here, my question is, how will I push this file to HDFS through Spark
>> code, if I do append, it will create duplicates (which i don't want), if i
>> keep separate files and while using it in other program am giving the path
>> of it as folder which contains all files /. But in this case also the
>> registerTempTable will have duplicates for those 5 old rows.
>>
>> What is the BEST logic to be applied here?
>>
>> I tried to resolve this by doing a search in that file of the records if
>> matching load the new ones by deleting the old, but this will be time
>> consuming for such a huge record, right?
>>
>> Please help!
>>
>> Thanks,
>> Aakash.
>>
>
>


Re: Little idea needed

2016-07-20 Thread Aakash Basu
Your second point: That's going to be a bottleneck for all the programs
which will fetch the data from that folder and again add extra filters into
the DF. I want to finish that off, there itself.

And that merge logic is weak when one table is huge and the other is very
small (which is the case here), it literally gulps memory and time.

And business won't allow Hive or anything else to be used AT ALL, since we may
shift to EMR, where Hive may have compatibility issues (need to check).
On 20-Jul-2016 1:27 AM, "Jörn Franke"  wrote:

Well, as far as I know there is some update statement planned for Spark, but I'm
not sure for which release. You could alternatively use Hive+ORC.
Another alternative would be to add the deltas in a separate file and, when
accessing the table, filter out the double entries (see the sketch below). From
time to time you could have a merge process creating one file out of all the deltas.
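
A sketch of that filtering step, assuming a unique key column and a load-time column exist (both names below are assumptions; window functions need a HiveContext on Spark 1.x):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Keep only the newest version of each key after unioning the base load with the deltas.
def latestOnly(baseDf: DataFrame, deltaDf: DataFrame): DataFrame = {
  val w = Window.partitionBy("record_id").orderBy(col("load_ts").desc)
  baseDf.unionAll(deltaDf)
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") === 1)
    .drop("rn")
}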

On 19 Jul 2016, at 21:27, Aakash Basu  wrote:

Hi all,

I'm trying to pull a full table from oracle, which is huge with some 10
million records which will be the initial load to HDFS.

Then I will do delta loads everyday in the same folder in HDFS.

Now, my query here is,

DAY 0 - I did the initial load (full dump).

DAY 1 - I'll load only that day's data which has suppose 10 records (5 old
with some column's value altered and 5 new).

Here, my question is, how will I push this file to HDFS through Spark code,
if I do append, it will create duplicates (which i don't want), if i keep
separate files and while using it in other program am giving the path of it
as folder which contains all files /. But in this case also the
registerTempTable will have duplicates for those 5 old rows.

What is the BEST logic to be applied here?

I tried to resolve this by doing a search in that file of the records if
matching load the new ones by deleting the old, but this will be time
consuming for such a huge record, right?

Please help!

Thanks,
Aakash.


PySpark 2.0 Structured Streaming Question

2016-07-20 Thread A.W. Covert III
Hi All,

I've been digging into spark 2.0, I have some streaming jobs running well
on YARN, and I'm working on some Spark Structured Streaming jobs now.

I have a couple of jobs I'd like to move to Structured Streaming with the
`foreachWriter` but it's not available in PySpark yet. Is it just because
Structured Streaming is still in alpha or is there some technical subtlety
that I'm overlooking?

Thanks,

Art Covert

http://covertphotography.tumblr.com/
https://github.com/covertspartan/


SparkWebUI and Master URL on EC2

2016-07-20 Thread KhajaAsmath Mohammed
Hi,

I got access to a spark cluster and have instantiated spark-shell on AWS
using the command
$ spark-shell

The Spark shell started successfully, but I am looking to access the WebUI and
Master URL. Does anyone know how to access those on AWS?

I tried http://IPMaster:4040 and http://IpMaster:8080 but they didn't show
anything.

Thanks,
Asmath.


Re: ML PipelineModel to be scored locally

2016-07-20 Thread Peyman Mohajerian
One option is to save the model in parquet or json format and then build
your own prediction code. Some also use:

https://github.com/jpmml/jpmml-sparkml

It depends on the model, e.g. ml vs mllib and other factors, whether this
works or not. A couple of weeks ago there was a long discussion on this
topic.

On Wed, Jul 20, 2016 at 7:08 AM, Simone Miraglia 
wrote:

> Hi all,
>
> I am working on the following use case involving ML Pipelines.
>
> 1. I created a Pipeline composed from a set of stages
> 2. I called "fit" method on my training set
> 3. I validated my model by calling "transform" on my test set
> 4. I stored my fitted Pipeline to a shared folder
>
> Then I have a very low latency interactive application (say a kind of web
> service) that should work as follows:
> 1. The app receives a request
> 2. A scoring needs to be made, according to my fitted PipelineModel
> 3. The app sends the score to the caller, in a synchronous fashion
>
> Is there a way to call the .transform method of the PipelineModel over a
> single Row?
>
> I will definitely not want to parallelize a single record to a DataFrame,
> nor rely on Spark Streaming, due to latency requirements.
> I would like to use something similar to the mllib .predict(Vector) method,
> which does not rely on the SparkContext and performs all the computation locally.
>
> Thanks in advance
> Best
>


Using multiple data sources in one stream

2016-07-20 Thread Joe Panciera
Hi,

I have a rather complicated situation that's raised an issue regarding
consuming multiple data sources for processing. Unlike the use cases I've
found, I have 3 sources of different formats. There's one 'main' stream A
that does the processing, and 2 sources B and C that provide elements
required for processing. All 3 sources come from Kinesis streams and are
updated in real time, so using a broadcast variable or passing a file to
each node won't work. Also, each data source is different so I won't be
able to join the streams.

Source A is an event stream, source B is used for processing events, and
source C is used for filtering events. State needs to be maintained for
source A and source B. So my question is, how can I pull in data from
multiple, disparate sources and feed it into a single stream?

Thanks


Re: HiveThriftServer and spark.sql.hive.thriftServer.singleSession setting

2016-07-20 Thread Chang Lim
Would appreciate it if someone:
1. Can confirm whether this is an issue, or 
2. Can share how to get HiveThriftServer2.startWithContext working with a
shared temp table.

I am using Beeline as the JDBC client to access the temp tables of the
running Spark app.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveThriftServer-and-spark-sql-hive-thriftServer-singleSession-setting-tp27340p27375.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



MultiThreading in Spark 1.6.0

2016-07-20 Thread RK Aduri
Spark version: 1.6.0 
So, here is the background:

I have a data frame (Large_Row_DataFrame) which I have created from an
array of row objects and also have another array of unique ids (U_ID) which
I’m going to use to look up into the Large_Row_DataFrame (which is cached)
to do a customized function. 
   For each lookup for each unique id, I do a collect on the cached
dataframe Large_Row_DataFrame. This means that there would be a bunch of
‘collect’ actions which Spark has to run. Since I’m executing this in a loop
for each unique id (U_ID), all such collect actions run in sequential
mode.

Solution that I implemented:

To avoid the sequential wait of each collect, I have created few subsets of
unique ids with a specific size and run each thread for such a subset. For
each such subset, I executed a thread which is a spark job that runs
collects in sequence only for that subset. And, I have created as many
threads as subsets, each thread handling each subset. Surprisingly, The
resultant run time is better than the earlier sequential approach.

Now the question:

Is the multithreading a correct approach towards the solution? Or could
there be a better way of doing this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MultiThreading-in-Spark-1-6-0-tp27374.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Saving a pyspark.ml.feature.PCA model

2016-07-20 Thread Ajinkya Kale
Just found Google dataproc has a preview of spark 2.0. Tried it and
save/load works! Thanks Shuai.
Followup question - is there a way to export the pyspark.ml models to PMML
? If not, what is the best way to integrate the model for inference in a
production service ?

On Tue, Jul 19, 2016 at 8:22 PM Ajinkya Kale  wrote:

> I am using google cloud dataproc which comes with spark 1.6.1. So upgrade
> is not really an option.
> No way / hack to save the models in spark 1.6.1 ?
>
> On Tue, Jul 19, 2016 at 8:13 PM Shuai Lin  wrote:
>
>> It's added in not-released-yet 2.0.0 version.
>>
>> https://issues.apache.org/jira/browse/SPARK-13036
>> https://github.com/apache/spark/commit/83302c3b
>>
>> so i guess you need to wait for 2.0 release (or use the current rc4).
>>
>> On Wed, Jul 20, 2016 at 6:54 AM, Ajinkya Kale 
>> wrote:
>>
>>> Is there a way to save a pyspark.ml.feature.PCA model ? I know mllib has
>>> that but mllib does not have PCA afaik. How do people do model persistence
>>> for inference using the pyspark ml models ? Did not find any documentation
>>> on model persistency for ml.
>>>
>>> --ajinkya
>>>
>>
>>


Re: Building standalone spark application via sbt

2016-07-20 Thread Sachin Mittal
I got the error during run time. It was for mongo-spark-connector class
files.
My build.sbt is like this

name := "Test Advice Project"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector" % "1.0.0",
"org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
"org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"
)

assemblyMergeStrategy in assembly := {
   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
   case x => MergeStrategy.first
}

and I create the fat jar using sbt assembly
I have also added addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
in ~/.sbt/0.13/plugins/plugins.sbt for sbt assembly to work.

I think when you use provided it does not include those jars in your fat
jar.

I am using spark 1.6

Thanks
Sachin



On Wed, Jul 20, 2016 at 11:23 PM, Marco Mistroni 
wrote:

>  that will work but ideally you should not include any of the
> spark-related jars as they are provided to you by the spark environment
> whenever you launch your app via spark-submit (this will prevent unexpected
> errors e.g. when you kick off your app using a different version of spark
> where some of the classes have been renamed or moved around - tbh I don't think
> this is a case that happens often)
>
> Btw did you get the NoClassDefFoundException at compile time or run
> time? If at run time, what is your Spark version and what is the Spark
> libraries version you used in your sbt?
> Are you using a Spark version pre 1.4?
>
> kr
>  marco
>
>
>
>
>
>
> On Wed, Jul 20, 2016 at 6:13 PM, Sachin Mittal  wrote:
>
>> NoClassDefFound error was for spark classes like say SparkContext.
>> When running a standalone spark application I was not passing external
>> jars using --jars option.
>>
>> However I have fixed this by making a fat jar using sbt assembly plugin.
>>
>> Now all the dependencies are included in that jar and I use that jar in
>> spark-submit
>>
>> Thanks
>> Sachin
>>
>>
>> On Wed, Jul 20, 2016 at 9:42 PM, Marco Mistroni 
>> wrote:
>>
>>> Hello Sachin
>>>   pls paste the NoClassDefFound Exception so we can see what's failing,
>>> aslo please advise how are you running your Spark App
>>> For an extremely simple case, let's assume  you have your
>>> MyFirstSparkApp packaged in your   myFirstSparkApp.jar
>>> Then all you need to do would be to kick off
>>>
>>> spark-submit --class MyFirstSparkApp   myFirstSparkApp.jar
>>>
>>> if you have any external dependencies (not spark , let's assume you are
>>> using common-utils.jar) then you should be able to kick it off via
>>>
>>> spark-submit --class MyFirstSparkApp --jars common-utiils.jar
>>> myFirstSparkApp.jar
>>>
>>> I paste below the build.sbt i am using for my SparkExamples apps, hope
>>> this helps.
>>> kr
>>>  marco
>>>
>>> name := "SparkExamples"
>>>
>>> version := "1.0"
>>>
>>> scalaVersion := "2.10.5"
>>>
>>>
>>> // Add a single dependency
>>> libraryDependencies += "junit" % "junit" % "4.8" % "test"
>>> libraryDependencies += "org.mockito" % "mockito-core" % "1.9.5"
>>> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
>>> "org.slf4j" % "slf4j-simple" % "1.7.5",
>>> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
>>> libraryDependencies += "org.powermock" %
>>> "powermock-mockito-release-full" % "1.5.4" % "test"
>>> libraryDependencies += "org.apache.spark" %% "spark-core"   % "1.6.1" %
>>> "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming"   %
>>> "1.6.1" % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-mllib"   % "1.6.1"
>>> % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-flume"   %
>>> "1.3.0"  % "provided"
>>> resolvers += "softprops-maven" at "
>>> http://dl.bintray.com/content/softprops/maven"
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jul 20, 2016 at 3:39 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 you need an uber jar file.

 Have you actually followed the dependencies and project sub-directory
 build?

 check this.


 http://stackoverflow.com/questions/28459333/how-to-build-an-uber-jar-fat-jar-using-sbt-within-intellij-idea

 Of the three answers there, see the top one.

 I started reading the official SBT tutorial.

 HTH

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is 

Re: Building standalone spark application via sbt

2016-07-20 Thread Marco Mistroni
 that will work but ideally you should not include any of the
spark-related jars as they are provided to you by the spark environment
whenever you launch your app via spark-submit (this will prevent unexpected
errors e.g. when you kick off your app using a different version of spark
where some of the classes have been renamed or moved around - tbh I don't think
this is a case that happens often)

Btw did you get the NoClassDefFoundException at compile time or run time? If
at run time, what is your Spark version and what is the Spark libraries
version you used in your sbt?
Are you using a Spark version pre 1.4?

kr
 marco






On Wed, Jul 20, 2016 at 6:13 PM, Sachin Mittal  wrote:

> NoClassDefFound error was for spark classes like say SparkContext.
> When running a standalone spark application I was not passing external
> jars using --jars option.
>
> However I have fixed this by making a fat jar using sbt assembly plugin.
>
> Now all the dependencies are included in that jar and I use that jar in
> spark-submit
>
> Thanks
> Sachin
>
>
> On Wed, Jul 20, 2016 at 9:42 PM, Marco Mistroni 
> wrote:
>
>> Hello Sachin
>>   pls paste the NoClassDefFound Exception so we can see what's failing,
>> aslo please advise how are you running your Spark App
>> For an extremely simple case, let's assume  you have your
>> MyFirstSparkApp packaged in your   myFirstSparkApp.jar
>> Then all you need to do would be to kick off
>>
>> spark-submit --class MyFirstSparkApp   myFirstSparkApp.jar
>>
>> if you have any external dependencies (not spark , let's assume you are
>> using common-utils.jar) then you should be able to kick it off via
>>
>> spark-submit --class MyFirstSparkApp --jars common-utiils.jar
>> myFirstSparkApp.jar
>>
>> I paste below the build.sbt i am using for my SparkExamples apps, hope
>> this helps.
>> kr
>>  marco
>>
>> name := "SparkExamples"
>>
>> version := "1.0"
>>
>> scalaVersion := "2.10.5"
>>
>>
>> // Add a single dependency
>> libraryDependencies += "junit" % "junit" % "4.8" % "test"
>> libraryDependencies += "org.mockito" % "mockito-core" % "1.9.5"
>> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
>> "org.slf4j" % "slf4j-simple" % "1.7.5",
>> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
>> libraryDependencies += "org.powermock" % "powermock-mockito-release-full"
>> % "1.5.4" % "test"
>> libraryDependencies += "org.apache.spark" %% "spark-core"   % "1.6.1" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming"   %
>> "1.6.1" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-mllib"   % "1.6.1"  %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-flume"   %
>> "1.3.0"  % "provided"
>> resolvers += "softprops-maven" at "
>> http://dl.bintray.com/content/softprops/maven"
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Jul 20, 2016 at 3:39 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> you need an uber jar file.
>>>
>>> Have you actually followed the dependencies and project sub-directory
>>> build?
>>>
>>> check this.
>>>
>>>
>>> http://stackoverflow.com/questions/28459333/how-to-build-an-uber-jar-fat-jar-using-sbt-within-intellij-idea
>>>
>>> Of the three answers there, see the top one.
>>>
>>> I started reading the official SBT tutorial.
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 20 July 2016 at 09:54, Sachin Mittal  wrote:
>>>
 Hi,
 I am following the example under
 https://spark.apache.org/docs/latest/quick-start.html
 For standalone scala application.

 I added all my dependencies via build.sbt (one dependency is under lib
 folder).

 When I run sbt package I see the jar created under
 target/scala-2.10/

 So compile seems to be working fine. However when I inspect that jar,
 it only contains my scala class.
 Unlike a Java application, where we build a standalone jar which contains
 all the dependencies inside it, here all the dependencies are
 missing.

 So as expected when I run the application via spark-submit I get the
 NoClassDefFoundError.

 Here is my build.sbt

 name := "Test Advice Project"
 version := "1.0"
 scalaVersion := 

Attribute name "sum(proceeds)" contains invalid character(s) among " ,;{}()\n\t="

2016-07-20 Thread Chanh Le
Hi everybody,
I got an error saying the name of the columns does not follow the rule.
Please tell me the way to fix it.
Here is my code (metricFields is a Seq of metrics: spent, proceed, click, impression):

sqlContext
  .sql(s"select * from hourly where time between '$dateStr-00' and '$dateStr-23' ")
  .groupBy("time", dimensions.filter(!"time".contains(_)): _*)
  .agg(metricFields.map(a => a -> "sum").toMap)
Error message was:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Attribute 
name "sum(proceeds)" contains invalid character(s) among " ,;{}()\n\t=". Please 
use alias to rename it.;
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$.checkConversionRequirement(CatalystSchemaConverter.scala:556)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter$.checkFieldName(CatalystSchemaConverter.scala:542)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystWriteSupport$$anonfun$setSchema$2.apply(CatalystWriteSupport.scala:430)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystWriteSupport$$anonfun$setSchema$2.apply(CatalystWriteSupport.scala:430)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystWriteSupport$.setSchema(CatalystWriteSupport.scala:430)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.prepareJobForWrite(ParquetRelation.scala:258)
at 
org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:147)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at 
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at jobs.DailyJob$delayedInit$body.apply(DailyJob.scala:46)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at jobs.DailyJob$.main(DailyJob.scala:12)
at jobs.DailyJob.main(DailyJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
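
As the error message suggests, aliasing the aggregates avoids the parenthesised column names; a sketch assuming metricFields is a Seq[String] of metric column names:

import org.apache.spark.sql.functions.{col, sum}

// sum(col) aliased back to the plain metric name, so Parquet never sees "sum(proceeds)".
val aggExprs = metricFields.map(m => sum(col(m)).alias(m))
val hourlyAgg = sqlContext
  .sql(s"select * from hourly where time between '$dateStr-00' and '$dateStr-23'")
  .groupBy("time", dimensions.filter(!"time".contains(_)): _*)
  .agg(aggExprs.head, aggExprs.tail: _*)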

Re: Understanding spark concepts cluster, master, slave, job, stage, worker, executor, task

2016-07-20 Thread Jean Georges Perrin
Hey,

I love when questions are numbered, it's easier :)

1) Yes (but I am not an expert)
2) You don't control... One of my process is going to 8k tasks, so...
3) Yes, if you have HT, it doubles. My servers have 12 cores, but with HT it
makes 24.
4) From my understanding: Slave is the logical computational unit and Worker is 
really the one doing the job. 
5) Dunnoh
6) Dunnoh
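
For 2), a minimal sketch (Spark 1.6 API; the input path and numbers are made up) of
where the per-stage task count comes from and how to change it. It is simply the
partition count of the RDD being processed:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("PartitionDemo")
  .set("spark.default.parallelism", "16")    // default partition count for shuffles
val sc = new SparkContext(conf)

val rdd = sc.textFile("hdfs:///some/input")  // partition count follows the input splits
println(rdd.partitions.length)               // == number of tasks in that stage

val fewer = rdd.coalesce(8)      // shrink without a full shuffle
val more  = rdd.repartition(64)  // grow (triggers a shuffle)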

> On Jul 20, 2016, at 1:30 PM, Sachin Mittal  wrote:
> 
> Hi,
> I was able to build and run my spark application via spark submit.
> 
> I have understood some of the concepts by going through the resources at 
> https://spark.apache.org  but few doubts still 
> remain. I have few specific questions and would be glad if someone could 
> share some light on it.
> 
> So I submitted the application using spark.master local[*] and I have an 8 
> core PC.
> 
> - What I understand is that application is called as job. Since mine had two 
> stages it gets divided into 2 stages and each stage had number of tasks which 
> ran in parallel.
> Is this understanding correct.
> 
> - What I notice is that each stage is further divided into 262 tasks From 
> where did this number 262 came from. Is this configurable. Would increasing 
> this number improve performance.
> 
> - Also I see that the tasks are run in parallel in set of 8. Is this because 
> I have a 8 core PC.
> 
> - What is the difference or relation between slave and worker. When I did 
> spark-submit did it start 8 slaves or worker threads?
> 
> - I see all worker threads running in one single JVM. Is this because I did 
> not start  slaves separately and connect it to a single master cluster 
> manager. If I had done that then each worker would have run in its own JVM.
> 
> - What is the relationship between worker and executor. Can a worker have 
> more than one executors? If yes then how do we configure that. Does all 
> executor run in the worker JVM and are independent threads.
> 
> I suppose that is all for now. Would appreciate any response.Will add 
> followup questions if any.
> 
> Thanks
> Sachin
> 
> 



Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

2016-07-20 Thread Ian O'Connell
Ravi did your issue ever get solved for this?

I think i've been hitting the same thing, it looks like
the spark.sql.autoBroadcastJoinThreshold stuff isn't kicking in as
expected, if I set that to -1 then the computation proceeds successfully.
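
A minimal sketch of both knobs (Spark 1.6/2.0 DataFrame API; bigDf, smallDf and the
join key are hypothetical):

import org.apache.spark.sql.functions.broadcast

// Disable automatic broadcast joins entirely (forces sort-merge / shuffle joins):
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

// Or leave it on and explicitly broadcast the known-small side of a join:
val joined = bigDf.join(broadcast(smallDf), Seq("id"))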

On Tue, Jun 14, 2016 at 12:28 AM, Ravi Aggarwal  wrote:

> Hi,
>
>
>
> Is there any breakthrough here?
>
>
>
> I had one more observation while debugging the issue
>
> Here are the 4 types of data I had:
>
>
>
> Da -> stored in parquet
>
> Di -> stored in parquet
>
> Dl1 -> parquet version of lookup
>
> Dl2 -> hbase version of lookup
>
>
>
> Joins performed and type of join done by spark:
>
> Da and Di    Sort-Merge   failed (OOM)
>
> Da and Dl1   B-H          passed
>
> Da and Dl2   Sort-Merge   passed
>
> Di and Dl1   B-H          passed
>
> Di and Dl2   Sort-Merge   failed (OOM)
>
>
>
> From entries I can deduce that problem is with sort-merge join involving
> Di.
>
> So the hbase thing is out of equation, that is not the culprit.
>
> In physical plan I could see there are only two operations that are done
> additionally in sort-merge as compared to Broadcast-hash.
>
> - Exchange Hashpartitioning
>
> - Sort
>
> And finally sort-merge join.
>
>
>
> Can we deduce anything from this?
>
>
>
> Thanks
>
> Ravi
>
> *From:* Ravi Aggarwal
> *Sent:* Friday, June 10, 2016 12:31 PM
> *To:* 'Ted Yu' 
> *Cc:* user 
> *Subject:* RE: OutOfMemory when doing joins in spark 2.0 while same code
> runs fine in spark 1.5.2
>
>
>
> Hi Ted,
>
> Thanks for the reply.
>
>
>
> Here is the code
>
> Btw – df.count is running fine on dataframe generated from this default
> source. I think it is something in the combination of join and hbase data
> source that is creating issue. But not sure entirely.
>
> I have also dumped the physical plans of both approaches s3a/s3a join and
> s3a/hbase join, In case you want that let me know.
>
>
>
> import org.apache.hadoop.fs.FileStatus
>
> import org.apache.hadoop.hbase.client._
>
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>
> import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
>
> import org.apache.hadoop.hbase._
>
> import org.apache.hadoop.mapreduce.Job
>
> import org.apache.spark.rdd.RDD
>
> import org.apache.spark.sql.Row
>
> import org.apache.spark.sql.catalyst.CatalystTypeConverters
>
> import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
>
> import org.apache.spark.sql.execution.datasources.{OutputWriterFactory,
> FileFormat}
>
> import org.apache.spark.sql.sources._
>
> import org.apache.spark.sql.types._
>
> import org.apache.spark.sql._
>
> import org.slf4j.LoggerFactory
>
>
>
> class DefaultSource extends SchemaRelationProvider with FileFormat {
>
>
>
>   override def createRelation(sqlContext: SQLContext, parameters:
> Map[String, String], schema: StructType) = {
>
> new HBaseRelation(schema, parameters)(sqlContext)
>
>   }
>
>
>
>   def inferSchema(sparkSession: SparkSession,
>
>   options: Map[String, String],
>
>   files: Seq[FileStatus]): Option[StructType] = ???
>
>
>
>   def prepareWrite(sparkSession: SparkSession,
>
>job: Job,
>
>options: Map[String, String],
>
>dataSchema: StructType): OutputWriterFactory = ???
>
> }
>
>
>
> object HBaseConfigurationUtil {
>
>   lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
>
>   val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
>
> val conf = HBaseConfiguration.create()
>
> conf.set(TableInputFormat.INPUT_TABLE, tableName)
>
> conf.set("hbase.mapred.outputtable", tableName)
>
> conf.set("hbase.zookeeper.quorum", hbaseQuorum)
>
> conf
>
>   }
>
> }
>
>
>
> class HBaseRelation(val schema: StructType, parameters: Map[String,
> String])
>
>(@transient val sqlContext: SQLContext) extends
> BaseRelation with TableScan {
>
>
>
>   import sqlContext.sparkContext
>
>
>
>   override def buildScan(): RDD[Row] = {
>
>
>
> val bcDataSchema = sparkContext.broadcast(schema)
>
>
>
> val tableName = parameters.get("path") match {
>
>   case Some(t) => t
>
>   case _ => throw new RuntimeException("Table name (path) not provided
> in parameters")
>
> }
>
>
>
> val hbaseQuorum = parameters.get("hbaseQuorum") match {
>
>   case Some(s: String) => s
>
>   case _ => throw new RuntimeException("hbaseQuorum not provided in
> options")
>
> }
>
>
>
> val rdd = sparkContext.newAPIHadoopRDD(
>
>   HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
>
>   classOf[TableInputFormat],
>
>   classOf[ImmutableBytesWritable],
>
>   classOf[Result]
>
> )
>
>
>
> val rowRdd = rdd
>
>   .map(tuple => tuple._2)
>
>   .map { record =>
>
>
>
>   val cells: java.util.List[Cell] = 

Understanding spark concepts cluster, master, slave, job, stage, worker, executor, task

2016-07-20 Thread Sachin Mittal
Hi,
I was able to build and run my spark application via spark submit.

I have understood some of the concepts by going through the resources at
https://spark.apache.org but few doubts still remain. I have few specific
questions and would be glad if someone could share some light on it.

So I submitted the application using spark.master local[*] and I have an
8 core PC.

- What I understand is that the application is called a job. Since mine had
two stages, it gets divided into 2 stages and each stage had a number of tasks
which ran in parallel.
Is this understanding correct?

- What I notice is that each stage is further divided into 262 tasks. Where
did this number 262 come from? Is it configurable? Would increasing
this number improve performance?

- Also I see that the tasks are run in parallel in sets of 8. Is this
because I have an 8 core PC?

- What is the difference or relation between a slave and a worker? When I did
spark-submit, did it start 8 slaves or worker threads?

- I see all worker threads running in one single JVM. Is this because I did
not start slaves separately and connect them to a single master cluster
manager? If I had done that, then each worker would have run in its own JVM.

- What is the relationship between a worker and an executor? Can a worker have
more than one executor? If yes, then how do we configure that? Do all
executors run in the worker JVM as independent threads?

I suppose that is all for now. Would appreciate any response. Will add
follow-up questions if any.

Thanks
Sachin


Re: RandomForestClassifier

2016-07-20 Thread Marco Mistroni
Hi
 afaik yes (other pls override ). Generally, in RandomForest and
DecisionTree you have a column which you are trying to 'predict' (the
label) and a set of features that are used to predict the outcome.
I would assume that if you specify the label column and the 'features'
column, everything else is ignored...
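
A minimal Scala sketch of that (spark.ml API; trainingDf and testDf are hypothetical
DataFrames that contain "label" and "features" plus any number of extra columns):

import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setLabelCol("label")         // which column is the target
  .setFeaturesCol("features")   // which column holds the feature vector
  .setNumTrees(20)

val model  = rf.fit(trainingDf)        // other columns in trainingDf are simply ignored
val scored = model.transform(testDf)   // appends prediction/rawPrediction/probability columns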

kr

On Wed, Jul 20, 2016 at 6:03 PM, pseudo oduesp 
wrote:

> hi ,
>  we have parmaters named
>
>
> labelCol="labe"
>
>
> ,featuresCol="features",
>
> when i precise the value here (label and features)  if  train my model on
> data frame with other columns  tha algorithme choos only label columns and
> features columns ?
>
> thanks
>


Re: Building standalone spark application via sbt

2016-07-20 Thread Sachin Mittal
The NoClassDefFound error was for Spark classes, like say SparkContext.
When running a standalone Spark application I was not passing external jars
using the --jars option.

However I have fixed this by making a fat jar using sbt assembly plugin.

Now all the dependencies are included in that jar and I use that jar in
spark-submit
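
For reference, a minimal sketch of that setup (plugin and library versions are
examples; adjust them to your build):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt additions: mark Spark itself as "provided" so the fat jar stays small,
// since spark-submit already puts Spark on the classpath at runtime
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql"  % "1.6.1" % "provided"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

// then: sbt assembly, and spark-submit the jar produced under target/scala-2.10/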

Thanks
Sachin


On Wed, Jul 20, 2016 at 9:42 PM, Marco Mistroni  wrote:

> Hello Sachin
>   pls paste the NoClassDefFound Exception so we can see what's failing,
> aslo please advise how are you running your Spark App
> For an extremely simple case, let's assume  you have your  MyFirstSparkApp
> packaged in your   myFirstSparkApp.jar
> Then all you need to do would be to kick off
>
> spark-submit --class MyFirstSparkApp   myFirstSparkApp.jar
>
> if you have any external dependencies (not spark , let's assume you are
> using common-utils.jar) then you should be able to kick it off via
>
> spark-submit --class MyFirstSparkApp --jars common-utiils.jar
> myFirstSparkApp.jar
>
> I paste below the build.sbt i am using for my SparkExamples apps, hope
> this helps.
> kr
>  marco
>
> name := "SparkExamples"
>
> version := "1.0"
>
> scalaVersion := "2.10.5"
>
>
> // Add a single dependency
> libraryDependencies += "junit" % "junit" % "4.8" % "test"
> libraryDependencies += "org.mockito" % "mockito-core" % "1.9.5"
> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
> "org.slf4j" % "slf4j-simple" % "1.7.5",
> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
> libraryDependencies += "org.powermock" % "powermock-mockito-release-full"
> % "1.5.4" % "test"
> libraryDependencies += "org.apache.spark" %% "spark-core"   % "1.6.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming"   % "1.6.1"
> % "provided"
> libraryDependencies += "org.apache.spark" %% "spark-mllib"   % "1.6.1"  %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-flume"   %
> "1.3.0"  % "provided"
> resolvers += "softprops-maven" at "http://dl.bintray.com/content/softprops/maven"
>
>
>
>
>
>
>
>
> On Wed, Jul 20, 2016 at 3:39 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> you need an uber jar file.
>>
>> Have you actually followed the dependencies and project sub-directory
>> build?
>>
>> check this.
>>
>>
>> http://stackoverflow.com/questions/28459333/how-to-build-an-uber-jar-fat-jar-using-sbt-within-intellij-idea
>>
>> under three answers the top one.
>>
>> I started reading the official SBT tutorial.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 20 July 2016 at 09:54, Sachin Mittal  wrote:
>>
>>> Hi,
>>> I am following the example under
>>> https://spark.apache.org/docs/latest/quick-start.html
>>> For standalone scala application.
>>>
>>> I added all my dependencies via build.sbt (one dependency is under lib
>>> folder).
>>>
>>> When I run sbt package I see the jar created under
>>> target/scala-2.10/
>>>
>>> So compile seems to be working fine. However when I inspect that jar, it
>>> only contains my scala class.
>>> Unlike in java application we build a standalone jar, which contains all
>>> the dependencies inside that jar, here all the dependencies are missing.
>>>
>>> So as expected when I run the application via spark-submit I get the
>>> NoClassDefFoundError.
>>>
>>> Here is my build.sbt
>>>
>>> name := "Test Advice Project"
>>> version := "1.0"
>>> scalaVersion := "2.10.6"
>>> libraryDependencies ++= Seq(
>>> "org.apache.spark" %% "spark-core" % "1.6.1",
>>> "org.apache.spark" %% "spark-sql" % "1.6.1"
>>> )
>>>
>>> Can anyone please guide me to as what is going wrong and why sbt package
>>> is not including all the dependencies jar classes in the new jar.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Tue, Jul 19, 2016 at 8:23 PM, Andrew Ehrlich 
>>> wrote:
>>>
 Yes, spark-core will depend on Hadoop and several other jars.  Here’s
 the list of dependencies:
 https://github.com/apache/spark/blob/master/core/pom.xml#L35

 Whether you need spark-sql depends on whether you will use the
 DataFrame API. Without spark-sql, you will just have the RDD API.

 On Jul 19, 2016, at 7:09 AM, Sachin Mittal  wrote:


 Hi,
 Can someone 

RandomForestClassifier

2016-07-20 Thread pseudo oduesp
Hi,
We have parameters named

labelCol="labe"

,featuresCol="features",

When I specify the values here (label and features) and train my model on a
data frame with other columns, does the algorithm choose only the label and
features columns?

thanks


Re: Spark driver getting out of memory

2016-07-20 Thread RK Aduri
Cache defaults to MEMORY_ONLY. Can you try with different storage levels
,i.e., MEMORY_ONLY_SER or even DISK_ONLY.  you may want to use persist( )
instead of cache.
Or there is an experimental storage level OFF_HEAP which might also help.

On Tue, Jul 19, 2016 at 11:08 PM, Saurav Sinha 
wrote:

> Hi,
>
> I have set driver memory 10 GB and job ran with intermediate failure which
> is recovered back by spark.
>
> But I still want to know: if the number of partitions increases, does driver RAM
> need to be increased, and what is the ratio of number of partitions to RAM?
>
> @RK : I am using cache on RDD. Is this reason of high RAM utilization.
>
> Thanks,
> Saurav Sinha
>
> On Tue, Jul 19, 2016 at 10:14 PM, RK Aduri 
> wrote:
>
>> Just want to see if this helps.
>>
>> Are you doing heavy collects and persist that? If that is so, you might
>> want to parallelize that collection by converting to an RDD.
>>
>> Thanks,
>> RK
>>
>> On Tue, Jul 19, 2016 at 12:09 AM, Saurav Sinha 
>> wrote:
>>
>>> Hi Mich,
>>>
>>>1. In what mode are you running the spark standalone, yarn-client,
>>>yarn cluster etc
>>>
>>> Ans: spark standalone
>>>
>>>1. You have 4 nodes with each executor having 10G. How many actual
>>>executors do you see in UI (Port 4040 by default)
>>>
>>> Ans: There are 4 executor on which am using 8 cores
>>> (--total-executor-core 32)
>>>
>>>1. What is master memory? Are you referring to diver memory? May be
>>>I am misunderstanding this
>>>
>>> Ans: Driver memory is set as --driver-memory 5g
>>>
>>>1. The only real correlation I see with the driver memory is when
>>>you are running in local mode where worker lives within JVM process that
>>>you start with spark-shell etc. In that case driver memory matters.
>>>However, it appears that you are running in another mode with 4 nodes?
>>>
>>> Ans: I am running my job as spark-submit and on my worker(executor) node
>>> there is no OOM issue ,it only happening on driver app.
>>>
>>> Thanks,
>>> Saurav Sinha
>>>
>>> On Tue, Jul 19, 2016 at 2:42 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 can you please clarify:


1. In what mode are you running the spark standalone, yarn-client,
yarn cluster etc
2. You have 4 nodes with each executor having 10G. How many actual
executors do you see in UI (Port 4040 by default)
3. What is master memory? Are you referring to diver memory? May be
I am misunderstanding this
4. The only real correlation I see with the driver memory is when
you are running in local mode where worker lives within JVM process that
you start with spark-shell etc. In that case driver memory matters.
However, it appears that you are running in another mode with 4 nodes?

 Can you get a snapshot of your environment tab in UI and send the
 output please?

 HTH


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.



 On 18 July 2016 at 11:50, Saurav Sinha  wrote:

> I have set --driver-memory 5g. I need to understand whether, as the number of
> partitions increases, driver memory needs to be increased. What will be the
> best ratio of number of partitions to driver memory?
>
> On Mon, Jul 18, 2016 at 4:07 PM, Zhiliang Zhu 
> wrote:
>
>> try to set --driver-memory xg, where x is as large as can be set.
>>
>>
>> On Monday, July 18, 2016 6:31 PM, Saurav Sinha <
>> sauravsinh...@gmail.com> wrote:
>>
>>
>> Hi,
>>
>> I am running spark job.
>>
>> Master memory - 5G
>> executor memort 10G(running on 4 node)
>>
>> My job is getting killed as no of partition increase to 20K.
>>
>> 16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
>> WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
>> 16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
>> 640(foreachPartition at WriteToKafka.java:45)
>> 16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
>> List(ShuffleMapStage 518, ShuffleMapStage 639)
>> 16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
>> 16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640
>> (MapPartitionsRDD[271] at 

Storm HDFS bolt equivalent in Spark Streaming.

2016-07-20 Thread Rajesh_Kalluri
While writing to HDFS from Storm, the HDFS bolt provides a nice way to batch 
the messages, rotate files, set file name conventions etc., as shown below.

Do you know of something similar in Spark Streaming or do we have to roll our 
own? If anyone has attempted this, can you share some pointers?

Every other streaming solution, like Flume and NiFi, handles logic like the below.

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.6/bk_storm-user-guide/content/writing-data-with-storm-hdfs-connector.html

// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");

// Synchronize the filesystem after every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach 5 MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/foo/");


// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://localhost:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
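
There is no built-in equivalent of the size and sync rotation policies, but a rough
Spark Streaming sketch (paths, delimiter and batch interval are assumptions; sc is an
existing SparkContext) where "rotation" is simply the batch interval:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(60))        // one output directory per 60s batch
val lines = ssc.textFileStream("/incoming")            // or a Kafka/Flume/MQTT stream

lines.map(_.replace(",", "|"))                         // field delimiter, like DelimitedRecordFormat
     .saveAsTextFiles("hdfs://localhost:8020/foo/out") // writes out-<batchTime>/part-NNNNN files

ssc.start()
ssc.awaitTermination()

Anything finer-grained (size-based rotation, sync counts) would need custom foreachRDD
logic, or a tool such as Flume or NiFi as noted above.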




How to connect HBase and Spark using Python?

2016-07-20 Thread Def_Os
I'd like to know whether there's any way to query HBase with Spark SQL via
the PySpark interface. See my question on SO:
http://stackoverflow.com/questions/38470114/how-to-connect-hbase-and-spark-using-python

The new HBase-Spark module in HBase, which introduces the
HBaseContext/JavaHBaseContext, can solve this but it appears to cater to
Scala/Java users only. Anybody know how Python users can make use of this,
or what needs to be built before that is possible?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-connect-HBase-and-Spark-using-Python-tp27372.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: write and call UDF in spark dataframe

2016-07-20 Thread Andy Davidson
Hi Divya

In general you will get better performance if you can minimize your use of
UDFs. Spark 2.0 / Tungsten does a lot of code generation, and it will have to
treat your UDF as a black box.

Andy
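
To make the black-box point concrete, a minimal sketch of what writing and calling
such a UDF looks like (hypothetical DataFrame and column names; the built-in
from_unixtime shown below in this thread remains the better choice):

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.functions.{col, udf}

// A hand-written unix-timestamp-to-string conversion, wrapped as a UDF
val tsToDate = udf { (ts: Long) =>
  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(ts * 1000L))
}

val converted = df.withColumn("datetime", tsToDate(col("time")))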

From:  Rishabh Bhardwaj 
Date:  Wednesday, July 20, 2016 at 4:22 AM
To:  Rabin Banerjee 
Cc:  Divya Gehlot , "user @spark"

Subject:  Re: write and call UDF in spark dataframe

> Hi Divya,
> 
> There is already "from_unixtime" exists in org.apache.spark.sql.frunctions,
> Rabin has used that in the sql query,if you want to use it in dataframe DSL
> you can try like this,
> 
>> val new_df = df.select(from_unixtime($"time").as("newtime"))
> 
> Thanks,
> Rishabh.
> 
> On Wed, Jul 20, 2016 at 4:21 PM, Rabin Banerjee 
> wrote:
>> Hi Divya ,
>> 
>> Try,
>> 
>> val df = sqlContext.sql("select from_unixtime(ts,'yyyy-MM-dd') as `ts` from mr")
>> Regards,
>> Rabin
>> 
>> On Wed, Jul 20, 2016 at 12:44 PM, Divya Gehlot 
>> wrote:
>>> Hi,
>>> Could somebody share example of writing and calling udf which converts unix
>>> tme stamp to date tiime .
>>> 
>>> 
>>> Thanks,
>>> Divya 
>> 
> 




Re: Building standalone spark application via sbt

2016-07-20 Thread Marco Mistroni
Hello Sachin
  pls paste the NoClassDefFound Exception so we can see what's failing,
aslo please advise how are you running your Spark App
For an extremely simple case, let's assume  you have your  MyFirstSparkApp
packaged in your   myFirstSparkApp.jar
Then all you need to do would be to kick off

spark-submit --class MyFirstSparkApp   myFirstSparkApp.jar

if you have any external dependencies (not spark , let's assume you are
using common-utils.jar) then you should be able to kick it off via

spark-submit --class MyFirstSparkApp --jars common-utiils.jar
myFirstSparkApp.jar

I paste below the build.sbt i am using for my SparkExamples apps, hope this
helps.
kr
 marco

name := "SparkExamples"

version := "1.0"

scalaVersion := "2.10.5"


// Add a single dependency
libraryDependencies += "junit" % "junit" % "4.8" % "test"
libraryDependencies += "org.mockito" % "mockito-core" % "1.9.5"
libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
"org.slf4j" % "slf4j-simple" % "1.7.5",
"org.clapper" %% "grizzled-slf4j" % "1.0.2")
libraryDependencies += "org.powermock" % "powermock-mockito-release-full" %
"1.5.4" % "test"
libraryDependencies += "org.apache.spark" %% "spark-core"   % "1.6.1" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming"   % "1.6.1"
% "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib"   % "1.6.1"  %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume"   %
"1.3.0"  % "provided"
resolvers += "softprops-maven" at "http://dl.bintray.com/content/softprops/maven"








On Wed, Jul 20, 2016 at 3:39 PM, Mich Talebzadeh 
wrote:

> you need an uber jar file.
>
> Have you actually followed the dependencies and project sub-directory
> build?
>
> check this.
>
>
> http://stackoverflow.com/questions/28459333/how-to-build-an-uber-jar-fat-jar-using-sbt-within-intellij-idea
>
> under three answers the top one.
>
> I started reading the official SBT tutorial.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 20 July 2016 at 09:54, Sachin Mittal  wrote:
>
>> Hi,
>> I am following the example under
>> https://spark.apache.org/docs/latest/quick-start.html
>> For standalone scala application.
>>
>> I added all my dependencies via build.sbt (one dependency is under lib
>> folder).
>>
>> When I run sbt package I see the jar created under
>> target/scala-2.10/
>>
>> So compile seems to be working fine. However when I inspect that jar, it
>> only contains my scala class.
>> Unlike in java application we build a standalone jar, which contains all
>> the dependencies inside that jar, here all the dependencies are missing.
>>
>> So as expected when I run the application via spark-submit I get the
>> NoClassDefFoundError.
>>
>> Here is my build.sbt
>>
>> name := "Test Advice Project"
>> version := "1.0"
>> scalaVersion := "2.10.6"
>> libraryDependencies ++= Seq(
>> "org.apache.spark" %% "spark-core" % "1.6.1",
>> "org.apache.spark" %% "spark-sql" % "1.6.1"
>> )
>>
>> Can anyone please guide me to as what is going wrong and why sbt package
>> is not including all the dependencies jar classes in the new jar.
>>
>> Thanks
>> Sachin
>>
>>
>> On Tue, Jul 19, 2016 at 8:23 PM, Andrew Ehrlich 
>> wrote:
>>
>>> Yes, spark-core will depend on Hadoop and several other jars.  Here’s
>>> the list of dependencies:
>>> https://github.com/apache/spark/blob/master/core/pom.xml#L35
>>>
>>> Whether you need spark-sql depends on whether you will use the DataFrame
>>> API. Without spark-sql, you will just have the RDD API.
>>>
>>> On Jul 19, 2016, at 7:09 AM, Sachin Mittal  wrote:
>>>
>>>
>>> Hi,
>>> Can someone please guide me what all jars I need to place in my lib
>>> folder of the project to build a standalone scala application via sbt.
>>>
>>> Note I need to provide static dependencies and I cannot download the
>>> jars using libraryDependencies.
>>> So I need to provide all the jars upfront.
>>>
>>> So far I found that we need:
>>> spark-core_.jar
>>>
>>> Do we also need
>>> spark-sql_.jar
>>> and
>>> hadoop-core-.jar
>>>
>>> Is there any jar from spark side I may be missing? What I found that
>>> spark-core needs hadoop-core classes and if I don't add them then sbt was
>>> giving me this error:
>>> 

difference between two consecutive rows of same column + spark + dataframe

2016-07-20 Thread Divya Gehlot
Hi,

I have a dataset of time as shown below :
Time1
07:30:23
07:34:34
07:38:23
07:39:12
07:45:20

I need to find the diff between two consecutive rows
I googled and found that the lag function in Spark helps in finding it,
but it's not giving me null in the result set.

Would really appreciate the help.


Thanks,
Divya
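
A minimal sketch of the lag approach (assuming a DataFrame df with a string column
"Time1" in HH:mm:ss format and, on Spark 1.x, a HiveContext, which window functions
require; the first row's diff is null because it has no previous row):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, unix_timestamp}

val w = Window.orderBy("Time1")                    // single global ordering (fine for small data)

val ts = unix_timestamp(col("Time1"), "HH:mm:ss")  // seconds since epoch
val withDiff = df
  .withColumn("prevTs", lag(ts, 1).over(w))
  .withColumn("diffSeconds", ts - col("prevTs"))   // null only for the first row

withDiff.show()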


Re: spark worker continuously trying to connect to master and failed in standalone mode

2016-07-20 Thread Igor Berman
In addition, check what IP the master is binding to (with netstat).

On 20 July 2016 at 06:12, Andrew Ehrlich  wrote:

> Troubleshooting steps:
>
> $ telnet localhost 7077 (on master, to confirm port is open)
> $ telnet  7077 (on slave, to confirm port is blocked)
>
> If the port is available on the master from the master, but not on the
> master from the slave, check firewall settings on the master:
> https://help.ubuntu.com/lts/serverguide/firewall.html
>
> On Jul 19, 2016, at 6:25 PM, Neil Chang  wrote:
>
> Hi,
>   I have two virtual pcs on private cloud (ubuntu 14). I installed spark
> 2.0 preview on both machines. I then tried to test it with standalone mode.
> I have no problem start the master. However, when I start the worker
> (slave) on another machine, it makes many attempts to connect to master and
> failed at the end.
>   I can ssh from each machine to another without any problem. I can also
> run a master and worker at the same machine without any problem.
>
> What did I miss? Any clue?
>
> here are the messages:
>
> WARN NativeCodeLoader: Unable to load native-hadoop library for your
> platform ... using builtin-java classes where applicable
> ..
> INFO Worker: Connecting to master ip:7077 ...
> INFO Worker: Retrying connection to master (attempt #1)
> ..
> INFO Worker: Retrying connection to master (attempt #7)
> java.lang.IllegalArgumentException: requirement failed: TransportClient
> has not yet been set.
>at scala.Predef$.require(Predef.scala:224)
> ...
> WARN NettyRocEnv: Ignored failure: java.io.IOException: Connecting to
> ip:7077 timed out
> WARN Worker: Failed to connect to master ip.7077
>
>
>
> Thanks,
> Neil
>
>
>


Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-20 Thread Yu Wei
This is a startup project. We don't know how much data will be written every day.

Definitely, there is not too much data at the beginning. But data will increase 
later.

And we want to use spark streaming to receive data via MQTT Util.

We're now evaluating which components could be used for storing data. We need to 
extend the Spark application to query and analyze data later.


Thx,

Jared


From: Ted Yu 
Sent: Wednesday, July 20, 2016 10:34:15 PM
To: Yu Wei
Cc: ayan guha; Rabin Banerjee; user; Deepak Sharma
Subject: Re: Is it good choice to use DAO to store results generated by spark 
application?

You can decide which component(s) to use for storing your data.
If you haven't used hbase before, it may be better to store data on hdfs and 
query through Hive or SparkSQL.

Maintaining hbase is not trivial task, especially when the cluster size is 
large.

How much data are you expecting to be written on a daily / weekly basis ?

Cheers

On Wed, Jul 20, 2016 at 7:22 AM, Yu Wei 
> wrote:

I'm beginner to big data. I don't have too much knowledge about hbase/hive.

What's the difference between hbase and hive/hdfs for storing data for 
analytics?


Thanks,

Jared


From: ayan guha >
Sent: Wednesday, July 20, 2016 9:34:24 PM
To: Rabin Banerjee
Cc: user; Yu Wei; Deepak Sharma

Subject: Re: Is it good choice to use DAO to store results generated by spark 
application?


Just as a rain check, saving data to hbase for analytics may not be the best 
choice. Any specific reason for not using hdfs or hive?

On 20 Jul 2016 20:57, "Rabin Banerjee" 
> wrote:
Hi Wei ,

You can do something like this ,


rdd.foreachPartition { part =>
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf(tablename))
  // part.foreach(p => { println(p); table.put(p) })  // line-by-line put
  table.put(part.toList.asJava)  // batched put (needs import scala.collection.JavaConverters._)
  table.close()
  conn.close()
}

Now if you want to wrap it inside a DAO, it's up to you. Making a DAO will abstract 
things, but it is ultimately going to use the same code.

Note: Always use the HBase ConnectionFactory to get a connection, and dump data on a 
per-partition basis.

Regards,
Rabin Banerjee


On Wed, Jul 20, 2016 at 12:06 PM, Yu Wei 
> wrote:

I need to write all data received from MQTT data into hbase for further 
processing.

They're not final result.  I also need to read the data from hbase for analysis.


Is it good choice to use DAO in such situation?


Thx,

Jared



From: Deepak Sharma >
Sent: Wednesday, July 20, 2016 12:34:07 PM
To: Yu Wei
Cc: spark users
Subject: Re: Is it good choice to use DAO to store results generated by spark 
application?


I am using DAO in spark application to write the final computation to Cassandra 
 and it performs well.
What kinds of issues you foresee using DAO for hbase ?

Thanks
Deepak

On 19 Jul 2016 10:04 pm, "Yu Wei" 
> wrote:

Hi guys,


I write spark application and want to store results generated by spark 
application to hbase.

Do I need to access hbase via java api directly?

Or is it better choice to use DAO similar as traditional RDBMS?  I suspect that 
there is major performance downgrade and other negative impacts using DAO. 
However, I have little knowledge in this field.


Any advice?


Thanks,

Jared






Re: Building standalone spark application via sbt

2016-07-20 Thread Mich Talebzadeh
you need an uber jar file.

Have you actually followed the dependencies and project sub-directory build?

check this.

http://stackoverflow.com/questions/28459333/how-to-build-an-uber-jar-fat-jar-using-sbt-within-intellij-idea

Of the three answers there, the top one is the relevant one.

I started reading the official SBT tutorial.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 July 2016 at 09:54, Sachin Mittal  wrote:

> Hi,
> I am following the example under
> https://spark.apache.org/docs/latest/quick-start.html
> For standalone scala application.
>
> I added all my dependencies via build.sbt (one dependency is under lib
> folder).
>
> When I run sbt package I see the jar created under
> target/scala-2.10/
>
> So compile seems to be working fine. However when I inspect that jar, it
> only contains my scala class.
> Unlike in java application we build a standalone jar, which contains all
> the dependencies inside that jar, here all the dependencies are missing.
>
> So as expected when I run the application via spark-submit I get the
> NoClassDefFoundError.
>
> Here is my build.sbt
>
> name := "Test Advice Project"
> version := "1.0"
> scalaVersion := "2.10.6"
> libraryDependencies ++= Seq(
> "org.apache.spark" %% "spark-core" % "1.6.1",
> "org.apache.spark" %% "spark-sql" % "1.6.1"
> )
>
> Can anyone please guide me to as what is going wrong and why sbt package
> is not including all the dependencies jar classes in the new jar.
>
> Thanks
> Sachin
>
>
> On Tue, Jul 19, 2016 at 8:23 PM, Andrew Ehrlich 
> wrote:
>
>> Yes, spark-core will depend on Hadoop and several other jars.  Here’s the
>> list of dependencies:
>> https://github.com/apache/spark/blob/master/core/pom.xml#L35
>>
>> Whether you need spark-sql depends on whether you will use the DataFrame
>> API. Without spark-sql, you will just have the RDD API.
>>
>> On Jul 19, 2016, at 7:09 AM, Sachin Mittal  wrote:
>>
>>
>> Hi,
>> Can someone please guide me what all jars I need to place in my lib
>> folder of the project to build a standalone scala application via sbt.
>>
>> Note I need to provide static dependencies and I cannot download the jars
>> using libraryDependencies.
>> So I need to provide all the jars upfront.
>>
>> So far I found that we need:
>> spark-core_.jar
>>
>> Do we also need
>> spark-sql_.jar
>> and
>> hadoop-core-.jar
>>
>> Is there any jar from spark side I may be missing? What I found that
>> spark-core needs hadoop-core classes and if I don't add them then sbt was
>> giving me this error:
>> [error] bad symbolic reference. A signature in SparkContext.class refers
>> to term hadoop
>> [error] in package org.apache which is not available.
>>
>> So I was just confused on library dependency part when building an
>> application via sbt. Any inputs here would be helpful.
>>
>> Thanks
>> Sachin
>>
>>
>>
>>
>>
>


Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-20 Thread Ted Yu
You can decide which component(s) to use for storing your data.
If you haven't used hbase before, it may be better to store data on hdfs
and query through Hive or SparkSQL.

Maintaining HBase is not a trivial task, especially when the cluster size is
large.
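
A minimal sketch of that simpler route (Spark 1.6 DataFrame API; the HDFS path, table
and column names are made up):

// Append each batch of MQTT-derived rows to Parquet on HDFS ...
df.write.mode("append").parquet("hdfs:///data/mqtt/events")

// ... and analyse it later through SparkSQL
val events = sqlContext.read.parquet("hdfs:///data/mqtt/events")
events.registerTempTable("events")
sqlContext.sql("SELECT deviceId, count(*) AS n FROM events GROUP BY deviceId").show()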

How much data are you expecting to be written on a daily / weekly basis ?

Cheers

On Wed, Jul 20, 2016 at 7:22 AM, Yu Wei  wrote:

> I'm beginner to big data. I don't have too much knowledge about hbase/hive.
>
> What's the difference between hbase and hive/hdfs for storing data for
> analytics?
>
>
> Thanks,
>
> Jared
> --
> *From:* ayan guha 
> *Sent:* Wednesday, July 20, 2016 9:34:24 PM
> *To:* Rabin Banerjee
> *Cc:* user; Yu Wei; Deepak Sharma
>
> *Subject:* Re: Is it good choice to use DAO to store results generated by
> spark application?
>
>
> Just as a rain check, saving data to hbase for analytics may not be the
> best choice. Any specific reason for not using hdfs or hive?
> On 20 Jul 2016 20:57, "Rabin Banerjee" 
> wrote:
>
>> Hi Wei ,
>>
>> You can do something like this ,
>>
>> foreachPartition( (part) => {val conn = 
>> ConnectionFactory.createConnection(HBaseConfiguration.create());val 
>> table = conn.getTable(TableName.valueOf(tablename));
>> //part.foreach((inp)=>{println(inp);table.put(inp)}) //This is line by line 
>> put  table.put(part.toList.asJava)table.close();conn.close();
>>
>>
>> \
>>
>> Now if you want to wrap it inside a DAO,its upto you. Making DAO will
>> abstract thing , but ultimately going to use the same code .
>>
>> Note: Always use Hbase ConnectionFactory to get connection ,and dump data
>> per partition basis.
>>
>> Regards,
>> Rabin Banerjee
>>
>>
>> On Wed, Jul 20, 2016 at 12:06 PM, Yu Wei  wrote:
>>
>>> I need to write all data received from MQTT data into hbase for further
>>> processing.
>>>
>>> They're not final result.  I also need to read the data from hbase for
>>> analysis.
>>>
>>>
>>> Is it good choice to use DAO in such situation?
>>>
>>>
>>> Thx,
>>>
>>> Jared
>>>
>>>
>>> --
>>> *From:* Deepak Sharma 
>>> *Sent:* Wednesday, July 20, 2016 12:34:07 PM
>>> *To:* Yu Wei
>>> *Cc:* spark users
>>> *Subject:* Re: Is it good choice to use DAO to store results generated
>>> by spark application?
>>>
>>>
>>> I am using DAO in spark application to write the final computation to
>>> Cassandra  and it performs well.
>>> What kinds of issues you foresee using DAO for hbase ?
>>>
>>> Thanks
>>> Deepak
>>>
>>> On 19 Jul 2016 10:04 pm, "Yu Wei"  wrote:
>>>
 Hi guys,


 I write spark application and want to store results generated by spark
 application to hbase.

 Do I need to access hbase via java api directly?

 Or is it better choice to use DAO similar as traditional RDBMS?  I
 suspect that there is major performance downgrade and other negative
 impacts using DAO. However, I have little knowledge in this field.


 Any advice?


 Thanks,

 Jared




>>


Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-20 Thread Yu Wei
I'm a beginner to big data. I don't have too much knowledge about HBase/Hive.

What's the difference between hbase and hive/hdfs for storing data for 
analytics?


Thanks,

Jared


From: ayan guha 
Sent: Wednesday, July 20, 2016 9:34:24 PM
To: Rabin Banerjee
Cc: user; Yu Wei; Deepak Sharma
Subject: Re: Is it good choice to use DAO to store results generated by spark 
application?


Just as a rain check, saving data to hbase for analytics may not be the best 
choice. Any specific reason for not using hdfs or hive?

On 20 Jul 2016 20:57, "Rabin Banerjee" 
> wrote:
Hi Wei ,

You can do something like this ,


rdd.foreachPartition { part =>
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf(tablename))
  // part.foreach(p => { println(p); table.put(p) })  // line-by-line put
  table.put(part.toList.asJava)  // batched put (needs import scala.collection.JavaConverters._)
  table.close()
  conn.close()
}

Now if you want to wrap it inside a DAO, it's up to you. Making a DAO will abstract 
things, but it is ultimately going to use the same code.

Note: Always use the HBase ConnectionFactory to get a connection, and dump data on a 
per-partition basis.

Regards,
Rabin Banerjee


On Wed, Jul 20, 2016 at 12:06 PM, Yu Wei 
> wrote:

I need to write all data received from MQTT data into hbase for further 
processing.

They're not final result.  I also need to read the data from hbase for analysis.


Is it good choice to use DAO in such situation?


Thx,

Jared



From: Deepak Sharma >
Sent: Wednesday, July 20, 2016 12:34:07 PM
To: Yu Wei
Cc: spark users
Subject: Re: Is it good choice to use DAO to store results generated by spark 
application?


I am using DAO in spark application to write the final computation to Cassandra 
 and it performs well.
What kinds of issues you foresee using DAO for hbase ?

Thanks
Deepak

On 19 Jul 2016 10:04 pm, "Yu Wei" 
> wrote:

Hi guys,


I write spark application and want to store results generated by spark 
application to hbase.

Do I need to access hbase via java api directly?

Or is it better choice to use DAO similar as traditional RDBMS?  I suspect that 
there is major performance downgrade and other negative impacts using DAO. 
However, I have little knowledge in this field.


Any advice?


Thanks,

Jared





Re: Latest 200 messages per topic

2016-07-20 Thread Cody Koeninger
If they're files in a file system, and you don't actually need
multiple kinds of consumers, have you considered
streamingContext.fileStream instead of kafka?
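
A minimal sketch of that alternative (paths and intervals are assumptions; sc is an
existing SparkContext):

import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(15))      // one batch per 15-minute drop
val xml = ssc.textFileStream("/data/incoming-xml")   // picks up newly arrived files

// 60-minute window sliding every 15 minutes, roughly "latest 4 files per device"
val lastHour = xml.window(Minutes(60), Minutes(15))
lastHour.foreachRDD { rdd =>
  // parse the XML lines, group by device id, process the latest hour here
}

ssc.start()
ssc.awaitTermination()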

On Wed, Jul 20, 2016 at 5:40 AM, Rabin Banerjee
 wrote:
> Hi Cody,
>
> Thanks for your reply .
>
>Let Me elaborate a bit,We have a Directory where small xml(90 KB) files
> are continuously coming(pushed from other node).File has  ID & Timestamp in
> name and also inside record.  Data coming in the directory has to be pushed
> to Kafka to finally get into Spark Streaming . Data is time series data(Per
> device per 15 min 1 file of 90 KB, Total 10,000 Device. So 50,000 files per
> 15 min). No utility can be installed in the source where data is generated ,
> so data will be always ftp-ed to  a directory .In Spark streaming we are
> always interested with latest 60 min(window) of data(latest 4 files per
> device). What do you suggest to get them into Spark Streaming with
> reliability (probably with Kafka). In streaming I am only interested with
> the latest 4 data(60 min).
>
>
> Also I am thinking about , instead of using Spark Windowing ,Using Custom
> java code will push the ID  of the file to Kafka and push parsed XML data to
> HBASE keeping Hbase insert timestamp as File Timestamp, HBASE key will be
> only ID ,CF will have 4 version(Time series version) per device ID (4 latest
> data). As hbase keeps the data per key sorted with timestamp , I will always
> get the latest 4 ts data on get(key). Spark streaming will get the ID from
> Kafka, then read the data from HBASE using get(ID). This will eliminate
> usage of Windowing from Spark-Streaming . Is it good to use ?
>
> Regards,
> Rabin Banerjee
>
>
> On Tue, Jul 19, 2016 at 8:44 PM, Cody Koeninger  wrote:
>>
>> Unless you're using only 1 partition per topic, there's no reasonable
>> way of doing this.  Offsets for one topicpartition do not necessarily
>> have anything to do with offsets for another topicpartition.  You
>> could do the last (200 / number of partitions) messages per
>> topicpartition, but you have no guarantee as to the time those events
>> represent, especially if your producers are misbehaving.  To be
>> perfectly clear, this is a consequence of the Kafka data model, and
>> has nothing to do with spark.
>>
>> So, given that it's a bad idea and doesn't really do what you're
>> asking...  you can do this using KafkaUtils.createRDD
>>
>> On Sat, Jul 16, 2016 at 10:43 AM, Rabin Banerjee
>>  wrote:
>> > Just to add ,
>> >
>> >   I want to read the MAX_OFFSET of a topic , then read MAX_OFFSET-200 ,
>> > every time .
>> >
>> > Also I want to know , If I want to fetch a specific offset range for
>> > Batch
>> > processing, is there any option for doing that ?
>> >
>> >
>> >
>> >
>> > On Sat, Jul 16, 2016 at 9:08 PM, Rabin Banerjee
>> >  wrote:
>> >>
>> >> HI All,
>> >>
>> >>I have 1000 kafka topics each storing messages for different devices
>> >> .
>> >> I want to use the direct approach for connecting kafka from Spark , in
>> >> which
>> >> I am only interested in latest 200 messages in the Kafka .
>> >>
>> >> How do I do that ?
>> >>
>> >> Thanks.
>> >
>> >
>
>




ML PipelineModel to be scored locally

2016-07-20 Thread Simone Miraglia
Hi all,

I am working on the following use case involving ML Pipelines.

1. I created a Pipeline composed from a set of stages
2. I called "fit" method on my training set
3. I validated my model by calling "transform" on my test set
4. I stored my fitted Pipeline to a shared folder

Then I have a very low latency interactive application (say a kinda of web
service), that should work as follows:
1. The app receives a request
2. A scoring needs to be made, according to my fitted PipelineModel
3. The app sends the score to the caller, in a synchronous fashion

Is there a way to call the .transform method of the PipelineModel over a
single Row?

I will definitely not want to parallelize a single record to a DataFrame,
nor relying on Spark Streaming due to latency requirements.
I would like to use something similar to mllib .predict(Vector) method
which does not rely on Spark Context performing all the computation locally.

Thanks in advance
Best


Snappy initialization issue, spark assembly jar missing snappy classes?

2016-07-20 Thread Eugene Morozov
Greetings!

We're reading input files with newApiHadoopFile that is configured with
multiline split. Everything's fine, besides
https://issues.apache.org/jira/browse/MAPREDUCE-6549. It looks like the
issue is fixed, but only in Hadoop 2.7.2, which means we have to download
Spark without Hadoop and provide a custom version of it. We currently use
Spark 1.6.1.

It's mostly fine (there are docs on how to configure it, and Spark starts), but when I
use it, it gives me a nasty exception about Snappy failing to initialize. I
tried a few things, such as updating the Snappy version inside Hadoop and packaging
Snappy into my own application jar, but it only works when I literally copy the
snappy-java.jar classes into spark-assembly-1.6.1-hadoop2.2.0.jar. It seems to be
working for now, but I dislike this approach, because I simply cannot know
what else won't work tomorrow.
It looks like I can just turn Snappy off, but I want it; I believe it makes
sense to compress data that is shuffled and stored around.

Could you suggest any way besides copying these classes inside assembled
spark jar file?


The snappy exception
Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most
recent failure: Lost task 1.3 in stage 1.0 (TID 69,
icomputer.petersburg.epam.com): java.io.IOException:
java.lang.reflect.InvocationTargetException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedConstructorAccessor9.newInstance(Unknown Source)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:72)
at
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast.org
$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
... 11 more
Caused by: java.lang.IllegalArgumentException:
java.lang.NoClassDefFoundError: Could not initialize class
org.xerial.snappy.Snappy
at
org.apache.spark.io.SnappyCompressionCodec$.liftedTree1$1(CompressionCodec.scala:171)
at
org.apache.spark.io.SnappyCompressionCodec$.org$apache$spark$io$SnappyCompressionCodec$$version$lzycompute(CompressionCodec.scala:168)
at
org.apache.spark.io.SnappyCompressionCodec$.org$apache$spark$io$SnappyCompressionCodec$$version(CompressionCodec.scala:168)
at
org.apache.spark.io.SnappyCompressionCodec.(CompressionCodec.scala:152)
... 19 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.xerial.snappy.Snappy
at
org.apache.spark.io.SnappyCompressionCodec$.liftedTree1$1(CompressionCodec.scala:169)
... 22 more
--
Be well!
Jean Morozov


Re: Spark Job trigger in production

2016-07-20 Thread Sathish Kumaran Vairavelu
If you are using Mesos, then u can use Chronos or Marathon
On Wed, Jul 20, 2016 at 6:08 AM Rabin Banerjee 
wrote:

> ++ crontab :)
>
> On Wed, Jul 20, 2016 at 9:07 AM, Andrew Ehrlich 
> wrote:
>
>> Another option is Oozie with the spark action:
>> https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
>>
>> On Jul 18, 2016, at 12:15 AM, Jagat Singh  wrote:
>>
>> You can use following options
>>
>> * spark-submit from shell
>> * some kind of job server. See spark-jobserver for details
>> * some notebook environment See Zeppelin for example
>>
>>
>>
>>
>>
>> On 18 July 2016 at 17:13, manish jaiswal  wrote:
>>
>>> Hi,
>>>
>>>
>>> What is the best approach to trigger spark job in production cluster?
>>>
>>
>>
>>
>


Spark 1.6.2 Spark-SQL RACK_LOCAL

2016-07-20 Thread chandana
Hive - 1.2.1
AWS EMR 4.7.2

I have external tables with partitions from S3. I had good performance
with Spark 1.6.1, with NODE_LOCAL data being about 7x faster than RACK_LOCAL data.

With Spark 1.6.2 and AWS EMR 4.7.2, my node locality is 0! Rack locality
100%.

I am using the default settings and didn't change any locality wait
configurations.

Anybody experiencing similar issues with 1.6.2?
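
If anyone wants to experiment, the usual knobs (values are examples only) are the
locality wait settings, set on the SparkConf before the context is created; they
control how long the scheduler holds out for NODE_LOCAL slots:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "10s")        // wait longer for a better-located slot
  .set("spark.locality.wait.node", "10s")   // NODE_LOCAL-specific override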





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-6-2-Spark-SQL-RACK-LOCAL-tp27370.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-20 Thread ayan guha
Just as a rain check, saving data to hbase for analytics may not be the
best choice. Any specific reason for not using hdfs or hive?
On 20 Jul 2016 20:57, "Rabin Banerjee"  wrote:

> Hi Wei ,
>
> You can do something like this ,
>
> foreachPartition( (part) => {val conn = 
> ConnectionFactory.createConnection(HBaseConfiguration.create());val table 
> = conn.getTable(TableName.valueOf(tablename));
> //part.foreach((inp)=>{println(inp);table.put(inp)}) //This is line by line 
> put   table.put(part.toList.asJava)table.close();conn.close();
>
>
> \
>
> Now if you want to wrap it inside a DAO,its upto you. Making DAO will
> abstract thing , but ultimately going to use the same code .
>
> Note: Always use Hbase ConnectionFactory to get connection ,and dump data
> per partition basis.
>
> Regards,
> Rabin Banerjee
>
>
> On Wed, Jul 20, 2016 at 12:06 PM, Yu Wei  wrote:
>
>> I need to write all data received from MQTT data into hbase for further
>> processing.
>>
>> They're not final result.  I also need to read the data from hbase for
>> analysis.
>>
>>
>> Is it good choice to use DAO in such situation?
>>
>>
>> Thx,
>>
>> Jared
>>
>>
>> --
>> *From:* Deepak Sharma 
>> *Sent:* Wednesday, July 20, 2016 12:34:07 PM
>> *To:* Yu Wei
>> *Cc:* spark users
>> *Subject:* Re: Is it good choice to use DAO to store results generated
>> by spark application?
>>
>>
>> I am using DAO in spark application to write the final computation to
>> Cassandra  and it performs well.
>> What kinds of issues you foresee using DAO for hbase ?
>>
>> Thanks
>> Deepak
>>
>> On 19 Jul 2016 10:04 pm, "Yu Wei"  wrote:
>>
>>> Hi guys,
>>>
>>>
>>> I write spark application and want to store results generated by spark
>>> application to hbase.
>>>
>>> Do I need to access hbase via java api directly?
>>>
>>> Or is it better choice to use DAO similar as traditional RDBMS?  I
>>> suspect that there is major performance downgrade and other negative
>>> impacts using DAO. However, I have little knowledge in this field.
>>>
>>>
>>> Any advice?
>>>
>>>
>>> Thanks,
>>>
>>> Jared
>>>
>>>
>>>
>>>
>


lift coefficient

2016-07-20 Thread pseudo oduesp
Hi,
How can we calculate the lift coefficient from PySpark prediction results?

Thanks


Re: Little idea needed

2016-07-20 Thread Mich Talebzadeh
In reality a true real time analytics will require interrogating the
transaction (redo) log of the RDBMS database to see for changes.

An RDBMS will only keep one current record (the most recent), so if a record has been
deleted since the last import into HDFS, that record will not exist.

If a record has been updated since the last import, there could have been multiple
updates and it is almost impossible to see which records have been updated
since.

So it is going to be tedious without having an automated mechanism that
reads the transaction log of RDBMS database converts that into SQL
statements (insert/update/delete) and send data to Big Data (Hive, Spark
whatever).

The commercial ones work but as yet I have not seen anything open that can
hook and integrate the redolog of Oracle database and convert that into SQL
and send it to Hive etc.

May be Hortonworks Data Flow (HDF) in recent edition has such thing.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 July 2016 at 00:10, ayan guha  wrote:

> Well this one keeps cropping up in every project especially when hadoop
> implemented alongside MPP.
> For the fact, there is no reliable out of box update operation available
> in hdfs or hive or SPARK.
> Hence, one approach is what Mitch suggested, that do not update. Rather
> just keep all source records, by timestamping their arrival.
> Another way is, if I think an data warehouse with open and closed records,
> you can create a partition in hive only for open records. So, you can
> refresh that partition in every run.
> On 20 Jul 2016 06:08, "Mich Talebzadeh"  wrote:
>
>> Well this is a classic.
>>
>> The initial load can be done through Sqoop (outside of Spark) or through
>> JDBC connection in Spark. 10 million rows in nothing.
>>
>> Then you have to think of updates and deletes in addition to new rows.
>>
>> With Sqoop you can load from the last ID in the source table, assuming
>> that you have a unique key in Your Oracle table.
>>
>> If you have 10 new roes and I assume you know how to load these rows from
>> Oracle.
>>
>> I suggest that you add two additional columns to your HDFS/target table,
>>
>> ,op_type int
>> ,op_time timestamp
>>
>> These two columns will specify the row type op_type = 1,2,3
>> INSERT/UPDATE/DELETE and op_time = cast(from_unixtime(unix_timestamp())
>> AS op_time) when the record was added.
>>
>> So you will end up with two additional columns in your HDFS table
>> compared to Oracle table and that will be your staging table.
>>
>> Of course you can do real time analytics through Oracle GoldenGate that
>> read the redolog of the source table in Oracle or better Sap Replication
>> Server (SRS). You will achieve real-time integration between
>> RDBMS tables and Big Data.
>>
>> Once you have you have the staging table (immutable) and the rest is
>> pretty easy. You have the full Entity Life History in this case for records
>> and you can do your queries on them.
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 19 July 2016 at 20:27, Aakash Basu  wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to pull a full table from oracle, which is huge with some 10
>>> million records which will be the initial load to HDFS.
>>>
>>> Then I will do delta loads everyday in the same folder in HDFS.
>>>
>>> Now, my query here is,
>>>
>>> DAY 0 - I did the initial load (full dump).
>>>
>>> DAY 1 - I'll load only that day's data which has suppose 10 records (5
>>> old with some column's value altered and 5 new).
>>>
>>> Here, my question is, how will I push this file to HDFS through Spark
>>> code, if I do append, it will create duplicates (which i don't want), if i
>>> keep separate files and while using it in other program am giving the path
>>> of it as folder which contains all files /. But in this case also the
>>> 

Re: write and call UDF in spark dataframe

2016-07-20 Thread Mich Talebzadeh
yep something in line of

val df = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
HH:mm:ss.ss') as time ")

Note that this does not require a column from an already existing table.
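If you would rather write and register your own UDF for the conversion, a
minimal sketch is below (the pattern, the DataFrame df and the column ts are
assumptions, not taken from the thread):

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.functions.{col, udf}

// convert a unix timestamp in seconds into a formatted date-time string
val toDateTime = udf { (ts: Long) =>
  new SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new Date(ts * 1000L))
}
val withTime = df.withColumn("event_time", toDateTime(col("ts")))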

HTH

Dr Mich Talebzadeh






On 20 July 2016 at 12:22, Rishabh Bhardwaj  wrote:

> Hi Divya,
>
> There is already "from_unixtime" exists in org.apache.spark.sql.frunctions,
> Rabin has used that in the sql query,if you want to use it in
> dataframe DSL you can try like this,
>
> val new_df = df.select(from_unixtime($"time").as("newtime"))
>
>
> Thanks,
> Rishabh.
>
> On Wed, Jul 20, 2016 at 4:21 PM, Rabin Banerjee <
> dev.rabin.baner...@gmail.com> wrote:
>
>> Hi Divya ,
>>
>> Try,
>>
>> val df = sqlContext.sql("select from_unixtime(ts,'yyyy-MM-dd') as `ts` from
>> mr")
>>
>> Regards,
>> Rabin
>>
>> On Wed, Jul 20, 2016 at 12:44 PM, Divya Gehlot 
>> wrote:
>>
>>> Hi,
>>> Could somebody share an example of writing and calling a UDF which converts
>>> a unix time stamp to date time.
>>>
>>>
>>> Thanks,
>>> Divya
>>>
>>
>>
>


Re: write and call UDF in spark dataframe

2016-07-20 Thread Rishabh Bhardwaj
Hi Divya,

There is already "from_unixtime" exists in org.apache.spark.sql.frunctions,
Rabin has used that in the sql query,if you want to use it in dataframe DSL
you can try like this,

val new_df = df.select(from_unixtime($"time").as("newtime"))


Thanks,
Rishabh.

On Wed, Jul 20, 2016 at 4:21 PM, Rabin Banerjee <
dev.rabin.baner...@gmail.com> wrote:

> Hi Divya ,
>
> Try,
>
> val df = sqlContext.sql("select from_unixtime(ts,'-MM-dd') as `ts` from 
> mr")
>
> Regards,
> Rabin
>
> On Wed, Jul 20, 2016 at 12:44 PM, Divya Gehlot 
> wrote:
>
>> Hi,
>> Could somebody share an example of writing and calling a UDF which converts
>> a unix time stamp to date time.
>>
>>
>> Thanks,
>> Divya
>>
>
>


Re: Spark Job trigger in production

2016-07-20 Thread Rabin Banerjee
++ crontab :)

On Wed, Jul 20, 2016 at 9:07 AM, Andrew Ehrlich  wrote:

> Another option is Oozie with the spark action:
> https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
>
> On Jul 18, 2016, at 12:15 AM, Jagat Singh  wrote:
>
> You can use following options
>
> * spark-submit from shell
> * some kind of job server. See spark-jobserver for details
> * some notebook environment See Zeppelin for example
>
>
>
>
>
> On 18 July 2016 at 17:13, manish jaiswal  wrote:
>
>> Hi,
>>
>>
>> What is the best approach to trigger spark job in production cluster?
>>
>
>
>


Re: Storm HDFS bolt equivalent in Spark Streaming.

2016-07-20 Thread Rabin Banerjee
++Deepak,

There is also an option to use saveAsHadoopFile & saveAsNewAPIHadoopFile, in
which you can customize (file name and many other things ...) the way you want
to save it. :)
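For illustration, a minimal Spark Streaming sketch that writes each micro-batch
to HDFS with saveAsTextFiles (the socket source, host/port, path and delimiter
are assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("HdfsWriter"), Seconds(60))
val lines = ssc.socketTextStream("localhost", 9999)   // replace with Kafka, MQTT, etc.
val delimited = lines.map(_.replace(",", "|"))        // "|" as field delimiter, as in the Storm example
delimited.saveAsTextFiles("hdfs://localhost:8020/foo/events")  // Spark appends the batch time to the prefix
ssc.start()
ssc.awaitTermination()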

Happy Sparking 

Regards,
Rabin Banerjee

On Wed, Jul 20, 2016 at 10:01 AM, Deepak Sharma 
wrote:

> In spark streaming , you have to decide the duration of micro batches to
> run.
> Once you get the micro batch , transform it as per your logic and then you
> can use saveAsTextFiles on your final RDD to write it to HDFS.
>
> Thanks
> Deepak
>
> On 20 Jul 2016 9:49 am,  wrote:
>
>
> While writing to HDFS from Storm, the HDFS bolt provides a nice way to batch
> the messages, rotate files, apply a file name convention etc., as shown below.
>
>
>
> Do you know of something similar in Spark Streaming or do we have to roll
> our own? If anyone attempted this can you throw some pointers.
>
>
>
> Every other streaming solution like Flume and NIFI handle logic like below.
>
>
>
>
> https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.6/bk_storm-user-guide/content/writing-data-with-storm-hdfs-connector.html
>
>
>
> // use "|" instead of "," for field delimiter
>
> RecordFormat format = new DelimitedRecordFormat()
>
> .withFieldDelimiter("|");
>
>
>
> // Synchronize the filesystem after every 1000 tuples
>
> SyncPolicy syncPolicy = new CountSyncPolicy(1000);
>
>
>
> // Rotate data files when they reach 5 MB
>
> FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f,
> Units.MB);
>
>
>
> // Use default, Storm-generated file names
>
> FileNameFormat fileNameFormat = new DefaultFileNameFormat()
>
> .withPath("/foo/");
>
>
>
>
>
> // Instantiate the HdfsBolt
>
> HdfsBolt bolt = new HdfsBolt()
>
> .withFsUrl("hdfs://localhost:8020")
>
> .withFileNameFormat(fileNameFormat)
>
> .withRecordFormat(format)
>
> .withRotationPolicy(rotationPolicy)
>
> .withSyncPolicy(syncPolicy);
>
>
>
>
>
>
>


Re: Running multiple Spark Jobs on Yarn( Client mode)

2016-07-20 Thread Rabin Banerjee
Hi Vaibhav,
 Please check your YARN configuration and make sure you have available
resources. Please try creating multiple queues, and submit the jobs to
separate queues with:
--queue thequeue
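For example (the queue name, class and jar are placeholders):

spark-submit --master yarn-client --queue streaming_q --class com.example.StreamingApp streaming-app.jar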

Regards,
Rabin Banerjee

On Wed, Jul 20, 2016 at 12:05 PM, vaibhavrtk  wrote:

> I have a silly question:
>
> Do multiple spark jobs running on yarn have any impact on each other?
> e.g. If the traffic on one streaming job increases too much does it have
> any
> effect on second job? Will it slow it down or any other consequences?
>
> I have enough resources(memory,cores) for both jobs in the same cluster.
>
> Thanks
> Vaibhav
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-Spark-Jobs-on-Yarn-Client-mode-tp27364.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-20 Thread Rabin Banerjee
Hi Wei ,

You can do something like this (note the imports scala.collection.JavaConverters._
for asJava, plus the HBase client classes):

rdd.foreachPartition { part =>
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf(tablename))
  // part.foreach(inp => { println(inp); table.put(inp) })  // this is the line-by-line put
  table.put(part.toList.asJava)                             // batch put for the whole partition
  table.close()
  conn.close()
}

Now if you want to wrap it inside a DAO, it's up to you. Making a DAO will
abstract things, but it is ultimately going to use the same code.
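For instance, a toy DAO wrapper around the same per-partition logic might look
like this (the class and method names are made up for illustration):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}

class HBasePutDao(tablename: String) extends Serializable {
  // open a connection, write the batch, close -- the same code as above, just wrapped
  def saveAll(puts: java.util.List[Put]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = conn.getTable(TableName.valueOf(tablename))
    table.put(puts)
    table.close()
    conn.close()
  }
}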

Note: always use the HBase ConnectionFactory to get a connection, and dump
data on a per-partition basis.

Regards,
Rabin Banerjee


On Wed, Jul 20, 2016 at 12:06 PM, Yu Wei  wrote:

> I need to write all data received from MQTT data into hbase for further
> processing.
>
> They're not final result.  I also need to read the data from hbase for
> analysis.
>
>
> Is it good choice to use DAO in such situation?
>
>
> Thx,
>
> Jared
>
>
> --
> *From:* Deepak Sharma 
> *Sent:* Wednesday, July 20, 2016 12:34:07 PM
> *To:* Yu Wei
> *Cc:* spark users
> *Subject:* Re: Is it good choice to use DAO to store results generated by
> spark application?
>
>
> I am using DAO in spark application to write the final computation to
> Cassandra  and it performs well.
> What kinds of issues you foresee using DAO for hbase ?
>
> Thanks
> Deepak
>
> On 19 Jul 2016 10:04 pm, "Yu Wei"  wrote:
>
>> Hi guys,
>>
>>
>> I write spark application and want to store results generated by spark
>> application to hbase.
>>
>> Do I need to access hbase via java api directly?
>>
>> Or is it better choice to use DAO similar as traditional RDBMS?  I
>> suspect that there is major performance downgrade and other negative
>> impacts using DAO. However, I have little knowledge in this field.
>>
>>
>> Any advice?
>>
>>
>> Thanks,
>>
>> Jared
>>
>>
>>
>>


Re: write and call UDF in spark dataframe

2016-07-20 Thread Rabin Banerjee
Hi Divya ,

Try,

val df = sqlContext.sql("select from_unixtime(ts,'-MM-dd') as `ts` from mr")

Regards,
Rabin
On Wed, Jul 20, 2016 at 12:44 PM, Divya Gehlot 
wrote:

> Hi,
> Could somebody share an example of writing and calling a UDF which converts
> a unix time stamp to date time.
>
>
> Thanks,
> Divya
>


Re: XLConnect in SparkR

2016-07-20 Thread Rabin Banerjee
Hi Yogesh ,

  I have never tried reading XLS files using Spark, but I think you can use
sc.wholeTextFiles to read each complete xls at once; an xls file has to be
parsed as a whole, so you need to read it in full. Then I think you can use
Apache POI to parse it.
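A rough sketch of that idea in Scala, using sc.binaryFiles (since .xls is a
binary format) together with Apache POI -- the path and cell positions are
assumptions:

import org.apache.poi.hssf.usermodel.HSSFWorkbook

// each element is (file path, whole file as a stream); parse with POI on the executors
val cells = sc.binaryFiles("hdfs:///data/xls/*.xls").map { case (path, stream) =>
  val wb = new HSSFWorkbook(stream.open())
  val firstCell = wb.getSheetAt(0).getRow(0).getCell(0).toString
  wb.close()
  (path, firstCell)
}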

Also, you can copy your XLS data to an MS Access file and access it via JDBC.

Regards,
Rabin Banerjee

On Wed, Jul 20, 2016 at 2:12 PM, Yogesh Vyas  wrote:

> Hi,
>
> I am trying to load and read excel sheets from HDFS in sparkR using
> XLConnect package.
> Can anyone help me in finding out how to read xls files from HDFS in
> sparkR ?
>
> Regards,
> Yogesh
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: run spark apps in linux crontab

2016-07-20 Thread Rabin Banerjee
Hi,
  Please check your deploy mode and master. For example, if you want to
deploy in yarn cluster mode you should use --master yarn-cluster; if you want
to do it in yarn client mode you should use --master yarn-client.

Please note that for your case deploying yarn-cluster will be better, as
cluster mode is asynchronous: the job will not die if the launcher dies,
whereas in yarn-client mode your driver dies as soon as the launcher dies,
and so do your executors.
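For example, a cron-friendly entry with absolute paths and cluster deploy mode
might look like this (the paths and class name are placeholders):

22 21 * * * /usr/lib/spark/bin/spark-submit --master yarn-cluster --class com.abc.myclass /usr/lib/spark/MyJar.jar >> /home/hadoop/log 2>&1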

Regards,
Rabin


On Wed, Jul 20, 2016 at 3:41 PM, focus  wrote:

> Hi, I just met this problem, too! The reason is that the crontab runtime
> doesn't have the variables you defined, such as $SPARK_HOME.
> I defined the $SPARK_HOME and other variables in /etc/profile like this:
>
> export MYSCRIPTS=/opt/myscripts
> export SPARK_HOME=/opt/spark
>
> then, in my crontab job script daily_job.sh
>
> #!/bin/sh
>
> . /etc/profile
>
> $SPARK_HOME/bin/spark-submit $MYSCRIPTS/fix_fh_yesterday.py
>
> then, in crontab -e
>
> 0 8 * * * /home/user/daily_job.sh
>
> hope this helps~
>
>
>
>
> -- Original --
> *From:* "luohui20001";
> *Date:* Wednesday, 20 July 2016, 6:00 PM
> *To:* "user@spark.apache.org";
> *Subject:* run spark apps in linux crontab
>
> hi guys:
>   I add a spark-submit job into my Linux crontab list by the means
> below ,however none of them works. If I change it to a normal shell script,
> it is ok. I don't quite understand why. I checked the 8080 web ui of my
> spark cluster, no job submitted, and there is not messages in
> /home/hadoop/log.
>   Any idea is welcome.
>
> [hadoop@master ~]$ crontab -e
> 1.
> 22 21 * * * sh /home/hadoop/shellscripts/run4.sh > /home/hadoop/log
>
> and in run4.sh,it wrote:
> $SPARK_HOME/bin/spark-submit --class com.abc.myclass
> --total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar
> $SPARK_HOME/MyJar.jar  > /home/hadoop/log
>
> 2.
> 22 21 * * * $SPARK_HOME/bin/spark-submit --class com.abc.myclass
> --total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar
> $SPARK_HOME/MyJar.jar  > /home/hadoop/log
>
> 3.
> 22 21 * * * /usr/lib/spark/bin/spark-submit --class com.abc.myclass
> --total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar
> /usr/lib/spark/MyJar.jar  > /home/hadoop/log
>
> 4.
> 22 21 * * * hadoop /usr/lib/spark/bin/spark-submit --class com.abc.myclass
> --total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar
> /usr/lib/spark/MyJar.jar  > /home/hadoop/log
>
> 
>
> Thanks & best regards!
> San.Luo
>


Re: Latest 200 messages per topic

2016-07-20 Thread Rabin Banerjee
Hi Cody,

Thanks for your reply .

   Let me elaborate a bit. We have a directory where small XML (90 KB) files
are continuously arriving (pushed from another node). Each file has an ID and
timestamp in its name and also inside the record. Data arriving in the
directory has to be pushed to Kafka to finally get into Spark Streaming. The
data is time series data (per device, one 90 KB file every 15 min, 10,000
devices in total, so 50,000 files per 15 min). No utility can be installed at
the source where the data is generated, so the data will always be FTP-ed to
a directory. In Spark Streaming we are always interested in the latest 60 min
(window) of data (the latest 4 files per device). What do you suggest for
getting them into Spark Streaming reliably (probably with Kafka)? In
streaming I am only interested in the latest 4 data points (60 min).


Also, instead of using Spark windowing, I am thinking about the following:
custom Java code will push the ID of the file to Kafka and push the parsed
XML data to HBase, keeping the HBase insert timestamp as the file timestamp.
The HBase key will be only the ID, and the column family will keep 4 versions
(time series versions) per device ID (the 4 latest data points). As HBase
keeps the data per key sorted by timestamp, I will always get the latest 4
timestamped rows on get(key). Spark Streaming will get the ID from Kafka,
then read the data from HBase using get(ID). This eliminates the use of
windowing in Spark Streaming. Is this a good approach?

Regards,
Rabin Banerjee


On Tue, Jul 19, 2016 at 8:44 PM, Cody Koeninger  wrote:

> Unless you're using only 1 partition per topic, there's no reasonable
> way of doing this.  Offsets for one topicpartition do not necessarily
> have anything to do with offsets for another topicpartition.  You
> could do the last (200 / number of partitions) messages per
> topicpartition, but you have no guarantee as to the time those events
> represent, especially if your producers are misbehaving.  To be
> perfectly clear, this is a consequence of the Kafka data model, and
> has nothing to do with spark.
>
> So, given that it's a bad idea and doesn't really do what you're
> asking...  you can do this using KafkaUtils.createRDD
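(For the specific-offset-range part of the question, a minimal sketch with the
direct API -- the broker, topic and offsets below are purely illustrative:)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
// topic, partition, fromOffset (inclusive), untilOffset (exclusive)
val ranges = Array(OffsetRange("mytopic", 0, 800L, 1000L))
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, ranges)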
>
> On Sat, Jul 16, 2016 at 10:43 AM, Rabin Banerjee
>  wrote:
> > Just to add ,
> >
> >   I want to read the MAX_OFFSET of a topic , then read MAX_OFFSET-200 ,
> > every time .
> >
> > Also I want to know , If I want to fetch a specific offset range for
> Batch
> > processing, is there any option for doing that ?
> >
> >
> >
> >
> > On Sat, Jul 16, 2016 at 9:08 PM, Rabin Banerjee
> >  wrote:
> >>
> >> HI All,
> >>
> >>I have 1000 kafka topics each storing messages for different devices
> .
> >> I want to use the direct approach for connecting kafka from Spark , in
> which
> >> I am only interested in latest 200 messages in the Kafka .
> >>
> >> How do I do that ?
> >>
> >> Thanks.
> >
> >
>


Re:run spark apps in linux crontab

2016-07-20 Thread focus
Hi, I just met this problem, too! The reason is that the crontab runtime
doesn't have the variables you defined, such as $SPARK_HOME.
I defined the $SPARK_HOME and other variables in /etc/profile like this:


export MYSCRIPTS=/opt/myscripts
export SPARK_HOME=/opt/spark


then, in my crontab job script daily_job.sh


#!/bin/sh


. /etc/profile


$SPARK_HOME/bin/spark-submit $MYSCRIPTS/fix_fh_yesterday.py


then, in crontab -e


0 8 * * * /home/user/daily_job.sh


hope this helps~








-- Original --
From: "luohui20001"; 
Date: Wednesday, 20 July 2016, 6:00 PM
To: "user@spark.apache.org"; 
Subject: run spark apps in linux crontab



hi guys:
  I add a spark-submit job into my Linux crontab list by the means below 
,however none of them works. If I change it to a normal shell script, it is ok. 
I don't quite understand why. I checked the 8080 web ui of my spark cluster, no 
job submitted, and there is not messages in /home/hadoop/log. 
  Any idea is welcome.


[hadoop@master ~]$ crontab -e
1.
22 21 * * * sh /home/hadoop/shellscripts/run4.sh > /home/hadoop/log 


and in run4.sh,it wrote:
$SPARK_HOME/bin/spark-submit --class com.abc.myclass --total-executor-cores 10 
--jars $SPARK_HOME/lib/MyDep.jar $SPARK_HOME/MyJar.jar  > /home/hadoop/log 


2.
22 21 * * * $SPARK_HOME/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar 
$SPARK_HOME/MyJar.jar  > /home/hadoop/log 


3.
22 21 * * * /usr/lib/spark/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar 
/usr/lib/spark/MyJar.jar  > /home/hadoop/log 


4.
22 21 * * * hadoop /usr/lib/spark/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar 
/usr/lib/spark/MyJar.jar  > /home/hadoop/log 




  

 Thanks & best regards!
San.Luo

RE: run spark apps in linux crontab

2016-07-20 Thread Joaquin Alzola

Remember that you need to source your .bashrc
for your PATH to be set up.

From: luohui20...@sina.com [mailto:luohui20...@sina.com]
Sent: 20 July 2016 11:01
To: user 
Subject: run spark apps in linux crontab

hi guys:
  I add a spark-submit job into my Linux crontab list by the means below 
,however none of them works. If I change it to a normal shell script, it is ok. 
I don't quite understand why. I checked the 8080 web ui of my spark cluster, no 
job submitted, and there is not messages in /home/hadoop/log.
  Any idea is welcome.

[hadoop@master ~]$ crontab -e
1.
22 21 * * * sh /home/hadoop/shellscripts/run4.sh > /home/hadoop/log

and in run4.sh,it wrote:
$SPARK_HOME/bin/spark-submit --class com.abc.myclass --total-executor-cores 10 
--jars $SPARK_HOME/lib/MyDep.jar $SPARK_HOME/MyJar.jar  > /home/hadoop/log

2.
22 21 * * * $SPARK_HOME/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar 
$SPARK_HOME/MyJar.jar  > /home/hadoop/log

3.
22 21 * * * /usr/lib/spark/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar 
/usr/lib/spark/MyJar.jar  > /home/hadoop/log

4.
22 21 * * * hadoop /usr/lib/spark/bin/spark-submit --class com.abc.myclass 
--total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar 
/usr/lib/spark/MyJar.jar  > /home/hadoop/log



Thanks & best regards!
San.Luo


run spark apps in linux crontab

2016-07-20 Thread luohui20001
hi guys:
  I add a spark-submit job into my Linux crontab list by the means below,
however none of them works. If I change it to a normal shell script, it is ok.
I don't quite understand why. I checked the 8080 web ui of my spark cluster,
no job submitted, and there are no messages in /home/hadoop/log.
  Any idea is welcome.

[hadoop@master ~]$ crontab -e
1.
22 21 * * * sh /home/hadoop/shellscripts/run4.sh > /home/hadoop/log

and in run4.sh, it wrote:
$SPARK_HOME/bin/spark-submit --class com.abc.myclass --total-executor-cores 10
--jars $SPARK_HOME/lib/MyDep.jar $SPARK_HOME/MyJar.jar  > /home/hadoop/log

2.
22 21 * * * $SPARK_HOME/bin/spark-submit --class com.abc.myclass
--total-executor-cores 10 --jars $SPARK_HOME/lib/MyDep.jar
$SPARK_HOME/MyJar.jar  > /home/hadoop/log

3.
22 21 * * * /usr/lib/spark/bin/spark-submit --class com.abc.myclass
--total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar
/usr/lib/spark/MyJar.jar  > /home/hadoop/log

4.
22 21 * * * hadoop /usr/lib/spark/bin/spark-submit --class com.abc.myclass
--total-executor-cores 10 --jars /usr/lib/spark/lib/MyDep.jar
/usr/lib/spark/MyJar.jar  > /home/hadoop/log


 

Thanks & best regards!
San.Luo


Re: Building standalone spark application via sbt

2016-07-20 Thread Sachin Mittal
Hi,
I am following the example under
https://spark.apache.org/docs/latest/quick-start.html
for a standalone Scala application.

I added all my dependencies via build.sbt (one dependency is under lib
folder).

When I run sbt package I see the jar created under
target/scala-2.10/

So compilation seems to be working fine. However, when I inspect that jar, it
only contains my Scala class.
Unlike a standalone jar built for a Java application, which contains all the
dependencies inside it, here all the dependencies are missing.

So as expected when I run the application via spark-submit I get the
NoClassDefFoundError.

Here is my build.sbt

name := "Test Advice Project"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.6.1",
"org.apache.spark" %% "spark-sql" % "1.6.1"
)

Can anyone please guide me as to what is going wrong and why sbt package is
not including all the dependency jar classes in the new jar.
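One common way to get a runnable fat jar is the sbt-assembly plugin, with the
Spark artifacts marked as "provided" since the cluster supplies them at
runtime -- a sketch (the plugin version is an assumption):

// project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt -- Spark itself is provided by the cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.6.1" % "provided"
)

Running `sbt assembly` then produces a jar under target/scala-2.10/ that
bundles the remaining dependencies.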

Thanks
Sachin


On Tue, Jul 19, 2016 at 8:23 PM, Andrew Ehrlich  wrote:

> Yes, spark-core will depend on Hadoop and several other jars.  Here’s the
> list of dependencies:
> https://github.com/apache/spark/blob/master/core/pom.xml#L35
>
> Whether you need spark-sql depends on whether you will use the DataFrame
> API. Without spark-sql, you will just have the RDD API.
>
> On Jul 19, 2016, at 7:09 AM, Sachin Mittal  wrote:
>
>
> Hi,
> Can someone please guide me what all jars I need to place in my lib folder
> of the project to build a standalone scala application via sbt.
>
> Note I need to provide static dependencies and I cannot download the jars
> using libraryDependencies.
> So I need to provide all the jars upfront.
>
> So far I found that we need:
> spark-core_.jar
>
> Do we also need
> spark-sql_.jar
> and
> hadoop-core-.jar
>
> Is there any jar from spark side I may be missing? What I found that
> spark-core needs hadoop-core classes and if I don't add them then sbt was
> giving me this error:
> [error] bad symbolic reference. A signature in SparkContext.class refers
> to term hadoop
> [error] in package org.apache which is not available.
>
> So I was just confused on library dependency part when building an
> application via sbt. Any inputs here would be helpful.
>
> Thanks
> Sachin
>
>
>
>
>


XLConnect in SparkR

2016-07-20 Thread Yogesh Vyas
Hi,

I am trying to load and read excel sheets from HDFS in sparkR using
XLConnect package.
Can anyone help me in finding out how to read xls files from HDFS in sparkR ?

Regards,
Yogesh

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How spark decides whether to do BroadcastHashJoin or SortMergeJoin

2016-07-20 Thread raaggarw
Hi,

How does Spark decide/optimize internally when it needs to do a
BroadcastHashJoin vs a SortMergeJoin? Is there any way we can guide it from
outside, or through options, as to which join to use?
In my case, when I am trying to do a join, Spark plans that join as a
BroadcastHashJoin internally, and when the join is actually executed it waits
for the broadcast to be done (which is big data), resulting in a timeout.
I do not want to increase the value of the timeout, i.e.
"spark.sql.broadcastTimeout". Rather, I want this to be done via
SortMergeJoin. How can I enforce that?
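One approach I am aware of is to disable automatic broadcasting so the planner
falls back to a sort-merge join -- a sketch (the join key "id" is an
assumption):

// -1 disables auto broadcast; smaller sides would otherwise be broadcast
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
val joined = df1.join(df2, Seq("id"))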

Thanks
Ravi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-spark-decides-whether-to-do-BroadcastHashJoin-or-SortMergeJoin-tp27369.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



write and call UDF in spark dataframe

2016-07-20 Thread Divya Gehlot
Hi,
Could somebody share an example of writing and calling a UDF which converts a
unix time stamp to date time.


Thanks,
Divya


Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-20 Thread Yu Wei
I need to write all data received from MQTT into HBase for further
processing.

They're not the final result.  I also need to read the data from HBase for
analysis.


Is it good choice to use DAO in such situation?


Thx,

Jared



From: Deepak Sharma 
Sent: Wednesday, July 20, 2016 12:34:07 PM
To: Yu Wei
Cc: spark users
Subject: Re: Is it good choice to use DAO to store results generated by spark 
application?


I am using DAO in spark application to write the final computation to Cassandra 
 and it performs well.
What kinds of issues you foresee using DAO for hbase ?

Thanks
Deepak

On 19 Jul 2016 10:04 pm, "Yu Wei" 
> wrote:

Hi guys,


I write spark application and want to store results generated by spark 
application to hbase.

Do I need to access hbase via java api directly?

Or is it better choice to use DAO similar as traditional RDBMS?  I suspect that 
there is major performance downgrade and other negative impacts using DAO. 
However, I have little knowledge in this field.


Any advice?


Thanks,

Jared




Running multiple Spark Jobs on Yarn( Client mode)

2016-07-20 Thread vaibhavrtk
I have a silly question:

Do multiple spark jobs running on yarn have any impact on each other?
e.g. If the traffic on one streaming job increases too much does it have any
effect on second job? Will it slow it down or any other consequences?

I have enough resources(memory,cores) for both jobs in the same cluster.

Thanks
Vaibhav



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-multiple-Spark-Jobs-on-Yarn-Client-mode-tp27364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is it good choice to use DAO to store results generated by spark application?

2016-07-20 Thread Yu Wei
Hi Ted,

I also noticed HBASE-13992.

I have never used anything similar to a DAO.

As a general rule, which is the better choice when working with Spark and
HBase: the hbase-spark module, a DAO, or the HBase client API?


I'm beginner to big data.

Any guidance is very helpful for me.


Thanks,

Jared


From: Ted Yu 
Sent: Wednesday, July 20, 2016 12:14:17 PM
To: Andrew Ehrlich
Cc: Yu Wei; user@spark.apache.org
Subject: Re: Is it good choice to use DAO to store results generated by spark 
application?

hbase-spark module is in the up-coming hbase 2.0 release.
Currently it is in master branch of hbase git repo.

FYI

On Tue, Jul 19, 2016 at 8:27 PM, Andrew Ehrlich 
> wrote:
There is a Spark<->HBase library that does this.  I used it once in a prototype 
(never tried in production through): 
http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/

On Jul 19, 2016, at 9:34 AM, Yu Wei 
> wrote:

Hi guys,

I write spark application and want to store results generated by spark 
application to hbase.
Do I need to access hbase via java api directly?
Or is it better choice to use DAO similar as traditional RDBMS?  I suspect that 
there is major performance downgrade and other negative impacts using DAO. 
However, I have little knowledge in this field.

Any advice?

Thanks,
Jared




Re: Spark driver getting out of memory

2016-07-20 Thread Saurav Sinha
Hi,

I have set driver memory to 10 GB and the job ran with intermediate failures,
which Spark recovered from.

But I still want to know: if the number of partitions increases, does the
driver RAM need to be increased, and what is the ratio of number of
partitions to RAM?
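(One way to keep the driver-side task metadata in check is to cap the
partition count before the heavy action -- a sketch, with an assumed target of
2,000 partitions and a hypothetical writeToKafka function:)

// fewer partitions => fewer tasks for the driver/DAGScheduler to track
val compacted = bigRdd.coalesce(2000, shuffle = false)
compacted.foreachPartition(writeToKafka)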

@RK: I am using cache on the RDD. Is this the reason for the high RAM
utilization?

Thanks,
Saurav Sinha

On Tue, Jul 19, 2016 at 10:14 PM, RK Aduri  wrote:

> Just want to see if this helps.
>
> Are you doing heavy collects and persist that? If that is so, you might
> want to parallelize that collection by converting to an RDD.
>
> Thanks,
> RK
>
> On Tue, Jul 19, 2016 at 12:09 AM, Saurav Sinha 
> wrote:
>
>> Hi Mich,
>>
>>1. In what mode are you running the spark standalone, yarn-client,
>>yarn cluster etc
>>
>> Ans: spark standalone
>>
>>1. You have 4 nodes with each executor having 10G. How many actual
>>executors do you see in UI (Port 4040 by default)
>>
>> Ans: There are 4 executor on which am using 8 cores
>> (--total-executor-core 32)
>>
>>1. What is master memory? Are you referring to diver memory? May be I
>>am misunderstanding this
>>
>> Ans: Driver memory is set as --driver-memory 5g
>>
>>1. The only real correlation I see with the driver memory is when you
>>are running in local mode where worker lives within JVM process that you
>>start with spark-shell etc. In that case driver memory matters. However, 
>> it
>>appears that you are running in another mode with 4 nodes?
>>
>> Ans: I am running my job as spark-submit and on my worker(executor) node
>> there is no OOM issue ,it only happening on driver app.
>>
>> Thanks,
>> Saurav Sinha
>>
>> On Tue, Jul 19, 2016 at 2:42 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> can you please clarify:
>>>
>>>
>>>1. In what mode are you running the spark standalone, yarn-client,
>>>yarn cluster etc
>>>2. You have 4 nodes with each executor having 10G. How many actual
>>>executors do you see in UI (Port 4040 by default)
>>>3. What is master memory? Are you referring to diver memory? May be
>>>I am misunderstanding this
>>>4. The only real correlation I see with the driver memory is when
>>>you are running in local mode where worker lives within JVM process that
>>>you start with spark-shell etc. In that case driver memory matters.
>>>However, it appears that you are running in another mode with 4 nodes?
>>>
>>> Can you get a snapshot of your environment tab in UI and send the output
>>> please?
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 18 July 2016 at 11:50, Saurav Sinha  wrote:
>>>
 I have set --driver-memory 5g. I need to understand whether, as the number
 of partitions increases, driver memory needs to be increased. What would be
 the best ratio of number of partitions to driver memory?

 On Mon, Jul 18, 2016 at 4:07 PM, Zhiliang Zhu 
 wrote:

> try to set --drive-memory xg , x would be as large as can be set .
>
>
> On Monday, July 18, 2016 6:31 PM, Saurav Sinha <
> sauravsinh...@gmail.com> wrote:
>
>
> Hi,
>
> I am running spark job.
>
> Master memory - 5G
> executor memort 10G(running on 4 node)
>
> My job is getting killed as no of partition increase to 20K.
>
> 16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
> WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
> 16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
> 640(foreachPartition at WriteToKafka.java:45)
> 16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 518, ShuffleMapStage 639)
> 16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
> 16/07/18 14:53:23 INFO DAGScheduler: Submitting ResultStage 640
> (MapPartitionsRDD[271] at map at BuildSolrDocs.java:209), which has no
> missing
> parents
> 16/07/18 14:53:23 INFO MemoryStore: ensureFreeSpace(8248) called with
> curMem=41923262, maxMem=2778778828
> 16/07/18 14:53:23 INFO MemoryStore: Block broadcast_90 stored as
> values in memory (estimated size 8.1 KB, free 2.5 GB)
> Exception in thread "dag-scheduler-event-loop"
> java.lang.OutOfMemoryError: Java heap space
> at
> 

Running multiple Spark Jobs on Yarn( Client mode)

2016-07-20 Thread Vaibhav Nagpal
I have a silly question:

Do multiple spark jobs running on yarn have any impact on each other?
e.g. If the traffic on one streaming job increases too much does it have
any effect on second job? Will it slow it down or any other consequences?

Thanks
Vaibhav