Save and read parquet from the same path

2015-03-04 Thread Karlson

Hi all,

What would happen if I save an RDD via saveAsParquetFile to the same path 
that the RDD was originally read from? Is that a safe thing to do in PySpark?


Thanks!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Nested Case Classes (Found and Required Same)

2015-03-04 Thread Bojan Kostic
Did you find any other way for this issue?
I just found out that I have a data set with more than 22 columns... And now I am
searching for the best solution.

Has anyone else experienced this problem?

Best
Bojan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Nested-Case-Classes-Found-and-Required-Same-tp14096p21908.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Does SparkSQL support ..... having count (fieldname) in SQL statement?

2015-03-04 Thread Cheng, Hao
I’ve tried with the latest code and it seems to work; which version are you using, Shahab?

From: yana [mailto:yana.kadiy...@gmail.com]
Sent: Wednesday, March 4, 2015 8:47 PM
To: shahab; user@spark.apache.org
Subject: RE: Does SparkSQL support ..... having count (fieldname) in SQL 
statement?

I think the problem is that you are using an alias in the having clause. I am 
not able to try just now, but see if HAVING count(*) > 2 works (i.e. don't use cnt).


Sent on the new Sprint Network from my Samsung Galaxy S®4.

 Original message 
From: shahab
Date:03/04/2015 7:22 AM (GMT-05:00)
To: user@spark.apache.org
Subject: Does SparkSQL support ..... having count (fieldname) in SQL 
statement?

Hi,

It seems that SparkSQL, even the HiveContext, does not support SQL statements 
like:   SELECT category, count(1) AS cnt FROM products GROUP BY category 
HAVING cnt > 10;

I get this exception:

Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
Unresolved attributes: CAST(('cnt > 2), BooleanType), tree:


I couldn't find anywhere in the documentation whether the HAVING keyword is 
supported.
If it is not, what would be the workaround? Using two nested select 
statements?
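
For reference, the nested-select workaround mentioned above would look roughly 
like this (a sketch, untested against the version in question; it simply moves 
the aggregate into an inner query so no alias is needed in a HAVING clause):

  // compute the aggregate in an inner query, filter in the outer one
  val result = sqlContext.sql(
    """SELECT category, cnt
      |FROM (SELECT category, count(1) AS cnt FROM products GROUP BY category) t
      |WHERE cnt > 10""".stripMargin)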

best,
/Shahab


Re: scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Imran Rashid
This doesn't involve spark at all, I think this is entirely an issue with
how scala deals w/ primitives and boxing.  Often it can hide the details
for you, but IMO it just leads to far more confusing errors when things
don't work out.  The issue here is that your map has value type Any, which
leads scala to leave it as a boxed java.lang.Double.

scala> val x = 1.5
x: Double = 1.5
scala> x.getClass()
res0: Class[Double] = double
scala> x.getClass() == classOf[java.lang.Double]
res1: Boolean = false
scala> x.getClass() == classOf[Double]
res2: Boolean = true
scala> val arr = Array(1.5,2.5)
arr: Array[Double] = Array(1.5, 2.5)
scala> arr.getClass().getComponentType() == x.getClass()
res5: Boolean = true
scala> arr.getClass().getComponentType() == classOf[java.lang.Double]
res6: Boolean = false

//this map has java.lang.Double
scala> val map: Map[String, Any] = arr.map{x => x.toString -> x}.toMap
map: Map[String,Any] = Map(1.5 -> 1.5, 2.5 -> 2.5)
scala> map("1.5").getClass()
res15: Class[_] = class java.lang.Double
scala> map("1.5").getClass() == x.getClass()
res10: Boolean = false
scala> map("1.5").getClass() == classOf[java.lang.Double]
res11: Boolean = true
//this one has Double
scala> val map2: Map[String, Double] = arr.map{x => x.toString -> x}.toMap
map2: Map[String,Double] = Map(1.5 -> 1.5, 2.5 -> 2.5)
scala> map2("1.5").getClass()
res12: Class[Double] = double
scala> map2("1.5").getClass() == x.getClass()
res13: Boolean = true
scala> map2("1.5").getClass() == classOf[java.lang.Double]
res14: Boolean = false
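
Given that boxing, a sketch of how the quoted test below could be made to pass 
(assuming ScalaTest Matchers, as in Tobias's snippet; the value 1.75 is just the 
figure from the failure message):

  import org.scalatest.Matchers._

  // `result` mirrors the test: values declared as Any come back boxed
  val result: Map[Long, Any] = Map(1L -> 1.75)

  // either assert against the boxed type...
  result.values.foreach(_ shouldBe a [java.lang.Double])

  // ...or unbox explicitly and compare the numeric value
  val avg = result(1L).asInstanceOf[Double]
  assert(math.abs(avg - 1.75) < 1e-9)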


On Wed, Mar 4, 2015 at 3:17 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 I have a function with signature

   def aggFun1(rdd: RDD[(Long, (Long, Double))]):
 RDD[(Long, Any)]

 and one with

   def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]):
 RDD[(_Key, Double)]

 where all Double classes involved are scala.Double classes (according
 to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type
 parameters _Key and _Index are inferred by the Scala compiler).

 Now I am writing a test as follows:

   val result: Map[Long, Any] = aggFun1(input).collect().toMap
   result.values.foreach(v => println(v.getClass))
   result.values.foreach(_ shouldBe a[Double])

 and I get the following output:

   class java.lang.Double
   class java.lang.Double
   [info] avg
   [info] - should compute the average *** FAILED ***
   [info]   1.75 was not an instance of double, but an instance of
 java.lang.Double

 So I am wondering about what magic is going on here. Are scala.Double
 values in RDDs automatically converted to java.lang.Doubles or am I just
 missing the implicit back-conversion etc.?

 Any help appreciated,
 Tobias




Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Imran Rashid
You can set the number of partitions dynamically -- its just a parameter to
a method, so you can compute it however you want, it doesn't need to be
some static constant:

val dataSizeEstimate = yourMagicFunctionToEstimateDataSize()
val numberOfPartitions =
yourConversionFromDataSizeToNumPartitions(dataSizeEstimate)


val reducedRDD = someInputRDD.reduceByKey(f, numberOfPartitions) //or
whatever else that needs to know number of partitions

of course this means you need to do the work of figuring out those magic
functions, but it's certainly possible.

I agree with all of Sean's recommendations, but I guess I might put a bit
more emphasis on "the one exception are operations that tend to pull data
into memory".  For me, I've found that to be a very important exception
that can come up a lot.  And though in general a lot of partitions makes
sense, there have been recent questions on the user list about folks going
too far, using e.g. 100K partitions and then having the bookkeeping overhead
dominate.  But that's a pretty big number -- you should still be able to
err on the side of too many partitions without going that far, I'd imagine.
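
For what it's worth, a rough sketch of what such magic functions might look like, 
assuming the input lives on a Hadoop-compatible filesystem and a target of roughly 
128 MB per partition (the names below are purely illustrative):

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.SparkContext

  val targetPartitionBytes = 128L * 1024 * 1024   // ~128 MB per partition

  def estimateInputBytes(paths: Seq[String], sc: SparkContext): Long = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    paths.map(p => fs.getContentSummary(new Path(p)).getLength).sum
  }

  def numPartitionsFor(bytes: Long): Int =
    math.max(1, (bytes / targetPartitionBytes).toInt)

  // usage, mirroring the pseudo-code above:
  // val reducedRDD = someInputRDD.reduceByKey(f, numPartitionsFor(estimateInputBytes(paths, sc)))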



On Wed, Mar 4, 2015 at 4:17 AM, Jeff Zhang zjf...@gmail.com wrote:

 Hi Sean,

   If you know a stage needs unusually high parallelism for example you
 can repartition further for that stage.

 The problem is we may not know whether high parallelism is needed, e.g.
 for the join operator, high parallelism may only be necessary for datasets
 where lots of records can join together, while for other datasets high
 parallelism may not be necessary if only a few records can join together.

 So my point is that not being able to change parallelism dynamically at
 runtime may not be flexible enough.



 On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote:

 Hm, what do you mean? You can control, to some extent, the number of
 partitions when you read the data, and can repartition if needed.

 You can set the default parallelism too so that it takes effect for most
 ops thay create an RDD. One # of partitions is usually about right for all
 work (2x or so the number of execution slots).

 If you know a stage needs unusually high parallelism for example you can
 repartition further for that stage.
  On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote:

 Thanks Sean.

 But if the partitions of an RDD are determined beforehand, it would not be
 flexible to run the same program on different datasets. Although for the
 first stage the partitions can be determined from the input data set, for the
 intermediate stages it is not possible. Users have to create a policy to
 repartition or coalesce based on the data set size.


 On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote:

 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalesce() an RDD to make a new one
 with a different number of partitions, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote:
  I mean is it possible to change the partition number at runtime.
 Thanks
 
 
  --
  Best Regards
 
  Jeff Zhang




 --
 Best Regards

 Jeff Zhang




 --
 Best Regards

 Jeff Zhang



Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
Hello,

We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers).
We use spark-submit to start an application.

We got the following error which leads to a failed stage:

Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4
times, most recent failure: Lost task 3095.3 in stage 140.0 (TID
308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException:
Error communicating with MapOutputTracker


We tried the whole application again, and it failed on the same stage (but
it got more tasks completed on that stage) with the same error.

We then looked at executors stderr, and all show similar logs, on both runs
(see below). As far as we can tell, executors and master have disk space
left.

*Any suggestion on where to look to understand why the communication with
the MapOutputTracker fails?*

Thanks
Thomas

In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10* the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

Appendix: executor logs, where it starts going awry

15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with
curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored
as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block
broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable
339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with
curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as
values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch;
tracker actor =
Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs
for shuffle 18, fetching them
[... the same "Don't have map outputs for shuffle 18, fetching them" line repeats many more times ...]
15/03/04 11:45:00 INFO 

Re: how to save Word2VecModel

2015-03-04 Thread Xiangrui Meng
+user

On Wed, Mar 4, 2015, 8:21 AM Xiangrui Meng men...@gmail.com wrote:

 You can use the save/load implementation in naive Bayes as reference:
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala

 Ping me on the JIRA page to get the ticket assigned to you.

 Thanks,
 Xiangrui
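
Until that ticket lands, a stopgap some users rely on (a sketch, not the eventual 
MLlib save/load API) is plain Java serialization of the model object, assuming 
Word2VecModel is java-serializable in the Spark version in use:

  import java.io._
  import org.apache.spark.mllib.feature.Word2VecModel

  def saveModel(model: Word2VecModel, path: String): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(model) finally out.close()
  }

  def loadModel(path: String): Word2VecModel = {
    val in = new ObjectInputStream(new FileInputStream(path))
    try in.readObject().asInstanceOf[Word2VecModel] finally in.close()
  }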



Re: issue Running Spark Job on Yarn Cluster

2015-03-04 Thread sachin Singh
Not yet.
Please let me know if you find a solution.

Regards
Sachin
On 4 Mar 2015 21:45, mael2210 [via Apache Spark User List] 
ml-node+s1001560n21909...@n3.nabble.com wrote:

 Hello,

 I am facing the exact same issue. Could you solve the problem ?

 Kind regards

 --
  If you reply to this email, your message will be added to the discussion
 below:

 http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21909.html





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21912.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Does SparkSQL support ..... having count (fieldname) in SQL statement?

2015-03-04 Thread shahab
Thanks Cheng, my problem was a misspelling which I just fixed;
unfortunately the exception message sometimes does not pinpoint the exact
reason. Sorry, my bad.



On Wed, Mar 4, 2015 at 5:02 PM, Cheng, Hao hao.ch...@intel.com wrote:

  I’ve tried with latest code, seems it works, which version are you using
 Shahab?



 *From:* yana [mailto:yana.kadiy...@gmail.com]
 *Sent:* Wednesday, March 4, 2015 8:47 PM
 *To:* shahab; user@spark.apache.org
 *Subject:* RE: Does SparkSQL support ..... having count (fieldname) in
 SQL statement?



 I think the problem is that you are using an alias in the having clause. I
 am not able to try just now, but see if HAVING count(*) > 2 works (i.e. don't
 use cnt).





 Sent on the new Sprint Network from my Samsung Galaxy S®4.



  Original message 

 From: shahab

 Date:03/04/2015 7:22 AM (GMT-05:00)

 To: user@spark.apache.org

 Subject: Does SparkSQL support ..... having count (fieldname) in SQL
 statement?



 Hi,



 It seems that SparkSQL, even the HiveContext, does not support SQL
 statements like:   SELECT category, count(1) AS cnt FROM products GROUP BY
 category HAVING cnt > 10;



 I get this exception:



 Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
 Unresolved attributes: CAST(('cnt > 2), BooleanType), tree:





 I couldn't find anywhere in the documentation whether the HAVING keyword is
 supported.

 If it is not, what would be the workaround? Using two nested
 select statements?



 best,

 /Shahab



Re: GraphX path traversal

2015-03-04 Thread Robin East
Actually your Pregel code works for me:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertexlist = Array((1L,"One"), (2L,"Two"), (3L,"Three"), 
(4L,"Four"),(5L,"Five"),(6L,"Six"))
val edgelist = Array(Edge(6,5,"6 to 5"),Edge(5,4,"5 to 4"),Edge(4,3,"4 to 3"), 
Edge(3,2,"3 to 2"), Edge(2,1,"2 to 1"))
val vertices: RDD[(VertexId, String)] =  sc.parallelize(vertexlist)
val edges = sc.parallelize(edgelist)
val graph = Graph(vertices, edges)


val parentGraph = Pregel(
  graph.mapVertices((id, attr) => Set[VertexId]()),
  Set[VertexId](),
  Int.MaxValue,
  EdgeDirection.Out)(
(id, attr, msg) => (msg ++ attr),
edge => { if (edge.srcId != edge.dstId) 
  { Iterator((edge.dstId, (edge.srcAttr + edge.srcId))) 
  } 
  else Iterator.empty 
 },
(a, b) => (a ++ b))
parentGraph.vertices.collect.foreach(println(_))

Output:

(4,Set(6, 5))
(1,Set(5, 6, 2, 3, 4))
(5,Set(6))
(6,Set())
(2,Set(6, 5, 4, 3))
(3,Set(6, 5, 4))

Maybe your data.csv has edges the wrong way round
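
If that is the case, a quick check (a sketch, assuming the graph has been built as 
in the code above) is to flip every edge with GraphX's reverse and re-run the same 
Pregel program:

  // if the edges in data.csv point the opposite way, reversing the graph
  // before running Pregel should flip the result
  val reversed = graph.reverse   // swaps srcId and dstId on every edge

  val parentsFromReversed = Pregel(
    reversed.mapVertices((id, attr) => Set[VertexId]()),
    Set[VertexId](),
    Int.MaxValue,
    EdgeDirection.Out)(
    (id, attr, msg) => msg ++ attr,
    edge => if (edge.srcId != edge.dstId)
              Iterator((edge.dstId, edge.srcAttr + edge.srcId))
            else Iterator.empty,
    (a, b) => a ++ b)
  parentsFromReversed.vertices.collect.foreach(println(_))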

Robin

 On 3 Mar 2015, at 16:32, Madabhattula Rajesh Kumar mrajaf...@gmail.com 
 wrote:
 
 Hi,
 
 I have tried the below program using the Pregel API but I'm not able to get my 
 required output. I'm getting exactly the reverse of the output I'm expecting. 
 
 // Creating graph using above mail mentioned edgefile
  val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, 
  "/home/rajesh/Downloads/graphdata/data.csv").cache()
 
  val parentGraph = Pregel(
    graph.mapVertices((id, attr) => Set[VertexId]()),
    Set[VertexId](),
    Int.MaxValue,
    EdgeDirection.Out)(
  (id, attr, msg) => (msg ++ attr),
  edge => { if (edge.srcId != edge.dstId) 
    { Iterator((edge.dstId, (edge.srcAttr + edge.srcId))) 
    } 
    else Iterator.empty 
   },
  (a, b) => (a ++ b))
 parentGraph.vertices.collect.foreach(println(_))
 
 Output :
 
 (4,Set(1, 2, 3))
 (1,Set())
 (6,Set(5, 1, 2, 3, 4))
 (3,Set(1, 2))
 (5,Set(1, 2, 3, 4))
 (2,Set(1))
 
 But I'm looking for the below output. 
 
 (4,Set(5, 6))
 (1,Set(2, 3, 4, 5, 6))
 (6,Set())
 (3,Set(4, 5, 6))
 (5,Set(6))
 (2,Set(3, 4, 5, 6))
 
 Could you please point out where I'm going wrong.
 
 Regards,
 Rajesh
  
 
 On Tue, Mar 3, 2015 at 8:42 PM, Madabhattula Rajesh Kumar 
 mrajaf...@gmail.com wrote:
 Hi Robin,
 
 Thank you for your response. Please find below my question. I have a below 
 edge file
 
 Source Vertex   Destination Vertex
 1               2
 2               3
 3               4
 4               5
 5               6
 6               6
 
 In this graph the 1st vertex is connected to the 2nd vertex, the 2nd vertex is connected 
 to the 3rd vertex, and so on up to the 6th vertex, which is connected to itself. So the 
 6th vertex is the root node. Please find the graph below.
 
 [graph image]
 In this graph, how can I compute the 1st vertex's parents, i.e. 2,3,4,5,6? 
 Similarly, the 2nd vertex's parents are 3,4,5,6, and the 6th vertex's parent is just 6, 
 because it is the root node.
 
 I'm planning to use the Pregel API but I'm not able to define the messages and vertex 
 program in that API. Could you please help me with this?
 
 Please let me know if you need more information.
 
 Regards,
 Rajesh
 
 
 On Tue, Mar 3, 2015 at 8:15 PM, Robin East robin.e...@xense.co.uk wrote:
 Rajesh
 
 I'm not sure if I can help you, however I don't even understand the question. 
 Could you restate what you are trying to do.
 
 Sent from my iPhone
 
 On 2 Mar 2015, at 11:17, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote:
 
 Hi,
 
 I have a below edge list. How to find the parents path for every vertex?
 
 Example :
 
 Vertex 1 path : 2, 3, 4, 5, 6
 Vertex 2 path : 3, 4, 5, 6
 Vertex 3 path : 4,5,6
 vertex 4 path : 5,6
 vertex 5 path : 6
 
 Could you please let me know how to do this? (or) Any suggestion
 
 Source Vertex   Destination Vertex
 1               2
 2               3
 3               4
 4               5
 5               6
 
 Regards,
 Rajesh
 
 



Re: TreeNodeException: Unresolved attributes

2015-03-04 Thread Anusha Shamanur
I tried. I still get the same error.

15/03/04 09:01:50 INFO parse.ParseDriver: Parsing command: select * from
TableName where value like '%Restaurant%'

15/03/04 09:01:50 INFO parse.ParseDriver: Parse Completed.

15/03/04 09:01:50 INFO metastore.HiveMetaStore: 0: get_table : db=default
tbl=TableName

15/03/04 09:01:50 INFO HiveMetaStore.audit: ugi=as7339
ip=unknown-ip-addr cmd=get_table
: db=default tbl=TableName
results: org.apache.spark.sql.SchemaRDD =

SchemaRDD[86] at RDD at SchemaRDD.scala:108
== Query Plan ==

== Physical Plan ==

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
attributes: *, tree:

'Project [*]

'Filter ('value LIKE %Restaurant%)
  MetastoreRelation default, TableName, None



On Wed, Mar 4, 2015 at 5:39 AM, Arush Kharbanda ar...@sigmoidanalytics.com
wrote:

 Why don't you formulate the query string before you pass it to the hql function
 (by appending strings)? Also, the hql function is deprecated; you should use sql.


 http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
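
 A minimal sketch of what that could look like (assuming a HiveContext named hiveCtx
 and a search term held in a variable; the variable names are illustrative):

   // build the query string first, then pass it to sql() (hql() is deprecated)
   val keyword = "SomeString"
   val query = s"SELECT * FROM TableName WHERE value LIKE '%$keyword%'"
   val restaurants = hiveCtx.sql(query)
   restaurants.take(10).foreach(println)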

 On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur anushas...@gmail.com
 wrote:

 Hi,


 I am trying to run a simple select query on a table.


  val restaurants = hiveCtx.hql("select * from TableName where column like
  '%SomeString%'")

 This gives an error as below:

 org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
 Unresolved attributes: *, tree:

 How do I solve this?


 --
 Regards,
 Anusha




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com




-- 
Regards,
Anusha


Passing around SparkContext with in the Driver

2015-03-04 Thread kpeng1
Hi All,

I am trying to create a class that wraps functionalities that I need; some
of these functions require access to the SparkContext, which I would like to
pass in.  I know that the SparkContext is not serializable, and I am not
planning on passing it to worker nodes or anything, I just want to wrap some
functionalities that require SparkContext's api.  As a preface, I am
basically using the spark shell to test the functionality of my code at the
moment, so I am not sure if that plays into any of the issues I am having. 
Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
    "int" -> IntegerType,
    "double" -> DoubleType,
    "float" -> FloatType,
    "long" -> LongType,
    "short" -> ShortType,
    "binary" -> BinaryType,
    "bool" -> BooleanType,
    "byte" -> ByteType,
    "string" -> StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]):
Iterator[String] ={
//header line is first line in first partition
if(partitionIdx == 0){
  fileItr.drop(1)
}
fileItr
  }

  //returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType ={
//return backs a StructField
def getSchemaFieldHelper(schemaField: String): StructField ={
  val schemaParts = schemaField.split(' ')
  StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
}

val structFields = rawSchema.map(column = getSchemaFieldHelper(column))
StructType(structFields)
  }

  def getRow(strRow: String): Row ={
val spRow = strRow.split(',')
val tRow = spRow.map(_.trim)
Row(tRow:_*)
  }
  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile:
String): SchemaRDD ={
//apply schema to rdd to create schemaRDD
def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType):
SchemaRDD ={
  val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
  schemaRDD
}
  
val rawSchema = sparkContext.textFile(schemaFile).collect
val schema = getSchema(rawSchema)
  
val rawCsvData = sparkContext.textFile(csvFile)
  
//if we want to keep header from csv file
if(includeHeader){
  val rowRDD = rawCsvData.map(getRow) 
  val schemaRDD = createSchemaRDD(rowRDD, schema)
  return schemaRDD
}
 
val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
val rowRDD = csvData.map(getRow)
val schemaRDD = createSchemaRDD(rowRDD, schema)
schemaRDD
  }

}

So in the spark shell I am basically creating an instance of this class and
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I am getting is not serializable exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at
console:62
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
   .
   .
   .
Caused by: java.io.NotSerializableException:


If I remove the class wrapper and make references to sc directly everything
works.  I am basically wondering what is causing the serialization issues
and if I can wrap a class around these functions.
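
The usual cause of this error with code shaped like the above is that removeHeader and
getRow are instance methods, so the closures passed to the RDD operations capture the
whole MyClass instance, including the non-serializable SparkContext. A sketch of one
way around it (the object name is illustrative) is to move the functions that run on
executors into a standalone object, keeping the SparkContext-holding wrapper driver-side:

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.Row

  // pure helpers live outside the wrapper, so closures no longer drag in `this`
  object CsvHelpers extends Serializable {
    def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] =
      if (partitionIdx == 0) fileItr.drop(1) else fileItr

    def getRow(strRow: String): Row = {
      val tRow = strRow.split(',').map(_.trim)
      Row(tRow: _*)
    }
  }

  class MyClass(sparkContext: SparkContext) {
    // driver-side code can still use sparkContext freely, e.g.:
    // val csvData = sparkContext.textFile(csvFile)
    //   .mapPartitionsWithIndex(CsvHelpers.removeHeader)
    //   .map(CsvHelpers.getRow)
  }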



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Passing-around-SparkContext-with-in-the-Driver-tp21913.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Monitoring UI for Hadoop Yarn Cluster

2015-03-04 Thread Srini Karri
Hi Todd and Marcelo,

Thanks for helping me. I was able to launch the history server on Windows
without any issues. There is one problem I am running into right now: I always get
the message "No completed applications found" in the history server UI, but I was
able to browse through these applications from the Spark Master. Do you have any
thoughts on what the problem could be? Following are my settings in the spark conf
file:

spark.executor.extraClassPath
D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
spark.eventLog.dir
D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
spark.history.fs.logDirectory
D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events

Also I have attached Spark Master and Spark History server UI screen shots
for convenience. And all the logs are available and I granted directory
permissions to Everyone with full control. Following is the console
output from History server:

D:\Apache\spark-1.2.1-bin-hadoop2\spark-1.2.1-bin-hadoop2.4\binspark-class.cmd
org.apache.spark.deploy.history.HistoryServer
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/03/04 08:59:42 INFO SecurityManager: Changing view acls to: skarri
15/03/04 08:59:42 INFO SecurityManager: Changing modify acls to: skarri
15/03/04 08:59:42 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(skarri); users with modify
permissions: Set(skarri)
15/03/04 08:59:49 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/03/04 08:59:56 INFO Utils: Successfully started service on port 18080.
15/03/04 08:59:56 INFO HistoryServer: Started HistoryServer at
http://skarri-lt05.redmond.corp.microsoft.com:18080

Regards,
Srini.

On Tue, Mar 3, 2015 at 11:41 AM, Marcelo Vanzin van...@cloudera.com wrote:

 Spark applications shown in the RM's UI should have an Application
 Master link when they're running. That takes you to the Spark UI for
 that application where you can see all the information you're looking
 for.

 If you're running a history server and add
 spark.yarn.historyServer.address to your config, that link will
 become a History link after the application is finished, and will
 take you to the history server to view the app's UI.



 On Tue, Mar 3, 2015 at 9:47 AM, Srini Karri skarri@gmail.com wrote:
  Hi All,
 
  I am having trouble finding data related to my requirement. Here is the
  context, I have tried Standalone Spark Installation on Windows, I am
 able to
  submit the logs, able to see the history of events. My question is, is it
  possible to achieve the same monitoring UI experience with Yarn Cluster
 like
  Viewing workers, running/completed job stages in the Web UI. Currently,
 if
  we go to our Yarn Resource manager UI, we are able to see the Spark Jobs
 and
  it's logs. But it is not as rich as Spark Standalone master UI. Is this
  limitation for hadoop yarn cluster or is there any way we can hook this
  Spark Standalone master to Yarn Cluster?
 
  Any help is highly appreciated.
 
  Regards,
  Srini.



 --
 Marcelo


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Does anyone integrate HBASE on Spark

2015-03-04 Thread sandeep vura
Hi Sparkers,

How do I integrate HBase with Spark?

Appreciate any replies!

Regards,
Sandeep.v


Re: Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
Follow up:
We re-retried, this time after *decreasing* spark.parallelism. It was set
to 16000 before, (5 times the number of cores in our cluster). It is now
down to 6400 (2 times the number of cores).

And it got past the point where it failed before.

Does the MapOutputTracker have a limit on the number of tasks it can track?
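
For reference, a minimal sketch of how that setting can be pinned down (6400 is just
the value from this thread; equivalently --conf spark.default.parallelism=6400 on
spark-submit, or a line in spark-defaults.conf):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("example")
    .set("spark.default.parallelism", "6400")
  val sc = new SparkContext(conf)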


On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Hello,

 We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers).
 We use spark-submit to start an application.

 We got the following error which leads to a failed stage:

 Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
 most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
 ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
 communicating with MapOutputTracker


 We tried the whole application again, and it failed on the same stage (but
 it got more tasks completed on that stage) with the same error.

 We then looked at executors stderr, and all show similar logs, on both
 runs (see below). As far as we can tell, executors and master have disk
 space left.

 *Any suggestion on where to look to understand why the communication with
 the MapOutputTracker fails?*

 Thanks
 Thomas
 
 In case it matters, our akka settings:
 spark.akka.frameSize 50
 spark.akka.threads 8
 // those below are 10* the default, to cope with large GCs
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 6
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 1

 Appendix: executor logs, where it starts going awry

 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
 298525)
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
 curMem=5543008799, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
 bytes in memory (estimated size 1473.0 B, free 11.7 GB)
 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
 broadcast_339_piece0
 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 
 224 ms
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
 curMem=5543010272, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
 memory (estimated size 2.5 KB, free 11.7 GB)
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor 
 = 
 Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 [... the same "Don't have map outputs for shuffle 18, fetching them" line repeats many more times ...]

Spark logs in standalone clusters

2015-03-04 Thread Thomas Gerber
Hello,

I was wondering where all the logs files were located on a standalone
cluster:

   1. the executor logs are in the work directory on each slave machine
   (stdout/stderr)
  - I've notice that GC information is in stdout, and stage information
  in stderr
  - *Could we get more information on what is written in stdout vs
  stderr?*
   2. the master log
  - The path to the log file is shown when you launch the master,
  like 
/mnt/var/log/apps/spark-hadoop-org.apache.spark.deploy.master.Master-MACHINENAME.out;
  - *Could we get more information on where this path is configured?* (see
  the note after this list)
   3. driver logs
  - It seems they are only in the console by default (although you can
  override that in the log4j.properties file).
   4. communication manager logs?
   - *Are there any logs for the communication manager (aka the
  MapOutputTracker?)?*
   5. Any other log file?
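
On point 2, a note on the standard mechanism, assuming a stock standalone deployment:
the master and worker daemon log location is controlled by SPARK_LOG_DIR in
conf/spark-env.sh (the default is SPARK_HOME/logs), e.g.:

  # conf/spark-env.sh (sketch; adjust the path for your deployment)
  export SPARK_LOG_DIR=/mnt/var/log/apps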

Thanks,
Thomas


configure number of cached partition in memory on SparkSQL

2015-03-04 Thread Judy Nash
Hi,

I am tuning a hive dataset on Spark SQL deployed via thrift server.

How can I change the number of partitions after caching the table on thrift 
server?

I have tried the following but still getting the same number of partitions 
after caching:
Spark.default.parallelism
spark.sql.inMemoryColumnarStorage.batchSize


Thanks,
Judy


Re: Spark Monitoring UI for Hadoop Yarn Cluster

2015-03-04 Thread Srini Karri
Yes. I do see files, actually I missed copying the other settings:

spark.master spark://
skarri-lt05.redmond.corp.microsoft.com:7077
spark.eventLog.enabled   true
spark.rdd.compress true
spark.storage.memoryFraction 1
spark.core.connection.ack.wait.timeout 6000
spark.akka.frameSize 50
spark.executor.extraClassPath
D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
spark.eventLog.dir
D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
spark.history.fs.logDirectory
D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events


On Wed, Mar 4, 2015 at 10:15 AM, Marcelo Vanzin van...@cloudera.com wrote:

 On Wed, Mar 4, 2015 at 10:08 AM, Srini Karri skarri@gmail.com wrote:
  spark.executor.extraClassPath
 
 D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
  spark.eventLog.dir
 
 D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
  spark.history.fs.logDirectory
 
 D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events

 Do you see any files in that directory?

 spark.eventLog.dir won't do anything unless you also have
 spark.eventLog.enabled=true somewhere. And these are application
 configs, so make sure they're set when running your application (not
 when starting the history server).

 --
 Marcelo



Re: Error communicating with MapOutputTracker

2015-03-04 Thread Thomas Gerber
I meant spark.default.parallelism of course.

On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Follow up:
 We re-retried, this time after *decreasing* spark.parallelism. It was set
 to 16000 before, (5 times the number of cores in our cluster). It is now
 down to 6400 (2 times the number of cores).

 And it got past the point where it failed before.

 Does the MapOutputTracker have a limit on the number of tasks it can track?


 On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge
 workers). We use spark-submit to start an application.

 We got the following error which leads to a failed stage:

 Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
 most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
 ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
 communicating with MapOutputTracker


 We tried the whole application again, and it failed on the same stage
 (but it got more tasks completed on that stage) with the same error.

 We then looked at executors stderr, and all show similar logs, on both
 runs (see below). As far as we can tell, executors and master have disk
 space left.

 *Any suggestion on where to look to understand why the communication with
 the MapOutputTracker fails?*

 Thanks
 Thomas
 
 In case it matters, our akka settings:
 spark.akka.frameSize 50
 spark.akka.threads 8
 // those below are 10* the default, to cope with large GCs
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 6
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 1

 Appendix: executor logs, where it starts going awry

 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
 298525)
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
 curMem=5543008799, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
 bytes in memory (estimated size 1473.0 B, free 11.7 GB)
 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
 broadcast_339_piece0
 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 
 224 ms
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
 curMem=5543010272, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
 memory (estimated size 2.5 KB, free 11.7 GB)
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
 actor = 
 Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 [... the same "Don't have map outputs for shuffle 18, fetching them" line repeats many more times ...]
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs 

Re: Spark Monitoring UI for Hadoop Yarn Cluster

2015-03-04 Thread Marcelo Vanzin
On Wed, Mar 4, 2015 at 10:08 AM, Srini Karri skarri@gmail.com wrote:
 spark.executor.extraClassPath
 D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
 spark.eventLog.dir
 D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
 spark.history.fs.logDirectory
 D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events

Do you see any files in that directory?

spark.eventLog.dir won't do anything unless you also have
spark.eventLog.enabled=true somewhere. And these are application
configs, so make sure they're set when running your application (not
when starting the history server).

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark sql median and standard deviation

2015-03-04 Thread tridib
Hello,
Is there a built-in function for getting the median and standard deviation in
Spark SQL? Currently I am converting the SchemaRDD to a DoubleRDD and calling
doubleRDD.stats(), but that still does not have the median.

What is the most efficient way to get the median?

Thanks & Regards
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-median-and-standard-deviation-tp21914.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Monitoring UI for Hadoop Yarn Cluster

2015-03-04 Thread Srini Karri
Hi Marcelo,

I found the problem from
http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/%3cCAL+LEBfzzjugOoB2iFFdz_=9TQsH=DaiKY=cvydfydg3ac5...@mail.gmail.com%3e
this link. The problem is the application I am running, is not generating
APPLICATION_COMPLETE file. If I add this file manually it is showing
application in the UI. So the problem is with application which is not
calling Stop method on the spark context.
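
For reference, the usual pattern that guarantees the APPLICATION_COMPLETE marker is
written is to stop the context even when the job fails (a sketch):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("example"))
  try {
    // ... job logic ...
  } finally {
    sc.stop()   // finalizes the event log so the history server lists the app
  }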

Thank you and Todd for helping. Hopefully I will be able to apply these on
the actual cluster.

Regards,
Srini.

On Wed, Mar 4, 2015 at 10:20 AM, Srini Karri skarri@gmail.com wrote:

 Yes. I do see files, actually I missed copying the other settings:

 spark.master spark://
 skarri-lt05.redmond.corp.microsoft.com:7077
 spark.eventLog.enabled   true
 spark.rdd.compress true
 spark.storage.memoryFraction 1
 spark.core.connection.ack.wait.timeout 6000
 spark.akka.frameSize 50
 spark.executor.extraClassPath
 D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
 spark.eventLog.dir
 D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
 spark.history.fs.logDirectory
 D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events


 On Wed, Mar 4, 2015 at 10:15 AM, Marcelo Vanzin van...@cloudera.com
 wrote:

 On Wed, Mar 4, 2015 at 10:08 AM, Srini Karri skarri@gmail.com
 wrote:
  spark.executor.extraClassPath
 
 D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
  spark.eventLog.dir
 
 D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
  spark.history.fs.logDirectory
 
 D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events

 Do you see any files in that directory?

 spark.eventLog.dir won't do anything unless you also have
 spark.eventLog.enabled=true somewhere. And these are application
 configs, so make sure they're set when running your application (not
 when starting the history server).

 --
 Marcelo





Re: Save and read parquet from the same path

2015-03-04 Thread Michael Armbrust
No, this is not safe to do.
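
A safer pattern (a sketch in Scala; the paths are illustrative) is to write to a
temporary location first and only replace the original once the write has succeeded:

  import org.apache.hadoop.fs.{FileSystem, Path}

  val data = sqlContext.parquetFile("/data/events")   // original input
  // ... transformations on `data` ...
  data.saveAsParquetFile("/data/events_tmp")          // write somewhere else first

  val fs = FileSystem.get(sc.hadoopConfiguration)
  fs.delete(new Path("/data/events"), true)           // only after the write succeeded
  fs.rename(new Path("/data/events_tmp"), new Path("/data/events"))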

On Wed, Mar 4, 2015 at 7:14 AM, Karlson ksonsp...@siberie.de wrote:

 Hi all,

 what would happen if I save a RDD via saveAsParquetFile to the same path
 that RDD is originally read from? Is that a safe thing to do in Pyspark?

 Thanks!


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark SQL Static Analysis

2015-03-04 Thread Justin Pihony
Thanks!

On Wed, Mar 4, 2015 at 3:58 PM, Michael Armbrust mich...@databricks.com
wrote:

 It is somewhat out of data, but here is what we have so far:
 https://github.com/marmbrus/sql-typed

 On Wed, Mar 4, 2015 at 12:53 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 I am pretty sure that I saw a presentation where SparkSQL could be
 executed
 with static analysis, however I cannot find the presentation now, nor can
 I
 find any documentation or research papers on the topic. So, I am curious
 if
 there is indeed any work going on for this topic. The two things I would
 be
 interested in would be to be able to gain compile time safety, as well as
 gain the ability to work on my data as a type instead of a row (ie,
  result.map(x => x.Age) instead of having to use Row.get)





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Driver disassociated

2015-03-04 Thread Thomas Gerber
Hello,

sometimes, in the *middle* of a job, the job stops (status is then seen as
FINISHED in the master).

There isn't anything wrong in the shell/submit output.

When looking at the executor logs, I see logs like this:

15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
:40019/user/MapOutputTracker#893807065]
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for
shuffle 38, fetching them
15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
[akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] -
[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated!
Shutting down.
15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has
failed, address is now gated for [5000] ms. Reason is: [Disassociated].

How can I investigate further?
Thanks


Re: spark sql median and standard deviation

2015-03-04 Thread Ted Yu
Please take a look at DoubleRDDFunctions.scala :

  /** Compute the mean of this RDD's elements. */
  def mean(): Double = stats().mean

  /** Compute the variance of this RDD's elements. */
  def variance(): Double = stats().variance

  /** Compute the standard deviation of this RDD's elements. */
  def stdev(): Double = stats().stdev

Cheers
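
For the median there is no built-in in this version; a simple (if not the most
efficient) sketch is to sort the values and look up the middle element:

  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  // exact median via sort + index lookup; for large data an approximate
  // (e.g. histogram-based) approach is usually preferable
  def median(values: RDD[Double]): Double = {
    val indexed = values.sortBy(identity).zipWithIndex().map { case (v, i) => (i, v) }
    val n = indexed.count()
    if (n % 2 == 1) indexed.lookup(n / 2).head
    else (indexed.lookup(n / 2 - 1).head + indexed.lookup(n / 2).head) / 2.0
  }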

On Wed, Mar 4, 2015 at 10:51 AM, tridib tridib.sama...@live.com wrote:

 Hello,
 Is there in built function for getting median and standard deviation in
 spark sql? Currently I am converting the schemaRdd to DoubleRdd and calling
 doubleRDD.stats(). But still it does not have median.

 What is the most efficient way to get the median?

 Thanks  Regards
 Tridib



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-median-and-standard-deviation-tp21914.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Integer column in schema RDD from parquet being considered as string

2015-03-04 Thread gtinside
Hi ,

I am converting a jsonRDD to parquet by saving it as a parquet file
(saveAsParquetFile):
cacheContext.jsonFile("file:///u1/sample.json").saveAsParquetFile("sample.parquet")

I am reading the parquet file and registering it as a table:
val parquet = cacheContext.parquetFile("sample_trades.parquet")
parquet.registerTempTable("sample")

When I do a print schema , I see :
root
 |-- SAMPLE: struct (nullable = true)
 ||-- CODE: integer (nullable = true)
 ||-- DESC: string (nullable = true)

When I query:
cacheContext.sql("select SAMPLE.DESC from sample where
SAMPLE.CODE=1").map(t => t).collect.foreach(println), I get an error:
java.lang.IllegalArgumentException: Column [CODE] was not found in schema!

but if I put SAMPLE.CODE in single quotes (forcing it to be a string), it works,
for example:
cacheContext.sql("select SAMPLE.DESC from sample where
*SAMPLE.CODE='1'*").map(t => t).collect.foreach(println) works

What am I missing here? I understand Catalyst will do optimization, so the data
type shouldn't matter that much, but something is off here.

Regards,
Gaurav









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integer-column-in-schema-RDD-from-parquet-being-considered-as-string-tp21917.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL Static Analysis

2015-03-04 Thread Justin Pihony
I am pretty sure that I saw a presentation where SparkSQL could be executed
with static analysis, however I cannot find the presentation now, nor can I
find any documentation or research papers on the topic. So, I am curious if
there is indeed any work going on for this topic. The two things I would be
interested in would be to be able to gain compile time safety, as well as
gain the ability to work on my data as a type instead of a row (ie,
result.map(x => x.Age) instead of having to use Row.get)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL Static Analysis

2015-03-04 Thread Michael Armbrust
It is somewhat out of data, but here is what we have so far:
https://github.com/marmbrus/sql-typed

On Wed, Mar 4, 2015 at 12:53 PM, Justin Pihony justin.pih...@gmail.com
wrote:

 I am pretty sure that I saw a presentation where SparkSQL could be executed
 with static analysis, however I cannot find the presentation now, nor can I
 find any documentation or research papers on the topic. So, I am curious if
 there is indeed any work going on for this topic. The two things I would be
 interested in would be to be able to gain compile time safety, as well as
 gain the ability to work on my data as a type instead of a row (ie,
  result.map(x => x.Age) instead of having to use Row.get)





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Does anyone integrate HBASE on Spark

2015-03-04 Thread gen tang
Hi,

There are some examples in spark/examples
(https://github.com/apache/spark/tree/master/examples), and there are also
some examples among the Spark packages (http://spark-packages.org/).
I also find this blog post quite good:
http://www.abcn.net/2014/07/lighting-spark-with-hbase-full-edition.html

Hope it would be helpful

Cheers
Gen
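
For a starting point, a minimal read-side sketch using the standard TableInputFormat
route (assuming the HBase client jars are on the classpath; the table name and
ZooKeeper quorum below are placeholders):

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("hbase-read"))

  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "zk-host1,zk-host2")
  hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")

  // each record is (row key, Result) for one HBase row
  val hbaseRDD = sc.newAPIHadoopRDD(
    hbaseConf,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])

  println(s"row count: ${hbaseRDD.count()}")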


On Wed, Mar 4, 2015 at 6:51 PM, sandeep vura sandeepv...@gmail.com wrote:

 Hi Sparkers,

 How do i integrate hbase on spark !!!

 Appreciate for replies !!

 Regards,
 Sandeep.v



Re: issue Running Spark Job on Yarn Cluster

2015-03-04 Thread roni
look at the logs
yarn logs --applicationId <applicationId>
That should give the error.

On Wed, Mar 4, 2015 at 9:21 AM, sachin Singh sachin.sha...@gmail.com
wrote:

 Not yet.
 Please let me know if you find a solution.

 Regards
 Sachin
  On 4 Mar 2015 21:45, mael2210 [via Apache Spark User List] [hidden
  email] wrote:

 Hello,

 I am facing the exact same issue. Could you solve the problem ?

 Kind regards

 --
  If you reply to this email, your message will be added to the
 discussion below:

 http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21909.html


 --
 View this message in context: Re: issue Running Spark Job on Yarn Cluster
 http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21912.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread kpeng1
Hi All,

I am currently having problem with the maven dependencies for version 1.2.0
of spark-core and spark-hive.  Here are my dependencies:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

When the dependencies are set to version 1.1.0, I do not get any errors. 
Here are the errors I am getting from artifactory for version 1.2.0 of
spark-core:
error=Could not transfer artifact
org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
(https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer file\:
https\://m2.mines.com\:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
Return code is\: 409 , ReasonPhrase\:Conflict.

The error is the same for spark-hive.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Marcelo Vanzin
Hi Kevin,

If you're using CDH, I'd recommend using the CDH repo [1], and also
the CDH version when building your app.

[1] 
http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html

On Wed, Mar 4, 2015 at 4:34 PM, Kevin Peng kpe...@gmail.com wrote:
 Ted,

 I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too
 sure about the compatibility issues between 1.2.0 and 1.2.1, that is why I
 would want to stick to 1.2.0.



 On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu yuzhih...@gmail.com wrote:

 Kevin:
 You can try with 1.2.1

 See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1

 Cheers

 On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng kpe...@gmail.com wrote:

 Marcelo,

 Yes that is correct, I am going through a mirror, but 1.1.0 works
 properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom
 file.

 On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Seems like someone set up m2.mines.com as a mirror in your pom file
 or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
 in a messed up state).

 On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote:
  Hi All,
 
  I am currently having problem with the maven dependencies for version
  1.2.0
  of spark-core and spark-hive.  Here are my dependencies:
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
 
  When the dependencies are set to version 1.1.0, I do not get any
  errors.
  Here are the errors I am getting from artifactory for version 1.2.0 of
  spark-core:
  error=Could not transfer artifact
  org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
  (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
  file\:
 
  https\://m2.mines.com\:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
  Return code is\: 409 , ReasonPhrase\:Conflict.
 
  The error is the same for spark-hive.
 
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
  Sent from the Apache Spark User List mailing list archive at
  Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



 --
 Marcelo







-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark master shut down suddenly

2015-03-04 Thread lisendong
I'm sorry, but how do I look at the mesos logs?
Where are they?



 在 2015年3月4日,下午6:06,Akhil Das ak...@sigmoidanalytics.com 写道:
 
 You can check in the mesos logs and see whats really happening.
 
 Thanks
 Best Regards
 
 On Wed, Mar 4, 2015 at 3:10 PM, lisendong lisend...@163.com 
 mailto:lisend...@163.com wrote:
 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard
 from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
 connection and attempting reconnect
 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership
 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
 shutting down.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org
 
 



distribution of receivers in spark streaming

2015-03-04 Thread Du Li
Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of
kafka receivers on those machines. In my code I did something like the
following, as suggested in the spark docs:

    val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
    ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any
instance of the receiver while some get three. The mapping changed every time
the system was restarted. This impacts the receiving and also the processing
speeds.

I wonder if it's possible to control/suggest the distribution so that it would
be more even. How is the decision made in spark?

Thanks,
Du



Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Marcelo Vanzin
Seems like someone set up m2.mines.com as a mirror in your pom file
or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
in a messed up state).

On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote:
 Hi All,

 I am currently having problem with the maven dependencies for version 1.2.0
 of spark-core and spark-hive.  Here are my dependencies:
 dependency
   groupIdorg.apache.spark/groupId
   artifactIdspark-core_2.10/artifactId
   version1.2.0/version
 /dependency
 dependency
   groupIdorg.apache.spark/groupId
   artifactIdspark-hive_2.10/artifactId
   version1.2.0/version
 /dependency

 When the dependencies are set to version 1.1.0, I do not get any errors.
 Here are the errors I am getting from artifactory for version 1.2.0 of
 spark-core:
 error=Could not transfer artifact
 org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
 (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer file\:
 https\://m2.mines.com\:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
 Return code is\: 409 , ReasonPhrase\:Conflict.

 The error is the same for spark-hive.




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Driver disassociated

2015-03-04 Thread Thomas Gerber
Also,

I was experiencing another problem which might be related:
Error communicating with MapOutputTracker (see email in the ML today).

I just thought I would mention it in case it is relevant.

On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 1.2.1

 Also, I was using the following parameters, which are 10 times the default
 ones:
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 6
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 1

 which should have helped *avoid* the problem if I understand correctly.

 Thanks,
 Thomas

 On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote:

 What release are you using ?

 SPARK-3923 went into 1.2.0 release.

 Cheers

 On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 sometimes, in the *middle* of a job, the job stops (status is then seen
 as FINISHED in the master).

 There isn't anything wrong in the shell/submit output.

 When looking at the executor logs, I see logs like this:

 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
 actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
 :40019/user/MapOutputTracker#893807065]
 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs
 for shuffle 38, fetching them
 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
 - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 disassociated! Shutting down.
 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
 remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 How can I investigate further?
 Thanks






RE: Where can I find more information about the R interface forSpark?

2015-03-04 Thread Haopu Wang
Thanks, it's an active project.

 

Will it be released with Spark 1.3.0?

 



From: 鹰 [mailto:980548...@qq.com] 
Sent: Thursday, March 05, 2015 11:19 AM
To: Haopu Wang; user
Subject: Re: Where can I find more information about the R interface forSpark?

 

you can search SparkR on google or search it on github 



Re: RDD coalesce or repartition by #records or #bytes?

2015-03-04 Thread Zhan Zhang
It uses a HashPartitioner to distribute the records to different partitions, but
the keys are just integers spread evenly across the output partitions.

From the code, each resulting partition will get a very similar number of
records.
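
If you want to affect the scheme programmatically, one option is to key the
records yourself and use partitionBy with a custom Partitioner; a rough sketch
(the bucketing logic and names are hypothetical):

    import org.apache.spark.Partitioner

    // Routes records by a caller-chosen bucket number (e.g. a size bucket
    // computed per record) instead of the default hash of the record key.
    class BucketPartitioner(override val numPartitions: Int) extends Partitioner {
      private def nonNegativeMod(x: Int, mod: Int): Int = {
        val raw = x % mod
        if (raw < 0) raw + mod else raw
      }
      def getPartition(key: Any): Int = key match {
        case bucket: Int => nonNegativeMod(bucket, numPartitions)
        case other       => nonNegativeMod(other.hashCode, numPartitions)
      }
    }

    // keyed: RDD[(Int, Record)] where the Int key encodes the size bucket (hypothetical).
    // val balanced = keyed.partitionBy(new BucketPartitioner(64))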

Thanks.

Zhan Zhang


On Mar 4, 2015, at 3:47 PM, Du Li 
l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID wrote:

Hi,

My RDD's are created from kafka stream. After receiving a RDD, I want to do 
coalesce/repartition it so that the data will be processed in a set of machines 
in parallel as even as possible. The number of processing nodes is larger than 
the receiving nodes.

My question is how the coalesce/repartition works. Does it distribute by the 
number of records or number of bytes? In my app, my observation is that the 
distribution seems by number of records. The consequence is, however, some 
executors have to process x1000 as much as data when the sizes of records are 
very skewed. Then we have to allocate memory by the worst case.

Is there a way to programmatically affect the coalesce /repartition scheme?

Thanks,
Du



Extra output from Spark run

2015-03-04 Thread cjwang
When I run Spark 1.2.1, I see output like this that wasn't in the previous
releases:

[Stage 12:=   (6 + 1) /
16]
[Stage 12:(8 + 1) /
16]
[Stage 12:== (11 + 1) /
16]
[Stage 12:=  (14 + 1) /
16]

What do they mean and how can I get rid of them?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Extra-output-from-Spark-run-tp21920.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Where can I find more information about the R interface for Spark?

2015-03-04 Thread haopu
Do you have any update on SparkR?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Where-can-I-find-more-information-about-the-R-interface-for-Spark-tp155p21922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Marcelo,

Thanks.  The one in the CDH repo fixed it :)

On Wed, Mar 4, 2015 at 4:37 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Kevin,

 If you're using CDH, I'd recommend using the CDH repo [1], and also
 the CDH version when building your app.

 [1]
 http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html

 On Wed, Mar 4, 2015 at 4:34 PM, Kevin Peng kpe...@gmail.com wrote:
  Ted,
 
  I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not
 too
  sure about the compatibility issues between 1.2.0 and 1.2.1, that is why
 I
  would want to stick to 1.2.0.
 
 
 
  On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu yuzhih...@gmail.com wrote:
 
  Kevin:
  You can try with 1.2.1
 
  See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1
 
  Cheers
 
  On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng kpe...@gmail.com wrote:
 
  Marcelo,
 
  Yes that is correct, I am going through a mirror, but 1.1.0 works
  properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0
 pom
  file.
 
  On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin van...@cloudera.com
  wrote:
 
  Seems like someone set up m2.mines.com as a mirror in your pom file
  or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
  in a messed up state).
 
  On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote:
   Hi All,
  
   I am currently having problem with the maven dependencies for
 version
   1.2.0
   of spark-core and spark-hive.  Here are my dependencies:
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.10</artifactId>
     <version>1.2.0</version>
   </dependency>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-hive_2.10</artifactId>
     <version>1.2.0</version>
   </dependency>
  
   When the dependencies are set to version 1.1.0, I do not get any
   errors.
   Here are the errors I am getting from artifactory for version 1.2.0
 of
   spark-core:
   error=Could not transfer artifact
   org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
   (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
   file\:
  
   https\://m2.mines.com
 \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
   Return code is\: 409 , ReasonPhrase\:Conflict.
  
   The error is the same for spark-hive.
  
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
   Sent from the Apache Spark User List mailing list archive at
   Nabble.com.
  
  
 -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 
 
  --
  Marcelo
 
 
 
 



 --
 Marcelo



Re: Where can I find more information about the R interface forSpark?

2015-03-04 Thread 鹰
you can search SparkR on google or search it on github

Re: Driver disassociated

2015-03-04 Thread Ted Yu
What release are you using ?

SPARK-3923 went into 1.2.0 release.

Cheers

On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Hello,

 sometimes, in the *middle* of a job, the job stops (status is then seen
 as FINISHED in the master).

 There isn't anything wrong in the shell/submit output.

 When looking at the executor logs, I see logs like this:

 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
 actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
 :40019/user/MapOutputTracker#893807065]
 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for
 shuffle 38, fetching them
 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated
 [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] -
 [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated!
 Shutting down.
 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote
 system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has
 failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 How can I investigate further?
 Thanks



Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread Kevin Peng
Ted,

I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too
sure about the compatibility issues between 1.2.0 and 1.2.1, that is why I
would want to stick to 1.2.0.



On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu yuzhih...@gmail.com wrote:

 Kevin:
 You can try with 1.2.1

 See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1

 Cheers

 On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng kpe...@gmail.com wrote:

 Marcelo,

 Yes that is correct, I am going through a mirror, but 1.1.0 works
 properly, while 1.2.0 does not.  I suspect there is crc in the 1.2.0 pom
 file.

 On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Seems like someone set up m2.mines.com as a mirror in your pom file
 or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is
 in a messed up state).

 On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote:
  Hi All,
 
  I am currently having problem with the maven dependencies for version
 1.2.0
  of spark-core and spark-hive.  Here are my dependencies:
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.2.0</version>
  </dependency>
 
  When the dependencies are set to version 1.1.0, I do not get any
 errors.
  Here are the errors I am getting from artifactory for version 1.2.0 of
  spark-core:
  error=Could not transfer artifact
  org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
  (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer
 file\:
  https\://m2.mines.com
 \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
  Return code is\: 409 , ReasonPhrase\:Conflict.
 
  The error is the same for spark-hive.
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



 --
 Marcelo






RDD coalesce or repartition by #records or #bytes?

2015-03-04 Thread Du Li
Hi,

My RDDs are created from a kafka stream. After receiving an RDD, I want to
coalesce/repartition it so that the data will be processed on a set of machines
in parallel, as evenly as possible. The number of processing nodes is larger
than the number of receiving nodes.

My question is how the coalesce/repartition works. Does it distribute by the
number of records or the number of bytes? In my app, my observation is that the
distribution seems to be by number of records. The consequence, however, is that
some executors have to process 1000x as much data when the sizes of records are
very skewed. Then we have to allocate memory for the worst case.

Is there a way to programmatically affect the coalesce/repartition scheme?

Thanks,
Du

how to update als in mllib?

2015-03-04 Thread lisendong
I'm using Spark 1.0.0 with Cloudera.

But I want to use the new ALS code, which supports more features, such as RDD
cache level (MEMORY_ONLY), checkpointing, and so on.

What is the easiest way to use the new ALS code?

I only need the MLlib ALS code, so maybe I don't need to update all of Spark
and MLlib on the cluster machines...

Maybe downloading a new Spark jar and including it in my driver is enough?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-update-als-in-mllib-tp21921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Driver disassociated

2015-03-04 Thread Thomas Gerber
1.2.1

Also, I was using the following parameters, which are 10 times the default
ones:
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

which should have helped *avoid* the problem if I understand correctly.

Thanks,
Thomas

On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote:

 What release are you using ?

 SPARK-3923 went into 1.2.0 release.

 Cheers

 On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 sometimes, in the *middle* of a job, the job stops (status is then seen
 as FINISHED in the master).

 There isn't anything wrong in the shell/submit output.

 When looking at the executor logs, I see logs like this:

 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker
 actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal
 :40019/user/MapOutputTracker#893807065]
 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for
 shuffle 38, fetching them
 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766]
 - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 disassociated! Shutting down.
 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with
 remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019]
 has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

 How can I investigate further?
 Thanks





Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
One really interesting thing is that when I'm using the
netty-based spark.shuffle.blockTransferService, there are no OOM error
messages (java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.


 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)
 java.lang.OutOfMemoryError: Java heap space
 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)

 Is join/cogroup still memory bound?


 Jianshi



 On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hmm... ok, previous errors are still block fetch errors.

 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
 fetch of 11 outstanding blocks
 java.io.IOException: Failed to connect to host-/:55597
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
 at
 org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at 

Re: spark master shut down suddenly

2015-03-04 Thread Denny Lee
It depends on your setup but one of the locations is /var/log/mesos
On Wed, Mar 4, 2015 at 19:11 lisendong lisend...@163.com wrote:

 I ‘m sorry, but how to look at the mesos logs?
 where are them?



 在 2015年3月4日,下午6:06,Akhil Das ak...@sigmoidanalytics.com 写道:


 You can check in the mesos logs and see whats really happening.

 Thanks
 Best Regards

 On Wed, Mar 4, 2015 at 3:10 PM, lisendong lisend...@163.com wrote:

 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not
 heard
 from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
 connection and attempting reconnect
 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost
 leadership
 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
 shutting down.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: distribution of receivers in spark streaming

2015-03-04 Thread Du Li
Figured it out: I need to override method preferredLocation() in MyReceiver 
class. 
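
For reference, a rough sketch of such a receiver (the Kafka-consuming details
are omitted, and the host parameter is simply whichever worker hostname the
receiver should be pinned to):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // A receiver pinned to a given worker host via preferredLocation.
    class MyKafkaReceiver(host: String)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      // Scheduling hint: run this receiver instance on the given host if possible.
      override def preferredLocation: Option[String] = Some(host)

      def onStart(): Unit = {
        // start a thread here that consumes from Kafka and calls store(...)
      }

      def onStop(): Unit = {
        // stop the consuming thread
      }
    }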

 On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALID 
wrote:
   

 Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of
kafka receivers on those machines. In my code I did something like the
following, as suggested in the spark docs:

    val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
    ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any
instance of the receiver while some get three. The mapping changed every time
the system was restarted. This impacts the receiving and also the processing
speeds.

I wonder if it's possible to control/suggest the distribution so that it would
be more even. How is the decision made in spark?

Thanks,
Du



   

RE: distribution of receivers in spark streaming

2015-03-04 Thread Shao, Saisai
Hi Du,

You could try to sleep for several seconds after creating the streaming context
to let all the executors register; then the receivers can be distributed across
the nodes more evenly. Also, setting the locality is another way, as you
mentioned.
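
For illustration, a rough sketch of that ordering (the 5-second figure is an
assumption to tune per cluster, and MyKafkaReceiver is the user-defined
receiver from this thread):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("ReceiverPlacement")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Give the executors a few seconds to register with the driver before the
    // receivers are set up; otherwise they may pile up on whichever executors
    // happen to be up first.
    Thread.sleep(5000)

    val streams = (1 to 8).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
    ssc.union(streams).print()

    ssc.start()
    ssc.awaitTermination()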

Thanks
Jerry


From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver 
class.

On Wednesday, March 4, 2015 3:35 PM, Du Li 
l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new 
MyKafkaReceiver()))
ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.

I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?

Thanks,
Du





Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
There're some skew.

    6461640  SUCCESS  PROCESS_LOCAL  200 /  2015/03/04 23:45:47  1.1 min  6 s  198.6 MB  21.1 GB  240.8 MB
    5961590  SUCCESS  PROCESS_LOCAL  30 /   2015/03/04 23:45:47  44 s     5 s  200.7 MB  4.8 GB   154.0 MB
But I expect this kind of skewness to be quite common.
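
For reference, when a few hot keys dominate, a common workaround is a salted
join: spread each key of the skewed side over N sub-keys and replicate the
other side N times, so no single task has to hold all values of a hot key. A
rough sketch (the names and the salt factor are hypothetical):

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag
    import scala.util.Random

    // large: the skewed side, small: the side cheap enough to replicate.
    def saltedJoin[K: ClassTag, V: ClassTag, W: ClassTag](
        large: RDD[(K, V)],
        small: RDD[(K, W)],
        saltFactor: Int = 16): RDD[(K, (V, W))] = {
      // Spread each key of the skewed side over `saltFactor` sub-keys.
      val saltedLarge = large.map { case (k, v) => ((k, Random.nextInt(saltFactor)), v) }
      // Replicate every row of the other side once per possible salt value.
      val saltedSmall = small.flatMap { case (k, w) =>
        (0 until saltFactor).map(salt => ((k, salt), w))
      }
      // Join on (key, salt), then drop the salt.
      saltedLarge.join(saltedSmall).map { case ((k, _), (v, w)) => (k, (v, w)) }
    }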

Jianshi


On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 I see. I'm using core's join. The data might have some skewness
 (checking).

 I understand shuffle can spill data to disk but when consuming it, say in
 cogroup or groupByKey, it still needs to read the whole group elements,
 right? I guess OOM happened there when reading very large groups.

 Jianshi

 On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  I think what you could do is to monitor through web UI to see if
 there’s any skew or other symptoms in shuffle write and read. For GC you
 could use the below configuration as you mentioned.



 From Spark core side, all the shuffle related operations can spill the
 data into disk and no need to read the whole partition into memory. But if
 you uses SparkSQL, it depends on how SparkSQL uses this operators.



 CC @hao if he has some thoughts on it.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 3:28 PM
 *To:* Shao, Saisai

 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 Hi Saisai,



 What's your suggested settings on monitoring shuffle? I've
 enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.



 I found SPARK-3461 (Support external groupByKey using
 repartitionAndSortWithinPartitions) want to make groupByKey using external
 storage. It's still open status. Does that mean now
 groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
 the group as a whole during consuming?



 How can I deal with the key skewness in joins? Is there a skew-join
 implementation?





 Jianshi







 On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



 From my understanding, it may not be the problem of NIO or Netty, looking
 at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap),
 theoretically EAOM can spill the data into disk if memory is not enough,
 but there might some issues when join key is skewed or key number is
 smaller, so you will meet OOM.



 Maybe you could monitor each stage or task’s shuffle and GC status also
 system status to identify the problem.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 2:32 PM
 *To:* Aaron Davidson
 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 One really interesting is that when I'm using the
 netty-based spark.shuffle.blockTransferService, there's no OOM error
 messages (java.lang.OutOfMemoryError: Java heap space).



 Any idea why it's not here?



 I'm using Spark 1.2.1.



 Jianshi



 On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.





 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)

 java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)

 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)

 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)

 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at 

RE: distribution of receivers in spark streaming

2015-03-04 Thread Shao, Saisai
Yes, hostname is enough.

I think it is currently hard for user code to get the worker list from the 
standalone master. If you could get the Master object you could get the worker 
list, but AFAIK it may be difficult to get that object. All you can do is 
obtain the worker list manually and assign a hostname to each receiver.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com]
Sent: Thursday, March 5, 2015 2:29 PM
To: Shao, Saisai; User
Subject: Re: distribution of receivers in spark streaming

Hi Jerry,

Thanks for your response.

Is there a way to get the list of currently registered/live workers? Even in 
order to provide preferredLocation, it would be safer to know which workers are 
active. Guess I only need to provide the hostname, right?

Thanks,
Du

On Wednesday, March 4, 2015 10:08 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:

Hi Du,

You could try to sleep for several seconds after creating streaming context to 
let all the executors registered, then all the receivers can distribute to the 
nodes more evenly. Also setting locality is another way as you mentioned.

Thanks
Jerry


From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver 
class.

On Wednesday, March 4, 2015 3:35 PM, Du Li 
l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new 
MyKafkaReceiver()))
ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.

I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?

Thanks,
Du






RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
Yes, if one key has too many values, there is still a chance of hitting OOM.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:49 PM
To: Shao, Saisai
Cc: Cheng, Hao; user
Subject: Re: Having lots of FetchFailedException in join

I see. I'm using core's join. The data might have some skewness (checking).

I understand shuffle can spill data to disk but when consuming it, say in 
cogroup or groupByKey, it still needs to read the whole group elements, right? 
I guess OOM happened there when reading very large groups.

Jianshi

On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
I think what you could do is to monitor through web UI to see if there’s any 
skew or other symptoms in shuffle write and read. For GC you could use the 
below configuration as you mentioned.

From Spark core side, all the shuffle related operations can spill the data 
into disk and no need to read the whole partition into memory. But if you uses 
SparkSQL, it depends on how SparkSQL uses this operators.

CC @hao if he has some thoughts on it.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:28 PM
To: Shao, Saisai

Cc: user
Subject: Re: Having lots of FetchFailedException in join

Hi Saisai,

What's your suggested settings on monitoring shuffle? I've enabled 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using 
repartitionAndSortWithinPartitions) want to make groupByKey using external 
storage. It's still open status. Does that mean now 
groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the 
group as a whole during consuming?

How can I deal with the key skewness in joins? Is there a skew-join 
implementation?


Jianshi



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Jianshi,

From my understanding, it may not be the problem of NIO or Netty, looking at 
your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), 
theoretically EAOM can spill the data into disk if memory is not enough, but 
there might some issues when join key is skewed or key number is smaller, so 
you will meet OOM.

Maybe you could monitor each stage or task’s shuffle and GC status also system 
status to identify the problem.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting is that when I'm using the netty-based 
spark.shuffle.blockTransferService, there's no OOM error messages 
(java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 

Re: scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Tobias Pfeiffer
Hi,

On Thu, Mar 5, 2015 at 12:20 AM, Imran Rashid iras...@cloudera.com wrote:

 This doesn't involve spark at all, I think this is entirely an issue with
 how scala deals w/ primitives and boxing.  Often it can hide the details
 for you, but IMO it just leads to far more confusing errors when things
 don't work out.  The issue here is that your map has value type Any, which
 leads scala to leave it as a boxed java.lang.Double.


I see, thank you very much for your explanation and the code examples!
Helps very much!

Thanks
Tobias


In the HA master mode, how to identify the alive master?

2015-03-04 Thread Xuelin Cao
Hi,

  In our project, we use two standalone masters + zookeeper to provide HA
for the spark master.

  Now the problem is, how do we know which master is the currently alive
master?

  We tried to read the info that the masters store in zookeeper, but we
found no information there that identifies the currently alive master.

  Any suggestions for us?
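
For reference, one way to check is to poll each master's web UI, which serves a
JSON summary that includes a status field; a rough sketch, assuming the default
web UI port 8080 and that the /json endpoint is available in your Spark version
(worth verifying):

    import scala.io.Source
    import scala.util.Try

    // Candidate masters; hostnames are placeholders.
    val masters = Seq("master1", "master2")

    // The standalone master web UI exposes a JSON summary at /json whose
    // "status" field is ALIVE on the active master and STANDBY on the other.
    val statusAlive = """"status"\s*:\s*"ALIVE"""".r

    def isAlive(host: String): Boolean =
      Try(Source.fromURL(s"http://$host:8080/json").mkString)
        .map(body => statusAlive.findFirstIn(body).isDefined)
        .getOrElse(false)

    val alive = masters.find(isAlive)
    println(alive.map(h => s"Alive master: $h").getOrElse("No alive master found"))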

Thanks


Re: TreeNodeException: Unresolved attributes

2015-03-04 Thread Zhan Zhang
Which spark version did you use? I tried spark-1.2.1 and didn’t meet this 
problem.

scala val m = hiveContext.sql( select * from  testtable where value like 
'%Restaurant%')
15/03/05 02:02:30 INFO ParseDriver: Parsing command: select * from  testtable 
where value like '%Restaurant%'
15/03/05 02:02:30 INFO ParseDriver: Parse Completed
15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(462299) called with 
curMem=1087888, maxMem=280248975
15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2 stored as values in 
memory (estimated size 451.5 KB, free 265.8 MB)
15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(81645) called with 
curMem=1550187, maxMem=280248975
15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in 
memory (estimated size 79.7 KB, free 265.7 MB)
15/03/05 02:02:30 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
c6402.ambari.apache.orghttp://c6402.ambari.apache.org:33696 (size: 79.7 KB, 
free: 267.0 MB)
15/03/05 02:02:30 INFO BlockManagerMaster: Updated info of block 
broadcast_2_piece0
15/03/05 02:02:30 INFO DefaultExecutionContext: Created broadcast 2 from 
broadcast at TableReader.scala:68
m: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Filter Contains(value#5, Restaurant)
 HiveTableScan [key#4,value#5], (MetastoreRelation default, testtable, None), 
None

scala


Thanks.

Zhan Zhang

On Mar 4, 2015, at 9:09 AM, Anusha Shamanur 
anushas...@gmail.commailto:anushas...@gmail.com wrote:

I tried. I still get the same error.

15/03/04 09:01:50 INFO parse.ParseDriver: Parsing command: select * from 
TableName where value like '%Restaurant%'

15/03/04 09:01:50 INFO parse.ParseDriver: Parse Completed.

15/03/04 09:01:50 INFO metastore.HiveMetaStore: 0: get_table : db=default 
tbl=TableName

15/03/04 09:01:50 INFO HiveMetaStore.audit: ugi=as7339 ip=unknown-ip-addr 
cmd=get_table : db=default tbl=TableName
results: org.apache.spark.sql.SchemaRDD =

SchemaRDD[86] at RDD at SchemaRDD.scala:108
== Query Plan ==

== Physical Plan ==

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved 
attributes: *, tree:

'Project [*]

'Filter ('value LIKE Restaurant)
  MetastoreRelation default, TableName, None



On Wed, Mar 4, 2015 at 5:39 AM, Arush Kharbanda 
ar...@sigmoidanalytics.commailto:ar...@sigmoidanalytics.com wrote:
Why don't you formulate a string before you pass it to the hql function 
(appending strings), and hql function is deprecated. You should use sql.

http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext

On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur 
anushas...@gmail.commailto:anushas...@gmail.com wrote:
Hi,

I am trying to run a simple select query on a table.

val restaurants=hiveCtx.hql(select * from TableName where column like 
'%SomeString%' )
This gives an error as below:
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved 
attributes: *, tree:

How do I solve this?


--
Regards,
Anusha



--

[Sigmoid Analytics]http://htmlsig.com/www.sigmoidanalytics.com

Arush Kharbanda || Technical Teamlead

ar...@sigmoidanalytics.commailto:ar...@sigmoidanalytics.com || 
www.sigmoidanalytics.comhttp://www.sigmoidanalytics.com/



--
Regards,
Anusha



Re: Where can I find more information about the R interface forSpark?

2015-03-04 Thread Ted Yu
Please follow SPARK-5654

On Wed, Mar 4, 2015 at 7:22 PM, Haopu Wang hw...@qilinsoft.com wrote:

  Thanks, it's an active project.



 Will it be released with Spark 1.3.0?


  --

 *From:* 鹰 [mailto:980548...@qq.com]
 *Sent:* Thursday, March 05, 2015 11:19 AM
 *To:* Haopu Wang; user
 *Subject:* Re: Where can I find more information about the R interface
 forSpark?



 you can search SparkR on google or search it on github



RE: Passing around SparkContext with in the Driver

2015-03-04 Thread Kapil Malik
Replace 
val sqlContext = new SQLContext(sparkContext)
with

@transient 
val sqlContext = new SQLContext(sparkContext)
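
More generally, a rough sketch of the wrapper pattern that keeps the
non-serializable members out of task closures (a sketch of the general idea,
not a drop-in replacement for the class below):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // Marking the contexts @transient (and the wrapper Serializable) keeps them
    // from being dragged along when closures defined in this class capture `this`.
    class MyClass(@transient val sparkContext: SparkContext) extends Serializable {

      @transient lazy val sqlContext = new SQLContext(sparkContext)

      // RDD transformations defined here should only close over small,
      // serializable values, never over sparkContext/sqlContext themselves.
    }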

-Original Message-
From: kpeng1 [mailto:kpe...@gmail.com] 
Sent: 04 March 2015 23:39
To: user@spark.apache.org
Subject: Passing around SparkContext with in the Driver

Hi All,

I am trying to create a class that wraps functionalities that I need; some of 
these functions require access to the SparkContext, which I would like to pass 
in.  I know that the SparkContext is not serializable, and I am not planning on 
passing it to worker nodes or anything, I just want to wrap some 
functionalities that require SparkContext's api.  As a preface, I am basically 
using the spark shell to test the functionality of my code at the moment, so I 
am not sure if that plays into any of the issues I am having. 
Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
int - IntegerType,
double - DoubleType,
float - FloatType,
long - LongType,
short - ShortType,
binary - BinaryType,
bool - BooleanType,
byte - ByteType,
string - StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]):
Iterator[String] ={
//header line is first line in first partition
    if (partitionIdx == 0) fileItr.drop(1) else fileItr
  }

  //returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType ={
//return backs a StructField
def getSchemaFieldHelper(schemaField: String): StructField ={
  val schemaParts = schemaField.split(' ')
  StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
}

val structFields = rawSchema.map(column = getSchemaFieldHelper(column))
StructType(structFields)
  }

  def getRow(strRow: String): Row ={
val spRow = strRow.split(',')
val tRow = spRow.map(_.trim)
Row(tRow:_*)
  }
  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile:
String): SchemaRDD ={
//apply schema to rdd to create schemaRDD
def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType):
SchemaRDD ={
  val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
  schemaRDD
}
  
val rawSchema = sparkContext.textFile(schemaFile).collect
val schema = getSchema(rawSchema)
  
val rawCsvData = sparkContext.textFile(csvFile)
  
//if we want to keep header from csv file
if(includeHeader){
  val rowRDD = rawCsvData.map(getRow) 
  val schemaRDD = createSchemaRDD(rowRDD, schema)
  return schemaRDD
}
 
val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
val rowRDD = csvData.map(getRow)
val schemaRDD = createSchemaRDD(rowRDD, schema)
schemaRDD
  }

}

So in the spark shell I am basically creating an instance of this class and 
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv(/tmp/myFile.csv, false, /tmp/schema.txt)

What I am getting is not serializable exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at
console:62
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
   .
   .
   .
Caused by: java.io.NotSerializableException:


If I remove the class wrapper and make references to sc directly everything 
works.  I am basically wondering what is causing the serialization issues and 
if I can wrap a class around these functions.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Passing-around-SparkContext-with-in-the-Driver-tp21913.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unable to Read/Write Avro RDD on cluster.

2015-03-04 Thread ๏̯͡๏
I am trying to read an Avro RDD, transform it, and write it back.
I am able to run it fine locally, but when I run it on the cluster, I see issues
with Avro.


export SPARK_HOME=/home/dvasthimal/spark/spark-1.0.2-bin-2.4.1
export SPARK_YARN_USER_ENV=CLASSPATH=/apache/hadoop/conf
export HADOOP_CONF_DIR=/apache/hadoop/conf
export YARN_CONF_DIR=/apache/hadoop/conf
export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.0.2-hadoop2.4.1.jar
export SPARK_LIBRARY_PATH=/apache/hadoop/lib/native
export SPARK_YARN_USER_ENV=CLASSPATH=/apache/hadoop/conf
export SPARK_YARN_USER_ENV=CLASSPATH=/apache/hadoop/conf
export
SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-company-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar:/home/dvasthimal/spark/avro-1.7.7.jar
export SPARK_LIBRARY_PATH=/apache/hadoop/lib/native
export YARN_CONF_DIR=/apache/hadoop/conf/

cd $SPARK_HOME

./bin/spark-submit --master yarn-cluster --jars
/home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar,/home/dvasthimal/spark/avro-1.7.7.jar
--num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores
1  --queue hdmi-spark --class com.company.ep.poc.spark.reporting.SparkApp
/home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar
startDate=2015-02-16 endDate=2015-02-16
epoutputdirectory=/user/dvasthimal/epdatasets_small/exptsession
subcommand=successevents
outputdir=/user/dvasthimal/epdatasets/successdetail

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
15/03/04 03:20:29 INFO client.ConfiguredRMFailoverProxyProvider: Failing
over to rm2
15/03/04 03:20:30 INFO yarn.Client: Got Cluster metric info from
ApplicationsManager (ASM), number of NodeManagers: 2221
15/03/04 03:20:30 INFO yarn.Client: Queue info ... queueName: hdmi-spark,
queueCurrentCapacity: 0.7162806, queueMaxCapacity: 0.08,
  queueApplicationCount = 7, queueChildQueueCount = 0
15/03/04 03:20:30 INFO yarn.Client: Max mem capabililty of a single
resource in this cluster 16384
15/03/04 03:20:30 INFO yarn.Client: Preparing Local resources
15/03/04 03:20:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/03/04 03:20:30 WARN hdfs.BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.


15/03/04 03:20:46 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token
7780745 for dvasthimal on 10.115.206.112:8020
15/03/04 03:20:46 INFO yarn.Client: Uploading
file:/home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar to hdfs://
apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/spark_reporting-1.0-SNAPSHOT.jar
15/03/04 03:20:47 INFO yarn.Client: Uploading
file:/home/dvasthimal/spark/spark-1.0.2-bin-2.4.1/lib/spark-assembly-1.0.2-hadoop2.4.1.jar
to hdfs://
apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/spark-assembly-1.0.2-hadoop2.4.1.jar
15/03/04 03:20:52 INFO yarn.Client: Uploading
file:/home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar to hdfs://
apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/avro-mapred-1.7.7-hadoop2.jar
15/03/04 03:20:52 INFO yarn.Client: Uploading
file:/home/dvasthimal/spark/avro-1.7.7.jar to hdfs://
apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/avro-1.7.7.jar
15/03/04 03:20:54 INFO yarn.Client: Setting up the launch environment
15/03/04 03:20:54 INFO yarn.Client: Setting up container launch context
15/03/04 03:20:54 INFO yarn.Client: Command for starting the Spark
ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx4096m,
-Djava.io.tmpdir=$PWD/tmp,
-Dspark.app.name=\com.company.ep.poc.spark.reporting.SparkApp\,
 -Dlog4j.configuration=log4j-spark-container.properties,
org.apache.spark.deploy.yarn.ApplicationMaster, --class,
com.company.ep.poc.spark.reporting.SparkApp, --jar ,
file:/home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar,  --args
 'startDate=2015-02-16'  --args  'endDate=2015-02-16'  --args
 'epoutputdirectory=/user/dvasthimal/epdatasets_small/exptsession'  --args
 'subcommand=successevents'  --args
 'outputdir=/user/dvasthimal/epdatasets/successdetail' , --executor-memory,
2048, --executor-cores, 1, --num-executors , 3, 1, LOG_DIR/stdout, 2,
LOG_DIR/stderr)
15/03/04 03:20:54 INFO yarn.Client: Submitting application to ASM
15/03/04 03:20:54 INFO impl.YarnClientImpl: Submitted application
application_1425075571333_61948
15/03/04 03:20:56 INFO yarn.Client: Application report from ASM:
 application identifier: application_1425075571333_61948
 appId: 61948
 clientToAMToken: null
 appDiagnostics:
 appMasterHost: N/A
 appQueue: hdmi-spark
 appMasterRpcPort: -1
 appStartTime: 1425464454263
 yarnAppState: ACCEPTED
 distributedFinalState: UNDEFINED
 appTrackingUrl:
https://apollo-phx-rm-2.company.com:50030/proxy/application_1425075571333_61948/
 appUser: dvasthimal

Re: distribution of receivers in spark streaming

2015-03-04 Thread Du Li
Hi Jerry,
Thanks for your response.
Is there a way to get the list of currently registered/live workers? Even in 
order to provide preferredLocation, it would be safer to know which workers are 
active. Guess I only need to provide the hostname, right?
Thanks,
Du

 On Wednesday, March 4, 2015 10:08 PM, Shao, Saisai 
saisai.s...@intel.com wrote:
   

Hi Du,

You could try to sleep for several seconds after creating the streaming context to let all the executors register; then the receivers can be distributed to the nodes more evenly. Also, setting the preferred location is another way, as you mentioned.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver class.

On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALID wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of kafka receivers on those machines. In my code I did something like the following, as suggested in the spark docs:

    val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
    ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds.

I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in spark?

Thanks,
Du
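
For reference, a minimal sketch of what overriding preferredLocation can look like (the class name, hostnames and the existing StreamingContext ssc are assumptions, not the actual MyKafkaReceiver above):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that hints to the scheduler which worker should run it.
class PinnedReceiver(host: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Placement hint used when the receiver is scheduled.
  override def preferredLocation: Option[String] = Some(host)

  def onStart(): Unit = { /* connect to the source and call store(...) */ }
  def onStop(): Unit = { }
}

// Spread 8 receivers across 5 known worker hostnames (names are placeholders).
val hosts = Seq("worker1", "worker2", "worker3", "worker4", "worker5")
val streams = (0 until 8).map(i => ssc.receiverStream(new PinnedReceiver(hosts(i % hosts.size))))
val unioned = ssc.union(streams)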

   

RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
Hi Jianshi,

From my understanding, it may not be a problem of NIO or Netty. Looking at
your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap).
Theoretically EAOM can spill data to disk when memory is not enough, but
there might be some issues when the join key is skewed or the number of
distinct keys is small, so you can still hit OOM.

Maybe you could monitor each stage or task's shuffle and GC status, as well as
system status, to identify the problem.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting thing is that when I'm using the netty-based
spark.shuffle.blockTransferService, there are no OOM error messages
(java.lang.OutOfMemoryError: Java heap space).

Any idea why it doesn't show up there?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)

Is join/cogroup still memory bound?


Jianshi



On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
Hmm... ok, previous errors are still block fetch errors.

15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 11 outstanding blocks
java.io.IOException: Failed to connect to host-/:55597
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
 

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
Hi Saisai,

What are your suggested settings for monitoring shuffle? I've
enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using
repartitionAndSortWithinPartitions), which wants to make groupByKey use external
storage. It's still in open status. Does that mean
groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read
each group as a whole when consuming it?


How can I deal with key skewness in joins? Is there a skew-join
implementation?



Jianshi
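
There is no built-in skew-join in Spark core as far as I know; a common workaround is to salt the hot keys so that one key's rows spread over several tasks. A rough sketch (bigRdd/smallRdd and the salt factor are illustrative; it assumes the smaller side can be replicated saltFactor times):

import scala.util.Random

val saltFactor = 16

// Big, skewed side: attach a random salt to each key.
val saltedBig = bigRdd.map { case (k, v) => ((k, Random.nextInt(saltFactor)), v) }

// Smaller side: replicate each record once per salt value so every salted key finds its match.
val saltedSmall = smallRdd.flatMap { case (k, v) =>
  (0 until saltFactor).map(salt => ((k, salt), v))
}

// Join on the salted key, then drop the salt again.
val joined = saltedBig.join(saltedSmall).map { case ((k, _), (v1, v2)) => (k, (v1, v2)) }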



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Jianshi,



 From my understanding, it may not be the problem of NIO or Netty, looking
 at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap),
 theoretically EAOM can spill the data into disk if memory is not enough,
 but there might some issues when join key is skewed or key number is
 smaller, so you will meet OOM.



 Maybe you could monitor each stage or task’s shuffle and GC status also
 system status to identify the problem.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 2:32 PM
 *To:* Aaron Davidson
 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 One really interesting is that when I'm using the
 netty-based spark.shuffle.blockTransferService, there's no OOM error
 messages (java.lang.OutOfMemoryError: Java heap space).



 Any idea why it's not here?



 I'm using Spark 1.2.1.



 Jianshi



 On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.





 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)

 java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)

 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)

 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)

 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)

 at org.apache.spark.scheduler.Task.run(Task.scala:56)



 Is join/cogroup still memory bound?





 Jianshi







 On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  Hmm... ok, previous errors are still block fetch errors.



 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
 fetch of 11 outstanding blocks

 java.io.IOException: Failed to connect to host-/:55597

  

RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
I think what you could do is monitor through the web UI to see if there's any
skew or other symptoms in the shuffle writes and reads. For GC you could use the
configuration you mentioned below.

On the Spark core side, all the shuffle-related operations can spill data
to disk and do not need to read the whole partition into memory. But if you use
SparkSQL, it depends on how SparkSQL uses these operators.

CC @hao if he has some thoughts on it.

Thanks
Jerry
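
For completeness, the GC flags mentioned above can be pushed to the executors through spark.executor.extraJavaOptions, e.g. (a sketch):

import org.apache.spark.SparkConf

// GC details end up in each executor's stdout/stderr unless -Xloggc points elsewhere.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")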

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:28 PM
To: Shao, Saisai
Cc: user
Subject: Re: Having lots of FetchFailedException in join

Hi Saisai,

What's your suggested settings on monitoring shuffle? I've enabled 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using 
repartitionAndSortWithinPartitions) want to make groupByKey using external 
storage. It's still open status. Does that mean now 
groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the 
group as a whole during consuming?

How can I deal with the key skewness in joins? Is there a skew-join 
implementation?


Jianshi



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Jianshi,

From my understanding, it may not be the problem of NIO or Netty, looking at 
your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), 
theoretically EAOM can spill the data into disk if memory is not enough, but 
there might some issues when join key is skewed or key number is smaller, so 
you will meet OOM.

Maybe you could monitor each stage or task’s shuffle and GC status also system 
status to identify the problem.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting thing is that when I'm using the netty-based
spark.shuffle.blockTransferService, there are no OOM error messages
(java.lang.OutOfMemoryError: Java heap space).

Any idea why it doesn't show up there?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at 

Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
I see. I'm using core's join. The data might have some skewness (checking).

I understand the shuffle can spill data to disk, but when consuming it, say in
cogroup or groupByKey, it still needs to read the whole group of elements,
right? I guess the OOM happened there when reading very large groups.

Jianshi

On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote:

  I think what you could do is to monitor through web UI to see if there’s
 any skew or other symptoms in shuffle write and read. For GC you could use
 the below configuration as you mentioned.



 From Spark core side, all the shuffle related operations can spill the
 data into disk and no need to read the whole partition into memory. But if
 you uses SparkSQL, it depends on how SparkSQL uses this operators.



 CC @hao if he has some thoughts on it.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 3:28 PM
 *To:* Shao, Saisai

 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 Hi Saisai,



 What's your suggested settings on monitoring shuffle? I've
 enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.



 I found SPARK-3461 (Support external groupByKey using
 repartitionAndSortWithinPartitions) want to make groupByKey using external
 storage. It's still open status. Does that mean now
 groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
 the group as a whole during consuming?



 How can I deal with the key skewness in joins? Is there a skew-join
 implementation?





 Jianshi







 On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com
 wrote:

  Hi Jianshi,



 From my understanding, it may not be the problem of NIO or Netty, looking
 at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap),
 theoretically EAOM can spill the data into disk if memory is not enough,
 but there might some issues when join key is skewed or key number is
 smaller, so you will meet OOM.



 Maybe you could monitor each stage or task’s shuffle and GC status also
 system status to identify the problem.



 Thanks

 Jerry



 *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
 *Sent:* Thursday, March 5, 2015 2:32 PM
 *To:* Aaron Davidson
 *Cc:* user
 *Subject:* Re: Having lots of FetchFailedException in join



 One really interesting is that when I'm using the
 netty-based spark.shuffle.blockTransferService, there's no OOM error
 messages (java.lang.OutOfMemoryError: Java heap space).



 Any idea why it's not here?



 I'm using Spark 1.2.1.



 Jianshi



 On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

  I changed spark.shuffle.blockTransferService to nio and now I'm getting
 OOM errors, I'm doing a big join operation.





 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
 (TID 6207)

 java.lang.OutOfMemoryError: Java heap space

 at
 org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)

 at
 org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)

 at
 org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)

 at
 org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)

 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)

 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)

 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)

 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)

 at
 org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)

 at 

using log4j2 with spark

2015-03-04 Thread Lior Chaga
Hi,
Trying to run spark 1.2.1 w/ hadoop 1.0.4 on cluster and configure it to
run with log4j2.
Problem is that spark-assembly.jar contains log4j and slf4j classes
compatible with log4j 1.2 in it, and so it detects it should use log4j 1.2 (
https://github.com/apache/spark/blob/54e7b456dd56c9e52132154e699abca87563465b/core/src/main/scala/org/apache/spark/Logging.scala
on line 121).

Is there a maven profile for building spark-assembly w/out the log4j
dependencies, or any other way I can force spark to use log4j2?

Thanks!
Lior


How to parse Json formatted Kafka message in spark streaming

2015-03-04 Thread Cui Lin
Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them back to
Cassandra. I have two problems:

  1.  I got the exception below. How can I check for an empty RDD?

Exception in thread main java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](…)

messages.foreachRDD { rdd =>
  val message: RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time, To FROM tempTable")
    .saveToCassandra("cassandra_keyspace", "cassandra_table", SomeColumns("key", "msg"))
}

2. How can I get all the column names from the JSON messages? I have hundreds of
columns in the JSON-formatted messages.

Thanks for your help!




Best regards,

Cui Lin
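
For what it's worth, one way to guard against the empty-collection error is to skip empty batches before calling jsonRDD, and the inferred schema exposes all the column names. A sketch building on the snippet above (it reuses messages, sqlContext and the Cassandra connector calls from the question; RDD.isEmpty only arrives in Spark 1.3, so take(1) is used instead):

messages.foreachRDD { rdd =>
  val message = rdd.map(_._2)
  if (message.take(1).nonEmpty) {                          // cheap emptiness check pre-1.3
    val json = sqlContext.jsonRDD(message)
    println(json.schema.fields.map(_.name).mkString(", ")) // all inferred column names
    json.registerTempTable("tempTable")
    sqlContext.sql("SELECT time, To FROM tempTable")
      .saveToCassandra("cassandra_keyspace", "cassandra_table", SomeColumns("key", "msg"))
  }
}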


Re: spark master shut down suddenly

2015-03-04 Thread Benjamin Stickel
Generally the logs are located in /var/log/mesos, but the definitive
configuration can be found via the /etc/mesos-master/... configuration
files. There should be a configuration file labeled log_dir.

ps -ax | grep mesos should also show the configuration in its output if it
is configured.

Another location to review would potentially be /etc/default/mesos-master



On Wed, Mar 4, 2015 at 9:31 PM, Denny Lee denny.g@gmail.com wrote:

 It depends on your setup but one of the locations is /var/log/mesos
 On Wed, Mar 4, 2015 at 19:11 lisendong lisend...@163.com wrote:

 I ‘m sorry, but how to look at the mesos logs?
 where are them?



 在 2015年3月4日,下午6:06,Akhil Das ak...@sigmoidanalytics.com 写道:


 You can check in the mesos logs and see whats really happening.

 Thanks
 Best Regards

 On Wed, Mar 4, 2015 at 3:10 PM, lisendong lisend...@163.com wrote:

 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not
 heard
 from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
 connection and attempting reconnect
 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost
 leadership
 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
 shutting down.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Having lots of FetchFailedException in join

2015-03-04 Thread Jianshi Huang
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID
6207)
java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)

Is join/cogroup still memory bound?


Jianshi



On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hmm... ok, previous errors are still block fetch errors.

 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
 fetch of 11 outstanding blocks
 java.io.IOException: Failed to connect to host-/:55597
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
 at
 org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
 at
 

Re: Parallel execution of JavaDStream/JavaPairDStream

2015-03-04 Thread Jishnu Prathap
14/06/19 15:03:36 WARN LoadSnappy: Snappy native library not loaded 

The problem is that the Snappy library is not loaded on the workers. This is because
you would have written the System.loadLibrary call outside the map function, so it is
not shipped to (and executed on) the workers.

Regards
Jishnu Prathap
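
A minimal sketch of forcing the load to happen on the executors (the library name is a placeholder; loading an already-loaded library in the same JVM is a no-op):

val processed = sc.textFile("hdfs:///data/input").mapPartitions { iter =>
  // Runs inside the executor JVM, so the native library is loaded where the tasks need it.
  System.loadLibrary("snappyjava")   // placeholder -- use the library your code actually needs
  iter.map(_.toUpperCase)            // stand-in for logic that depends on the native lib
}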



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parallel-execution-of-JavaDStream-JavaPairDStream-tp7961p21904.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processing multi-line JSON?

2015-03-04 Thread Emre Sevinc
I've also tried the following:

Configuration hadoopConfiguration = new Configuration();
hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

JavaStreamingContext ssc =
    JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
        factory, false);


but I still get the same exception.

Why doesn't getOrCreate ignore that Hadoop configuration part (which
normally works, e.g. when not recovering)?

--
Emre


On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I have a Spark Streaming application (that uses Spark 1.2.1) that listens
 to an input directory, and when new JSON files are copied to that directory
 processes them, and writes them to an output directory.

 It uses a 3rd party library to process the multi-line JSON files (
 https://github.com/alexholmes/json-mapreduce). You can see the relevant
 part of the streaming application at:

   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

 When I run this application locally, it works perfectly fine. But then I
 wanted to test whether it could recover from failure, e.g. if I stopped it
 right in the middle of processing some files. I started the streaming
 application, copied 100 files to the input directory, and hit Ctrl+C when
 it has alread processed about 50 files:

 ...
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 [Stage
 0:==
 (65 + 4) / 100]
 ^C

 Then I started the application again, expecting that it could recover from
 the checkpoint. For a while it started to read files again and then gave an
 exception:

 ...
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
  * * * hadoopConfiguration: itemSet
 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
 (TID 0)
 java.io.IOException: Missing configuration value for
 multilinejsoninputformat.member
 at
 com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Since in the exception it refers to a missing configuration
 multilinejsoninputformat.member, I think it is about the following line:

ssc.ssc().sc().hadoopConfiguration().set(
 multilinejsoninputformat.member, itemSet);

 And this is why I also log the value of it, and as you can see above, just
 before it gives the exception in the recovery process, it shows that 
 multilinejsoninputformat.member
 is set to itemSet. But somehow it is not found during the recovery.
 This exception happens only when it tries to recover from a previously
 interrupted run.

 I've also tried moving the above line into the createContext method, but
 still had the same exception.

 Why is that?

 And how can I work around it?

 --
 Emre Sevinç
 http://www.bigindustries.be/




-- 
Emre Sevinc


Re: spark.local.dir leads to Job cancelled because SparkContext was shut down

2015-03-04 Thread Akhil Das
When you say multiple directories, make sure those directories exist
and Spark has permission to write to them. You can
look at the worker logs to see the exact reason for the failure.

Thanks
Best Regards
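
For example, multiple directories are given as a comma-separated list, and every path has to exist on each worker and be writable by the user running Spark (paths are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-job")
  .set("spark.local.dir", "/mnt/disk1/spark,/mnt/disk2/spark")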

On Tue, Mar 3, 2015 at 6:45 PM, lisendong lisend...@163.com wrote:

 As long as I set the spark.local.dir to multiple disks, the job will
 failed, the errors are as follow:
 (if I set the spark.local.dir to only 1 dir, the job will succed...)

 Exception in thread main org.apache.spark.SparkException: Job cancelled
 because SparkContext was shut down
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at

 org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
 at

 akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
 at
 akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
 at akka.actor.ActorCell.terminate(ActorCell.scala:338)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
 at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-leads-to-Job-cancelled-because-SparkContext-was-shut-down-tp21894.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Running Spark jobs via oozie

2015-03-04 Thread Felix C
We have gotten it to work...

--- Original Message ---

From: nitinkak001 nitinkak...@gmail.com
Sent: March 3, 2015 7:46 AM
To: user@spark.apache.org
Subject: Re: Running Spark jobs via oozie

I am also starting to work on this one. Did you get any solution to this
issue?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-jobs-via-oozie-tp5187p21896.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Connecting a PHP/Java applications to Spark SQL Thrift Server

2015-03-04 Thread أنس الليثي
Thanks very much. I used it and it works fine for me.
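
For reference, a minimal Scala sketch over JDBC against the Spark SQL Thrift server (the same calls apply from plain Java); host, port, credentials and table name are placeholders, 10000 being the default Thrift server port:

import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://thrift-host:10000/default", "user", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT * FROM some_table LIMIT 10")
while (rs.next()) {
  println(rs.getString(1))
}
rs.close(); stmt.close(); conn.close()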



On 4 March 2015 at 11:56, Arush Kharbanda ar...@sigmoidanalytics.com
wrote:

 For java You can use hive-jdbc connectivity jars to connect to Spark-SQL.

 The driver is inside the hive-jdbc Jar.

 *http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html
 http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html*




 On Wed, Mar 4, 2015 at 1:26 PM, n...@reactor8.com wrote:

 SparkSQL supports JDBC/ODBC connectivity, so if that's the route you
 needed/wanted to connect through you could do so via java/php apps.
 Havent
 used either so cant speak to the developer experience, assume its pretty
 good as would be preferred method for lots of third party enterprise
 apps/tooling

 If you prefer using the thrift server/interface, if they don't exist
 already
 in open source land you can use thrift definitions to generate client libs
 in any supported thrift language and use that for connectivity.  Seems one
 issue with thrift-server is when running in cluster mode.  Seems like it
 still exists but UX of error has been cleaned up in 1.3:

 https://issues.apache.org/jira/browse/SPARK-5176



 -Original Message-
 From: fanooos [mailto:dev.fano...@gmail.com]
 Sent: Tuesday, March 3, 2015 11:15 PM
 To: user@spark.apache.org
 Subject: Connecting a PHP/Java applications to Spark SQL Thrift Server

 We have installed hadoop cluster with hive and spark and the spark sql
 thrift server is up and running without any problem.

 Now we have set of applications need to use spark sql thrift server to
 query
 some data.

 Some of these applications are java applications and the others are PHP
 applications.

 As I am an old fashioned java developer, I used to connect java
 applications
 to BD servers like Mysql using a JDBC driver. Is there a corresponding
 driver for connecting with Spark Sql Thrift server ? Or what is the
 library
 I need to use to connect to it?


 For PHP, what are the ways we can use to connect PHP applications to Spark
 Sql Thrift Server?





 --
 View this message in context:

 http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-a-PHP-Java-ap
 plications-to-Spark-SQL-Thrift-Server-tp21902.html
 http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-a-PHP-Java-applications-to-Spark-SQL-Thrift-Server-tp21902.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --

 [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

 *Arush Kharbanda* || Technical Teamlead

 ar...@sigmoidanalytics.com || www.sigmoidanalytics.com




-- 
Anas Rabei
Senior Software Developer
Mubasher.info
anas.ra...@mubasher.info


spark master shut down suddenly

2015-03-04 Thread lisendong
15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard
from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
connection and attempting reconnect
15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership
15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
shutting down.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark master shut down suddenly

2015-03-04 Thread Akhil Das
You can check the mesos logs and see what's really happening.

Thanks
Best Regards

On Wed, Mar 4, 2015 at 3:10 PM, lisendong lisend...@163.com wrote:

 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard
 from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket
 connection and attempting reconnect
 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED
 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost
 leadership
 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master
 shutting down.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processing multi-line JSON?

2015-03-04 Thread Tathagata Das
That could be a corner case bug. How do you add the 3rd party library to
the class path of the driver? Through spark-submit? Could you give the
command you used?

TD

On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've also tried the following:

 Configuration hadoopConfiguration = new Configuration();
 hadoopConfiguration.set(multilinejsoninputformat.member, itemSet);

 JavaStreamingContext ssc =
 JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
 factory, false);


 but I still get the same exception.

 Why doesn't getOrCreate ignore that Hadoop configuration part (which
 normally works, e.g. when not recovering)?

 --
 Emre


 On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I have a Spark Streaming application (that uses Spark 1.2.1) that listens
 to an input directory, and when new JSON files are copied to that directory
 processes them, and writes them to an output directory.

 It uses a 3rd party library to process the multi-line JSON files (
 https://github.com/alexholmes/json-mapreduce). You can see the relevant
 part of the streaming application at:

   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

 When I run this application locally, it works perfectly fine. But then I
 wanted to test whether it could recover from failure, e.g. if I stopped it
 right in the middle of processing some files. I started the streaming
 application, copied 100 files to the input directory, and hit Ctrl+C when
 it has alread processed about 50 files:

 ...
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 [Stage
 0:==
 (65 + 4) / 100]
 ^C

 Then I started the application again, expecting that it could recover
 from the checkpoint. For a while it started to read files again and then
 gave an exception:

 ...
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
  * * * hadoopConfiguration: itemSet
 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage
 0.0 (TID 0)
 java.io.IOException: Missing configuration value for
 multilinejsoninputformat.member
 at
 com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 Since in the exception it refers to a missing configuration
 multilinejsoninputformat.member, I think it is about the following line:

ssc.ssc().sc().hadoopConfiguration().set(
 multilinejsoninputformat.member, itemSet);

 And this is why I also log the value of it, and as you can see above,
 just before it gives the exception in the recovery process, it shows that 
 multilinejsoninputformat.member
 is set to itemSet. But somehow it is not found during the recovery.
 This exception happens only when it tries to recover from a previously
 interrupted run.

 I've also tried moving the above line into the createContext method,
 but still had the same exception.

 Why is that?

 And how can I work around it?

 --
 Emre Sevinç
 http://www.bigindustries.be/




 

Re: delay between removing the block manager of an executor, and marking that as lost

2015-03-04 Thread Akhil Das
You can look at the following

- spark.akka.timeout
- spark.akka.heartbeat.pauses

from http://spark.apache.org/docs/1.2.0/configuration.html

Thanks
Best Regards
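
For example (the values are purely illustrative; both settings are in seconds):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.akka.timeout", "300")            // communication timeout between Spark nodes
  .set("spark.akka.heartbeat.pauses", "6000")  // acceptable heartbeat pause before marking failure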

On Tue, Mar 3, 2015 at 4:46 PM, twinkle sachdeva twinkle.sachd...@gmail.com
 wrote:

 Hi,

 Is there any relation between removing block manager of an executor and
 marking that as lost?

 In my setup,even after removing block manager ( after failing to do some
 operation )...it is taking more than 20 mins, to mark that as lost executor.

 Following are the logs:

 *15/03/03 10:26:49 WARN storage.BlockManagerMaster: Failed to remove
 broadcast 20 with removeFromMaster = true - Ask timed out on
 [Actor[akka.tcp://sparkExecutor@TMO-DN73:54363/user/BlockManagerActor1#-966525686]]
 after [3 ms]}*

 *15/03/03 10:27:41 WARN storage.BlockManagerMasterActor: Removing
 BlockManager BlockManagerId(1, TMO-DN73, 4) with no recent heart beats:
 76924ms exceeds 45000ms*

 *15/03/03 10:27:41 INFO storage.BlockManagerMasterActor: Removing block
 manager BlockManagerId(1, TMO-DN73, 4)*

 *15/03/03 10:49:10 ERROR cluster.YarnClusterScheduler: Lost executor 1 on
 TMO-DN73: remote Akka client disassociated*

 How can i make this to happen faster?

 Thanks,
 Twinkle



Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Sean Owen
Hm, what do you mean? You can control, to some extent, the number of
partitions when you read the data, and can repartition if needed.

You can set the default parallelism too so that it takes effect for most
ops that create an RDD. One number of partitions is usually about right for all
the work (2x or so the number of execution slots).

If you know a stage needs unusually high parallelism for example you can
repartition further for that stage.
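
A small illustration (numbers are arbitrary; assumes an existing SparkContext sc):

val raw = sc.textFile("hdfs:///data/input", 200)          // partition hint at read time
val pairs = raw.map(line => (line.split(",")(0), 1))
  .repartition(1000)                                      // raise parallelism for a heavy stage
val counts = pairs.reduceByKey(_ + _, 200)                // most shuffle ops also take a numPartitions argument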
 On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote:

 Thanks Sean.

 But if the partitions of RDD is determined before hand, it would not be
 flexible to run the same program on the different dataset. Although for the
 first stage the partitions can be determined by the input data set, for the
 intermediate stage it is not possible. Users have to create policy to
 repartition or coalesce based on the data set size.


 On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote:

 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalese() and RDD to make a new one
 with a different number of RDDs, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote:
  I mean is it possible to change the partition number at runtime. Thanks
 
 
  --
  Best Regards
 
  Jeff Zhang




 --
 Best Regards

 Jeff Zhang



Spark Streaming and SchemaRDD usage

2015-03-04 Thread Haopu Wang
Hi, in the roadmap of Spark in 2015 (link:
http://files.meetup.com/3138542/Spark%20in%202015%20Talk%20-%20Wendell.pptx),
I saw SchemaRDD is designed to be the basis of BOTH Spark
Streaming and Spark SQL.

My question is: what's the typical usage of SchemaRDD in a Spark
Streaming application? Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is FileInputDStream returned by fileStream method a reliable receiver?

2015-03-04 Thread Tathagata Das
The file stream does not use a receiver. Maybe that was not clear in the
programming guide. I am updating it for the 1.3 release right now; I will make
it clearer.
And the file stream has full reliability. Read this in the programming guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-with-files-as-input-source

On Wed, Mar 4, 2015 at 2:14 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Is FileInputDStream returned by fileStream method a reliable receiver?

 In the Spark Streaming Guide it says:

   There can be two kinds of data sources based on their *reliability*.
 Sources (like Kafka and Flume) allow the transferred data to be
 acknowledged. If the system receiving data from these *reliable* sources
 acknowledge the received data correctly, it can be ensured that no data
 gets lost due to any kind of failure. This leads to two kinds of receivers.

1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges
a reliable source that the data has been received and stored in Spark with
replication.
2. *Unreliable Receiver* - These are receivers for sources that do not
support acknowledging. Even for reliable sources, one may implement an
unreliable receiver that do not go into the complexity of acknowledging
correctly.


 So I wonder whether the receivers for HDFS (and local file system) are
 reliable, e.g. when I'm using fileStream method to process files in a
 directory locally or on HDFS?


 --
 Emre Sevinç



Spark RDD Python, Numpy Shape command

2015-03-04 Thread rui li
I am a beginner to Spark, having some simple questions regarding the use of
RDD in python.

Suppose I have a matrix called data_matrix, I pass it to RDD using

RDD_matrix = sc.parallelize(data_matrix)

but I will have a problem if I want to know the dimension of the matrix in
Spark, because a Spark RDD does not know the Python (NumPy package) command
shape.

In this case, how should I deal with it?

In general, do I need to translate all my piece of Python code in RDD
acceptable syntax, so that my Python program can run using Pyspark?

Thanks in advance for any helps!

Best

Rui


scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Tobias Pfeiffer
Hi,

I have a function with signature

  def aggFun1(rdd: RDD[(Long, (Long, Double))]):
RDD[(Long, Any)]

and one with

  def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]):
RDD[(_Key, Double)]

where all Double classes involved are scala.Double classes (according
to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type
parameters _Key and _Index are inferred by the Scala compiler).

Now I am writing a test as follows:

  val result: Map[Long, Any] = aggFun1(input).collect().toMap
  result.values.foreach(v => println(v.getClass))
  result.values.foreach(_ shouldBe a[Double])

and I get the following output:

  class java.lang.Double
  class java.lang.Double
  [info] avg
  [info] - should compute the average *** FAILED ***
  [info]   1.75 was not an instance of double, but an instance of
java.lang.Double

So I am wondering about what magic is going on here. Are scala.Double
values in RDDs automatically converted to java.lang.Doubles or am I just
missing the implicit back-conversion etc.?

Any help appreciated,
Tobias


Is FileInputDStream returned by fileStream method a reliable receiver?

2015-03-04 Thread Emre Sevinc
Is FileInputDStream returned by fileStream method a reliable receiver?

In the Spark Streaming Guide it says:

  There can be two kinds of data sources based on their *reliability*.
Sources (like Kafka and Flume) allow the transferred data to be
acknowledged. If the system receiving data from these *reliable* sources
acknowledge the received data correctly, it can be ensured that no data
gets lost due to any kind of failure. This leads to two kinds of receivers.

   1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges a
   reliable source that the data has been received and stored in Spark with
   replication.
   2. *Unreliable Receiver* - These are receivers for sources that do not
   support acknowledging. Even for reliable sources, one may implement an
   unreliable receiver that do not go into the complexity of acknowledging
   correctly.


So I wonder whether the receivers for HDFS (and local file system) are
reliable, e.g. when I'm using fileStream method to process files in a
directory locally or on HDFS?


-- 
Emre Sevinç


Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Jeff Zhang
Hi Sean,

  If you know a stage needs unusually high parallelism for example you can
repartition further for that stage.

The problem is that we may not know whether high parallelism is needed. E.g.
for the join operator, high parallelism may only be necessary for some
datasets where lots of records join together, while for other datasets high
parallelism may not be necessary if only a few records join together.

So my point is that not being able to change the parallelism dynamically at
runtime may not be flexible.



On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote:

 Hm, what do you mean? You can control, to some extent, the number of
 partitions when you read the data, and can repartition if needed.

 You can set the default parallelism too so that it takes effect for most
 ops thay create an RDD. One # of partitions is usually about right for all
 work (2x or so the number of execution slots).

 If you know a stage needs unusually high parallelism for example you can
 repartition further for that stage.
  On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote:

 Thanks Sean.

 But if the partitions of RDD is determined before hand, it would not be
 flexible to run the same program on the different dataset. Although for the
 first stage the partitions can be determined by the input data set, for the
 intermediate stage it is not possible. Users have to create policy to
 repartition or coalesce based on the data set size.


 On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote:

 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalesce() an RDD to make a new one
 with a different number of partitions, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote:
  I mean is it possible to change the partition number at runtime. Thanks
 
 
  --
  Best Regards
 
  Jeff Zhang




 --
 Best Regards

 Jeff Zhang




-- 
Best Regards

Jeff Zhang
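
On the per-stage parallelism point above, a minimal sketch (numbers purely illustrative) of asking for extra partitions only where the shuffle-heavy join happens, then shrinking back afterwards:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object PerStageParallelismSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("per-stage-parallelism-sketch").setMaster("local[4]"))

    val big   = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
    val small = sc.parallelize(1 to 1000).map(i => (i, i.toString))

    // Ask for extra partitions only for the shuffle-heavy join stage ...
    val joined = big.join(small, 200)
    // ... and coalesce back down afterwards so later stages don't run many tiny tasks.
    println(joined.coalesce(20).count())

    sc.stop()
  }
}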


Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?

2015-03-04 Thread Emre Sevinc
I'm adding this 3rd party library to my Maven pom.xml file so that it's
embedded into the JAR I send to spark-submit:

  <dependency>
    <groupId>json-mapreduce</groupId>
    <artifactId>json-mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <exclusions>
      <exclusion>
        <groupId>javax.servlet</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>commons-io</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>commons-lang</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
      </exclusion>
    </exclusions>
  </dependency>


Then I build my über JAR, and then I run my Spark Streaming application via
the command line:

 spark-submit --class com.example.schemavalidator.SchemaValidatorDriver
--master local[4] --deploy-mode client target/myapp-1.0-SNAPSHOT.jar

--
Emre Sevinç


On Wed, Mar 4, 2015 at 11:19 AM, Tathagata Das t...@databricks.com wrote:

 That could be a corner case bug. How do you add the 3rd party library to
 the class path of the driver? Through spark-submit? Could you give the
 command you used?

 TD

 On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 I've also tried the following:

 Configuration hadoopConfiguration = new Configuration();
  hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

 JavaStreamingContext ssc =
 JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration,
 factory, false);


 but I still get the same exception.

 Why doesn't getOrCreate ignore that Hadoop configuration part (which
 normally works, e.g. when not recovering)?

 --
 Emre


 On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have a Spark Streaming application (that uses Spark 1.2.1) that
 listens to an input directory, and when new JSON files are copied to that
 directory processes them, and writes them to an output directory.

 It uses a 3rd party library to process the multi-line JSON files (
 https://github.com/alexholmes/json-mapreduce). You can see the relevant
 part of the streaming application at:

   https://gist.github.com/emres/ec18ee264e4eb0dd8f1a

 When I run this application locally, it works perfectly fine. But then I
 wanted to test whether it could recover from failure, e.g. if I stopped it
 right in the middle of processing some files. I started the streaming
 application, copied 100 files to the input directory, and hit Ctrl+C when
  it had already processed about 50 files:

 ...
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 [Stage
 0:==
 (65 + 4) / 100]
 ^C

 Then I started the application again, expecting that it could recover
 from the checkpoint. For a while it started to read files again and then
 gave an exception:

 ...
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
  * * * hadoopConfiguration: itemSet
 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage
 0.0 (TID 0)
 java.io.IOException: Missing configuration value for
 multilinejsoninputformat.member
 at
 com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
 at
 org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
 at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at 
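
Given the Missing configuration value for multilinejsoninputformat.member error above, one workaround that may be worth trying (a sketch only, with made-up paths) is to carry the value on the SparkConf as a spark.hadoop.* property, since the SparkConf travels with the checkpoint and spark.hadoop.* entries are copied into the Hadoop configuration that Spark builds:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "hdfs:///path/to/checkpoint"   // made-up path

    def createContext(): StreamingContext = {
      val conf = new SparkConf()
        .setAppName("checkpoint-recovery-sketch")
        // spark.hadoop.* entries are copied into sc.hadoopConfiguration,
        // and the SparkConf is part of what gets checkpointed.
        .set("spark.hadoop.multilinejsoninputformat.member", "itemSet")
      val ssc = new StreamingContext(conf, Seconds(30))
      ssc.checkpoint(checkpointDir)
      ssc.textFileStream("hdfs:///path/to/input").count().print()   // made-up path
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}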

Re:

2015-03-04 Thread Akhil Das
You may look at https://issues.apache.org/jira/browse/SPARK-4516

Thanks
Best Regards

On Wed, Mar 4, 2015 at 12:25 AM, Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 I got this error message:

 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block
 fetches
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.init(FileInputStream.java:146)
 at
 org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


 And then for the same index file and executor, I got the following errors
 multiple times

 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
 block(s) from host-:39534
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
 shuffle_0_13_1228, and will not retry (0 retries)
 java.lang.RuntimeException: java.io.FileNotFoundException:
 /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

 ...
 Caused by: java.net.ConnectException: Connection refused: host-


 What's the problem?

 BTW, I'm using a Spark 1.2.1-SNAPSHOT build from around Dec. 20. Are there any
 bug fixes related to shuffle block fetching or index files after that?


 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



Re: Connecting a PHP/Java applications to Spark SQL Thrift Server

2015-03-04 Thread Arush Kharbanda
For Java, you can use the hive-jdbc connectivity jars to connect to Spark SQL.

The driver is inside the hive-jdbc Jar.

*http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html
http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html*
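
A minimal connection sketch (host, port and credentials are placeholders): the Spark SQL thrift server speaks the HiveServer2 protocol, so the jdbc:hive2 URL scheme and the org.apache.hive.jdbc.HiveDriver class from the hive-jdbc jar are typically used:

import java.sql.DriverManager

object ThriftServerJdbcSketch {
  def main(args: Array[String]): Unit = {
    // Register the HiveServer2 JDBC driver shipped in the hive-jdbc jar.
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // Placeholder host/port/user; point this at wherever the thrift server listens.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val rs = conn.createStatement().executeQuery("SHOW TABLES")
      while (rs.next()) println(rs.getString(1))
    } finally {
      conn.close()
    }
  }
}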




On Wed, Mar 4, 2015 at 1:26 PM, n...@reactor8.com wrote:

 SparkSQL supports JDBC/ODBC connectivity, so if that's the route you
 need/want to connect through, you could do so via Java/PHP apps.  I haven't
 used either so I can't speak to the developer experience; I assume it's pretty
 good, as it would be the preferred method for lots of third-party enterprise
 apps/tooling.

 If you prefer using the thrift server/interface, and if client libs don't
 already exist in open source land, you can use the thrift definitions to
 generate them in any supported thrift language and use that for connectivity.
 One known issue with the thrift server is when running in cluster mode; it
 seems like it still exists, but the UX of the error has been cleaned up in 1.3:

 https://issues.apache.org/jira/browse/SPARK-5176



 -Original Message-
 From: fanooos [mailto:dev.fano...@gmail.com]
 Sent: Tuesday, March 3, 2015 11:15 PM
 To: user@spark.apache.org
 Subject: Connecting a PHP/Java applications to Spark SQL Thrift Server

 We have installed a hadoop cluster with hive and spark, and the spark sql
 thrift server is up and running without any problem.

 Now we have a set of applications that need to use the spark sql thrift
 server to query some data.

 Some of these applications are java applications and the others are PHP
 applications.

 As I am an old-fashioned java developer, I am used to connecting java
 applications to DB servers like Mysql using a JDBC driver. Is there a
 corresponding driver for connecting to the Spark Sql Thrift server? Or what
 is the library I need to use to connect to it?


 For PHP, what are the ways we can use to connect PHP applications to Spark
 Sql Thrift Server?





 --
 View this message in context:

 http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-a-PHP-Java-ap
 plications-to-Spark-SQL-Thrift-Server-tp21902.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Unable to submit spark job to mesos cluster

2015-03-04 Thread Sarath Chandra
From the lines pointed in the exception log, I figured out that my code is
unable to get the spark context. To isolate the problem, I've written a
small code as below -

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

public class Test {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf()
            .setMaster("mesos://node2.algofusiontech.com:5050")
            .setAppName("test");
        SparkContext context = new SparkContext(sparkConf);
    }
}

When I run this code as -  java -cp .:/opt/cloudera/parcels/CDH/jars/* Test
I'm getting the below exception dump. Please help.

*1[sparkDriver-akka.actor.default-dispatcher-2] ERROR
akka.actor.ActorSystemImpl  - Uncaught fatal error from thread
[sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
[sparkDriver]*
*java.lang.NoSuchMethodError:
org.jboss.netty.channel.socket.nio.NioWorkerPool.init(Ljava/util/concurrent/Executor;I)V*
* at
akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)*
* at
akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)*
* at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)*
* at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
* at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
* at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
* at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)*
* at scala.util.Try$.apply(Try.scala:161)*
* at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)*
* at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
* at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
* at scala.util.Success.flatMap(Try.scala:200)*
* at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)*
* at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)*
* at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)*
* at
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)*
* at scala.collection.Iterator$class.foreach(Iterator.scala:727)*
* at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)*
* at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)*
* at scala.collection.AbstractIterable.foreach(Iterable.scala:54)*
* at
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)*
* at
akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)*
* at
akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)*
* at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)*
* at akka.actor.ActorCell.invoke(ActorCell.scala:456)*
* at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)*
* at akka.dispatch.Mailbox.run(Mailbox.scala:219)*
* at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)*
* at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
* at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)*
* at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
* at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
*[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error:
[Startup timed out] [*
*akka.remote.RemoteTransportException: Startup timed out*
* at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)*
* at akka.remote.Remoting.start(Remoting.scala:191)*
* at
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)*
* at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)*
* at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)*
* at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)*
* at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)*
* at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)*
* at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)*
* at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)*
* at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)*
* at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)*
* at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)*
* at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)*
* at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)*
* at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)*
* at org.apache.spark.SparkContext.init(SparkContext.scala:203)*
* at Test.main(Test.java:7)*
*Caused by: 

Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Sean Owen
Parallelism doesn't really affect the throughput as long as it's:

- not less than the number of available execution slots,
- ... and probably some low multiple of them to even out task size effects
- not so high that the bookkeeping overhead dominates

Although you may need to select different scales of parallelism for
different stages (like a join), you shouldn't in general have to
change it according to data size.

However you could count the input size and make parallelism some
function of that if you found that was consistently better.

The one exception are operations that tend to pull data into memory.
You may need more parallelism as scale increases to keep in-memory
data size small enough. There again you usually just err on the side
of 'too much' parallelism, or avoid patterns that can pull a lot of
data into memory, but this is usually the pain point if there is one.

The problem I run into when thinking about this is that I don't think
Spark can do much better, since it doesn't have the info above needed
to decide these things in general. The calling program has to tell it.

On Wed, Mar 4, 2015 at 10:17 AM, Jeff Zhang zjf...@gmail.com wrote:
 Hi Sean,

   If you know a stage needs unusually high parallelism for example you can
 repartition further for that stage.

 The problem is that we may not know in advance whether high parallelism is
 needed. E.g. for a join operator, high parallelism may only be necessary for
 datasets where lots of records join together, while for other datasets high
 parallelism may not be necessary if only a few records join together.

 So my point is that being unable to change parallelism dynamically at runtime
 may not be flexible enough.



 On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote:

 Hm, what do you mean? You can control, to some extent, the number of
 partitions when you read the data, and can repartition if needed.

 You can set the default parallelism too so that it takes effect for most
 ops that create an RDD. One # of partitions is usually about right for all
 work (2x or so the number of execution slots).

 If you know a stage needs unusually high parallelism for example you can
 repartition further for that stage.

 On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote:

 Thanks Sean.

 But if the partitions of RDD is determined before hand, it would not be
 flexible to run the same program on the different dataset. Although for the
 first stage the partitions can be determined by the input data set, for the
 intermediate stage it is not possible. Users have to create policy to
 repartition or coalesce based on the data set size.


 On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote:

 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalesce() an RDD to make a new one
 with a different number of partitions, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote:
  I mean is it possible to change the partition number at runtime.
  Thanks
 
 
  --
  Best Regards
 
  Jeff Zhang




 --
 Best Regards

 Jeff Zhang




 --
 Best Regards

 Jeff Zhang

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does SparkSQL support ..... having count (fieldname) in SQL statement?

2015-03-04 Thread shahab
Hi,

It seems that SparkSQL, even the HiveContext, does not support SQL
statements like :   SELECT category, count(1) AS cnt FROM products GROUP BY
category HAVING cnt > 10;

I get this exception:

Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Unresolved attributes: CAST(('cnt > 2), BooleanType), tree:


I couldn't find anywhere in the documentation whether the having keyword is
supported or not.
If this is the case, what would be the workaround? Using two nested select
statements?

best,
/Shahab
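
For what it's worth, a sketch of two possible workarounds (assuming a products Hive table exists; whether HAVING itself works may depend on the Spark version): push the aggregate into a subquery, or repeat the aggregate expression in the HAVING clause instead of referencing its alias:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HavingWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val sc      = new SparkContext(new SparkConf().setAppName("having-workaround-sketch"))
    val hiveCtx = new HiveContext(sc)   // assumes a "products" Hive table exists

    // Workaround 1: push the aggregate into a subquery and filter outside it.
    val viaSubquery = hiveCtx.sql(
      """SELECT category, cnt
        |FROM (SELECT category, count(1) AS cnt FROM products GROUP BY category) t
        |WHERE cnt > 10""".stripMargin)

    // Workaround 2: repeat the aggregate in HAVING instead of referencing the alias.
    val viaHaving = hiveCtx.sql(
      "SELECT category, count(1) AS cnt FROM products GROUP BY category HAVING count(1) > 10")

    viaSubquery.collect().foreach(println)
    viaHaving.collect().foreach(println)
    sc.stop()
  }
}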


Re: insert Hive table with RDD

2015-03-04 Thread patcharee

Hi,

I guess the toDF() api is in spark 1.3, which would require building from
source code?


Patcharee
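
If building 1.3 from source is not an option, a sketch of the pre-1.3 (SchemaRDD) route follows; the case class, table and column names are made up, and the Hive table is assumed to already exist with a matching schema:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

case class Record(id: Int, value: String)   // made-up schema

object InsertIntoHiveSketch {
  def main(args: Array[String]): Unit = {
    val sc      = new SparkContext(new SparkConf().setAppName("insert-into-hive-sketch"))
    val hiveCtx = new HiveContext(sc)
    import hiveCtx.createSchemaRDD          // implicit RDD[Record] -> SchemaRDD (Spark 1.0-1.2)

    val existingRdd = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))
    existingRdd.registerTempTable("mydata")
    hiveCtx.sql("INSERT INTO TABLE hivetable SELECT id, value FROM mydata")

    sc.stop()
  }
}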

On 03. mars 2015 13:42, Cheng, Hao wrote:

Using the SchemaRDD / DataFrame API via HiveContext

Assume you're using the latest code, something probably like:

val hc = new HiveContext(sc)
import hc.implicits._
existedRdd.toDF().insertInto("hivetable")
or

existedRdd.toDF().registerTempTable("mydata")
hc.sql("insert into table hivetable select xxx from mydata")



-Original Message-
From: patcharee [mailto:patcharee.thong...@uni.no]
Sent: Tuesday, March 3, 2015 7:09 PM
To: user@spark.apache.org
Subject: insert Hive table with RDD

Hi,

How can I insert an existing hive table with an RDD containing my data?
Any examples?

Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Unable to submit spark job to mesos cluster

2015-03-04 Thread Sarath Chandra
Hi,

I have a cluster running on CDH5.2.1 and I have a Mesos cluster (version
0.18.1). Through a Oozie java action I'm want to submit a Spark job to
mesos cluster. Before configuring it as Oozie job I'm testing the java
action from command line and getting exception as below. While running I'm
pointing the classpath to CDH Home/jars folder.

What is going wrong? Is there any additional configuration to be done which
I'm missing?

[ERROR] [03/04/2015 17:00:49.968] [main] [Remoting] Remoting error:
[Startup timed out] [
akka.remote.RemoteTransportException: Startup timed out
at
akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
at akka.remote.Remoting.start(Remoting.scala:191)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)
at org.apache.spark.SparkContext.init(SparkContext.scala:203)
at
com.algofusion.reconciliation.execution.utils.ExecutionUtils.clinit(ExecutionUtils.java:130)
at
com.algofusion.reconciliation.execution.ReconExecutionController.initialize(ReconExecutionController.java:257)
at
com.algofusion.reconciliation.execution.ReconExecutionController.main(ReconExecutionController.java:105)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:173)
... 18 more
]
Exception in thread main java.lang.ExceptionInInitializerError
at
com.algofusion.reconciliation.execution.ReconExecutionController.initialize(ReconExecutionController.java:257)
at
com.algofusion.reconciliation.execution.ReconExecutionController.main(ReconExecutionController.java:105)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:173)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)
at org.apache.spark.SparkContext.init(SparkContext.scala:203)
at
com.algofusion.reconciliation.execution.utils.ExecutionUtils.clinit(ExecutionUtils.java:130)
... 2 more

Regards,
Sarath.


RE: Does SparkSQL support ..... having count (fieldname) in SQL statement?

2015-03-04 Thread yana
I think the problem is that you are using an alias in the having clause. I am 
not able to try just now but see if HAVING count(*) > 2 works (i.e. don't use cnt)


Sent on the new Sprint Network from my Samsung Galaxy S®4.

 Original message 
From: shahab shahab.mok...@gmail.com
Date: 03/04/2015 7:22 AM (GMT-05:00)
To: user@spark.apache.org
Subject: Does SparkSQL support ..... having count (fieldname) in SQL statement?

Hi,

It seems that SparkSQL, even the HiveContext, does not support SQL statements 
like :   SELECT category, count(1) AS cnt FROM products GROUP BY category 
HAVING cnt > 10;

I get this exception:

Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
Unresolved attributes: CAST(('cnt > 2), BooleanType), tree:


I couldn't find anywhere in the documentation whether the having keyword is
supported or not.
If this is the case, what would be the workaround? Using two nested select
statements?

best,
/Shahab

Re: TreeNodeException: Unresolved attributes

2015-03-04 Thread Arush Kharbanda
Why don't you formulate the query as a string before you pass it to the hql
function (by appending strings)? Also, the hql function is deprecated; you
should use sql instead.

http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext
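
A sketch of that suggestion, using the table and filter from the question (and assuming the table is visible to the HiveContext):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object LikeQuerySketch {
  def main(args: Array[String]): Unit = {
    val sc      = new SparkContext(new SparkConf().setAppName("like-query-sketch"))
    val hiveCtx = new HiveContext(sc)

    // Build the statement as a plain string first, then hand it to sql()
    // (hql() is deprecated; on a HiveContext, sql() parses HiveQL by default).
    val query = "SELECT * FROM TableName WHERE column LIKE '%SomeString%'"
    hiveCtx.sql(query).collect().foreach(println)

    sc.stop()
  }
}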

On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur anushas...@gmail.com
wrote:

 Hi,


 I am trying to run a simple select query on a table.


 val restaurants = hiveCtx.hql("select * from TableName where column like '%SomeString%'")

 This gives an error as below:

 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
 attributes: *, tree:

 How do I solve this?


 --
 Regards,
 Anusha




-- 

[image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Unable to submit spark job to mesos cluster

2015-03-04 Thread Akhil Das
Looks like you have 2 conflicting netty jars in the classpath.

Thanks
Best Regards

On Wed, Mar 4, 2015 at 5:14 PM, Sarath Chandra 
sarathchandra.jos...@algofusiontech.com wrote:

 From the lines pointed in the exception log, I figured out that my code is
 unable to get the spark context. To isolate the problem, I've written a
 small code as below -

 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;

 public class Test {
     public static void main(String[] args) throws Exception {
         SparkConf sparkConf = new SparkConf()
             .setMaster("mesos://node2.algofusiontech.com:5050")
             .setAppName("test");
         SparkContext context = new SparkContext(sparkConf);
     }
 }

 When I run this code as -  java -cp .:/opt/cloudera/parcels/CDH/jars/* Test
 I'm getting the below exception dump. Please help.

 *1[sparkDriver-akka.actor.default-dispatcher-2] ERROR
 akka.actor.ActorSystemImpl  - Uncaught fatal error from thread
 [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
 [sparkDriver]*
 *java.lang.NoSuchMethodError:
 org.jboss.netty.channel.socket.nio.NioWorkerPool.init(Ljava/util/concurrent/Executor;I)V*
 * at
 akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)*
 * at
 akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)*
 * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)*
 * at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
 * at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
 * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
 * at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)*
 * at scala.util.Try$.apply(Try.scala:161)*
 * at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)*
 * at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
 * at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
 * at scala.util.Success.flatMap(Try.scala:200)*
 * at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)*
 * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)*
 * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)*
 * at
 scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)*
 * at scala.collection.Iterator$class.foreach(Iterator.scala:727)*
 * at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)*
 * at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)*
 * at scala.collection.AbstractIterable.foreach(Iterable.scala:54)*
 * at
 scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)*
 * at
 akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)*
 * at
 akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)*
 * at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)*
 * at akka.actor.ActorCell.invoke(ActorCell.scala:456)*
 * at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)*
 * at akka.dispatch.Mailbox.run(Mailbox.scala:219)*
 * at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)*
 * at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
 * at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)*
 * at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
 * at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
 *[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error:
 [Startup timed out] [*
 *akka.remote.RemoteTransportException: Startup timed out*
 * at
 akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)*
 * at akka.remote.Remoting.start(Remoting.scala:191)*
 * at
 akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)*
 * at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)*
 * at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)*
 * at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)*
 * at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)*
 * at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)*
 * at
 org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)*
 * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)*
 * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)*
 * at
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)*
 * at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)*
 * at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)*
 * 

Re: Unable to submit spark job to mesos cluster

2015-03-04 Thread Arush Kharbanda
You can try increasing the Akka timeouts; you can set the following in your
config (a sketch of setting them on the SparkConf follows the list).

spark.core.connection.ack.wait.timeout: 600
spark.akka.timeout: 1000 (In secs)
spark.akka.frameSize:50
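
A sketch of setting those values programmatically on the SparkConf (the same keys can also go in spark-defaults.conf or be passed with --conf to spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

object TimeoutConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("mesos://node2.algofusiontech.com:5050")
      .setAppName("test")
      .set("spark.core.connection.ack.wait.timeout", "600")
      .set("spark.akka.timeout", "1000")      // seconds
      .set("spark.akka.frameSize", "50")      // MB

    val sc = new SparkContext(conf)
    sc.stop()
  }
}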

On Wed, Mar 4, 2015 at 5:14 PM, Sarath Chandra 
sarathchandra.jos...@algofusiontech.com wrote:

 From the lines pointed in the exception log, I figured out that my code is
 unable to get the spark context. To isolate the problem, I've written a
 small code as below -

 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;

 public class Test {
     public static void main(String[] args) throws Exception {
         SparkConf sparkConf = new SparkConf()
             .setMaster("mesos://node2.algofusiontech.com:5050")
             .setAppName("test");
         SparkContext context = new SparkContext(sparkConf);
     }
 }

 When I run this code as -  java -cp .:/opt/cloudera/parcels/CDH/jars/* Test
 I'm getting the below exception dump. Please help.

 *1[sparkDriver-akka.actor.default-dispatcher-2] ERROR
 akka.actor.ActorSystemImpl  - Uncaught fatal error from thread
 [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
 [sparkDriver]*
 *java.lang.NoSuchMethodError:
 org.jboss.netty.channel.socket.nio.NioWorkerPool.init(Ljava/util/concurrent/Executor;I)V*
 * at
 akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)*
 * at
 akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)*
 * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)*
 * at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
 * at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
 * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
 * at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)*
 * at scala.util.Try$.apply(Try.scala:161)*
 * at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)*
 * at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
 * at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)*
 * at scala.util.Success.flatMap(Try.scala:200)*
 * at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)*
 * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)*
 * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)*
 * at
 scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)*
 * at scala.collection.Iterator$class.foreach(Iterator.scala:727)*
 * at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)*
 * at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)*
 * at scala.collection.AbstractIterable.foreach(Iterable.scala:54)*
 * at
 scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)*
 * at
 akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)*
 * at
 akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)*
 * at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)*
 * at akka.actor.ActorCell.invoke(ActorCell.scala:456)*
 * at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)*
 * at akka.dispatch.Mailbox.run(Mailbox.scala:219)*
 * at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)*
 * at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)*
 * at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)*
 * at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)*
 * at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)*
 *[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error:
 [Startup timed out] [*
 *akka.remote.RemoteTransportException: Startup timed out*
 * at
 akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)*
 * at akka.remote.Remoting.start(Remoting.scala:191)*
 * at
 akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)*
 * at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)*
 * at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)*
 * at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)*
 * at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)*
 * at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)*
 * at
 org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)*
 * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)*
 * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)*
 * at
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)*
 * at 

Re: Speed Benchmark

2015-03-04 Thread Guillaume Guy
Sorry for the confusion.

All are running Hadoop services. Node 1 is the namenode whereas Nodes 2 and
3 are datanodes.



Best,

Guillaume Guy

* +1 919 - 972 - 8750*

On Sat, Feb 28, 2015 at 1:09 AM, Sean Owen so...@cloudera.com wrote:

 Is machine 1 the only one running an HDFS data node? You describe it as
 one running Hadoop services.
 On Feb 27, 2015 9:44 PM, Guillaume Guy guillaume.c@gmail.com
 wrote:

 Hi Jason:

 Thanks for your feedback.

 Besides the information I mentioned above, there are 3 machines in the
 cluster.

 *1st one*: Driver + has a bunch of Hadoop services. 32GB of RAM, 8 cores
 (2 used)
 *2nd + 3rd:* 16GB of RAM, 4 cores (2 used each)

  I hope this helps clarify.

 Thx.

 GG



 Best,

 Guillaume Guy

 * +1 919 - 972 - 8750*

 On Fri, Feb 27, 2015 at 9:06 AM, Jason Bell jaseb...@gmail.com wrote:

 How many machines are on the cluster?
 And what is the configuration of those machines (Cores/RAM)?

 Small cluster is a very subjective statement.



 Guillaume Guy wrote:

 Dear Spark users:

 I want to see if anyone has an idea of the performance for a small
 cluster.