Differences between Spark APIs for Hadoop 1.x and Hadoop 2.x in terms of performance, progress reporting and IO metrics.

2015-12-09 Thread Hyukjin Kwon
Hi all,

I am writing this email to both user-group and dev-group since this is
applicable to both.

I am now working on Spark XML datasource (
https://github.com/databricks/spark-xml).
This uses a InputFormat implementation which I downgraded to Hadoop 1.x for
version compatibility.

However, I found all the internal JSON datasource and others in Databricks
use Hadoop 2.x API dealing with TaskAttemptContextImpl by reflecting the
method for this because TaskAttemptContext is a class in Hadoop 1.x and an
interface in Hadoop 2.x.

So, I looked through the codes for some advantages for Hadoop 2.x API but I
couldn't.
I wonder if there are some advantages for using Hadoop 2.x API.

I understand that it is still preferable to use Hadoop 2.x APIs at least
for future differences but somehow I feel like it might not have to use
Hadoop 2.x by reflecting a method.

I would appreciate that if you leave a comment here
https://github.com/databricks/spark-xml/pull/14 as well as sending back a
reply if there is a good explanation

Thanks!


getting error while persisting in hive

2015-12-09 Thread Divya Gehlot
Hi,
I am using spark 1.4.1 .
I am getting error when persisting spark dataframe output to hive

> scala>
> df.select("name","age").write().format("com.databricks.spark.csv").mode(SaveMode.Append).saveAsTable("PersonHiveTable");
> :39: error: org.apache.spark.sql.DataFrameWriter does not take
> parameters
>
>

Can somebody points me whats wrong here ?

Would really appreciate your help.

Thanks in advance

Divya


回复: Re: About Spark On Hbase

2015-12-09 Thread fightf...@163.com
If you are using maven , you can add the cloudera maven repo to the repository 
in pom.xml 
and add the dependency of spark-hbase. 
I just found this : 
http://spark-packages.org/package/nerdammer/spark-hbase-connector 
as Feng Dongyu recommend, you can try this also, but I had no experience of 
using this. 




fightf...@163.com
 
发件人: censj
发送时间: 2015-12-09 15:44
收件人: fightf...@163.com
抄送: user@spark.apache.org
主题: Re: About Spark On Hbase
So, I how to get this jar? I use set package project.I not found sbt lib.
在 2015年12月9日,15:42,fightf...@163.com 写道:

I don't think it really need CDH component. Just use the API 



fightf...@163.com
 
发件人: censj
发送时间: 2015-12-09 15:31
收件人: fightf...@163.com
抄送: user@spark.apache.org
主题: Re: About Spark On Hbase
But this is dependent on CDH。I not install CDH。
在 2015年12月9日,15:18,fightf...@163.com 写道:

Actually you can refer to https://github.com/cloudera-labs/SparkOnHBase 
Also, HBASE-13992  already integrates that feature into the hbase side, but 
that feature has not been released. 

Best,
Sun.



fightf...@163.com
 
From: censj
Date: 2015-12-09 15:04
To: user@spark.apache.org
Subject: About Spark On Hbase
hi all,
 now I using spark,but I not found spark operation hbase open source. Do 
any one tell me? 



Re: Differences between Spark APIs for Hadoop 1.x and Hadoop 2.x in terms of performance, progress reporting and IO metrics.

2015-12-09 Thread Fengdong Yu
I don’t think there is performance difference between 1.x API and 2.x API.

but it’s not a big issue for your change, only 
com.databricks.hadoop.mapreduce.lib.input.XmlInputFormat.java 

 need to change, right?

It’s not a big change to 2.x API. if you agree, I can do, but I cannot promise 
the time within one or two weeks because of my daily job.





> On Dec 9, 2015, at 5:01 PM, Hyukjin Kwon  wrote:
> 
> Hi all, 
> 
> I am writing this email to both user-group and dev-group since this is 
> applicable to both.
> 
> I am now working on Spark XML datasource 
> (https://github.com/databricks/spark-xml 
> ).
> This uses a InputFormat implementation which I downgraded to Hadoop 1.x for 
> version compatibility.
> 
> However, I found all the internal JSON datasource and others in Databricks 
> use Hadoop 2.x API dealing with TaskAttemptContextImpl by reflecting the 
> method for this because TaskAttemptContext is a class in Hadoop 1.x and an 
> interface in Hadoop 2.x.
> 
> So, I looked through the codes for some advantages for Hadoop 2.x API but I 
> couldn't.
> I wonder if there are some advantages for using Hadoop 2.x API.
> 
> I understand that it is still preferable to use Hadoop 2.x APIs at least for 
> future differences but somehow I feel like it might not have to use Hadoop 
> 2.x by reflecting a method.
> 
> I would appreciate that if you leave a comment here 
> https://github.com/databricks/spark-xml/pull/14 
>  as well as sending back a 
> reply if there is a good explanation
> 
> Thanks! 



Re: Differences between Spark APIs for Hadoop 1.x and Hadoop 2.x in terms of performance, progress reporting and IO metrics.

2015-12-09 Thread Hyukjin Kwon
Thank you for your reply!

I have already done the change locally. So for changing it would be fine.

I just wanted to be sure which way is correct.
On 9 Dec 2015 18:20, "Fengdong Yu"  wrote:

> I don’t think there is performance difference between 1.x API and 2.x API.
>
> but it’s not a big issue for your change, only
> com.databricks.hadoop.mapreduce.lib.input.XmlInputFormat.java
> 
>  need to change, right?
>
> It’s not a big change to 2.x API. if you agree, I can do, but I cannot
> promise the time within one or two weeks because of my daily job.
>
>
>
>
>
> On Dec 9, 2015, at 5:01 PM, Hyukjin Kwon  wrote:
>
> Hi all,
>
> I am writing this email to both user-group and dev-group since this is
> applicable to both.
>
> I am now working on Spark XML datasource (
> https://github.com/databricks/spark-xml).
> This uses a InputFormat implementation which I downgraded to Hadoop 1.x
> for version compatibility.
>
> However, I found all the internal JSON datasource and others in Databricks
> use Hadoop 2.x API dealing with TaskAttemptContextImpl by reflecting the
> method for this because TaskAttemptContext is a class in Hadoop 1.x and an
> interface in Hadoop 2.x.
>
> So, I looked through the codes for some advantages for Hadoop 2.x API but
> I couldn't.
> I wonder if there are some advantages for using Hadoop 2.x API.
>
> I understand that it is still preferable to use Hadoop 2.x APIs at least
> for future differences but somehow I feel like it might not have to use
> Hadoop 2.x by reflecting a method.
>
> I would appreciate that if you leave a comment here
> https://github.com/databricks/spark-xml/pull/14 as well as sending back a
> reply if there is a good explanation
>
> Thanks!
>
>
>


Re: getting error while persisting in hive

2015-12-09 Thread Fengdong Yu
.write   not .write()




> On Dec 9, 2015, at 5:37 PM, Divya Gehlot  wrote:
> 
> Hi,
> I am using spark 1.4.1 .
> I am getting error when persisting spark dataframe output to hive 
> scala> 
> df.select("name","age").write().format("com.databricks.spark.csv").mode(SaveMode.Append).saveAsTable("PersonHiveTable");
> :39: error: org.apache.spark.sql.DataFrameWriter does not take 
> parameters
>  
> 
> Can somebody points me whats wrong here ?
> 
> Would really appreciate your help.
> 
> Thanks in advance 
> 
> Divya   



Re: How to use collections inside foreach block

2015-12-09 Thread Rishi Mishra
Your list is defined on the driver, whereas function specified in forEach
will be evaluated on each executor.
You might want to add an accumulator or handle a Sequence of list from each
partition.

On Wed, Dec 9, 2015 at 11:19 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I have a below query. Please help me to solve this
>
> I have a 2 ids. I want to join these ids to table. This table contains
> some blob data. So i can not join these 2000 ids to this table in one step.
>
> I'm planning to join this table in a chunks. For example, each step I will
> join 5000 ids.
>
> Below code is not working. I'm not able to add result to ListBuffer.
> Result s giving always ZERO
>
> *Code Block :-*
>
> var listOfIds is a ListBuffer with 2 records
>
> listOfIds.grouped(5000).foreach { x =>
> {
> var v1 = new ListBuffer[String]()
> val r = sc.parallelize(x).toDF()
> r.registerTempTable("r")
> var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id =
> t.id")
>  result.foreach{ y =>
>  {
>  v1 += y
>   }
> }
> println(" SIZE OF V1 === "+ v1.size)  ==>
>
> *THIS VALUE PRINTING AS ZERO*
>
> *// Save v1 values to other db*
> }
>
> Please help me on this.
>
> Regards,
> Rajesh
>



-- 
Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra


Re: About Spark On Hbase

2015-12-09 Thread censj
Thank you! I know
> 在 2015年12月9日,15:59,fightf...@163.com 写道:
> 
> If you are using maven , you can add the cloudera maven repo to the 
> repository in pom.xml 
> and add the dependency of spark-hbase. 
> I just found this : 
> http://spark-packages.org/package/nerdammer/spark-hbase-connector 
>  
> as Feng Dongyu recommend, you can try this also, but I had no experience of 
> using this. 
> 
> 
> fightf...@163.com 
>  
> 发件人: censj 
> 发送时间: 2015-12-09 15:44
> 收件人: fightf...@163.com 
> 抄送: user@spark.apache.org 
> 主题: Re: About Spark On Hbase
> So, I how to get this jar? I use set package project.I not found sbt lib.
>> 在 2015年12月9日,15:42,fightf...@163.com  写道:
>> 
>> I don't think it really need CDH component. Just use the API 
>> 
>> fightf...@163.com 
>>  
>> 发件人: censj 
>> 发送时间: 2015-12-09 15:31
>> 收件人: fightf...@163.com 
>> 抄送: user@spark.apache.org 
>> 主题: Re: About Spark On Hbase
>> But this is dependent on CDH。I not install CDH。
>>> 在 2015年12月9日,15:18,fightf...@163.com  写道:
>>> 
>>> Actually you can refer to https://github.com/cloudera-labs/SparkOnHBase 
>>>  
>>> Also, HBASE-13992   
>>> already integrates that feature into the hbase side, but 
>>> that feature has not been released. 
>>> 
>>> Best,
>>> Sun.
>>> 
>>> fightf...@163.com 
>>>  
>>> From: censj 
>>> Date: 2015-12-09 15:04
>>> To: user@spark.apache.org 
>>> Subject: About Spark On Hbase
>>> hi all,
>>>  now I using spark,but I not found spark operation hbase open source. 
>>> Do any one tell me? 



Re: spark-defaults.conf optimal configuration

2015-12-09 Thread cjrumble
Hello Neelesh,

Thank you for the checklist for determining the correct configuration of
Spark. I will go through these and let you know if I have further questions. 

Regards,

Chris 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-defaults-conf-optimal-configuration-tp25641p25649.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ALS with repeated entries

2015-12-09 Thread Roberto Pagliari
What happens with ALS when the same pair of user/item appears more than once 
with either the same ratings or different ratings?


Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Fengdong Yu
val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") != "" )

Azuryy Yu
Sr. Infrastructure Engineer

cel: 158-0164-9103
wetchat: azuryy


On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
prashant2006s...@gmail.com> wrote:

> Hi
>
> I have two columns in my json which can have null, empty and non-empty
> string as value.
> I know how to filter records which have non-null value using following:
>
> val req_logs = sqlContext.read.json(filePath)
>
> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
> req_info.dpid_sha1 is not null")
>
> But how to filter if value of column is empty string?
> --
> Regards
> Prashant
>


Re: How to use collections inside foreach block

2015-12-09 Thread Ted Yu
To add onto what Rishi said, you can use foreachPartition() on result where
you can save values to DB.

Cheers

On Wed, Dec 9, 2015 at 12:51 AM, Rishi Mishra  wrote:

> Your list is defined on the driver, whereas function specified in forEach
> will be evaluated on each executor.
> You might want to add an accumulator or handle a Sequence of list from
> each partition.
>
> On Wed, Dec 9, 2015 at 11:19 AM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a below query. Please help me to solve this
>>
>> I have a 2 ids. I want to join these ids to table. This table
>> contains some blob data. So i can not join these 2000 ids to this table in
>> one step.
>>
>> I'm planning to join this table in a chunks. For example, each step I
>> will join 5000 ids.
>>
>> Below code is not working. I'm not able to add result to ListBuffer.
>> Result s giving always ZERO
>>
>> *Code Block :-*
>>
>> var listOfIds is a ListBuffer with 2 records
>>
>> listOfIds.grouped(5000).foreach { x =>
>> {
>> var v1 = new ListBuffer[String]()
>> val r = sc.parallelize(x).toDF()
>> r.registerTempTable("r")
>> var result = sqlContext.sql("SELECT r.id, t.data from r, t where r.id =
>> t.id")
>>  result.foreach{ y =>
>>  {
>>  v1 += y
>>   }
>> }
>> println(" SIZE OF V1 === "+ v1.size)  ==>
>>
>> *THIS VALUE PRINTING AS ZERO*
>>
>> *// Save v1 values to other db*
>> }
>>
>> Please help me on this.
>>
>> Regards,
>> Rajesh
>>
>
>
>
> --
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>


Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Prashant Bhardwaj
Already tried it. But getting following error.

overloaded method value filter with alternatives: (conditionExpr:
String)org.apache.spark.sql.DataFrame  (condition:
org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
applied to (Boolean)

Also tried:

val req_logs_with_dpid =
req_logs.filter(req_logs("req_info.dpid").toString.length
!= 0 )

But getting same error.


On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu 
wrote:

> val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") != "" )
>
> Azuryy Yu
> Sr. Infrastructure Engineer
>
> cel: 158-0164-9103
> wetchat: azuryy
>
>
> On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
> prashant2006s...@gmail.com> wrote:
>
>> Hi
>>
>> I have two columns in my json which can have null, empty and non-empty
>> string as value.
>> I know how to filter records which have non-null value using following:
>>
>> val req_logs = sqlContext.read.json(filePath)
>>
>> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
>> req_info.dpid_sha1 is not null")
>>
>> But how to filter if value of column is empty string?
>> --
>> Regards
>> Prashant
>>
>
>


-- 
Regards
Prashant


HiveContext.read.orc - buffer size not respected after setting it

2015-12-09 Thread Fabian Böhnlein

Hello everyone,

I'm hitting below exception when reading an ORC file with default 
HiveContext after setting hive.exec.orc.default.buffer.size to 1517137. 
See below for details.


Is there another buffer parameter relevant or another place where I 
could set it?


Any other ideas what's going wrong?

/15/12/09 03:30:31 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 
0.0 (TID 0, prtlap09): java.lang.IllegalArgumentException: Buffer size 
too small. size = 262144 needed = 1317137 at 
org.apache.hadoop.hive.ql.io.orc.InStream$CompressedStream.readHeader(InStream.java:193) 
at 
org.apache.hadoop.hive.ql.io.orc.InStream$CompressedStream.read(InStream.java:238) 
at java.io.InputStream.read(InputStream.java:101)/



Details:

starting spark-1.5.2-bin-hadoop2.6 without any hive-site.xml of 
default.xml in /conf:


spark-shell --master mesos://mymaster:5050 --driver-memory 10G 
--driver-java-options="-Dspark.executor.memory=20g 
-Dhive.exec.orc.default.buffer.size=1517137"


seems to accept the config parameter

scala> sqlContext
res0: org.apache.spark.sql.SQLContext = 
org.apache.spark.sql.hive.HiveContext@61d16653
scala> sqlContext.getAllConfs.get("hive.exec.orc.default.buffer.size")
res1: Option[String] = Some(1517137)


adding an orc file I hit some buffer being too low. Above buffer was the 
obvious candidate, also its default is 262144.


scala> val df = sqlContext.read.orc("hdfs://myhdfsmaster/some_orc_file")
15/12/09 03:30:18 INFO orc.OrcRelation: Listing 
hdfs://myhdfsmaster/some_orc_file on driver
df: org.apache.spark.sql.DataFrame = [_col0: string, _col1: int, _col2: 
timestamp]
/scala> df.first() .. 15/12/09 03:30:26 INFO log.PerfLogger: method=OrcGetSplits from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl> .. 
15/12/09 03:30:26 INFO orc.OrcInputFormat: FooterCacheHitRatio: 0/1 .. 
15/12/09 03:30:26 INFO log.PerfLogger: start=1449660626845 end=1449660626950 duration=105 
from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl> 15/12/09 03:30:31 WARN 
scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, prtlap09): 
java.lang.IllegalArgumentException: Buffer size too small. size = 262144 
needed = 1317137 at 
org.apache.hadoop.hive.ql.io.orc.InStream$CompressedStream.readHeader(InStream.java:193) 
at 
org.apache.hadoop.hive.ql.io.orc.InStream$CompressedStream.read(InStream.java:238) 
at java.io.InputStream.read(InputStream.java:101) at 
org.spark-project.hive.shaded.com.google.protobuf.CodedInputStream.refillBuffer(CodedInputStream.java:737) 
at 
org.spark-project.hive.shaded.com.google.protobuf.CodedInputStream.isAtEnd(CodedInputStream.java:701) 
at 
org.spark-project.hive.shaded.com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:99) 
at 
org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeFooter.(OrcProto.java:10661) 
at 
org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeFooter.(OrcProto.java:10625) 
at 
org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeFooter$1.parsePartialFrom(OrcProto.java:10730) 
at 
org.apache.hadoop.hive.ql.io.orc.OrcProto$StripeFooter$1.parsePartialFrom(OrcProto.java:10725) 
at 
org.spark-project.hive.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:200) 
at 
org.spark-project.hive.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:217) 
at 
org.spark-project.hive.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:223) 
at 
org.spark-project.hive.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) 
/


