Rdd Partitions issue

2015-10-15 Thread Renu Yadav
I am reading Parquet files from a directory which has 400 files of at most 180 MB each,
so while reading, my partition count should be 400, as the split size is 256 MB in my
case.

But it is using 787 partitions. Why is it so?

Please help.

Thanks,
Renu
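
For reference, Parquet input splits are computed per file and per block/row group, so a directory of 400 files can easily produce more partitions than files. A minimal Scala sketch for checking and, if needed, reducing the partition count; the directory path is a placeholder and a SQLContext named sqlContext is assumed:

// Spark 1.5-era API; the path below is a placeholder.
val df = sqlContext.read.parquet("hdfs:///path/to/parquet-dir")

// How many partitions did the scan actually produce?
println(df.rdd.partitions.length)

// Reduce the partition count without a shuffle if it is higher than desired.
val compacted = df.coalesce(400)
println(compacted.rdd.partitions.length)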


Fwd: Get the previous state string

2015-10-15 Thread Yogesh Vyas
-- Forwarded message --
From: Yogesh Vyas 
Date: Thu, Oct 15, 2015 at 6:08 PM
Subject: Get the previous state string
To: user@spark.apache.org


Hi,
I am new to Spark and was trying to do some experiments with it.

I have a JavaPairDStream.
I want to get the list of strings from its previous state. For that I
use the updateStateByKey function as follows:

final Function2<List<String>, Optional<List<String>>,
        Optional<List<String>>> updateFunc =
    new Function2<List<String>, Optional<List<String>>,
        Optional<List<String>>>() {

    public Optional<List<String>> call(List<String> arg0,
            Optional<List<String>> arg1) throws Exception {
        // TODO Auto-generated method stub
        if (arg1.toString() == null)
            return Optional.of(arg0);
        else {
            arg0.add(arg1.toString());
            return Optional.of(arg0);
        }
    }
};

I want the function to append the new list of strings to the previous
list and return the new list. But I am not able to do so; I am getting
a java.lang.UnsupportedOperationException error.
Can anyone help me out in getting the desired output?
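
For comparison, a minimal Scala sketch of the same idea: appending new values to the previous state with updateStateByKey. It is not the original code above; it assumes a DStream of (String, String) pairs, and it builds a new list instead of mutating the previous one (mutating a list that does not support add is a common source of UnsupportedOperationException):

import org.apache.spark.streaming.dstream.DStream

// Sketch only: state is a List[String]; each batch appends the newly received values.
def appendToState(pairs: DStream[(String, String)]): DStream[(String, List[String])] = {
  val updateFunc = (newValues: Seq[String], state: Option[List[String]]) =>
    Some(state.getOrElse(List.empty[String]) ++ newValues)
  // Stateful operations also require ssc.checkpoint(...) to be configured.
  pairs.updateStateByKey[List[String]](updateFunc)
}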




Re: SPARK SQL Error

2015-10-15 Thread Giridhar Maddukuri
Hi Dilip,

I tried this option also: spark-submit --master yarn --class
org.spark.apache.CsvDataSource /home/cloudera/Desktop/TestMain.jar --files
hdfs://quickstart.cloudera:8020/people_csv and I am getting a similar error:

Exception in thread "main" org.apache.spark.SparkException: Could not parse
Master URL: '--files'
 at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2244)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:361)
 at org.apache.spark.SparkContext.<init>(SparkContext.scala:154)
 at org.spark.apache.CsvDataSource$.main(CsvDataSource.scala:10)
 at org.spark.apache.CsvDataSource.main(CsvDataSource.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks & Regards,
Giri.


On Thu, Oct 15, 2015 at 5:43 PM, Dilip Biswal  wrote:

> Hi Giri,
>
> You are perhaps  missing the "--files" option before the supplied hdfs
> file name ?
>
> spark-submit --master yarn --class org.spark.apache.CsvDataSource
> /home/cloudera/Desktop/TestMain.jar  *--files*
> hdfs://quickstart.cloudera:8020/people_csv
>
> Please refer to Ritchard's comments on why the --files option may be
> redundant in
> your case.
>
> Regards,
> Dilip Biswal
> Tel: 408-463-4980
> dbis...@us.ibm.com
>
>
>
> From:Giri 
> To:user@spark.apache.org
> Date:10/15/2015 02:44 AM
> Subject:Re: SPARK SQL Error
> --
>
>
>
> Hi Ritchard,
>
> Thank you so much  again for your input.This time I ran the command in the
> below way
> spark-submit --master yarn --class org.spark.apache.CsvDataSource
> /home/cloudera/Desktop/TestMain.jar
> hdfs://quickstart.cloudera:8020/people_csv
> But I am facing the new error "Could not parse Master URL:
> 'hdfs://quickstart.cloudera:8020/people_csv'"
> file path is correct
>
> hadoop fs -ls hdfs://quickstart.cloudera:8020/people_csv
> -rw-r--r--   1 cloudera supergroup 29 2015-10-10 00:02
> hdfs://quickstart.cloudera:8020/people_csv
>
> Can you help me to fix this new error
>
> 15/10/15 02:24:39 INFO spark.SparkContext: Added JAR
> file:/home/cloudera/Desktop/TestMain.jar at
> http://10.0.2.15:40084/jars/TestMain.jar with timestamp 1444901079484
> Exception in thread "main" org.apache.spark.SparkException: Could not parse
> Master URL: 'hdfs://quickstart.cloudera:8020/people_csv'
> at
>
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2244)
> at
> org.apache.spark.SparkContext.<init>(SparkContext.scala:361)
> at
> org.apache.spark.SparkContext.<init>(SparkContext.scala:154)
> at
> org.spark.apache.CsvDataSource$.main(CsvDataSource.scala:10)
> at org.spark.apache.CsvDataSource.main(CsvDataSource.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
> at
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> Thanks & Regards,
> Giri.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Error-tp25050p25075.html
>
>
>
>
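
For reference, the "Could not parse Master URL: '--files'" and "Could not parse Master URL: 'hdfs://...'" messages suggest that the application itself constructs its SparkContext from its first command-line argument, so whatever follows the application jar on the spark-submit line ends up being treated as the master URL. A hypothetical sketch (placeholder names, not the actual CsvDataSource code) of letting spark-submit supply the master and treating args(0) as the input path:

import org.apache.spark.{SparkConf, SparkContext}

object CsvDataSourceSketch {
  def main(args: Array[String]): Unit = {
    // Do not call setMaster here; spark-submit --master yarn provides it.
    val conf = new SparkConf().setAppName("CsvDataSource")
    val sc = new SparkContext(conf)
    val path = args(0)              // e.g. hdfs://quickstart.cloudera:8020/people_csv
    println(s"line count: ${sc.textFile(path).count()}")
    sc.stop()
  }
}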


Re: org.apache.spark.sql.AnalysisException with registerTempTable

2015-10-15 Thread Yusuf Can Gürkan
Also, I create the first table with this code:

val landingDF = streamingJsonDF.selectExpr(
    "get_json_object(json,'$.partner_id') partnerid",
    "from_unixtime(cast(get_json_object(json,'$.date') as int) + 60 * 60 * 3) date",
    "get_json_object(json,'$.referrer') referrer",
    "get_json_object(json,'$.current_url') currenturl",
    "get_json_object(json,'$.page_type') pagetype",
    "get_json_object(json,'$.product.category') productcategory",
    "get_json_object(json,'$.product.name') productname",
    "get_json_object(json,'$.product.imageUrl') productimageurl",
    "get_json_object(json,'$.product.productUrl') producturl",
    "get_json_object(json,'$.product.size') productsize",
    "get_json_object(json,'$.product.price.value') productprice",
    "get_json_object(json,'$.product.price.currency') currency",
    "get_json_object(json,'$.cart_amount') cartamount",
    "get_json_object(json,'$.searchs') searchs",
    "get_json_object(json,'$.category') category",
    "get_json_object(json,'$.user_id') userid",
    "get_json_object(json,'$.id') landingid",
    "get_json_object(json,'$._parent') sessionid",
    "get_json_object(json,'$.doc_id') docid")
  .filter("sessionid is not NULL and docid is NULL")
  .filter("partnerid = '1237'")
  .selectExpr("*",
    "year(date) year",
    "month(date) month",
    "day(date) day",
    "hour(date) hour",
    "concat_ws('-', cast(year(date) as string), cast(month(date) as string)) dt")

landingDF.registerTempTable("l2p_landing")

> On 15 Oct 2015, at 14:22, Yusuf Can Gürkan  wrote:
> 
> Hello,
> 
> I’m running some spark sql queries with registerTempTable function. But i get 
> below error:
> 
> org.apache.spark.sql.AnalysisException: resolved attribute(s) 
> day#1680,year#1678,dt#1682,month#1679,hour#1681 missing from 
> searchs#1670,productsize#1666,referrer#1659,day#1653,sessionid#1674,partnerid#1657,producturl#1665,productprice#1667,docid#1675,userid#1672,landingid#1673,year#1651,productcategory#1662,month#1652,pagetype#1661,productimageurl#1664,currency#1668,cartamount#1669,productname#1663,hour#1654,category#1671,dt#1655,date#1658,currenturl#1660
>  in operator !Project 
> [partnerid#1657,date#1658,referrer#1659,currenturl#1660,pagetype#1661,productcategory#1662,productname#1663,productimageurl#1664,producturl#1665,productsize#1666,productprice#1667,currency#1668,cartamount#1669,searchs#1670,category#1671,userid#1672,landingid#1673,sessionid#1674,docid#1675,year#1678,month#1679,day#1680,hour#1681,dt#1682];
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> 
> 
> However, when i use “create table” in sql query instead of calling 
> registerTempTable function it does not give error. But create table way is 
> much slower than “registerTempTable”
> 
> Do you have any idea why it gives error when i use “registerTempTable”?
> 
> My query which gives error is below, also this query uses tables which are 
> created with “registerTempTable”:
> 
> sqlContext.sql(
>   """
> |select sessionid,date,nexteventdate, 
> unix_timestamp(nexteventdate)-unix_timestamp(date) as duration
> |from (select t1.sessionid,t1.date, min(t2.date) nexteventdate
> |from partner t1 join partner t2 on t1.sessionid=t2.sessionid where 
> unix_timestamp(t2.date)>unix_timestamp(t1.date)
> |group by t1.sessionid,t1.date) inq
>   """.stripMargin).registerTempTable("partner_duration")
> 
> 
> 



Re: SPARK SQL Error

2015-10-15 Thread Dilip Biswal
Hi Giri,

You are perhaps missing the "--files" option before the supplied HDFS 
file name?

spark-submit --master yarn --class org.spark.apache.CsvDataSource
/home/cloudera/Desktop/TestMain.jar  --files 
hdfs://quickstart.cloudera:8020/people_csv

Please refer to Ritchard's comments on why the --files option may be 
redundant in 
your case. 

Regards,
Dilip Biswal
Tel: 408-463-4980
dbis...@us.ibm.com



From:   Giri 
To: user@spark.apache.org
Date:   10/15/2015 02:44 AM
Subject:Re: SPARK SQL Error



Hi Ritchard,

Thank you so much  again for your input.This time I ran the command in the
below way
spark-submit --master yarn --class org.spark.apache.CsvDataSource
/home/cloudera/Desktop/TestMain.jar 
hdfs://quickstart.cloudera:8020/people_csv
But I am facing the new error "Could not parse Master URL:
'hdfs://quickstart.cloudera:8020/people_csv'"
file path is correct
 
hadoop fs -ls hdfs://quickstart.cloudera:8020/people_csv
-rw-r--r--   1 cloudera supergroup 29 2015-10-10 00:02
hdfs://quickstart.cloudera:8020/people_csv

Can you help me to fix this new error

15/10/15 02:24:39 INFO spark.SparkContext: Added JAR
file:/home/cloudera/Desktop/TestMain.jar at
http://10.0.2.15:40084/jars/TestMain.jar with timestamp 1444901079484
Exception in thread "main" org.apache.spark.SparkException: Could not 
parse
Master URL: 'hdfs://quickstart.cloudera:8020/people_csv'
 at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2244)
 at 
org.apache.spark.SparkContext.<init>(SparkContext.scala:361)
 at 
org.apache.spark.SparkContext.<init>(SparkContext.scala:154)
 at 
org.spark.apache.CsvDataSource$.main(CsvDataSource.scala:10)
 at 
org.spark.apache.CsvDataSource.main(CsvDataSource.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
 at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
 at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
 at 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
 at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Thanks & Regards,
Giri.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Error-tp25050p25075.html







org.apache.spark.sql.AnalysisException with registerTempTable

2015-10-15 Thread Yusuf Can Gürkan
Hello,

I’m running some Spark SQL queries with the registerTempTable function, but I get 
the error below:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
day#1680,year#1678,dt#1682,month#1679,hour#1681 missing from 
searchs#1670,productsize#1666,referrer#1659,day#1653,sessionid#1674,partnerid#1657,producturl#1665,productprice#1667,docid#1675,userid#1672,landingid#1673,year#1651,productcategory#1662,month#1652,pagetype#1661,productimageurl#1664,currency#1668,cartamount#1669,productname#1663,hour#1654,category#1671,dt#1655,date#1658,currenturl#1660
 in operator !Project 
[partnerid#1657,date#1658,referrer#1659,currenturl#1660,pagetype#1661,productcategory#1662,productname#1663,productimageurl#1664,producturl#1665,productsize#1666,productprice#1667,currency#1668,cartamount#1669,searchs#1670,category#1671,userid#1672,landingid#1673,sessionid#1674,docid#1675,year#1678,month#1679,day#1680,hour#1681,dt#1682];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)


However, when I use “create table” in the SQL query instead of calling the 
registerTempTable function, it does not give an error. But the “create table” way is much 
slower than “registerTempTable”.

Do you have any idea why it gives an error when I use “registerTempTable”?

My query which gives the error is below; this query also uses tables which are 
created with “registerTempTable”:

sqlContext.sql(
  """
|select sessionid,date,nexteventdate, 
unix_timestamp(nexteventdate)-unix_timestamp(date) as duration
|from (select t1.sessionid,t1.date, min(t2.date) nexteventdate
|from partner t1 join partner t2 on t1.sessionid=t2.sessionid where 
unix_timestamp(t2.date)>unix_timestamp(t1.date)
|group by t1.sessionid,t1.date) inq
  """.stripMargin).registerTempTable("partner_duration")





Re: Best practices to handle corrupted records

2015-10-15 Thread Roberto Congiu
I came to a similar solution to a similar problem. I deal with a lot of CSV
files from many different sources and they are often malformed.
However, I just have success/failure. Maybe you should make
SuccessWithWarnings a subclass of Success, or get rid of it altogether,
making the warnings optional.
I was thinking of making this cleaning/conforming library open source if
you're interested.

R.

2015-10-15 5:28 GMT-07:00 Antonio Murgia :

> Hello,
> I looked around on the web and I couldn’t find any way to deal in a
> structured way with malformed/faulty records during computation. All I was
> able to find was the flatMap/Some/None technique + logging.
> I’m facing this problem because I have a processing algorithm that
> extracts more than one value from each record, but can fail in extracting
> one of those multiple values, and I want to keep track of them. Logging is
> not feasible because this “warning” happens so frequently that the logs
> would become overwhelming and impossible to read.
> Since I have 3 different possible outcomes from my processing I modeled it
> with this class hierarchy:
> That holds result and/or warnings.
> Since Result implements Traversable it can be used in a flatMap,
> discarding all warnings and failure results; on the other hand, if we want
> to keep track of warnings, we can process them and output them if we need.
>
> Kind Regards
> #A.M.
>



-- 
--
"Good judgment comes from experience.
Experience comes from bad judgment"
--
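
For reference, a minimal Scala sketch of the kind of result ADT described in this thread. The names Success, SuccessWithWarnings and Failure come from the discussion above; the fields and the use of String for warnings and errors are assumptions, not the original library:

// Sketch only (Scala 2.10/2.11-era collections): a three-outcome result type
// that is also Traversable, so rdd.flatMap(parse) keeps just the successful values.
sealed trait Result[+A] extends Traversable[A]

case class Success[A](value: A) extends Result[A] {
  def foreach[U](f: A => U): Unit = f(value)
}

case class SuccessWithWarnings[A](value: A, warnings: Seq[String]) extends Result[A] {
  def foreach[U](f: A => U): Unit = f(value)
}

case class Failure(error: String) extends Result[Nothing] {
  def foreach[U](f: Nothing => U): Unit = ()
}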


Re: [SQL] Memory leak with spark streaming and spark sql in spark 1.5.1

2015-10-15 Thread Shixiong Zhu
Thanks for reporting it Terry. I submitted a PR to fix it:
https://github.com/apache/spark/pull/9132

Best Regards,
Shixiong Zhu

2015-10-15 2:39 GMT+08:00 Reynold Xin :

> +dev list
>
> On Wed, Oct 14, 2015 at 1:07 AM, Terry Hoo  wrote:
>
>> All,
>>
>> Does anyone meet memory leak issue with spark streaming and spark sql in
>> spark 1.5.1? I can see the memory is increasing all the time when running
>> this simple sample:
>>
>> val sc = new SparkContext(conf)
>> val sqlContext = new HiveContext(sc)
>> import sqlContext.implicits._
>> val ssc = new StreamingContext(sc, Seconds(1))
>> val s1 = ssc.socketTextStream("localhost", ).map(x =>
>> (x,1)).reduceByKey((x : Int, y : Int) => x + y)
>> s1.print
>> s1.foreachRDD(rdd => {
>>   rdd.foreach(_ => Unit)
>>   sqlContext.createDataFrame(rdd).registerTempTable("A")
>>   sqlContext.sql("""select * from A""").show(1)
>> })
>>
>> After dump the the java heap, I can see there is about 22K entries
>> in SQLListener._stageIdToStageMetrics after 2 hour running (other maps in
>> this SQLListener has about 1K entries), is this a leak in SQLListener?
>>
>> Thanks!
>> Terry
>>
>
>


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Eugen Cepoi
Hey,

A quick update on other things that have been tested.

When looking at the compiled code of the spark-streaming-kinesis-asl jar
everything looks normal (there is a class that implements SyncMap and it is
used inside the receiver).
Starting a spark shell and using introspection to instantiate a receiver
and check that blockIdToSeqNumRanges implements SyncMap works too. So
obviously it has the correct type according to that.

Another thing to test could be to do the same introspection stuff but
inside a spark job to make sure it is not a problem in the way the jobs are
run.
The other idea would be that this is a problem related to ser/de. For
example, if the receiver was being serialized and then deserialized, it could
definitely happen, depending on the library used and its configuration, that it
just doesn't preserve the concrete type. So it would deserialize using the
compile-time type instead of the runtime type.

Cheers,
Eugen


2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré :

> Thanks for the update Phil.
>
> I'm preparing a environment to reproduce it.
>
> I keep you posted.
>
> Thanks again,
> Regards
> JB
>
> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>
>> Not a dumb question, but yes I updated all of the library references to
>> 1.5, including  (even tried 1.5.1).
>>
>> // Versions.spark set elsewhere to "1.5.0"
>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
>> "provided"
>>
>> I am experiencing the issue in my own spark project, but also when I try
>> to run the spark streaming kinesis example that comes in spark/examples
>>
>> Tried running the streaming job locally, and also in EMR with release
>> 4.1.0 that includes Spark 1.5
>>
>> Very strange!
>>
>> -- Forwarded message --
>>
>> From: "Jean-Baptiste Onofré" > >>
>> To: user@spark.apache.org 
>>
>> Cc:
>> Date: Thu, 15 Oct 2015 08:03:55 +0200
>> Subject: Re: Spark 1.5 Streaming and Kinesis
>> Hi Phil,
>> KinesisReceiver is part of extra. Just a dumb question: did you
>> update all, including the Spark Kinesis extra containing the
>> KinesisReceiver ?
>> I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we
>> see:
>> blockIdToSeqNumRanges.clear()
>> which is a:
>> private val blockIdToSeqNumRanges = new
>> mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>  with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>> So, it doesn't look fully correct to me.
>> Let me investigate a bit this morning.
>> Regards
>> JB
>> On 10/15/2015 07:49 AM, Phil Kallos wrote:
>> We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
>> streaming applications, to take advantage of the new Kinesis
>> checkpointing improvements in 1.5.
>> However after upgrading, we are consistently seeing the following
>> error:
>> java.lang.ClassCastException: scala.collection.mutable.HashMap cannot
>> be
>> cast to scala.collection.mutable.SynchronizedMap
>> at
>>
>> org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>> at
>>
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>> at
>>
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>> at
>>
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>> at
>>
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>> at
>>
>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>> at
>>
>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> I even get this when running the Kinesis examples :
>>
>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>> with
>> bin/run-example streaming.KinesisWordCountASL
>> Am I doing something incorrect?
>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org 
>> http://blog.nanthrax.net 
>> Talend - http://www.talend.com 
>>
>> Hi,
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> 

Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Phil Kallos
Not a dumb question, but yes I updated all of the library references to
1.5, including  (even tried 1.5.1).

// Versions.spark set elsewhere to "1.5.0"
"org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
"provided"

I am experiencing the issue in my own Spark project, but also when I try to
run the Spark Streaming Kinesis example that comes in spark/examples.

I tried running the streaming job locally, and also in EMR with release 4.1.0,
which includes Spark 1.5.

Very strange!


> -- Forwarded message --

From: "Jean-Baptiste Onofré" 
> To: user@spark.apache.org
> Cc:
> Date: Thu, 15 Oct 2015 08:03:55 +0200
> Subject: Re: Spark 1.5 Streaming and Kinesis
> Hi Phil,
> KinesisReceiver is part of extra. Just a dumb question: did you update
> all, including the Spark Kinesis extra containing the KinesisReceiver ?
> I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
> blockIdToSeqNumRanges.clear()
> which is a:
> private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId,
> SequenceNumberRanges]
> with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
> So, it doesn't look fully correct to me.
> Let me investigate a bit this morning.
> Regards
> JB
> On 10/15/2015 07:49 AM, Phil Kallos wrote:
> We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
> streaming applications, to take advantage of the new Kinesis
> checkpointing improvements in 1.5.
> However after upgrading, we are consistently seeing the following error:
> java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be
> cast to scala.collection.mutable.SynchronizedMap
> at
>
> org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
> at
>
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
> at
>
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
> at
>
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
> at
>
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
> at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
> at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> I even get this when running the Kinesis examples :
> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>  with
> bin/run-example streaming.KinesisWordCountASL
> Am I doing something incorrect?
>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com

Hi,
>


[no subject]

2015-10-15 Thread Lei Wu
Dear all,

Like the design doc in SPARK-1 for Spark memory management, is there a
design doc for the Spark task scheduling details? I'd really like to dive deep
into the task scheduling module of Spark, thanks so much!


PMML export for LinearRegressionModel

2015-10-15 Thread Fazlan Nazeem
Hi

I am trying to export a LinearRegressionModel in PMML format. According to
the following resource[1] PMML export is supported for
LinearRegressionModel.

[1] https://spark.apache.org/docs/latest/mllib-pmml-model-export.html

But there is *no* *toPMML* method in the *LinearRegressionModel* class, although
LogisticRegressionModel, RidgeRegressionModel, SVMModel, etc. have a toPMML
method.

Can someone explain what is the issue here?


Thanks & Regards,

Fazlan Nazeem

*Software Engineer*

*WSO2 Inc*
Mobile : +94772338839
fazl...@wso2.com
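
For reference, the API documented on the page linked above is toPMML on the RDD-based models in org.apache.spark.mllib. A hedged sketch, assuming the mllib (RDD-based) LinearRegressionModel rather than the spark.ml pipeline class, and an existing SparkContext named sc; if toPMML is still missing, the import is likely resolving to a different class or to an older Spark version:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// Tiny toy training set, just to obtain a model object for the export call.
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 0.5)),
  LabeledPoint(2.0, Vectors.dense(2.0, 1.0))))

val model = LinearRegressionWithSGD.train(training, 10)

println(model.toPMML)          // PMML document as a String
model.toPMML("/tmp/lr.pmml")   // or write it to a local file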


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Jean-Baptiste Onofré

Thanks for the update Phil.

I'm preparing an environment to reproduce it.

I'll keep you posted.

Thanks again,
Regards
JB

On 10/15/2015 08:36 AM, Phil Kallos wrote:

Not a dumb question, but yes I updated all of the library references to
1.5, including  (even tried 1.5.1).

// Versions.spark set elsewhere to "1.5.0"
"org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
"provided"

I am experiencing the issue in my own spark project, but also when I try
to run the spark streaming kinesis example that comes in spark/examples

Tried running the streaming job locally, and also in EMR with release
4.1.0 that includes Spark 1.5

Very strange!

-- Forwarded message --

From: "Jean-Baptiste Onofré" >
To: user@spark.apache.org 
Cc:
Date: Thu, 15 Oct 2015 08:03:55 +0200
Subject: Re: Spark 1.5 Streaming and Kinesis
Hi Phil,
KinesisReceiver is part of extra. Just a dumb question: did you
update all, including the Spark Kinesis extra containing the
KinesisReceiver ?
I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
blockIdToSeqNumRanges.clear()
which is a:
private val blockIdToSeqNumRanges = new
mutable.HashMap[StreamBlockId, SequenceNumberRanges]
 with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
So, it doesn't look fully correct to me.
Let me investigate a bit this morning.
Regards
JB
On 10/15/2015 07:49 AM, Phil Kallos wrote:
We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
streaming applications, to take advantage of the new Kinesis
checkpointing improvements in 1.5.
However after upgrading, we are consistently seeing the following error:
java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be
cast to scala.collection.mutable.SynchronizedMap
at

org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
at

org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at

org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at

org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
at

org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
at
org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at
org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I even get this when running the Kinesis examples :
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html with
bin/run-example streaming.KinesisWordCountASL
Am I doing something incorrect?


--
Jean-Baptiste Onofré
jbono...@apache.org 
http://blog.nanthrax.net 
Talend - http://www.talend.com 

Hi,



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Re: SPARK SQL Error

2015-10-15 Thread Giri
Hi Ritchard,

Thank you so much again for your input. This time I ran the command in the
below way:
spark-submit --master yarn --class org.spark.apache.CsvDataSource
/home/cloudera/Desktop/TestMain.jar  
hdfs://quickstart.cloudera:8020/people_csv
But I am facing a new error: "Could not parse Master URL:
'hdfs://quickstart.cloudera:8020/people_csv'"
The file path is correct:
 
hadoop fs -ls hdfs://quickstart.cloudera:8020/people_csv
-rw-r--r--   1 cloudera supergroup 29 2015-10-10 00:02
hdfs://quickstart.cloudera:8020/people_csv

Can you help me to fix this new error?

15/10/15 02:24:39 INFO spark.SparkContext: Added JAR
file:/home/cloudera/Desktop/TestMain.jar at
http://10.0.2.15:40084/jars/TestMain.jar with timestamp 1444901079484
Exception in thread "main" org.apache.spark.SparkException: Could not parse
Master URL: 'hdfs://quickstart.cloudera:8020/people_csv'
at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2244)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:361)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:154)
at org.spark.apache.CsvDataSource$.main(CsvDataSource.scala:10)
at org.spark.apache.CsvDataSource.main(CsvDataSource.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Thanks & Regards,
Giri.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Error-tp25050p25075.html



RE: Running in cluster mode causes native library linking to fail

2015-10-15 Thread prajod.vettiyattil
Forwarding to the group, in case someone else has the same error. Just found 
out that I did not reply to the group in my original reply.

From: Prajod S Vettiyattil (WT01 - BAS)
Sent: 15 October 2015 11:45
To: 'Bernardo Vecchia Stein' 
Subject: RE: Running in cluster mode causes native library linking to fail

Hi,

Also try the path settings given here: 
http://stackoverflow.com/questions/12279833/videocapture-opencv-2-4-2-error-in-windows/29278322#29278322

Forgot to add this link in my response earlier:
https://blogs.oracle.com/darcy/entry/purging_ld_library_path
http://www.oracle.com/technetwork/java/javase/jdk7-relnotes-418459.html

So from java 7, LD_LIBRARY_PATH is ignored. This is for Linux and Solaris. And 
probably for all other Unix derivatives.

Also check: System.loadLibrary() should be inside a static { } block. Please 
check its syntax on the internet. The loadLibrary call has to be made 
during class load time; that is why the static block is required.
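
A minimal Scala sketch of that pattern: an object body plays the role of the Java static block and runs once per JVM when the object is first touched. The library name and the native method below are placeholders, not a real API:

object NativeLib {
  // Runs once per JVM (so once per executor) when NativeLib is first referenced;
  // "opencv_java" is a placeholder name, resolved against java.library.path / PATH.
  System.loadLibrary("opencv_java")

  @native def process(data: Array[Byte]): Int
}

// In a Spark job the load must happen inside the executor JVM, e.g. by referencing
// NativeLib from within rdd.mapPartitions { ... } rather than only on the driver.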

What is your:

1.  Spark version

2.  OS type and version

3.  Library that you are trying to load.



[I was using OpenCV. I had to go through many trials to get it working 
consistently. Initially, it would work only on the dev environment (Windows) but not 
on Ubuntu. It's been a few months. There is a Stack Overflow answer I have given 
regarding this: 
http://stackoverflow.com/questions/12279833/videocapture-opencv-2-4-2-error-in-windows/29278322#29278322
 ]

Regards,
Prajod

From: Bernardo Vecchia Stein [mailto:bernardovst...@gmail.com]
Sent: 15 October 2015 00:36
To: Prajod S Vettiyattil (WT01 - BAS) 
Subject: Re: Running in cluster mode causes native library linking to fail

Hello Prajod,
Thanks for your reply! I am also using the standalone cluster manager. I do not 
build the jars in Eclipse, nor do I use Maven; they are built with sbt by 
hand.
I was setting LD_LIBRARY_PATH and LIBRARY_PATH to point to the paths with the 
library. When I didn't set them and set only PATH instead, spark would just not 
find the libraries (it was another error). I'm not sure what version you are 
using, but it appears I do have to set LD_LIBRARY_PATH in order to make things 
work.
I tried a simpler approach using System.load() with a specific path to the 
library, so I don't have to deal with these paths. However, I still get the 
same error when executing in cluster mode (exactly the same error). Do you have 
any idea why that might be failing?
Thank you again for your attention,
Bernardo

On 14 October 2015 at 03:30, 
> wrote:
Hi,

I have successfully made this work using the “standalone” cluster manager. 
I have not tried with Mesos or YARN.

Which of these cluster managers are you using? 
https://spark.apache.org/docs/1.1.0/cluster-overview.html
• Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
• Apache Mesos – a general cluster manager that can also run Hadoop MapReduce and service applications.
• Hadoop YARN – the resource manager in Hadoop 2.

I have run Spark using Scala in cluster mode, using the standalone cluster 
manager. It took a lot of effort. Also I think  that “UnsatisfiedLinkError” 
means that your .so could not be found.

There are two settings to make this work:

1.  “Native library location” in the Eclipse configuration (my jar for 
spark-submit was built using a Maven build from Eclipse). I think that 
value (native library location) goes into the jar manifest.

2.  The PATH environment variable has to be set with the full path of the 
directory where your .so is located. LD_LIBRARY_PATH was where it was done 
in earlier versions (prior to Java 6) of Java, but now it seems to be deprecated 
in favor of PATH. So you will find a lot of answers on the internet asking you to 
set LD_LIBRARY_PATH, but that variable is ignored now.

Also note that the environment variable settings have to be made on each 
machine where your Spark Worker is running. This also requires that you understand 
where your app code is executed: in the driver (master machine) or in the 
executor (worker machine).

Prajod

From: Bernardo Vecchia Stein 
[mailto:bernardovst...@gmail.com]
Sent: 14 October 2015 10:15
To: user@spark.apache.org
Subject: Running in cluster mode causes native library linking to fail

Hello,
I am trying to run some Scala code in cluster mode using spark-submit. This 
code uses addLibrary to link with a .so that exists on the machine, and this 
library has a function to be called natively (there's a native definition as 
needed in the code).
The problem I'm facing is: whenever I try to run 

Re: Spark on Mesos / Executor Memory

2015-10-15 Thread Bharath Ravi Kumar
Resending since user@mesos bounced earlier. My apologies.

On Thu, Oct 15, 2015 at 12:19 PM, Bharath Ravi Kumar 
wrote:

> (Reviving this thread since I ran into similar issues...)
>
> I'm running two spark jobs (in mesos fine grained mode), each belonging to
> a different mesos role, say low and high. The low:high mesos weights are
> 1:10. On expected lines, I see that the low priority job occupies cluster
> resources to the maximum extent when running alone. However, when the high
> priority job is submitted, it does not start and continues to await cluster
> resources (as seen in the logs). Since the jobs run in fine grained mode
> and the low priority tasks begin to finish, the high priority job should
> ideally be able to start and gradually take over cluster resources as per
> the weights. However, I noticed that while the "low" job gives up CPU cores
> with each completing task (e.g. reduction from 72 -> 12 with default
> parallelism set to 72), the memory resources are held on (~500G out of
> 768G). The spark.executor.memory setting appears to directly impact the
> amount of memory that the job holds on to. In this case, it was set to 200G
> in the low priority task and 100G in the high priority task. The nature of
> these jobs is such that setting the numbers to smaller values (say 32g)
> resulted in job failures with outofmemoryerror.  It appears that the spark
> framework is retaining memory (across tasks)  proportional to
> spark.executor.memory for the duration of the job and not releasing memory
> as tasks complete. This defeats the purpose of fine grained mode execution
> as the memory occupancy is preventing the high priority job from accepting
> the prioritized cpu offers and beginning execution. Can this be explained /
> documented better please?
>
> Thanks,
> Bharath
>
> On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen  wrote:
>
>> (Adding spark user list)
>>
>> Hi Tom,
>>
>> If I understand correctly you're saying that you're running into memory
>> problems because the scheduler is allocating too much CPUs and not enough
>> memory to acoomodate them right?
>>
>> In the case of fine grain mode I don't think that's a problem since we
>> have a fixed amount of CPU and memory per task.
>> However, in coarse grain you can run into that problem if you're with in
>> the spark.cores.max limit, and memory is a fixed number.
>>
>> I have a patch out to configure how much max cpus should coarse grain
>> executor use, and it also allows multiple executors in coarse grain mode.
>> So you could say try to launch multiples of max 4 cores with
>> spark.executor.memory (+ overhead and etc) in a slave. (
>> https://github.com/apache/spark/pull/4027)
>>
>> It also might be interesting to include a cores to memory multiplier so
>> that with a larger amount of cores we try to scale the memory with some
>> factor, but I'm not entirely sure that's intuitive to use and what people
>> know what to set it to, as that can likely change with different workload.
>>
>> Tim
>>
>>
>>
>>
>>
>>
>>
>> On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld  wrote:
>>
>>> We're running Spark 1.3.0 (with a couple of patches over the top for
>>> docker related bits).
>>>
>>> I don't think SPARK-4158 is related to what we're seeing, things do run
>>> fine on the cluster, given a ridiculously large executor memory
>>> configuration. As for SPARK-3535 although that looks useful I think we'e
>>> seeing something else.
>>>
>>> Put a different way, the amount of memory required at any given time by
>>> the spark JVM process is directly proportional to the amount of CPU it has,
>>> because more CPU means more tasks and more tasks means more memory. Even if
>>> we're using coarse mode, the amount of executor memory should be
>>> proportionate to the amount of CPUs in the offer.
>>>
>>> On 11 April 2015 at 17:39, Brenden Matthews 
>>> wrote:
>>>
 I ran into some issues with it a while ago, and submitted a couple PRs
 to fix it:

 https://github.com/apache/spark/pull/2401
 https://github.com/apache/spark/pull/3024

 Do these look relevant? What version of Spark are you running?

 On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld  wrote:

> Hey,
>
> Not sure whether it's best to ask this on the spark mailing list or
> the mesos one, so I'll try here first :-)
>
> I'm having a bit of trouble with out of memory errors in my spark
> jobs... it seems fairly odd to me that memory resources can only be set at
> the executor level, and not also at the task level. For example, as far as
> I can tell there's only a *spark.executor.memory* config option.
>
> Surely the memory requirements of a single executor are quite
> dramatically influenced by the number of concurrent tasks running? Given a
> shared cluster, I have no idea what % of an individual slave my executor 
> is

How VectorIndexer works in Spark ML pipelines

2015-10-15 Thread VISHNU SUBRAMANIAN
HI All,

I am trying to use the VectorIndexer (feature extraction) technique
available from Spark ML Pipelines.

I ran the example in the documentation.

val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)


And then I wanted to see what output it generates.

After performing transform on the data set, the output looks like the following.

scala> predictions.select("indexedFeatures").take(1).foreach(println)

[(692,[124,125,126,127,151,152,153,154,155,179,180,181,182,183,208,209,210,211,235,236,237,238,239,263,264,265,266,267,268,292,293,294,295,296,321,322,323,324,349,350,351,352,377,378,379,380,405,406,407,408,433,434,435,436,461,462,463,464,489,490,491,492,493,517,518,519,520,521,545,546,547,548,549,574,575,576,577,578,602,603,604,605,606,630,631,632,633,634,658,659,660,661,662],[145.0,255.0,211.0,31.0,32.0,237.0,253.0,252.0,71.0,11.0,175.0,253.0,252.0,71.0,144.0,253.0,252.0,71.0,16.0,191.0,253.0,252.0,71.0,26.0,221.0,253.0,252.0,124.0,31.0,125.0,253.0,252.0,252.0,108.0,253.0,252.0,252.0,108.0,255.0,253.0,253.0,108.0,253.0,252.0,252.0,108.0,253.0,252.0,252.0,108.0,253.0,252.0,252.0,108.0,255.0,253.0,253.0,170.0,253.0,252.0,252.0,252.0,42.0,149.0,252.0,252.0,252.0,144.0,109.0,252.0,252.0,252.0,144.0,218.0,253.0,253.0,255.0,35.0,175.0,252.0,252.0,253.0,35.0,73.0,252.0,252.0,253.0,35.0,31.0,211.0,252.0,253.0,35.0])]


scala> predictions.select("features").take(1).foreach(println)

[(692,[124,125,126,127,151,152,153,154,155,179,180,181,182,183,208,209,210,211,235,236,237,238,239,263,264,265,266,267,268,292,293,294,295,296,321,322,323,324,349,350,351,352,377,378,379,380,405,406,407,408,433,434,435,436,461,462,463,464,489,490,491,492,493,517,518,519,520,521,545,546,547,548,549,574,575,576,577,578,602,603,604,605,606,630,631,632,633,634,658,659,660,661,662],[145.0,255.0,211.0,31.0,32.0,237.0,253.0,252.0,71.0,11.0,175.0,253.0,252.0,71.0,144.0,253.0,252.0,71.0,16.0,191.0,253.0,252.0,71.0,26.0,221.0,253.0,252.0,124.0,31.0,125.0,253.0,252.0,252.0,108.0,253.0,252.0,252.0,108.0,255.0,253.0,253.0,108.0,253.0,252.0,252.0,108.0,253.0,252.0,252.0,108.0,253.0,252.0,252.0,108.0,255.0,253.0,253.0,170.0,253.0,252.0,252.0,252.0,42.0,149.0,252.0,252.0,252.0,144.0,109.0,252.0,252.0,252.0,144.0,218.0,253.0,253.0,255.0,35.0,175.0,252.0,252.0,253.0,35.0,73.0,252.0,252.0,253.0,35.0,31.0,211.0,252.0,253.0,35.0])]

I can't understand what is happening. I tried with simple data sets also,
but got a similar result.

Please help.

Thanks,

Vishnu
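
For comparison, a toy Scala sketch (Spark 1.5-era API, assuming sc and sqlContext exist) where the effect of VectorIndexer is visible: the second column takes only two distinct values, so with maxCategories = 2 it is treated as categorical and re-encoded as small category indices, while the first column has more distinct values and stays continuous and unchanged. On data like the documentation example, where the features are essentially continuous, the indexed output can look identical to the input, which is expected rather than a bug.

import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.mllib.linalg.Vectors

import sqlContext.implicits._

// Second vector slot takes only the values 2.0 and 5.0, so it is categorical here.
val df = sc.parallelize(Seq(
  Tuple1(Vectors.dense(10.5, 2.0)),
  Tuple1(Vectors.dense(3.2, 5.0)),
  Tuple1(Vectors.dense(7.8, 2.0)),
  Tuple1(Vectors.dense(1.1, 5.0)))).toDF("features")

val indexerModel = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(2)
  .fit(df)

indexerModel.transform(df).show()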


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Eugen Cepoi
So running it using spark-submit doesn't change anything; it still works.

When reading the code
https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
it looks like the receivers are definitely being serialized and deserialized. I think this is
the issue; I need to find a way to confirm that now...

2015-10-15 16:12 GMT+07:00 Eugen Cepoi :

> Hey,
>
> A quick update on other things that have been tested.
>
> When looking at the compiled code of the spark-streaming-kinesis-asl jar
> everything looks normal (there is a class that implements SyncMap and it is
> used inside the receiver).
> Starting a spark shell and using introspection to instantiate a receiver
> and check that blockIdToSeqNumRanges implements SyncMap works too. So
> obviously it has the correct type according to that.
>
> Another thing to test could be to do the same introspection stuff but
> inside a spark job to make sure it is not a problem in the way the jobs are
> run.
> The other idea would be that this is a problem related to ser/de. For
> example if the receiver was being serialized and then deserialized it could
> definitely happen depending on the lib used and its configuration that it
> just doesn't preserve the concrete type. So it would deserialize using the
> compile type instead of the runtime type.
>
> Cheers,
> Eugen
>
>
> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré :
>
>> Thanks for the update Phil.
>>
>> I'm preparing a environment to reproduce it.
>>
>> I keep you posted.
>>
>> Thanks again,
>> Regards
>> JB
>>
>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>
>>> Not a dumb question, but yes I updated all of the library references to
>>> 1.5, including  (even tried 1.5.1).
>>>
>>> // Versions.spark set elsewhere to "1.5.0"
>>> "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
>>> "provided"
>>>
>>> I am experiencing the issue in my own spark project, but also when I try
>>> to run the spark streaming kinesis example that comes in spark/examples
>>>
>>> Tried running the streaming job locally, and also in EMR with release
>>> 4.1.0 that includes Spark 1.5
>>>
>>> Very strange!
>>>
>>> -- Forwarded message --
>>>
>>> From: "Jean-Baptiste Onofré" > j...@nanthrax.net>>
>>> To: user@spark.apache.org 
>>>
>>> Cc:
>>> Date: Thu, 15 Oct 2015 08:03:55 +0200
>>> Subject: Re: Spark 1.5 Streaming and Kinesis
>>> Hi Phil,
>>> KinesisReceiver is part of extra. Just a dumb question: did you
>>> update all, including the Spark Kinesis extra containing the
>>> KinesisReceiver ?
>>> I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we
>>> see:
>>> blockIdToSeqNumRanges.clear()
>>> which is a:
>>> private val blockIdToSeqNumRanges = new
>>> mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>>>  with mutable.SynchronizedMap[StreamBlockId,
>>> SequenceNumberRanges]
>>> So, it doesn't look fully correct to me.
>>> Let me investigate a bit this morning.
>>> Regards
>>> JB
>>> On 10/15/2015 07:49 AM, Phil Kallos wrote:
>>> We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
>>> streaming applications, to take advantage of the new Kinesis
>>> checkpointing improvements in 1.5.
>>> However after upgrading, we are consistently seeing the following
>>> error:
>>> java.lang.ClassCastException: scala.collection.mutable.HashMap
>>> cannot be
>>> cast to scala.collection.mutable.SynchronizedMap
>>> at
>>>
>>> org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>> at
>>>
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>> at
>>>
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>> at
>>>
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>> at
>>>
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>> at
>>>
>>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>> at
>>>
>>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>> I even get this when running the Kinesis examples :
>>>
>>> 

Re: Spark on Mesos / Executor Memory

2015-10-15 Thread Bharath Ravi Kumar
(Reviving this thread since I ran into similar issues...)

I'm running two spark jobs (in mesos fine grained mode), each belonging to
a different mesos role, say low and high. The low:high mesos weights are
1:10. On expected lines, I see that the low priority job occupies cluster
resources to the maximum extent when running alone. However, when the high
priority job is submitted, it does not start and continues to await cluster
resources (as seen in the logs). Since the jobs run in fine grained mode
and the low priority tasks begin to finish, the high priority job should
ideally be able to start and gradually take over cluster resources as per
the weights. However, I noticed that while the "low" job gives up CPU cores
with each completing task (e.g. reduction from 72 -> 12 with default
parallelism set to 72), the memory resources are held on (~500G out of
768G). The spark.executor.memory setting appears to directly impact the
amount of memory that the job holds on to. In this case, it was set to 200G
in the low priority task and 100G in the high priority task. The nature of
these jobs is such that setting the numbers to smaller values (say 32g)
resulted in job failures with outofmemoryerror.  It appears that the spark
framework is retaining memory (across tasks)  proportional to
spark.executor.memory for the duration of the job and not releasing memory
as tasks complete. This defeats the purpose of fine grained mode execution
as the memory occupancy is preventing the high priority job from accepting
the prioritized cpu offers and beginning execution. Can this be explained /
documented better please?

Thanks,
Bharath
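
For reference, a hedged illustration of the settings discussed in this thread; the values are the ones quoted above, not recommendations, and the app name is a placeholder:

import org.apache.spark.SparkConf

// Fine-grained Mesos mode plus the per-executor memory and parallelism quoted above.
val conf = new SparkConf()
  .setAppName("low-priority-job")          // placeholder name
  .set("spark.mesos.coarse", "false")      // fine-grained mode
  .set("spark.executor.memory", "200g")    // held by the framework for the job's lifetime
  .set("spark.default.parallelism", "72")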

On Sat, Apr 11, 2015 at 10:59 PM, Tim Chen  wrote:

> (Adding spark user list)
>
> Hi Tom,
>
> If I understand correctly you're saying that you're running into memory
> problems because the scheduler is allocating too many CPUs and not enough
> memory to accommodate them, right?
>
> In the case of fine grain mode I don't think that's a problem since we
> have a fixed amount of CPU and memory per task.
> However, in coarse grain you can run into that problem if you're within
> the spark.cores.max limit, and memory is a fixed number.
>
> I have a patch out to configure how much max cpus should coarse grain
> executor use, and it also allows multiple executors in coarse grain mode.
> So you could say try to launch multiples of max 4 cores with
> spark.executor.memory (+ overhead and etc) in a slave. (
> https://github.com/apache/spark/pull/4027)
>
> It also might be interesting to include a cores-to-memory multiplier so
> that with a larger number of cores we try to scale the memory with some
> factor, but I'm not entirely sure that's intuitive to use, or that people
> would know what to set it to, as that can likely change with different workloads.
>
> Tim
>
>
>
>
>
>
>
> On Sat, Apr 11, 2015 at 9:51 AM, Tom Arnfeld  wrote:
>
>> We're running Spark 1.3.0 (with a couple of patches over the top for
>> docker related bits).
>>
>> I don't think SPARK-4158 is related to what we're seeing, things do run
>> fine on the cluster, given a ridiculously large executor memory
>> configuration. As for SPARK-3535 although that looks useful I think we'e
>> seeing something else.
>>
>> Put a different way, the amount of memory required at any given time by
>> the spark JVM process is directly proportional to the amount of CPU it has,
>> because more CPU means more tasks and more tasks means more memory. Even if
>> we're using coarse mode, the amount of executor memory should be
>> proportionate to the amount of CPUs in the offer.
>>
>> On 11 April 2015 at 17:39, Brenden Matthews  wrote:
>>
>>> I ran into some issues with it a while ago, and submitted a couple PRs
>>> to fix it:
>>>
>>> https://github.com/apache/spark/pull/2401
>>> https://github.com/apache/spark/pull/3024
>>>
>>> Do these look relevant? What version of Spark are you running?
>>>
>>> On Sat, Apr 11, 2015 at 9:33 AM, Tom Arnfeld  wrote:
>>>
 Hey,

 Not sure whether it's best to ask this on the spark mailing list or the
 mesos one, so I'll try here first :-)

 I'm having a bit of trouble with out of memory errors in my spark
 jobs... it seems fairly odd to me that memory resources can only be set at
 the executor level, and not also at the task level. For example, as far as
 I can tell there's only a *spark.executor.memory* config option.

 Surely the memory requirements of a single executor are quite
 dramatically influenced by the number of concurrent tasks running? Given a
 shared cluster, I have no idea what % of an individual slave my executor is
 going to get, so I basically have to set the executor memory to a value
 that's correct when the whole machine is in use...

 Has anyone else running Spark on Mesos come across this, or maybe
 someone could correct my understanding of the config options?


Re: spark-shell :javap fails with complaint about JAVA_HOME, but it is set correctly

2015-10-15 Thread Shixiong Zhu
The Scala 2.10 REPL's :javap doesn't support Java 7 or Java 8. It was fixed in Scala
2.11. See https://issues.scala-lang.org/browse/SI-4936

Best Regards,
Shixiong Zhu

2015-10-15 4:19 GMT+08:00 Robert Dodier :

> Hi,
>
> I am working with Spark 1.5.1 (official release), with Oracle Java8,
> on Ubuntu 14.04. echo $JAVA_HOME says "/usr/lib/jvm/java-8-oracle".
>
> I'd like to use :javap in spark-shell, but I get an error message:
>
> scala> :javap java.lang.Object
> Failed: Could not load javap tool. Check that JAVA_HOME is correct.
>
> However ls $JAVA_HOME/lib/tools.jar shows that it is there.
>
> I tried starting spark-shell with -toolcp $JAVA_HOME/lib/tools.jar but
> I get the same error.
>
> For comparison, if execute scala and enter :javap java.lang.Object, it
> works as expected.
>
> Not sure where to go from here. Thanks for any advice.
>
> best,
>
> Robert Dodier
>
>
>


Design doc for Spark task scheduling

2015-10-15 Thread Lei Wu
Like the design doc in SPARK-1 for Spark memory management, is there a
design doc for the Spark task scheduling details? I'd really like to dive deep
into the task scheduling module of Spark, thanks so much!

I forgot the email subject in the previous mail; sorry for that.


How to specify the numFeatures in HashingTF

2015-10-15 Thread Jianguo Li
Hi,

There is a parameter in the HashingTF called "numFeatures". I was wondering
what is the best way to set the value of this parameter. In the use case of
text categorization, do you need to know in advance the number of words in
your vocabulary? Or do you set it to a large value, greater than the
number of words in your vocabulary?

Thanks,

Jianguo


Re: Best practices to handle corrupted records

2015-10-15 Thread Antonio Murgia
'Either' does not cover the case where the outcome was successful but generated 
warnings. I already looked into it, and also at 'Try', from which I drew inspiration. 
Thanks for pointing it out anyway!

#A.M.

On 15 Oct 2015, at 16:19, Erwan ALLAIN 
> wrote:

What about http://www.scala-lang.org/api/2.9.3/scala/Either.html ?


On Thu, Oct 15, 2015 at 2:57 PM, Roberto Congiu 
> wrote:
I came to a similar solution to a similar problem. I deal with a lot of CSV 
files from many different sources and they are often malformed.
However, I just have success/failure. Maybe you should make 
SuccessWithWarnings a subclass of Success, or get rid of it altogether, 
making the warnings optional.
I was thinking of making this cleaning/conforming library open source if you're 
interested.

R.

2015-10-15 5:28 GMT-07:00 Antonio Murgia 
>:
Hello,
I looked around on the web and I couldn't find any way to deal in a structured 
way with malformed/faulty records during computation. All I was able to find 
was the flatMap/Some/None technique + logging.
I'm facing this problem because I have a processing algorithm that extracts 
more than one value from each record, but can fail in extracting one of those 
multiple values, and I want to keep track of them. Logging is not feasible 
because this "warning" happens so frequently that the logs would become 
overwhelming and impossibile to read.
Since I have 3 different possible outcomes from my processing I modeled it with 
this class hierarchy:
[inline image: class hierarchy diagram]
That holds result and/or warnings.
Since Result implements Traversable it can be used in a flatMap, discarding all 
warnings and failure results; on the other hand, if we want to keep track of 
warnings, we can process them and output them if we need.

Kind Regards
#A.M.
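
A minimal sketch of such a hierarchy in Scala (the names below are illustrative,
not the actual classes from this thread):

sealed trait Result[+A] extends Traversable[A]

case class Success[A](value: A) extends Result[A] {
  // yields the value, so flatMapping over Results keeps the successful ones
  override def foreach[U](f: A => U): Unit = f(value)
}

case class SuccessWithWarnings[A](value: A, warnings: Seq[String]) extends Result[A] {
  override def foreach[U](f: A => U): Unit = f(value)
}

case class Failure(errors: Seq[String]) extends Result[Nothing] {
  // yields nothing, so failures are silently dropped by flatMap
  override def foreach[U](f: Nothing => U): Unit = ()
}

With this, rdd.flatMap(process) keeps only the extracted values, while a second
pass over the same Results can collect and report the warnings.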



--
--
"Good judgment comes from experience.
Experience comes from bad judgment"
--



Re: How to specify the numFeatures in HashingTF

2015-10-15 Thread Nick Pentreath
Setting numFeatures higher than the vocabulary size will tend to reduce the chance 
of hash collisions, but it's not strictly necessary - it becomes a memory / 
accuracy trade-off.

Surprisingly, the impact on model performance of moderate hash collisions is 
often not significant.

So it may be worth trying a few settings out (lower than vocab, higher, etc.) and 
seeing what the impact is on evaluation metrics.
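
For example, with the RDD-based API (a minimal sketch; 1 << 18 is just an
illustrative value, the default table size being 2^20 buckets):

import org.apache.spark.mllib.feature.HashingTF

// assuming sc is an existing SparkContext and each document is already tokenized
val docs = sc.parallelize(Seq(
  Seq("spark", "hashing", "tf"),
  Seq("spark", "text", "categorization", "example")))

val hashingTF = new HashingTF(1 << 18)   // fewer buckets than the 2^20 default
val tf = hashingTF.transform(docs)       // RDD of sparse term-frequency vectors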



—
Sent from Mailbox

On Thu, Oct 15, 2015 at 5:46 PM, Jianguo Li 
wrote:

> Hi,
> There is a parameter in the HashingTF called "numFeatures". I was wondering
> what is the best way to set the value to this parameter. In the use case of
> text categorization, do you need to know in advance the number of words in
> your vocabulary? or do you set it to be a large value, greater than the
> number of words in your vocabulary?
> Thanks,
> Jianguo

Re: Best practices to handle corrupted records

2015-10-15 Thread Erwan ALLAIN
What about http://www.scala-lang.org/api/2.9.3/scala/Either.html ?


On Thu, Oct 15, 2015 at 2:57 PM, Roberto Congiu 
wrote:

> I came to a similar solution to a similar problem. I deal with a lot of
> CSV files from many different sources and they are often malformed.
> However, I just have success/failure. Maybe you should make
> SuccessWithWarnings a subclass of Success, or get rid of it altogether,
> making the warnings optional.
> I was thinking of making this cleaning/conforming library open source if
> you're interested.
>
> R.
>
> 2015-10-15 5:28 GMT-07:00 Antonio Murgia 
> :
>
>> Hello,
>> I looked around on the web and I couldn’t find any way to deal in a
>> structured way with malformed/faulty records during computation. All I was
>> able to find was the flatMap/Some/None technique + logging.
>> I’m facing this problem because I have a processing algorithm that
>> extracts more than one value from each record, but can fail in extracting
>> one of those multiple values, and I want to keep track of them. Logging is
>> not feasible because this “warning” happens so frequently that the logs
>> would become overwhelming and impossible to read.
>> Since I have 3 different possible outcomes from my processing I modeled
>> it with this class hierarchy:
>> That holds result and/or warnings.
>> Since Result implements Traversable it can be used in a flatMap,
>> discarding all warnings and failure results; on the other hand, if we want
>> to keep track of warnings, we can elaborate them and output them if we need.
>>
>> Kind Regards
>> #A.M.
>>
>
>
>
> --
> --
> "Good judgment comes from experience.
> Experience comes from bad judgment"
> --
>


How to enable Spark mesos docker executor?

2015-10-15 Thread Klaus Ma
Hi team,
I'm working on integration between Mesos & Spark. For now, I can start 
SlaveMesosDispatcher in a docker, and I'd like to also run the Spark executor in 
a Mesos docker. I did the following configuration for it, but I got an error; any 
suggestion?

Configuration:

Spark: conf/spark-defaults.conf
spark.mesos.executor.docker.image        ubuntu
spark.mesos.executor.docker.volumes      /usr/bin:/usr/bin,/usr/local/lib:/usr/local/lib,/usr/lib:/usr/lib,/lib:/lib,/home/test/workshop/spark:/root/spark
spark.mesos.executor.home                /root/spark
#spark.executorEnv.SPARK_HOME            /root/spark
spark.executorEnv.MESOS_NATIVE_LIBRARY   /usr/local/lib

NOTE: Spark is installed in /home/test/workshop/spark, and all 
dependencies are installed.

After submitting SparkPi to the dispatcher, the driver job started but failed. 
The error message is:

I1015 11:10:29.488456 18697 exec.cpp:134] Version: 0.26.0
I1015 11:10:29.506619 18699 exec.cpp:208] Executor registered on slave 
b7e24114-7585-40bc-879b-6a1188cb65b6-S1
WARNING: Your kernel does not support swap limit capabilities, memory limited 
without swap.
/bin/sh: 1: ./bin/spark-submit: not found

Does anyone know how to map/set the Spark home in docker for this case?
 
Da (Klaus), Ma (马达) | PMP® | Advisory Software Engineer 
Platform Symphony/DCOS Development & Support, STG, IBM GCG 
+86-10-8245 4084 | mad...@cn.ibm.com | http://www.cguru.net
  

Re: How to enable Spark mesos docker executor?

2015-10-15 Thread Timothy Chen
Hi Klaus,

Sorry, I'm not next to a computer, but it could possibly be a bug that it doesn't 
take SPARK_HOME as the base path. Currently the Spark image seems to set the 
working directory so that it works. 

I'll look at the code to verify but seems like it could be the case. If it's 
true feel free to create a JIRA and/or provide a fix.

Tim

> On Oct 16, 2015, at 9:28 AM, Klaus Ma  wrote:
> 
> Hi team,
> 
> 
> 
> I'm working on integration between Mesos & Spark. For now, I can start 
> SlaveMesosDispatcher in a docker; and I like to also run Spark executor in 
> Mesos docker. I do the following configuration for it, but I got an error; 
> any suggestion?
> 
> Configuration:
> 
> Spark: conf/spark-defaults.conf
> 
> spark.mesos.executor.docker.image        ubuntu
> spark.mesos.executor.docker.volumes      
> /usr/bin:/usr/bin,/usr/local/lib:/usr/local/lib,/usr/lib:/usr/lib,/lib:/lib,/home/test/workshop/spark:/root/spark
> spark.mesos.executor.home                /root/spark
> #spark.executorEnv.SPARK_HOME            /root/spark
> spark.executorEnv.MESOS_NATIVE_LIBRARY   /usr/local/lib
> NOTE: Spark is installed in /home/test/workshop/spark, and all 
> dependencies are installed.
> 
> After submitting SparkPi to the dispatcher, the driver job started but failed. 
> The error message is:
> 
> I1015 11:10:29.488456 18697 exec.cpp:134] Version: 0.26.0
> I1015 11:10:29.506619 18699 exec.cpp:208] Executor registered on slave 
> b7e24114-7585-40bc-879b-6a1188cb65b6-S1
> WARNING: Your kernel does not support swap limit capabilities, memory limited 
> without swap.
> /bin/sh: 1: ./bin/spark-submit: not found
> Does anyone know how to map/set the Spark home in docker for this case?
> 
> 
>  
> Da (Klaus), Ma (马达) | PMP® | Advisory Software Engineer 
> Platform Symphony/DCOS Development & Support, STG, IBM GCG 
> +86-10-8245 4084 | mad...@cn.ibm.com | http://www.cguru.net


[Spark ML] How to extends MLlib's optimization algorithm

2015-10-15 Thread Zhiliang Zhu
Dear All,
I would like to use Spark ML to develop a project related to optimization 
algorithms; however, in Spark 1.4.1 it seems that under ml's optimizer there are 
only about 2 optimization algorithms.
My project may need more kinds of optimization algorithms, so how would I 
use Spark ML to develop it? And a given optimization algorithm may come with 
different constraint conditions (math formulas); exactly how would I implement and 
solve those complex math formulas in the optimization algorithm?
If there are any comments, examples, or links, they would be very useful. I 
would appreciate your help very much!

Thank you,
Zhiliang
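
As far as I know, the spark.ml pipeline optimizers are not pluggable in 1.4.1, but
the lower-level spark.mllib optimization package exposes Gradient and Updater,
which GradientDescent and LBFGS are built on. A minimal sketch of a custom Updater
that handles a simple constraint by projecting after each step (the constraint and
names are illustrative; verify the signature against your Spark version):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.optimization.Updater

// A toy updater: plain gradient step with a 1/sqrt(iter) schedule, then a projection
// back onto the feasible set (here: non-negative weights).
class ProjectedStepUpdater extends Updater {
  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    val step = stepSize / math.sqrt(iter)
    val updated = weightsOld.toArray.zip(gradient.toArray).map { case (w, g) =>
      math.max(w - step * g, 0.0)   // clamp to the constraint w >= 0
    }
    (Vectors.dense(updated), 0.0)   // second element is the regularization value (none here)
  }
}

It can then be paired with a Gradient for your objective and passed to the entry
points described in the MLlib optimization guide (e.g. LBFGS.runLBFGS or
GradientDescent.runMiniBatchSGD); more elaborate constraints usually go either
into the projection step or into the loss/gradient itself.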





Re: Spark streaming checkpoint against s3

2015-10-15 Thread Tian Zhang
So as long as the jar is kept on S3 and available across different runs, the
S3 checkpoint works.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-checkpoint-against-s3-tp25068p25081.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




RE: How to enable Spark mesos docker executor?

2015-10-15 Thread Klaus Ma
Hi Timothy,
Thanks for your feedback, I logged 
https://issues.apache.org/jira/browse/SPARK-11143 to trace this issue.

If any more suggestions, please let me know :).

 
Da (Klaus), Ma (马达) | PMP® | Advisory Software Engineer 
Platform Symphony/DCOS Development & Support, STG, IBM GCG 
+86-10-8245 4084 | mad...@cn.ibm.com | http://www.cguru.net


Subject: Re: How to enable Spark mesos docker executor?
From: t...@mesosphere.io
Date: Fri, 16 Oct 2015 10:11:36 +0800
CC: user@spark.apache.org
To: kl...@cguru.net

Hi Klaus,
Sorry, I'm not next to a computer, but it could possibly be a bug that it doesn't 
take SPARK_HOME as the base path. Currently the Spark image seems to set the 
working directory so that it works. 
I'll look at the code to verify but seems like it could be the case. If it's 
true feel free to create a JIRA and/or provide a fix.
Tim
On Oct 16, 2015, at 9:28 AM, Klaus Ma  wrote:




Hi team,
I'm working on integration between Mesos & Spark. For now, I can start 
SlaveMesosDispatcher in a docker, and I'd like to also run the Spark executor in 
a Mesos docker. I did the following configuration for it, but I got an error; any 
suggestion?

Configuration:

Spark: conf/spark-defaults.conf
spark.mesos.executor.docker.image        ubuntu
spark.mesos.executor.docker.volumes      /usr/bin:/usr/bin,/usr/local/lib:/usr/local/lib,/usr/lib:/usr/lib,/lib:/lib,/home/test/workshop/spark:/root/spark
spark.mesos.executor.home                /root/spark
#spark.executorEnv.SPARK_HOME            /root/spark
spark.executorEnv.MESOS_NATIVE_LIBRARY   /usr/local/lib

NOTE: Spark is installed in /home/test/workshop/spark, and all 
dependencies are installed.

After submitting SparkPi to the dispatcher, the driver job started but failed. 
The error message is:

I1015 11:10:29.488456 18697 exec.cpp:134] Version: 0.26.0
I1015 11:10:29.506619 18699 exec.cpp:208] Executor registered on slave 
b7e24114-7585-40bc-879b-6a1188cb65b6-S1
WARNING: Your kernel does not support swap limit capabilities, memory limited 
without swap.
/bin/sh: 1: ./bin/spark-submit: not found

Does anyone know how to map/set the Spark home in docker for this case?
 
Da (Klaus), Ma (马达) | PMP® | Advisory Software Engineer 
Platform Symphony/DCOS Development & Support, STG, IBM GCG 
+86-10-8245 4084 | mad...@cn.ibm.com | http://www.cguru.net
  
  

Get list of Strings from its Previous State

2015-10-15 Thread Yogesh Vyas
Hi,
I am new to Spark and was trying to do some experiments with it.

I had a JavaPairDStream RDD.
I want to get the list of string from its previous state. For that I
use updateStateByKey function as follows:

final Function2,
Optional> updateFunc =
   new Function2,
Optional>() {

public Optional call(List arg0,
Optional arg1) throws Exception {
// TODO Auto-generated method stub
if(arg1.toString()==null)
   return Optional.of(arg0);
else {
   arg0.add(arg1.toString());
   return Optional.of(arg0);
}
   }
};

I want the function to append the new list of strings to the previous
list and return the new list. But I am not able to do so. I am getting
the "java.lang.UnsupportedOperationException" error.
Can anyone help me out in getting the desired output?
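
A likely cause of the UnsupportedOperationException is calling add() on the list
Spark passes into the update function; those collections are generally read-only
wrappers and should not be mutated. Also, arg1.toString() is never null for an
Optional (isPresent() is the right check). Building a fresh collection avoids both
problems; a minimal sketch of the same idea in Scala (names are illustrative):

// state per key is the accumulated List[String]; newValues are this batch's strings
val updateFunc: (Seq[String], Option[List[String]]) => Option[List[String]] =
  (newValues, state) => Some(state.getOrElse(List.empty[String]) ++ newValues)

// pairs: DStream[(String, String)]; requires ssc.checkpoint(...) to be set
// val stateDStream = pairs.updateStateByKey(updateFunc)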

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: dataframes and numPartitions

2015-10-15 Thread Mohammed Guller
You may find the spark.sql.shuffle.partitions property useful. The default 
value is 200.

Mohammed

From: Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
Sent: Wednesday, October 14, 2015 8:14 PM
To: user
Subject: dataframes and numPartitions

A lot of RDD methods take a numPartitions parameter that lets you specify the 
number of partitions in the result. For example, groupByKey.

The DataFrame counterparts don't have a numPartitions parameter, e.g. groupBy 
only takes a bunch of Columns as params.

I understand that the DataFrame API is supposed to be smarter and go through a 
LogicalPlan, and perhaps determine the optimal number of partitions for you, 
but sometimes you want to specify the number of partitions yourself. One such 
use case is when you are preparing to do a "merge" join with another dataset 
that is similarly partitioned with the same number of partitions.
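
For reference, the two usual knobs on the DataFrame side (a sketch, assuming an
existing SQLContext sqlContext and a DataFrame df; 400 is just an example value):

// number of post-shuffle partitions used by DataFrame aggregations/joins (default 200)
sqlContext.setConf("spark.sql.shuffle.partitions", "400")

// or repartition an individual DataFrame explicitly
val repartitioned = df.repartition(400)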


RE: Spark SQL running totals

2015-10-15 Thread Stefan Panayotov
Thanks to all of you guys for the helpful suggestions. I'll try these first 
thing tomorrow morning.

Stefan Panayotov
Sent from my Windows Phone

From: java8964
Sent: ‎10/‎15/‎2015 4:30 PM
To: Michael Armbrust; Deenar 
Toraskar
Cc: Stefan Panayotov; 
user@spark.apache.org
Subject: RE: Spark SQL running totals

My mistake. I didn't notice that "UNBOUNDED PRECEDING" is already supported.
So a cumulative sum should work then.
Thanks
Yong

From: java8...@hotmail.com
To: mich...@databricks.com; deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org
Subject: RE: Spark SQL running totals
Date: Thu, 15 Oct 2015 16:24:39 -0400




Not sure the window function can work for his case.
If you do a "sum() over (partition by)", that will return a total sum per 
partition, instead of the cumulative sum wanted in this case.
I saw there is a "cume_dist", but no "cume_sum".
Do we really have a "cume_sum" in Spark window functions, or do I totally 
misunderstand "sum() over (partition by)" here?
Yong

From: mich...@databricks.com
Date: Thu, 15 Oct 2015 11:51:59 -0700
Subject: Re: Spark SQL running totals
To: deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org

Check out: 
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
On Thu, Oct 15, 2015 at 11:35 AM, Deenar Toraskar  
wrote:
You can do a self join of the table with itself, with the join clause being 
a.col1 >= b.col1:

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but I can't see why it wouldn't work; doing it on an RDD might be 
more efficient, see 
https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
On 15 October 2015 at 18:48, Stefan Panayotov  wrote:



Hi,

I need help with Spark SQL. I need to achieve something like the following.
If I have data like:

col_1  col_2
1 10
2 30
3 15
4 20
5 25

I need to get col_3 to be the running total of the sum of the previous rows of 
col_2, e.g.

col_1  col_2  col_3
1      10     10
2      30     40
3      15     55
4      20     75
5      25     100

Is there a way to achieve this in Spark SQL or maybe with Data frame 
transformations?

Thanks in advance,


Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net







RE: Spark SQL running totals

2015-10-15 Thread java8964
My mistake. I didn't notice that "UNBOUNDED PRECEDING" is already supported. 
So a cumulative sum should work then.
Thanks
Yong
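
For reference, a minimal cumulative-sum sketch with the 1.5 window API (assuming a
DataFrame df with columns col_1 and col_2, and a HiveContext, which window
functions require in 1.5):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// running total ordered by col_1, from the first row up to the current row
val w = Window.orderBy("col_1").rowsBetween(Long.MinValue, 0)
val withRunningTotal = df.withColumn("col_3", sum(df("col_2")).over(w))

// equivalent SQL, with df registered as a temp table named "tablea":
//   SELECT col_1, col_2,
//          SUM(col_2) OVER (ORDER BY col_1
//                           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col_3
//   FROM tablea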

From: java8...@hotmail.com
To: mich...@databricks.com; deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org
Subject: RE: Spark SQL running totals
Date: Thu, 15 Oct 2015 16:24:39 -0400




Not sure the window function can work for his case.
If you do a "sum() over (partition by)", that will return a total sum per 
partition, instead of the cumulative sum wanted in this case.
I saw there is a "cume_dist", but no "cume_sum".
Do we really have a "cume_sum" in Spark window functions, or do I totally 
misunderstand "sum() over (partition by)" here?
Yong

From: mich...@databricks.com
Date: Thu, 15 Oct 2015 11:51:59 -0700
Subject: Re: Spark SQL running totals
To: deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org

Check out: 
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
On Thu, Oct 15, 2015 at 11:35 AM, Deenar Toraskar  
wrote:
You can do a self join of the table with itself, with the join clause being 
a.col1 >= b.col1:

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but I can't see why it wouldn't work; doing it on an RDD might be 
more efficient, see 
https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
On 15 October 2015 at 18:48, Stefan Panayotov  wrote:



Hi,
 
I need help with Spark SQL. I need to achieve something like the following.
If I have data like:
 
col_1  col_2
1 10
2 30
3 15
4 20
5 25
 
I need to get col_3 to be the running total of the sum of the previous rows of 
col_2, e.g.
 
col_1  col_2  col_3
1      10     10
2      30     40
3      15     55
4      20     75
5      25     100
 
Is there a way to achieve this in Spark SQL or maybe with Data frame 
transformations?
 
Thanks in advance,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  




  

RE: Spark SQL running totals

2015-10-15 Thread java8964
Not sure the window function can work for his case.
If you do a "sum() over (partition by)", that will return a total sum per 
partition, instead of the cumulative sum wanted in this case.
I saw there is a "cume_dist", but no "cume_sum".
Do we really have a "cume_sum" in Spark window functions, or do I totally 
misunderstand "sum() over (partition by)" here?
Yong

From: mich...@databricks.com
Date: Thu, 15 Oct 2015 11:51:59 -0700
Subject: Re: Spark SQL running totals
To: deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org

Check out: 
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
On Thu, Oct 15, 2015 at 11:35 AM, Deenar Toraskar  
wrote:
You can do a self join of the table with itself, with the join clause being 
a.col1 >= b.col1:

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but I can't see why it wouldn't work; doing it on an RDD might be 
more efficient, see 
https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
On 15 October 2015 at 18:48, Stefan Panayotov  wrote:



Hi,
 
I need help with Spark SQL. I need to achieve something like the following.
If I have data like:
 
col_1  col_2
1 10
2 30
3 15
4 20
5 25
 
I need to get col_3 to be the running total of the sum of the previous rows of 
col_2, e.g.
 
col_1  col_2  col_3
1      10     10
2      30     40
3      15     55
4      20     75
5      25     100
 
Is there a way to achieve this in Spark SQL or maybe with Data frame 
transformations?
 
Thanks in advance,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  



  

Re: Problem installing Spark on Windows 8

2015-10-15 Thread Marco Mistroni
Hi
 I tried to set this variable in my Windows env variables but got the same
result.
This is the result of calling set in my command prompt;
have I amended it in the wrong place?

kr
 marco
..
USERDOMAIN=MarcoLaptop
USERDOMAIN_ROAMINGPROFILE=MarcoLaptop
USERNAME=marco
USERPROFILE=C:\Users\marco
windir=C:\Windows
_JAVA_OPTIONS=-Djava.net.preferIPv4Stack=true


On Thu, Oct 15, 2015 at 1:25 AM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> Looks like you are facing an IPv6 issue. Can you try turning the preferIPv4
> property on?
> On Oct 15, 2015 2:10 AM, "Steve Loughran"  wrote:
>
>>
>> On 14 Oct 2015, at 20:56, Marco Mistroni  wrote:
>>
>>
>> 15/10/14 20:52:35 WARN : Your hostname, MarcoLaptop resolves to a
>> loopback/non-r
>> eachable address: fe80:0:0:0:c5ed:a66d:9d95:5caa%wlan2, but we couldn't
>> find any
>>  external IP address!
>> java.lang.RuntimeException: java.lang.RuntimeException: The root scratch
>> dir: /t
>> mp/hive on HDFS should be writable. Current permissions are: -
>> at
>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.jav
>> a:522)
>> at
>> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.s
>> cala:171)
>> at
>> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveCo
>>
>>
>> now, that I haven't seen. Looks like it thinks the permissions are wrong,
>> doesn't it?
>>
>


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Jean-Baptiste Onofré

Hi Phil,

sorry I didn't have time to investigate yesterday (I was on a couple of 
other Apache projects ;)). I will try to do it today. I'll keep you posted.


Regards
JB

On 10/16/2015 07:21 AM, Phil Kallos wrote:

JB,

To clarify, you are able to run the Amazon Kinesis example provided in
the spark examples dir?

bin/run-example streaming.KinesisWordCountASL [app name] [stream name]
[endpoint url] ?

If it helps, below are the steps I used to build spark

mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package

And I did this with revision 4f894dd6906311cb57add6757690069a18078783
(v.1.5.1)

Thanks,
Phil


On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi > wrote:

So running it using spark-submit doesn't change anything, it still works.

When reading the code

https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
it looks like the receivers are definitely being ser/de. I think
this is the issue, need to find a way to confirm that now...

2015-10-15 16:12 GMT+07:00 Eugen Cepoi >:

Hey,

A quick update on other things that have been tested.

When looking at the compiled code of the
spark-streaming-kinesis-asl jar everything looks normal (there
is a class that implements SyncMap and it is used inside the
receiver).
Starting a spark shell and using introspection to instantiate a
receiver and check that blockIdToSeqNumRanges implements SyncMap
works too. So obviously it has the correct type according to that.

Another thing to test could be to do the same introspection
stuff but inside a spark job to make sure it is not a problem in
the way the jobs are run.
The other idea would be that this is a problem related to
ser/de. For example if the receiver was being serialized and
then deserialized it could definitely happen depending on the
lib used and its configuration that it just doesn't preserve the
concrete type. So it would deserialize using the compile-time type
instead of the runtime type.

Cheers,
Eugen


2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré >:

Thanks for the update Phil.

I'm preparing an environment to reproduce it.

I keep you posted.

Thanks again,
Regards
JB

On 10/15/2015 08:36 AM, Phil Kallos wrote:

Not a dumb question, but yes I updated all of the
library references to
1.5, including  (even tried 1.5.1).

// Versions.spark set elsewhere to "1.5.0"
"org.apache.spark" %% "spark-streaming-kinesis-asl" %
Versions.spark %
"provided"

I am experiencing the issue in my own spark project, but
also when I try
to run the spark streaming kinesis example that comes in
spark/examples

Tried running the streaming job locally, and also in EMR
with release
4.1.0 that includes Spark 1.5

Very strange!

 -- Forwarded message --

 From: "Jean-Baptiste Onofré"  >>
 To: user@spark.apache.org

>

 Cc:
 Date: Thu, 15 Oct 2015 08:03:55 +0200
 Subject: Re: Spark 1.5 Streaming and Kinesis
 Hi Phil,
 KinesisReceiver is part of extra. Just a dumb
question: did you
 update all, including the Spark Kinesis extra
containing the
 KinesisReceiver ?
 I checked on tag v1.5.0, and at line 175 of the
KinesisReceiver, we see:
 blockIdToSeqNumRanges.clear()
 which is a:
 private val blockIdToSeqNumRanges = new
 mutable.HashMap[StreamBlockId, SequenceNumberRanges]
  with mutable.SynchronizedMap[StreamBlockId,
SequenceNumberRanges]
 So, it doesn't look fully correct to me.
 Let me investigate a bit this morning.
 Regards
 JB
 On 10/15/2015 07:49 AM, Phil Kallos wrote:
 We are trying to migrate from 

Get the previous state string in Spark streaming

2015-10-15 Thread Chandra Mohan, Ananda Vel Murugan
One of my co-workers (Yogesh) was trying to get this posted on the Spark mailing 
list and it seems it did not get posted. So I am reposting it here. Please help.





Hi,

I am new to Spark and was trying to do some experiments with it.



I had a JavaPairDStream RDD.

I want to get the list of string from its previous state. For that I use 
updateStateByKey function as follows:



final Function2, Optional> 
updateFunc =

   new Function2,

Optional>() {



public Optional call(List arg0, 
Optional arg1) throws Exception {

// TODO Auto-generated method stub

if(arg1.toString()==null)

   return Optional.of(arg0);

else {

   arg0.add(arg1.toString());

   return Optional.of(arg0);

}

   }

};



I want the function to append the new list of strings to the previous list and 
return the new list. But I am not able to do so. I am getting the 
"java.lang.UnsupportedOperationException" error.

Can anyone help me out in getting the desired output?



Is Feature Transformations supported by Spark export to PMML

2015-10-15 Thread Weiwei Zhang
Hi Folks,

I am trying to find out if the Spark export to PMML has support for feature
transformations. I know in R, I need to specify local transformations and
attributes using the "pmml" and "pmmlTransformation" libraries. The example
I read on Spark, simply apply "toPMML" function and it generates an XML
file. If I want to do some transformations to independent variables, such
as taking log, adding interaction terms and etc, will the "toPMML" function
take care of these transformations automatically as well?

Thank you.

Weiwei


Re: s3a file system and spark deployment mode

2015-10-15 Thread Raghavendra Pandey
You can use the Spark 1.5.1 "without Hadoop" build together with Hadoop 2.7.1.
Hadoop 2.7.1 is more mature for s3a access. You also need to add the hadoop
tools dir to the hadoop classpath...

Raghav
On Oct 16, 2015 1:09 AM, "Scott Reynolds"  wrote:

> We do not use EMR. This is deployed on Amazon VMs
>
> We build Spark with Hadoop-2.6.0 but that does not include the s3a
> filesystem nor the Amazon AWS SDK
>
> On Thu, Oct 15, 2015 at 12:26 PM, Spark Newbie 
> wrote:
>
>> Are you using EMR?
>> You can install Hadoop-2.6.0 along with Spark-1.5.1 in your EMR cluster.
>> And that brings s3a jars to the worker nodes and it becomes available to
>> your application.
>>
>> On Thu, Oct 15, 2015 at 11:04 AM, Scott Reynolds 
>> wrote:
>>
>>> List,
>>>
>>> Right now we build our spark jobs with the s3a hadoop client. We do this
>>> because our machines are only allowed to use IAM access to the s3 store. We
>>> can build our jars with the s3a filesystem and the aws sdk just fine and
>>> these jars run great in *client mode*.
>>>
>>> We would like to move from client mode to cluster mode as that will
>>> allow us to be more resilient to driver failure. In order to do this either:
>>> 1. the jar file has to be on worker's local disk
>>> 2. the jar file is in shared storage (s3a)
>>>
>>> We would like to put the jar file in s3 storage, but when we give the
>>> jar path as s3a://.., the worker node doesn't have the hadoop s3a and
>>> aws sdk in its classpath / uber jar.
>>>
>>> Other than building spark with those two dependencies, what other
>>> options do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a
>>> thing.
>>>
>>> Need to get s3a access to both the master (so that we can log spark
>>> event log to s3) and to the worker processes (driver, executor).
>>>
>>> Looking for ideas before just adding the dependencies to our spark build
>>> and calling it a day.
>>>
>>
>>
>


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Phil Kallos
JB,

To clarify, you are able to run the Amazon Kinesis example provided in the
spark examples dir?

bin/run-example streaming.KinesisWordCountASL [app name] [stream name]
[endpoint url] ?

If it helps, below are the steps I used to build spark

mvn -Pyarn -Pkinesis-asl -Phadoop-2.6 -DskipTests clean package

And I did this with revision 4f894dd6906311cb57add6757690069a18078783
(v.1.5.1)

Thanks,
Phil


On Thu, Oct 15, 2015 at 2:31 AM, Eugen Cepoi  wrote:

> So running it using spark-submit doesn't change anything, it still works.
>
> When reading the code
> https://github.com/apache/spark/blob/branch-1.5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L100
> it looks like the receivers are definitely being ser/de. I think this is
> the issue, need to find a way to confirm that now...
>
> 2015-10-15 16:12 GMT+07:00 Eugen Cepoi :
>
>> Hey,
>>
>> A quick update on other things that have been tested.
>>
>> When looking at the compiled code of the spark-streaming-kinesis-asl jar
>> everything looks normal (there is a class that implements SyncMap and it is
>> used inside the receiver).
>> Starting a spark shell and using introspection to instantiate a receiver
>> and check that blockIdToSeqNumRanges implements SyncMap works too. So
>> obviously it has the correct type according to that.
>>
>> Another thing to test could be to do the same introspection stuff but
>> inside a spark job to make sure it is not a problem in the way the jobs are
>> run.
>> The other idea would be that this is a problem related to ser/de. For
>> example if the receiver was being serialized and then deserialized it could
>> definitely happen depending on the lib used and its configuration that it
>> just doesn't preserve the concrete type. So it would deserialize using the
>> compile-time type instead of the runtime type.
>>
>> Cheers,
>> Eugen
>>
>>
>> 2015-10-15 13:41 GMT+07:00 Jean-Baptiste Onofré :
>>
>>> Thanks for the update Phil.
>>>
>>> I'm preparing an environment to reproduce it.
>>>
>>> I keep you posted.
>>>
>>> Thanks again,
>>> Regards
>>> JB
>>>
>>> On 10/15/2015 08:36 AM, Phil Kallos wrote:
>>>
 Not a dumb question, but yes I updated all of the library references to
 1.5, including  (even tried 1.5.1).

 // Versions.spark set elsewhere to "1.5.0"
 "org.apache.spark" %% "spark-streaming-kinesis-asl" % Versions.spark %
 "provided"

 I am experiencing the issue in my own spark project, but also when I try
 to run the spark streaming kinesis example that comes in spark/examples

 Tried running the streaming job locally, and also in EMR with release
 4.1.0 that includes Spark 1.5

 Very strange!

 -- Forwarded message --

 From: "Jean-Baptiste Onofré" >> j...@nanthrax.net>>
 To: user@spark.apache.org 

 Cc:
 Date: Thu, 15 Oct 2015 08:03:55 +0200
 Subject: Re: Spark 1.5 Streaming and Kinesis
 Hi Phil,
 KinesisReceiver is part of extra. Just a dumb question: did you
 update all, including the Spark Kinesis extra containing the
 KinesisReceiver ?
 I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we
 see:
 blockIdToSeqNumRanges.clear()
 which is a:
 private val blockIdToSeqNumRanges = new
 mutable.HashMap[StreamBlockId, SequenceNumberRanges]
  with mutable.SynchronizedMap[StreamBlockId,
 SequenceNumberRanges]
 So, it doesn't look fully correct to me.
 Let me investigate a bit this morning.
 Regards
 JB
 On 10/15/2015 07:49 AM, Phil Kallos wrote:
 We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
 streaming applications, to take advantage of the new Kinesis
 checkpointing improvements in 1.5.
 However after upgrading, we are consistently seeing the following
 error:
 java.lang.ClassCastException: scala.collection.mutable.HashMap
 cannot be
 cast to scala.collection.mutable.SynchronizedMap
 at

 org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
 at

 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
 at

 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
 at

 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
 at

 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
 at

 org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
 at

 

Turn off logs in spark-sql shell

2015-10-15 Thread Muhammad Ahsan
Hello Everyone!

I want to know how to turn off logging during starting *spark-sql shell*
without changing log4j configuration files. In normal spark-shell I can use
the following commands

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)


Thanks

-- 
Thanks

Muhammad Ahsan


Application not found in Spark historyserver in yarn-client mode

2015-10-15 Thread Anfernee Xu
Sorry, I have to re-send this again as I did not get an answer.

Here's the problem I'm facing: I'm using the Spark 1.5.0 release, and I have a
standalone java application which periodically submits Spark jobs to my
yarn cluster; btw I'm not using 'spark-submit' or
'org.apache.spark.launcher' to submit my jobs. These jobs are successful
and I can see them on the Yarn RM webUI, but when I want to follow the link to
the app history on the Spark historyserver, I always get a 404 (application not
found) from the Spark historyserver.

My code looks like below


SparkConf conf = new
SparkConf().setAppName("testSpak").setMaster("yarn-client")
.setJars(new String[]{IOUtil.getJar(MySparkApp.class)});

conf.set("spark.yarn.historyServer.address", "10.247.44.155:18080");
conf.set("spark.history.fs.logDirectory",
"
hdfs://myHdfsNameNode:55310/scratch/tie/spark/applicationHistory");

JavaSparkContext sc = new JavaSparkContext(conf);

try {

 ... my application code

}finally{
  sc.stop();

}

Anything I did wrong or missed? Do I need to configure something on Yarn
side?

Thanks
-- 
--Anfernee
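
One thing worth double-checking (an assumption on my side, not a confirmed fix):
the history server only sees applications that wrote their event logs into the
directory it reads, and that is controlled on the application side by
spark.eventLog.enabled and spark.eventLog.dir; spark.history.fs.logDirectory is a
history-server-side setting and does not by itself produce any logs. A sketch
reusing the paths above:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("testSpak")
  .setMaster("yarn-client")
  // write the event log where the history server looks
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir",
    "hdfs://myHdfsNameNode:55310/scratch/tie/spark/applicationHistory")
  .set("spark.yarn.historyServer.address", "10.247.44.155:18080")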


Re:

2015-10-15 Thread Dirceu Semighini Filho
Hi Anfernee,
A subject in the email sometimes helps ;)
Have you seen if the link is sending you to a hostname that is not
accessible by your workstation? Sometimes changing the hostname to the ip
solves this kind of issue.



2015-10-15 13:34 GMT-03:00 Anfernee Xu :

> Sorry, I have to re-send it again as I did not get the answer.
>
> Here's the problem I'm facing, I have a standalone java application which
> periodically submits Spark jobs to my yarn cluster, btw I'm not using
> 'spark-submit' or 'org.apache.spark.launcher' to submit my jobs. These jobs
> are successful and I can see them on Yarn RM webUI, but when I want to
> follow the link to the app history on Spark historyserver, I always got
> 404(application is not found) from Spark historyserver.
>
> My code looks like below
>
>
> SparkConf conf = new
> SparkConf().setAppName("testSpak").setMaster("yarn-client")
> .setJars(new String[]{IOUtil.getJar(MySparkApp.class)});
>
> conf.set("spark.yarn.historyServer.address", "10.247.44.155:18080");
> conf.set("spark.history.fs.logDirectory",
> "
> hdfs://myHdfsNameNode:55310/scratch/tie/spark/applicationHistory");
>
> JavaSparkContext sc = new JavaSparkContext(conf);
>
> try {
>
>  ... my application code
>
> }finally{
>   sc.stop();
>
> }
>
> Anything I did wrong or missed? Do I need to configure something on Yarn
> side?
>
> Thanks
>
> --
> --Anfernee
>


[no subject]

2015-10-15 Thread Anfernee Xu
Sorry, I have to re-send this again as I did not get an answer.

Here's the problem I'm facing: I have a standalone java application which
periodically submits Spark jobs to my yarn cluster; btw I'm not using
'spark-submit' or 'org.apache.spark.launcher' to submit my jobs. These jobs
are successful and I can see them on the Yarn RM webUI, but when I want to
follow the link to the app history on the Spark historyserver, I always get a
404 (application not found) from the Spark historyserver.

My code looks like below


SparkConf conf = new
SparkConf().setAppName("testSpak").setMaster("yarn-client")
.setJars(new String[]{IOUtil.getJar(MySparkApp.class)});

conf.set("spark.yarn.historyServer.address", "10.247.44.155:18080");
conf.set("spark.history.fs.logDirectory",
"
hdfs://myHdfsNameNode:55310/scratch/tie/spark/applicationHistory");

JavaSparkContext sc = new JavaSparkContext(conf);

try {

 ... my application code

}finally{
  sc.stop();

}

Anything I did wrong or missed? Do I need to configure something on Yarn
side?

Thanks

-- 
--Anfernee


word2vec cosineSimilarity

2015-10-15 Thread Arthur Chan
Hi,

I am trying the sample word2vec code from
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#example

Following are my test results:

scala> for((synonym, cosineSimilarity) <- synonyms) {
 |   println(s"$synonym $cosineSimilarity")
 | }
taiwan 2.0518918365726297
japan 1.8960962308732054
korea 1.8789320149319788
thailand 1.7549218525671182
mongolia 1.7375501108635814


The cosineSimilarity values I got are all greater than 1; shouldn't the
cosineSimilarity values be between 0 and 1?

How can I get similarity values between 0 and 1?

Regards
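
Cosine similarity itself is dot(a, b) / (||a|| * ||b||), which always lies in
[-1, 1], so scores above 1 suggest the values returned here are not fully
normalized. One way to get properly normalized values is to recompute them from
the raw vectors (a sketch; model is the trained Word2VecModel, and "china" /
"taiwan" are just example words):

// Word2VecModel.getVectors returns Map[String, Array[Float]]
def cosine(a: Array[Float], b: Array[Float]): Double = {
  val dot   = a.zip(b).map { case (x, y) => x.toDouble * y.toDouble }.sum
  val normA = math.sqrt(a.map(x => x.toDouble * x.toDouble).sum)
  val normB = math.sqrt(b.map(x => x.toDouble * x.toDouble).sum)
  dot / (normA * normB)
}

val vectors = model.getVectors
val sim = cosine(vectors("china"), vectors("taiwan"))   // guaranteed to be in [-1, 1]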


Complex transformation on a dataframe column

2015-10-15 Thread Hao Wang
Hi,

I have searched around but could not find a satisfying answer to this question: 
what is the best way to do a complex transformation on a dataframe column?

For example, I have a dataframe with the following schema and a function that 
has pretty complex logic to format addresses. I would like to use the function 
to format each address and store the output as an additional column in the 
dataframe. What is the best way to do it? Use Dataframe.map? Define a UDF? Some 
code example would be appreciated.

Input dataframe:
root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Address: string (nullable = true)

Output dataframe:
root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- FormattedAddress: string (nullable = true)

The function for format addresses:
def formatAddress(address: String): String
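
A UDF is usually the simplest fit here; a minimal sketch (assuming formatAddress
is in scope and df is the input DataFrame):

import org.apache.spark.sql.functions.udf

// wrap the existing Scala function as a column expression
val formatAddressUdf = udf((address: String) => formatAddress(address))

val withFormatted = df.withColumn("FormattedAddress", formatAddressUdf(df("Address")))

DataFrame.map would also work, but it drops down to an RDD of Rows and you would
have to rebuild the schema yourself, so withColumn plus a UDF tends to be cleaner.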


Best regards,
Hao Wang

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL Context error in 1.5.1 - any work around ?

2015-10-15 Thread Richard Hillegas
A crude workaround may be to run your spark shell with a sudo command.

Hope this helps,
Rick Hillegas


Sourav Mazumder  wrote on 10/15/2015 09:59:02
AM:

> From: Sourav Mazumder 
> To: user 
> Date: 10/15/2015 09:59 AM
> Subject: SQL Context error in 1.5.1 - any work around ?
>
> I keep on getting this error whenever I'm starting spark-shell : The
> root scratch dir: /tmp/hive on HDFS should be writable. Current
> permissions are: rwx--.

> I cannot work with this if I need to do anything with sqlContext as
> that does not get created.
>
> I could see that a bug is raised for this https://issues.apache.org/
> jira/browse/SPARK-10066.

> However, is there any workaround for this?

> I didn't face this problem in 1.4.1

> Regards,
> Sourav

SQL Context error in 1.5.1 - any work around ?

2015-10-15 Thread Sourav Mazumder
I keep on getting this error whenever I'm starting spark-shell : The root
scratch dir: /tmp/hive on HDFS should be writable. Current permissions are:
rwx--.

I cannot work with this if I need to do anything with sqlContext as that
does not get created.

I could see that a bug is raised for this
https://issues.apache.org/jira/browse/SPARK-10066.

However, is there any workaround for this?

I didn't face this problem in 1.4.1

Regards,
Sourav


Re: SPARK SQL Error

2015-10-15 Thread pnpritchard
Going back to your code, I see that you instantiate the spark context as:
  val sc = new SparkContext(args(0), "Csv loading example")
which will set the master url to "args(0)" and app name to "Csv loading
example". In your case, args(0) is
"hdfs://quickstart.cloudera:8020/people_csv", which obviously is not the
master url, so that is why you are getting the error.

There are two ways to fix this:
1. Add master url to the command line args:
spark-submit --master yarn --class org.spark.apache.CsvDataSource
/home/cloudera/Desktop/TestMain.jar yarn
hdfs://quickstart.cloudera:8020/people_csv

2. Use the no arg SparkContext constructor
I would recommend this since you are using spark-submit, which can set the
master url and app name properties. You would have to change your code to
"val sc = new SparkContext()" and use the "--name" option for spark-submit.
Also, you would have to change your code to set the csv file path using
"args(0)" (since there is only one command line argument, indexed from 0).
spark-submit --master yarn --name "Csv loading example" --class
org.spark.apache.CsvDataSource /home/cloudera/Desktop/TestMain.jar
hdfs://quickstart.cloudera:8020/people_csv
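
In code, option 2 boils down to something like this (a sketch; the class and
argument handling follow the thread above):

import org.apache.spark.SparkContext

object CsvDataSource {
  def main(args: Array[String]): Unit = {
    // master URL and app name are supplied by spark-submit (--master / --name)
    val sc = new SparkContext()
    val csvPath = args(0)   // e.g. hdfs://quickstart.cloudera:8020/people_csv
    // ... load the CSV from csvPath ...
    sc.stop()
  }
}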

Lastly, if you look at this documentation:
http://spark.apache.org/docs/latest/submitting-applications.html#master-urls,
"yarn" is not a valid master url. It looks like you need to use
"yarn-client" or "yarn-cluster". Unfortunately, I do not have experience
using yarn, so can't help you there. Here is more documentation for yarn you
can read: http://spark.apache.org/docs/latest/running-on-yarn.html.

-Nick



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Error-tp25050p25078.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.5 java.net.ConnectException: Connection refused

2015-10-15 Thread legolasluk
In case of a job being aborted, does the application fail? If not, how do you 
make it fail? How should a streaming application recover without data loss?

How does this apply to zero data loss behavior for Kinesis receivers in Spark 
1.5?

Thanks,
Ashish 

 Original message 
From: "Tathagata Das" 
Date: Oct 14, 2015 1:28 PM
Subject: Re: Spark 1.5 java.net.ConnectException: Connection refused
To: "Spark Newbie" 
Cc: "user" , "Shixiong (Ryan) Zhu" 


When a job gets aborted, it means that the internal tasks were retried a number 
of times before the system gave up. You can control the number of retries (see 
Spark's configuration page). The job by default does not get resubmitted.

You could try getting the logs of the failed executor, to see what caused the 
failure. Could be a memory limit issue, and YARN killing it somehow. 



On Wed, Oct 14, 2015 at 11:05 AM, Spark Newbie  
wrote:
Is it slowing things down or blocking progress.
>> I didn't see slowing of processing, but I do see jobs aborted consecutively 
>> for a period of 18 batches (5 minute batch intervals). So I am worried about 
>> what happened to the records that these jobs were processing. 
Also, one more thing to mention is that the 
StreamingListenerBatchCompleted.numRecords information shows all received 
records as processed even if the batch/job failed. The processing time likewise 
shows the same time as it takes for a successful batch.  
It seems like it is the numRecords which was the input to the batch regardless 
of whether they were successfully processed or not.

On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie  
wrote:
I ran 2 different spark 1.5 clusters that have been running for more than a day 
now. I do see jobs getting aborted due to task retries maxing out (default 4) 
due to ConnectionException. It seems like the executors die and get restarted 
and I was unable to find the root cause (same app code and conf used on spark 
1.4.1 I don't see ConnectionException). 

Another question related to this, what happens to the kinesis records received 
when Job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I am using) does 
the job gets resubmitted with the same received records? Or does the 
kinesis-asl library get those records again based on sequence numbers it 
tracks? It would good for me to understand the story around lossless processing 
of kinesis records in Spark-1.5 + kinesis-asl-1.5 when jobs are aborted. Any 
pointers or quick explanation would be very helpful.


On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das  wrote:
Is this happening too often? Is it slowing things down or blocking progress? 
Failures once in a while are part of the norm, and the system should take care 
of itself.

On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie  wrote:
Hi Spark users,

I'm seeing the below exception in my spark streaming application. It happens in 
the first stage where the kinesis receivers receive records and perform a 
flatMap operation on the unioned Dstream. A coalesce step also happens as a 
part of that stage for optimizing the performance. 

This is happening on my spark 1.5 instance using kinesis-asl-1.5. When I look 
at the executor logs I do not see any exceptions indicating the root cause of 
why there is no connectivity on xxx.xx.xx.xxx:36684 or when did that service go 
down. 

Any help debugging this problem will be helpful.

15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while beginning 
fetch of 1 outstanding blocks
java.io.IOException: Failed to connect to 
ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
at 
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
at 
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
at 
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 

Re: s3a file system and spark deployment mode

2015-10-15 Thread Spark Newbie
Are you using EMR?
You can install Hadoop-2.6.0 along with Spark-1.5.1 in your EMR cluster.
And that brings s3a jars to the worker nodes and it becomes available to
your application.

On Thu, Oct 15, 2015 at 11:04 AM, Scott Reynolds 
wrote:

> List,
>
> Right now we build our spark jobs with the s3a hadoop client. We do this
> because our machines are only allowed to use IAM access to the s3 store. We
> can build our jars with the s3a filesystem and the aws sdk just fine and
> these jars run great in *client mode*.
>
> We would like to move from client mode to cluster mode as that will allow
> us to be more resilient to driver failure. In order to do this either:
> 1. the jar file has to be on worker's local disk
> 2. the jar file is in shared storage (s3a)
>
> We would like to put the jar file in s3 storage, but when we give the jar
> path as s3a://.., the worker node doesn't have the hadoop s3a and aws
> sdk in its classpath / uber jar.
>
> Other than building spark with those two dependencies, what other options
> do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a thing.
>
> Need to get s3a access to both the master (so that we can log spark event
> log to s3) and to the worker processes (driver, executor).
>
> Looking for ideas before just adding the dependencies to our spark build
> and calling it a day.
>


Re: s3a file system and spark deployment mode

2015-10-15 Thread Scott Reynolds
We do not use EMR. This is deployed on Amazon VMs

We build Spark with Hadoop-2.6.0 but that does not include the s3a
filesystem nor the Amazon AWS SDK

On Thu, Oct 15, 2015 at 12:26 PM, Spark Newbie 
wrote:

> Are you using EMR?
> You can install Hadoop-2.6.0 along with Spark-1.5.1 in your EMR cluster.
> And that brings s3a jars to the worker nodes and it becomes available to
> your application.
>
> On Thu, Oct 15, 2015 at 11:04 AM, Scott Reynolds 
> wrote:
>
>> List,
>>
>> Right now we build our spark jobs with the s3a hadoop client. We do this
>> because our machines are only allowed to use IAM access to the s3 store. We
>> can build our jars with the s3a filesystem and the aws sdk just fine and
>> these jars run great in *client mode*.
>>
>> We would like to move from client mode to cluster mode as that will allow
>> us to be more resilient to driver failure. In order to do this either:
>> 1. the jar file has to be on worker's local disk
>> 2. the jar file is in shared storage (s3a)
>>
>> We would like to put the jar file in s3 storage, but when we give the jar
>> path as s3a://.., the worker node doesn't have the hadoop s3a and aws
>> sdk in its classpath / uber jar.
>>
>> Other than building spark with those two dependencies, what other options
>> do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a thing.
>>
>> Need to get s3a access to both the master (so that we can log spark event
>> log to s3) and to the worker processes (driver, executor).
>>
>> Looking for ideas before just adding the dependencies to our spark build
>> and calling it a day.
>>
>
>


Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Jean-Baptiste Onofré

Hi Phil,

KinesisReceiver is part of extra. Just a dumb question: did you update 
all, including the Spark Kinesis extra containing the KinesisReceiver ?


I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:

blockIdToSeqNumRanges.clear()

which is a:

private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, 
SequenceNumberRanges]

with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]

So, it doesn't look fully correct to me.

Let me investigate a bit this morning.

Regards
JB

On 10/15/2015 07:49 AM, Phil Kallos wrote:

Hi,

We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
streaming applications, to take advantage of the new Kinesis
checkpointing improvements in 1.5.

However after upgrading, we are consistently seeing the following error:

java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be
cast to scala.collection.mutable.SynchronizedMap
at
org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I even get this when running the Kinesis examples :
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html with

bin/run-example streaming.KinesisWordCountASL

Am I doing something incorrect?




--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.5 Streaming and Kinesis

2015-10-15 Thread Jean-Baptiste Onofré
By correct, I mean: the map declaration looks good to me, so the 
ClassCastException is weird ;)


I'm trying to reproduce the issue in order to investigate.

Regards
JB

On 10/15/2015 08:03 AM, Jean-Baptiste Onofré wrote:

Hi Phil,

KinesisReceiver is part of extra. Just a dumb question: did you update
all, including the Spark Kinesis extra containing the KinesisReceiver ?

I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:

blockIdToSeqNumRanges.clear()

which is a:

private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId,
SequenceNumberRanges]
 with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]

So, it doesn't look fully correct to me.

Let me investigate a bit this morning.

Regards
JB

On 10/15/2015 07:49 AM, Phil Kallos wrote:

Hi,

We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
streaming applications, to take advantage of the new Kinesis
checkpointing improvements in 1.5.

However after upgrading, we are consistently seeing the following error:

java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be
cast to scala.collection.mutable.SynchronizedMap
at
org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)

at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)

at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)

at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)

at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)

at
org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at
org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

I even get this when running the Kinesis examples :
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
with

bin/run-example streaming.KinesisWordCountASL

Am I doing something incorrect?






--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark UI consuming lots of memory

2015-10-15 Thread Nicholas Pritchard
Thanks for your help, most likely this is the memory leak you are fixing in
https://issues.apache.org/jira/browse/SPARK-11126.
-Nick

On Mon, Oct 12, 2015 at 9:00 PM, Shixiong Zhu  wrote:

> In addition, you cannot turn off JobListener and SQLListener now...
>
> Best Regards,
> Shixiong Zhu
>
> 2015-10-13 11:59 GMT+08:00 Shixiong Zhu :
>
>> Is your query very complicated? Could you provide the output of `explain`
>> your query that consumes an excessive amount of memory? If this is a small
>> query, there may be a bug that leaks memory in SQLListener.
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-10-13 11:44 GMT+08:00 Nicholas Pritchard <
>> nicholas.pritch...@falkonry.com>:
>>
>>> As an update, I did try disabling the ui with "spark.ui.enabled=false",
>>> but the JobListener and SQLListener still consume a lot of memory, leading
>>> to OOM error. Has anyone encountered this before? Is the only solution just
>>> to increase the driver heap size?
>>>
>>> Thanks,
>>> Nick
>>>
>>> On Mon, Oct 12, 2015 at 8:42 PM, Nicholas Pritchard <
>>> nicholas.pritch...@falkonry.com> wrote:
>>>
 I set those configurations by passing to spark-submit script:
 "bin/spark-submit --conf spark.ui.retainedJobs=20 ...". I have verified
 that these configurations are being passed correctly because they are
 listed in the environments tab and also by counting the number of
 job/stages that are listed. The "spark.sql.ui.retainedExecutions=0"
 only applies to the number of "completed" executions; there will always be
 a "running" execution. For some reason, I have one execution that consumes
 an excessive amount of memory.

 Actually, I am not interested in the SQL UI, as I find the Job/Stages
 UI to have sufficient information. I am also using Spark Standalone cluster
 manager so have not had to use the history server.


 On Mon, Oct 12, 2015 at 8:17 PM, Shixiong Zhu 
 wrote:

> Could you show how did you set the configurations? You need to set
> these configurations before creating SparkContext and SQLContext.
>
> Moreover, the history sever doesn't support SQL UI. So
> "spark.eventLog.enabled=true" doesn't work now.
>
> Best Regards,
> Shixiong Zhu
>
> 2015-10-13 2:01 GMT+08:00 pnpritchard  >:
>
>> Hi,
>>
>> In my application, the Spark UI is consuming a lot of memory,
>> especially the
>> SQL tab. I have set the following configurations to reduce the memory
>> consumption:
>> - spark.ui.retainedJobs=20
>> - spark.ui.retainedStages=40
>> - spark.sql.ui.retainedExecutions=0
>>
>> However, I still get OOM errors in the driver process with the
>> default 1GB
>> heap size. The following link is a screen shot of a heap dump report,
>> showing the SQLListener instance having a retained size of 600MB.
>>
>> https://cloud.githubusercontent.com/assets/5124612/10404379/20fbdcfc-6e87-11e5-9415-27e25193a25c.png
>>
>> Rather than just increasing the allotted heap size, does anyone have
>> any
>> other ideas? Is it possible to disable the SQL tab specifically? I
>> also
>> thought about serving the UI from disk rather than memory with
>> "spark.eventLog.enabled=true" and "spark.ui.enabled=false". Has
>> anyone tried
>> this before?
>>
>> Thanks,
>> Nick
>>
>>
>>
>>
>>
>

>>>
>>
>
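
For reference, a minimal sketch of what Shixiong describes above, assuming
Spark 1.5 and the values from this thread: the retention settings have to be
on the SparkConf before the SparkContext and SQLContext are created,
otherwise the listeners start with the defaults.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Retention limits must be in place before the contexts are constructed.
val conf = new SparkConf()
  .setAppName("ui-retention-sketch")            // illustrative app name
  .set("spark.ui.retainedJobs", "20")
  .set("spark.ui.retainedStages", "40")
  .set("spark.sql.ui.retainedExecutions", "0")  // only limits *completed* executions

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)             // SQL listener/tab is set up here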


Spark 1.5.1 ThriftServer

2015-10-15 Thread Dirceu Semighini Filho
Hello,
I'm trying to migrate to Scala 2.11, and I couldn't find a spark-thriftserver
jar for Scala 2.11 in the Maven repository.
I could do a manual build (without tests) of Spark with the Thrift server in
Scala 2.11.
Some time ago the Thrift server build wasn't enabled by default, but I can
find a 2.10 jar for the Thrift server.
Is there any problem with the Thrift server in Scala 2.11?
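
For what it's worth, a sketch of such a manual 2.11 build with the Thrift
server enabled, based on my reading of the Spark 1.5 build docs (profiles
other than -Phive and -Phive-thriftserver depend on your environment):

./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package

The Scala 2.11 section of the build docs may also note restrictions on the
JDBC component, which the Thrift server is part of, so that is worth
double-checking.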


multiple pyspark instances simultaneously (same time)

2015-10-15 Thread jeff.sadow...@gmail.com
I am having issues trying to set up Spark to run jobs simultaneously.

I thought I wanted FAIR scheduling?

I used the templated fairscheduler.xml as is. When I start pyspark I see the
3 expected pools:
production, test, and default

When I log in as a second user and run pyspark,
I see the expected pools as that user as well.

When I open a web browser to http://master:8080,

I see that my first user's application state is RUNNING and my second
user's is WAITING.

So I try putting them both in the production pool, which uses FAIR scheduling.

When I refresh http://master:8080

the second user's application is still WAITING.

If I try to run something as the second user I get

"Initial job has not accepted any resources"

Maybe fair queuing is not what I want?

I'm starting pyspark as follows

pyspark --master spark://master:7077

I started spark as follows

start-all.sh
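
A likely cause is not the scheduler pools (those only share resources
between jobs inside one application) but the standalone master handing all
cores to the first application, so the second shell gets no executors; that
matches the "Initial job has not accepted any resources" message. A sketch
of one workaround, with an illustrative core count:

pyspark --master spark://master:7077 --conf spark.cores.max=4

Alternatively, spark.deploy.defaultCores can be set on the master so that
every application gets a bounded share by default.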






s3a file system and spark deployment mode

2015-10-15 Thread Scott Reynolds
List,

Right now we build our spark jobs with the s3a hadoop client. We do this
because our machines are only allowed to use IAM access to the s3 store. We
can build our jars with the s3a filesystem and the AWS SDK just fine, and
these jars run great in *client mode*.

We would like to move from client mode to cluster mode as that will allow
us to be more resilient to driver failure. In order to do this either:
1. the jar file has to be on worker's local disk
2. the jar file is in shared storage (s3a)

We would like to put the jar file in s3 storage, but when we give the jar
path as s3a://.., the worker node doesn't have the hadoop s3a and aws
sdk in its classpath / uber jar.

Other than building Spark with those two dependencies, what other options
do I have ? We are using 1.5.1 so SPARK_CLASSPATH is no longer a thing.

We need s3a access on both the master (so that we can write the Spark event
log to S3) and on the worker processes (driver, executor).

Looking for ideas before just adding the dependencies to our spark build
and calling it a day.
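
One sketch to try, untested here: pull the s3a filesystem and a matching AWS
SDK in at submit time rather than baking them into Spark, either with
--packages or by pointing spark.driver.extraClassPath and
spark.executor.extraClassPath at jars already present on every node. The
coordinates, class and bucket below are illustrative (the hadoop-aws version
must match the cluster's Hadoop build), and whether the driver resolves them
early enough to fetch an s3a:// application jar in cluster mode still needs
verifying:

spark-submit --deploy-mode cluster \
  --master spark://master:7077 \
  --packages org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk:1.7.4 \
  --class com.example.MyJob \
  s3a://my-bucket/jars/my-job.jar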


Re: Spark SQL running totals

2015-10-15 Thread Deenar Toraskar
You can do a self-join of the table, with the join clause being
a.col1 >= b.col1

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but I can't see why it wouldn't work; doing it with an
RDD might be more efficient, see
https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/

On 15 October 2015 at 18:48, Stefan Panayotov  wrote:

> Hi,
>
> I need help with Spark SQL. I need to achieve something like the following.
> If I have data like:
>
> col_1  col_2
> 1 10
> 2 30
> 3 15
> 4 20
> 5 25
>
> I need to get col_3 to be the running total of the sum of the previous
> rows of col_2, e.g.
>
> col_1  col_2  col_3
> 1      10     10
> 2      30     40
> 3      15     55
> 4      20     75
> 5      25     100
>
> Is there a way to achieve this in Spark SQL or maybe with Data frame
> transformations?
>
> Thanks in advance,
>
>
> *Stefan Panayotov, PhD **Home*: 610-355-0919
> *Cell*: 610-517-5586
> *email*: spanayo...@msn.com
> spanayo...@outlook.com
> spanayo...@comcast.net
>
>


Spark SQL running totals

2015-10-15 Thread Stefan Panayotov
Hi,
 
I need help with Spark SQL. I need to achieve something like the following.
If I have data like:
 
col_1  col_2
1 10
2 30
3 15
4 20
5 25
 
I need to get col_3 to be the running total of the sum of the previous rows of 
col_2, e.g.
 
col_1  col_2  col_3
1      10     10
2      30     40
3      15     55
4      20     75
5      25     100
 
Is there a way to achieve this in Spark SQL or maybe with Data frame 
transformations?
 
Thanks in advance,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

Re: Spark SQL running totals

2015-10-15 Thread Michael Armbrust
Check out:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
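
In SQL form, the running total from that post looks roughly like this (a
sketch; in Spark 1.5 window functions require a HiveContext rather than a
plain SQLContext, and the table is assumed to be registered as tablea):

// sqlContext must be a HiveContext in Spark 1.5 for window functions
val runningTotals = sqlContext.sql("""
  SELECT col_1, col_2,
         SUM(col_2) OVER (ORDER BY col_1
                          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col_3
  FROM tablea
""")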

On Thu, Oct 15, 2015 at 11:35 AM, Deenar Toraskar  wrote:

> You can do a self-join of the table, with the join clause being
> a.col1 >= b.col1
>
> select a.col1, a.col2, sum(b.col2)
> from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
> group by a.col1, a.col2
>
> I haven't tried it, but I can't see why it wouldn't work; doing it with an
> RDD might be more efficient, see
> https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
>
> On 15 October 2015 at 18:48, Stefan Panayotov  wrote:
>
>> Hi,
>>
>> I need help with Spark SQL. I need to achieve something like the
>> following.
>> If I have data like:
>>
>> col_1  col_2
>> 1 10
>> 2 30
>> 3 15
>> 4 20
>> 5 25
>>
>> I need to get col_3 to be the running total of the sum of the previous
>> rows of col_2, e.g.
>>
>> col_1  col_2  col_3
>> 1      10     10
>> 2      30     40
>> 3      15     55
>> 4      20     75
>> 5      25     100
>>
>> Is there a way to achieve this in Spark SQL or maybe with Data frame
>> transformations?
>>
>> Thanks in advance,
>>
>>
>> *Stefan Panayotov, PhD **Home*: 610-355-0919
>> *Cell*: 610-517-5586
>> *email*: spanayo...@msn.com
>> spanayo...@outlook.com
>> spanayo...@comcast.net
>>
>>
>
>


Re: Spark SQL running totals

2015-10-15 Thread Kristina Rogale Plazonic
You can do it and many other transformations very easily with window
functions, see this blog post:

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

In your case you would do (in Scala):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => func}

val wo = Window.orderBy("col1")
val newdf = df.withColumn("col3", func.sum("col2").over(wo))

The true power of this is when you have to do a running total by a
particular, say, user (in a different column)! :)
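For example (a sketch; the "user" column here is hypothetical):

val wu = Window.partitionBy("user").orderBy("col1")
val perUser = df.withColumn("col3", func.sum("col2").over(wu))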

HTH,
Kristina

On Thu, Oct 15, 2015 at 1:48 PM, Stefan Panayotov 
wrote:

> Hi,
>
> I need help with Spark SQL. I need to achieve something like the following.
> If I have data like:
>
> col_1  col_2
> 1 10
> 2 30
> 3 15
> 4 20
> 5 25
>
> I need to get col_3 to be the running total of the sum of the previous
> rows of col_2, e.g.
>
> col_1  col_2  col_3
> 1      10     10
> 2      30     40
> 3      15     55
> 4      20     75
> 5      25     100
>
> Is there a way to achieve this in Spark SQL or maybe with Data frame
> transformations?
>
> Thanks in advance,
>
>
> *Stefan Panayotov, PhD **Home*: 610-355-0919
> *Cell*: 610-517-5586
> *email*: spanayo...@msn.com
> spanayo...@outlook.com
> spanayo...@comcast.net
>
>


Re: Spark 1.5 java.net.ConnectException: Connection refused

2015-10-15 Thread Spark Newbie
What is the best way to fail the application when a job gets aborted?
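
One untested sketch, assuming Spark 1.5's developer listener API and that sc
and ssc are in scope: watch for unsuccessful job ends and shut the streaming
application down (how many task attempts happen before a job aborts is
governed by spark.task.maxFailures):

import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}

// Hypothetical fail-fast listener: stop the app on the first aborted job.
sc.addSparkListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    if (jobEnd.jobResult != JobSucceeded) {
      // Stopping from the listener thread is simplistic; a production version
      // might only set a flag and let the main thread stop the context.
      ssc.stop(stopSparkContext = true, stopGracefully = false)
      sys.exit(1)
    }
  }
})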

On Wed, Oct 14, 2015 at 1:27 PM, Tathagata Das  wrote:

> When a job gets aborted, it means that the internal tasks were retried a
> number of times before the system gave up. You can control the number of
> retries (see Spark's configuration page). The job by default does not get
> resubmitted.
>
> You could try getting the logs of the failed executor, to see what caused
> the failure. Could be a memory limit issue, and YARN killing it somehow.
>
>
>
> On Wed, Oct 14, 2015 at 11:05 AM, Spark Newbie 
> wrote:
>
>> Is it slowing things down or blocking progress?
>> >> I didn't see processing slow down, but I do see jobs aborted
>> consecutively for a period of 18 batches (5 minute batch intervals). So I
>> am worried about what happened to the records that these jobs were
>> processing.
>> Also, one more thing to mention is that the
>> StreamingListenerBatchCompleted.numRecords information shows all
>> received records as processed even if the batch/job failed. The processing
>> time as well shows as the same time it takes for a successful batch.
>> It seems like it is the numRecords which was the input to the batch
>> regardless of whether they were successfully processed or not.
>>
>> On Wed, Oct 14, 2015 at 11:01 AM, Spark Newbie > > wrote:
>>
>>> I ran 2 different spark 1.5 clusters that have been running for more
>>> than a day now. I do see jobs getting aborted due to task retries maxing
>>> out (default 4) due to ConnectionException. It seems like the executors die
>>> and get restarted and I was unable to find the root cause (same app code
>>> and conf used on spark 1.4.1 I don't see ConnectionException).
>>>
>>> Another question related to this, what happens to the kinesis records
>>> received when Job gets aborted? In Spark-1.5 and kinesis-asl-1.5 (which I
>>> am using) does the job get resubmitted with the same received records? Or
>>> does the kinesis-asl library get those records again based on sequence
>>> numbers it tracks? It would be good for me to understand the story around
>>> lossless processing of kinesis records in Spark-1.5 + kinesis-asl-1.5 when
>>> jobs are aborted. Any pointers or quick explanation would be very helpful.
>>>
>>>
>>> On Tue, Oct 13, 2015 at 4:04 PM, Tathagata Das 
>>> wrote:
>>>
 Is this happening too often? Is it slowing things down or blocking
 progress? Failures once in a while are part of the norm, and the system
 should take care of itself.

 On Tue, Oct 13, 2015 at 2:47 PM, Spark Newbie <
 sparknewbie1...@gmail.com> wrote:

> Hi Spark users,
>
> I'm seeing the below exception in my spark streaming application. It
> happens in the first stage where the kinesis receivers receive records and
> perform a flatMap operation on the unioned Dstream. A coalesce step also
> happens as a part of that stage for optimizing the performance.
>
> This is happening on my spark 1.5 instance using kinesis-asl-1.5. When
> I look at the executor logs I do not see any exceptions indicating the 
> root
> cause of why there is no connectivity on xxx.xx.xx.xxx:36684 or when that
> service went down.
>
> Any help debugging this problem will be helpful.
>
> 15/10/13 16:36:07 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks
> java.io.IOException: Failed to connect to
> ip-xxx-xx-xx-xxx.ec2.internal/xxx.xx.xx.xxx:36684
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
> at
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:593)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:593)