Re: Spark ec2 lunch problem

2015-08-24 Thread Robin East
spark-ec2 is the way to go; however, you may need to debug connectivity issues. 
For example, do you know that the servers were correctly set up in AWS, and can 
you access each node using ssh? If not, you need to work out why (it's not a 
Spark issue). If so, you need to work out why ssh via the spark-ec2 script is 
not working.

I've used spark-ec2 successfully many times but have never used the --vpc-id and 
--subnet-id options, and they may be the source of your problems, especially 
since it appears to be a hostname resolution issue. If you could answer the 
questions above, then maybe someone on the list can help diagnose the specific 
problem.
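
As a first check on the hostname resolution angle, here is a minimal Python sketch (not part of spark-ec2; the function name and the default port are assumptions for illustration). It only tests whether a node name is non-empty and resolves to an address, which is the step that fails in the SSH error quoted in this thread, where the hostname is empty.

```python
import socket


def can_resolve(host, port=22):
    """Return True if `host` is non-empty and resolves to at least one address."""
    if not host or not host.strip():
        # An empty hostname, as in "Could not resolve hostname :", can never resolve.
        return False
    try:
        return len(socket.getaddrinfo(host.strip(), port)) > 0
    except socket.gaierror:
        # Raised when the name lookup itself fails.
        return False


# Example inputs: an empty name (as in the error) and a literal address.
for node in ["", "127.0.0.1"]:
    print(repr(node), can_resolve(node))
```

If the name that the script passes to ssh is empty, the problem is likely upstream of ssh (the instance may not have the DNS name the script expects), so checking what each instance reports in the EC2 console would be the next step.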


---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/malak/

 On 24 Aug 2015, at 13:45, Garry Chen g...@cornell.edu wrote:
 
 So what is the best way to deploy a Spark cluster in an EC2 environment? Any 
 suggestions?
  
 Garry
  
 From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
 Sent: Friday, August 21, 2015 4:27 PM
 To: Garry Chen g...@cornell.edu
 Cc: user@spark.apache.org
 Subject: Re: Spark ec2 lunch problem
  
 It may be that the version of the spark-ec2 script you are using is buggy, or 
 that AWS sometimes has problems provisioning machines.
 
 On Aug 21, 2015 7:56 AM, Garry Chen g...@cornell.edu wrote:
 Hi All,
 I am trying to launch a Spark EC2 cluster by running
 spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc 
 --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster
 but I keep getting the following message endlessly. Please help.
  
  
 Warning: SSH connection error. (This could be temporary.)
 Host:
 SSH return code: 255
 SSH output: ssh: Could not resolve hostname : Name or service not known



How to evaluate custom UDF over window

2015-08-24 Thread xander92
The ultimate aim of my program is to be able to wrap an arbitrary Scala
function (mostly will be statistics / customized rolling window metrics) in
a UDF and evaluate them on DataFrames using the window functionality.

So my main question is how do I express that a UDF takes a Frame of rows
from a DataFrame as an argument instead of just a single row? And what sort
of arguments would the arbitrary Scala function need to take in order to
handle the raw data from the Frame?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-evaluate-custom-UDF-over-window-tp24419.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Loading already existing tables in spark shell

2015-08-24 Thread Ishwardeep Singh
Hi Jeetendra,


I faced this issue when I did not specify the database where the table exists. 
Please select the database with the "use <database>" command before executing 
the query.


Regards,

Ishwardeep



From: Jeetendra Gangele gangele...@gmail.com
Sent: Monday, August 24, 2015 5:47 PM
To: user
Subject: Loading already existing tables in spark shell

Hi All, I have a few tables in Hive and I want to run queries against them with 
Spark as the execution engine.

Can I directly load these tables in the Spark shell and run queries?

I tried with
1. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
2. sqlContext.sql("FROM event_impressions select count(*)")
where event_impressions is the table name.

It gives me an error saying org.apache.spark.sql.AnalysisException: no such 
table event_impressions; line 1 pos 5

Has anybody hit a similar issue?


regards
jeetendra








NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.


Performance - Python streaming v/s Scala streaming

2015-08-24 Thread utk.pat
I am new to Spark Streaming. I was running the kafka_wordcount example with
a local Kafka and Spark instance. It was very easy to set this up and get
going :) I tried running both the Scala and Python versions of the word count
example. The Python version seems to be extremely slow. Sometimes it has delays
of more than a couple of minutes. The Scala version, on the other hand, seems
to be much better. I am running on a Windows machine. I am trying to understand
what causes the slowness in Python streaming. Is there anything that I am
missing? For real-time streaming analysis, should I prefer Scala?




Re: Joining using mulitimap or array

2015-08-24 Thread Ilya Karpov
Thanks,
but I don't think this is a case of multiple Spark contexts (nevertheless, I 
tried your suggestion and it didn't work). The problem is joining two datasets 
on the value of array items: attribute.value in my case. Does anyone have ideas?


 On 24 Aug 2015, at 15:01, satish chandra j jsatishchan...@gmail.com wrote:
 
 Hi,
 If your join logic is correct, this seems similar to an issue which I faced 
 recently.
 
 Can you try setting this on the SparkConf before creating the context:
 new SparkConf().set("spark.driver.allowMultipleContexts", "true")
 
 Regards,
 Satish Chandra
 
 On Mon, Aug 24, 2015 at 2:51 PM, Ilya Karpov i.kar...@cleverdata.ru wrote:
 Hi, guys
 I'm confused about joining columns in SparkSQL and need your advice.
 I want to join 2 datasets of profiles. Each profile has a name and an array of 
 attributes (age, gender, email etc).
 There can be multiple instances of an attribute with the same name, e.g. a 
 profile has 2 emails, so there are 2 attributes with name = 'email' in the
 array. Now I want to join the 2 datasets using the 'email' attribute. I can't 
 find a way to do it :(
 
 The code is below. Currently the result of the join is empty, while I expect to 
 see 1 row with all of Alice's emails.
 
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.{SparkConf, SparkContext}
 
 case class Attribute(name: String, value: String, weight: Float)
 case class Profile(name: String, attributes: Seq[Attribute])
 
 object SparkJoinArrayColumn {
   def main(args: Array[String]) {
     val sc: SparkContext = new SparkContext(
       new SparkConf().setMaster("local").setAppName(getClass.getSimpleName))
     val sqlContext: SQLContext = new SQLContext(sc)
 
     import sqlContext.implicits._
 
     val a: DataFrame = sc.parallelize(Seq(
       Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f),
         Attribute("email", "a.jo...@mail.com", 1.0f)))
     )).toDF.as("a")
 
     val b: DataFrame = sc.parallelize(Seq(
       Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f),
         Attribute("age", "29", 0.2f)))
     )).toDF.as("b")
 
     a.where($"a.attributes.name" === "email")
       .join(
         b.where($"b.attributes.name" === "email"),
         $"a.attributes.value" === $"b.attributes.value"
       )
       .show()
   }
 }
 
 Thanks in advance!
 
 



Re: Joining using mulitimap or array

2015-08-24 Thread satish chandra j
Hi,
If your join logic is correct, this seems similar to an issue which I faced
recently.

Can you try setting this on the SparkConf before creating the context:
new SparkConf().set("spark.driver.allowMultipleContexts", "true")

Regards,
Satish Chandra





Difficulties developing a Specs2 matcher for Spark Streaming

2015-08-24 Thread Juan Rodríguez Hortalá
Hi,

I've had some trouble developing a Specs2 matcher that checks that a
predicate holds for all the elements of an RDD, and using it to test a
simple Spark Streaming program. I've finally been able to get working code,
which you can see at https://gist.github.com/juanrh/dffd060e3a371676b83c,
but I wanted to check with the list that I'm using the right approach.
First I defined the matcher as follows:

def foreachRecord[T](predicate : T => Boolean) : Matcher[RDD[T]] = { (rdd : RDD[T]) =>
  val failingRecords = rdd.filter(! predicate(_))
  (
    failingRecords.isEmpty,
    "each record fulfils the predicate",
    s"predicate failed for records ${failingRecords.take(4).mkString(", ")} ..."
  )
}

which works ok for examples like

def simpleBatchTest = {
  val rdd = sc.parallelize(1 to 100, 3)
  rdd should foreachRecord(_ > 0)
}


The problem started when I tried to use it to check that a predicate holds
for all batches / RDDs of a DStream. The idea was to use foreachRDD to
update a driver-local org.specs2.execute.Result object, taking the "and" of
the results for all the batches:

def simpleStreamingTest : Result = {
  val ssc = new StreamingContext(sc, Duration(300))
  val record = "hola"
  val batches = Seq.fill(5)(Seq.fill(10)(record))
  val queue = new Queue[RDD[String]]
  queue ++= batches.map(batch => sc.parallelize(batch, numSlices = 2))
  val inputDStream = ssc.queueStream(queue, oneAtATime = true)
  var result : Result = ok

  inputDStream.foreachRDD { rdd =>
    val r = AsResult { rdd should foreachRecord(_ == record) }
    result = result and r
  }

  ssc.start()
  StreamingContextUtils.awaitForNBatchesCompleted(batches.length)(ssc)
  ssc.stop(stopSparkContext=false, stopGracefully=false)
  println(s"result : ${result}")
  result
}


This leads to an exception running the test, because AsResult { rdd should
foreachRecord(_ == record) } fails because the closure argument of
foreachRecord needs to access the local variable 'record', and to do that
it tries to serialize the whole context, including 'result', for which it
fails with:

 Driver stacktrace:,org.apache.spark.SparkException: Job aborted
due to stage failure:

 Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task
0.0 in stage 0.0 (TID 0, localhost):

java.io.InvalidClassException: org.specs2.execute.Success; no valid
constructor


To solve this I followed
http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/
to specify more carefully which values would be accessed by the closure,
defining the matcher

def foreachRecord[T,C](predicateContext : C)(toPredicate : C => (T => Boolean)) : Matcher[RDD[T]] = {
  val predicate = toPredicate(predicateContext)
  foreachRecord(predicate)
}


and then replacing the call above with

  val r = AsResult { rdd should foreachRecord(record)(r => { _ == r} ) }


I'm writing to the list because I wanted to confirm that this is a proper
solution, and to ask whether somebody can imagine a better one: this is not
too bad, but the call foreachRecord(record)(r => { _ == r} ) is still a bit
ugly. It is curious that Spark's closure cleaner is smart enough to avoid
sending 'result' to the closure in this very similar example:

  val r2 = AsResult { rdd.filter(_ != record).count === 0 }

Also, I wanted to confirm that the approach of updating a var in driver in
a foreachRDD is a good idea, I understand that foreachRDD runs in the
driver, so that should be ok as long as the local variable is updated using
action results only.

Thanks a lot in advance.

Greetings,

Juan


RE: Spark ec2 lunch problem

2015-08-24 Thread Garry Chen
So what is the best way to deploy a Spark cluster in an EC2 environment? Any 
suggestions?

Garry

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, August 21, 2015 4:27 PM
To: Garry Chen g...@cornell.edu
Cc: user@spark.apache.org
Subject: Re: Spark ec2 lunch problem


It may be that the version of the spark-ec2 script you are using is buggy, or 
that AWS sometimes has problems provisioning machines.
On Aug 21, 2015 7:56 AM, Garry Chen g...@cornell.edu wrote:
Hi All,
I am trying to launch a Spark EC2 cluster by running
spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc 
--subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster
but I keep getting the following message endlessly. Please help.


Warning: SSH connection error. (This could be temporary.)
Host:
SSH return code: 255
SSH output: ssh: Could not resolve hostname : Name or service not known


Re: Joining using mulitimap or array

2015-08-24 Thread Hemant Bhanawat
In your example, a.attributes.name is a list, not a string. Run this
to see it:

a.select($"a.attributes.name").show()
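
To illustrate why comparing a list-valued column with a single string matches nothing, and what joining on individual array items involves, here is a plain-Python sketch that runs without Spark. The data and function names are placeholders invented for illustration. In Spark itself the analogous step would presumably be flattening the array column first (for example with an explode operation); that is an assumption, not something stated in this thread.

```python
def explode(profiles):
    """Flatten each profile into one (profile_name, attr_name, attr_value) row per attribute."""
    for name, attributes in profiles:
        for attr_name, attr_value in attributes:
            yield (name, attr_name, attr_value)


def join_on_attribute(left, right, attr_name):
    """Inner-join two profile lists on the values of one attribute name."""
    # Index the right side by attribute value for the requested attribute.
    right_index = {}
    for name, a_name, a_value in explode(right):
        if a_name == attr_name:
            right_index.setdefault(a_value, []).append(name)
    # Probe the index with each matching attribute value from the left side.
    matches = []
    for name, a_name, a_value in explode(left):
        if a_name == attr_name:
            for other in right_index.get(a_value, []):
                matches.append((name, other, a_value))
    return matches


# Placeholder data, not the addresses from the thread.
a = [("Alice", [("email", "alice@example.com"), ("email", "alice.work@example.com")])]
b = [("Alice", [("email", "alice@example.com"), ("age", "29")])]

print(join_on_attribute(a, b, "email"))
# prints [('Alice', 'Alice', 'alice@example.com')]
```

The key point is that the comparison happens per attribute after flattening, rather than between a whole list of names and one string.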






RE: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Cheng, Hao
The first job infers the JSON schema, and the second one runs the query 
itself.
You can avoid the first job by providing the schema when loading the JSON 
file, like below:

sqlContext.read.schema(xxx).json("...")

Hao
From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Monday, August 24, 2015 6:20 PM
To: user@spark.apache.org
Subject: DataFrame#show cost 2 Spark Jobs ?

It's weird to me that the simple show function costs 2 Spark jobs. 
DataFrame#explain shows it is a very simple operation, so I am not sure why it 
needs 2 jobs.

== Parsed Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Analyzed Logical Plan ==
age: bigint, name: string
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Optimized Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Physical Plan ==
Scan 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]



--
Best Regards

Jeff Zhang


Re: How to set environment of worker applications

2015-08-24 Thread Raghavendra Pandey
System properties and environment variables are two different things. One
can use spark.executor.extraJavaOptions to pass system properties and
spark-env.sh to pass environment variables.
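
To make the distinction concrete, here is a small Python sketch that only assembles spark-submit arguments; it does not launch anything. The helper name is hypothetical. The spark.executorEnv.<NAME> setting used for environment variables is not mentioned in this thread; it is included on the assumption that your Spark version supports it, with spark-env.sh remaining the approach described above.

```python
def build_submit_args(system_props, executor_env):
    """Build spark-submit `--conf` arguments for JVM system properties and executor env vars."""
    args = []
    if system_props:
        # JVM system properties travel as -D flags inside spark.executor.extraJavaOptions
        # and are read on the executor with System.getProperty.
        java_opts = " ".join(
            "-D%s=%s" % (key, value) for key, value in sorted(system_props.items())
        )
        args += ["--conf", "spark.executor.extraJavaOptions=" + java_opts]
    for name, value in sorted(executor_env.items()):
        # Assumption: spark.executorEnv.<NAME> sets a real environment variable on executors.
        args += ["--conf", "spark.executorEnv.%s=%s" % (name, value)]
    return args


print(build_submit_args({"myenvvar": "xxx"}, {"MY_VAR": "1"}))
```

A value passed the first way shows up as a system property, not in the process environment, which may explain why the two approaches were reported to behave differently.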

-raghav

On Mon, Aug 24, 2015 at 1:00 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 That's surprising. Passing the environment variables using
 spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then
 fetching them using System.getProperty("myenvvar") has worked for me.

 What is the error that you guys got?

 On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 spark-env.sh works for me in Spark 1.4 but not
 spark.executor.extraJavaOptions.

 On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think the only way to pass on environment variables to worker node is
 to write it in spark-env.sh file on each worker node.

 On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with
 spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan






RE: Loading already existing tables in spark shell

2015-08-24 Thread Cheng, Hao
Also make sure that hive-site.xml is on the classpath or under 
$SPARK_HOME/conf.

Hao

From: Ishwardeep Singh [mailto:ishwardeep.si...@impetus.co.in]
Sent: Monday, August 24, 2015 8:57 PM
To: user
Subject: Re: Loading already existing tables in spark shell


Hi Jeetendra,



I faced this issue when I did not specify the database where the table exists. 
Please select the database with the "use <database>" command before executing 
the query.



Regards,

Ishwardeep


From: Jeetendra Gangele gangele...@gmail.com
Sent: Monday, August 24, 2015 5:47 PM
To: user
Subject: Loading already existing tables in spark shell

Hi All, I have a few tables in Hive and I want to run queries against them with 
Spark as the execution engine.

Can I directly load these tables in the Spark shell and run queries?

I tried with
1. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
2. sqlContext.sql("FROM event_impressions select count(*)")
where event_impressions is the table name.

It gives me an error saying org.apache.spark.sql.AnalysisException: no such 
table event_impressions; line 1 pos 5

Has anybody hit a similar issue?


regards
jeetendra










DataFrame/JDBC very slow performance

2015-08-24 Thread Dhaval Patel
I am trying to access a mid-size Teradata table (~100 million rows) via
JDBC in standalone mode on a single node (local[*]). When I tried with a big
table (5B records), no results were returned when the query completed.

I am using Spark 1.4.1, set up on a very powerful machine (2 CPUs, 24
cores, 126G RAM).

I have tried several memory settings and tuning options to make it run
faster, but none of them made a big difference.

I am sure there is something I am missing. Below is my final attempt, which
took about 11 minutes to get a simple count, whereas it took only 40 seconds
to get the count using a JDBC connection through R.


bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://..")
df.count()




Re: Got wrong md5sum for boto

2015-08-24 Thread Justin Pihony
Additional info: if I use an online md5sum check, then it matches. So
it's either Windows or Python (using 2.7.10).

On Mon, Aug 24, 2015 at 11:54 AM, Justin Pihony justin.pih...@gmail.com
wrote:

 When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now
 seen this on two different machines. I am running on windows, but I would
 imagine that shouldn't affect the md5. Is this a boto problem, python
 problem, spark problem?







Re: How to evaluate custom UDF over window

2015-08-24 Thread Yin Huai
For now, user-defined window functions are not supported. We will add them
in the future.





Re: Got wrong md5sum for boto

2015-08-24 Thread Justin Pihony
I found this solution:
https://stackoverflow.com/questions/3390484/python-hashlib-md5-differs-between-linux-windows

Does anybody see a reason why I shouldn't put in a PR to make this change?

FROM
with open(tgz_file_path) as tar:

TO
with open(tgz_file_path, "rb") as tar:
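
To show why binary mode matters for the checksum, here is a minimal standalone sketch (not the code from spark_ec2.py; the function name and chunk size are assumptions). On Windows, opening a file in text mode can translate line endings and stop at certain control bytes, so the bytes that get hashed may differ from the bytes on disk.

```python
import hashlib


def md5_of_file(path, chunk_size=1 << 20):
    """Return the hex MD5 digest of a file, reading raw bytes in chunks."""
    digest = hashlib.md5()
    # "rb" disables newline translation, so the bytes hashed are the bytes on disk.
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Reading in chunks is optional for a small archive, but it keeps memory use flat for larger downloads.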

On Mon, Aug 24, 2015 at 11:58 AM, Justin Pihony justin.pih...@gmail.com
wrote:

 Additional info...If I use an online md5sum check then it matches...So,
 it's either windows or python (using 2.7.10)

 On Mon, Aug 24, 2015 at 11:54 AM, Justin Pihony justin.pih...@gmail.com
 wrote:

 When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now
 seen this on two different machines. I am running on windows, but I would
 imagine that shouldn't affect the md5. Is this a boto problem, python
 problem, spark problem?








Re: Drop table and Hive warehouse

2015-08-24 Thread Michael Armbrust
That's not the expected behavior.  What version of Spark?

On Mon, Aug 24, 2015 at 1:32 AM, Kevin Jung itsjb.j...@samsung.com wrote:

 When I store a DataFrame as a table with the saveAsTable command and then
 execute DROP TABLE in SparkSQL, it doesn't actually delete the files in the
 Hive warehouse.
 The table disappears from the table list but the data files are still there.
 Because of this, I can't saveAsTable with the same name before dropping
 table.
 Is this normal? If it is, I will delete the files manually ;)

 Kevin



 The contents of this e-mail message and any attachments are confidential
 and are intended solely for addressee.
  The information may also be legally privileged. This transmission is sent
 in trust, for the sole purpose of delivery
  to the intended recipient. If you have received this transmission in
 error, any use, reproduction or dissemination of
  this transmission is strictly prohibited. If you are not the intended
 recipient, please immediately notify the sender
  by reply e-mail or phone and delete this message and its attachments, if
 any.


Got wrong md5sum for boto

2015-08-24 Thread Justin Pihony
When running the spark_ec2.py script, I'm getting a wrong md5sum. I've now
seen this on two different machines. I am running on windows, but I would
imagine that shouldn't affect the md5. Is this a boto problem, python
problem, spark problem?






Re: [Spark Streaming on Mesos (good practices)]

2015-08-24 Thread Aram Mkrtchyan
Here is the answer to my question, in case somebody needs it:

   Running Spark in Standalone mode or coarse-grained Mesos mode leads to
better task launch times than the fine-grained Mesos mode.

The source is
http://spark.apache.org/docs/latest/streaming-programming-guide.html

On Mon, Aug 24, 2015 at 12:15 PM, Aram Mkrtchyan 
aram.mkrtchyan...@gmail.com wrote:

 What are the best practices for submitting a Spark Streaming application on
 Mesos?
 I would like to know about the scheduler mode.
 Is `coarse-grained` mode the right solution?

 Thanks



Re: Unable to catch SparkContext methods exceptions

2015-08-24 Thread Burak Yavuz
textFile is a lazy operation. It doesn't evaluate until you call an action
on it, such as .count(). Therefore, you won't catch the exception there.
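
The same effect can be reproduced in plain Python with a generator, which is used here purely as an analogy for lazy evaluation and is not Spark's API. The function names and the path are made up for illustration. Creating the lazy object succeeds, and the failure only surfaces when the data is consumed.

```python
def lazy_lines(path):
    """Generator: the file is only opened when the first item is requested."""
    with open(path) as handle:
        for line in handle:
            yield line.rstrip("\n")


def load(path):
    try:
        # Nothing is read here, so nothing can fail here.
        return lazy_lines(path)
    except IOError:
        # Never reached for a missing file, because no I/O has happened yet.
        return iter([])


lines = load("/no/such/dir/input.txt")  # returns normally
try:
    list(lines)  # the "action": the error surfaces here
    failed_at_action = False
except IOError:
    failed_at_action = True

print(failed_at_action)  # prints True
```

The try block around the lazy call protects nothing, which mirrors wrapping textFile in try..catch without running an action inside it.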

Best,
Burak

On Mon, Aug 24, 2015 at 9:09 AM, Roberto Coluccio 
roberto.coluc...@gmail.com wrote:

 Hello folks,

 I'm experiencing unexpected behaviour, which makes me think I'm missing
 something about how Spark works. Let's say I have a Spark driver that
 invokes a function like:

 - in myDriver -

 val sparkContext = new SparkContext(mySparkConf)
 val inputPath = "file://home/myUser/project/resources/date=*/*"

 val myResult = new MyResultFunction()(sparkContext, inputPath)

 - in MyResultFunctionOverRDD --

 class MyResultFunction extends Function2[SparkContext, String, RDD[String]]
     with Serializable {
   override def apply(sparkContext: SparkContext, inputPath: String): RDD[String] = {
     try {
       sparkContext.textFile(inputPath, 1)
     } catch {
       case t: Throwable => {
         myLogger.error(s"error: ${t.getStackTraceString}\n")
         sparkContext.makeRDD(Seq[String]())
       }
     }
   }
 }

 What happens is that I'm *unable to catch exceptions* thrown by the
 textFile method within the try..catch clause in MyResultFunction. In
 fact, in a unit test for that function where I call it passing an invalid
 inputPath, I don't get an empty RDD as result, but the unit test exits
 (and fails) due to exception not handled.

 What am I missing here?

 Thank you.

 Best regards,
 Roberto



Re: Spark Direct Streaming With ZK Updates

2015-08-24 Thread suchenzang
Forgot to include the PR I was referencing:
https://github.com/apache/spark/pull/4805/






Re: Spark Direct Streaming With ZK Updates

2015-08-24 Thread Cody Koeninger
It doesn't matter if shuffling occurs.  Just update ZK from the driver,
inside the foreachRDD, after all your dynamodb updates are done.  Since
you're just doing it for monitoring purposes, that should be fine.


On Mon, Aug 24, 2015 at 12:11 PM, suchenzang suchenz...@gmail.com wrote:

 Forgot to include the PR I was referencing:
 https://github.com/apache/spark/pull/4805/







Re: Kafka Spark Partition Mapping

2015-08-24 Thread Syed, Nehal (Contractor)
Dear Cody,
Thanks for your response. I am trying to do decoration, which means that when a 
message comes from Kafka (partitioned by key) into Spark, I want to add 
more fields/data to it.
How do people normally do this in Spark? If it were you, how would you decorate 
a message without hitting the database for every message?

Our current strategy is that decoration data comes from a local in-memory cache 
(Guava LoadingCache) and/or from a SQL DB if it is not in the cache. If we take 
this approach, we want cached decoration data to be available locally to RDDs 
most of the time.
Our Kafka and Spark run on separate machines, and that's why I want a given 
Kafka partition to go to the same Spark RDD partition most of the time, so I can 
make use of the cached decoration data.
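
The cache-then-database lookup described above can be sketched in a few lines of plain Python. This is an illustration only: the thread uses Guava LoadingCache on the JVM, and the dictionary standing in for the SQL DB, the key names and the field names are all made up. In Spark, each executor process would hold its own copy of such a cache, which is why locality matters for the hit rate.

```python
from functools import lru_cache

# Stand-in for the SQL database; keys and fields are invented for illustration.
FAKE_DB = {"user-1": {"region": "east"}, "user-2": {"region": "west"}}
db_lookups = []  # records every simulated database hit


@lru_cache(maxsize=10000)
def load_decoration(key):
    """Fetch decoration data for a key; the body only runs on a cache miss."""
    db_lookups.append(key)
    # Return an immutable value so cached entries cannot be modified by callers.
    return tuple(sorted(FAKE_DB.get(key, {}).items()))


def decorate(key, value):
    """Return the message with decoration fields added."""
    message = {"key": key, "value": value}
    message.update(dict(load_decoration(key)))
    return message


messages = [("user-1", "a"), ("user-2", "b"), ("user-1", "c")]
results = [decorate(key, value) for key, value in messages]
print(len(db_lookups))  # prints 2
```

Three messages cause only two simulated database hits, because the second message for user-1 is served from the cache.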

Do you think that if I create a JdbcRDD for the decoration data and join it with 
the JavaPairReceiverInputDStream, the data will always stay where the JdbcRDD 
lives?

Nehal

From: Cody Koeninger c...@koeninger.org
Date: Thursday, August 20, 2015 at 6:33 PM
To: Microsoft Office User nehal_s...@cable.comcast.com
Cc: user@spark.apache.org
Subject: Re: Kafka Spark Partition Mapping

In general you cannot guarantee which node an RDD will be processed on.

The preferred location for a KafkaRDD is the Kafka leader for that partition, 
if they're deployed on the same machine. If you want to try to override that 
behavior, the method is getPreferredLocations.

But even in that case, location preferences are just a scheduler hint; the RDD 
can still be scheduled elsewhere. You can turn up spark.locality.wait to a 
very high value to decrease the likelihood.
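
Since locality is only a hint, a decoration step probably has to tolerate cache misses rather than rely on a partition always landing on the same machine. The following is a minimal Python sketch of that idea, not code from this thread: decorate_partition, cache and load_from_db are hypothetical names, and the cache is assumed to be a plain dict local to whichever process handles the partition.

```python
def decorate_partition(records, cache, load_from_db):
    """Yield (key, value, extra) for every (key, value) record of one partition.

    cache        -- a process-local dict (hypothetical; stands in for a LoadingCache)
    load_from_db -- hypothetical function key -> decoration data, called on a miss
    """
    for key, value in records:
        if key not in cache:
            # Cache miss: this also covers the case where the scheduler
            # placed the partition on a machine that has not seen this key.
            cache[key] = load_from_db(key)
        yield key, value, cache[key]
```

With a real RDD of pairs this could be applied as rdd.mapPartitions(lambda it: decorate_partition(it, some_cache, some_lookup)), so the database is consulted at most once per key per process instead of once per message.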



On Thu, Aug 20, 2015 at 5:47 PM, nehalsyed nehal_s...@cable.comcast.com wrote:
I have data in a Kafka topic-partition and I am reading it from Spark like this:

JavaPairReceiverInputDStream<String, String> directKafkaStream =
    KafkaUtils.createDirectStream(streamingContext, [key class], [value class],
        [key decoder class], [value decoder class], [map of Kafka parameters],
        [set of topics to consume]);

I want messages from a given Kafka partition to always land on the same
Spark machine, so I can cache some decoration data locally and later
reuse it with other messages (that belong to the same key). Can anyone tell me how
I can achieve this? Thanks

View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-Partition-Mapping-tp24372.html



Re: Unable to catch SparkContext methods exceptions

2015-08-24 Thread Burak Yavuz
The laziness is hard to deal with in these situations. I would suggest
handling expected cases (FileNotFound, etc.) using other methods
before even starting a Spark job. If you really want to try/catch a
specific portion of a Spark job, one way is to just follow it with an
action. You can even call persist() before the action, so that you can
re-use the RDD.
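
As a rough illustration of the "follow it with an action" approach, here is a minimal PySpark-style sketch. It is an assumption-laden example rather than code from this thread: load_or_empty is a hypothetical helper, and it only relies on the textFile, persist, count, unpersist and parallelize calls of the context and RDD it is given.

```python
def load_or_empty(sc, input_path):
    """Return a persisted RDD of lines, or an empty RDD if the read fails.

    sc is expected to behave like a SparkContext. textFile() is lazy, so the
    action count() is placed inside the try block to force the read there.
    """
    rdd = sc.textFile(input_path).persist()
    try:
        rdd.count()  # action: an invalid path surfaces here, not at textFile()
        return rdd
    except Exception as exc:
        print("error reading %s: %s" % (input_path, exc))
        rdd.unpersist()
        return sc.parallelize([])
```

Because the RDD is persisted before the action, later operations can reuse the data that count() already read instead of reading the files again.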

Best,
Burak

On Mon, Aug 24, 2015 at 10:52 AM, Roberto Coluccio 
roberto.coluc...@gmail.com wrote:

 Hi Burak, thanks for your answer.

 I have a new MyResultFunction()(sparkContext, inputPath).collect in the
 unit test (to evaluate the actual result), and there I can observe and
 catch the exception. Even considering Spark's laziness, shouldn't I catch
 the exception when it occurs in the try..catch statement that encloses the
 textFile invocation?

 Best,
 Roberto


 On Mon, Aug 24, 2015 at 7:38 PM, Burak Yavuz brk...@gmail.com wrote:

 textFile is a lazy operation. It doesn't evaluate until you call an
 action on it, such as .count(). Therefore, you won't catch the exception
 there.

 Best,
 Burak

 On Mon, Aug 24, 2015 at 9:09 AM, Roberto Coluccio 
 roberto.coluc...@gmail.com wrote:

 Hello folks,

 I'm experiencing an unexpected behaviour that suggests I am missing
 something about how Spark works. Let's say I have a Spark
 driver that invokes a function like:

 - in myDriver -

 val sparkContext = new SparkContext(mySparkConf)
 val inputPath = "file://home/myUser/project/resources/date=*/*"

 val myResult = new MyResultFunction()(sparkContext, inputPath)

 - in MyResultFunctionOverRDD --

 class MyResultFunction extends Function2[SparkContext, String,
 RDD[String]] with Serializable {
   override def apply(sparkContext: SparkContext, inputPath: String): RDD[String] = {
     try {
       sparkContext.textFile(inputPath, 1)
     } catch {
       case t: Throwable => {
         myLogger.error(s"error: ${t.getStackTraceString}\n")
         sparkContext.makeRDD(Seq[String]())
       }
     }
   }
 }

 What happens is that I'm *unable to catch exceptions* thrown by the
 textFile method within the try..catch clause in MyResultFunction. In
 fact, in a unit test for that function where I call it passing an invalid
 inputPath, I don't get an empty RDD as result, but the unit test exits
 (and fails) due to exception not handled.

 What am I missing here?

 Thank you.

 Best regards,
 Roberto






RE: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-24 Thread Sereday, Scott
Can you please remove me from this distribution list?

(Filling up my inbox too fast)

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Monday, August 24, 2015 2:13 PM
To: Philip Weaver philip.wea...@gmail.com
Cc: Jerrick Hoang jerrickho...@gmail.com; Raghavendra Pandey 
raghavendra.pan...@gmail.com; User user@spark.apache.org; Cheng, Hao 
hao.ch...@intel.com
Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I think we are mostly bottlenecked at this point by how fast we can make 
listStatus calls to discover the folders.  That said, we are happy to accept 
suggestions or PRs to make this faster.  Perhaps you can describe how your home 
grown partitioning works?

On Sun, Aug 23, 2015 at 7:38 PM, Philip Weaver philip.wea...@gmail.com wrote:
1 minute to discover 1000s of partitions -- yes, that is what I have observed. 
And I would assert that is very slow.

On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com wrote:
We should not be actually scanning all of the data of all of the partitions, 
but we do need to at least list all of the available directories so that we can 
apply your predicates to the actual values that are present when we are 
deciding which files need to be read in a given spark job.  While this is a 
somewhat expensive operation, we do it in parallel and we cache this 
information when you access the same relation more than once.
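
To make the idea concrete, here is a small conceptual sketch in Python of pruning by directory name: the partition directories are listed first, the column values are parsed out of the path segments, and the predicate decides which directories need to be read at all. This is an illustration only, not Spark's implementation; parse_partition, prune and any example paths are hypothetical.

```python
def parse_partition(path):
    """Extract partition column values from a key=value style directory path."""
    values = {}
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            name, _, value = segment.partition("=")
            values[name] = value
    return values


def prune(paths, predicate):
    """Keep only the listed directories whose partition values satisfy predicate."""
    return [p for p in paths if predicate(parse_partition(p))]
```

For a table partitioned by date_prefix and hour, a predicate such as lambda v: v["date_prefix"] == "20150819" and v["hour"] == "00" would keep a single directory, but every directory still has to be listed once before the predicate can be applied, which is where the listing cost comes from.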

Can you provide a little more detail about how exactly you are accessing the 
parquet data (are you using sqlContext.read or creating persistent tables in 
the metastore?), and how long it is taking?  It would also be good to know how 
many partitions we are talking about and how much data is in each.  Finally, 
I'd like to see the stacktrace where it is hanging to make sure my above 
assertions are correct.

We have several tables internally that have 1000s of partitions and while it 
takes ~1 minute initially to discover the metadata, after that we are able to 
query the data interactively.



On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com wrote:
Does anybody have any suggestions?

On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com wrote:
Is there a workaround without updating Hadoop? Would really appreciate if 
someone can explain what spark is trying to do here and what is an easy way to 
turn this off. Thanks all!

On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote:

Did you try with Hadoop version 2.7.1? It is known that s3a, which is available in 2.7, 
works really well with Parquet. They fixed a lot of issues related to 
metadata reading there...
On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote:
@Cheng, Hao : Physical plans show that it got stuck on scanning S3!

(table is partitioned by date_prefix and hour)
explain select count(*) from test_table where date_prefix='20150819' and 
hour='00';

TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)]
   Scan ParquetRelation[ .. about 1000 partition paths go here ]

Why does Spark have to scan all partitions when the query only concerns 1 
partition? Doesn't it defeat the purpose of partitioning?

Thanks!

On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote:
I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I 
couldn't find much information about it online. What does it mean exactly to 
disable it? Are there any negative consequences to disabling it?

On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote:
Can you do some more profiling? I am wondering if the driver is busy 
scanning HDFS / S3.
For example, run jstack on the pid of the driver process.

Also, it would be great if you could paste the physical plan for the simple 
query.

From: Jerrick Hoang [mailto:jerrickho...@gmail.com]
Sent: Thursday, August 20, 2015 1:46 PM
To: Cheng, Hao
Cc: Philip Weaver; user
Subject: Re: Spark Sql behaves strangely with tables with a lot of partitions

I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs 
trying to speed up spark sql with tables with a huge number of partitions, I've 
made sure that those CLs are included but it's still very slow

On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote:
Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false.

BTW, which version are you using?

Hao

From: Jerrick Hoang [mailto:jerrickho...@gmail.com]
Sent: Thursday, August 20, 2015 12:16 PM
To: 

Local Spark talking to remote HDFS?

2015-08-24 Thread Dino Fancellu
I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM.

If I go into the guest spark-shell and refer to the file thus, it works fine

  val words = sc.textFile("hdfs:///tmp/people.txt")
  words.count

However if I try to access it from a local Spark app on my Windows host, it
doesn't work

  val conf = new SparkConf().setMaster("local").setAppName("My App")
  val sc = new SparkContext(conf)
  
  val words = sc.textFile("hdfs://localhost:8020/tmp/people.txt")
  words.count

Emits



The port 8020 is open, and if I choose the wrong file name, it will tell me



My pom has

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.4.1</version>
    <scope>provided</scope>
</dependency>

Am I doing something wrong?

Thanks.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html



Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-24 Thread Jerrick Hoang
@Michael: would listStatus calls read the actual parquet footers within the
folders?


Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-24 Thread Michael Armbrust
No, starting with Spark 1.5 we should by default only be reading the
footers on the executor side (that is unless schema merging has been
explicitly turned on).

On Mon, Aug 24, 2015 at 12:20 PM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 @Michael: would listStatus calls read the actual parquet footers within
 the folders?


Re: Memory-efficient successive calls to repartition()

2015-08-24 Thread Alexis Gillain
Hi Aurelien,

The first code should create a new RDD in memory at each iteration (check
the web UI).
The second code will unpersist the RDD, but that's not the main problem.

I think you have trouble due to the long lineage, as .cache() keeps track of
the lineage for recovery.
You should have a look at checkpointing:
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

You can also have a look at the code of other iterative algorithms in
MLlib for best practices.
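
A minimal sketch of what periodic checkpointing could look like for the loop in question, assuming a checkpoint directory has already been set with sc.setCheckpointDir(...). The function name shuffle_repeatedly and the checkpoint_every parameter are hypothetical; the sketch only uses the RDD methods repartition, persist, checkpoint, count and unpersist.

```python
def shuffle_repeatedly(data, iterations, num_partitions, checkpoint_every=10):
    """Repartition a persisted RDD many times while keeping its lineage short.

    Assumes sc.setCheckpointDir(...) was called beforehand.
    """
    for i in range(iterations):
        new_data = data.repartition(num_partitions).persist()
        if (i + 1) % checkpoint_every == 0:
            # Mark for checkpointing before the first action on this RDD;
            # once written, the lineage is truncated at this point.
            new_data.checkpoint()
        new_data.count()   # materialize (and write the checkpoint if marked)
        data.unpersist()   # drop the previous cached copy
        data = new_data
        # ... per-iteration operations on data would go here ...
    return data
```

The interval is a trade-off: checkpointing writes the data to reliable storage, so doing it on every iteration would add I/O, while doing it too rarely lets the lineage grow again.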


2015-08-20 17:26 GMT+08:00 abellet aurelien.bel...@telecom-paristech.fr:

 Hello,

 For my application, I need to periodically shuffle the data
 across nodes/partitions of a reasonably large dataset. This is an expensive
 operation but I only need to do it every now and then. However, it seems that
 I am doing something wrong, because as the iterations go on the memory usage
 increases, causing the job to spill onto HDFS, which eventually gets full. I
 am also getting some "Lost executor" errors that I don't get if I don't
 repartition.

 Here's a basic piece of code which reproduces the problem:

 data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
 data.count()
 for i in range(1000):
     data = data.repartition(50).persist()
     # below several operations are done on data


 What am I doing wrong? I tried the following but it doesn't solve the
 issue:

 for i in range(1000):
     data2 = data.repartition(50).persist()
     data2.count()     # materialize rdd
     data.unpersist()  # unpersist previous version
     data = data2


 Help and suggestions on this would be greatly appreciated! Thanks a lot!




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html




-- 
Alexis GILLAIN


[Spark Streaming on Mesos (good practices)]

2015-08-24 Thread Aram Mkrtchyan
What are the best practices for submitting a Spark Streaming application on Mesos?
I would like to know about the scheduler mode.
Is `coarse-grained` mode the right solution?

Thanks


Drop table and Hive warehouse

2015-08-24 Thread Kevin Jung
When I store a DataFrame as a table with the saveAsTable command and then execute 
DROP TABLE in SparkSQL, it doesn't actually delete the files in the Hive warehouse.
The table disappears from the table list but the data files are still there.
Because of this, I can't saveAsTable with the same name before dropping the table.
Is this normal? If it is, I will delete the files manually ;)

Kevin




DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Jeff Zhang
It's weird to me that the simple show function costs 2 Spark jobs.
DataFrame#explain shows it is a very simple operation, so I am not sure why it needs 2
jobs.

== Parsed Logical Plan ==
Relation[age#0L,name#1]
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Analyzed Logical Plan ==
age: bigint, name: string
Relation[age#0L,name#1]
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Optimized Logical Plan ==
Relation[age#0L,name#1]
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Physical Plan ==
Scan
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]



-- 
Best Regards

Jeff Zhang


Determinant of Matrix

2015-08-24 Thread Naveen

Hi,

Is there any function to find the determinant of a mllib.linalg.Matrix 
(a covariance matrix) using Spark?



Regards,
Naveen




How to remove worker node but let it finish first?

2015-08-24 Thread Romi Kuntsman
Hi,
I have a Spark standalone cluster with 100s of applications per day, and it
changes size (more or fewer workers) at various hours. The driver runs on a
separate machine outside the Spark cluster.

When a job is running and its worker is killed (because at that hour the
number of workers is reduced), it sometimes fails, instead of
redistributing the work to other workers.

How is it possible to decommission a worker, so that it doesn't receive any
new work, but does finish all existing work before shutting down?

Thanks!


Re: How to set environment of worker applications

2015-08-24 Thread Hemant Bhanawat
That's surprising. Passing the environment variables using
spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then
fetching them using System.getProperty("myenvvar") has worked for me.
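
For reference, a small Python sketch of how such an option string could be assembled before calling spark-submit. The helper names executor_java_options and submit_conf_args are hypothetical, and the sketch assumes property values contain no spaces.

```python
def executor_java_options(props):
    """Build a value for spark.executor.extraJavaOptions from a dict of
    system properties, e.g. {"myenvvar": "xxx"} gives "-Dmyenvvar=xxx".
    Values are assumed to contain no spaces.
    """
    return " ".join("-D%s=%s" % (k, v) for k, v in sorted(props.items()))


def submit_conf_args(props):
    """Return the extra spark-submit arguments carrying those properties."""
    return ["--conf", "spark.executor.extraJavaOptions=" + executor_java_options(props)]
```

On the executor side the values would then be read as JVM system properties, not as UNIX environment variables.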

What is the error that you guys got?

On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu 
vsathishkuma...@gmail.com wrote:

 spark-env.sh works for me in Spark 1.4 but not
 spark.executor.extraJavaOptions.

 On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think the only way to pass on environment variables to worker node is
 to write it in spark-env.sh file on each worker node.

 On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan





Joining using mulitimap or array

2015-08-24 Thread Ilya Karpov
Hi, guys
I'm confused about joining columns in SparkSQL and need your advice.
I want to join 2 datasets of profiles. Each profile has a name and an array of 
attributes (age, gender, email, etc.).
There can be multiple instances of an attribute with the same name, e.g. a profile 
with 2 emails has 2 attributes with name = 'email' in the 
array. Now I want to join the 2 datasets using the 'email' attribute. I can't find a 
way to do it :(

The code is below. Currently the result of the join is empty, while I expect to see 1 row 
with all of Alice's emails.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

case class Attribute(name: String, value: String, weight: Float)
case class Profile(name: String, attributes: Seq[Attribute])

object SparkJoinArrayColumn {
  def main(args: Array[String]) {
    val sc: SparkContext = new SparkContext(
      new SparkConf().setMaster("local").setAppName(getClass.getSimpleName))
    val sqlContext: SQLContext = new SQLContext(sc)

    import sqlContext.implicits._

    val a: DataFrame = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f),
        Attribute("email", "a.jo...@mail.com", 1.0f)))
    )).toDF.as("a")

    val b: DataFrame = sc.parallelize(Seq(
      Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f),
        Attribute("age", "29", 0.2f)))
    )).toDF.as("b")

    a.where($"a.attributes.name" === "email")
      .join(
        b.where($"b.attributes.name" === "email"),
        $"a.attributes.value" === $"b.attributes.value"
      )
      .show()
  }
}

Thanks in advance!





Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-24 Thread satish chandra j
HI All,

For users who are following the mail chain of this issue, please find the
fix and the respective solution below:

*reduceByKey: Non working snippet*

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
val conf = new SparkConf()
val sc = new SparkContext(conf)

val DataRDD = sc.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4)))
DataRDD.reduceByKey(_+_).collect

Result: Array() is empty

*reduceByKey: Working snippet*

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)

val DataRDD = sc.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4)))
DataRDD.reduceByKey(_+_).collect

Result: Array((0,3),(1,5),(2,4))

Regards,
Satish Chandra


On Sat, Aug 22, 2015 at 11:27 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 Currently using DSE 4.7 and Spark 1.2.2 version

 Regards,
 Satish

 On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote:

 What version of Spark are you using, or which one comes with DSE 4.7?

 We just cannot reproduce it in Spark.

 yzhang@localhost$ more test.spark
 val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs.reduceByKey((x,y) => x + y).collect
 yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
 Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
       /_/

 Using Scala version 2.10.4
 Spark context available as sc.
 SQL context available as sqlContext.
 Loading test.spark...
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at
 makeRDD at console:21
 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether
 UseCompressedOops is set; assuming yes
 res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Yong


 --
 Date: Fri, 21 Aug 2015 19:24:09 +0530
 Subject: Re: Transformation not happening for reduceByKey or GroupByKey
 From: jsatishchan...@gmail.com
 To: abhis...@tetrationanalytics.com
 CC: user@spark.apache.org


 HI Abhishek,

 I have even tried that but rdd2 is empty

 Regards,
 Satish

 On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:

 You had:

  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)

 Maybe try:

  val rdd2 = RDD.reduceByKey((x,y) => x+y)
  rdd2.take(3)

 -Abhishek-
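
 The suggestion above relies on transformations returning a new dataset
 rather than changing the one they are called on. A minimal plain-Scala
 sketch of that behaviour follows. It uses ordinary collections as a
 stand-in for an RDD, so no Spark is required; the object and function
 names are made up for illustration and are not Spark APIs.

```scala
object ReduceByKeyAnalogue {
  // Plain-collection stand-in for RDD.reduceByKey (hypothetical helper):
  // it returns a NEW sequence and leaves the input untouched.
  def reduceByKey(pairs: Seq[(Int, Int)])(f: (Int, Int) => Int): Seq[(Int, Int)] =
    pairs.groupBy(_._1).toSeq
      .map { case (key, group) => (key, group.map(_._2).reduce(f)) }
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val pairs = Seq((0, 1), (0, 2), (1, 20), (1, 30), (2, 40))
    val summed = reduceByKey(pairs)(_ + _)
    println(pairs.take(3)) // still the first three original pairs
    println(summed)        // per-key sums: (0,3), (1,50), (2,40)
  }
}
```

 Reading from `pairs` after the call still gives the original data; only
 the value bound to `summed` carries the aggregated result.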

 On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

  HI All,
  I have data in RDD as mentioned below:
 
  RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
  I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key
 
  Code:
  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)
 
  Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
  res:Array[(Int,Int)] = Array()
 
  Command as mentioned
 
  dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
  Please let me know what is missing in my code, as my resultant Array is
 empty
 
 
 
  Regards,
  Satish
 






History server is not receiving any event

2015-08-24 Thread b.bhavesh
Hi,

I am working on a streaming application.
I tried to configure the history server to persist the application's events
to the Hadoop file system (HDFS). However, it is not logging any events.
I am running Apache Spark 1.4.1 (pyspark) under Ubuntu 14.04 with three
nodes.
Here is my configuration:

# File: /usr/local/spark/conf/spark-defaults.conf (on all three nodes)
spark.eventLog.enabled true
spark.eventLog.dir hdfs://master-host:port/usr/local/hadoop/spark_log

# On the master node
export SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=hdfs://host:port/usr/local/hadoop/spark_log

Can someone give a list of steps to configure the history server?

Thanks and regards,
b.bhavesh





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/History-server-is-not-receiving-any-event-tp24426.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Array Out OF Bound Exception

2015-08-24 Thread SAHA, DEBOBROTA
Hi ,

I am using Spark 1.4 and I am getting an array-out-of-bounds exception when I
try to read from a registered table in Spark.

For example, if I have 3 different text files with the content as below:

Scenario 1:
A1|B1|C1
A2|B2|C2

Scenario 2:
A1| |C1
A2| |C2

Scenario 3:
A1| B1|
A2| B2|

So for Scenario 1 and 2 it's working fine but for Scenario 3 I am getting the 
following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 
4, localhost): java.lang.ArrayIndexOutOfBoundsException: 2
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:40)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:38)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Please help.

Thanks,
Debobrota





Re: Meaning of local[2]

2015-08-24 Thread Akhil Das
Just to add: you can also look into the SPARK_WORKER_INSTANCES setting in
the spark-env.sh file.
On Aug 17, 2015 3:44 AM, Daniel Darabos daniel.dara...@lynxanalytics.com
wrote:

 Hi Praveen,

 On Mon, Aug 17, 2015 at 12:34 PM, praveen S mylogi...@gmail.com wrote:

 What does this mean in .setMaster("local[2]")?

 Local mode (executor in the same JVM) with 2 executor threads.

 Is this applicable only for standalone Mode?

 It is not applicable for standalone mode, only for local.

 Can I do this in a cluster setup, eg:
 .setMaster("hostname:port[2]")

 No. It's faster to try than to ask a mailing list, actually. Also it's
 documented at
 http://spark.apache.org/docs/latest/submitting-applications.html#master-urls
 .

 Is it number of threads per worker node?

 You can control the number of total threads with
 spark-submit's --total-executor-cores parameter, if that's what you're
 looking for.



Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-24 Thread Michael Armbrust
Follow the directions here: http://spark.apache.org/community.html

On Mon, Aug 24, 2015 at 11:36 AM, Sereday, Scott scott.sere...@nielsen.com
wrote:

 Can you please remove me from this distribution list?



 (Filling up my inbox too fast)



 *From:* Michael Armbrust [mailto:mich...@databricks.com]
 *Sent:* Monday, August 24, 2015 2:13 PM
 *To:* Philip Weaver philip.wea...@gmail.com
 *Cc:* Jerrick Hoang jerrickho...@gmail.com; Raghavendra Pandey 
 raghavendra.pan...@gmail.com; User user@spark.apache.org; Cheng, Hao 
 hao.ch...@intel.com

 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I think we are mostly bottlenecked at this point by how fast we can make
 listStatus calls to discover the folders.  That said, we are happy to
 accept suggestions or PRs to make this faster.  Perhaps you can describe
 how your home grown partitioning works?



 On Sun, Aug 23, 2015 at 7:38 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 1 minute to discover 1000s of partitions -- yes, that is what I have
 observed. And I would assert that is very slow.



 On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com
 wrote:

 We should not be actually scanning all of the data of all of the
 partitions, but we do need to at least list all of the available
 directories so that we can apply your predicates to the actual values that
 are present when we are deciding which files need to be read in a given
 spark job.  While this is a somewhat expensive operation, we do it in
 parallel and we cache this information when you access the same relation
 more than once.
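
 To make the listing-then-filtering step above concrete, here is a minimal
 plain-Scala sketch of pruning partition directories with equality
 predicates. It assumes Hive-style `key=value` directory names like the
 date_prefix/hour layout mentioned in this thread. The object and function
 names are invented for illustration, and this is not Spark's actual
 implementation.

```scala
object PartitionPruningSketch {
  // Parse "key=value" path segments into a map; other segments are ignored.
  def partitionValues(path: String): Map[String, String] =
    path.split("/").toSeq.flatMap { segment =>
      segment.split("=", 2) match {
        case Array(key, value) => Some(key -> value)
        case _                 => None
      }
    }.toMap

  // Keep only the directories whose partition values satisfy every predicate.
  // Note that ALL directories must be listed first; only reading is avoided.
  def prune(paths: Seq[String], predicates: Map[String, String]): Seq[String] =
    paths.filter { path =>
      val values = partitionValues(path)
      predicates.forall { case (key, wanted) => values.get(key) == Some(wanted) }
    }

  def main(args: Array[String]): Unit = {
    val listed = Seq(
      "test_table/date_prefix=20150819/hour=00",
      "test_table/date_prefix=20150819/hour=01",
      "test_table/date_prefix=20150820/hour=00")
    // Only the first directory matches both predicates.
    println(prune(listed, Map("date_prefix" -> "20150819", "hour" -> "00")))
  }
}
```

 The sketch shows why the listing cost grows with the number of partitions
 even when a query touches only one of them: the filter can only run once
 every directory name is known.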



 Can you provide a little more detail about how exactly you are accessing
 the parquet data (are you using sqlContext.read or creating persistent
 tables in the metastore?), and how long it is taking?  It would also be
 good to know how many partitions we are talking about and how much data is
 in each.  Finally, I'd like to see the stacktrace where it is hanging to
 make sure my above assertions are correct.



 We have several tables internally that have 1000s of partitions and while
 it takes ~1 minute initially to discover the metadata, after that we are
 able to query the data interactively.







 On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 anybody has any suggestions?



 On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Is there a workaround without updating Hadoop? Would really appreciate if
 someone can explain what spark is trying to do here and what is an easy way
 to turn this off. Thanks all!



 On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Did you try with Hadoop version 2.7.1? It is known that s3a, available in
 2.7, works really well with Parquet. They fixed a lot of issues related to
 metadata reading there...

 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!



 (table is partitioned by date_prefix and hour)

 explain select count(*) from test_table where date_prefix='20150819' and
 hour='00';



 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]

  TungstenExchange SinglePartition

   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]

Scan ParquetRelation[ .. about 1000 partition paths go here ]



 Why does Spark have to scan all partitions when the query only concerns
 1 partition? Doesn't that defeat the purpose of partitioning?



 Thanks!



 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and
 I couldn't find much information about it online. What does it mean exactly
 to disable it? Are there any negative consequences to disabling it?



 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Can you do some more profiling? I am wondering if the driver is busy
 scanning HDFS / S3.

 For example, run jstack on the pid of the driver process.



 Also, it would be great if you could paste the physical plan for the
 simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
 partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of
 CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote:

 Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to
 false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 

`show tables like 'tmp*';` does not work in Spark 1.3.0+

2015-08-24 Thread dugdun
Hi guys and gals,

I have a Spark 1.2.0 instance running that I connect to via the thrift
interface using beeline. On this instance I can send a command like `show
tables like 'tmp*';` and I get a list of all tables that start with `tmp`.
When testing this same command out on a server that is running Spark 1.3.0
or higher, I now get an error message:

0: jdbc:hive2://localhost:10001> show tables like 'tmp*';
Error: java.lang.RuntimeException: [1.13] failure: ``in'' expected but
identifier like found

show tables like 'tmp*'
            ^ (state=,code=0)
0: jdbc:hive2://localhost:10001>

I'm wondering if wildcard matching was inadvertently removed, or if there is
another way to accomplish the same thing without having to do this filtering
on the client side.

Cheers!

Doug




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/show-tables-like-tmp-does-not-work-in-Spark-1-3-0-tp24429.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Strange ClassNotFoundException in spark-shell

2015-08-24 Thread Jan Algermissen
Hi,

I am using spark 1.4 M1 with the Cassandra Connector and run into a strange 
error when using the spark shell.

This works:

sc.cassandraTable("events", "bid_events").select("bid","type").take(10).foreach(println)


But as soon as I put a map() in there (or filter):

sc.cassandraTable("events", "bid_events").select("bid","type").map(r => r).take(10).foreach(println)


I get the exception below.

The spark-shell call is:

/opt/spark/bin/spark-shell --master spark://x-1:7077 --conf 
spark.cassandra.connection.host=$(hostname -i) --driver-class-path $(echo 
/root/*.jar |sed 's/ /:/g') --jar 
spark-cassandra-connector-assembly-1.4.0-M1-SNAPSHOT.jar

Can anyone provide ideas how to approach debugging this?

Jan


15/08/24 23:54:43 INFO DAGScheduler: Job 0 failed: take at console:32, took 
1.999875 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3, 10.31.39.116): java.lang.ClassNotFoundException: $anonfun$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
  at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
  at scala.Option.foreach(Option.scala:257)
  at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
  at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)




Running spark shell on mesos with zookeeper on spark 1.3.1

2015-08-24 Thread kohlisimranjit
I have set up Apache Mesos using Mesosphere on CentOS 6 with Java 8. I have
3 slaves which total 3 cores and 8 GB of RAM. I have set no firewalls. I am
trying to run the following lines of code to test whether the setup is
working:

 val data = 1 to 1
 val distData = sc.parallelize(data)
 distData.filter(_ < 10).collect()
I get the following on my console
15/08/24 20:54:57 INFO SparkContext: Starting job: collect at console:26
15/08/24 20:54:57 INFO DAGScheduler: Got job 0 (collect at console:26)
with 8 output partitions (allowLocal=false)
15/08/24 20:54:57 INFO DAGScheduler: Final stage: Stage 0(collect at
console:26)
15/08/24 20:54:57 INFO DAGScheduler: Parents of final stage: List()
15/08/24 20:54:57 INFO DAGScheduler: Missing parents: List()
15/08/24 20:54:57 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[1]
at filter at console:26), which has no missing parents
15/08/24 20:54:57 INFO MemoryStore: ensureFreeSpace(1792) called with
curMem=0, maxMem=280248975
15/08/24 20:54:57 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 1792.0 B, free 267.3 MB)
15/08/24 20:54:57 INFO MemoryStore: ensureFreeSpace(1293) called with
curMem=1792, maxMem=280248975
15/08/24 20:54:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes
in memory (estimated size 1293.0 B, free 267.3 MB)
15/08/24 20:54:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on ip-172-31-46-176.ec2.internal:33361 (size: 1293.0 B, free: 267.3 MB)
15/08/24 20:54:57 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/08/24 20:54:57 INFO SparkContext: Created broadcast 0 from broadcast at
DAGScheduler.scala:839
15/08/24 20:54:57 INFO DAGScheduler: Submitting 8 missing tasks from Stage 0
(MapPartitionsRDD[1] at filter at console:26)
15/08/24 20:54:57 INFO TaskSchedulerImpl: Adding task set 0.0 with 8 tasks
15/08/24 20:55:12 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient resources
15/08/24 20:55:27 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient resources
.

Here are the logs /var/log/mesos attached







mesos-slave.22202
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24430/mesos-slave.22202
  
mesos-master.22181
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24430/mesos-master.22181
  





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-shell-on-mesos-with-zookeeper-on-spark-1-3-1-tp24430.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Run Spark job from within iPython+Spark?

2015-08-24 Thread YaoPau
I set up iPython Notebook to work with the pyspark shell, and now I'd like
to use %run to basically 'spark-submit' another Python Spark file and leave
its objects accessible within the Notebook.

I tried this, but got a "ValueError: Cannot run multiple SparkContexts at
once" error.  I then tried taking out the 'sc = SparkContext()' line from
the .py file, but then it couldn't access sc.

How can I %run another Python Spark file within iPython Notebook?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Run-Spark-job-from-within-iPython-Spark-tp24427.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Array Out OF Bound Exception

2015-08-24 Thread Michael Armbrust
The top line here indicates that the exception is being thrown from
your code (i.e. code written in the console).

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:40)


Check to make sure that you are properly handling data that has fewer
columns than you would expect.
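
One common way to end up with fewer columns than expected is sketched
below. The original post does not show the parsing code, so this assumes
each line is split on the pipe character with String.split; the object and
function names are made up for illustration. With the default call,
trailing empty fields are dropped, so a Scenario 3 line such as "A1| B1|"
yields only two fields and reading index 2 fails.

```scala
object SplitTrailingEmpty {
  // Default split: trailing empty strings are removed from the result.
  def fieldsDefault(line: String): Array[String] = line.split("\\|")

  // A negative limit keeps trailing empty strings, so every row has 3 fields.
  def fieldsKeepEmpty(line: String): Array[String] = line.split("\\|", -1)

  def main(args: Array[String]): Unit = {
    val line = "A1| B1|"
    println(fieldsDefault(line).length)   // 2, so fields(2) would throw
    println(fieldsKeepEmpty(line).length) // 3, the last field is ""
  }
}
```

If the parsing does work this way, passing -1 as the limit (or checking the
array length before indexing) is one option for handling rows whose last
column is empty.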



On Mon, Aug 24, 2015 at 12:41 PM, SAHA, DEBOBROTA ds3...@att.com wrote:

 Hi ,



 I am using SPARK 1.4 and I am getting an array out of bound Exception when
 I am trying to read from a registered table in SPARK.



 For example If I have 3 different text files with the content as below:



 *Scenario 1*:

 A1|B1|C1

 A2|B2|C2



 *Scenario 2*:

 A1| |C1

 A2| |C2



 *Scenario 3*:

 A1| B1|

 A2| B2|



 So for Scenario 1 and 2 it’s working fine but for Scenario 3 I am getting
 the following error:



 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage
 3.0 (TID 4, localhost): java.lang.ArrayIndexOutOfBoundsException: 2

 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:40)

 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:38)

 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

 at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 at
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

 at
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

 at scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)

 at scala.collection.AbstractIterator.to(Iterator.scala:1157)

 at
 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

 at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

 at
 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

 at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

 at
 org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)

 at
 org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)

 at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)

 at
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)

 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)

 at org.apache.spark.scheduler.Task.run(Task.scala:70)

 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

 at java.lang.Thread.run(Thread.java:745)



 Driver stacktrace:

 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)

 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)

 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)

 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)

 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



 Please help.



 Thanks,

 Debobrota









Re: Performance - Python streaming v/s Scala streaming

2015-08-24 Thread Tathagata Das
The Scala version of the Kafka integration is something that we have been
working on for a while, and is likely to be more optimized than the Python
one. The Python one definitely requires passing the data back and forth
between the JVM and the Python VM and decoding the raw bytes to Python
strings (probably less efficient than Java's byte-to-UTF-8 decoder), so that
may cause some extra overhead compared to Scala.

Also consider trying the direct API. Read more in the Kafka integration
guide - http://spark.apache.org/docs/latest/streaming-kafka-integration.html
Overall, it has a much higher throughput than the earlier receiver-based
approach.

BTW, a disclaimer: do not take this difference as a generalization of the
performance difference between Scala and Python for all of Spark. For
example, DataFrames provide performance parity between the Scala and Python
APIs.


On Mon, Aug 24, 2015 at 5:22 AM, utk.pat utkarsh.pat...@gmail.com wrote:

 I am new to SPARK streaming. I was running the kafka_wordcount example
 with a local KAFKA and SPARK instance. It was very easy to set this up and
 get going :) I tried running both SCALA and Python versions of the word
 count example. Python versions seems to be extremely slow. Sometimes it has
 delays of more than couple of minutes. On the other hand SCALA versions
 seems to be way better. I am running on a windows machine. I am trying to
 understand what is the cause slowness in python streaming? Is there
 anything that I am missing? For real time streaming analysis should I
 prefer SCALA?
 --
 View this message in context: Performance - Python streaming v/s Scala
 streaming
 http://apache-spark-user-list.1001560.n3.nabble.com/Performance-Python-streaming-v-s-Scala-streaming-tp24415.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



ExternalSorter: Thread *** spilling in-memory map of 352.6 MB to disk (38 times so far)

2015-08-24 Thread d...@lumity.com
Hello,

I'm trying to run a Spark 1.5 job with:

 ./spark-shell --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=1044 -Xms16g -Xmx48g -Xss128m"

I get lots of error messages like:

15/08/24 20:24:33 INFO ExternalSorter: Thread 172 spilling in-memory map of
352.2 MB to disk (40 times so far)
15/08/24 20:24:33 INFO ExternalSorter: Thread 179 spilling in-memory map of
352.2 MB to disk (39 times so far)
15/08/24 20:24:34 INFO ExternalSorter: Thread 197 spilling in-memory map of
352.2 MB to disk (39 times so far)
15/08/24 20:24:34 INFO ExternalSorter: Thread 192 spilling in-memory map of
352.2 MB to disk (39 times so far)
15/08/24 20:24:36 INFO ShuffleMemoryManager: TID 798 waiting for at least
1/2N of shuffle memory pool to be free
15/08/24 20:24:36 INFO ExternalSorter: Thread 170 spilling in-memory map of
352.2 MB to disk (39 times so far)
15/08/24 20:24:36 INFO ExternalSorter: Thread 171 spilling in-memory map of
352.2 MB to disk (40 times so far)

When I force a stack trace with jstack, I get stack traces like this for
each of my 36 cores:

"Executor task launch worker-44" daemon prio=10 tid=0x7fb9c404d000 nid=0x417f runnable [0x7fb5e3ffe000]
   java.lang.Thread.State: RUNNABLE
        at scala.collection.mutable.ResizableArray$class.$init$(ResizableArray.scala:32)
        at scala.collection.mutable.ArrayBuffer.<init>(ArrayBuffer.scala:49)
        at scala.collection.mutable.ArrayBuffer.<init>(ArrayBuffer.scala:63)
        at org.apache.spark.util.SizeEstimator$SearchState.<init>(SizeEstimator.scala:154)
        at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:183)
        at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:262)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
        at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:254)
        at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:238)
        at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
        at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
        at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
        at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
        at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
        at org.apache.spark.util.collection.PartitionedPairBuffer.afterUpdate(PartitionedPairBuffer.scala:30)
        at org.apache.spark.util.collection.PartitionedPairBuffer.insert(PartitionedPairBuffer.scala:53)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:214)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)



Jmap Shows:

root@ip-172-31-28-227:~# jmap -heap 16421
Attaching to process ID 16421, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 24.79-b02

using thread-local object allocation.
Parallel GC with 25 thread(s)

Heap Configuration:
   MinHeapFreeRatio = 0
   MaxHeapFreeRatio = 100
   MaxHeapSize  = 51539607552 (49152.0MB)
   NewSize  = 1310720 (1.25MB)
   MaxNewSize   = 17592186044415 MB
   OldSize  = 5439488 (5.1875MB)
   NewRatio = 2
   SurvivorRatio= 8
   PermSize = 21757952 (20.75MB)
   MaxPermSize  = 268435456 (256.0MB)
   G1HeapRegionSize = 0 (0.0MB)

Heap Usage:
PS Young Generation
Eden Space:
   capacity = 15290335232 (14582.0MB)
   used = 11958528232 (11404.541236877441MB)
   free = 3331807000 (3177.4587631225586MB)
   78.20971908433302% used
From Space:
   capacity = 869269504 (829.0MB)
   used = 868910560 (828.6576843261719MB)
   free = 358944 (0.342315673828125MB)
   99.9587073976082% used
To Space:
   capacity = 965738496 (921.0MB)
   used = 0 (0.0MB)
   free = 965738496 (921.0MB)
   0.0% used
PS Old Generation
   capacity = 11453595648 (10923.0MB)
   used = 1423152248 (1357.223747253418MB)
   free = 10030443400 (9565.776252746582MB)
   12.425375329611077% used
PS Perm Generation
   capacity = 107479040 (102.5MB)
   used = 107107360 (102.14553833007812MB)
   free = 371680 (0.354461669921875MB)
   99.65418373666158% used

27990 interned Strings occupying 

Re: Spark ec2 lunch problem

2015-08-24 Thread Andrew Or
Hey Garry,

Have you verified that your particular VPC and subnet are open to the
world? In particular, have you verified the route table attached to your
VPC / subnet contains an internet gateway open to the public?

I've run into this issue myself recently and that was the problem for me.

-Andrew

2015-08-24 5:58 GMT-07:00 Robin East robin.e...@xense.co.uk:

 spark-ec2 is the way to go however you may need to debug connectivity
 issues. For example do you know that the servers were correctly setup in
 AWS and can you access each node using ssh? If no then you need to work out
 why (it’s not a spark issue). If yes then you will need to work out why ssh
 via the spark-ec2 script is not working.

 I’ve used spark-ec2 successfully many times but have never used the
 --vpc-id and --subnet-id options and that may be the source of your problems,
 especially since it appears to be a hostname resolution issue. If you could
 confirm the above questions then maybe someone on the list can help
 diagnose the specific problem.



 ---
 Robin East
 *Spark GraphX in Action* Michael Malak and Robin East
 Manning Publications Co.
 http://www.manning.com/malak/

 On 24 Aug 2015, at 13:45, Garry Chen g...@cornell.edu wrote:

 So what is the best way to deploy spark cluster in EC2 environment any
 suggestions?

 Garry

 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com
 ak...@sigmoidanalytics.com]
 *Sent:* Friday, August 21, 2015 4:27 PM
 *To:* Garry Chen g...@cornell.edu
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark ec2 lunch problem


 It may be that the version of the spark-ec2 script you are using is buggy,
 or that AWS is sometimes having problems provisioning machines.
 On Aug 21, 2015 7:56 AM, Garry Chen g...@cornell.edu wrote:

 Hi All,
 I am trying to launch a Spark EC2 cluster by running
  spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc
 --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster
 but I keep getting the following message endlessly.  Please help.


 Warning: SSH connection error. (This could be temporary.)
 Host:
 SSH return code: 255
 SSH output: ssh: Could not resolve hostname : Name or service not known





Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
Continuing this discussion:
http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html

I am getting this error when I use logback-classic.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

I need to use logback-classic for my current project, so I am trying to
exclude slf4j-log4j12 from Spark:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Now, when I run my job from Intellij (which sets the classpath), things
work perfectly.

But when I run my job via spark-submit:
~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner
spark-0.1-SNAPSHOT-jar-with-dependencies.jar
My job fails because spark-submit sets up the classpath and it re-adds the
slf4j-log4j12.

I am not adding the Spark jar to the uber-jar via the maven assembly plugin:
<dependencySets>
    <dependencySet>
        ..
        <useTransitiveDependencies>false</useTransitiveDependencies>
        <excludes>
            <exclude>org.apache.spark:spark-core_2.10</exclude>
        </excludes>
    </dependencySet>
</dependencySets>

So how can I exclude slf4j-log4j12.jar when I submit a job via
spark-submit (on a per job basis)?

-- 
Thanks,
-Utkarsh


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Marcelo Vanzin
Hi Utkarsh,

Unfortunately that's not going to be easy. Since Spark bundles all
dependent classes into a single fat jar file, to remove that
dependency you'd need to modify Spark's assembly jar (potentially in
all your nodes). Doing that per-job is even trickier, because you'd
probably need some kind of script to inject the correct binding into
Spark's classpath.

That being said, that message is not an error, it's more of a noisy
warning. I'd expect slf4j to use the first binding available - in your
case, logback-classic. Is that not the case?


On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote:
 Continuing this discussion:
 http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html

 I am getting this error when I use logback-classic.

 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in
 [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

 I need to use logback-classic for my current project, so I am trying to
 ignore slf4j-log4j12 from spark:
 dependency
 groupIdorg.apache.spark/groupId
 artifactIdspark-core_2.10/artifactId
 version1.4.1/version
 exclusions
 exclusion
 groupIdorg.slf4j/groupId
 artifactIdslf4j-log4j12/artifactId
 /exclusion
 /exclusions
 /dependency

 Now, when I run my job from Intellij (which sets the classpath), things work
 perfectly.

 But when I run my job via spark-submit:
 ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner
 spark-0.1-SNAPSHOT-jar-with-dependencies.jar
 My job fails because spark-submit sets up the classpath and it re-adds the
 slf4j-log4j12.

 I am not adding spark jar to the uber-jar via the maven assembly plugin:
  dependencySets
 dependencySet
 ..
 useTransitiveDependenciesfalse/useTransitiveDependencies
 excludes
 excludeorg.apache.spark:spark-core_2.10/exclude
 /excludes
 /dependencySet
 /dependencySets

 So how can I exclude slf4j-log4j12.jar when I submit a job via
 spark-submit (on a per job basis)?

 --
 Thanks,
 -Utkarsh



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark and scala-2.11

2015-08-24 Thread Lanny Ripple
We're going to be upgrading from Spark 1.0.2 and using hadoop-1.2.1, so we
need to build by hand.  (Yes, I know. Use hadoop-2.x but standard resource
constraints apply.)  I want to build against scala-2.11 and publish to our
artifact repository, but finding build/scala-2.10.4 and tracing down what
build/mvn was doing had me concerned that I was missing something.  I'll
hold the course and build it as instructed.

Thanks for the info, all.

PS - Since asked -- PATH=./build/apache-maven-3.2.5/bin:$PATH; build/mvn
-Phadoop-1 -Dhadoop.version=1.2.1 -Dscala-2.11 -DskipTests package

On Mon, Aug 24, 2015 at 2:49 PM, Jonathan Coveney jcove...@gmail.com
wrote:

 I've used the instructions and it worked fine.

 Can you post exactly what you're doing, and what it fails with? Or are you
 just trying to understand how it works?

 2015-08-24 15:48 GMT-04:00 Lanny Ripple la...@spotright.com:

 Hello,

 The instructions for building spark against scala-2.11 indicate using
 -Dspark-2.11.  When I look in the pom.xml I find a profile named
 'spark-2.11' but nothing that would indicate I should set a property.  The
 sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
 does a simple grep of scala.version (which doesn't change after running dev/
 change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
 scala library.

 Anyone know (from having done it and used it in production) if the build
 instructions for spark-1.4.1 against Scala-2.11 are correct?

 Thanks.
   -Lanny





Re: spark and scala-2.11

2015-08-24 Thread Jonathan Coveney
I've used the instructions and it worked fine.

Can you post exactly what you're doing, and what it fails with? Or are you
just trying to understand how it works?

2015-08-24 15:48 GMT-04:00 Lanny Ripple la...@spotright.com:

 Hello,

 The instructions for building spark against scala-2.11 indicate using
 -Dspark-2.11.  When I look in the pom.xml I find a profile named
 'spark-2.11' but nothing that would indicate I should set a property.  The
 sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
 does a simple grep of scala.version (which doesn't change after running dev/
 change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
 scala library.

 Anyone know (from having done it and used it in production) if the build
 instructions for spark-1.4.1 against Scala-2.11 are correct?

 Thanks.
   -Lanny



Re: spark and scala-2.11

2015-08-24 Thread Sean Owen
The property scala-2.11 triggers the profile scala-2.11 -- and
additionally disables the scala-2.10 profile, so that's the way to do
it. But yes, you also need to run the script before-hand to set up the
build for Scala 2.11 as well.

On Mon, Aug 24, 2015 at 8:48 PM, Lanny Ripple la...@spotright.com wrote:
 Hello,

 The instructions for building spark against scala-2.11 indicate using
 -Dspark-2.11.  When I look in the pom.xml I find a profile named
 'spark-2.11' but nothing that would indicate I should set a property.  The
 sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
 does a simple grep of scala.version (which doesn't change after running
 dev/change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
 scala library.

 Anyone know (from having done it and used it in production) if the build
 instructions for spark-1.4.1 against Scala-2.11 are correct?

 Thanks.
   -Lanny




Re: Local Spark talking to remote HDFS?

2015-08-24 Thread Dino Fancellu
Changing the IP to the guest IP address just never connects.

The VM has port tunnelling, and it passes through all the main ports,
8020 included, to the host.

You can tell that it was talking to the guest VM before, simply
because it reported when a file was not found.

Error is:

Exception in thread "main" org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block:
BP-452094660-10.0.2.15-1437494483194:blk_1073742905_2098
file=/tmp/people.txt

but I have no idea what it means by that. It certainly can find the
file and knows it exists.
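
A possible reading of the BlockMissingException, not confirmed anywhere in this thread: the namenode on port 8020 answers (which is why a wrong file name is reported correctly), but it then hands back datanode addresses on the guest's NAT network (the block pool ID mentions 10.0.2.15), which the Windows host cannot reach. If that is what is happening, asking the HDFS client to contact datanodes by hostname may help, provided the guest's hostname resolves on the host and the datanode transfer port is forwarded as well. A minimal sketch under those assumptions; the object name is made up for illustration, and `dfs.client.use.datanode.hostname` is a standard Hadoop 2.x client setting:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver program; only the hadoopConfiguration line differs
// from the code posted in the thread.
object RemoteHdfsRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    try {
      // Ask the HDFS client to reach datanodes by hostname rather than by the
      // (possibly NAT-internal) IP address that the namenode reports.
      sc.hadoopConfiguration.set("dfs.client.use.datanode.hostname", "true")

      val words = sc.textFile("hdfs://localhost:8020/tmp/people.txt")
      println(words.count())
    } finally {
      sc.stop()
    }
  }
}
```

If the count still fails with the same exception, the assumption above is probably wrong and the datanode logs on the guest are the next place to look.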



On 24 August 2015 at 20:43, Roberto Congiu roberto.con...@gmail.com wrote:
 When you launch your HDP guest VM, most likely it gets launched with NAT and
 an address on a private network (192.168.x.x) so on your windows host you
 should use that address (you can find out using ifconfig on the guest OS).
 I usually add an entry to my /etc/hosts for VMs that I use oftenif you
 use vagrant, there's also a vagrant module that can do that automatically.
 Also, I am not sure how the default HDP VM is set up, that is, if it only
 binds HDFS to 127.0.0.1 or to all addresses. You can check that with netstat
 -a.

 R.

 2015-08-24 11:46 GMT-07:00 Dino Fancellu d...@felstar.com:

 I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM.

 If I go into the guest spark-shell and refer to the file thus, it works
 fine

   val words = sc.textFile("hdfs:///tmp/people.txt")
   words.count

 However if I try to access it from a local Spark app on my Windows host,
 it doesn't work

   val conf = new SparkConf().setMaster("local").setAppName("My App")
   val sc = new SparkContext(conf)

   val words = sc.textFile("hdfs://localhost:8020/tmp/people.txt")
   words.count

 Emits



 The port 8020 is open, and if I choose the wrong file name, it will tell
 me



 My pom has

 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>1.4.1</version>
     <scope>provided</scope>
 </dependency>

 Am I doing something wrong?

 Thanks.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







Re: Local Spark talking to remote HDFS?

2015-08-24 Thread Roberto Congiu
When you launch your HDP guest VM, most likely it gets launched with NAT
and an address on a private network (192.168.x.x), so on your Windows host
you should use that address (you can find it by running ifconfig on the
guest OS).
I usually add an entry to my /etc/hosts for VMs that I use often. If you
use Vagrant, there's also a Vagrant module that can do that automatically.
Also, I am not sure how the default HDP VM is set up, that is, whether it
binds HDFS only to 127.0.0.1 or to all addresses. You can check that with
netstat -a.

R.

2015-08-24 11:46 GMT-07:00 Dino Fancellu d...@felstar.com:

 I have a file in HDFS inside my HortonWorks HDP 2.3_1 VirtualBox VM.

 If I go into the guest spark-shell and refer to the file thus, it works
 fine

   val words = sc.textFile("hdfs:///tmp/people.txt")
   words.count

 However if I try to access it from a local Spark app on my Windows host, it
 doesn't work

   val conf = new SparkConf().setMaster("local").setAppName("My App")
   val sc = new SparkContext(conf)

   val words = sc.textFile("hdfs://localhost:8020/tmp/people.txt")
   words.count

 Emits



 The port 8020 is open, and if I choose the wrong file name, it will tell me



 My pom has

 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>1.4.1</version>
     <scope>provided</scope>
 </dependency>

 Am I doing something wrong?

 Thanks.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Local-Spark-talking-to-remote-HDFS-tp24425.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





spark and scala-2.11

2015-08-24 Thread Lanny Ripple
Hello,

The instructions for building Spark against Scala 2.11 indicate using
-Dscala-2.11.  When I look in the pom.xml I find a profile named
'scala-2.11' but nothing that would indicate I should set a property.  The
sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
does a simple grep of scala.version (which doesn't change after running
dev/change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
scala library.

Anyone know (from having done it and used it in production) if the build
instructions for spark-1.4.1 against Scala-2.11 are correct?

Thanks.
  -Lanny


Re: rdd count is throwing null pointer exception

2015-08-24 Thread Akhil Das
Move your count operation outside the foreach and use a broadcast to access
it inside the foreach.
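
A minimal sketch of that suggestion, assuming a local master and a toy RDD of strings in place of the real ticket records (the object name, method name and sample values are made up for illustration). The count runs once on the driver, and only the resulting Long is broadcast, so the closure passed to foreach no longer references any RDD:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CountOutsideForeach {
  /** Counts the tickets on the driver, then reads the count inside foreach via a broadcast. */
  def run(sc: SparkContext, tickets: Seq[String]): Long = {
    val ticketRdd = sc.parallelize(tickets).cache()

    // The action is invoked on the driver, before the foreach starts.
    val ticketCount: Long = ticketRdd.count()

    // Only a plain Long is shipped to the executors.
    val ticketCountBc = sc.broadcast(ticketCount)

    ticketRdd.foreach { ticket =>
      // No RDD is touched here, only the broadcast value.
      println(s"$ticket (one of ${ticketCountBc.value} tickets)")
    }
    ticketCount
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("count-outside-foreach")
    val sc = new SparkContext(conf)
    try {
      run(sc, Seq("t1", "t2", "t3"))
    } finally {
      sc.stop()
    }
  }
}
```

For a single Long, plain closure capture of `ticketCount` would work just as well; the broadcast matters more once the shared value is large.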
On Aug 17, 2015 10:34 AM, Priya Ch learnings.chitt...@gmail.com wrote:

 Looks like it is because of SPARK-5063:
 RDD transformations and actions can only be invoked by the driver, not
 inside of other transformations; for example, rdd1.map(x =>
 rdd2.values.count() * x) is invalid because the values transformation and
 count action cannot be performed inside of the rdd1.map transformation. For
 more information, see SPARK-5063.

 On Mon, Aug 17, 2015 at 8:13 PM, Preetam preetam...@gmail.com wrote:

 The error could be because of the missing brackets after the word cache -
 .ticketRdd.cache()

  On Aug 17, 2015, at 7:26 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:
 
  Hi All,
 
   Thank you very much for the detailed explanation.
 
  I have scenario like this-
  I have rdd of ticket records and another rdd of booking records. for
 each ticket record, i need to check whether any link exists in booking
 table.
 
  val ticketCachedRdd = ticketRdd.cache
 
  ticketRdd.foreach { ticket =>
    // this function queries the booking table and retrieves the booking rows
    val bookingRecords = queryOnBookingTable(date, flightNumber, flightCarrier)
    // this is throwing a NullPointerException
    println(ticketCachedRdd.count)
  }
 
  Is there somthing wrong in the count, i am trying to use the count of
 cached rdd when looping through the actual rdd. whats wrong in this ?
 
  Thanks,
  Padma Ch





Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
Hi Marcelo,

When I add this exclusion rule to my pom:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

The SparkRunner class works fine (from IntelliJ) but when I build a jar and
submit it to spark-submit:

I get this error:
Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory cannot be cast to ch.qos.logback.classic.LoggerContext
        at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
        at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
        at com.opentable.logging.Log.<clinit>(Log.java:31)

Which is this here (our logging lib is open sourced):
https://github.com/opentable/otj-logging/blob/master/logging/src/main/java/com/opentable/logging/AssimilateForeignLogging.java#L68

Thanks,
-Utkarsh




On Mon, Aug 24, 2015 at 3:04 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Utkarsh,

 Unfortunately that's not going to be easy. Since Spark bundles all
 dependent classes into a single fat jar file, to remove that
 dependency you'd need to modify Spark's assembly jar (potentially in
 all your nodes). Doing that per-job is even trickier, because you'd
 probably need some kind of script to inject the correct binding into
 Spark's classpath.

 That being said, that message is not an error, it's more of a noisy
 warning. I'd expect slf4j to use the first binding available - in your
 case, logback-classic. Is that not the case?


 On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  Continuing this discussion:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html
 
  I am getting this error when I use logback-classic.
 
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in
 
 [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in
 
 [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
  I need to use logback-classic for my current project, so I am trying to
  ignore slf4j-log4j12 from spark:
  dependency
  groupIdorg.apache.spark/groupId
  artifactIdspark-core_2.10/artifactId
  version1.4.1/version
  exclusions
  exclusion
  groupIdorg.slf4j/groupId
  artifactIdslf4j-log4j12/artifactId
  /exclusion
  /exclusions
  /dependency
 
  Now, when I run my job from Intellij (which sets the classpath), things
 work
  perfectly.
 
  But when I run my job via spark-submit:
  ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner
  spark-0.1-SNAPSHOT-jar-with-dependencies.jar
  My job fails because spark-submit sets up the classpath and it re-adds
 the
  slf4j-log4j12.
 
  I am not adding spark jar to the uber-jar via the maven assembly plugin:
   dependencySets
  dependencySet
  ..
  useTransitiveDependenciesfalse/useTransitiveDependencies
  excludes
  excludeorg.apache.spark:spark-core_2.10/exclude
  /excludes
  /dependencySet
  /dependencySets
 
  So how can I exclude slf4j-log4j12.jar when I submit a job via
  spark-submit (on a per job basis)?
 
  --
  Thanks,
  -Utkarsh



 --
 Marcelo




-- 
Thanks,
-Utkarsh


Re: Spark Direct Streaming With ZK Updates

2015-08-24 Thread suchenzang
When updating the ZK offset in the driver (within foreachRDD), there is
somehow a serialization exception getting thrown:

15/08/24 15:45:40 ERROR JobScheduler: Error in job generator
java.io.NotSerializableException: org.I0Itec.zkclient.ZkClient
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

The code looks something like:

directKafkaStream.transform { rdd =>
   currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.foreachRDD { rdd =>
   ... /*do stuff with shuffling involved, then update DynamoDB*/

  val props = new Properties()
  kafkaConf.foreach(param => props.put(param._1, param._2))
  props.setProperty(AUTO_OFFSET_COMMIT, "false")

  val consumerConfig = new ConsumerConfig(props)
  assert(!consumerConfig.autoCommitEnable)

  zkClient = new ZkClient(consumerConfig.zkConnect,
    consumerConfig.zkSessionTimeoutMs,
    consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

  offsetRanges.foreach { osr =>
    val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
    val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath,
      osr.untilOffset.toString)
  }
}

Why would there be serialization issues when I'm not trying to pass ZkClient
to any of the workers?
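
One plausible explanation, not confirmed in this thread: in the snippet above `zkClient` is assigned rather than declared, so it presumably lives outside the closure, for example as a field of the enclosing object. A function that touches such a field captures the whole enclosing object, and if that function is ever Java-serialized (for instance when the DStream graph is checkpointed), the client is pulled in with it even though no worker uses it. The sketch below reproduces that capture in plain Scala with no Spark or ZooKeeper involved; `FakeClient`, `OffsetCommitter` and `ClosureCaptureDemo` are made-up names:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable client such as ZkClient.
class FakeClient {
  def update(path: String, value: String): Unit = ()
}

// Serializable holder, standing in for whatever object owns the streaming code.
class OffsetCommitter extends Serializable {
  var client: FakeClient = new FakeClient

  // Touches the field, so the function captures `this`, and with it `client`.
  def viaField: String => Unit = path => client.update(path, "0")

  // Builds the client inside the function body; nothing from `this` is used.
  def viaLocal: String => Unit = path => {
    val local = new FakeClient
    local.update(path, "0")
  }
}

object ClosureCaptureDemo {
  /** Returns true if Java serialization of `obj` succeeds. */
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val committer = new OffsetCommitter
    println(s"closure using the field serializes: ${serializes(committer.viaField)}")
    println(s"closure using a local serializes:   ${serializes(committer.viaLocal)}")
  }
}
```

If this is indeed the cause, declaring the client as a local `val` inside foreachRDD, or marking the field `@transient`, would be the usual ways to keep it out of the serialized closure.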

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423p24432.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Jeff Zhang
Hi Cheng,

I know that sqlContext.read will trigger one Spark job to infer the schema.
What I mean is that DataFrame#show itself costs 2 Spark jobs, so overall it
costs 3 jobs.

Here's the command I use:

 val df = sqlContext.read.json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
   // triggers one Spark job to infer the schema
 df.show()   // triggers 2 Spark jobs, which is weird
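
For reference, Hao's suggestion of supplying the schema up front can be sketched as below, assuming the Spark 1.4 DataFrameReader API and the two columns shown in the quoted plan (age: bigint, name: string). The object name is made up for illustration. This only removes the schema-inference job; it says nothing about why show() itself uses two jobs:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object ExplicitJsonSchema {
  // Schema matching the analyzed plan quoted in the thread.
  val peopleSchema: StructType = StructType(Seq(
    StructField("age", LongType, nullable = true),
    StructField("name", StringType, nullable = true)))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("explicit-json-schema")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    try {
      val path = "file:///Users/hadoop/github/spark/examples/src/main/resources/people.json"
      // With a user-supplied schema the reader does not need to scan the file to infer one.
      val df = sqlContext.read.schema(peopleSchema).json(path)
      df.show()
    } finally {
      sc.stop()
    }
  }
}
```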




On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote:

 The first job is to infer the json schema, and the second one is what you
 mean of the query.

 You can provide the schema while loading the json file, like below:



 sqlContext.read.schema(xxx).json(“…”)?



 Hao

 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Monday, August 24, 2015 6:20 PM
 *To:* user@spark.apache.org
 *Subject:* DataFrame#show cost 2 Spark Jobs ?



 It's weird to me that the simple show function will cost 2 spark jobs.
 DataFrame#explain shows it is a very simple operation, not sure why need 2
 jobs.



 == Parsed Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Analyzed Logical Plan ==

 age: bigint, name: string

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Optimized Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Physical Plan ==

 Scan
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]







 --

 Best Regards

 Jeff Zhang




-- 
Best Regards

Jeff Zhang


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
I get the same error even when I set the SPARK_CLASSPATH: export
SPARK_CLASSPATH=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.1.jar:/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
And I run the job like this: /spark-1.4.1-bin-hadoop2.4/bin/spark-submit
--class runner.SparkRunner
target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

I am not able to find the code in Spark which adds these jars before the
Spark classes in the classpath. Or maybe it's a bug. Any suggestions on
workarounds?

Thanks,
-Utkarsh


On Mon, Aug 24, 2015 at 4:32 PM, Utkarsh Sengar utkarsh2...@gmail.com
wrote:

 I assumed that's the case because of the error I got and the
 documentation, which says: "Extra classpath entries to append to the
 classpath of the driver."

 This is where I stand now:
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.10</artifactId>
     <version>1.4.1</version>
     <exclusions>
         <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
     </exclusions>
 </dependency>

 And no exclusions from my logging lib.

 And I submit this task: spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class
 runner.SparkRunner --conf
 spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
 --conf
 spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
 --conf
 spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
 --conf
 spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
 target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

 And I get the same error:
 Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory
 cannot be cast to ch.qos.logback.classic.LoggerContext
 at
 com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
 at
 com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
 at com.opentable.logging.Log.clinit(Log.java:31)
 ... 16 more


 Thanks,
 -Utkarsh

 On Mon, Aug 24, 2015 at 4:11 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  That didn't work since extraClassPath flag was still appending the
 jars at
  the end, so its still picking the slf4j jar provided by spark.

 Out of curiosity, how did you verify this? The extraClassPath
 options are supposed to prepend entries to the classpath, and the code
 seems to be doing that. If it's not really doing that in some case,
 it's a bug that needs to be fixed.

 Another option is setting the SPARK_CLASSPATH env variable,
 which is deprecated, but might come in handy in case there is actually
 a bug in handling those options.


 --
 Marcelo




 --
 Thanks,
 -Utkarsh




-- 
Thanks,
-Utkarsh


Where is Redgate's HDFS explorer?

2015-08-24 Thread Dino Fancellu
http://hortonworks.com/blog/windows-explorer-experience-hdfs/

It seemed to exist, but now there is no sign of it.

Anything similar to tie HDFS into windows explorer?

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Where-is-Redgate-s-HDFS-explorer-tp24431.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Marcelo Vanzin
Hi Utkarsh,

A quick look at slf4j's source shows it loads the first
StaticLoggerBinder in your classpath. How are you adding the logback
jar file to spark-submit?

If you use spark.driver.extraClassPath and
spark.executor.extraClassPath to add the jar, it should take
precedence over the log4j binding embedded in the Spark assembly.
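
To check which binding actually wins at runtime, a small probe like the one below can be called at the start of the job. It is only a sketch: the object name is made up, it assumes slf4j-api is on the classpath, and the jar location may be unavailable under some class loaders, in which case a placeholder is printed instead:

```scala
import org.slf4j.LoggerFactory

object WhichSlf4jBinding {
  /** Describes the logger factory SLF4J bound to, and the jar it came from if known. */
  def describe(): String = {
    val factory = LoggerFactory.getILoggerFactory
    // getCodeSource and getLocation can both be null, so wrap them in Option.
    val location = Option(factory.getClass.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("<unknown location>")
    s"${factory.getClass.getName} loaded from $location"
  }

  def main(args: Array[String]): Unit = println(describe())
}
```

A factory class under ch.qos.logback would mean logback-classic took precedence; org.slf4j.impl.Log4jLoggerFactory would mean the log4j binding from the Spark assembly was found first.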


On Mon, Aug 24, 2015 at 3:15 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote:
 Hi Marcelo,

 When I add this exclusion rule to my pom:
 dependency
 groupIdorg.apache.spark/groupId
 artifactIdspark-core_2.10/artifactId
 version1.4.1/version
 exclusions
 exclusion
 groupIdorg.slf4j/groupId
 artifactIdslf4j-log4j12/artifactId
 /exclusion
 /exclusions
 /dependency

 The SparkRunner class works fine (from IntelliJ) but when I build a jar and
 submit it to spark-submit:

 I get this error:
 Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory
 cannot be cast to ch.qos.logback.classic.LoggerContext
 at
 com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
 at
 com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
 at com.opentable.logging.Log.clinit(Log.java:31)

 Which is this here (our logging lib is open sourced):
 https://github.com/opentable/otj-logging/blob/master/logging/src/main/java/com/opentable/logging/AssimilateForeignLogging.java#L68

 Thanks,
 -Utkarsh




 On Mon, Aug 24, 2015 at 3:04 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Utkarsh,

 Unfortunately that's not going to be easy. Since Spark bundles all
 dependent classes into a single fat jar file, to remove that
 dependency you'd need to modify Spark's assembly jar (potentially in
 all your nodes). Doing that per-job is even trickier, because you'd
 probably need some kind of script to inject the correct binding into
 Spark's classpath.

 That being said, that message is not an error, it's more of a noisy
 warning. I'd expect slf4j to use the first binding available - in your
 case, logback-classic. Is that not the case?


 On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  Continuing this discussion:
 
  http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html
 
  I am getting this error when I use logback-classic.
 
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in
 
  [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in
 
  [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
  I need to use logback-classic for my current project, so I am trying to
  ignore slf4j-log4j12 from spark:
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
 
  Now, when I run my job from Intellij (which sets the classpath), things
  work
  perfectly.
 
  But when I run my job via spark-submit:
  ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class runner.SparkRunner
  spark-0.1-SNAPSHOT-jar-with-dependencies.jar
  My job fails because spark-submit sets up the classpath and it re-adds
  the
  slf4j-log4j12.
 
  I am not adding spark jar to the uber-jar via the maven assembly plugin:
  <dependencySets>
    <dependencySet>
      ..
      <useTransitiveDependencies>false</useTransitiveDependencies>
      <excludes>
        <exclude>org.apache.spark:spark-core_2.10</exclude>
      </excludes>
    </dependencySet>
  </dependencySets>
 
  So how can I exclude slf4j-log4j12.jar when I submit a job via
  spark-submit (on a per job basis)?
 
  --
  Thanks,
  -Utkarsh



 --
 Marcelo




 --
 Thanks,
 -Utkarsh



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame/JDBC very slow performance

2015-08-24 Thread Michael Armbrust

 Much appreciated! I am not comparing with select count(*) for
 performance, but it was one simple thing I tried to check the performance
 :). I think it now makes sense since Spark tries to extract all records
 before doing the count. I thought having an aggregated function query
 submitted over JDBC/Teradata would let Teradata do the heavy lifting.


We currently only push down filters since there is a lot of variability in
what types of aggregations various databases support.  You can manually
pushdown whatever you want by replacing the table name with a subquery
(i.e. (SELECT ... FROM ...))
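
As a minimal sketch of the subquery trick described above: the helper below only builds the options for the "jdbc" data source, wrapping a query in parentheses with an alias so it can stand in for a table name. The connection URL, the table and column names, and the alias `pushed` are made up for illustration; whether a bare alias is accepted depends on the database.

```scala
object JdbcPushdown {
  /** Wraps a query so it can be passed where a table name is expected. */
  def asDbTable(query: String, alias: String = "pushed"): String =
    s"($query) $alias"

  /** Options for the "jdbc" data source; url and query come from the caller. */
  def jdbcOptions(url: String, query: String): Map[String, String] =
    Map("url" -> url, "dbtable" -> asDbTable(query))

  def main(args: Array[String]): Unit = {
    // Hypothetical connection string and table, for illustration only.
    val opts = jdbcOptions(
      "jdbc:teradata://dbhost/DATABASE=sales",
      "SELECT region, COUNT(*) AS cnt FROM orders GROUP BY region")
    opts.foreach { case (k, v) => println(s"$k = $v") }
    // With a SQLContext in scope, the options would be used roughly as:
    //   val df = sqlContext.read.format("jdbc").options(opts).load()
  }
}
```

With this shape the aggregation runs inside the database and Spark only reads the (small) result rows.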

   - How come my second query for (5B) records didn't return anything
 even after a long processing? If I understood correctly, Spark would try to
 fit it in memory and if not then might use disk space, which I have
 available?


Nothing should be held in memory for a query like this (other than a single
count per partition), so I don't think that is the problem.  There is
likely an error buried somewhere.


  - Am I supposed to do any Spark related tuning to make it work?

 My main need is to access data from these large table(s) on demand and
 provide aggregated and calculated results much quicker, for that  I was
 trying out Spark. As a next step I am thinking of exporting the data to Parquet files
 and giving that a try. Do you have any suggestions for dealing with the problem?


Exporting to parquet will likely be a faster option than trying to query
through JDBC, since we have many more opportunities for parallelism here.


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
That didn't work since the extraClassPath flag was still appending the jars
at the end, so it's still picking the slf4j jar provided by Spark.
Although I found this flag: --conf spark.executor.userClassPathFirst=true
(http://spark.apache.org/docs/latest/configuration.html) and tried this:

➜  simspark git:(bulkrunner) ✗ spark-1.4.1-bin-hadoop2.4/bin/spark-submit
--class runner.SparkRunner --jars
/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar,/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
--conf spark.executor.userClassPathFirst=true --conf
spark.driver.userClassPathFirst=true
target/ds-tetris-simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

But this led to another error: com.typesafe.config.ConfigException$Missing:
No configuration setting found for key 'akka.version'

Thanks,
-Utkarsh

On Mon, Aug 24, 2015 at 3:25 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Utkarsh,

 A quick look at slf4j's source shows it loads the first
 StaticLoggerBinder in your classpath. How are you adding the logback
 jar file to spark-submit?

 If you use spark.driver.extraClassPath and
 spark.executor.extraClassPath to add the jar, it should take
 precedence over the log4j binding embedded in the Spark assembly.


 On Mon, Aug 24, 2015 at 3:15 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  Hi Marcelo,
 
  When I add this exclusion rule to my pom:
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.4.1</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
 
  The SparkRunner class works fine (from IntelliJ) but when I build a jar
 and
  submit it to spark-submit:
 
  I get this error:
  Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory
  cannot be cast to ch.qos.logback.classic.LoggerContext
      at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
      at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
      at com.opentable.logging.Log.<clinit>(Log.java:31)
 
  Which is this here (our logging lib is open sourced):
 
 https://github.com/opentable/otj-logging/blob/master/logging/src/main/java/com/opentable/logging/AssimilateForeignLogging.java#L68
 
  Thanks,
  -Utkarsh
 
 
 
 
  On Mon, Aug 24, 2015 at 3:04 PM, Marcelo Vanzin van...@cloudera.com
 wrote:
 
  Hi Utkarsh,
 
  Unfortunately that's not going to be easy. Since Spark bundles all
  dependent classes into a single fat jar file, to remove that
  dependency you'd need to modify Spark's assembly jar (potentially in
  all your nodes). Doing that per-job is even trickier, because you'd
  probably need some kind of script to inject the correct binding into
  Spark's classpath.
 
  That being said, that message is not an error, it's more of a noisy
  warning. I'd expect slf4j to use the first binding available - in your
  case, logback-classic. Is that not the case?
 
 
  On Mon, Aug 24, 2015 at 2:50 PM, Utkarsh Sengar utkarsh2...@gmail.com
  wrote:
   Continuing this discussion:
  
  
 http://apache-spark-user-list.1001560.n3.nabble.com/same-log4j-slf4j-error-in-spark-9-1-td5592.html
  
   I am getting this error when I use logback-classic.
  
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in
  
  
 [jar:file:.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in
  
  
 [jar:file:.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  
   I need to use logback-classic for my current project, so I am trying
 to
   ignore slf4j-log4j12 from spark:
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.10</artifactId>
     <version>1.4.1</version>
     <exclusions>
       <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
       </exclusion>
     </exclusions>
   </dependency>
  
   Now, when I run my job from Intellij (which sets the classpath),
 things
   work
   perfectly.
  
   But when I run my job via spark-submit:
   ~/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class
 runner.SparkRunner
   spark-0.1-SNAPSHOT-jar-with-dependencies.jar
   My job fails because spark-submit sets up the classpath and it re-adds
   the
   slf4j-log4j12.
  
   I am not adding spark jar to the uber-jar via the maven assembly
 plugin:
dependencySets
   dependencySet
   ..
  
  useTransitiveDependenciesfalse/useTransitiveDependencies
   

Re: Protobuf error when streaming from Kafka

2015-08-24 Thread Ted Yu
Can you show the complete stack trace ?

Which Spark / Kafka release are you using ?

Thanks

On Mon, Aug 24, 2015 at 4:58 PM, Cassa L lcas...@gmail.com wrote:

 Hi,
  I am storing messages in Kafka using protobuf and reading them into
 Spark. I upgraded protobuf version from 2.4.1 to 2.5.0. I got
 java.lang.UnsupportedOperationException for older messages. However, even
 for new messages I get the same error. Spark does convert it though. I see
 my messages. How do I get rid of this error?
 java.lang.UnsupportedOperationException: This is supposed to be overridden
 by subclasses.
 at
 com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180)
 at
 org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$FsPermissionProto.getSerializedSize(HdfsProtos.java:5407)
 at
 com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749)



Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Marcelo Vanzin
On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com wrote:
 That didn't work since the extraClassPath flag was still appending the jars at
 the end, so it's still picking the slf4j jar provided by Spark.

Out of curiosity, how did you verify this? The extraClassPath
options are supposed to prepend entries to the classpath, and the code
seems to be doing that. If it's not really doing that in some case,
it's a bug that needs to be fixed.

Another option is setting the SPARK_CLASSPATH env variable,
which is deprecated, but might come in handy in case there is actually
a bug in handling those options.


-- 
Marcelo




Protobuf error when streaming from Kafka

2015-08-24 Thread Cassa L
Hi,
 I am storing messages in Kafka using protobuf and reading them into Spark.
I upgraded protobuf version from 2.4.1 to 2.5.0. I got
java.lang.UnsupportedOperationException for older messages. However, even
for new messages I get the same error. Spark does convert it though. I see
my messages. How do I get rid of this error?
java.lang.UnsupportedOperationException: This is supposed to be overridden
by subclasses.
at
com.google.protobuf.GeneratedMessage.getUnknownFields(GeneratedMessage.java:180)
at
org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$FsPermissionProto.getSerializedSize(HdfsProtos.java:5407)
at
com.google.protobuf.CodedOutputStream.computeMessageSizeNoTag(CodedOutputStream.java:749)


Re: Exclude slf4j-log4j12 from the classpath via spark-submit

2015-08-24 Thread Utkarsh Sengar
I assumed that's the case because of the error I got and the documentation,
which says: "Extra classpath entries to append to the classpath of the
driver."

This is where I stand now:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>

And no exclusions from my logging lib.

And I submit this task: spark-1.4.1-bin-hadoop2.4/bin/spark-submit --class
runner.SparkRunner --conf
spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
--conf
spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar
--conf
spark.driver.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
--conf
spark.executor.extraClassPath=/.m2/repository/ch/qos/logback/logback-core/1.1.2/logback-core-1.1.2.jar
target/simspark-0.1-SNAPSHOT-jar-with-dependencies.jar

And I get the same error:
Caused by: java.lang.ClassCastException: org.slf4j.impl.Log4jLoggerFactory
cannot be cast to ch.qos.logback.classic.LoggerContext
    at com.opentable.logging.AssimilateForeignLogging.assimilate(AssimilateForeignLogging.java:68)
    at com.opentable.logging.AssimilateForeignLoggingHook.automaticAssimilationHook(AssimilateForeignLoggingHook.java:28)
    at com.opentable.logging.Log.<clinit>(Log.java:31)
... 16 more
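
One thing that may be worth checking in the command above: each extraClassPath key is passed twice with a different jar. As far as I can tell, spark-submit keeps a single value per configuration key, so the later --conf would replace the earlier one and only logback-core would end up on the extra classpath. A rough sketch of building one value per key instead (the jar paths are placeholders, and the helper name is made up):

```scala
import java.io.File

object ExtraClassPath {
  /** Builds one "--conf key=jar1<sep>jar2" pair whose value holds every jar. */
  def conf(key: String, jars: Seq[String]): Seq[String] =
    Seq("--conf", s"$key=${jars.mkString(File.pathSeparator)}")

  def main(args: Array[String]): Unit = {
    // Placeholder paths; substitute the real locations of the logback jars.
    val jars = Seq(
      "/path/to/logback-classic-1.1.2.jar",
      "/path/to/logback-core-1.1.2.jar")
    val submitArgs =
      conf("spark.driver.extraClassPath", jars) ++
        conf("spark.executor.extraClassPath", jars)
    // Prints the arguments to paste into the spark-submit command line.
    println(submitArgs.mkString(" "))
  }
}
```

The same joined value can of course be typed by hand; the point is only that both jars share one key.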


Thanks,
-Utkarsh

On Mon, Aug 24, 2015 at 4:11 PM, Marcelo Vanzin van...@cloudera.com wrote:

 On Mon, Aug 24, 2015 at 3:58 PM, Utkarsh Sengar utkarsh2...@gmail.com
 wrote:
  That didn't work since the extraClassPath flag was still appending the
  jars at the end, so it's still picking the slf4j jar provided by Spark.

 Out of curiosity, how did you verify this? The extraClassPath
 options are supposed to prepend entries to the classpath, and the code
 seems to be doing that. If it's not really doing that in some case,
 it's a bug that needs to be fixed.

 Another option is setting the SPARK_CLASSPATH env variable,
 which is deprecated, but might come in handy in case there is actually
 a bug in handling those options.


 --
 Marcelo




-- 
Thanks,
-Utkarsh


What does Attribute and AttributeReference mean in Spark SQL

2015-08-24 Thread Todd
There are many case classes and concepts such as
Attribute/AttributeReference/Expression in Spark SQL.

I would like to ask what Attribute/AttributeReference/Expression mean. Given a
SQL query like select a, b from c, are a and b two Attributes? Is a + b an
Expression?
It looks like I misunderstand it, because Attribute extends Expression in the
code, which means an Attribute is itself an Expression.


Thanks.


Re: Too many files/dirs in hdfs

2015-08-24 Thread Mohit Anchlia
Any help would be appreciated

On Wed, Aug 19, 2015 at 9:38 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 My question was how to do this in Hadoop? Could somebody point me to some
 examples?

 On Tue, Aug 18, 2015 at 10:43 PM, UMESH CHAUDHARY umesh9...@gmail.com
 wrote:

 Of course, Java or Scala can do that:
 1) Create a FileWriter with append or roll over option
 2) For each RDD create a StringBuilder after applying your filters
 3) Write this StringBuilder to File when you want to write (The duration
 can be defined as a condition)
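
 The three steps above can be sketched roughly as follows. This is only an illustration under a few assumptions: it writes to a local file on the driver (not HDFS), the object and method names are made up, and the records are assumed to have been filtered and collected already.

```scala
import java.io.{BufferedWriter, File, FileWriter}
import scala.io.Source

object AppendingSink {
  /** Appends records to path, one per line. Returns false and leaves the
    * file untouched when the batch is empty. */
  def appendBatch(path: String, records: Seq[String]): Boolean =
    if (records.isEmpty) false
    else {
      val sb = new StringBuilder
      records.foreach(r => sb.append(r).append('\n'))
      // The second constructor argument switches FileWriter to append mode.
      val out = new BufferedWriter(new FileWriter(path, true))
      try out.write(sb.toString) finally out.close()
      true
    }

  def main(args: Array[String]): Unit = {
    val path = File.createTempFile("stream-output", ".txt").getPath
    appendBatch(path, Seq("a", "b"))
    appendBatch(path, Seq.empty) // empty batch: nothing is written
    appendBatch(path, Seq("c"))
    val src = Source.fromFile(path)
    try println(src.getLines().mkString(",")) finally src.close()
  }
}
```

 In a streaming job something like appendBatch(path, rdd.collect().toSeq) could be called inside foreachRDD, which is only reasonable when each batch is small enough to collect on the driver.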

 On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Is there a way to store all the results in one file and keep the file
 rollover separate from the Spark Streaming batch interval?

 On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com
 wrote:

 In Spark Streaming you can simply check whether your RDD contains any
 records, and if it does, save them using FileOutputStream:

 DStream.foreachRDD(t => { val count = t.count(); if (count > 0) { /* SAVE YOUR STUFF */ } })

 This will not create unnecessary files of 0 bytes.

 On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Currently, spark streaming would create a new directory for every
 batch and store the data to it (whether it has anything or not). There is
 no direct append call as of now, but you can achieve this either with
 FileUtil.copyMerge
 http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167
 or have a separate program which will do the clean up for you.

 Thanks
 Best Regards

 On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com
  wrote:

 Spark stream seems to be creating 0 bytes files even when there is no
 data. Also, I have 2 concerns here:

 1) Extra unnecessary files are being created in the output
 2) Hadoop doesn't work really well with too many files and I see that
 it is creating a directory with a timestamp every 1 second. Is there a
 better way of writing a file, maybe using some kind of append mechanism
 where one doesn't have to change the batch interval?









Re: MLlib Prefixspan implementation

2015-08-24 Thread Feynman Liang
CCing the mailing list again.

It's currently not on the radar. Do you have a use case for it? I can bring
it up during 1.6 roadmap planning tomorrow.

On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN ila...@hotmail.com wrote:

 Hi,

 I just realized the article I mentioned is cited in the jira and not in
 the code so I guess you didn't use this result.

 Do you plan to implement sequence with timestamp and gap constraint as in :

 https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf

 2015-08-25 7:06 GMT+08:00 Feynman Liang fli...@databricks.com:

 Hi Alexis,

 Unfortunately, both of the papers you referenced appear to be
 translations and are quite difficult to understand. We followed
 http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan.
 Perhaps you can find the relevant lines in there so I can elaborate further?

 Feynman

 On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN ila...@hotmail.com
 wrote:

 I want to use prefixspan so I had a look at the code and the cited paper
 : Distributed PrefixSpan Algorithm Based on MapReduce.

 There is a result in the paper I didn't really understand and I
 couldn't find where it is used in the code.

 Suppose a sequence database S = {­1­,2...­n}, a sequence a... is a
 length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
 prefix of a length-(L-1) sequential pattern a...a, when the support count
 of a is not less than min_support, it is equal to obtaining a length-L
 sequential pattern  a ... a  from projected databases that obtaining a
 length-L sequential pattern  a ... a  from a sequence database S.

 According to the paper It's supposed to add a pruning step in the reduce
 function but I couldn't find where.

 This result seems to come from a previous paper : Wang Linlin, Fan Jun.
 Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J].
 Computer Engineering, 2009, 35(23): 56-61 but it didn't help me to
 understand it and how it can improve the algorithm.






Re: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Shixiong Zhu
Hao,

I can reproduce it using the master branch. I'm curious why you cannot
reproduce it. Did you check if the input HadoopRDD did have two partitions?
My test code is

val df = sqlContext.read.json("examples/src/main/resources/people.json")
df.show()


Best Regards,
Shixiong Zhu

2015-08-25 13:01 GMT+08:00 Cheng, Hao hao.ch...@intel.com:

 Hi Jeff, which version are you using? I couldn't reproduce the 2 Spark
 jobs for `df.show()` with the latest code. We refactored the JSON data
 source code recently, so perhaps you're running an earlier version of it.

 A known issue is that Spark SQL re-lists the files every time it loads
 JSON data, which probably causes a longer ramp-up time with a large
 number of files/partitions.



 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Tuesday, August 25, 2015 8:11 AM
 *To:* Cheng, Hao
 *Cc:* user@spark.apache.org
 *Subject:* Re: DataFrame#show cost 2 Spark Jobs ?



 Hi Cheng,



 I know that sqlContext.read will trigger one spark job to infer the
 schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it
 would cost 3 jobs.



 Here's the command I use:



  val df = sqlContext.read.json(
    "file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
  // trigger one spark job to infer schema

  df.show() // trigger 2 spark jobs which is weird









 On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote:

 The first job is to infer the json schema, and the second one is what you
 mean of the query.

 You can provide the schema while loading the json file, like below:



 sqlContext.read.schema(xxx).json(“…”)?



 Hao

 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Monday, August 24, 2015 6:20 PM
 *To:* user@spark.apache.org
 *Subject:* DataFrame#show cost 2 Spark Jobs ?



 It's weird to me that the simple show function will cost 2 spark jobs.
 DataFrame#explain shows it is a very simple operation, not sure why need 2
 jobs.



 == Parsed Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Analyzed Logical Plan ==

 age: bigint, name: string

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Optimized Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Physical Plan ==

 Scan
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]







 --

 Best Regards

 Jeff Zhang





 --

 Best Regards

 Jeff Zhang



RE: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Cheng, Hao
Hi Jeff, which version are you using? I couldn't reproduce the 2 Spark jobs
for `df.show()` with the latest code. We refactored the JSON data source code
recently, so perhaps you're running an earlier version of it.

A known issue is that Spark SQL re-lists the files every time it loads JSON
data, which probably causes a longer ramp-up time with a large number of
files/partitions.

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Tuesday, August 25, 2015 8:11 AM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: DataFrame#show cost 2 Spark Jobs ?

Hi Cheng,

I know that sqlContext.read will trigger one spark job to infer the schema. 
What I mean is DataFrame#show cost 2 spark jobs. So overall it would cost 3 
jobs.

Here's the command I use:

 val df = sqlContext.read.json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
 // trigger one spark job to infer schema
 df.show() // trigger 2 spark jobs which is weird




On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote:
The first job infers the JSON schema, and the second one is the query itself.
You can provide the schema while loading the json file, like below:

sqlContext.read.schema(xxx).json(“…”)?

Hao
From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Monday, August 24, 2015 6:20 PM
To: user@spark.apache.org
Subject: DataFrame#show cost 2 Spark Jobs ?

It's weird to me that the simple show function will cost 2 spark jobs. 
DataFrame#explain shows it is a very simple operation, not sure why need 2 jobs.

== Parsed Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Analyzed Logical Plan ==
age: bigint, name: string
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Optimized Logical Plan ==
Relation[age#0L,name#1] 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Physical Plan ==
Scan 
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]



--
Best Regards

Jeff Zhang



--
Best Regards

Jeff Zhang


Re: build spark 1.4.1 with JDK 1.6

2015-08-24 Thread Eric Friedman
I'm trying to build Spark 1.4 with Java 7 and despite having that as my
JAVA_HOME, I get

[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @
spark-launcher_2.10 ---

[INFO] Using zinc server for incremental compilation

[info] Compiling 8 Java sources to
/Users/eric/spark/spark/launcher/target/scala-2.10/classes...

[error] javac: invalid source release: 1.7

[error] Usage: javac <options> <source files>

[error] use -help for a list of possible options

[error] Compile failed at Aug 24, 2015 7:44:40 PM [0.020s]

[INFO]


[INFO] Reactor Summary:

[INFO]

[INFO] Spark Project Parent POM ... SUCCESS [ 3.109 s]

[INFO] Spark Project Launcher . FAILURE [ 4.493 s]

On Fri, Aug 21, 2015 at 9:43 AM, Marcelo Vanzin van...@cloudera.com wrote:

 That was only true until Spark 1.3. Spark 1.4 can be built with JDK7
 and pyspark will still work.

 On Fri, Aug 21, 2015 at 8:29 AM, Chen Song chen.song...@gmail.com wrote:
  Thanks Sean.
 
  So how PySpark is supported. I thought PySpark needs jdk 1.6.
 
  Chen
 
  On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote:
 
  Spark 1.4 requires Java 7.
 
 
  On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote:
 
  I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support
  PySpark, I used JDK 1.6.
 
  I got the following error,
 
  [INFO] --- scala-maven-plugin:3.2.0:testCompile
  (scala-test-compile-first) @ spark-streaming_2.10 ---
 
  java.lang.UnsupportedClassVersionError:
 org/apache/hadoop/io/LongWritable
  : Unsupported major.minor version 51.0
  at java.lang.ClassLoader.defineClass1(Native Method)
  at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
  at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
  at
  java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
 
  I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7.
  Anyone has done this before?
 
  Thanks,
 
  --
  Chen Song
 
 
 
 
  --
  Chen Song
 



 --
 Marcelo





Re: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Shixiong Zhu
Because defaultMinPartitions is 2 (See
https://github.com/apache/spark/blob/642c43c81c835139e3f35dfd6a215d668a474203/core/src/main/scala/org/apache/spark/SparkContext.scala#L2057
), your input people.json will be split to 2 partitions.

At first, `take` will start a job for the first partition. However, the
limit is 21, but the first partition only has 2 records. So it will
continue to start a new job for the second partition.

You can check implementation details in SparkPlan.executeTake:
https://github.com/apache/spark/blob/642c43c81c835139e3f35dfd6a215d668a474203/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L185
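
To make the two-job behaviour concrete, here is a small plain-Scala simulation of the incremental scan described above. It is a simplified model written for illustration, not the actual SparkPlan.executeTake code: partitions are plain sequences, each loop iteration stands in for one Spark job, and the growth factor is only an approximation of what Spark does.

```scala
import scala.collection.mutable.ArrayBuffer

object TakeSimulation {
  /** Returns the collected rows and the number of simulated jobs. */
  def take[T](partitions: Seq[Seq[T]], n: Int): (Seq[T], Int) = {
    val buf = ArrayBuffer.empty[T]
    var partsScanned = 0
    var jobs = 0
    while (buf.size < n && partsScanned < partitions.size) {
      // First round scans one partition; later rounds scan more,
      // scaled by how many rows the earlier rounds produced.
      val numPartsToTry =
        if (partsScanned == 0) 1
        else if (buf.isEmpty) partitions.size - partsScanned
        else math.max(1, (1.5 * n * partsScanned / buf.size).toInt)
      val end = math.min(partsScanned + numPartsToTry, partitions.size)
      jobs += 1 // one simulated job per round
      buf ++= partitions.slice(partsScanned, end).flatten.take(n - buf.size)
      partsScanned = end
    }
    (buf.toSeq, jobs)
  }

  def main(args: Array[String]): Unit = {
    // Three records split over 2 partitions, with a limit of 21 as in show().
    val people = Seq(Seq("Michael", "Andy"), Seq("Justin"))
    val (rows, jobs) = take(people, 21)
    println(s"rows=${rows.size}, jobs=$jobs") // prints "rows=3, jobs=2"
  }
}
```

With a single partition, or with a first partition that already holds 21 or more rows, the same loop finishes after one round.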

Best Regards,
Shixiong Zhu

2015-08-25 8:11 GMT+08:00 Jeff Zhang zjf...@gmail.com:

 Hi Cheng,

 I know that sqlContext.read will trigger one spark job to infer the
 schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it
 would cost 3 jobs.

 Here's the command I use:

  val df = sqlContext.read.json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
  // trigger one spark job to infer schema
  df.show() // trigger 2 spark jobs which is weird




 On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote:

 The first job is to infer the json schema, and the second one is what you
 mean of the query.

 You can provide the schema while loading the json file, like below:



 sqlContext.read.schema(xxx).json(“…”)?



 Hao

 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Monday, August 24, 2015 6:20 PM
 *To:* user@spark.apache.org
 *Subject:* DataFrame#show cost 2 Spark Jobs ?



 It's weird to me that the simple show function will cost 2 spark jobs.
 DataFrame#explain shows it is a very simple operation, not sure why need 2
 jobs.



 == Parsed Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Analyzed Logical Plan ==

 age: bigint, name: string

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Optimized Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Physical Plan ==

 Scan
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]







 --

 Best Regards

 Jeff Zhang




 --
 Best Regards

 Jeff Zhang



Re: Drop table and Hive warehouse

2015-08-24 Thread Kevin Jung
Thanks, Michael.
I discovered it myself. In the end, it was not a Spark bug.
I have two HDFS clusters. Hive uses hive.metastore.warehouse.dir +
fs.defaultFS (HDFS1) for saving internal tables, but the DBS table in the
metastore references a default database URI on HDFS2.
This is not a problem if the URI of the default database is the same as
fs.defaultFS.
Maybe few people set their default database URI to another HDFS like I did.
I copied hive-site.xml into the Spark conf directory, so Hive and Spark had
the same metastore configuration.
But the table produced by saveAsTable has its metadata in HDFS1 and its data
in HDFS2.
DESCRIBE FORMATTED table_name shows the difference between the Location of the
table (HDFS1) and the Path in Storage Desc Params (HDFS2), even though the
table is of type MANAGED_TABLE.
That is why DROP TABLE deletes only the metadata in HDFS1 and does NOT delete
the data files in HDFS2.
So I cannot recreate a table with the same location and the same name. If I
update the DBS table in the metastore DB to set the default database URI to
HDFS1, it works perfectly.


Kevin

--- Original Message ---
Sender : Michael Armbrustmich...@databricks.com
Date : 2015-08-25 00:43 (GMT+09:00)
Title : Re: Drop table and Hive warehouse

That's not the expected behavior.  What version of Spark?


On Mon, Aug 24, 2015 at 1:32 AM, Kevin Jung itsjb.j...@samsung.com wrote:

When I store a DataFrame as a table with saveAsTable and then execute
DROP TABLE in Spark SQL, it doesn't actually delete the files in the Hive warehouse.
The table disappears from the table list but the data files are still there.
Because of this, I can't call saveAsTable with the same name again.
Is this normal? If it is, I will delete the files manually ;)

Kevin





Re: Spark Direct Streaming With ZK Updates

2015-08-24 Thread Cody Koeninger
I'd start off by trying to simplify that closure - you don't need the
transform step, or currOffsetRanges to be scoped outside of it.  Just do
everything in foreachRDD.  Likewise, it looks like zkClient is also scoped
outside of the closure passed to foreachRDD

i.e. you have

zkClient = new ZkClient(consumerConfig.zkConnec

instead of

val zkClient = new ZkClient(consumerConfig.zkConnec
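
The effect of scoping can be reproduced without Spark. The sketch below uses plain Java serialization and a made-up, deliberately non-serializable Client class standing in for ZkClient; it assumes the exception in the original report comes from the closure being serialized together with whatever it references.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Hypothetical stand-in for a client such as ZkClient: NOT Serializable.
  class Client(val address: String) {
    def update(path: String): String = s"$address$path"
  }

  /** True if Java serialization accepts the object and all it references. */
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  /** The client lives outside the function, so the function captures it. */
  def capturing(): String => String = {
    val client = new Client("zkhost:2181")
    path => client.update(path)
  }

  /** The client is created inside the function, so nothing is captured. */
  def selfContained(): String => String =
    path => new Client("zkhost:2181").update(path)

  def main(args: Array[String]): Unit = {
    println(s"capturing serializable?     ${isSerializable(capturing())}")
    println(s"selfContained serializable? ${isSerializable(selfContained())}")
  }
}
```

Declaring the client with val inside the closure body corresponds to the selfContained variant here.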

On Mon, Aug 24, 2015 at 5:53 PM, suchenzang suchenz...@gmail.com wrote:

 When updating the ZK offset in the driver (within foreachRDD), there is
 somehow a serialization exception getting thrown:

 15/08/24 15:45:40 ERROR JobScheduler: Error in job generator
 java.io.NotSerializableException: org.I0Itec.zkclient.ZkClient
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 The code looks something like:

 directKafkaStream.transform { rdd =>
   currOffsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.foreachRDD { rdd =>
   ... /* do stuff with shuffling involved, then update DynamoDB */

   val props = new Properties()
   kafkaConf.foreach(param => props.put(param._1, param._2))
   props.setProperty(AUTO_OFFSET_COMMIT, "false")

   val consumerConfig = new ConsumerConfig(props)
   assert(!consumerConfig.autoCommitEnable)

   zkClient = new ZkClient(consumerConfig.zkConnect,
     consumerConfig.zkSessionTimeoutMs,
     consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

   offsetRanges.foreach { osr =>
     val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
     val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
     ZkUtils.updatePersistentPath(zkClient, zkPath,
       osr.untilOffset.toString)
   }
 }

 Why would there be serialization issues when I'm not trying to pass
 ZkClient to any of the workers?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Direct-Streaming-With-ZK-Updates-tp24423p24432.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




RE: Test case for the spark sql catalyst

2015-08-24 Thread Cheng, Hao
Yes, check the source code under: 
https://github.com/apache/spark/tree/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst

From: Todd [mailto:bit1...@163.com]
Sent: Tuesday, August 25, 2015 1:01 PM
To: user@spark.apache.org
Subject: Test case for the spark sql catalyst

Hi, are there test cases for Spark SQL Catalyst, such as tests for the rules
that transform an unresolved query plan?
Thanks!


Re: Spark

2015-08-24 Thread Sonal Goyal
I think you could try sorting endpointCounts and then doing a take.
This should be a distributed process, and only the result would be returned
to the driver.
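
As a rough illustration of why only a small result needs to reach the driver, here is a plain-Python sketch (no Spark required) of the per-partition pattern visible in the traceback quoted below: heapq.nsmallest on each partition, followed by a merge. The sample data and the helper names top_k_per_partition, merge, and take_ordered are made up for this example. In PySpark itself, the suggestion above might look like endpointCounts.sortBy(lambda s: -s[1]).take(10), which is untested here.

```python
import heapq
from functools import reduce


def top_k_per_partition(partition, k, key):
    """Keep only the k smallest items (by key) of one partition."""
    return heapq.nsmallest(k, partition, key=key)


def merge(left, right, k, key):
    """Combine two partial results and again keep only k items."""
    return heapq.nsmallest(k, left + right, key=key)


def take_ordered(partitions, k, key):
    """Simulate a distributed top-k: reduce each partition, then merge."""
    partials = [top_k_per_partition(p, k, key) for p in partitions]
    return reduce(lambda a, b: merge(a, b, k, key), partials)


# Hypothetical (endpoint, count) pairs spread over three partitions.
partitions = [
    [("/index.html", 120), ("/about", 15), ("/login", 60)],
    [("/api/items", 300), ("/favicon.ico", 5)],
    [("/search", 90), ("/logout", 40)],
]

# Negating the count turns "k smallest" into "k largest counts".
top3 = take_ordered(partitions, 3, key=lambda s: -1 * s[1])
print(top3)
```

In this sketch each partition contributes at most k items to the merge, so the amount of data combined at the end depends on k and the number of partitions, not on the size of the input.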

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015
https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/

http://in.linkedin.com/in/sonalgoyal



On Tue, Aug 25, 2015 at 10:22 AM, Spark Enthusiast sparkenthusi...@yahoo.in
 wrote:

 I was running a Spark job to crunch a 9GB Apache log file when I saw the
 following error:


 15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage
 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal):
 ExecutorLostFailure (executor 29 lost)
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 40), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 86), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 84), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 22), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 48), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted
 ShuffleMapTask(37, 12), so marking it as still running
 15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove
 executor 29 from BlockManagerMaster.
 15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block
 manager BlockManagerId(29,
 ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)

   .
   .
 Encountered Exception An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.collectAndServe.
 : org.apache.spark.SparkException: Job cancelled because SparkContext was
 shut down
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
     at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
     at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
     at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
     at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
     at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)

 .
 .

 Looking further, it seems that takeOrdered (called by my application) uses
 collect() internally and hence exhausts the driver's memory.

 line 361, in top10EndPoints
     topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
   File "/home/hadoop/spark/python/pyspark/rdd.py", line 1174, in takeOrdered
     return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
   File "/home/hadoop/spark/python/pyspark/rdd.py", line 739, in reduce
     vals = self.mapPartitions(func).collect()
   File "/home/hadoop/spark/python/pyspark/rdd.py", line 713, in collect
     port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
     self.target_id, self.name)
   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
     format(target_id, '.', name), value)

 How can I rewrite this code



 endpointCounts = (access_logs
                   .map(lambda log: (log.endpoint, 1))
                   .reduceByKey(lambda a, b: a + b))

 # Endpoints is now a list of tuples [(endpoint1, count1), (endpoint2, count2), ...]

 topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])


 so that this error does not happen?




Re: How to list all dataframes and RDDs available in current session?

2015-08-24 Thread Dhaval Gmail
Okay, but how? That's what I am trying to figure out. Is there any command
you would suggest?
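
One possible approach, sketched under assumptions: in Scala, SparkContext exposes getPersistentRDDs, which covers only RDDs that have been persisted. In a Python notebook session, another option is to scan the interpreter namespace for variables of the relevant types. The RDD and DataFrame classes below are hypothetical stand-ins so the sketch runs without Spark, and list_datasets is an illustrative helper, not a Spark or Zeppelin API.

```python
class RDD:
    """Hypothetical stand-in for pyspark.RDD."""


class DataFrame:
    """Hypothetical stand-in for pyspark.sql.DataFrame."""


def list_datasets(namespace, types):
    """Return sorted (name, type name) pairs for variables of the given types."""
    return sorted(
        (name, type(value).__name__)
        for name, value in namespace.items()
        if isinstance(value, types) and not name.startswith("_")
    )


# A made-up session namespace; in a real session this would be globals().
session = {"logs": RDD(), "counts": RDD(), "users_df": DataFrame(), "n": 10}
found = list_datasets(session, (RDD, DataFrame))
print(found)
```

With real PySpark one would pass globals() and the actual pyspark classes instead of the stand-ins. This only finds objects still bound to a variable name, so it is a convenience rather than a complete inventory.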


 On Aug 21, 2015, at 11:45 PM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:
 
 You can get the list of all persisted RDDs using the Spark context...
 On Aug 21, 2015 12:06 AM, Rishitesh Mishra rishi80.mis...@gmail.com 
 wrote:
 I am not sure if you can view all RDDs in a session. Tables are maintained
 in a catalogue, hence it's easier. However, you can see the DAG
 representation, which lists all the RDDs in a job, in the Spark UI.
 
 On 20 Aug 2015 22:34, Dhaval Patel dhaval1...@gmail.com wrote:
 Apologies 
 
 I accidentally included Spark User DL on BCC. The actual email message is 
 below.
 =
 
 
 Hi:
 
 I have been working on a few examples using Zeppelin.
 
 I have been trying to find a command that would list all *dataframes/RDDs* 
 that have been created in the current session. Does anyone know if such a 
 command is available?
 
 Something similar to the Spark SQL command that lists all temp tables:
   show tables;
 
 Thanks,
 Dhaval
 
 
 


Spark

2015-08-24 Thread Spark Enthusiast
I was running a Spark job to crunch a 9GB Apache log file when I saw the 
following error:


15/08/25 04:25:16 WARN scheduler.TaskSetManager: Lost task 99.0 in stage 37.0 (TID 4115, ip-10-150-137-100.ap-southeast-1.compute.internal): ExecutorLostFailure (executor 29 lost)
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 40), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 86), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 84), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 22), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 48), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(37, 12), so marking it as still running
15/08/25 04:25:16 INFO scheduler.DAGScheduler: Executor lost: 29 (epoch 59)
15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Trying to remove executor 29 from BlockManagerMaster.
15/08/25 04:25:16 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(29, ip-10-150-137-100.ap-southeast-1.compute.internal, 39411)
  .
  .
Encountered Exception An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1380)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:143)
  .
  .
Looking further, it seems that takeOrdered (called by my application) uses 
collect() internally and hence exhausts the driver's memory.
line 361, in top10EndPoints
    topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 1174, in takeOrdered
    return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 739, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 713, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
How can I rewrite this code



endpointCounts = (access_logs
                  .map(lambda log: (log.endpoint, 1))
                  .reduceByKey(lambda a, b: a + b))

# Endpoints is now a list of tuples [(endpoint1, count1), (endpoint2, count2), ...]

topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])

so that this error does not happen?

Test case for the spark sql catalyst

2015-08-24 Thread Todd
Hi, are there test cases for Spark SQL Catalyst, such as tests for the rules
that transform an unresolved query plan?
Thanks!


How to access Spark UI through AWS

2015-08-24 Thread Justin Pihony
I am using the steps from this article
https://aws.amazon.com/articles/Elastic-MapReduce/4926593393724923 to get
Spark up and running on EMR through YARN. Once it is up and running, I SSH in,
cd to the Spark bin directory, and run spark-shell --master yarn. Once this
spins up, I can see that the UI is started on the internal IP at port 4040. If
I hit the public DNS at port 4040 with dynamic port tunneling and FoxyProxy,
I get a crude UI (the CSS seems broken), but the proxy continuously redirects
me to the main page, so I cannot drill into anything. So I tried static
tunneling, but can't seem to get through.
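
For reference, static tunneling of the kind mentioned above usually means an OpenSSH local port forward. The sketch below only builds such a command so its shape is visible. The key file, host name, and the helper static_tunnel_command are placeholders, and the default user "hadoop" is an assumption about the EMR login account. It is not a confirmed fix for the redirect problem described here.

```python
def static_tunnel_command(key_file, master_dns, target_host, port=4040, user="hadoop"):
    """Build an OpenSSH command that forwards localhost:port to target_host:port."""
    return [
        "ssh",
        "-i", key_file,                        # private key for the cluster's key pair
        "-N",                                  # no remote command, forwarding only
        "-L", f"{port}:{target_host}:{port}",  # local port -> host as seen from the master
        f"{user}@{master_dns}",
    ]


cmd = static_tunnel_command(
    key_file="my-key.pem",                               # placeholder
    master_dns="ec2-xx-xx-xx-xx.compute.amazonaws.com",  # placeholder
    target_host="localhost",                             # or the master's internal IP
)
print(" ".join(cmd))
```

With a tunnel like this open, the UI would be requested at http://localhost:4040 in the local browser. Whether links inside the UI then resolve correctly depends on how YARN proxies the application UI, which this sketch does not address.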

So, how can I access the Spark UI when running a Spark shell on YARN in AWS?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-Spark-UI-through-AWS-tp24436.html