Any hint much appreciated!

Best regards,
Fabian





Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Prashant Bhardwaj
Hi

I have two columns in my json which can have null, empty and non-empty
string as value.
I know how to filter records which have non-null value using following:

val req_logs = sqlContext.read.json(filePath)

val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
req_info.dpid_sha1 is not null")

But how to filter if value of column is empty string?
-- 
Regards
Prashant


spark data frame write.mode("append") bug

2015-12-09 Thread kali.tumm...@gmail.com
Hi Spark Contributors,

I am trying to append data  to target table using df.write.mode("append")
functionality but spark throwing up table already exists exception.

Is there a fix scheduled in later spark release ?, I am using spark 1.5.

val sourcedfmode=sourcedf.write.mode("append")
sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)

Full Code:-
https://github.com/kali786516/ScalaDB/blob/master/src/main/java/com/kali/db/SaprkSourceToTargetBulkLoad.scala

Spring Config File:-
https://github.com/kali786516/ScalaDB/blob/master/src/main/resources/SourceToTargetBulkLoad.xml


Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-data-frame-write-mode-append-bug-tp25650.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: is repartition very cost

2015-12-09 Thread Daniel Siegmann
Each node can have any number of partitions. Spark will try to have a node
process partitions which are already on the node for best performance (if
you look at the list of tasks in the UI, look under the locality level
column).

As a rule of thumb, you probably want 2-3 times the number of partitions as
you have executors. This helps distribute the work evenly. You would need
to experiment to find the best number for your own case.

If you're reading from a distributed data store (such as HDFS), you should
expect the data to already be partitioned. Any time a shuffle is performed
the data will be repartitioned into a number of partitions equal to the
spark.default.parallelism setting (see
http://spark.apache.org/docs/latest/configuration.html), but most
operations which cause a shuffle also take an optional parameter to set a
different value. If using data frames, use spark.sql.shuffle.partitions.

I recommend you do not do any explicit partitioning or mess with these
values until you find a need for it. If executors are sitting idle, that's
a sign you may need to repartition.


On Tue, Dec 8, 2015 at 9:35 PM, Zhiliang Zhu 
wrote:

> Thanks very much for Yong's help.
>
> Sorry that for one more issue, is it that different partitions must be in
> different nodes? that is, each node would only have one partition, in
> cluster mode ...
>
>
>
> On Wednesday, December 9, 2015 6:41 AM, "Young, Matthew T" <
> matthew.t.yo...@intel.com> wrote:
>
>
> Shuffling large amounts of data over the network is expensive, yes. The
> cost is lower if you are just using a single node where no networking needs
> to be involved to do the repartition (using Spark as a multithreading
> engine).
>
> In general you need to do performance testing to see if a repartition is
> worth the shuffle time.
>
> A common model is to repartition the data once after ingest to achieve
> parallelism and avoid shuffles whenever possible later.
>
> *From:* Zhiliang Zhu [mailto:zchl.j...@yahoo.com.INVALID]
> *Sent:* Tuesday, December 08, 2015 5:05 AM
> *To:* User 
> *Subject:* is repartition very cost
>
>
> Hi All,
>
> I need to do optimize objective function with some linear constraints by
>  genetic algorithm.
> I would like to make as much parallelism for it by spark.
>
> repartition / shuffle may be used sometimes in it, however, is repartition
> API very cost ?
>
> Thanks in advance!
> Zhiliang
>
>
>
>
>


How to interpret executorRunTime?

2015-12-09 Thread Saraswat, Sandeep
Hi,

I'm using Spark 1.5.1 and if I look at the JSON data for a running application, 
every Stage has an "executorRunTime" field associated with it which is 
typically a 7-digit number for the PageRank application running on a large (1.1 
GB) input. Does this represent the execution-time for the stage in 
milliseconds? It doesn't seem so because it doesn't match with what the SparkUI 
shows (the "Duration" field for a stage). So what does it represent? How do I 
get the duration of a stage?

Regards,
Sandeep Saraswat

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Gokula Krishnan D
Hello Prashant -

Can you please try like this :

For the instance, input file name is "student_detail.txt" and

ID,Name,Sex,Age
===
101,Alfred,Male,30
102,Benjamin,Male,31
103,Charlie,Female,30
104,Julie,Female,30
105,Maven,Male,30
106,Dexter,Male,30
107,Lundy,Male,32
108,Rita,Female,30
109,Aster,Female,30
110,Harrison,Male,15
111,Rita,,30
112,Aster,,30
113,Harrison,,15
114,Rita,Male,20
115,Aster,,30
116,Harrison,,20

[image: Inline image 2]

*Output:*

Total No.of Records without SEX 5
[111,Rita,,30]
[112,Aster,,30]
[113,Harrison,,15]
[115,Aster,,30]
[116,Harrison,,20]

Total No.of Records with AGE <=15 2
[110,Harrison,Male,15]
[113,Harrison,,15]

Thanks & Regards,
Gokula Krishnan* (Gokul)*
Contact :+1 980-298-1740

On Wed, Dec 9, 2015 at 8:24 AM, Prashant Bhardwaj <
prashant2006s...@gmail.com> wrote:

> Already tried it. But getting following error.
>
> overloaded method value filter with alternatives: (conditionExpr:
> String)org.apache.spark.sql.DataFrame  (condition:
> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
> applied to (Boolean)
>
> Also tried:
>
> val req_logs_with_dpid = 
> req_logs.filter(req_logs("req_info.dpid").toString.length
> != 0 )
>
> But getting same error.
>
>
> On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu 
> wrote:
>
>> val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") != "" )
>>
>> Azuryy Yu
>> Sr. Infrastructure Engineer
>>
>> cel: 158-0164-9103
>> wetchat: azuryy
>>
>>
>> On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
>> prashant2006s...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I have two columns in my json which can have null, empty and non-empty
>>> string as value.
>>> I know how to filter records which have non-null value using following:
>>>
>>> val req_logs = sqlContext.read.json(filePath)
>>>
>>> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
>>> req_info.dpid_sha1 is not null")
>>>
>>> But how to filter if value of column is empty string?
>>> --
>>> Regards
>>> Prashant
>>>
>>
>>
>
>
> --
> Regards
> Prashant
>


Re: can i write only RDD transformation into hdfs or any other storage system

2015-12-09 Thread kali.tumm...@gmail.com
Hi Prateek,

you mean writing spark output to any storage system ? yes you can .

Thanks
Sri 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-i-write-only-RDD-transformation-into-hdfs-or-any-other-storage-system-tp25637p25651.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Gokula Krishnan D
Ok, then you can slightly change like

[image: Inline image 1]

Thanks & Regards,
Gokula Krishnan* (Gokul)*


On Wed, Dec 9, 2015 at 11:09 AM, Prashant Bhardwaj <
prashant2006s...@gmail.com> wrote:

> I have to do opposite of what you're doing. I have to filter non-empty
> records.
>
> On Wed, Dec 9, 2015 at 9:33 PM, Gokula Krishnan D 
> wrote:
>
>> Hello Prashant -
>>
>> Can you please try like this :
>>
>> For the instance, input file name is "student_detail.txt" and
>>
>> ID,Name,Sex,Age
>> ===
>> 101,Alfred,Male,30
>> 102,Benjamin,Male,31
>> 103,Charlie,Female,30
>> 104,Julie,Female,30
>> 105,Maven,Male,30
>> 106,Dexter,Male,30
>> 107,Lundy,Male,32
>> 108,Rita,Female,30
>> 109,Aster,Female,30
>> 110,Harrison,Male,15
>> 111,Rita,,30
>> 112,Aster,,30
>> 113,Harrison,,15
>> 114,Rita,Male,20
>> 115,Aster,,30
>> 116,Harrison,,20
>>
>> [image: Inline image 2]
>>
>> *Output:*
>>
>> Total No.of Records without SEX 5
>> [111,Rita,,30]
>> [112,Aster,,30]
>> [113,Harrison,,15]
>> [115,Aster,,30]
>> [116,Harrison,,20]
>>
>> Total No.of Records with AGE <=15 2
>> [110,Harrison,Male,15]
>> [113,Harrison,,15]
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>> Contact :+1 980-298-1740
>>
>> On Wed, Dec 9, 2015 at 8:24 AM, Prashant Bhardwaj <
>> prashant2006s...@gmail.com> wrote:
>>
>>> Already tried it. But getting following error.
>>>
>>> overloaded method value filter with alternatives: (conditionExpr:
>>> String)org.apache.spark.sql.DataFrame  (condition:
>>> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
>>> applied to (Boolean)
>>>
>>> Also tried:
>>>
>>> val req_logs_with_dpid = 
>>> req_logs.filter(req_logs("req_info.dpid").toString.length
>>> != 0 )
>>>
>>> But getting same error.
>>>
>>>
>>> On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu 
>>> wrote:
>>>
 val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") !=
 "" )

 Azuryy Yu
 Sr. Infrastructure Engineer

 cel: 158-0164-9103
 wetchat: azuryy


 On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
 prashant2006s...@gmail.com> wrote:

> Hi
>
> I have two columns in my json which can have null, empty and non-empty
> string as value.
> I know how to filter records which have non-null value using following:
>
> val req_logs = sqlContext.read.json(filePath)
>
> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
> req_info.dpid_sha1 is not null")
>
> But how to filter if value of column is empty string?
> --
> Regards
> Prashant
>


>>>
>>>
>>> --
>>> Regards
>>> Prashant
>>>
>>
>>
>
>
> --
> Regards
> Prashant
>


Re: Spark Stream Monitoring with Kafka Direct API

2015-12-09 Thread Cody Koeninger
Looks like probably

https://issues.apache.org/jira/browse/SPARK-8701

so 1.5.0

On Wed, Dec 9, 2015 at 10:25 AM, Dan Dutrow  wrote:

> I'm on spark version 1.4.1. I couldn't find documentation that said it was
> fixed, so I thought maybe it was still an open issue. Any idea what the fix
> version is?
>
> On Wed, Dec 9, 2015 at 11:10 AM Cody Koeninger  wrote:
>
>> Which version of spark are you on?  I thought that was added to the spark
>> UI in recent versions.
>>
>> DIrect api doesn't have any inherent interaction with zookeeper.  If you
>> need number of messages per batch and aren't on a recent enough version of
>> spark to see them in the ui, you can get them programatically from the
>> offset ranges.  See the definition of count() in recent versions of
>> KafkaRDD for an example.
>>
>> On Wed, Dec 9, 2015 at 9:39 AM, Dan Dutrow  wrote:
>>
>>> Is there documentation for how to update the metrics (#messages per
>>> batch) in the Spark Streaming tab when using the Direct API? Does the
>>> Streaming tab get its information from Zookeeper or something else
>>> internally?
>>> --
>>> Dan ✆
>>>
>>
>> --
> Dan ✆
>


default parallelism and mesos executors

2015-12-09 Thread Adrian Bridgett

(resending, text only as first post on 2nd never seemed to make it)

Using parallelize() on a dataset I'm only seeing two tasks rather than 
the number of cores in the Mesos cluster.  This is with spark 1.5.1 and 
using the mesos coarse grained scheduler.


Running pyspark in a console seems to show that it's taking a while 
before the mesos executors come online (at which point the default 
parallelism is changing).  If I add "sleep 30" after initialising the 
SparkContext I get the "right" number (42 by coincidence!)


I've just tried increasing minRegisteredResourcesRatio to 0.5 but this 
doesn't affect either the test case below nor my code.


Is there something else I can do instead?  Perhaps it should be seeing 
how many tasks _should_ be available rather than how many are (I'm also 
using dynamicAllocation).


15/12/02 14:34:09 INFO mesos.CoarseMesosSchedulerBackend: 
SchedulerBackend is ready for scheduling beginning after reached 
minRegisteredResourcesRatio: 0.0

>>>
>>>
>>> print (sc.defaultParallelism)
2
>>> 15/12/02 14:34:12 INFO mesos.CoarseMesosSchedulerBackend: Mesos 
task 5 is now TASK_RUNNING
15/12/02 14:34:13 INFO mesos.MesosExternalShuffleClient: Successfully 
registered app 20151117-115458-164233482-5050-24333-0126 with external 
shuffle service.


15/12/02 14:34:15 INFO mesos.CoarseMesosSchedulerBackend: Registered 
executor: 
AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@ip-10-1-200-147.ec2.internal:41194/user/Executor#-1021429650]) 
with ID 20151117-115458-164233482-5050-24333-S22/5
15/12/02 14:34:15 INFO spark.ExecutorAllocationManager: New executor 
20151117-115458-164233482-5050-24333-S22/5 has registered (new total is 1)


>>> print (sc.defaultParallelism)
42

Thanks

Adrian Bridgett

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Stream Monitoring with Kafka Direct API

2015-12-09 Thread Cody Koeninger
Which version of spark are you on?  I thought that was added to the spark
UI in recent versions.

DIrect api doesn't have any inherent interaction with zookeeper.  If you
need number of messages per batch and aren't on a recent enough version of
spark to see them in the ui, you can get them programatically from the
offset ranges.  See the definition of count() in recent versions of
KafkaRDD for an example.

On Wed, Dec 9, 2015 at 9:39 AM, Dan Dutrow  wrote:

> Is there documentation for how to update the metrics (#messages per batch)
> in the Spark Streaming tab when using the Direct API? Does the Streaming
> tab get its information from Zookeeper or something else internally?
> --
> Dan ✆
>


Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Prashant Bhardwaj
Anyway I got it. I have to use !== instead of ===. Thank BTW.

On Wed, Dec 9, 2015 at 9:39 PM, Prashant Bhardwaj <
prashant2006s...@gmail.com> wrote:

> I have to do opposite of what you're doing. I have to filter non-empty
> records.
>
> On Wed, Dec 9, 2015 at 9:33 PM, Gokula Krishnan D 
> wrote:
>
>> Hello Prashant -
>>
>> Can you please try like this :
>>
>> For the instance, input file name is "student_detail.txt" and
>>
>> ID,Name,Sex,Age
>> ===
>> 101,Alfred,Male,30
>> 102,Benjamin,Male,31
>> 103,Charlie,Female,30
>> 104,Julie,Female,30
>> 105,Maven,Male,30
>> 106,Dexter,Male,30
>> 107,Lundy,Male,32
>> 108,Rita,Female,30
>> 109,Aster,Female,30
>> 110,Harrison,Male,15
>> 111,Rita,,30
>> 112,Aster,,30
>> 113,Harrison,,15
>> 114,Rita,Male,20
>> 115,Aster,,30
>> 116,Harrison,,20
>>
>> [image: Inline image 2]
>>
>> *Output:*
>>
>> Total No.of Records without SEX 5
>> [111,Rita,,30]
>> [112,Aster,,30]
>> [113,Harrison,,15]
>> [115,Aster,,30]
>> [116,Harrison,,20]
>>
>> Total No.of Records with AGE <=15 2
>> [110,Harrison,Male,15]
>> [113,Harrison,,15]
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>> Contact :+1 980-298-1740
>>
>> On Wed, Dec 9, 2015 at 8:24 AM, Prashant Bhardwaj <
>> prashant2006s...@gmail.com> wrote:
>>
>>> Already tried it. But getting following error.
>>>
>>> overloaded method value filter with alternatives: (conditionExpr:
>>> String)org.apache.spark.sql.DataFrame  (condition:
>>> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
>>> applied to (Boolean)
>>>
>>> Also tried:
>>>
>>> val req_logs_with_dpid = 
>>> req_logs.filter(req_logs("req_info.dpid").toString.length
>>> != 0 )
>>>
>>> But getting same error.
>>>
>>>
>>> On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu 
>>> wrote:
>>>
 val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") !=
 "" )

 Azuryy Yu
 Sr. Infrastructure Engineer

 cel: 158-0164-9103
 wetchat: azuryy


 On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
 prashant2006s...@gmail.com> wrote:

> Hi
>
> I have two columns in my json which can have null, empty and non-empty
> string as value.
> I know how to filter records which have non-null value using following:
>
> val req_logs = sqlContext.read.json(filePath)
>
> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
> req_info.dpid_sha1 is not null")
>
> But how to filter if value of column is empty string?
> --
> Regards
> Prashant
>


>>>
>>>
>>> --
>>> Regards
>>> Prashant
>>>
>>
>>
>
>
> --
> Regards
> Prashant
>



-- 
Regards
Prashant


Re: Spark Stream Monitoring with Kafka Direct API

2015-12-09 Thread Dan Dutrow
I'm on spark version 1.4.1. I couldn't find documentation that said it was
fixed, so I thought maybe it was still an open issue. Any idea what the fix
version is?

On Wed, Dec 9, 2015 at 11:10 AM Cody Koeninger  wrote:

> Which version of spark are you on?  I thought that was added to the spark
> UI in recent versions.
>
> DIrect api doesn't have any inherent interaction with zookeeper.  If you
> need number of messages per batch and aren't on a recent enough version of
> spark to see them in the ui, you can get them programatically from the
> offset ranges.  See the definition of count() in recent versions of
> KafkaRDD for an example.
>
> On Wed, Dec 9, 2015 at 9:39 AM, Dan Dutrow  wrote:
>
>> Is there documentation for how to update the metrics (#messages per
>> batch) in the Spark Streaming tab when using the Direct API? Does the
>> Streaming tab get its information from Zookeeper or something else
>> internally?
>> --
>> Dan ✆
>>
>
> --
Dan ✆


RDD.isEmpty

2015-12-09 Thread Pat Ferrel
I’m getting *huge* execution times on a moderate sized dataset during the 
RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty 
calculation. I’m using Spark 1.5.1 and from researching I would expect this 
calculation to be linearly proportional to the number of partitions as a worst 
case, which should be a trivial amount of time but it is taking many minutes to 
hours to complete this single phase.

I know that has been a small amount of discussion about using this so would 
love to hear what the current thinking on the subject is. Is there a better way 
to find if an RDD has data? Can someone explain why this is happening?

reference PR
https://github.com/apache/spark/pull/4534 


Spark Stream Monitoring with Kafka Direct API

2015-12-09 Thread Dan Dutrow
Is there documentation for how to update the metrics (#messages per batch)
in the Spark Streaming tab when using the Direct API? Does the Streaming
tab get its information from Zookeeper or something else internally?
-- 
Dan ✆


Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Prashant Bhardwaj
I have to do opposite of what you're doing. I have to filter non-empty
records.

On Wed, Dec 9, 2015 at 9:33 PM, Gokula Krishnan D 
wrote:

> Hello Prashant -
>
> Can you please try like this :
>
> For the instance, input file name is "student_detail.txt" and
>
> ID,Name,Sex,Age
> ===
> 101,Alfred,Male,30
> 102,Benjamin,Male,31
> 103,Charlie,Female,30
> 104,Julie,Female,30
> 105,Maven,Male,30
> 106,Dexter,Male,30
> 107,Lundy,Male,32
> 108,Rita,Female,30
> 109,Aster,Female,30
> 110,Harrison,Male,15
> 111,Rita,,30
> 112,Aster,,30
> 113,Harrison,,15
> 114,Rita,Male,20
> 115,Aster,,30
> 116,Harrison,,20
>
> [image: Inline image 2]
>
> *Output:*
>
> Total No.of Records without SEX 5
> [111,Rita,,30]
> [112,Aster,,30]
> [113,Harrison,,15]
> [115,Aster,,30]
> [116,Harrison,,20]
>
> Total No.of Records with AGE <=15 2
> [110,Harrison,Male,15]
> [113,Harrison,,15]
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
> Contact :+1 980-298-1740
>
> On Wed, Dec 9, 2015 at 8:24 AM, Prashant Bhardwaj <
> prashant2006s...@gmail.com> wrote:
>
>> Already tried it. But getting following error.
>>
>> overloaded method value filter with alternatives: (conditionExpr:
>> String)org.apache.spark.sql.DataFrame  (condition:
>> org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
>> applied to (Boolean)
>>
>> Also tried:
>>
>> val req_logs_with_dpid = 
>> req_logs.filter(req_logs("req_info.dpid").toString.length
>> != 0 )
>>
>> But getting same error.
>>
>>
>> On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu 
>> wrote:
>>
>>> val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") != ""
>>> )
>>>
>>> Azuryy Yu
>>> Sr. Infrastructure Engineer
>>>
>>> cel: 158-0164-9103
>>> wetchat: azuryy
>>>
>>>
>>> On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
>>> prashant2006s...@gmail.com> wrote:
>>>
 Hi

 I have two columns in my json which can have null, empty and non-empty
 string as value.
 I know how to filter records which have non-null value using following:

 val req_logs = sqlContext.read.json(filePath)

 val req_logs_with_dpid = req_log.filter("req_info.dpid is not null or
 req_info.dpid_sha1 is not null")

 But how to filter if value of column is empty string?
 --
 Regards
 Prashant

>>>
>>>
>>
>>
>> --
>> Regards
>> Prashant
>>
>
>


-- 
Regards
Prashant


Re: spark data frame write.mode("append") bug

2015-12-09 Thread Seongduk Cheon
Not for sure, but I think it is bug as of 1.5.

Spark is using LIMIT keyword whether a table exists.
https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L48

If your database does not support LIMIT keyword such as SQL Server, spark
try to create table
https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L272-L275

This issue has already fixed and It will be released on 1.6
https://issues.apache.org/jira/browse/SPARK-9078


--
Cheon

2015-12-09 22:54 GMT+09:00 kali.tumm...@gmail.com :

> Hi Spark Contributors,
>
> I am trying to append data  to target table using df.write.mode("append")
> functionality but spark throwing up table already exists exception.
>
> Is there a fix scheduled in later spark release ?, I am using spark 1.5.
>
> val sourcedfmode=sourcedf.write.mode("append")
> sourcedfmode.jdbc(TargetDBinfo.url,TargetDBinfo.table,targetprops)
>
> Full Code:-
>
> https://github.com/kali786516/ScalaDB/blob/master/src/main/java/com/kali/db/SaprkSourceToTargetBulkLoad.scala
>
> Spring Config File:-
>
> https://github.com/kali786516/ScalaDB/blob/master/src/main/resources/SourceToTargetBulkLoad.xml
>
>
> Thanks
> Sri
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-data-frame-write-mode-append-bug-tp25650.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Unsubsribe

2015-12-09 Thread Michael Nolting
cancel

-- 

*Michael Nolting*
Head of Sevenval FDX

*Sevenval Technologies GmbH *

FRONT-END-EXPERTS SINCE 1999

Köpenicker Straße 154 | 10997 Berlin

office   +49 30 707 190 - 278
mail michael.nolt...@sevenval.com 

www.sevenval.com

Sitz: Köln, HRB 79823
Geschäftsführung: Jan Webering (CEO), Thorsten May, Joern-Carlos Kuntze

*Wir erhöhen den Return On Investment bei Ihren Mobile und Web-Projekten.
Sprechen Sie uns an: *http://roi.sevenval.com/
---
FOLLOW US on

[image: Sevenval blog]


[image: sevenval on twitter]

 [image: sevenval on linkedin]
[image:
sevenval on pinterest]



Re: RDD.isEmpty

2015-12-09 Thread Sean Owen
Are you sure it's isEmpty? and not an upstream stage? isEmpty is
definitely the action here.  It doesn't make sense that take(1) is so
much faster since it's the "same thing".

On Wed, Dec 9, 2015 at 5:11 PM, Pat Ferrel  wrote:
> Sure, I thought this might be a known issue.
>
> I have a 122M dataset, which is the trust and rating data from epinions. The 
> data is split into two RDDs and there is an item properties RDD. The code is 
> just trying to remove any empty RDD from the list.
>
> val esRDDs: List[RDD[(String, Map[String, Any])]] =
>   (correlators ::: properties).filterNot( c => c.isEmpty())
>
> On my 16G MBP with 4g per executor and 4 executors the IsEmpty takes over a 
> hundred minutes (going from memory, I can supply the timeline given a few 
> hours to recalc it).
>
> Running a different version of the code that does a .count for debug and 
> .take(1) instead of the .isEmpty the count of one epinions RDD take 8 minutes 
> and the .take(1) uses 3 minutes.
>
> Other users have seen total runtime on 13G dataset of 700 minutes with the 
> execution time mostly spent in isEmpty.
>
>
> On Dec 9, 2015, at 8:50 AM, Sean Owen  wrote:
>
> It should at best collect 1 item to the driver. This means evaluating
> at least 1 element of 1 partition. I can imagine pathological cases
> where that's slow, but, do you have any more info? how slow is slow
> and what is slow?
>
> On Wed, Dec 9, 2015 at 4:41 PM, Pat Ferrel  wrote:
>> I’m getting *huge* execution times on a moderate sized dataset during the
>> RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty
>> calculation. I’m using Spark 1.5.1 and from researching I would expect this
>> calculation to be linearly proportional to the number of partitions as a
>> worst case, which should be a trivial amount of time but it is taking many
>> minutes to hours to complete this single phase.
>>
>> I know that has been a small amount of discussion about using this so would
>> love to hear what the current thinking on the subject is. Is there a better
>> way to find if an RDD has data? Can someone explain why this is happening?
>>
>> reference PR
>> https://github.com/apache/spark/pull/4534
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD.isEmpty

2015-12-09 Thread Pat Ferrel
Err, compiled for Spark 1.3.1, running on 1.5.1 if that makes any difference. 
The Spark impl is “provided” so should be using 1.5.1 code afaik.

The code is as you see below for isEmpty, so not sure what else could it could 
be measuring since it’s the only spark thing on the line. I can regen the 
timeline but here is the .take(1) timeline. It is an order of magnitude faster 
(from my recollection) but even the take(1) still seems incredibly slow for an 
empty test. I was surprised that isEmpty is a distributed calc. When run from 
the driver this value could have already been calculated as a byproduct of 
creating the RDD, no?

I could use an accumulator to count members as the RDD is created and get a 
negligible .isEmpty calc time, right? The RDD creation might be slightly slower 
due to using an accumulator.





On Dec 9, 2015, at 9:29 AM, Sean Owen  wrote:

Are you sure it's isEmpty? and not an upstream stage? isEmpty is
definitely the action here.  It doesn't make sense that take(1) is so
much faster since it's the "same thing".

On Wed, Dec 9, 2015 at 5:11 PM, Pat Ferrel  wrote:
> Sure, I thought this might be a known issue.
> 
> I have a 122M dataset, which is the trust and rating data from epinions. The 
> data is split into two RDDs and there is an item properties RDD. The code is 
> just trying to remove any empty RDD from the list.
> 
> val esRDDs: List[RDD[(String, Map[String, Any])]] =
>  (correlators ::: properties).filterNot( c => c.isEmpty())
> 
> On my 16G MBP with 4g per executor and 4 executors the IsEmpty takes over a 
> hundred minutes (going from memory, I can supply the timeline given a few 
> hours to recalc it).
> 
> Running a different version of the code that does a .count for debug and 
> .take(1) instead of the .isEmpty the count of one epinions RDD take 8 minutes 
> and the .take(1) uses 3 minutes.
> 
> Other users have seen total runtime on 13G dataset of 700 minutes with the 
> execution time mostly spent in isEmpty.
> 
> 
> On Dec 9, 2015, at 8:50 AM, Sean Owen  wrote:
> 
> It should at best collect 1 item to the driver. This means evaluating
> at least 1 element of 1 partition. I can imagine pathological cases
> where that's slow, but, do you have any more info? how slow is slow
> and what is slow?
> 
> On Wed, Dec 9, 2015 at 4:41 PM, Pat Ferrel  wrote:
>> I’m getting *huge* execution times on a moderate sized dataset during the
>> RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty
>> calculation. I’m using Spark 1.5.1 and from researching I would expect this
>> calculation to be linearly proportional to the number of partitions as a
>> worst case, which should be a trivial amount of time but it is taking many
>> minutes to hours to complete this single phase.
>> 
>> I know that has been a small amount of discussion about using this so would
>> love to hear what the current thinking on the subject is. Is there a better
>> way to find if an RDD has data? Can someone explain why this is happening?
>> 
>> reference PR
>> https://github.com/apache/spark/pull/4534
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




Re: RDD.isEmpty

2015-12-09 Thread Sean Owen
Yes but what is the code that generates the RDD? is it a shuffle of
something? that could cause checking for any element to be expensive since
computing the RDD at all is expensive. Look at the stages in these
long-running jobs.

How could isEmpty not be distributed? the driver can't know whether the
RDD's partitions are empty without evaluating at least one of them a little
bit (unless there are 0 partitions). Caching the size doesn't help unless,
well, you know the size already because the RDD was fully computed. And it
might get weird anyway since RDDs are only as deterministic as their source
-- counting lines of a text file will return a different number if the text
file is appended to.

The only thing that sticks out is the time to serialize one value back to
the driver. I don't know what your "Any" is there but could it be big or
hard to serialize?

Really there's a little gotcha in this implementation: you can only check
isEmpty on an RDD of serializable objects! which is a pretty good
assumption; you won't get far with an RDD of something unserializable but
it's not impossible for it to come up.

The serialization could be avoided by mapping everything to "1" or
something and take-ing *that*. Returning a 1 to the driver is trivial. Or
maybe adapt some version of the implementation of take() to be an
optimized, smarter isEmpty(). Neither seemed worth the overhead at the
time, but this could be a case against that, if it turns out somehow to be
serialization time.


On Wed, Dec 9, 2015 at 5:55 PM, Pat Ferrel  wrote:

> Err, compiled for Spark 1.3.1, running on 1.5.1 if that makes any
> difference. The Spark impl is “provided” so should be using 1.5.1 code
> afaik.
>
> The code is as you see below for isEmpty, so not sure what else could it
> could be measuring since it’s the only spark thing on the line. I can regen
> the timeline but here is the .take(1) timeline. It is an order of magnitude
> faster (from my recollection) but even the take(1) still seems incredibly
> slow for an empty test. I was surprised that isEmpty is a distributed calc.
> When run from the driver this value could have already been calculated as a
> byproduct of creating the RDD, no?
>
> I could use an accumulator to count members as the RDD is created and get
> a negligible .isEmpty calc time, right? The RDD creation might be slightly
> slower due to using an accumulator.
>
>
>
>
>
> On Dec 9, 2015, at 9:29 AM, Sean Owen  wrote:
>
> Are you sure it's isEmpty? and not an upstream stage? isEmpty is
> definitely the action here.  It doesn't make sense that take(1) is so
> much faster since it's the "same thing".
>
> On Wed, Dec 9, 2015 at 5:11 PM, Pat Ferrel  wrote:
>
> Sure, I thought this might be a known issue.
>
> I have a 122M dataset, which is the trust and rating data from epinions.
> The data is split into two RDDs and there is an item properties RDD. The
> code is just trying to remove any empty RDD from the list.
>
> val esRDDs: List[RDD[(String, Map[String, Any])]] =
>  (correlators ::: properties).filterNot( c => c.isEmpty())
>
> On my 16G MBP with 4g per executor and 4 executors the IsEmpty takes over
> a hundred minutes (going from memory, I can supply the timeline given a
> few hours to recalc it).
>
> Running a different version of the code that does a .count for debug and
> .take(1) instead of the .isEmpty the count of one epinions RDD take 8
> minutes and the .take(1) uses 3 minutes.
>
> Other users have seen total runtime on 13G dataset of 700 minutes with the
> execution time mostly spent in isEmpty.
>
>
> On Dec 9, 2015, at 8:50 AM, Sean Owen  wrote:
>
> It should at best collect 1 item to the driver. This means evaluating
> at least 1 element of 1 partition. I can imagine pathological cases
> where that's slow, but, do you have any more info? how slow is slow
> and what is slow?
>
> On Wed, Dec 9, 2015 at 4:41 PM, Pat Ferrel  wrote:
>
> I’m getting *huge* execution times on a moderate sized dataset during the
> RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty
> calculation. I’m using Spark 1.5.1 and from researching I would expect this
> calculation to be linearly proportional to the number of partitions as a
> worst case, which should be a trivial amount of time but it is taking many
> minutes to hours to complete this single phase.
>
> I know that has been a small amount of discussion about using this so would
> love to hear what the current thinking on the subject is. Is there a better
> way to find if an RDD has data? Can someone explain why this is happening?
>
> reference PR
> https://github.com/apache/spark/pull/4534
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
> 

Re: RDD.isEmpty

2015-12-09 Thread Sean Owen
It should at best collect 1 item to the driver. This means evaluating
at least 1 element of 1 partition. I can imagine pathological cases
where that's slow, but, do you have any more info? how slow is slow
and what is slow?

On Wed, Dec 9, 2015 at 4:41 PM, Pat Ferrel  wrote:
> I’m getting *huge* execution times on a moderate sized dataset during the
> RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty
> calculation. I’m using Spark 1.5.1 and from researching I would expect this
> calculation to be linearly proportional to the number of partitions as a
> worst case, which should be a trivial amount of time but it is taking many
> minutes to hours to complete this single phase.
>
> I know that has been a small amount of discussion about using this so would
> love to hear what the current thinking on the subject is. Is there a better
> way to find if an RDD has data? Can someone explain why this is happening?
>
> reference PR
> https://github.com/apache/spark/pull/4534

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Content based window operation on Time-series data

2015-12-09 Thread Arun Verma
Hi all,

*We have RDD(main) of sorted time-series data. We want to split it into
different RDDs according to window size and then perform some aggregation
operation like max, min etc. over each RDD in parallel.*

If window size is w then ith RDD has data from (startTime + (i-1)*w) to
(startTime + i*w) where startTime is time-stamp of 1st entry in main RDD
and (startTime + (i-1)*w) is greater then last entry of main RDD.

For now, I am using DataFrame and Spark version 1.5.2. Below code is
running sequentially on the data, so execution time is high and resource
utilization is low. Code snippet is given below:








*/*  * aggragator is max* df - Dataframe has sorted timeseries data* start
- first entry of DataFrame* end - last entry of DataFrame df*
bucketLengthSec - window size* stepResults - has particular block/window
output(JSON)* appendResults - has output till this block/window(JSON)*/*













*while (start <= end) {row = df.filter(df.col("timeStamp")
.between(start, nextStart)).agg(max(df.col("timeStamp")),
max(df.col("value"))).first();if (row.get(0) != null)
{stepResults = new JSONObject();stepResults.put("x",
Long.parseLong(row.get(0).toString()));stepResults.put("y",
row.get(1));appendResults.add(stepResults);}start =
nextStart;nextStart = start + bucketLengthSec;}*


-- 
Thanks and Regards,
Arun Verma


RegressionModelEvaluator (from jpmml) NotSerializableException when instantiated in the driver

2015-12-09 Thread Utkarsh Sengar
I am trying to load a PMML file in a spark job. Instantiate it only once
and pass it to the executors. But I get a NotSerializableException for
org.xml.sax.helpers.LocatorImpl which is used inside jpmml.

I have this class Prediction.java:
public class Prediction implements Serializable {
private RegressionModelEvaluator rme;

public Prediction() throws Exception {
InputStream is = .getResourceAsStream("model.pmml");
Source source = ImportFilter.apply(new InputSource(is));
PMML model = JAXBUtil.unmarshalPMML(source);
rme = new RegressionModelEvaluator(model);
is.close();
}

public Map predict(params) {
   ..
return rme.evaluate(params);
}
}


Now I want to instantiate it only once since the
"JAXBUtil.unmarshalPMML(source)" step takes about 2-3seconds. It works fine
I instantiate inside the map{}

So I do this in my driver:

Prediction prediction = new Prediction();
JavaRDD result = rdd1
.cartesian(rdd2)
.map(t -> {...)
at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

I am doing this right?

-- 
Thanks,
-Utkarsh


can i process multiple batch in parallel in spark streaming

2015-12-09 Thread prateek arora
Hi

when i run my spark streaming application .. following information show on
application streaming UI.
i am using spark 1.5.0


Batch Time  Input Size   Scheduling Delay (?) Processing Time (?)
Status
2015/12/09 11:00:42 107 events  -   -   
queued  
2015/12/09 11:00:41 103 events  -   -   
queued
2015/12/09 11:00:40 107 events  -   -   
queued
2015/12/09 11:00:39 105 events  -   -   
queued
2015/12/09 11:00:38 109 events  -   -   
queued
2015/12/09 11:00:37 106 events  -   -   
queued
2015/12/09 11:00:36 109 events  -   -   
queued
2015/12/09 11:00:35 113 events  -   -   
queued
2015/12/09 11:00:34 109 events  -   -   
queued
2015/12/09 11:00:33 107 events  -   -   
queued
2015/12/09 11:00:32 99 events   42 s-   
processing



it seems batches push into queue and work like FIFO manner  . is it possible
all my Active batches start processing in parallel.

Regards
Prateek



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/can-i-process-multiple-batch-in-parallel-in-spark-streaming-tp25653.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Recursive nested wildcard directory walking in Spark

2015-12-09 Thread James Ding
Hi!

My name is James, and I’m working on a question there doesn’t seem to be a
lot of answers about online. I was hoping spark/hadoop gurus could shed some
light on this.

I have a data feed on NFS that looks like /foobar/.gz
Currently I have a spark scala job that calls
sparkContext.textFile("/foo/*/*/*/bar/*.gz")
Upstream owners for the data feed have told me they may add additional
nested directories or remove them from files relevant to me. In other words,
files relevant to my spark job might sit on paths that look like:
* /foo/a/b/c/d/bar/*.gz
* /foo/a/b/bar/*.gz
They will do this with only some files and without warning. Anyone have
ideas on how I can configure spark to create an RDD from any textfiles that
fit the pattern /foo/**/bar/*.gz where ** represents a variable number of
wildcard directories?
I'm working with on order of 10^5 and 10^6 files which has discouraged me
from using something besides Hadoop fs API to walk the filesystem and feed
that input to my spark job, but I'm open to suggestions here also.
Thanks!
James Ding




smime.p7s
Description: S/MIME cryptographic signature


Multiple drivers, same worker

2015-12-09 Thread andresb...@gmail.com
Hi everyone,

We've been getting an issue with spark lately where multiple drivers are
assigned to a same worker but resources are never assigned to them and get
"stuck" forever.

If I login in the worker machine I see that the driver processes aren't
really running and the worker's log don't show any error or anything
related to the driver. The master UI does show the drivers as submitted and
in RUNNING state.


Not sure where else to look for clues, any ideas?

-- 
Andrés Blanco Morales


SparkStreaming variable scope

2015-12-09 Thread jpinela
Hi Guys,
I am sure this is a simple question, but I can't find it in the docs
anywhere.
This reads from flume and writes to hbase (as you can see).
But has a variable scope problem (I believe).
I have the following code:

*
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
from datetime import datetime
ssc = StreamingContext(sc, 5)
conf = {"hbase.zookeeper.quorum": "ubuntu3",
"hbase.mapred.outputtable": "teste2",
"mapreduce.outputformat.class":
"org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
"mapreduce.job.output.key.class":
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"mapreduce.job.output.value.class":
"org.apache.hadoop.io.Writable"}


keyConv =
"org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv =
"org.apache.spark.examples.pythonconverters.StringListToPutConverter"

lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
words = lines.map(lambda line: line[1])
rowid = datetime.now().strftime("%Y%m%d%H%M%S")
outrdd= words.map(lambda x: (str(1),[rowid,"cf1desc","col1",x]))
print("ok 1")
outrdd.pprint()

outrdd.foreachRDD(lambda x:
x.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv))

ssc.start()
ssc.awaitTermination()*

the issue is that the rowid variable is allways at the point that the
streaming was began.
How can I go around this? I tried a function, an application, nothing
worked.
Thank you.
jp



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-variable-scope-tp25652.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark-1.5.2][Hadoop-2.6][Spark SQL] Cannot run queries in SQLContext, getting java.lang.NoSuchMethodError

2015-12-09 Thread Matheus Ramos
​I have a Java application using *Spark SQL* (*Spark 1.5.2* using *local
mode*), but I cannot execute any SQL commands without getting errors.

This is the code I am executing:

//confs
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.master","local");
sparkConf.set("spark.app.name","application01");
sparkConf.set("spark.driver.host","10.1.1.36");
sparkConf.set("spark.driver.port", "51810");
sparkConf.set("spark.executor.port", "51815");
sparkConf.set("spark.repl.class.uri","http://10.1.1.36:46146;);
sparkConf.set("spark.executor.instances","2");
sparkConf.set("spark.jars","");
sparkConf.set("spark.executor.id","driver");
sparkConf.set("spark.submit.deployMode","client");
sparkConf.set("spark.fileserver.uri","http://10.1.1.36:47314;);
sparkConf.set("spark.localProperties.clone","true");
sparkConf.set("spark.app.id","app-45631207172715-0002");

//Initialize contexts
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkContext);

//execute command
sqlContext.sql("show tables").show();

Spark dependencies in *pom.xml* look like this:


  org.apache.spark
  spark-core_2.10
  1.5.2



  org.apache.spark
  spark-sql_2.10
  1.5.2



  org.apache.spark
  spark-hive_2.10
  1.5.2



  org.apache.spark
  spark-repl_2.10
  1.5.2


Here is the error I am getting:

java.lang.NoSuchMethodError:
com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;

The *stack trace* is here .

My application is a web application running on Tomcat 7. I don’t have any
other configuration files. What could I be doing wrong? Could it be some
dependency conflict, since I am able to run the same code in a clean
project?
I found an issue  that
gives some more information about the problem.

Regards,

Matheus​


Re: [Spark-1.5.2][Hadoop-2.6][Spark SQL] Cannot run queries in SQLContext, getting java.lang.NoSuchMethodError

2015-12-09 Thread Michael Armbrust
java.lang.NoSuchMethodError almost always means you have the wrong version
of some library (different than what Spark was compiled with) on your
classpath.; In this case the Jackson parser.

On Wed, Dec 9, 2015 at 10:38 AM, Matheus Ramos 
wrote:

> ​I have a Java application using *Spark SQL* (*Spark 1.5.2* using *local
> mode*), but I cannot execute any SQL commands without getting errors.
>
> This is the code I am executing:
>
> //confs
> SparkConf sparkConf = new SparkConf();
> sparkConf.set("spark.master","local");
> sparkConf.set("spark.app.name","application01");
> sparkConf.set("spark.driver.host","10.1.1.36");
> sparkConf.set("spark.driver.port", "51810");
> sparkConf.set("spark.executor.port", "51815");
> sparkConf.set("spark.repl.class.uri","http://10.1.1.36:46146;);
> sparkConf.set("spark.executor.instances","2");
> sparkConf.set("spark.jars","");
> sparkConf.set("spark.executor.id","driver");
> sparkConf.set("spark.submit.deployMode","client");
> sparkConf.set("spark.fileserver.uri","http://10.1.1.36:47314;);
> sparkConf.set("spark.localProperties.clone","true");
> sparkConf.set("spark.app.id","app-45631207172715-0002");
>
> //Initialize contexts
> JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
> SQLContext sqlContext = new SQLContext(sparkContext);
>
> //execute command
> sqlContext.sql("show tables").show();
>
> Spark dependencies in *pom.xml* look like this:
>
> 
>   org.apache.spark
>   spark-core_2.10
>   1.5.2
> 
>
> 
>   org.apache.spark
>   spark-sql_2.10
>   1.5.2
> 
>
> 
>   org.apache.spark
>   spark-hive_2.10
>   1.5.2
> 
>
> 
>   org.apache.spark
>   spark-repl_2.10
>   1.5.2
> 
>
> Here is the error I am getting:
>
> java.lang.NoSuchMethodError: 
> com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;
>
> The *stack trace* is here .
>
> My application is a web application running on Tomcat 7. I don’t have any
> other configuration files. What could I be doing wrong? Could it be some
> dependency conflict, since I am able to run the same code in a clean
> project?
> I found an issue  that
> gives some more information about the problem.
>
> Regards,
>
> Matheus​
>


[Spark-1.5.2][Hadoop-2.6][Spark SQL] Cannot run queries in SQLContext, getting java.lang.NoSuchMethodError

2015-12-09 Thread Matheus Ramos
​I have a Java application using *Spark SQL* (*Spark 1.5.2* using *local
mode*), but I cannot execute any SQL commands without getting errors.

This is the code I am executing:

//confs
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.master","local");
sparkConf.set("spark.app.name","application01");
sparkConf.set("spark.driver.host","10.1.1.36");
sparkConf.set("spark.driver.port", "51810");
sparkConf.set("spark.executor.port", "51815");
sparkConf.set("spark.repl.class.uri","http://10.1.1.36:46146;);
sparkConf.set("spark.executor.instances","2");
sparkConf.set("spark.jars","");
sparkConf.set("spark.executor.id","driver");
sparkConf.set("spark.submit.deployMode","client");
sparkConf.set("spark.fileserver.uri","http://10.1.1.36:47314;);
sparkConf.set("spark.localProperties.clone","true");
sparkConf.set("spark.app.id","app-45631207172715-0002");

//Initialize contexts
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sparkContext);

//execute command
sqlContext.sql("show tables").show();

Spark dependencies in *pom.xml* look like this:


  org.apache.spark
  spark-core_2.10
  1.5.2



  org.apache.spark
  spark-sql_2.10
  1.5.2



  org.apache.spark
  spark-hive_2.10
  1.5.2



  org.apache.spark
  spark-repl_2.10
  1.5.2


Here is the error I am getting:

java.lang.NoSuchMethodError:
com.fasterxml.jackson.module.scala.deser.BigDecimalDeserializer$.handledType()Ljava/lang/Class;

The *stack trace* is here .

My application is a web application running on Tomcat 7. I don’t have any
other configuration files. What could I be doing wrong? Could it be some
dependency conflict, since I am able to run the same code in a clean
project?
I found an issue  that
gives some more information about the problem.

Regards,

Matheus​


Mesos scheduler obeying limit of tasks / executor

2015-12-09 Thread Charles Allen
I have a spark app in development which has relatively strict cpu/mem
ratios that are required. As such, I cannot arbitrarily add CPUs to a
limited memory size.

The general spark cluster behaves as expected, where tasks are launched
with a specified memory/cpu ratio, but the mesos scheduler seems to ignore
this.

Specifically, I cannot find where in the code the limit of number of tasks
per executor of "spark.executor.cores" / "spark.task.cpus" is enforced in
the MesosBackendScheduler.

The Spark App in question has some JVM heap heavy activities inside a
RDD.mapPartitionsWithIndex, so having more tasks per limited JVM memory
resource is bad. The workaround planned handling of this is to limit the
number of tasks per JVM, which does not seem possible in mesos mode, where
it seems to just keep stacking on CPUs as tasks come in without adjusting
any memory constraints, or looking for limits of tasks per executor.

How can I limit the tasks per executor (or per memory pool) in the Mesos
backend scheduler?

Thanks,
Charles Allen


Re: Multiple drivers, same worker

2015-12-09 Thread Ted Yu
When this happened, did you have a chance to take jstack of the stuck
driver process ?

Thanks

On Wed, Dec 9, 2015 at 11:38 AM, andresb...@gmail.com 
wrote:

> Forgot to mention that it doesn't happen every time, it's pretty random so
> far. We've have complete days when it behaves just fine and others when it
> gets crazy. We're using spark 1.5.2
>
> 2015-12-09 13:33 GMT-06:00 andresb...@gmail.com :
>
>> Hi everyone,
>>
>> We've been getting an issue with spark lately where multiple drivers are
>> assigned to a same worker but resources are never assigned to them and get
>> "stuck" forever.
>>
>> If I login in the worker machine I see that the driver processes aren't
>> really running and the worker's log don't show any error or anything
>> related to the driver. The master UI does show the drivers as submitted and
>> in RUNNING state.
>>
>>
>> Not sure where else to look for clues, any ideas?
>>
>> --
>> Andrés Blanco Morales
>>
>
>
>
> --
> Andrés Blanco Morales
>


Re: Multiple drivers, same worker

2015-12-09 Thread andresb...@gmail.com
Sadly, no.

The only evidence I have is the master's log which shows that the Driver
was requested:

15/12/09 18:25:06 INFO Master: Driver submitted
org.apache.spark.deploy.worker.DriverWrapper
15/12/09 18:25:06 INFO Master: Launching driver driver-20151209182506-0164
on worker worker-20151209181534-172.31.31.159-7077



2015-12-09 14:19 GMT-06:00 Ted Yu :

> When this happened, did you have a chance to take jstack of the stuck
> driver process ?
>
> Thanks
>
> On Wed, Dec 9, 2015 at 11:38 AM, andresb...@gmail.com <
> andresb...@gmail.com> wrote:
>
>> Forgot to mention that it doesn't happen every time, it's pretty random
>> so far. We've have complete days when it behaves just fine and others when
>> it gets crazy. We're using spark 1.5.2
>>
>> 2015-12-09 13:33 GMT-06:00 andresb...@gmail.com :
>>
>>> Hi everyone,
>>>
>>> We've been getting an issue with spark lately where multiple drivers are
>>> assigned to a same worker but resources are never assigned to them and get
>>> "stuck" forever.
>>>
>>> If I login in the worker machine I see that the driver processes aren't
>>> really running and the worker's log don't show any error or anything
>>> related to the driver. The master UI does show the drivers as submitted and
>>> in RUNNING state.
>>>
>>>
>>> Not sure where else to look for clues, any ideas?
>>>
>>> --
>>> Andrés Blanco Morales
>>>
>>
>>
>>
>> --
>> Andrés Blanco Morales
>>
>
>


-- 
Andrés Blanco Morales


Re: Multiple drivers, same worker

2015-12-09 Thread andresb...@gmail.com
Ok, attached you can see the jstack

2015-12-09 14:22 GMT-06:00 andresb...@gmail.com :

> Sadly, no.
>
> The only evidence I have is the master's log which shows that the Driver
> was requested:
>
> 15/12/09 18:25:06 INFO Master: Driver submitted
> org.apache.spark.deploy.worker.DriverWrapper
> 15/12/09 18:25:06 INFO Master: Launching driver driver-20151209182506-0164
> on worker worker-20151209181534-172.31.31.159-7077
>
>
>
> 2015-12-09 14:19 GMT-06:00 Ted Yu :
>
>> When this happened, did you have a chance to take jstack of the stuck
>> driver process ?
>>
>> Thanks
>>
>> On Wed, Dec 9, 2015 at 11:38 AM, andresb...@gmail.com <
>> andresb...@gmail.com> wrote:
>>
>>> Forgot to mention that it doesn't happen every time, it's pretty random
>>> so far. We've have complete days when it behaves just fine and others when
>>> it gets crazy. We're using spark 1.5.2
>>>
>>> 2015-12-09 13:33 GMT-06:00 andresb...@gmail.com :
>>>
 Hi everyone,

 We've been getting an issue with spark lately where multiple drivers
 are assigned to a same worker but resources are never assigned to them and
 get "stuck" forever.

 If I login in the worker machine I see that the driver processes aren't
 really running and the worker's log don't show any error or anything
 related to the driver. The master UI does show the drivers as submitted and
 in RUNNING state.


 Not sure where else to look for clues, any ideas?

 --
 Andrés Blanco Morales

>>>
>>>
>>>
>>> --
>>> Andrés Blanco Morales
>>>
>>
>>
>
>
> --
> Andrés Blanco Morales
>



-- 
Andrés Blanco Morales


jstack
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Saving RDDs in Tachyon

2015-12-09 Thread Calvin Jia
Hi Mark,

Were you able to successfully store the RDD with Akhil's method? When you
read it back as an objectFile, you will also need to specify the correct
type.

You can find more information about integrating Spark and Tachyon on this
page: http://tachyon-project.org/documentation/Running-Spark-on-Tachyon.html
.

Hope this helps,
Calvin

On Fri, Oct 30, 2015 at 7:04 AM, Akhil Das 
wrote:

> I guess you can do a .saveAsObjectFiles and read it back as sc.objectFile
>
> Thanks
> Best Regards
>
> On Fri, Oct 23, 2015 at 7:57 AM, mark  wrote:
>
>> I have Avro records stored in Parquet files in HDFS. I want to read these
>> out as an RDD and save that RDD in Tachyon for any spark job that wants the
>> data.
>>
>> How do I save the RDD in Tachyon? What format do I use? Which RDD
>> 'saveAs...' method do I want?
>>
>> Thanks
>>
>
>


HTTP Source for Spark Streaming

2015-12-09 Thread Sourav Mazumder
Hi All,

Currently is there a way using which one can connect to a http server to
get data as a dstream at a given frequency ?

Or one has to write own utility for the same ?

Regards,
Sourav


Re: SparkStreaming variable scope

2015-12-09 Thread Bryan Cutler
rowid from your code is a variable in the driver, so it will be evaluated
once and then only the value is sent to words.map.  You probably want to
have rowid be a lambda itself, so that it will get the value at the time it
is evaluated.  For example if I have the following:

>>> data = sc.parallelize([1,2,3])
>>> from datetime import datetime
>>> rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")
>>> data.map(lambda x: (rowid(), x))
>>> mdata = data.map(lambda x: (rowid(), x))
>>> mdata.collect()
[('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]
>>> mdata.collect()
[('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]

here rowid is evaluated whenever an action is called on the RDD, i.e.
collect

On Wed, Dec 9, 2015 at 10:23 AM, jpinela  wrote:

> Hi Guys,
> I am sure this is a simple question, but I can't find it in the docs
> anywhere.
> This reads from flume and writes to hbase (as you can see).
> But has a variable scope problem (I believe).
> I have the following code:
>
> *
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.flume import FlumeUtils
> from datetime import datetime
> ssc = StreamingContext(sc, 5)
> conf = {"hbase.zookeeper.quorum": "ubuntu3",
> "hbase.mapred.outputtable": "teste2",
> "mapreduce.outputformat.class":
> "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
> "mapreduce.job.output.key.class":
> "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
> "mapreduce.job.output.value.class":
> "org.apache.hadoop.io.Writable"}
>
>
> keyConv =
>
> "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
> valueConv =
> "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>
> lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
> words = lines.map(lambda line: line[1])
> rowid = datetime.now().strftime("%Y%m%d%H%M%S")
> outrdd= words.map(lambda x: (str(1),[rowid,"cf1desc","col1",x]))
> print("ok 1")
> outrdd.pprint()
>
> outrdd.foreachRDD(lambda x:
>
> x.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv))
>
> ssc.start()
> ssc.awaitTermination()*
>
> the issue is that the rowid variable is allways at the point that the
> streaming was began.
> How can I go around this? I tried a function, an application, nothing
> worked.
> Thank you.
> jp
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-variable-scope-tp25652.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: can i process multiple batch in parallel in spark streaming

2015-12-09 Thread prateek arora
Hi Thanks

In my scenario batches are independent .so is it safe to use in production
environment ?

Regards
Prateek

On Wed, Dec 9, 2015 at 11:39 AM, Ted Yu  wrote:

> Have you seen this thread ?
>
> http://search-hadoop.com/m/q3RTtgSGrobJ3Je
>
> On Wed, Dec 9, 2015 at 11:12 AM, prateek arora  > wrote:
>
>> Hi
>>
>> when i run my spark streaming application .. following information show on
>> application streaming UI.
>> i am using spark 1.5.0
>>
>>
>> Batch Time  Input Size   Scheduling Delay (?) Processing Time
>> (?)
>> Status
>> 2015/12/09 11:00:42 107 events  -   -
>>queued
>> 2015/12/09 11:00:41 103 events  -   -
>>queued
>> 2015/12/09 11:00:40 107 events  -   -
>>queued
>> 2015/12/09 11:00:39 105 events  -   -
>>queued
>> 2015/12/09 11:00:38 109 events  -   -
>>queued
>> 2015/12/09 11:00:37 106 events  -   -
>>queued
>> 2015/12/09 11:00:36 109 events  -   -
>>queued
>> 2015/12/09 11:00:35 113 events  -   -
>>queued
>> 2015/12/09 11:00:34 109 events  -   -
>>queued
>> 2015/12/09 11:00:33 107 events  -   -
>>queued
>> 2015/12/09 11:00:32 99 events   42 s-
>>processing
>>
>>
>>
>> it seems batches push into queue and work like FIFO manner  . is it
>> possible
>> all my Active batches start processing in parallel.
>>
>> Regards
>> Prateek
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/can-i-process-multiple-batch-in-parallel-in-spark-streaming-tp25653.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Release data for spark 1.6?

2015-12-09 Thread Michael Armbrust
The release date is "as soon as possible".  In order to make an Apache
release we must present a release candidate and have 72-hours of voting by
the PMC.  As soon as there are no known bugs, the vote will pass and 1.6
will be released.

In the mean time, I'd love support from the community testing the most
recent release candidate.

On Wed, Dec 9, 2015 at 2:19 PM, Sri  wrote:

> Hi Ted,
>
> Thanks for the info , but there is no particular release date from my
> understanding the package is in testing there is no release date mentioned.
>
> Thanks
> Sri
>
>
>
> Sent from my iPhone
>
> > On 9 Dec 2015, at 21:38, Ted Yu  wrote:
> >
> > See this thread:
> >
> >
> http://search-hadoop.com/m/q3RTtBMZpK7lEFB1/Spark+1.6.0+RC=Re+VOTE+Release+Apache+Spark+1+6+0+RC1+
> >
> >> On Dec 9, 2015, at 1:20 PM, "kali.tumm...@gmail.com" <
> kali.tumm...@gmail.com> wrote:
> >>
> >> Hi All,
> >>
> >> does anyone know exact release data for spark 1.6 ?
> >>
> >> Thanks
> >> Sri
> >>
> >>
> >>
> >> --
> >> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


SparkML. RandomForest predict performance for small dataset.

2015-12-09 Thread Eugene Morozov
Hello,

I'm using RandomForest pipeline (ml package). Everything is working fine
(learning models, prediction, etc), but I'd like to tune it for the case,
when I predict with small dataset.
My issue is that when I apply

(PipelineModel)model.transform(dataset)

The model consists of the following stages:

StringIndexerModel labelIndexer = new StringIndexer()...
RandomForestClassifier classifier = new RandomForestClassifier()...
IndexToString labelConverter = new IndexToString()...
Pipeline pipeline = new Pipeline().setStages(new
PipelineStage[]{labelIndexer, classifier, labelConverter});

it obviously takes some time to predict, but when my dataset consists of
just 1 (record) I'd expect it to be really fast.

My observations are even though I use small dataset Spark broadcasts
something over and over again. That's fine, when I load my (serialized)
model from disk and use it just once for prediction, but when I use the
same model in a loop for the same! dataset, I'd say that everything should
already be on a worker nodes, thus I'd expect prediction to be fast.
It takes 20 seconds to predict dataset once (with one input row) and all
subsequent predictions over the same dataset with the same model takes
roughly 10 seconds.
My goal is to have 0.5 - 1 second response.

My intention was to keep learned model on a driver (that's stay online with
created SparkContext) to use it for any subsequent predictions, but these
10 seconds predictions basically kill the whole idea.

Is it possible somehow to distribute the model over the cluster upfront so
that the prediction is really fast?
Are there any specific params to apply to the PipelineModel to stay
resident on a worker nodes? Anything to keep and reuse broadcasted data?

Thanks in advance.
--
Be well!
Jean Morozov


Re: Recursive nested wildcard directory walking in Spark

2015-12-09 Thread James Ding
I’ve set “mapreduce.input.fileinputformat.input.dir.recursive” to “true” in
the SparkConf I use to instantiate SparkContext, and I confirm this at
runtime in my scala job to print out this property, but
sparkContext.textFile(“/foo/*/bar/*.gz”) still fails (so do /foo/**/bar/*.gz
and /foo/*/*/bar/*.gz).

Any thoughts or workarounds? I’m considering using bash globbing to match
files recursively and feed hundreds of thousands of arguments to
spark-submit. Reasons for/against?

From:  Ted Yu 
Date:  Wednesday, December 9, 2015 at 3:50 PM
To:  James Ding 
Cc:  "user@spark.apache.org" 
Subject:  Re: Recursive nested wildcard directory walking in Spark

Have you seen this thread ?

http://search-hadoop.com/m/q3RTt2uhMX1UhnCc1=Re+Does+sc+newAPIHadoopFil
e+support+multiple+directories+or+nested+directories+
 

FYI

On Wed, Dec 9, 2015 at 11:18 AM, James Ding  wrote:
> Hi!
> 
> My name is James, and I’m working on a question there doesn’t seem to be a lot
> of answers about online. I was hoping spark/hadoop gurus could shed some light
> on this.
> 
> I have a data feed on NFS that looks like /foobar/.gz
> Currently I have a spark scala job that calls
> sparkContext.textFile("/foo/*/*/*/bar/*.gz")
> Upstream owners for the data feed have told me they may add additional nested
> directories or remove them from files relevant to me. In other words, files
> relevant to my spark job might sit on paths that look like:
> * /foo/a/b/c/d/bar/*.gz
> * /foo/a/b/bar/*.gz
> They will do this with only some files and without warning. Anyone have ideas
> on how I can configure spark to create an RDD from any textfiles that fit the
> pattern /foo/**/bar/*.gz where ** represents a variable number of wildcard
> directories?
> I'm working with on order of 10^5 and 10^6 files which has discouraged me from
> using something besides Hadoop fs API to walk the filesystem and feed that
> input to my spark job, but I'm open to suggestions here also.
> Thanks!
> James Ding





smime.p7s
Description: S/MIME cryptographic signature


Re: Release data for spark 1.6?

2015-12-09 Thread Ted Yu
See this thread:

http://search-hadoop.com/m/q3RTtBMZpK7lEFB1/Spark+1.6.0+RC=Re+VOTE+Release+Apache+Spark+1+6+0+RC1+

> On Dec 9, 2015, at 1:20 PM, "kali.tumm...@gmail.com"  
> wrote:
> 
> Hi All, 
> 
> does anyone know exact release data for spark 1.6 ?
> 
> Thanks
> Sri
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Release data for spark 1.6?

2015-12-09 Thread Sri
Hi Ted,

Thanks for the info , but there is no particular release date from my 
understanding the package is in testing there is no release date mentioned.

Thanks
Sri



Sent from my iPhone

> On 9 Dec 2015, at 21:38, Ted Yu  wrote:
> 
> See this thread:
> 
> http://search-hadoop.com/m/q3RTtBMZpK7lEFB1/Spark+1.6.0+RC=Re+VOTE+Release+Apache+Spark+1+6+0+RC1+
> 
>> On Dec 9, 2015, at 1:20 PM, "kali.tumm...@gmail.com" 
>>  wrote:
>> 
>> Hi All, 
>> 
>> does anyone know exact release data for spark 1.6 ?
>> 
>> Thanks
>> Sri
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: HiveContext creation failed with Kerberos

2015-12-09 Thread Neal Yin
Thanks Steve!

I tried both 1.5.3 and 1.6.0 from git-clone-build.1.5.3 is still broken.  
1.6.0 does work!
This one is probably the one.  https://issues.apache.org/jira/browse/SPARK-11821

-Neal


From: Steve Loughran >
Date: Tuesday, December 8, 2015 at 4:09 AM
To: "user@spark.apache.org" 
>
Subject: Re: HiveContext creation failed with Kerberos




On 8 Dec 2015, at 06:52, Neal Yin 
> wrote:

15/12/08 04:12:28 ERROR transport.TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]

lots of causes for that, its one of the two classic "kerberos doesn't like you" 
error messages

https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/sections/errors.html

in your case it sounds like 1 or more of these issues

https://issues.apache.org/jira/browse/SPARK-10181
https://issues.apache.org/jira/browse/SPARK-11821
https://issues.apache.org/jira/browse/SPARK-11265

All of which are fixed in 1.5.3. Which doesn't help you with the CDH 1.5.2 
release, unless they backported things

-Steve


Re: Recursive nested wildcard directory walking in Spark

2015-12-09 Thread Ted Yu
Have you seen this thread ?

http://search-hadoop.com/m/q3RTt2uhMX1UhnCc1=Re+Does+sc+newAPIHadoopFile+support+multiple+directories+or+nested+directories+

FYI

On Wed, Dec 9, 2015 at 11:18 AM, James Ding  wrote:

> Hi!
>
> My name is James, and I’m working on a question there doesn’t seem to be a
> lot of answers about online. I was hoping spark/hadoop gurus could shed
> some light on this.
>
> I have a data feed on NFS that looks like /foobar/.gz
>
> Currently I have a spark scala job that calls
>
> sparkContext.textFile("/foo/*/*/*/bar/*.gz")
>
> Upstream owners for the data feed have told me they may add additional
> nested directories or remove them from files relevant to me. In other
> words, files relevant to my spark job might sit on paths that look like:
>
>- /foo/a/b/c/d/bar/*.gz
>- /foo/a/b/bar/*.gz
>
> They will do this with only some files and without warning. Anyone have
> ideas on how I can configure spark to create an RDD from any textfiles that
> fit the pattern /foo/**/bar/*.gz where ** represents a variable number of
> wildcard directories?
>
> I'm working with on order of 10^5 and 10^6 files which has discouraged me
> from using something besides Hadoop fs API to walk the filesystem and feed
> that input to my spark job, but I'm open to suggestions here also.
>
> Thanks!
>
> James Ding
>


Release data for spark 1.6?

2015-12-09 Thread kali.tumm...@gmail.com
Hi All, 

does anyone know exact release data for spark 1.6 ?

Thanks
Sri



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Release-data-for-spark-1-6-tp25654.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is Spark History Server supported for Mesos?

2015-12-09 Thread Kelvin Chu
Spark on YARN can use History Server by setting the configuration
spark.yarn.historyServer.address. But, I can't find similar config for
Mesos. Is History Server supported by Spark on Mesos? Thanks.

Kelvin


count distinct in spark sql aggregation

2015-12-09 Thread fightf...@163.com
Hi, 
I have a use case that need to get daily, weekly or monthly active users count 
according to the native hourly data, say as a large datasets.
The native datasets are instantly updated and I want to get the distinct active 
user count per time dimension. Anyone can show some 
efficient way of reaching this ? 
If I want to get daily active distinct user count , I would get this day's each 
hour dataset and do some calculation ? My initial thought on this
is to use a key value store and use a hashset to store the hourly userid. Then 
I can compare and distinct each hourly userid set and got the 
daily distinct count. However , I am not sure about this implementation can be 
some efficient workaround. 
Hope some guys can shed a little light on this.

Best,
Sun.



fightf...@163.com


StackOverflowError when writing dataframe to table

2015-12-09 Thread apu mishra . rr
The command

mydataframe.write.saveAsTable(name="tablename")

sometimes results in java.lang.StackOverflowError (see below for fuller
error message).

This is after I am able to successfully run cache() and show() methods on
mydataframe.

The issue is not deterministic, i.e. the same code sometimes works fine,
sometimes not.

I am running PySpark with:

spark-submit --master local[*] --driver-memory 24g --executor-memory 24g

Any help understanding this issue would be appreciated!

Thanks, Apu

Fuller error message:

Exception in thread "dag-scheduler-event-loop" java.lang.StackOverflowError

at
java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2281)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1428)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)

at
scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)

at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)

at
scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)

at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

at

Re: distcp suddenly broken with spark-ec2 script setup

2015-12-09 Thread Alex Gittens
BTW, yes the referenced s3 bucket does exist, and

hdfs dfs -ls s3n://agittens/CFSRArawtars

does list the entries, although it first prints the same warnings:

015-12-10 00:26:53,815 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:26:53,909 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response
'/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
2015-12-10 00:26:54,243 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
Found 2306 items
-rwxrwxrwx   1 177408 2015-11-18 00:26
s3n://agittens/CFSRArawtars/completefilelist
-rwxrwxrwx   1 60 2015-11-18 00:26
s3n://agittens/CFSRArawtars/copyallfileshere.sh
-rwxrwxrwx   1 1814040064 2015-11-18 00:26
s3n://agittens/CFSRArawtars/pgbh02.gdas.19790101-19790105.tar
-rwxrwxrwx   1 1788727808 2015-11-18 00:27
s3n://agittens/CFSRArawtars/pgbh02.gdas.19790106-19790110.tar
-rw
...

On Wed, Dec 9, 2015 at 4:24 PM, AlexG  wrote:

> I've been using the same method to launch my clusters then pull my data
> from
> S3 to local hdfs:
>
> $SPARKHOME/ec2/spark-ec2 -k mykey -i ~/.ssh/mykey.pem -s 29
> --instance-type=r3.8xlarge --placement-group=pcavariants
> --copy-aws-credentials --hadoop-major-version=2 --spot-price=2.8 launch
> mycluster --region=us-west-2
>
> then
>
> ephemeral-hdfs/bin/hadoop distcp s3n://agittens/CFSRArawtars CFSRArawtars
>
> Before this worked as I'd expect. Within the last several days, I've been
> getting this error when I run the distcp command:
> 2015-12-10 00:16:43,113 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
> Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,207 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response
> '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,422 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
> Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,513 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response
> '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,737 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
> Unexpected response code 404, expected 200
> 2015-12-10 00:16:43,830 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response
> '/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
> 2015-12-10 00:16:44,015 WARN  httpclient.RestS3Service
> (RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
> Unexpected response code 404, expected 200
> 2015-12-10 00:16:46,141 WARN  conf.Configuration
> (Configuration.java:warnOnceIfDeprecated(824)) - io.sort.mb is deprecated.
> Instead, use mapreduce.task.io.sort.mb
> 2015-12-10 00:16:46,141 WARN  conf.Configuration
> (Configuration.java:warnOnceIfDeprecated(824)) - io.sort.factor is
> deprecated. Instead, use mapreduce.task.io.sort.factor
> 2015-12-10 00:16:46,630 INFO  service.AbstractService
> (AbstractService.java:init(81)) -
> Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
> 2015-12-10 00:16:46,630 INFO  service.AbstractService
> (AbstractService.java:start(94)) -
> Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
> 2015-12-10 00:16:47,135 INFO  mapreduce.JobSubmitter
> (JobSubmitter.java:submitJobInternal(368)) - number of splits:21
>
> Then the job hangs and does nothing until I kill it. Any idea what the
> problem is and how to fix it, or a work-around for getting my data off S3
> quickly? It is around 4 TB.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/distcp-suddenly-broken-with-spark-ec2-script-setup-tp25658.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


distcp suddenly broken with spark-ec2 script setup

2015-12-09 Thread AlexG
I've been using the same method to launch my clusters then pull my data from
S3 to local hdfs:

$SPARKHOME/ec2/spark-ec2 -k mykey -i ~/.ssh/mykey.pem -s 29
--instance-type=r3.8xlarge --placement-group=pcavariants
--copy-aws-credentials --hadoop-major-version=2 --spot-price=2.8 launch
mycluster --region=us-west-2

then

ephemeral-hdfs/bin/hadoop distcp s3n://agittens/CFSRArawtars CFSRArawtars

Before this worked as I'd expect. Within the last several days, I've been
getting this error when I run the distcp command:
2015-12-10 00:16:43,113 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:16:43,207 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response
'/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
2015-12-10 00:16:43,422 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:16:43,513 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response
'/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
2015-12-10 00:16:43,737 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:16:43,830 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response
'/CFSRArawtars_%24folder%24' - Unexpected response code 404, expected 200
2015-12-10 00:16:44,015 WARN  httpclient.RestS3Service
(RestS3Service.java:performRequest(393)) - Response '/CFSRArawtars' -
Unexpected response code 404, expected 200
2015-12-10 00:16:46,141 WARN  conf.Configuration
(Configuration.java:warnOnceIfDeprecated(824)) - io.sort.mb is deprecated.
Instead, use mapreduce.task.io.sort.mb
2015-12-10 00:16:46,141 WARN  conf.Configuration
(Configuration.java:warnOnceIfDeprecated(824)) - io.sort.factor is
deprecated. Instead, use mapreduce.task.io.sort.factor
2015-12-10 00:16:46,630 INFO  service.AbstractService
(AbstractService.java:init(81)) -
Service:org.apache.hadoop.yarn.client.YarnClientImpl is inited.
2015-12-10 00:16:46,630 INFO  service.AbstractService
(AbstractService.java:start(94)) -
Service:org.apache.hadoop.yarn.client.YarnClientImpl is started.
2015-12-10 00:16:47,135 INFO  mapreduce.JobSubmitter
(JobSubmitter.java:submitJobInternal(368)) - number of splits:21

Then the job hangs and does nothing until I kill it. Any idea what the
problem is and how to fix it, or a work-around for getting my data off S3
quickly? It is around 4 TB.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/distcp-suddenly-broken-with-spark-ec2-script-setup-tp25658.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD.isEmpty

2015-12-09 Thread Pat Ferrel
I ran a 124M dataset on my laptop
with isEmpty it took 32 minutes
without isEmpty it took 18 minutes all but 1.5 minutes were in writing to 
Elasticsearch, which is on the same laptop

So excluding the time writing to Elasticsearch, which was nearly the same in 
both cases, the core Spark code took 10x longer with isEmpty. There are other 
isEmpty calls that I’ll optimize away but they are much smaller gains. Also 
strike the comparison to take(1), pretend I never said that.

I can avoid isEmpty but still a bit of a head scratcher.


> On Dec 9, 2015, at 11:53 AM, Sean Owen  wrote:
> 
> On Wed, Dec 9, 2015 at 7:49 PM, Pat Ferrel  wrote:
>> The “Any” is required by the code it is being passed to, which is the
>> Elasticsearch Spark index writing code. The values are actually RDD[(String,
>> Map[String, String])]
> 
> (Is it frequently a big big map by any chance?)

No, 5000 chars or so per Map.

> 
>> No shuffle that I know of. RDDs are created from the output of Mahout
>> SimilarityAnalysis.cooccurrence and are turned into RDD[(String, Map[String,
>> String])], Since all actual values are simple there is no serialization
>> except for standard Java/Scala so no custom serializers or use of Kryo.
> 
> It's still worth looking at the stages in the job.
> 
> 
>> I understand that the driver can’t know, I was suggesting that isEmpty could
>> be backed by a boolean RDD member variable calculated for every RDD at
>> creation time in Spark. This is really the only way to solve generally since
>> sometimes you get an RDD from a lib, so wrapping it as I suggested is not
>> practical, it would have to be in Spark. BTW the same approach could be used
>> for count, an accumulator per RDD, then returned as a pre-calculated RDD
>> state value.
> 
> What would the boolean do? you can't cache the size in general even if
> you know it, but you don't know it at the time the RDD is created
> (before it's evaluated).

Sorry, maybe I misunderstand but either the accumulator being referenced causes 
the DAG to be executed up to the right spot or you have to checkpoint, either 
way we get the count from a fully executed closure.

> 
>> Are you suggesting that both take(1) and isEmpty are unusable for some
>> reason in my case? I can pass around this information if I have to, I just
>> thought the worst case was O(n) where n was number of partitions and
>> therefor always trivial to calculate.
> 
> No, just trying to guess at reasons you observe what you do. There's
> no difference between isEmpty and take(1) if there are > 0 partitions,
> so if they behave very differently it's something to do with what
> you're measuring and how.
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Re: Spark RDD cache persistence

2015-12-09 Thread Calvin Jia
Hi Deepak,

For persistence across Spark jobs, you can store and access the RDDs in
Tachyon. Tachyon works with ramdisk which would give you similar in-memory
performance you would have within a Spark job.

For more information, you can take a look at the docs on Tachyon-Spark
integration:
http://tachyon-project.org/documentation/Running-Spark-on-Tachyon.html

Hope this helps,
Calvin

On Thu, Nov 5, 2015 at 10:29 PM, Deenar Toraskar 
wrote:

> You can have a long running Spark context in several fashions. This will
> ensure your data will be cached in memory. Clients will access the RDD
> through a REST API that you can expose. See the Spark Job Server, it does
> something similar. It has something called Named RDDs
>
> Using Named RDDs
>
> Named RDDs are a way to easily share RDDs among job. Using this facility,
> computed RDDs can be cached with a given name and later on retrieved. To
> use this feature, the SparkJob needs to mixinNamedRddSupport:
>
> Alternatively if you use the Spark Thrift Server, any cached
> dataframes/RDDs will be available to all clients of Spark via the Thrift
> Server until it is shutdown.
>
> If you want to support key value lookups you might want to use IndexedRDD
> 
>
> Finally not the same as sharing RDDs, Tachyon can cache underlying HDFS
> blocks.
>
> Deenar
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 6 November 2015 at 05:56, r7raul1...@163.com 
>  wrote:
>
>> You can try
>> http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html#Archival_Storage_SSD__Memory
>>  .
>>   Hive tmp table use this function to speed
>
>
> On 6 November 2015 at 05:56, r7raul1...@163.com 
> wrote:
>
>> You can try
>> http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html#Archival_Storage_SSD__Memory
>>  .
>>   Hive tmp table use this function to speed job.
>> https://issues.apache.org/jira/browse/HIVE-7313
>>
>> --
>> r7raul1...@163.com
>>
>>
>> *From:* Christian 
>> *Date:* 2015-11-06 13:50
>> *To:* Deepak Sharma 
>> *CC:* user 
>> *Subject:* Re: Spark RDD cache persistence
>> I've never had this need and I've never done it. There are options that
>> allow this. For example, I know there are web apps out there that work like
>> the spark REPL. One of these I think is called Zepplin. . I've never used
>> them, but I've seen them demoed. There is also Tachyon that Spark
>> supports.. Hopefully, that gives you a place to start.
>> On Thu, Nov 5, 2015 at 9:21 PM Deepak Sharma 
>> wrote:
>>
>>> Thanks Christian.
>>> So is there any inbuilt mechanism in spark or api integration  to other
>>> inmemory cache products such as redis to load the RDD to these system upon
>>> program exit ?
>>> What's the best approach to have long lived RDD cache ?
>>> Thanks
>>>
>>>
>>> Deepak
>>> On 6 Nov 2015 8:34 am, "Christian"  wrote:
>>>
 The cache gets cleared out when the job finishes. I am not aware of a
 way to keep the cache around between jobs. You could save it as an object
 file to disk and load it as an object file on your next job for speed.
 On Thu, Nov 5, 2015 at 6:17 PM Deepak Sharma 
 wrote:

> Hi All
> I am confused on RDD persistence in cache .
> If I cache RDD , is it going to stay there in memory even if my spark
> program completes execution , which created it.
> If not , how can I guarantee that RDD is persisted in cache even after
> the program finishes execution.
>
> Thanks
>
>
> Deepak
>

>


how to reference aggregate columns

2015-12-09 Thread skaarthik oss
I am trying to process an aggregate column. The name of the column is
"SUM(clicks)" which is automatically assigned when I use SUM operator on a
column named "clicks". I am trying to find the max value in this aggregated
column. However, using max operator on this aggregated columns results in
parser error.

How can I reference an aggregated column when applying operators?

Code to repro this behavior:
val rows = Seq (("X", 1), ("Y", 5), ("Z", 4))
val rdd = sc.parallelize(rows)
val dataFrame = rdd.toDF("user","clicks")
val sumDf = dataFrame.groupBy("user").agg(("clicks", "sum"))
sumDf.registerTempTable("tempTable")
val ttrows = sqlContext.sql("select * from tempTable")
ttrows.show  /* column names are "user" & "SUM(clicks))" */
val ttrows2 = sqlContext.sql("select max(SUM(clicks)) from tempTable") /*
this line results in following error */
val sumCol = ttrows.select("SUM(clicks)") /* this works fine */


15/12/09 17:50:42.938 INFO ParseDriver: Parsing command: select
max(SUM(clicks)) from tempTable
15/12/09 17:50:42.938 INFO ParseDriver: Parse Completed
org.apache.spark.sql.AnalysisException: cannot resolve 'clicks' given input
columns user, SUM(clicks); line 1 pos 15
at
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63)
at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:299)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenUp(TreeNode.scala:329)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:283)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:299)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenUp(TreeNode.scala:329)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:283)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:299)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at 

IP error on starting spark-shell on windows 7

2015-12-09 Thread Stefan Karos
On starting spark-shell I see this just before the scala prompt:

WARN : Your hostname, BloomBear-SSD resolves to a loopback/non-reachable
address: fe80:0:0:0:0:5efe:c0a8:317%net10, but we couldn't find any
external IP address!

I get this error even when firewall is disabled.
I also tried setting the environment variable SPARK_IP_LOCAL to various
choices listed below:

SPARK_LOCAL_IP=localhost
SPARK_LOCAL_IP=127.0.0.1
SPARK_LOCAL_IP=192.168.1.88   (my local machine's IPv4 address)
SPARK_LOCAL_IP=fe80::eda5:a1a7:be1e:13cb%14  (my local machine's IPv6
address)

I still get this annoying error! How can I resolve this?
See below for my environment

Environment
windows 7 64 bit
Spark 1.5.2
Scala 2.10.6
Python 2.7.10 (from Anaconda)

PATH includes:
C:\Users\Stefan\spark-1.5.2-bin-hadoop2.6\bin
C:\ProgramData\Oracle\Java\javapath
C:\Users\Stefan\scala\bin
C:\Users\Stefan\hadoop-2.6.0\bin
(where the bin\winutils resides)
C:\ProgramData\Oracle\Java\javapath

SYSTEM variables set are:
SPARK_HOME=C:\Users\Stefan\spark-1.5.2-bin-hadoop2.6
JAVA_HOME=C:\Program Files\Java\jre1.8.0_65
HADOOP_HOME=C:\Users\Stefan\hadoop-2.6.0

\tmp\hive directory at root on C; drive with full permissions,
e.g.
>winutils ls \tmp\hive
drwxrwxrwx 1 BloomBear-SSD\Stefan BloomBear-SSD\None 0 Dec  8 2015 \tmp\hive


GLM in apache spark in MLlib

2015-12-09 Thread Arunkumar Pillai
Hi

I'm started using apache spark 1.5.2 version. I'm able to see GLM using
SparkR but it is not there in MLlib. Is there any plans or road map for
that



-- 
Thanks and Regards
Arun


Re: how to reference aggregate columns

2015-12-09 Thread Harsh J
While the DataFrame lookups can identify that anonymous column name,
SparkSql does not appear to do so. You should use an alias instead:

val rows = Seq (("X", 1), ("Y", 5), ("Z", 4))
val rdd = sc.parallelize(rows)
val dataFrame = rdd.toDF("user","clicks")
val sumDf = dataFrame.groupBy("user").agg(sum("clicks").alias("csum"))
sumDf.registerTempTable("tempTable")
val ttrows2 = sqlContext.sql("select max(csum) from tempTable")

On Thu, Dec 10, 2015 at 7:35 AM skaarthik oss 
wrote:

> I am trying to process an aggregate column. The name of the column is
> "SUM(clicks)" which is automatically assigned when I use SUM operator on a
> column named "clicks". I am trying to find the max value in this aggregated
> column. However, using max operator on this aggregated columns results in
> parser error.
>
> How can I reference an aggregated column when applying operators?
>
> Code to repro this behavior:
> val rows = Seq (("X", 1), ("Y", 5), ("Z", 4))
> val rdd = sc.parallelize(rows)
> val dataFrame = rdd.toDF("user","clicks")
> val sumDf = dataFrame.groupBy("user").agg(("clicks", "sum"))
> sumDf.registerTempTable("tempTable")
> val ttrows = sqlContext.sql("select * from tempTable")
> ttrows.show  /* column names are "user" & "SUM(clicks))" */
> val ttrows2 = sqlContext.sql("select max(SUM(clicks)) from tempTable") /*
> this line results in following error */
> val sumCol = ttrows.select("SUM(clicks)") /* this works fine */
>
>
> 15/12/09 17:50:42.938 INFO ParseDriver: Parsing command: select
> max(SUM(clicks)) from tempTable
> 15/12/09 17:50:42.938 INFO ParseDriver: Parse Completed
> org.apache.spark.sql.AnalysisException: cannot resolve 'clicks' given
> input columns user, SUM(clicks); line 1 pos 15
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:299)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenUp(TreeNode.scala:329)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:283)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:299)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenUp(TreeNode.scala:329)
> 

Re: HTTP Source for Spark Streaming

2015-12-09 Thread Vijay Gharge
Not very clear. Can you elaborate your use case ?

On Thursday 10 December 2015, Sourav Mazumder 
wrote:

> Hi All,
>
> Currently is there a way using which one can connect to a http server to
> get data as a dstream at a given frequency ?
>
> Or one has to write own utility for the same ?
>
> Regards,
> Sourav
>


-- 
Regards,
Vijay Gharge


hbase Put object kryo serialisation error

2015-12-09 Thread Shushant Arora
Hi

I have a javapairrdd pairrdd.
when i do rdd.persist(StorageLevel.MEMORY_AND_DISK()).

It throws exception :

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:100

serialisationtrace:

familyMap(org.apache.hadoop.hbase.cleint.Put)

I have registerd Put,and TreeMap in spark kryo serialiser.

whats the issue of this error ?


Full stack trace is :


Exception in thread "main" com.esotericsoftware.kryo.KryoException:
Encountered unregistered class ID: 100
Serialization trace:
familyMap (org.apache.hadoop.hbase.client.Put)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)


sortByKey not spilling to disk? (PySpark 1.3)

2015-12-09 Thread YaoPau
I'm running sortByKey on a dataset that's nearly the amount of memory I've
provided to executors (I'd like to keep the amount of used memory low so
other jobs can run), and I'm getting the vague "filesystem closed" error. 
When I re-run with more memory it runs fine.

By default shouldn't sortByKey be spilling to disk?  I'm fine with that,
this is a scheduled job where runtime isn't a big issue, and preserving
memory for other jobs is more important.  What can I do to ensure that
sortByKey spills to disk and doesn't result in that error?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-not-spilling-to-disk-PySpark-1-3-tp25660.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Schedular delay in spark 1.4

2015-12-09 Thread Renu Yadav
Hi,
I am working on spark 1.4 .
I am running a  spark job on a yarn cluster .When number of other jobs are
less then my spark job completes very smoothly and when more number of
small job run on the cluster my spark job starts showing schedular delay at
the end on each stage.

PS:I am running my spark job in high priority queue.

PLEASE SUGGEST SOME SOLUTION

Thanks & Regards,
Renu Yadav


Re: SparkStreaming variable scope

2015-12-09 Thread Harsh J
> and then calling getRowID() in the lambda, because the function gets sent
to the executor right?

Yes, that is correct (vs. a one time evaluation, as was with your
assignment earlier).

On Thu, Dec 10, 2015 at 3:34 AM Pinela  wrote:

> Hey Bryan,
>
> Thank for the answer ;) I knew it was a basic python/spark-noob thing :)
>
> this also worked
>
> *def getRowID():*
> * return datetime.now().strftime("%Y%m%d%H%M%S")*
>
>
> and then calling getRowID() in the lambda, because the function gets sent
> to the executor right?
>
> Thanks again for the quick reply :)
>
> All the best and Happy Holidays.
> Jpinela.
>
>
>
> On Wed, Dec 9, 2015 at 8:22 PM, Bryan Cutler  wrote:
>
>> rowid from your code is a variable in the driver, so it will be evaluated
>> once and then only the value is sent to words.map.  You probably want to
>> have rowid be a lambda itself, so that it will get the value at the time it
>> is evaluated.  For example if I have the following:
>>
>> >>> data = sc.parallelize([1,2,3])
>> >>> from datetime import datetime
>> >>> rowid = lambda: datetime.now().strftime("%Y%m%d%H%M%S")
>> >>> data.map(lambda x: (rowid(), x))
>> >>> mdata = data.map(lambda x: (rowid(), x))
>> >>> mdata.collect()
>> [('20151209121532', 1), ('20151209121532', 2), ('20151209121532', 3)]
>> >>> mdata.collect()
>> [('20151209121540', 1), ('20151209121540', 2), ('20151209121540', 3)]
>>
>> here rowid is evaluated whenever an action is called on the RDD, i.e.
>> collect
>>
>> On Wed, Dec 9, 2015 at 10:23 AM, jpinela  wrote:
>>
>>> Hi Guys,
>>> I am sure this is a simple question, but I can't find it in the docs
>>> anywhere.
>>> This reads from flume and writes to hbase (as you can see).
>>> But has a variable scope problem (I believe).
>>> I have the following code:
>>>
>>> *
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.flume import FlumeUtils
>>> from datetime import datetime
>>> ssc = StreamingContext(sc, 5)
>>> conf = {"hbase.zookeeper.quorum": "ubuntu3",
>>> "hbase.mapred.outputtable": "teste2",
>>> "mapreduce.outputformat.class":
>>> "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
>>> "mapreduce.job.output.key.class":
>>> "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>>> "mapreduce.job.output.value.class":
>>> "org.apache.hadoop.io.Writable"}
>>>
>>>
>>> keyConv =
>>>
>>> "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
>>> valueConv =
>>> "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>>>
>>> lines = FlumeUtils.createStream(ssc, 'ubuntu3', 9997)
>>> words = lines.map(lambda line: line[1])
>>> rowid = datetime.now().strftime("%Y%m%d%H%M%S")
>>> outrdd= words.map(lambda x: (str(1),[rowid,"cf1desc","col1",x]))
>>> print("ok 1")
>>> outrdd.pprint()
>>>
>>> outrdd.foreachRDD(lambda x:
>>>
>>> x.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv))
>>>
>>> ssc.start()
>>> ssc.awaitTermination()*
>>>
>>> the issue is that the rowid variable is allways at the point that the
>>> streaming was began.
>>> How can I go around this? I tried a function, an application, nothing
>>> worked.
>>> Thank you.
>>> jp
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-variable-scope-tp25652.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Spark Stream Monitoring with Kafka Direct API

2015-12-09 Thread Saisai Shao
I think this is the right JIRA to fix this issue (
https://issues.apache.org/jira/browse/SPARK-7111). It should be in Spark
1.4.

On Thu, Dec 10, 2015 at 12:32 AM, Cody Koeninger  wrote:

> Looks like probably
>
> https://issues.apache.org/jira/browse/SPARK-8701
>
> so 1.5.0
>
> On Wed, Dec 9, 2015 at 10:25 AM, Dan Dutrow  wrote:
>
>> I'm on spark version 1.4.1. I couldn't find documentation that said it
>> was fixed, so I thought maybe it was still an open issue. Any idea what the
>> fix version is?
>>
>> On Wed, Dec 9, 2015 at 11:10 AM Cody Koeninger 
>> wrote:
>>
>>> Which version of spark are you on?  I thought that was added to the
>>> spark UI in recent versions.
>>>
>>> DIrect api doesn't have any inherent interaction with zookeeper.  If you
>>> need number of messages per batch and aren't on a recent enough version of
>>> spark to see them in the ui, you can get them programatically from the
>>> offset ranges.  See the definition of count() in recent versions of
>>> KafkaRDD for an example.
>>>
>>> On Wed, Dec 9, 2015 at 9:39 AM, Dan Dutrow  wrote:
>>>
 Is there documentation for how to update the metrics (#messages per
 batch) in the Spark Streaming tab when using the Direct API? Does the
 Streaming tab get its information from Zookeeper or something else
 internally?
 --
 Dan ✆

>>>
>>> --
>> Dan ✆
>>
>
>


Spark 1.5.2 error on quitting spark in windows 7

2015-12-09 Thread skypickle
If I start spark-shell then just quit, I get an error.


scala> :q
Stopping spark context.
15/12/09 23:43:32 ERROR ShutdownHookManager: Exception while deleting Spark
temp dir:
C:\Users\Stefan\AppData\Local\Temp\spark-68d3a813-9c55-4649-aa7a-5fc269e669e7
java.io.IOException: Failed to delete:
C:\Users\Stefan\AppData\Local\Temp\spark-68d3a813-9c55-4649-aa7a-5fc269e669e7
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:884)

*So, if u use winutils to examine the directory:*

C:\Users\Stefan\AppData\Local\Temp>winutils ls
spark-cb325426-4a3c-48ec-becc-baaa077bea1f
drwx-- 1 BloomBear-SSD\Stefan BloomBear-SSD\None 0 Dec 10 2015
spark-cb325426-4a3c-48ec-becc-baaa077bea1f

*I interpret this to mean that the OWNER has read/write/execute privs on
this folder.
So why does scala have a problem deleting it?

Just for fun I also installed a set of windows executables that are ports of
common UNIX utilities -
http://sourceforge.net/projects/unxutils/?source=typ_redirect

So now I can run a command like ls and get*

C:\Users\Stefan\AppData\Local\Temp>ls -al
total 61
drwxrwxrwx   1 user group   0 Dec  9 23:44 .
drwxrwxrwx   1 user group   0 Dec  9 22:27 ..
drwxrwxrwx   1 user group   0 Dec  9 23:43
61135062-623a-4624-b406-fbd0ae9308ae_resources
drwxrwxrwx   1 user group   0 Dec  9 23:43
9cc17e8c-2941-4768-9f55-e740e54dab0b_resources
-rw-rw-rw-   1 user group   0 Sep  4  2013
FXSAPIDebugLogFile.txt
drwxrwxrwx   1 user group   0 Dec  9 23:43 Stefan
-rw-rw-rw-   1 user group   16400 Dec  9 21:07
etilqs_3SQb9MejUX0BHwy
-rw-rw-rw-   1 user group2052 Dec  9 21:41
etilqs_8YWZWJEClIYRrKf
drwxrwxrwx   1 user group   0 Dec  9 23:43 hsperfdata_Stefan
-rw-rw-rw-   1 user group   19968 Dec  9 23:09
jansi-64-1-8475478299913367674.11
-rw-rw-rw-   1 user group   18944 Dec  9 23:43 jansi-64-1.5.2.dll
-rw-rw-rw-   1 user group2031 Dec  9 23:15
sbt3359615202868869571.log
drwxrwxrwx   1 user group   0 Dec  9 23:43
spark-68d3a813-9c55-4649-aa7a-5fc269e669e7

*Now the spark directory is being seen by windows as fully readable by
EVERYONE.
In any event, can someone enlighten me about their environment to avoid this
irritating error. Here is my environment:
*

windows 7 64 bit
Spark 1.5.2
Scala 2.10.6
Python 2.7.10 (from Anaconda)

PATH includes:
C:\Users\Stefan\spark-1.5.2-bin-hadoop2.6\bin
C:\ProgramData\Oracle\Java\javapath
C:\Users\Stefan\scala
C:\Users\Stefan\hadoop-2.6.0\bin
C:\ProgramData\Oracle\Java\javapath

SYSTEM variables set are:
SPARK_HOME=C:\Users\Stefan\spark-1.5.2-bin-hadoop2.6
JAVA_HOME=C:\Program Files\Java\jre1.8.0_65
HADOOP_HOME=C:\Users\Stefan\hadoop-2.6.0
(where the bin\winutils resides)
winutils.exe chmod 777 /tmp/hive

\tmp\hive directory at root on C; drive with full permissions,
e.g.
>winutils ls \tmp\hive
drwxrwxrwx 1 BloomBear-SSD\Stefan BloomBear-SSD\None 0 Dec  8 2015 \tmp\hive




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-2-error-on-quitting-spark-in-windows-7-tp25659.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Filtering records based on empty value of column in SparkSql

2015-12-09 Thread Gokula Krishnan D
Please refer the link and drop() provides features to drop the rows with
Null / Non-Null columns. Hope, it also helps.

https://spark.apache.org/docs/1.5.2/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions



Thanks & Regards,
Gokula Krishnan* (Gokul)*

On Wed, Dec 9, 2015 at 11:12 AM, Gokula Krishnan D 
wrote:

> Ok, then you can slightly change like
>
> [image: Inline image 1]
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>
>
> On Wed, Dec 9, 2015 at 11:09 AM, Prashant Bhardwaj <
> prashant2006s...@gmail.com> wrote:
>
>> I have to do opposite of what you're doing. I have to filter non-empty
>> records.
>>
>> On Wed, Dec 9, 2015 at 9:33 PM, Gokula Krishnan D 
>> wrote:
>>
>>> Hello Prashant -
>>>
>>> Can you please try like this :
>>>
>>> For the instance, input file name is "student_detail.txt" and
>>>
>>> ID,Name,Sex,Age
>>> ===
>>> 101,Alfred,Male,30
>>> 102,Benjamin,Male,31
>>> 103,Charlie,Female,30
>>> 104,Julie,Female,30
>>> 105,Maven,Male,30
>>> 106,Dexter,Male,30
>>> 107,Lundy,Male,32
>>> 108,Rita,Female,30
>>> 109,Aster,Female,30
>>> 110,Harrison,Male,15
>>> 111,Rita,,30
>>> 112,Aster,,30
>>> 113,Harrison,,15
>>> 114,Rita,Male,20
>>> 115,Aster,,30
>>> 116,Harrison,,20
>>>
>>> [image: Inline image 2]
>>>
>>> *Output:*
>>>
>>> Total No.of Records without SEX 5
>>> [111,Rita,,30]
>>> [112,Aster,,30]
>>> [113,Harrison,,15]
>>> [115,Aster,,30]
>>> [116,Harrison,,20]
>>>
>>> Total No.of Records with AGE <=15 2
>>> [110,Harrison,Male,15]
>>> [113,Harrison,,15]
>>>
>>> Thanks & Regards,
>>> Gokula Krishnan* (Gokul)*
>>> Contact :+1 980-298-1740
>>>
>>> On Wed, Dec 9, 2015 at 8:24 AM, Prashant Bhardwaj <
>>> prashant2006s...@gmail.com> wrote:
>>>
 Already tried it. But getting following error.

 overloaded method value filter with alternatives: (conditionExpr:
 String)org.apache.spark.sql.DataFrame  (condition:
 org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame cannot be
 applied to (Boolean)

 Also tried:

 val req_logs_with_dpid = 
 req_logs.filter(req_logs("req_info.dpid").toString.length
 != 0 )

 But getting same error.


 On Wed, Dec 9, 2015 at 6:45 PM, Fengdong Yu 
 wrote:

> val req_logs_with_dpid = req_logs.filter(req_logs("req_info.pid") !=
> "" )
>
> Azuryy Yu
> Sr. Infrastructure Engineer
>
> cel: 158-0164-9103
> wetchat: azuryy
>
>
> On Wed, Dec 9, 2015 at 7:43 PM, Prashant Bhardwaj <
> prashant2006s...@gmail.com> wrote:
>
>> Hi
>>
>> I have two columns in my json which can have null, empty and
>> non-empty string as value.
>> I know how to filter records which have non-null value using
>> following:
>>
>> val req_logs = sqlContext.read.json(filePath)
>>
>> val req_logs_with_dpid = req_log.filter("req_info.dpid is not null
>> or req_info.dpid_sha1 is not null")
>>
>> But how to filter if value of column is empty string?
>> --
>> Regards
>> Prashant
>>
>
>


 --
 Regards
 Prashant

>>>
>>>
>>
>>
>> --
>> Regards
>> Prashant
>>
>
>


Re: HTTP Source for Spark Streaming

2015-12-09 Thread Vijay Gharge
I am not sure if spark natively support this functionality. Custom poller
class can query HTTP resources as per configured interval and dump it on
HDFS / other stores in csv or json format. Using lambda arch (aws) or
invoking sc context you can use these values for further processing

On Thursday 10 December 2015, Sourav Mazumder 
wrote:

> What I want is to be able to connect to a source system which exposes data
> stream over http.
>
> Essentially a Reciever in onStart() should starting a threads which will
> access a http uri to get the data say after every 10 seconds. There can be
> multiple such Recievers each polling different http uris. An example can be
> accessing a http uri for inventory update and sales update in parallel.
>
> Regards,
> Sourav
>
> On Wed, Dec 9, 2015 at 7:53 PM, Vijay Gharge  > wrote:
>
>> Not very clear. Can you elaborate your use case ?
>>
>> On Thursday 10 December 2015, Sourav Mazumder <
>> sourav.mazumde...@gmail.com
>> > wrote:
>>
>>> Hi All,
>>>
>>> Currently is there a way using which one can connect to a http server to
>>> get data as a dstream at a given frequency ?
>>>
>>> Or one has to write own utility for the same ?
>>>
>>> Regards,
>>> Sourav
>>>
>>
>>
>> --
>> Regards,
>> Vijay Gharge
>>
>>
>>
>>
>

-- 
Regards,
Vijay Gharge


Re: Multiple drivers, same worker

2015-12-09 Thread andresb...@gmail.com
Forgot to mention that it doesn't happen every time, it's pretty random so
far. We've have complete days when it behaves just fine and others when it
gets crazy. We're using spark 1.5.2

2015-12-09 13:33 GMT-06:00 andresb...@gmail.com :

> Hi everyone,
>
> We've been getting an issue with spark lately where multiple drivers are
> assigned to a same worker but resources are never assigned to them and get
> "stuck" forever.
>
> If I login in the worker machine I see that the driver processes aren't
> really running and the worker's log don't show any error or anything
> related to the driver. The master UI does show the drivers as submitted and
> in RUNNING state.
>
>
> Not sure where else to look for clues, any ideas?
>
> --
> Andrés Blanco Morales
>



-- 
Andrés Blanco Morales


Re: can i process multiple batch in parallel in spark streaming

2015-12-09 Thread Ted Yu
Have you seen this thread ?

http://search-hadoop.com/m/q3RTtgSGrobJ3Je

On Wed, Dec 9, 2015 at 11:12 AM, prateek arora 
wrote:

> Hi
>
> when i run my spark streaming application .. following information show on
> application streaming UI.
> i am using spark 1.5.0
>
>
> Batch Time  Input Size   Scheduling Delay (?) Processing Time
> (?)
> Status
> 2015/12/09 11:00:42 107 events  -   -
>  queued
> 2015/12/09 11:00:41 103 events  -   -
>  queued
> 2015/12/09 11:00:40 107 events  -   -
>  queued
> 2015/12/09 11:00:39 105 events  -   -
>  queued
> 2015/12/09 11:00:38 109 events  -   -
>  queued
> 2015/12/09 11:00:37 106 events  -   -
>  queued
> 2015/12/09 11:00:36 109 events  -   -
>  queued
> 2015/12/09 11:00:35 113 events  -   -
>  queued
> 2015/12/09 11:00:34 109 events  -   -
>  queued
> 2015/12/09 11:00:33 107 events  -   -
>  queued
> 2015/12/09 11:00:32 99 events   42 s-
>  processing
>
>
>
> it seems batches push into queue and work like FIFO manner  . is it
> possible
> all my Active batches start processing in parallel.
>
> Regards
> Prateek
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/can-i-process-multiple-batch-in-parallel-in-spark-streaming-tp25653.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: RDD.isEmpty

2015-12-09 Thread Sean Owen
On Wed, Dec 9, 2015 at 7:49 PM, Pat Ferrel  wrote:
> The “Any” is required by the code it is being passed to, which is the
> Elasticsearch Spark index writing code. The values are actually RDD[(String,
> Map[String, String])]

(Is it frequently a big big map by any chance?)

> No shuffle that I know of. RDDs are created from the output of Mahout
> SimilarityAnalysis.cooccurrence and are turned into RDD[(String, Map[String,
> String])], Since all actual values are simple there is no serialization
> except for standard Java/Scala so no custom serializers or use of Kryo.

It's still worth looking at the stages in the job.


> I understand that the driver can’t know, I was suggesting that isEmpty could
> be backed by a boolean RDD member variable calculated for every RDD at
> creation time in Spark. This is really the only way to solve generally since
> sometimes you get an RDD from a lib, so wrapping it as I suggested is not
> practical, it would have to be in Spark. BTW the same approach could be used
> for count, an accumulator per RDD, then returned as a pre-calculated RDD
> state value.

What would the boolean do? you can't cache the size in general even if
you know it, but you don't know it at the time the RDD is created
(before it's evaluated).


> Are you suggesting that both take(1) and isEmpty are unusable for some
> reason in my case? I can pass around this information if I have to, I just
> thought the worst case was O(n) where n was number of partitions and
> therefor always trivial to calculate.

No, just trying to guess at reasons you observe what you do. There's
no difference between isEmpty and take(1) if there are > 0 partitions,
so if they behave very differently it's something to do with what
you're measuring and how.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD.isEmpty

2015-12-09 Thread Pat Ferrel
The “Any” is required by the code it is being passed to, which is the 
Elasticsearch Spark index writing code. The values are actually RDD[(String, 
Map[String, String])]

No shuffle that I know of. RDDs are created from the output of Mahout 
SimilarityAnalysis.cooccurrence and are turned into RDD[(String, Map[String, 
String])], Since all actual values are simple there is no serialization except 
for standard Java/Scala so no custom serializers or use of Kryo.

I understand that the driver can’t know, I was suggesting that isEmpty could be 
backed by a boolean RDD member variable calculated for every RDD at creation 
time in Spark. This is really the only way to solve generally since sometimes 
you get an RDD from a lib, so wrapping it as I suggested is not practical, it 
would have to be in Spark. BTW the same approach could be used for count, an 
accumulator per RDD, then returned as a pre-calculated RDD state value.

Are you suggesting that both take(1) and isEmpty are unusable for some reason 
in my case? I can pass around this information if I have to, I just thought the 
worst case was O(n) where n was number of partitions and therefor always 
trivial to calculate.

This would be 0 time on the timeline if I explicitly keep track os RDD size 
with accumulators (just slightly slower .map), so is this my best path?


On Dec 9, 2015, at 10:06 AM, Sean Owen  wrote:

Yes but what is the code that generates the RDD? is it a shuffle of something? 
that could cause checking for any element to be expensive since computing the 
RDD at all is expensive. Look at the stages in these long-running jobs.

How could isEmpty not be distributed? the driver can't know whether the RDD's 
partitions are empty without evaluating at least one of them a little bit 
(unless there are 0 partitions). Caching the size doesn't help unless, well, 
you know the size already because the RDD was fully computed. And it might get 
weird anyway since RDDs are only as deterministic as their source -- counting 
lines of a text file will return a different number if the text file is 
appended to.

The only thing that sticks out is the time to serialize one value back to the 
driver. I don't know what your "Any" is there but could it be big or hard to 
serialize?

Really there's a little gotcha in this implementation: you can only check 
isEmpty on an RDD of serializable objects! which is a pretty good assumption; 
you won't get far with an RDD of something unserializable but it's not 
impossible for it to come up.

The serialization could be avoided by mapping everything to "1" or something 
and take-ing *that*. Returning a 1 to the driver is trivial. Or maybe adapt 
some version of the implementation of take() to be an optimized, smarter 
isEmpty(). Neither seemed worth the overhead at the time, but this could be a 
case against that, if it turns out somehow to be serialization time.


On Wed, Dec 9, 2015 at 5:55 PM, Pat Ferrel > wrote:
Err, compiled for Spark 1.3.1, running on 1.5.1 if that makes any difference. 
The Spark impl is “provided” so should be using 1.5.1 code afaik.

The code is as you see below for isEmpty, so not sure what else could it could 
be measuring since it’s the only spark thing on the line. I can regen the 
timeline but here is the .take(1) timeline. It is an order of magnitude faster 
(from my recollection) but even the take(1) still seems incredibly slow for an 
empty test. I was surprised that isEmpty is a distributed calc. When run from 
the driver this value could have already been calculated as a byproduct of 
creating the RDD, no?

I could use an accumulator to count members as the RDD is created and get a 
negligible .isEmpty calc time, right? The RDD creation might be slightly slower 
due to using an accumulator.






On Dec 9, 2015, at 9:29 AM, Sean Owen > wrote:

Are you sure it's isEmpty? and not an upstream stage? isEmpty is
definitely the action here.  It doesn't make sense that take(1) is so
much faster since it's the "same thing".

On Wed, Dec 9, 2015 at 5:11 PM, Pat Ferrel > wrote:
> Sure, I thought this might be a known issue.
> 
> I have a 122M dataset, which is the trust and rating data from epinions. The 
> data is split into two RDDs and there is an item properties RDD. The code is 
> just trying to remove any empty RDD from the list.
> 
> val esRDDs: List[RDD[(String, Map[String, Any])]] =
>  (correlators ::: properties).filterNot( c => c.isEmpty())
> 
> On my 16G MBP with 4g per executor and 4 executors the IsEmpty takes over a 
> hundred minutes (going from memory, I can supply the timeline given a few 
> hours to recalc it).
> 
> Running a different version of the code that does a .count for debug and 
> .take(1) instead of the .isEmpty the count of one epinions RDD take 8 minutes 
> and the .take(1) uses 3 

Re: RDD.isEmpty

2015-12-09 Thread Pat Ferrel
Sure, I thought this might be a known issue.

I have a 122M dataset, which is the trust and rating data from epinions. The 
data is split into two RDDs and there is an item properties RDD. The code is 
just trying to remove any empty RDD from the list.

val esRDDs: List[RDD[(String, Map[String, Any])]] =
  (correlators ::: properties).filterNot( c => c.isEmpty())

On my 16G MBP with 4g per executor and 4 executors the IsEmpty takes over a 
hundred minutes (going from memory, I can supply the timeline given a few hours 
to recalc it). 

Running a different version of the code that does a .count for debug and 
.take(1) instead of the .isEmpty the count of one epinions RDD take 8 minutes 
and the .take(1) uses 3 minutes.

Other users have seen total runtime on 13G dataset of 700 minutes with the 
execution time mostly spent in isEmpty.


On Dec 9, 2015, at 8:50 AM, Sean Owen  wrote:

It should at best collect 1 item to the driver. This means evaluating
at least 1 element of 1 partition. I can imagine pathological cases
where that's slow, but, do you have any more info? how slow is slow
and what is slow?

On Wed, Dec 9, 2015 at 4:41 PM, Pat Ferrel  wrote:
> I’m getting *huge* execution times on a moderate sized dataset during the
> RDD.isEmpty. Everything in the calculation is fast except an RDD.isEmpty
> calculation. I’m using Spark 1.5.1 and from researching I would expect this
> calculation to be linearly proportional to the number of partitions as a
> worst case, which should be a trivial amount of time but it is taking many
> minutes to hours to complete this single phase.
> 
> I know that has been a small amount of discussion about using this so would
> love to hear what the current thinking on the subject is. Is there a better
> way to find if an RDD has data? Can someone explain why this is happening?
> 
> reference PR
> https://github.com/apache/spark/pull/4534

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Content based window operation on Time-series data

2015-12-09 Thread Sean Owen
CC Sandy as his https://github.com/cloudera/spark-timeseries might be
of use here.

On Wed, Dec 9, 2015 at 4:54 PM, Arun Verma  wrote:
> Hi all,
>
> We have RDD(main) of sorted time-series data. We want to split it into
> different RDDs according to window size and then perform some aggregation
> operation like max, min etc. over each RDD in parallel.
>
> If window size is w then ith RDD has data from (startTime + (i-1)*w) to
> (startTime + i*w) where startTime is time-stamp of 1st entry in main RDD and
> (startTime + (i-1)*w) is greater then last entry of main RDD.
>
> For now, I am using DataFrame and Spark version 1.5.2. Below code is running
> sequentially on the data, so execution time is high and resource utilization
> is low. Code snippet is given below:
> /*
> * aggragator is max
> * df - Dataframe has sorted timeseries data
> * start - first entry of DataFrame
> * end - last entry of DataFrame df
> * bucketLengthSec - window size
> * stepResults - has particular block/window output(JSON)
> * appendResults - has output till this block/window(JSON)
> */
> while (start <= end) {
> row = df.filter(df.col("timeStamp")
> .between(start, nextStart))
> .agg(max(df.col("timeStamp")), max(df.col("value")))
> .first();
> if (row.get(0) != null) {
> stepResults = new JSONObject();
> stepResults.put("x", Long.parseLong(row.get(0).toString()));
> stepResults.put("y", row.get(1));
> appendResults.add(stepResults);
> }
> start = nextStart;
> nextStart = start + bucketLengthSec;
> }
>
>
> --
> Thanks and Regards,
> Arun Verma

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Content based window operation on Time-series data

2015-12-09 Thread Arun Verma
Thank you for your reply. It is a Scala and Python library. Is similar
library exists for Java?

On Wed, Dec 9, 2015 at 10:26 PM, Sean Owen  wrote:

> CC Sandy as his https://github.com/cloudera/spark-timeseries might be
> of use here.
>
> On Wed, Dec 9, 2015 at 4:54 PM, Arun Verma 
> wrote:
> > Hi all,
> >
> > We have RDD(main) of sorted time-series data. We want to split it into
> > different RDDs according to window size and then perform some aggregation
> > operation like max, min etc. over each RDD in parallel.
> >
> > If window size is w then ith RDD has data from (startTime + (i-1)*w) to
> > (startTime + i*w) where startTime is time-stamp of 1st entry in main RDD
> and
> > (startTime + (i-1)*w) is greater then last entry of main RDD.
> >
> > For now, I am using DataFrame and Spark version 1.5.2. Below code is
> running
> > sequentially on the data, so execution time is high and resource
> utilization
> > is low. Code snippet is given below:
> > /*
> > * aggragator is max
> > * df - Dataframe has sorted timeseries data
> > * start - first entry of DataFrame
> > * end - last entry of DataFrame df
> > * bucketLengthSec - window size
> > * stepResults - has particular block/window output(JSON)
> > * appendResults - has output till this block/window(JSON)
> > */
> > while (start <= end) {
> > row = df.filter(df.col("timeStamp")
> > .between(start, nextStart))
> > .agg(max(df.col("timeStamp")), max(df.col("value")))
> > .first();
> > if (row.get(0) != null) {
> > stepResults = new JSONObject();
> > stepResults.put("x", Long.parseLong(row.get(0).toString()));
> > stepResults.put("y", row.get(1));
> > appendResults.add(stepResults);
> > }
> > start = nextStart;
> > nextStart = start + bucketLengthSec;
> > }
> >
> >
> > --
> > Thanks and Regards,
> > Arun Verma
>



-- 
Thanks and Regards,
Arun Verma