Save and read parquet from the same path
Hi all, what would happen if I save an RDD via saveAsParquetFile to the same path that the RDD was originally read from? Is that a safe thing to do in PySpark? Thanks!
Re: Nested Case Classes (Found and Required Same)
Did you find any other way around this issue? I just found out that my data set has 22 columns, so I am hitting the same limit, and now I am searching for the best solution. Has anyone else run into this problem? Best, Bojan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Nested-Case-Classes-Found-and-Required-Same-tp14096p21908.html
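Not from the thread, but for anyone hitting the same wall: the usual workarounds for the 22-field case class limit in Scala 2.10 are either to nest smaller case classes or to drop case classes and build the schema programmatically. A minimal sketch, assuming the Spark 1.2-era API (applySchema) used elsewhere in this digest; field names and the file path are illustrative:

import org.apache.spark.sql._

// Option 1: split the flat record into nested case classes, each under 22 fields.
case class Keys(id: Long, name: String)
case class Metrics(v1: Double, v2: Double) // ...grouped by topic
case class Record(keys: Keys, metrics: Metrics)

// Option 2: skip case classes and build the schema programmatically.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("v1", DoubleType, nullable = true)
  // ...one StructField per column, with no 22-field limit
))
val rowRDD = sc.textFile("data.csv").map(_.split(",")).map(p => Row(p(0).toLong, p(1), p(2).toDouble))
val schemaRDD = sqlContext.applySchema(rowRDD, schema)
schemaRDD.registerTempTable("records")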
RE: Does SparkSQL support ..... having count (fieldname) in SQL statement?
I’ve tried with the latest code and it seems to work; which version are you using, Shahab? From: yana [mailto:yana.kadiy...@gmail.com] Sent: Wednesday, March 4, 2015 8:47 PM To: shahab; user@spark.apache.org Subject: RE: Does SparkSQL support ..... having count (fieldname) in SQL statement? I think the problem is that you are using an alias in the HAVING clause. I am not able to try just now, but see if HAVING count(*) > 2 works (i.e. don't use cnt). Sent on the new Sprint Network from my Samsung Galaxy S®4. Original message From: shahab Date: 03/04/2015 7:22 AM (GMT-05:00) To: user@spark.apache.org Subject: Does SparkSQL support ..... having count (fieldname) in SQL statement? Hi, It seems that SparkSQL, even with HiveContext, does not support SQL statements like: SELECT category, count(1) AS cnt FROM products GROUP BY category HAVING cnt > 10; I get this exception: Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: CAST(('cnt > 2), BooleanType), tree: I couldn't find anywhere in the documentation whether the HAVING keyword is supported. If it is not, what would be the workaround? Using two nested SELECT statements? best, /Shahab
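A minimal sketch, not from the thread, of the two workarounds being suggested: repeat the aggregate expression in HAVING instead of the alias, or push the aggregation into a subquery and filter outside. It assumes a HiveContext/SQLContext named sqlContext and the products table from the example:

// Workaround 1: repeat the aggregate in the HAVING clause instead of the alias
sqlContext.sql(
  "SELECT category, count(1) AS cnt FROM products GROUP BY category HAVING count(1) > 10")

// Workaround 2: aggregate in a subquery and filter in the outer query
sqlContext.sql(
  "SELECT category, cnt FROM (SELECT category, count(1) AS cnt FROM products GROUP BY category) t WHERE cnt > 10")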
Re: scala.Double vs java.lang.Double in RDD
This doesn't involve Spark at all; I think this is entirely an issue with how Scala deals with primitives and boxing. Often it can hide the details for you, but IMO it just leads to far more confusing errors when things don't work out. The issue here is that your map has value type Any, which leads Scala to leave it as a boxed java.lang.Double.

scala> val x = 1.5
x: Double = 1.5
scala> x.getClass()
res0: Class[Double] = double
scala> x.getClass() == classOf[java.lang.Double]
res1: Boolean = false
scala> x.getClass() == classOf[Double]
res2: Boolean = true
scala> val arr = Array(1.5, 2.5)
arr: Array[Double] = Array(1.5, 2.5)
scala> arr.getClass().getComponentType() == x.getClass()
res5: Boolean = true
scala> arr.getClass().getComponentType() == classOf[java.lang.Double]
res6: Boolean = false

// this map has java.lang.Double
scala> val map: Map[String, Any] = arr.map{x => x.toString -> x}.toMap
map: Map[String,Any] = Map(1.5 -> 1.5, 2.5 -> 2.5)
scala> map("1.5").getClass()
res15: Class[_] = class java.lang.Double
scala> map("1.5").getClass() == x.getClass()
res10: Boolean = false
scala> map("1.5").getClass() == classOf[java.lang.Double]
res11: Boolean = true

// this one has Double
scala> val map2: Map[String, Double] = arr.map{x => x.toString -> x}.toMap
map2: Map[String,Double] = Map(1.5 -> 1.5, 2.5 -> 2.5)
scala> map2("1.5").getClass()
res12: Class[Double] = double
scala> map2("1.5").getClass() == x.getClass()
res13: Boolean = true
scala> map2("1.5").getClass() == classOf[java.lang.Double]
res14: Boolean = false

On Wed, Mar 4, 2015 at 3:17 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I have a function with signature def aggFun1(rdd: RDD[(Long, (Long, Double))]): RDD[(Long, Any)] and one with def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]): RDD[(_Key, Double)] where all Double classes involved are scala.Double classes (according to IDEA), and my implementation of aggFun1 is just calling aggFun2 (type parameters _Key and _Index are inferred by the Scala compiler). Now I am writing a test as follows:
val result: Map[Long, Any] = aggFun1(input).collect().toMap
result.values.foreach(v => println(v.getClass))
result.values.foreach(_ shouldBe a[Double])
and I get the following output:
class java.lang.Double
class java.lang.Double
[info] avg
[info] - should compute the average *** FAILED ***
[info] 1.75 was not an instance of double, but an instance of java.lang.Double
So I am wondering about what magic is going on here. Are scala.Double values in RDDs automatically converted to java.lang.Doubles, or am I just missing the implicit back-conversion etc.? Any help appreciated, Tobias
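A small sketch, not from the thread, of how the test can be made to pass given the boxing: assert against java.lang.Double when the static type is Any, or unbox explicitly before checking. The values are illustrative stand-ins for the collected result:

val result: Map[Long, Any] = Map(1L -> 1.75) // stand-in for aggFun1(input).collect().toMap
result.values.foreach { v =>
  // the runtime class behind Any is the boxed java.lang.Double
  assert(v.getClass == classOf[java.lang.Double])
  // unboxing recovers the primitive, whose class is "double"
  val unboxed: Double = v.asInstanceOf[Double]
  assert(unboxed.getClass == classOf[Double])
}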
Re: Is the RDD's Partitions determined before hand ?
You can set the number of partitions dynamically -- it's just a parameter to a method, so you can compute it however you want; it doesn't need to be some static constant:

val dataSizeEstimate = yourMagicFunctionToEstimateDataSize()
val numberOfPartitions = yourConversionFromDataSizeToNumPartitions(dataSizeEstimate)
val reducedRDD = someInputRDD.reduceByKey(f, numberOfPartitions) // or whatever else needs to know the number of partitions

Of course this means you need to do the work of figuring out those magic functions, but it's certainly possible. I agree with all of Sean's recommendations, but I guess I might put a bit more emphasis on "The one exception are operations that tend to pull data into memory." For me, that has been a very important exception that comes up a lot. And though in general a lot of partitions makes sense, there have been recent questions on the user list about folks going too far, using e.g. 100K partitions and then having the bookkeeping overhead dominate. But that's a pretty big number -- you should still be able to err on the side of too many partitions without going that far, I'd imagine. On Wed, Mar 4, 2015 at 4:17 AM, Jeff Zhang zjf...@gmail.com wrote: Hi Sean, "If you know a stage needs unusually high parallelism for example you can repartition further for that stage." The problem is we may not know whether high parallelism is needed. E.g. for the join operator, high parallelism may only be necessary for a dataset where lots of records join together, while for another dataset high parallelism may not be necessary if only a few records join together. So my point is that not being able to change parallelism dynamically at runtime may not be flexible. On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote: Hm, what do you mean? You can control, to some extent, the number of partitions when you read the data, and can repartition if needed. You can set the default parallelism too so that it takes effect for most ops that create an RDD. One number of partitions is usually about right for all work (2x or so the number of execution slots). If you know a stage needs unusually high parallelism, for example, you can repartition further for that stage. On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote: Thanks Sean. But if the partitions of an RDD are determined beforehand, it would not be flexible to run the same program on a different dataset. Although for the first stage the partitions can be determined by the input data set, for the intermediate stages it is not possible. Users have to create a policy to repartition or coalesce based on the data set size. On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote: An RDD has a certain fixed number of partitions, yes. You can't change an RDD. You can repartition() or coalesce() an RDD to make a new one with a different number of partitions, possibly requiring a shuffle. On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote: I mean is it possible to change the partition number at runtime. Thanks -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
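One hypothetical shape for the "magic function" above, not from the thread: derive the partition count from the on-disk size of the input. It assumes a Hadoop-compatible filesystem, and the 128MB-per-partition target is made up for illustration:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

def numPartitionsForInput(sc: SparkContext, inputPath: String,
                          targetBytesPerPartition: Long = 128L * 1024 * 1024): Int = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val inputBytes = fs.getContentSummary(new Path(inputPath)).getLength
  // at least one partition, otherwise roughly targetBytesPerPartition per partition
  math.max(1, (inputBytes / targetBytesPerPartition).toInt)
}

// val reducedRDD = someInputRDD.reduceByKey(f, numPartitionsForInput(sc, "/data/input"))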
Error communicating with MapOutputTracker
Hello, We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error, which leads to a failed stage: Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker We tried the whole application again, and it failed on the same stage (but it got more tasks completed on that stage) with the same error. We then looked at the executors' stderr, and all show similar logs on both runs (see below). As far as we can tell, executors and master have disk space left. *Any suggestion on where to look to understand why the communication with the MapOutputTracker fails?* Thanks Thomas

In case it matters, our akka settings:
spark.akka.frameSize 50
spark.akka.threads 8
// those below are 10x the default, to cope with large GCs
spark.akka.timeout 1000
spark.akka.heartbeat.pauses 6
spark.akka.failure-detector.threshold 3000.0
spark.akka.heartbeat.interval 1

Appendix: executor logs, where it starts going awry
15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 298525
15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 298525)
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with curMem=5543008799, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as bytes in memory (estimated size 1473.0 B, free 11.7 GB)
15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block broadcast_339_piece0
15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 took 224 ms
15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with curMem=5543010272, maxMem=18127202549
15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in memory (estimated size 2.5 KB, free 11.7 GB)
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 18, fetching them
(the previous line repeats many more times at the same timestamp)
Re: how to save Word2VecModel
+user On Wed, Mar 4, 2015, 8:21 AM Xiangrui Meng men...@gmail.com wrote: You can use the save/load implementation in naive Bayes as reference: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala Ping me on the JIRA page to get the ticket assigned to you. Thanks, Xiangrui
Re: issue Running Spark Job on Yarn Cluster
Not yet. Please let me know if you find a solution. Regards, Sachin On 4 Mar 2015 21:45, mael2210 [via Apache Spark User List] ml-node+s1001560n21909...@n3.nabble.com wrote: Hello, I am facing the exact same issue. Could you solve the problem? Kind regards -- If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21909.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21912.html
Re: Does SparkSQL support ..... having count (fieldname) in SQL statement?
Thanks Cheng, my problem was a misspelling which I have just fixed; unfortunately the exception message sometimes does not pinpoint the exact reason. Sorry, my bad. On Wed, Mar 4, 2015 at 5:02 PM, Cheng, Hao hao.ch...@intel.com wrote: I've tried with the latest code and it seems to work; which version are you using, Shahab? ...
Re: GraphX path traversal
Actually your Pregel code works for me:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertexlist = Array((1L,"One"), (2L,"Two"), (3L,"Three"), (4L,"Four"), (5L,"Five"), (6L,"Six"))
val edgelist = Array(Edge(6,5,"6 to 5"), Edge(5,4,"5 to 4"), Edge(4,3,"4 to 3"), Edge(3,2,"3 to 2"), Edge(2,1,"2 to 1"))
val vertices: RDD[(VertexId, String)] = sc.parallelize(vertexlist)
val edges = sc.parallelize(edgelist)
val graph = Graph(vertices, edges)

val parentGraph = Pregel(
  graph.mapVertices((id, attr) => Set[VertexId]()),
  Set[VertexId](),
  Int.MaxValue,
  EdgeDirection.Out)(
  (id, attr, msg) => (msg ++ attr),
  edge => {
    if (edge.srcId != edge.dstId) {
      Iterator((edge.dstId, (edge.srcAttr + edge.srcId)))
    } else Iterator.empty
  },
  (a, b) => (a ++ b))
parentGraph.vertices.collect.foreach(println(_))

Output:
(4,Set(6, 5))
(1,Set(5, 6, 2, 3, 4))
(5,Set(6))
(6,Set())
(2,Set(6, 5, 4, 3))
(3,Set(6, 5, 4))

Maybe your data.csv has the edges the wrong way round. Robin

On 3 Mar 2015, at 16:32, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi, I have tried the below program using the Pregel API but I'm not able to get my required output. I'm getting exactly the reverse of the output I'm expecting.

// Creating graph using the edge file mentioned in the earlier mail
val graph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "/home/rajesh/Downloads/graphdata/data.csv").cache()
val parentGraph = Pregel(
  graph.mapVertices((id, attr) => Set[VertexId]()),
  Set[VertexId](),
  Int.MaxValue,
  EdgeDirection.Out)(
  (id, attr, msg) => (msg ++ attr),
  edge => {
    if (edge.srcId != edge.dstId) {
      Iterator((edge.dstId, (edge.srcAttr + edge.srcId)))
    } else Iterator.empty
  },
  (a, b) => (a ++ b))
parentGraph.vertices.collect.foreach(println(_))

Output:
(4,Set(1, 2, 3))
(1,Set())
(6,Set(5, 1, 2, 3, 4))
(3,Set(1, 2))
(5,Set(1, 2, 3, 4))
(2,Set(1))

But I'm looking for the output below:
(4,Set(5, 6))
(1,Set(2, 3, 4, 5, 6))
(6,Set())
(3,Set(4, 5, 6))
(5,Set(6))
(2,Set(3, 4, 5, 6))

Could you please correct me where I'm going wrong. Regards, Rajesh

On Tue, Mar 3, 2015 at 8:42 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Robin, Thank you for your response. Please find my question below. I have the following edge file:
Source Vertex -> Destination Vertex
1 -> 2
2 -> 3
3 -> 4
4 -> 5
5 -> 6
6 -> 6
In this graph the 1st vertex is connected to the 2nd vertex, the 2nd vertex is connected to the 3rd vertex, ..., and the 6th vertex is connected to the 6th vertex, so the 6th vertex is the root node. Please find the graph image attached. In this graph, how can I compute the 1st vertex's parents, i.e. 2, 3, 4, 5, 6? Similarly the 2nd vertex's parents are 3, 4, 5, 6, and the 6th vertex's parent is 6 because it is the root node. I'm planning to use the Pregel API but I'm not able to define the messages and the vertex program in that API. Could you please help me with this. Please let me know if you need more information. Regards, Rajesh

On Tue, Mar 3, 2015 at 8:15 PM, Robin East robin.e...@xense.co.uk wrote: Rajesh, I'm not sure if I can help you; I don't even understand the question. Could you restate what you are trying to do. Sent from my iPhone

On 2 Mar 2015, at 11:17, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi, I have the edge list below. How do I find the parent path for every vertex? Example: Vertex 1 path: 2, 3, 4, 5, 6; Vertex 2 path: 3, 4, 5, 6; Vertex 3 path: 4, 5, 6; Vertex 4 path: 5, 6; Vertex 5 path: 6. Could you please let me know how to do this? (or) Any suggestion.
Source Vertex -> Destination Vertex
1 -> 2
2 -> 3
3 -> 4
4 -> 5
5 -> 6
Regards, Rajesh
Re: TreeNodeException: Unresolved attributes
I tried. I still get the same error.
15/03/04 09:01:50 INFO parse.ParseDriver: Parsing command: select * from TableName where value like '%Restaurant%'
15/03/04 09:01:50 INFO parse.ParseDriver: Parse Completed.
15/03/04 09:01:50 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=TableName
15/03/04 09:01:50 INFO HiveMetaStore.audit: ugi=as7339 ip=unknown-ip-addr cmd=get_table : db=default tbl=TableName
results: org.apache.spark.sql.SchemaRDD = SchemaRDD[86] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree:
'Project [*]
 'Filter ('value LIKE Restaurant)
  MetastoreRelation default, TableName, None
On Wed, Mar 4, 2015 at 5:39 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Why don't you build the query string first (by appending strings) before you pass it to the hql function? Also, the hql function is deprecated; you should use sql. http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur anushas...@gmail.com wrote: Hi, I am trying to run a simple select query on a table:
val restaurants = hiveCtx.hql("select * from TableName where column like '%SomeString%'")
This gives an error as below: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: How do I solve this? -- Regards, Anusha -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Regards, Anusha
Passing around SparkContext with in the Driver
Hi All, I am trying to create a class that wraps functionality that I need; some of these functions require access to the SparkContext, which I would like to pass in. I know that the SparkContext is not serializable, and I am not planning on passing it to worker nodes or anything; I just want to wrap some functionality that requires the SparkContext's API. As a preface, I am basically using the Spark shell to test the functionality of my code at the moment, so I am not sure if that plays into any of the issues I am having. Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
    "int" -> IntegerType,
    "double" -> DoubleType,
    "float" -> FloatType,
    "long" -> LongType,
    "short" -> ShortType,
    "binary" -> BinaryType,
    "bool" -> BooleanType,
    "byte" -> ByteType,
    "string" -> StringType)

  // removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] = {
    // header line is first line in first partition
    if (partitionIdx == 0) {
      fileItr.drop(1)
    }
    fileItr
  }

  // returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType = {
    // returns back a StructField
    def getSchemaFieldHelper(schemaField: String): StructField = {
      val schemaParts = schemaField.split(' ')
      StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
    }
    val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
    StructType(structFields)
  }

  def getRow(strRow: String): Row = {
    val spRow = strRow.split(',')
    val tRow = spRow.map(_.trim)
    Row(tRow:_*)
  }

  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile: String): SchemaRDD = {
    // apply schema to rdd to create schemaRDD
    def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType): SchemaRDD = {
      val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
      schemaRDD
    }
    val rawSchema = sparkContext.textFile(schemaFile).collect
    val schema = getSchema(rawSchema)
    val rawCsvData = sparkContext.textFile(csvFile)
    // if we want to keep the header from the csv file
    if (includeHeader) {
      val rowRDD = rawCsvData.map(getRow)
      val schemaRDD = createSchemaRDD(rowRDD, schema)
      return schemaRDD
    }
    val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
    val rowRDD = csvData.map(getRow)
    val schemaRDD = createSchemaRDD(rowRDD, schema)
    schemaRDD
  }
}

So in the Spark shell I am basically creating an instance of this class and calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")
What I am getting is a not serializable exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at console:62
org.apache.spark.SparkException: Task not serializable
 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
 at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
 ...
Caused by: java.io.NotSerializableException:
If I remove the class wrapper and make references to sc directly, everything works. I am basically wondering what is causing the serialization issues and whether I can wrap a class around these functions. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Passing-around-SparkContext-with-in-the-Driver-tp21913.html
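Not an answer from the list, just a sketch of the usual cause and fix: csvData.map(getRow) is a method reference, so the closure captures `this` (the whole MyClass instance, including the SparkContext), and that is what fails to serialize. Moving the row-parsing into a standalone object, or copying it into a local function, keeps `this` out of the closure:

import org.apache.spark.sql.Row

// Standalone object: nothing from MyClass is captured by the closure.
object CsvRowParser extends Serializable {
  def getRow(line: String): Row = Row(line.split(',').map(_.trim): _*)
}

// Inside applySchemaToCsv, either of these avoids capturing `this`:
// val rowRDD = csvData.map(CsvRowParser.getRow)
// or:
// val parse = (line: String) => Row(line.split(',').map(_.trim): _*)  // local val, no `this`
// val rowRDD = csvData.map(parse)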
Re: Spark Monitoring UI for Hadoop Yarn Cluster
Hi Todd and Marcelo, Thanks for helping me. I was able to launch the history server on Windows without any issues. One problem I am running into right now: I always get the message "no completed applications found" in the history server UI, but I was able to browse through these applications from the Spark master. Do you have any thoughts on what the problem could be? Following are my settings in the spark conf file:
spark.executor.extraClassPath D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
spark.eventLog.dir D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
spark.history.fs.logDirectory D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
Also I have attached Spark master and Spark history server UI screenshots for convenience. All the logs are available, and I granted directory permissions to Everyone with full control. Following is the console output from the history server:
D:\Apache\spark-1.2.1-bin-hadoop2\spark-1.2.1-bin-hadoop2.4\bin>spark-class.cmd org.apache.spark.deploy.history.HistoryServer
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/04 08:59:42 INFO SecurityManager: Changing view acls to: skarri
15/03/04 08:59:42 INFO SecurityManager: Changing modify acls to: skarri
15/03/04 08:59:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(skarri); users with modify permissions: Set(skarri)
15/03/04 08:59:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/04 08:59:56 INFO Utils: Successfully started service on port 18080.
15/03/04 08:59:56 INFO HistoryServer: Started HistoryServer at http://skarri-lt05.redmond.corp.microsoft.com:18080
Regards, Srini. On Tue, Mar 3, 2015 at 11:41 AM, Marcelo Vanzin van...@cloudera.com wrote: Spark applications shown in the RM's UI should have an Application Master link when they're running. That takes you to the Spark UI for that application, where you can see all the information you're looking for. If you're running a history server and add spark.yarn.historyServer.address to your config, that link will become a History link after the application is finished, and will take you to the history server to view the app's UI. On Tue, Mar 3, 2015 at 9:47 AM, Srini Karri skarri@gmail.com wrote: Hi All, I am having trouble finding data related to my requirement. Here is the context: I have tried a standalone Spark installation on Windows, and I am able to submit applications and see the history of events. My question is: is it possible to achieve the same monitoring UI experience with a YARN cluster, like viewing workers and running/completed job stages in the web UI? Currently, if we go to our YARN resource manager UI, we are able to see the Spark jobs and their logs, but it is not as rich as the Spark standalone master UI. Is this a limitation of the Hadoop YARN cluster, or is there any way we can hook the Spark standalone master UI up to a YARN cluster? Any help is highly appreciated. Regards, Srini. -- Marcelo
Does anyone integrate HBASE on Spark
Hi Sparkers, How do I integrate HBase with Spark? Appreciate any replies! Regards, Sandeep.v
Re: Error communicating with MapOutputTracker
Follow up: We retried, this time after *decreasing* spark.parallelism. It was set to 16000 before (5 times the number of cores in our cluster). It is now down to 6400 (2 times the number of cores), and it got past the point where it failed before. Does the MapOutputTracker have a limit on the number of tasks it can track? On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com wrote: Hello, We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge workers). We use spark-submit to start an application. We got the following error, which leads to a failed stage: Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error communicating with MapOutputTracker ...
Spark logs in standalone clusters
Hello, I was wondering where all the log files are located on a standalone cluster:
1. the executor logs are in the work directory on each slave machine (stdout/stderr) - I've noticed that GC information is in stdout, and stage information in stderr - *Could we get more information on what is written to stdout vs stderr?*
2. the master log - The path to the log file is shown when you launch the master, like /mnt/var/log/apps/spark-hadoop-org.apache.spark.deploy.master.Master-MACHINENAME.out - *Could we get more information on where this path is configured?*
3. driver logs - It seems they are only in the console by default (although you can override that in the log4j.properties file).
4. communication manager logs? - *Are there any logs for the communication manager (aka the MapOutputTracker)?*
5. Any other log file?
Thanks, Thomas
configure number of cached partition in memory on SparkSQL
Hi, I am tuning a Hive dataset on Spark SQL deployed via the Thrift server. How can I change the number of partitions after caching the table on the Thrift server? I have tried the following, but I still get the same number of partitions after caching:
spark.default.parallelism
spark.sql.inMemoryColumnarStorage.batchSize
Thanks, Judy
Re: Spark Monitoring UI for Hadoop Yarn Cluster
Yes, I do see files. Actually I missed copying the other settings:
spark.master spark://skarri-lt05.redmond.corp.microsoft.com:7077
spark.eventLog.enabled true
spark.rdd.compress true
spark.storage.memoryFraction 1
spark.core.connection.ack.wait.timeout 6000
spark.akka.frameSize 50
spark.executor.extraClassPath D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes
spark.eventLog.dir D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
spark.history.fs.logDirectory D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events
On Wed, Mar 4, 2015 at 10:15 AM, Marcelo Vanzin van...@cloudera.com wrote: Do you see any files in that directory? spark.eventLog.dir won't do anything unless you also have spark.eventLog.enabled=true somewhere. And these are application configs, so make sure they're set when running your application (not when starting the history server). -- Marcelo
Re: Error communicating with MapOutputTracker
I meant spark.default.parallelism, of course. On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com wrote: Follow up: We retried, this time after *decreasing* spark.parallelism. It was set to 16000 before (5 times the number of cores in our cluster). It is now down to 6400 (2 times the number of cores), and it got past the point where it failed before. Does the MapOutputTracker have a limit on the number of tasks it can track? ...
Re: Spark Monitoring UI for Hadoop Yarn Cluster
On Wed, Mar 4, 2015 at 10:08 AM, Srini Karri skarri@gmail.com wrote: spark.executor.extraClassPath D:\\Apache\\spark-1.2.1-bin-hadoop2\\spark-1.2.1-bin-hadoop2.4\\bin\\classes spark.eventLog.dir D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events spark.history.fs.logDirectory D:/Apache/spark-1.2.1-bin-hadoop2/spark-1.2.1-bin-hadoop2.4/bin/tmp/spark-events Do you see any files in that directory? spark.eventLog.dir won't do anything unless you also have spark.eventLog.enabled=true somewhere. And these are application configs, so make sure they're set when running your application (not when starting the history server). -- Marcelo
spark sql median and standard deviation
Hello, Is there a built-in function for getting the median and standard deviation in Spark SQL? Currently I am converting the SchemaRDD to a DoubleRDD and calling doubleRDD.stats(), but that still does not include the median. What is the most efficient way to get the median? Thanks & Regards, Tridib -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-median-and-standard-deviation-tp21914.html
Re: Spark Monitoring UI for Hadoop Yarn Cluster
Hi Marcelo, I found the problem from this link: http://mail-archives.apache.org/mod_mbox/spark-user/201409.mbox/%3cCAL+LEBfzzjugOoB2iFFdz_=9TQsH=DaiKY=cvydfydg3ac5...@mail.gmail.com%3e The problem is that the application I am running is not generating the APPLICATION_COMPLETE file. If I add this file manually, the application shows up in the UI. So the problem is with the application, which is not calling the stop method on the Spark context. Thank you and Todd for helping. Hopefully I will be able to apply these on the actual cluster. Regards, Srini. On Wed, Mar 4, 2015 at 10:20 AM, Srini Karri skarri@gmail.com wrote: Yes, I do see files. Actually I missed copying the other settings: ...
Re: Save and read parquet from the same path
No, this is not safe to do. On Wed, Mar 4, 2015 at 7:14 AM, Karlson ksonsp...@siberie.de wrote: Hi all, what would happen if I save an RDD via saveAsParquetFile to the same path that the RDD was originally read from? Is that a safe thing to do in PySpark? Thanks!
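A minimal sketch, not from the thread, of the usual workaround: write to a temporary location first and swap the directories only after the job succeeds. Paths are illustrative, the rename assumes an HDFS-style filesystem, and the same idea applies from PySpark:

import org.apache.hadoop.fs.{FileSystem, Path}

val data = sqlContext.parquetFile("/data/table")      // read the existing data
val updated = data                                     // ...transformations here...
updated.saveAsParquetFile("/data/table_tmp")           // write somewhere else first

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path("/data/table"), true)               // only after the write succeeded
fs.rename(new Path("/data/table_tmp"), new Path("/data/table"))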
Re: Spark SQL Static Analysis
Thanks! On Wed, Mar 4, 2015 at 3:58 PM, Michael Armbrust mich...@databricks.com wrote: It is somewhat out of date, but here is what we have so far: https://github.com/marmbrus/sql-typed
Driver disassociated
Hello, Sometimes, in the *middle* of a job, the job stops (its status is then shown as FINISHED in the master). There isn't anything wrong in the shell/submit output. When looking at the executor logs, I see logs like this:
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019/user/MapOutputTracker#893807065]
15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 38, fetching them
15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] -> [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated! Shutting down.
15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
How can I investigate further? Thanks
Re: spark sql median and standard deviation
Please take a look at DoubleRDDFunctions.scala:
/** Compute the mean of this RDD's elements. */
def mean(): Double = stats().mean
/** Compute the variance of this RDD's elements. */
def variance(): Double = stats().variance
/** Compute the standard deviation of this RDD's elements. */
def stdev(): Double = stats().stdev
Cheers On Wed, Mar 4, 2015 at 10:51 AM, tridib tridib.sama...@live.com wrote: Hello, Is there a built-in function for getting the median and standard deviation in Spark SQL? ...
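Since stats() does not cover the median, here is a sketch (not from the thread) of computing an exact median by sorting the double RDD and indexing into it; for very large data a sampled approximation is usually cheaper:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def median(values: RDD[Double]): Double = {
  // sort, then pair each value with its position so we can look up the middle
  val sorted = values.sortBy(identity).zipWithIndex().map { case (v, idx) => (idx, v) }
  val n = sorted.count()
  if (n % 2 == 1) sorted.lookup(n / 2).head
  else (sorted.lookup(n / 2 - 1).head + sorted.lookup(n / 2).head) / 2.0
}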
Integer column in schema RDD from parquet being considered as string
Hi, I am converting a jsonRDD to Parquet by saving it as a parquet file (saveAsParquetFile):
cacheContext.jsonFile("file:///u1/sample.json").saveAsParquetFile("sample.parquet")
I am reading the parquet file and registering it as a table:
val parquet = cacheContext.parquetFile("sample_trades.parquet")
parquet.registerTempTable("sample")
When I do a printSchema, I see:
root
 |-- SAMPLE: struct (nullable = true)
 |    |-- CODE: integer (nullable = true)
 |    |-- DESC: string (nullable = true)
When I query:
cacheContext.sql("select SAMPLE.DESC from sample where SAMPLE.CODE=1").map(t => t).collect.foreach(println)
I get the error java.lang.IllegalArgumentException: Column [CODE] was not found in schema! But if I put SAMPLE.CODE in single quotes (forcing it to be treated as a string), it works, for example:
cacheContext.sql("select SAMPLE.DESC from sample where *SAMPLE.CODE='1'*").map(t => t).collect.foreach(println)
What am I missing here? I understand Catalyst will do optimization so the data type shouldn't matter that much, but something is off here. Regards, Gaurav -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Integer-column-in-schema-RDD-from-parquet-being-considered-as-string-tp21917.html
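A hedged guess rather than a confirmed fix: if the "Column [CODE] was not found in schema" error comes from Parquet filter pushdown tripping over the nested column, disabling pushdown for the session is a cheap thing to try (the config key is assumed to be available in this Spark version):

// assumption: spark.sql.parquet.filterPushdown controls Parquet predicate pushdown here
cacheContext.setConf("spark.sql.parquet.filterPushdown", "false")
cacheContext.sql("select SAMPLE.DESC from sample where SAMPLE.CODE = 1").collect().foreach(println)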
Spark SQL Static Analysis
I am pretty sure that I saw a presentation where SparkSQL could be executed with static analysis; however I cannot find the presentation now, nor can I find any documentation or research papers on the topic. So, I am curious if there is indeed any work going on for this topic. The two things I would be interested in would be gaining compile-time safety, as well as the ability to work on my data as a type instead of a row (i.e. result.map(x => x.Age) instead of having to use Row.get). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Static-Analysis-tp21918.html
Re: Spark SQL Static Analysis
It is somewhat out of date, but here is what we have so far: https://github.com/marmbrus/sql-typed On Wed, Mar 4, 2015 at 12:53 PM, Justin Pihony justin.pih...@gmail.com wrote: I am pretty sure that I saw a presentation where SparkSQL could be executed with static analysis; however I cannot find the presentation now, nor can I find any documentation or research papers on the topic. ...
Re: Does anyone integrate HBASE on Spark
Hi, There are some examples in spark/examples https://github.com/apache/spark/tree/master/examples and there are also some examples on Spark Packages http://spark-packages.org/. I also found this blog http://www.abcn.net/2014/07/lighting-spark-with-hbase-full-edition.html quite good. Hope this helps. Cheers, Gen On Wed, Mar 4, 2015 at 6:51 PM, sandeep vura sandeepv...@gmail.com wrote: Hi Sparkers, How do I integrate HBase with Spark? Appreciate any replies! Regards, Sandeep.v
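For completeness, a minimal sketch (not from the thread) of the classic way to read an HBase table into an RDD with TableInputFormat; the table name is illustrative and the HBase client jars must be on the classpath:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")   // illustrative table name

// each element is (row key, Result) for one HBase row
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

println("rows read: " + hbaseRDD.count())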
Re: issue Running Spark Job on Yarn Cluster
Look at the logs: yarn logs -applicationId <applicationId> That should give the error. On Wed, Mar 4, 2015 at 9:21 AM, sachin Singh sachin.sha...@gmail.com wrote: Not yet. Please let me know if you find a solution. Regards, Sachin On 4 Mar 2015 21:45, mael2210 [via Apache Spark User List] wrote: Hello, I am facing the exact same issue. Could you solve the problem? Kind regards -- View this message in context: Re: issue Running Spark Job on Yarn Cluster http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21697p21912.html
Issues with maven dependencies for version 1.2.0 but not version 1.1.0
Hi All, I am currently having a problem with the Maven dependencies for version 1.2.0 of spark-core and spark-hive. Here are my dependencies:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
When the dependencies are set to version 1.1.0, I do not get any errors. Here are the errors I am getting from Artifactory for version 1.2.0 of spark-core:
error=Could not transfer artifact org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer file\: https\://m2.mines.com\:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom. Return code is\: 409 , ReasonPhrase\:Conflict.
The error is the same for spark-hive. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html
Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0
Hi Kevin, If you're using CDH, I'd recommend using the CDH repo [1], and also the CDH version when building your app. [1] http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html On Wed, Mar 4, 2015 at 4:34 PM, Kevin Peng kpe...@gmail.com wrote: Ted, I am currently using the CDH 5.3 distro, which has Spark 1.2.0, so I am not too sure about the compatibility issues between 1.2.0 and 1.2.1; that is why I would want to stick to 1.2.0. On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu yuzhih...@gmail.com wrote: Kevin: You can try with 1.2.1. See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1 Cheers On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng kpe...@gmail.com wrote: Marcelo, Yes that is correct, I am going through a mirror, but 1.1.0 works properly, while 1.2.0 does not. I suspect there is a CRC issue in the 1.2.0 pom file. On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin van...@cloudera.com wrote: Seems like someone set up m2.mines.com as a mirror in your pom file or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is in a messed-up state). ...
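A hedged illustration of the suggestion above: point the build at a repository that actually serves the artifacts (here the Cloudera repo from the linked docs) instead of relying on the broken internal mirror; whether you also need to adjust the mirror settings depends on your ~/.m2/settings.xml:

<!-- assumed repository URL, taken from the standard Cloudera documentation -->
<repositories>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>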
Re: spark master shut down suddenly
I'm sorry, but how do I look at the mesos logs? Where are they? On March 4, 2015, at 6:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can check in the mesos logs and see what's really happening. Thanks Best Regards On Wed, Mar 4, 2015 at 3:10 PM, lisendong lisend...@163.com wrote: 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket connection and attempting reconnect 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master shutting down. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
distribution of receivers in spark streaming
Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs: val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver())) ssc.union(streams) However, from the Spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark? Thanks, Du
Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0
Seems like someone set up m2.mines.com as a mirror in your pom file or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is in a messed-up state). On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote: Hi All, I am currently having a problem with the Maven dependencies for version 1.2.0 of spark-core and spark-hive. Here are my dependencies: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.2.0</version> </dependency> When the dependencies are set to version 1.1.0, I do not get any errors. Here are the errors I am getting from Artifactory for version 1.2.0 of spark-core: error=Could not transfer artifact org.apache.spark:spark-core_2.10:pom:1.2.0 from/to repo (https://m2.mines.com:8443/artifactory/repo): Failed to transfer file: https://m2.mines.com:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom. Return code is: 409, ReasonPhrase: Conflict. The error is the same for spark-hive. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Driver disassociated
Also, I was experiencing another problem which might be related: Error communicating with MapOutputTracker (see email in the ML today). I just thought I would mention it in case it is relevant. On Wed, Mar 4, 2015 at 4:07 PM, Thomas Gerber thomas.ger...@radius.com wrote: 1.2.1 Also, I was using the following parameters, which are 10 times the default ones: spark.akka.timeout 1000 spark.akka.heartbeat.pauses 6 spark.akka.failure-detector.threshold 3000.0 spark.akka.heartbeat.interval 1 which should have helped *avoid* the problem if I understand correctly. Thanks, Thomas On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote: What release are you using ? SPARK-3923 went into 1.2.0 release. Cheers On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com wrote: Hello, sometimes, in the *middle* of a job, the job stops (status is then seen as FINISHED in the master). There isn't anything wrong in the shell/submit output. When looking at the executor logs, I see logs like this: 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal :40019/user/MapOutputTracker#893807065] 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 38, fetching them 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated! Shutting down. 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. How can I investigate further? Thanks
RE: Where can I find more information about the R interface for Spark?
Thanks, it's an active project. Will it be released with Spark 1.3.0? From: 鹰 [mailto:980548...@qq.com] Sent: Thursday, March 05, 2015 11:19 AM To: Haopu Wang; user Subject: Re: Where can I find more information about the R interface for Spark? You can search for SparkR on Google or find it on GitHub.
Re: RDD coalesce or repartition by #records or #bytes?
It uses a HashPartitioner to distribute the records to different partitions, but the key is just an integer assigned evenly across the output partitions. From the code, each resulting partition will get a very similar number of records. Thanks. Zhan Zhang On Mar 4, 2015, at 3:47 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi, My RDDs are created from a Kafka stream. After receiving an RDD, I want to coalesce/repartition it so that the data will be processed on a set of machines in parallel as evenly as possible. The number of processing nodes is larger than the number of receiving nodes. My question is how coalesce/repartition works: does it distribute by the number of records or by the number of bytes? In my app, my observation is that the distribution seems to be by number of records. The consequence, however, is that some executors have to process x1000 as much data when the record sizes are very skewed. Then we have to allocate memory for the worst case. Is there a way to programmatically affect the coalesce/repartition scheme? Thanks, Du
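To illustrate the point, here is a small, self-contained sketch (made-up data sizes and partition counts, run in local mode, not Du's actual app) showing that repartition balances by record count while the byte totals per partition can still differ when a few records are huge:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("RepartitionDemo").setMaster("local[4]"))

// Skewed record sizes: most records are 1 byte, every 1000th is ~1 MB.
val records = sc.parallelize(1 to 10000, 2).map { i =>
  if (i % 1000 == 0) "x" * 1000000 else "x"
}

val repartitioned = records.repartition(8)

// Record counts per partition come out nearly equal...
println(repartitioned.mapPartitions(it => Iterator(it.size)).collect().mkString(", "))
// ...but byte totals depend on where the few large records land.
println(repartitioned.mapPartitions(it => Iterator(it.map(_.length.toLong).sum)).collect().mkString(", "))

sc.stop()

In other words, to balance by bytes you would have to key the records yourself (for example by an estimated size) and use partitionBy with a custom Partitioner; plain coalesce/repartition only balances record counts.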
Extra output from Spark run
When I run Spark 1.2.1, I see output like this that wasn't in previous releases: [Stage 12:= (6 + 1) / 16] [Stage 12:(8 + 1) / 16] [Stage 12:== (11 + 1) / 16] [Stage 12:= (14 + 1) / 16] What do these lines mean and how can I get rid of them? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Extra-output-from-Spark-run-tp21920.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
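Those lines look like the console progress bar introduced around this release; each one shows a running stage with (completed + running) tasks out of the total. A minimal sketch of turning it off, assuming the spark.ui.showConsoleProgress property (it can also be passed as --conf spark.ui.showConsoleProgress=false to spark-submit or set in spark-defaults.conf):

import org.apache.spark.{SparkConf, SparkContext}

// Disable the in-console stage progress bar for this application.
val conf = new SparkConf()
  .setAppName("NoConsoleProgress")
  .set("spark.ui.showConsoleProgress", "false")
val sc = new SparkContext(conf)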
Re: Where can I find more information about the R interface for Spark?
Do you have any update on SparkR? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Where-can-I-find-more-information-about-the-R-interface-for-Spark-tp155p21922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0
Marcelo, Thanks. The one in the CDH repo fixed it :) On Wed, Mar 4, 2015 at 4:37 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Kevin, If you're using CDH, I'd recommend using the CDH repo [1], and also the CDH version when building your app. [1] http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html On Wed, Mar 4, 2015 at 4:34 PM, Kevin Peng kpe...@gmail.com wrote: Ted, I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too sure about the compatibility issues between 1.2.0 and 1.2.1, that is why I would want to stick to 1.2.0. On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu yuzhih...@gmail.com wrote: Kevin: You can try with 1.2.1 See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1 Cheers On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng kpe...@gmail.com wrote: Marcelo, Yes that is correct, I am going through a mirror, but 1.1.0 works properly, while 1.2.0 does not. I suspect there is crc in the 1.2.0 pom file. On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin van...@cloudera.com wrote: Seems like someone set up m2.mines.com as a mirror in your pom file or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is in a messed up state). On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote: Hi All, I am currently having problem with the maven dependencies for version 1.2.0 of spark-core and spark-hive. Here are my dependencies: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-hive_2.10/artifactId version1.2.0/version /dependency When the dependencies are set to version 1.1.0, I do not get any errors. Here are the errors I am getting from artifactory for version 1.2.0 of spark-core: error=Could not transfer artifact org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer file\: https\://m2.mines.com \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom. Return code is\: 409 , ReasonPhrase\:Conflict. The error is the same for spark-hive. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo -- Marcelo
Re: Where can I find more information about the R interface for Spark?
You can search for SparkR on Google or search for it on GitHub.
Re: Driver disassociated
What release are you using ? SPARK-3923 went into 1.2.0 release. Cheers On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com wrote: Hello, sometimes, in the *middle* of a job, the job stops (status is then seen as FINISHED in the master). There isn't anything wrong in the shell/submit output. When looking at the executor logs, I see logs like this: 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal :40019/user/MapOutputTracker#893807065] 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 38, fetching them 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated! Shutting down. 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. How can I investigate further? Thanks
Re: Issues with maven dependencies for version 1.2.0 but not version 1.1.0
Ted, I am currently using CDH 5.3 distro, which has Spark 1.2.0, so I am not too sure about the compatibility issues between 1.2.0 and 1.2.1, that is why I would want to stick to 1.2.0. On Wed, Mar 4, 2015 at 4:25 PM, Ted Yu yuzhih...@gmail.com wrote: Kevin: You can try with 1.2.1 See this thread: http://search-hadoop.com/m/JW1q5Vfe6X1 Cheers On Wed, Mar 4, 2015 at 4:18 PM, Kevin Peng kpe...@gmail.com wrote: Marcelo, Yes that is correct, I am going through a mirror, but 1.1.0 works properly, while 1.2.0 does not. I suspect there is crc in the 1.2.0 pom file. On Wed, Mar 4, 2015 at 4:10 PM, Marcelo Vanzin van...@cloudera.com wrote: Seems like someone set up m2.mines.com as a mirror in your pom file or ~/.m2/settings.xml, and it doesn't mirror Spark 1.2 (or does but is in a messed up state). On Wed, Mar 4, 2015 at 3:49 PM, kpeng1 kpe...@gmail.com wrote: Hi All, I am currently having problem with the maven dependencies for version 1.2.0 of spark-core and spark-hive. Here are my dependencies: dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version1.2.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-hive_2.10/artifactId version1.2.0/version /dependency When the dependencies are set to version 1.1.0, I do not get any errors. Here are the errors I am getting from artifactory for version 1.2.0 of spark-core: error=Could not transfer artifact org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo (https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer file\: https\://m2.mines.com \:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom. Return code is\: 409 , ReasonPhrase\:Conflict. The error is the same for spark-hive. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issues-with-maven-dependencies-for-version-1-2-0-but-not-version-1-1-0-tp21919.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
RDD coalesce or repartition by #records or #bytes?
Hi, My RDDs are created from a Kafka stream. After receiving an RDD, I want to coalesce/repartition it so that the data will be processed on a set of machines in parallel as evenly as possible. The number of processing nodes is larger than the number of receiving nodes. My question is how coalesce/repartition works: does it distribute by the number of records or by the number of bytes? In my app, my observation is that the distribution seems to be by number of records. The consequence, however, is that some executors have to process x1000 as much data when the record sizes are very skewed. Then we have to allocate memory for the worst case. Is there a way to programmatically affect the coalesce/repartition scheme? Thanks, Du
how to update als in mllib?
I'm using Spark 1.0.0 with Cloudera, but I want to use the newer ALS code, which supports more features such as the RDD cache level (MEMORY_ONLY), checkpointing, and so on. What is the easiest way to use the new ALS code? I only need the MLlib ALS code, so maybe I don't need to update Spark MLlib on all of the cluster machines... Maybe downloading a new Spark jar and including it in my driver is enough? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-update-als-in-mllib-tp21921.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Driver disassociated
1.2.1 Also, I was using the following parameters, which are 10 times the default ones: spark.akka.timeout 1000 spark.akka.heartbeat.pauses 6 spark.akka.failure-detector.threshold 3000.0 spark.akka.heartbeat.interval 1 which should have helped *avoid* the problem if I understand correctly. Thanks, Thomas On Wed, Mar 4, 2015 at 3:21 PM, Ted Yu yuzhih...@gmail.com wrote: What release are you using ? SPARK-3923 went into 1.2.0 release. Cheers On Wed, Mar 4, 2015 at 1:39 PM, Thomas Gerber thomas.ger...@radius.com wrote: Hello, sometimes, in the *middle* of a job, the job stops (status is then seen as FINISHED in the master). There isn't anything wrong in the shell/submit output. When looking at the executor logs, I see logs like this: 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal :40019/user/MapOutputTracker#893807065] 15/03/04 21:24:51 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 38, fetching them 15/03/04 21:24:55 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-0-11-9.ec2.internal:54766] - [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] disassociated! Shutting down. 15/03/04 21:24:55 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:40019] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. How can I investigate further? Thanks
Re: Having lots of FetchFailedException in join
One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 
15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at
Re: spark master shut down suddenly
It depends on your setup, but one of the locations is /var/log/mesos. On Wed, Mar 4, 2015 at 19:11 lisendong lisend...@163.com wrote: I'm sorry, but how do I look at the mesos logs? Where are they? On March 4, 2015, at 6:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can check in the mesos logs and see what's really happening. Thanks Best Regards On Wed, Mar 4, 2015 at 3:10 PM, lisendong lisend...@163.com wrote: 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket connection and attempting reconnect 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master shutting down. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: distribution of receivers in spark streaming
Figured it out: I need to override the preferredLocation() method in the MyReceiver class. On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs: val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver())) ssc.union(streams) However, from the Spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark? Thanks, Du
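For reference, a minimal sketch of what overriding preferredLocation can look like (the hostnames and the dummy receive loop are placeholders, not Du's actual Kafka receiver; the location is a scheduling hint, honored on a best-effort basis):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a receiver asked to run on a specific worker host.
class MyKafkaReceiver(host: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Hint to the scheduler about where this receiver should run.
  override def preferredLocation: Option[String] = Some(host)

  override def onStart(): Unit = {
    new Thread("MyKafkaReceiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          store("dummy record")   // real code would poll Kafka here
          Thread.sleep(100)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {}
}

// Spreading 8 receivers over 5 hosts (placeholder hostnames):
// val hosts = Seq("w1", "w2", "w3", "w4", "w5")
// val streams = (1 to 8).map(i => ssc.receiverStream(new MyKafkaReceiver(hosts(i % hosts.size))))
// val unioned = ssc.union(streams)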
RE: distribution of receivers in spark streaming
Hi Du, You could try to sleep for several seconds after creating the streaming context to let all the executors register; then the receivers can be distributed to the nodes more evenly. Also setting locality is another way, as you mentioned. Thanks Jerry From: Du Li [mailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, March 5, 2015 1:50 PM To: User Subject: Re: distribution of receivers in spark streaming Figured it out: I need to override the preferredLocation() method in the MyReceiver class. On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs: val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver())) ssc.union(streams) However, from the Spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark? Thanks, Du
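A tiny sketch of the first suggestion (the sleep length is an arbitrary guess; tune it to however long your executors take to register):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("ReceiverPlacement")
val ssc = new StreamingContext(conf, Seconds(10))

// Give the executors a few seconds to register with the driver before the
// receiver streams are created and started, so they can be spread over more nodes.
Thread.sleep(10000)

// ... create the receiver streams here, then call ssc.start()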
Re: Having lots of FetchFailedException in join
There're some skew. 6461640SUCCESSPROCESS_LOCAL200 / 2015/03/04 23:45:471.1 min6 s198.6 MB21.1 GB240.8 MB5961590SUCCESSPROCESS_LOCAL30 / 2015/03/04 23:45:4744 s5 s200.7 MB4.8 GB154.0 MB But I expect this kind of skewness to be quite common. Jianshi On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 
15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at
RE: distribution of receivers in spark streaming
Yes, hostname is enough. I think currently it is hard for user code to get the worker list from standalone master. If you can get the Master object, you could get the worker list, but AFAIK may be it is difficult to get this object. All you could do is to manually get the worker list and assigned its hostname to each receiver. Thanks Jerry From: Du Li [mailto:l...@yahoo-inc.com] Sent: Thursday, March 5, 2015 2:29 PM To: Shao, Saisai; User Subject: Re: distribution of receivers in spark streaming Hi Jerry, Thanks for your response. Is there a way to get the list of currently registered/live workers? Even in order to provide preferredLocation, it would be safer to know which workers are active. Guess I only need to provide the hostname, right? Thanks, Du On Wednesday, March 4, 2015 10:08 PM, Shao, Saisai saisai.s...@intel.commailto:saisai.s...@intel.com wrote: Hi Du, You could try to sleep for several seconds after creating streaming context to let all the executors registered, then all the receivers can distribute to the nodes more evenly. Also setting locality is another way as you mentioned. Thanks Jerry From: Du Li [mailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, March 5, 2015 1:50 PM To: User Subject: Re: distribution of receivers in spark streaming Figured it out: I need to override method preferredLocation() in MyReceiver class. On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID wrote: Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of kafka receivers on those machines. In my code I did something like the following, as suggested in the spark docs: val streams = (1 to numReceivers).map(_ = ssc.receiverStream(new MyKafkaReceiver())) ssc.union(streams) However, from the spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in spark? Thanks, Du
RE: Having lots of FetchFailedException in join
Yes, if one key has too many values, there still has a chance to meet the OOM. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 3:49 PM To: Shao, Saisai Cc: Cheng, Hao; user Subject: Re: Having lots of FetchFailedException in join I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.commailto:saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 3:28 PM To: Shao, Saisai Cc: user Subject: Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.commailto:saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 2:32 PM To: Aaron Davidson Cc: user Subject: Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 
15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at
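On the skew-join question raised earlier in this thread: there is no built-in skew join in Spark core at this point, but one common workaround is key salting. A rough sketch of the idea (the method name and the String key type are just for illustration, and it assumes the non-skewed side is small enough to be replicated once per salt value):

import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hand-rolled "skew join": salt the keys on the skewed side so one hot key is
// spread over several reducers, and replicate the other side once per salt value.
def saltedJoin[V: ClassTag, W: ClassTag](
    skewed: RDD[(String, V)],
    other: RDD[(String, W)],
    salts: Int): RDD[(String, (V, W))] = {
  // Tag each skewed record with a random salt 0..salts-1.
  val saltedLeft = skewed.map { case (k, v) => ((k, Random.nextInt(salts)), v) }
  // Replicate every record on the other side once per salt so every salted key can match.
  val saltedRight = other.flatMap { case (k, w) => (0 until salts).map(s => ((k, s), w)) }
  saltedLeft.join(saltedRight).map { case ((k, _), vw) => (k, vw) }
}

With, say, salts = 10, the values of a single hot key are split across ten (key, salt) groups, so no single task has to hold the whole group in memory at once.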
Re: scala.Double vs java.lang.Double in RDD
Hi, On Thu, Mar 5, 2015 at 12:20 AM, Imran Rashid iras...@cloudera.com wrote: This doesn't involve spark at all, I think this is entirely an issue with how scala deals w/ primitives and boxing. Often it can hide the details for you, but IMO it just leads to far more confusing errors when things don't work out. The issue here is that your map has value type Any, which leads scala to leave it as a boxed java.lang.Double. I see, thank you very much for your explanation and the code examples! Helps very much! Thanks Tobias
In the HA master mode, how to identify the alive master?
Hi, In our project we use two standalone masters + ZooKeeper to provide HA for the Spark master. Now the problem is: how do we know which master is the currently alive one? We tried to read the info that the masters store in ZooKeeper, but we found there is no information there that identifies the current alive master. Any suggestions for us? Thanks
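One approach that may help (assuming the standalone master web UI's JSON page at http://<master>:8080/json, which includes a status field): poll each master's UI and pick the one reporting ALIVE. A rough sketch:

import scala.io.Source
import scala.util.Try

// Placeholder master web UI addresses; adjust hosts and ports for your deployment.
val masterUis = Seq("http://master1:8080/json", "http://master2:8080/json")

// Naive string match for illustration; a real implementation should parse the JSON
// and read the status field (ALIVE vs STANDBY).
val aliveMaster = masterUis.find { url =>
  Try(Source.fromURL(url).mkString).toOption.exists(_.contains("ALIVE"))
}

println(aliveMaster.getOrElse("no ALIVE master found"))

For submitting applications you often do not need to know at all: the master URL can list both masters (spark://master1:7077,master2:7077) and the client fails over on its own.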
Re: TreeNodeException: Unresolved attributes
Which spark version did you use? I tried spark-1.2.1 and didn’t meet this problem. scala val m = hiveContext.sql( select * from testtable where value like '%Restaurant%') 15/03/05 02:02:30 INFO ParseDriver: Parsing command: select * from testtable where value like '%Restaurant%' 15/03/05 02:02:30 INFO ParseDriver: Parse Completed 15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(462299) called with curMem=1087888, maxMem=280248975 15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 451.5 KB, free 265.8 MB) 15/03/05 02:02:30 INFO MemoryStore: ensureFreeSpace(81645) called with curMem=1550187, maxMem=280248975 15/03/05 02:02:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 79.7 KB, free 265.7 MB) 15/03/05 02:02:30 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on c6402.ambari.apache.orghttp://c6402.ambari.apache.org:33696 (size: 79.7 KB, free: 267.0 MB) 15/03/05 02:02:30 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 15/03/05 02:02:30 INFO DefaultExecutionContext: Created broadcast 2 from broadcast at TableReader.scala:68 m: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == Filter Contains(value#5, Restaurant) HiveTableScan [key#4,value#5], (MetastoreRelation default, testtable, None), None scala Thanks. Zhan Zhang On Mar 4, 2015, at 9:09 AM, Anusha Shamanur anushas...@gmail.commailto:anushas...@gmail.com wrote: I tried. I still get the same error. 15/03/04 09:01:50 INFO parse.ParseDriver: Parsing command: select * from TableName where value like '%Restaurant%' 15/03/04 09:01:50 INFO parse.ParseDriver: Parse Completed. 15/03/04 09:01:50 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=TableName 15/03/04 09:01:50 INFO HiveMetaStore.audit: ugi=as7339 ip=unknown-ip-addr cmd=get_table : db=default tbl=TableName results: org.apache.spark.sql.SchemaRDD = SchemaRDD[86] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: 'Project [*] 'Filter ('value LIKE Restaurant) MetastoreRelation default, TableName, None On Wed, Mar 4, 2015 at 5:39 AM, Arush Kharbanda ar...@sigmoidanalytics.commailto:ar...@sigmoidanalytics.com wrote: Why don't you formulate a string before you pass it to the hql function (appending strings), and hql function is deprecated. You should use sql. http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur anushas...@gmail.commailto:anushas...@gmail.com wrote: Hi, I am trying to run a simple select query on a table. val restaurants=hiveCtx.hql(select * from TableName where column like '%SomeString%' ) This gives an error as below: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: How do I solve this? -- Regards, Anusha -- [Sigmoid Analytics]http://htmlsig.com/www.sigmoidanalytics.com Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.commailto:ar...@sigmoidanalytics.com || www.sigmoidanalytics.comhttp://www.sigmoidanalytics.com/ -- Regards, Anusha
Re: Where can I find more information about the R interface for Spark?
Please follow SPARK-5654 On Wed, Mar 4, 2015 at 7:22 PM, Haopu Wang hw...@qilinsoft.com wrote: Thanks, it's an active project. Will it be released with Spark 1.3.0? -- *From:* 鹰 [mailto:980548...@qq.com] *Sent:* Thursday, March 05, 2015 11:19 AM *To:* Haopu Wang; user *Subject:* Re: Where can I find more information about the R interface for Spark? You can search for SparkR on Google or find it on GitHub.
RE: Passing around SparkContext with in the Driver
Replace val sqlContext = new SQLContext(sparkContext) with @transient val sqlContext = new SQLContext(sparkContext) -Original Message- From: kpeng1 [mailto:kpe...@gmail.com] Sent: 04 March 2015 23:39 To: user@spark.apache.org Subject: Passing around SparkContext with in the Driver Hi All, I am trying to create a class that wraps functionalities that I need; some of these functions require access to the SparkContext, which I would like to pass in. I know that the SparkContext is not seralizable, and I am not planning on passing it to worker nodes or anything, I just want to wrap some functionalities that require SparkContext's api. As a preface, I am basically using the spark shell to test the functionality of my code at the moment, so I am not sure if that plays into any of the issues I am having. Here is my current class: class MyClass(sparkContext: SparkContext) { import org.apache.spark.sql._ import org.apache.spark.rdd._ val sqlContext = new SQLContext(sparkContext) val DATA_TYPE_MAPPING = Map( int - IntegerType, double - DoubleType, float - FloatType, long - LongType, short - ShortType, binary - BinaryType, bool - BooleanType, byte - ByteType, string - StringType) //removes the first line of a text file def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] ={ //header line is first line in first partition if(partitionIdx == 0){ fileItr.drop(1) } fileItr } //returns back a StructType for the schema def getSchema(rawSchema: Array[String]): StructType ={ //return backs a StructField def getSchemaFieldHelper(schemaField: String): StructField ={ val schemaParts = schemaField.split(' ') StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true) } val structFields = rawSchema.map(column = getSchemaFieldHelper(column)) StructType(structFields) } def getRow(strRow: String): Row ={ val spRow = strRow.split(',') val tRow = spRow.map(_.trim) Row(tRow:_*) } def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile: String): SchemaRDD ={ //apply schema to rdd to create schemaRDD def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType): SchemaRDD ={ val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct) schemaRDD } val rawSchema = sparkContext.textFile(schemaFile).collect val schema = getSchema(rawSchema) val rawCsvData = sparkContext.textFile(csvFile) //if we want to keep header from csv file if(includeHeader){ val rowRDD = rawCsvData.map(getRow) val schemaRDD = createSchemaRDD(rowRDD, schema) return schemaRDD } val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader) val rowRDD = csvData.map(getRow) val schemaRDD = createSchemaRDD(rowRDD, schema) schemaRDD } } So in the spark shell I am basically creating an instance of this class and calling applySchemaToCsv like so: val test = new MyClass(sc) test.applySchemaToCsv(/tmp/myFile.csv, false, /tmp/schema.txt) What I am getting is not serializable exception: 15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at console:62 org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1435) at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615) . . . Caused by: java.io.NotSerializableException: If I remove the class wrapper and make references to sc directly everything works. 
I am basically wondering what is causing the serialization issues and if I can wrap a class around these functions. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Passing-around-SparkContext-with-in-the-Driver-tp21913.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
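To make the @transient suggestion concrete, here is a stripped-down sketch of the wrapper pattern (not the full original class): keep the contexts @transient so they never travel with a task, and make sure closures only use their own arguments or local vals rather than methods or fields of the wrapper, so the non-serializable outer instance is not captured:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Stripped-down sketch of wrapping SparkContext-based helpers in a class.
class MyWrapper(@transient val sparkContext: SparkContext) extends Serializable {

  // The contexts stay on the driver; @transient keeps them out of any serialized closure.
  @transient lazy val sqlContext = new SQLContext(sparkContext)

  def removeHeader(rdd: RDD[String]): RDD[String] =
    rdd.mapPartitionsWithIndex { (idx, it) =>
      // This closure only touches its own arguments, so it does not capture `this`
      // (and therefore does not try to serialize the SparkContext).
      if (idx == 0) it.drop(1) else it
    }
}

Used from the shell as val wrapper = new MyWrapper(sc); wrapper.removeHeader(sc.textFile("/tmp/myFile.csv")). The NotSerializableException in the original code most likely comes from passing a method of the class directly, as in mapPartitionsWithIndex(removeHeader): eta-expansion turns that into a closure over this, which drags the whole wrapper, including the SparkContext, into the task.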
Unable to Read/Write Avro RDD on cluster.
I am trying to read RDD avro, transform and write. I am able to run it locally fine but when i run onto cluster, i see issues with Avro. export SPARK_HOME=/home/dvasthimal/spark/spark-1.0.2-bin-2.4.1 export SPARK_YARN_USER_ENV=CLASSPATH=/apache/hadoop/conf export HADOOP_CONF_DIR=/apache/hadoop/conf export YARN_CONF_DIR=/apache/hadoop/conf export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.0.2-hadoop2.4.1.jar export SPARK_LIBRARY_PATH=/apache/hadoop/lib/native export SPARK_YARN_USER_ENV=CLASSPATH=/apache/hadoop/conf export SPARK_YARN_USER_ENV=CLASSPATH=/apache/hadoop/conf export SPARK_CLASSPATH=/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-company-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar:/home/dvasthimal/spark/avro-1.7.7.jar export SPARK_LIBRARY_PATH=/apache/hadoop/lib/native export YARN_CONF_DIR=/apache/hadoop/conf/ cd $SPARK_HOME ./bin/spark-submit --master yarn-cluster --jars /home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar,/home/dvasthimal/spark/avro-1.7.7.jar --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 --queue hdmi-spark --class com.company.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16 epoutputdirectory=/user/dvasthimal/epdatasets_small/exptsession subcommand=successevents outputdir=/user/dvasthimal/epdatasets/successdetail Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/03/04 03:20:29 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2 15/03/04 03:20:30 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 2221 15/03/04 03:20:30 INFO yarn.Client: Queue info ... queueName: hdmi-spark, queueCurrentCapacity: 0.7162806, queueMaxCapacity: 0.08, queueApplicationCount = 7, queueChildQueueCount = 0 15/03/04 03:20:30 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 16384 15/03/04 03:20:30 INFO yarn.Client: Preparing Local resources 15/03/04 03:20:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/04 03:20:30 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. 
15/03/04 03:20:46 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 7780745 for dvasthimal on 10.115.206.112:8020 15/03/04 03:20:46 INFO yarn.Client: Uploading file:/home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar to hdfs:// apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/spark_reporting-1.0-SNAPSHOT.jar 15/03/04 03:20:47 INFO yarn.Client: Uploading file:/home/dvasthimal/spark/spark-1.0.2-bin-2.4.1/lib/spark-assembly-1.0.2-hadoop2.4.1.jar to hdfs:// apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/spark-assembly-1.0.2-hadoop2.4.1.jar 15/03/04 03:20:52 INFO yarn.Client: Uploading file:/home/dvasthimal/spark/avro-mapred-1.7.7-hadoop2.jar to hdfs:// apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/avro-mapred-1.7.7-hadoop2.jar 15/03/04 03:20:52 INFO yarn.Client: Uploading file:/home/dvasthimal/spark/avro-1.7.7.jar to hdfs:// apollo-phx-nn.company.com:8020/user/dvasthimal/.sparkStaging/application_1425075571333_61948/avro-1.7.7.jar 15/03/04 03:20:54 INFO yarn.Client: Setting up the launch environment 15/03/04 03:20:54 INFO yarn.Client: Setting up container launch context 15/03/04 03:20:54 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx4096m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.app.name=\com.company.ep.poc.spark.reporting.SparkApp\, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, com.company.ep.poc.spark.reporting.SparkApp, --jar , file:/home/dvasthimal/spark/spark_reporting-1.0-SNAPSHOT.jar, --args 'startDate=2015-02-16' --args 'endDate=2015-02-16' --args 'epoutputdirectory=/user/dvasthimal/epdatasets_small/exptsession' --args 'subcommand=successevents' --args 'outputdir=/user/dvasthimal/epdatasets/successdetail' , --executor-memory, 2048, --executor-cores, 1, --num-executors , 3, 1, LOG_DIR/stdout, 2, LOG_DIR/stderr) 15/03/04 03:20:54 INFO yarn.Client: Submitting application to ASM 15/03/04 03:20:54 INFO impl.YarnClientImpl: Submitted application application_1425075571333_61948 15/03/04 03:20:56 INFO yarn.Client: Application report from ASM: application identifier: application_1425075571333_61948 appId: 61948 clientToAMToken: null appDiagnostics: appMasterHost: N/A appQueue: hdmi-spark appMasterRpcPort: -1 appStartTime: 1425464454263 yarnAppState: ACCEPTED distributedFinalState: UNDEFINED appTrackingUrl: https://apollo-phx-rm-2.company.com:50030/proxy/application_1425075571333_61948/ appUser: dvasthimal
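For reference, a minimal sketch of one common way to read Avro container files into an RDD with the avro and avro-mapred (hadoop2) jars on the classpath; the path and the use of GenericRecord are placeholders here, not the poster's actual job:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("AvroReadSketch"))

// Placeholder path; substitute the real dataset directory.
val path = "hdfs:///tmp/example-avro-dir"

// Each input record arrives as (AvroKey[GenericRecord], NullWritable).
val records = sc
  .newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path)
  .map { case (k, _) => k.datum().toString }   // copy out what you need before shuffling

println(records.count())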
Re: distribution of receivers in spark streaming
Hi Jerry, Thanks for your response. Is there a way to get the list of currently registered/live workers? Even in order to provide preferredLocation, it would be safer to know which workers are active. Guess I only need to provide the hostname, right? Thanks, Du On Wednesday, March 4, 2015 10:08 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Du, You could try to sleep for several seconds after creating the streaming context to let all the executors register; then the receivers can be distributed to the nodes more evenly. Also setting locality is another way, as you mentioned. Thanks Jerry From: Du Li [mailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, March 5, 2015 1:50 PM To: User Subject: Re: distribution of receivers in spark streaming Figured it out: I need to override the preferredLocation() method in the MyReceiver class. On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs: val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver())) ssc.union(streams) However, from the Spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark? Thanks, Du
RE: Having lots of FetchFailedException in join
Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 2:32 PM To: Aaron Davidson Cc: user Subject: Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
Re: Having lots of FetchFailedException in join
Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597
RE: Having lots of FetchFailedException in join
I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 3:28 PM To: Shao, Saisai Cc: user Subject: Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.commailto:saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry From: Jianshi Huang [mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com] Sent: Thursday, March 5, 2015 2:32 PM To: Aaron Davidson Cc: user Subject: Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 
15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at
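For reference, the GC flags mentioned in this thread can be attached to the executors through spark.executor.extraJavaOptions so that GC pauses can be correlated with the shuffle numbers in the web UI. A minimal sketch; the app name is illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-gc-monitoring")
  // Print GC details in each executor's logs so shuffle-heavy stages can be correlated with GC pauses.
  .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")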
Re: Having lots of FetchFailedException in join
I see. I'm using core's join. The data might have some skewness (checking). I understand shuffle can spill data to disk but when consuming it, say in cogroup or groupByKey, it still needs to read the whole group elements, right? I guess OOM happened there when reading very large groups. Jianshi On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai saisai.s...@intel.com wrote: I think what you could do is to monitor through web UI to see if there’s any skew or other symptoms in shuffle write and read. For GC you could use the below configuration as you mentioned. From Spark core side, all the shuffle related operations can spill the data into disk and no need to read the whole partition into memory. But if you uses SparkSQL, it depends on how SparkSQL uses this operators. CC @hao if he has some thoughts on it. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 3:28 PM *To:* Shao, Saisai *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join Hi Saisai, What's your suggested settings on monitoring shuffle? I've enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions) want to make groupByKey using external storage. It's still open status. Does that mean now groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the group as a whole during consuming? How can I deal with the key skewness in joins? Is there a skew-join implementation? Jianshi On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai saisai.s...@intel.com wrote: Hi Jianshi, From my understanding, it may not be the problem of NIO or Netty, looking at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), theoretically EAOM can spill the data into disk if memory is not enough, but there might some issues when join key is skewed or key number is smaller, so you will meet OOM. Maybe you could monitor each stage or task’s shuffle and GC status also system status to identify the problem. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, March 5, 2015 2:32 PM *To:* Aaron Davidson *Cc:* user *Subject:* Re: Having lots of FetchFailedException in join One really interesting is that when I'm using the netty-based spark.shuffle.blockTransferService, there's no OOM error messages (java.lang.OutOfMemoryError: Java heap space). Any idea why it's not here? I'm using Spark 1.2.1. Jianshi On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 
15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at
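On the skew-join question raised above: there is no built-in skew join in Spark core at this point, but a common workaround is to salt the hot keys so that one key's values are spread over several reducers. A minimal sketch, where the key type, value types, and salt factor are illustrative assumptions:

import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Salt the skewed (large) side, replicate the small side once per salt value,
// join on the composite (key, salt) key, and then drop the salt.
def saltedJoin[V: ClassTag, W: ClassTag](big: RDD[(String, V)], small: RDD[(String, W)], saltFactor: Int): RDD[(String, (V, W))] = {
  val saltedBig = big.map { case (k, v) => ((k, Random.nextInt(saltFactor)), v) }
  val replicatedSmall = small.flatMap { case (k, w) => (0 until saltFactor).map(i => ((k, i), w)) }
  saltedBig.join(replicatedSmall).map { case ((k, _), pair) => (k, pair) }
}

This trades extra shuffle volume on the small side for smaller per-key groups on the large side, which is usually the better deal when a few keys dominate.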
using log4j2 with spark
Hi, Trying to run spark 1.2.1 w/ hadoop 1.0.4 on cluster and configure it to run with log4j2. Problem is that spark-assembly.jar contains log4j and slf4j classes compatible with log4j 1.2 in it, and so it detects it should use log4j 1.2 ( https://github.com/apache/spark/blob/54e7b456dd56c9e52132154e699abca87563465b/core/src/main/scala/org/apache/spark/Logging.scala on line 121). Is there a maven profile for building spark-assembly w/out the log4j dependencies, or any other way I can force spark to use log4j2? Thanks! Lior
How to parse Json formatted Kafka message in spark streaming
Friends, I'm trying to parse JSON-formatted Kafka messages and then send them back to Cassandra. I have two problems: 1. I got the exception below. How do I check for an empty RDD? Exception in thread main java.lang.UnsupportedOperationException: empty collection at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.reduce(RDD.scala:869) at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57) at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232) at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204) val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…) messages.foreachRDD { rdd => val message: RDD[String] = rdd.map { y => y._2 } sqlContext.jsonRDD(message).registerTempTable("tempTable") sqlContext.sql("SELECT time,To FROM tempTable") .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns(key, msg)) } 2. How do I get all column names from the JSON messages? I have hundreds of columns in the JSON-formatted message. Thanks for your help! Best regards, Cui Lin
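For what it's worth, a minimal sketch of both points, assuming the DStream carries the JSON document as the Kafka message value (RDD.isEmpty only appears in Spark 1.3, so take(1) is used as the emptiness check here):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

def handleBatches(messages: DStream[(String, String)], sqlContext: SQLContext): Unit = {
  messages.foreachRDD { rdd =>
    val json: RDD[String] = rdd.map(_._2)
    // Guard against empty micro-batches before asking jsonRDD to infer a schema.
    if (json.take(1).nonEmpty) {
      val schemaRdd = sqlContext.jsonRDD(json)
      // All top-level column names inferred from the JSON documents.
      val columns = schemaRdd.schema.fields.map(_.name)
      println(columns.mkString(", "))
      schemaRdd.registerTempTable("tempTable")
    }
  }
}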
Re: spark master shut down suddenly
Generally the logs are located in /var/log/mesos, but the definitive configuration can be found via the /etc/mesos-master/... configuration files. There should be a configuration file labeled log_dir. ps -ax | grep mesos should also show the output of the configuration if it is configured. Another location to review would potentially be /etc/default/mesos-master On Wed, Mar 4, 2015 at 9:31 PM, Denny Lee denny.g@gmail.com wrote: It depends on your setup but one of the locations is /var/log/mesos On Wed, Mar 4, 2015 at 19:11 lisendong lisend...@163.com wrote: I'm sorry, but how do I look at the mesos logs? Where are they? On March 4, 2015, at 6:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can check in the mesos logs and see what's really happening. Thanks Best Regards On Wed, Mar 4, 2015 at 3:10 PM, lisendong lisend...@163.com wrote: 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket connection and attempting reconnect 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master shutting down. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Having lots of FetchFailedException in join
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM errors, I'm doing a big join operation. 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207) java.lang.OutOfMemoryError: Java heap space at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) Is join/cogroup still memory bound? Jianshi On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hmm... ok, previous errors are still block fetch errors. 
15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 11 outstanding blocks java.io.IOException: Failed to connect to host-/:55597 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at
Re: Parallel execution of JavaDStream/JavaPairDStream
14/06/19 15:03:36 WARN LoadSnappy: Snappy native library not loaded The problem is that the Snappy native library is not loaded on the workers. This is probably because you wrote the System.loadLibrary call outside the map function, so it is not shipped to the workers. Regards Jishnu Prathap -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parallel-execution-of-JavaDStream-JavaPairDStream-tp7961p21904.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
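If that diagnosis applies, one way to make sure the native library is loaded in every executor JVM is to trigger the load lazily from code that actually runs on the workers, for example inside mapPartitions. A rough sketch; the library name and the per-record work are placeholders:

import org.apache.spark.rdd.RDD

// The lazy val runs at most once per JVM, so the library is loaded on each executor
// the first time a partition is processed there, not only on the driver.
object NativeLibs {
  lazy val loaded: Boolean = {
    System.loadLibrary("snappy")
    true
  }
}

def withNativeLib(lines: RDD[String]): RDD[String] =
  lines.mapPartitions { iter =>
    NativeLibs.loaded
    iter.map(_.trim)
  }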
Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?
I've also tried the following: Configuration hadoopConfiguration = new Configuration(); hadoopConfiguration.set(multilinejsoninputformat.member, itemSet); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration, factory, false); but I still get the same exception. Why doesn't getOrCreate ignore that Hadoop configuration part (which normally works, e.g. when not recovering)? -- Emre On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a Spark Streaming application (that uses Spark 1.2.1) that listens to an input directory, and when new JSON files are copied to that directory processes them, and writes them to an output directory. It uses a 3rd party library to process the multi-line JSON files ( https://github.com/alexholmes/json-mapreduce). You can see the relevant part of the streaming application at: https://gist.github.com/emres/ec18ee264e4eb0dd8f1a When I run this application locally, it works perfectly fine. But then I wanted to test whether it could recover from failure, e.g. if I stopped it right in the middle of processing some files. I started the streaming application, copied 100 files to the input directory, and hit Ctrl+C when it has alread processed about 50 files: ... 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 [Stage 0:== (65 + 4) / 100] ^C Then I started the application again, expecting that it could recover from the checkpoint. For a while it started to read files again and then gave an exception: ... 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.io.IOException: Missing configuration value for multilinejsoninputformat.member at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Since in the exception it refers to a missing configuration multilinejsoninputformat.member, I think it is about the following line: ssc.ssc().sc().hadoopConfiguration().set( multilinejsoninputformat.member, itemSet); And this is why I also log the value of it, and as you can see above, just before it gives the exception in the recovery process, it shows that multilinejsoninputformat.member is set to itemSet. But somehow it is not found during the recovery. This exception happens only when it tries to recover from a previously interrupted run. I've also tried moving the above line into the createContext method, but still had the same exception. Why is that? And how can I work around it? -- Emre Sevinç http://www.bigindustries.be/ -- Emre Sevinc
Re: spark.local.dir leads to Job cancelled because SparkContext was shut down
When you say multiple directories, make sure those directories are available and spark have permission to write to those directories. You can look at the worker logs to see the exact reason of failure. Thanks Best Regards On Tue, Mar 3, 2015 at 6:45 PM, lisendong lisend...@163.com wrote: As long as I set the spark.local.dir to multiple disks, the job will failed, the errors are as follow: (if I set the spark.local.dir to only 1 dir, the job will succed...) Exception in thread main org.apache.spark.SparkException: Job cancelled because SparkContext was shut down at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639) at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638) at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215) at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201) at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163) at akka.actor.ActorCell.terminate(ActorCell.scala:338) at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431) at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447) at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-leads-to-Job-cancelled-because-SparkContext-was-shut-down-tp21894.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
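For reference, spark.local.dir takes a comma-separated list of directories; a minimal sketch, where the paths are placeholders that must already exist and be writable by the Spark user on every worker:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("multi-disk-scratch")
  // Each directory must exist on every worker and be writable, otherwise shuffle files cannot be created there.
  .set("spark.local.dir", "/data1/spark/tmp,/data2/spark/tmp")
val sc = new SparkContext(conf)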
Re: Running Spark jobs via oozie
We have gotten it to work... --- Original Message --- From: nitinkak001 nitinkak...@gmail.com Sent: March 3, 2015 7:46 AM To: user@spark.apache.org Subject: Re: Running Spark jobs via oozie I am also starting to work on this one. Did you get any solution to this issue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-jobs-via-oozie-tp5187p21896.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Connecting a PHP/Java applications to Spark SQL Thrift Server
Thanks very much, I used it and works fine with me. On 4 March 2015 at 11:56, Arush Kharbanda ar...@sigmoidanalytics.com wrote: For java You can use hive-jdbc connectivity jars to connect to Spark-SQL. The driver is inside the hive-jdbc Jar. *http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html* On Wed, Mar 4, 2015 at 1:26 PM, n...@reactor8.com wrote: SparkSQL supports JDBC/ODBC connectivity, so if that's the route you needed/wanted to connect through you could do so via java/php apps. Havent used either so cant speak to the developer experience, assume its pretty good as would be preferred method for lots of third party enterprise apps/tooling If you prefer using the thrift server/interface, if they don't exist already in open source land you can use thrift definitions to generate client libs in any supported thrift language and use that for connectivity. Seems one issue with thrift-server is when running in cluster mode. Seems like it still exists but UX of error has been cleaned up in 1.3: https://issues.apache.org/jira/browse/SPARK-5176 -Original Message- From: fanooos [mailto:dev.fano...@gmail.com] Sent: Tuesday, March 3, 2015 11:15 PM To: user@spark.apache.org Subject: Connecting a PHP/Java applications to Spark SQL Thrift Server We have installed hadoop cluster with hive and spark and the spark sql thrift server is up and running without any problem. Now we have set of applications need to use spark sql thrift server to query some data. Some of these applications are java applications and the others are PHP applications. As I am an old fashioned java developer, I used to connect java applications to BD servers like Mysql using a JDBC driver. Is there a corresponding driver for connecting with Spark Sql Thrift server ? Or what is the library I need to use to connect to it? For PHP, what are the ways we can use to connect PHP applications to Spark Sql Thrift Server? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-a-PHP-Java-ap plications-to-Spark-SQL-Thrift-Server-tp21902.html http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-a-PHP-Java-applications-to-Spark-SQL-Thrift-Server-tp21902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Anas Rabei Senior Software Developer Mubasher.info anas.ra...@mubasher.info
spark master shut down suddenly
15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket connection and attempting reconnect 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master shutting down. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark master shut down suddenly
You can check in the mesos logs and see whats really happening. Thanks Best Regards On Wed, Mar 4, 2015 at 3:10 PM, lisendong lisend...@163.com wrote: 15/03/04 09:26:36 INFO ClientCnxn: Client session timed out, have not heard from server in 26679ms for sessionid 0x34bbf3313a8001b, closing socket connection and attempting reconnect 15/03/04 09:26:36 INFO ConnectionStateManager: State change: SUSPENDED 15/03/04 09:26:36 INFO ZooKeeperLeaderElectionAgent: We have lost leadership 15/03/04 09:26:36 ERROR Master: Leadership has been revoked -- master shutting down. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-master-shut-down-suddenly-tp21907.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?
That could be a corner case bug. How do you add the 3rd party library to the class path of the driver? Through spark-submit? Could you give the command you used? TD On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc emre.sev...@gmail.com wrote: I've also tried the following: Configuration hadoopConfiguration = new Configuration(); hadoopConfiguration.set(multilinejsoninputformat.member, itemSet); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration, factory, false); but I still get the same exception. Why doesn't getOrCreate ignore that Hadoop configuration part (which normally works, e.g. when not recovering)? -- Emre On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a Spark Streaming application (that uses Spark 1.2.1) that listens to an input directory, and when new JSON files are copied to that directory processes them, and writes them to an output directory. It uses a 3rd party library to process the multi-line JSON files ( https://github.com/alexholmes/json-mapreduce). You can see the relevant part of the streaming application at: https://gist.github.com/emres/ec18ee264e4eb0dd8f1a When I run this application locally, it works perfectly fine. But then I wanted to test whether it could recover from failure, e.g. if I stopped it right in the middle of processing some files. I started the streaming application, copied 100 files to the input directory, and hit Ctrl+C when it has alread processed about 50 files: ... 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 [Stage 0:== (65 + 4) / 100] ^C Then I started the application again, expecting that it could recover from the checkpoint. For a while it started to read files again and then gave an exception: ... 
2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.io.IOException: Missing configuration value for multilinejsoninputformat.member at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Since in the exception it refers to a missing configuration multilinejsoninputformat.member, I think it is about the following line: ssc.ssc().sc().hadoopConfiguration().set( multilinejsoninputformat.member, itemSet); And this is why I also log the value of it, and as you can see above, just before it gives the exception in the recovery process, it shows that multilinejsoninputformat.member is set to itemSet. But somehow it is not found during the recovery. This exception happens only when it tries to recover from a previously interrupted run. I've also tried moving the above line into the createContext method, but still had the same exception. Why is that? And how can I work around it? -- Emre Sevinç http://www.bigindustries.be/
Re: delay between removing the block manager of an executor, and marking that as lost
You can look at the following - spark.akka.timeout - spark.akka.heartbeat.pauses from http://spark.apache.org/docs/1.2.0/configuration.html Thanks Best Regards On Tue, Mar 3, 2015 at 4:46 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Is there any relation between removing block manager of an executor and marking that as lost? In my setup,even after removing block manager ( after failing to do some operation )...it is taking more than 20 mins, to mark that as lost executor. Following are the logs: *15/03/03 10:26:49 WARN storage.BlockManagerMaster: Failed to remove broadcast 20 with removeFromMaster = true - Ask timed out on [Actor[akka.tcp://sparkExecutor@TMO-DN73:54363/user/BlockManagerActor1#-966525686]] after [3 ms]}* *15/03/03 10:27:41 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(1, TMO-DN73, 4) with no recent heart beats: 76924ms exceeds 45000ms* *15/03/03 10:27:41 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(1, TMO-DN73, 4)* *15/03/03 10:49:10 ERROR cluster.YarnClusterScheduler: Lost executor 1 on TMO-DN73: remote Akka client disassociated* How can i make this to happen faster? Thanks, Twinkle
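As a sketch, both settings can be passed through SparkConf; the values below are only illustrative, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("executor-loss-tuning")
  .set("spark.akka.timeout", "100")            // seconds
  .set("spark.akka.heartbeat.pauses", "6000")  // seconds; acceptable heartbeat pause before a node is treated as lost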
Re: Is the RDD's Partitions determined before hand ?
Hm, what do you mean? You can control, to some extent, the number of partitions when you read the data, and can repartition if needed. You can set the default parallelism too so that it takes effect for most ops that create an RDD. One number of partitions is usually about right for all work (2x or so the number of execution slots). If you know a stage needs unusually high parallelism, for example, you can repartition further for that stage. On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote: Thanks Sean. But if the partitions of an RDD are determined beforehand, it would not be flexible to run the same program on different datasets. Although for the first stage the partitions can be determined by the input data set, for the intermediate stages it is not possible. Users have to create a policy to repartition or coalesce based on the data set size. On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote: An RDD has a certain fixed number of partitions, yes. You can't change an RDD. You can repartition() or coalesce() an RDD to make a new one with a different number of partitions, possibly requiring a shuffle. On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote: I mean is it possible to change the partition number at runtime. Thanks -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
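A small sketch of that advice; the parallelism numbers and input path are illustrative:

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("partitioning-example")
  .set("spark.default.parallelism", "200")              // roughly 2x the execution slots, per the advice above
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs:///path/to/input", 200)   // partition hint at read time
// Repartition further only for the stage that needs unusually high parallelism.
val counts = lines.repartition(1000)
  .map(line => (line.length, 1))
  .reduceByKey(_ + _)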
Spark Streaming and SchemaRDD usage
Hi, in the roadmap of Spark in 2015 (link: http://files.meetup.com/3138542/Spark%20in%202015%20Talk%20-%20Wendell.pptx), I saw SchemaRDD is designed to be the basis of BOTH Spark Streaming and Spark SQL. My question is: what's the typical usage of SchemaRDD in a Spark Streaming application? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
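One common pattern, sketched under the assumption of a case-class record type: convert each micro-batch to a SchemaRDD and query it with SQL inside foreachRDD.

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

case class Event(user: String, value: Double)   // hypothetical record type

def queryEachBatch(events: DStream[Event], sqlContext: SQLContext): Unit = {
  import sqlContext.createSchemaRDD             // implicit RDD[Product] -> SchemaRDD conversion (Spark 1.2 style)
  events.foreachRDD { rdd =>
    rdd.registerTempTable("events")             // the temp table only needs to live for this micro-batch's queries
    val totals = sqlContext.sql("SELECT user, SUM(value) AS total FROM events GROUP BY user")
    totals.take(10).foreach(println)
  }
}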
Re: Is FileInputDStream returned by fileStream method a reliable receiver?
The file stream does not use receiver. May be that was not clear in the programming guide. I am updating it for 1.3 release right now, I will make it more clear. And file stream has full reliability. Read this in the programming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-with-files-as-input-source On Wed, Mar 4, 2015 at 2:14 AM, Emre Sevinc emre.sev...@gmail.com wrote: Is FileInputDStream returned by fileStream method a reliable receiver? In the Spark Streaming Guide it says: There can be two kinds of data sources based on their *reliability*. Sources (like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving data from these *reliable* sources acknowledge the received data correctly, it can be ensured that no data gets lost due to any kind of failure. This leads to two kinds of receivers. 1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges a reliable source that the data has been received and stored in Spark with replication. 2. *Unreliable Receiver* - These are receivers for sources that do not support acknowledging. Even for reliable sources, one may implement an unreliable receiver that do not go into the complexity of acknowledging correctly. So I wonder whether the receivers for HDFS (and local file system) are reliable, e.g. when I'm using fileStream method to process files in a directory locally or on HDFS? -- Emre Sevinç
Spark RDD Python, Numpy Shape command
I am a beginner to Spark and have some simple questions regarding the use of RDDs in Python. Suppose I have a matrix called data_matrix and I pass it to an RDD using RDD_matrix = sc.parallelize(data_matrix), but I will have a problem if I want to know the dimensions of the matrix in Spark, because a Spark RDD does not know the Python (NumPy package) command shape. In this case, how should I deal with it? In general, do I need to translate all my Python code into RDD-acceptable syntax so that my Python program can run using PySpark? Thanks in advance for any help! Best Rui
scala.Double vs java.lang.Double in RDD
Hi, I have a function with signature def aggFun1(rdd: RDD[(Long, (Long, Double))]): RDD[(Long, Any)] and one with def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]): RDD[(_Key, Double)] where all Double classes involved are scala.Double classes (according to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type parameters _Key and _Index are inferred by the Scala compiler). Now I am writing a test as follows: val result: Map[Long, Any] = aggFun1(input).collect().toMap result.values.foreach(v = println(v.getClass)) result.values.foreach(_ shouldBe a[Double]) and I get the following output: class java.lang.Double class java.lang.Double [info] avg [info] - should compute the average *** FAILED *** [info] 1.75 was not an instance of double, but an instance of java.lang.Double So I am wondering about what magic is going on here. Are scala.Double values in RDDs automatically converted to java.lang.Doubles or am I just missing the implicit back-conversion etc.? Any help appreciated, Tobias
Is FileInputDStream returned by fileStream method a reliable receiver?
Is FileInputDStream returned by fileStream method a reliable receiver? In the Spark Streaming Guide it says: There can be two kinds of data sources based on their *reliability*. Sources (like Kafka and Flume) allow the transferred data to be acknowledged. If the system receiving data from these *reliable* sources acknowledge the received data correctly, it can be ensured that no data gets lost due to any kind of failure. This leads to two kinds of receivers. 1. *Reliable Receiver* - A *reliable receiver* correctly acknowledges a reliable source that the data has been received and stored in Spark with replication. 2. *Unreliable Receiver* - These are receivers for sources that do not support acknowledging. Even for reliable sources, one may implement an unreliable receiver that do not go into the complexity of acknowledging correctly. So I wonder whether the receivers for HDFS (and local file system) are reliable, e.g. when I'm using fileStream method to process files in a directory locally or on HDFS? -- Emre Sevinç
Re: Is the RDD's Partitions determined before hand ?
Hi Sean, If you know a stage needs unusually high parallelism for example you can repartition further for that stage. The problem is we may not know whether high parallelism is needed. E.g. for the join operator, high parallelism may only be necessary for a dataset where lots of records join together, while for another dataset high parallelism may not be necessary if only a few records join together. So my point is that not being able to change parallelism dynamically at runtime may not be flexible. On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote: Hm, what do you mean? You can control, to some extent, the number of partitions when you read the data, and can repartition if needed. You can set the default parallelism too so that it takes effect for most ops that create an RDD. One number of partitions is usually about right for all work (2x or so the number of execution slots). If you know a stage needs unusually high parallelism, for example, you can repartition further for that stage. On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote: Thanks Sean. But if the partitions of an RDD are determined beforehand, it would not be flexible to run the same program on different datasets. Although for the first stage the partitions can be determined by the input data set, for the intermediate stages it is not possible. Users have to create a policy to repartition or coalesce based on the data set size. On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote: An RDD has a certain fixed number of partitions, yes. You can't change an RDD. You can repartition() or coalesce() an RDD to make a new one with a different number of partitions, possibly requiring a shuffle. On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote: I mean is it possible to change the partition number at runtime. Thanks -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
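As a rough sketch of such a repartition policy, the partition count can be derived from the data itself at runtime rather than fixed up front; the thresholds below are illustrative:

import org.apache.spark.rdd.RDD

def repartitionBySize[T](rdd: RDD[T], rowsPerPartition: Long = 1000000L, minPartitions: Int = 200): RDD[T] = {
  // One extra pass to size the data; a cheaper estimate could be substituted if that pass is too expensive.
  val rows = rdd.count()
  val target = math.max(minPartitions, (rows / rowsPerPartition).toInt)
  rdd.repartition(target)
}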
Re: Why can't Spark Streaming recover from the checkpoint directory when using a third party library for processingmulti-line JSON?
I'm adding this 3rd party library to my Maven pom.xml file so that it's embedded into the JAR I send to spark-submit:

<dependency>
  <groupId>json-mapreduce</groupId>
  <artifactId>json-mapreduce</artifactId>
  <version>1.0-SNAPSHOT</version>
  <exclusions>
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>commons-io</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>commons-lang</groupId>
      <artifactId>*</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Then I build my über JAR, and then I run my Spark Streaming application via the command line: spark-submit --class com.example.schemavalidator.SchemaValidatorDriver --master local[4] --deploy-mode client target/myapp-1.0-SNAPSHOT.jar -- Emre Sevinç On Wed, Mar 4, 2015 at 11:19 AM, Tathagata Das t...@databricks.com wrote: That could be a corner case bug. How do you add the 3rd party library to the class path of the driver? Through spark-submit? Could you give the command you used? TD On Wed, Mar 4, 2015 at 12:42 AM, Emre Sevinc emre.sev...@gmail.com wrote: I've also tried the following: Configuration hadoopConfiguration = new Configuration(); hadoopConfiguration.set(multilinejsoninputformat.member, itemSet); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, hadoopConfiguration, factory, false); but I still get the same exception. Why doesn't getOrCreate ignore that Hadoop configuration part (which normally works, e.g. when not recovering)? -- Emre On Tue, Mar 3, 2015 at 3:36 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have a Spark Streaming application (that uses Spark 1.2.1) that listens to an input directory, and when new JSON files are copied to that directory processes them, and writes them to an output directory. It uses a 3rd party library to process the multi-line JSON files ( https://github.com/alexholmes/json-mapreduce). You can see the relevant part of the streaming application at: https://gist.github.com/emres/ec18ee264e4eb0dd8f1a When I run this application locally, it works perfectly fine. But then I wanted to test whether it could recover from failure, e.g. if I stopped it right in the middle of processing some files. I started the streaming application, copied 100 files to the input directory, and hit Ctrl+C when it had already processed about 50 files: ... 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:20 INFO FileInputFormat:280 - Total input paths to process : 1 [Stage 0:== (65 + 4) / 100] ^C Then I started the application again, expecting that it could recover from the checkpoint. For a while it started to read files again and then gave an exception: ...
2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 INFO FileInputFormat:280 - Total input paths to process : 1 2015-03-03 15:06:39 WARN SchemaValidatorDriver:145 - * * * hadoopConfiguration: itemSet 2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) java.io.IOException: Missing configuration value for multilinejsoninputformat.member at com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at
Re:
You may look at https://issues.apache.org/jira/browse/SPARK-4516 Thanks Best Regards On Wed, Mar 4, 2015 at 12:25 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I got this error message: 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) And then for the same index file and executor, I got the following errors multiple times 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from host-:39534 java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_13_1228, and will not retry (0 retries) java.lang.RuntimeException: java.io.FileNotFoundException: /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index (No such file or directory) ... Caused by: java.net.ConnectException: Connection refused: host- What's the problem? BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any bug fixes related to shuffle block fetching or index files after that? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Connecting a PHP/Java applications to Spark SQL Thrift Server
For java You can use hive-jdbc connectivity jars to connect to Spark-SQL. The driver is inside the hive-jdbc Jar. *http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/jdbc/HiveDriver.html* On Wed, Mar 4, 2015 at 1:26 PM, n...@reactor8.com wrote: SparkSQL supports JDBC/ODBC connectivity, so if that's the route you needed/wanted to connect through you could do so via java/php apps. Havent used either so cant speak to the developer experience, assume its pretty good as would be preferred method for lots of third party enterprise apps/tooling If you prefer using the thrift server/interface, if they don't exist already in open source land you can use thrift definitions to generate client libs in any supported thrift language and use that for connectivity. Seems one issue with thrift-server is when running in cluster mode. Seems like it still exists but UX of error has been cleaned up in 1.3: https://issues.apache.org/jira/browse/SPARK-5176 -Original Message- From: fanooos [mailto:dev.fano...@gmail.com] Sent: Tuesday, March 3, 2015 11:15 PM To: user@spark.apache.org Subject: Connecting a PHP/Java applications to Spark SQL Thrift Server We have installed hadoop cluster with hive and spark and the spark sql thrift server is up and running without any problem. Now we have set of applications need to use spark sql thrift server to query some data. Some of these applications are java applications and the others are PHP applications. As I am an old fashioned java developer, I used to connect java applications to BD servers like Mysql using a JDBC driver. Is there a corresponding driver for connecting with Spark Sql Thrift server ? Or what is the library I need to use to connect to it? For PHP, what are the ways we can use to connect PHP applications to Spark Sql Thrift Server? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Connecting-a-PHP-Java-ap plications-to-Spark-SQL-Thrift-Server-tp21902.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
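A minimal Scala sketch of that JDBC route; note it uses the HiveServer2-style driver (org.apache.hive.jdbc.HiveDriver) and a jdbc:hive2:// URL, which is the protocol the Spark SQL Thrift Server speaks, and the host, port, and credentials are placeholders:

import java.sql.DriverManager

// Register the Hive JDBC driver, then talk to the Thrift Server like any other SQL endpoint.
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
try {
  val stmt = conn.createStatement()
  val rs = stmt.executeQuery("SHOW TABLES")   // any Spark SQL statement can go here
  while (rs.next()) {
    println(rs.getString(1))
  }
} finally {
  conn.close()
}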
Re: Unable to submit spark job to mesos cluster
From the lines pointed to in the exception log, I figured out that my code is unable to get the spark context. To isolate the problem, I've written a small piece of code as below -

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;

    public class Test {
        public static void main(String[] args) throws Exception {
            SparkConf sparkConf = new SparkConf().setMaster("mesos://node2.algofusiontech.com:5050").setAppName("test");
            SparkContext context = new SparkContext(sparkConf);
        }
    }

When I run this code as - java -cp .:/opt/cloudera/parcels/CDH/jars/* Test - I'm getting the below exception dump. Please help.

1 [sparkDriver-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver] java.lang.NoSuchMethodError: org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(Ljava/util/concurrent/Executor;I)V at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:282) at akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:239) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78) at scala.util.Try$.apply(Try.scala:161) at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) at scala.util.Success.flatMap(Try.scala:200) at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84) at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618) at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610) at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721) at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610) at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error: [Startup timed out] [ akka.remote.RemoteTransportException: Startup timed out at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129) at akka.remote.Remoting.start(Remoting.scala:191) at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588) at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) at akka.actor.ActorSystem$.apply(ActorSystem.scala:104) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156) at org.apache.spark.SparkContext.<init>(SparkContext.scala:203) at Test.main(Test.java:7) Caused by:
Re: Is the RDD's Partitions determined before hand ?
Parallelism doesn't really affect the throughput as long as it's: - not less than the number of available execution slots, - ... and probably some low multiple of them to even out task size effects - not so high that the bookkeeping overhead dominates Although you may need to select different scales of parallelism for different stages (like a join), you shouldn't in general have to change it according to data size. However you could count the input size and make parallelism some function of that if you found that was consistently better. The one exception is operations that tend to pull data into memory. You may need more parallelism as scale increases to keep in-memory data size small enough. There again you usually just err on the side of 'too much' parallelism, or avoid patterns that can pull a lot of data into memory, but this is usually the pain point if there is one. The problem I run into when thinking about this is that I don't think Spark can do much better, since it doesn't have the info above needed to decide these things in general. The calling program has to tell it. On Wed, Mar 4, 2015 at 10:17 AM, Jeff Zhang zjf...@gmail.com wrote: Hi Sean, If you know a stage needs unusually high parallelism for example you can repartition further for that stage. The problem is we may not know whether high parallelism is needed. e.g. for the join operator, high parallelism may only be necessary for some datasets where lots of records join together, while for other datasets high parallelism may not be necessary if only a few records join together. So my point is that not being able to change parallelism dynamically at runtime may not be flexible. On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote: Hm, what do you mean? You can control, to some extent, the number of partitions when you read the data, and can repartition if needed. You can set the default parallelism too so that it takes effect for most ops that create an RDD. One # of partitions is usually about right for all work (2x or so the number of execution slots). If you know a stage needs unusually high parallelism for example you can repartition further for that stage. On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote: Thanks Sean. But if the partitions of an RDD are determined beforehand, it would not be flexible to run the same program on a different dataset. Although for the first stage the partitions can be determined by the input data set, for the intermediate stages it is not possible. Users have to create a policy to repartition or coalesce based on the data set size. On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote: An RDD has a certain fixed number of partitions, yes. You can't change an RDD. You can repartition() or coalesce() an RDD to make a new one with a different number of partitions, possibly requiring a shuffle. On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote: I mean is it possible to change the partition number at runtime. Thanks -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
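To make the knobs discussed above concrete, here is a minimal sketch; the input path, parallelism and partition counts are placeholders, not recommendations.

    import org.apache.spark.{SparkConf, SparkContext}

    // All numbers below are placeholders chosen for illustration only.
    val conf = new SparkConf()
      .setAppName("partitioning-example")
      .set("spark.default.parallelism", "16") // default partition count for shuffle ops
    val sc = new SparkContext(conf)

    val lines = sc.textFile("hdfs:///some/input", 64) // hint for the minimum number of input partitions
    val pairs = lines.map(l => (l.length.toLong, l))

    // Give an unusually heavy shuffle stage more partitions explicitly...
    val grouped = pairs.groupByKey(256)
    // ...then shrink the result afterwards without a full shuffle.
    val small = grouped.coalesce(32)

    println(small.partitions.length)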
Does SparkSQL support ..... having count (fieldname) in SQL statement?
Hi, It seems that SparkSQL, even with the HiveContext, does not support SQL statements like: SELECT category, count(1) AS cnt FROM products GROUP BY category HAVING cnt > 10; I get this exception: Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: CAST(('cnt > 2), BooleanType), tree: I couldn't find anywhere in the documentation whether the having keyword is supported or not. If this is the case, what would be the workaround? Using two nested select statements? best, /Shahab
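One way to express the nested-select workaround mentioned above is sketched below; it assumes an existing SparkContext named sc and reuses the table and threshold from the question.

    import org.apache.spark.sql.hive.HiveContext

    val hc = new HiveContext(sc) // assumes an existing SparkContext `sc`

    // Aggregate in an inner query, then filter on the computed column in the outer
    // query, so no alias is referenced from a HAVING clause.
    val result = hc.sql(
      "SELECT category, cnt FROM " +
      "(SELECT category, count(1) AS cnt FROM products GROUP BY category) agg " +
      "WHERE cnt > 10")

    result.collect().foreach(println)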
Re: insert Hive table with RDD
Hi, I guess the toDF() API is in Spark 1.3, which would require a build from source code? Patcharee On 03. mars 2015 13:42, Cheng, Hao wrote: Use the SchemaRDD / DataFrame API via HiveContext. Assuming you're using the latest code, something probably like: val hc = new HiveContext(sc) import hc.implicits._ existedRdd.toDF().insertInto("hivetable") or existedRdd.toDF().registerTempTable("mydata") hc.sql("insert into table hivetable select xxx from mydata") -Original Message- From: patcharee [mailto:patcharee.thong...@uni.no] Sent: Tuesday, March 3, 2015 7:09 PM To: user@spark.apache.org Subject: insert Hive table with RDD Hi, How can I insert into an existing hive table an RDD containing my data? Any examples? Best, Patcharee - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
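A slightly more self-contained sketch of the suggestion above, assuming a Spark 1.3-style build where toDF() is available, an existing SparkContext named sc, and an already-created Hive table; the case class, data and table name are made up for illustration.

    import org.apache.spark.sql.hive.HiveContext

    case class Record(id: Int, name: String)

    val hc = new HiveContext(sc) // assumes an existing SparkContext `sc`
    import hc.implicits._

    val rdd = sc.parallelize(Seq(Record(1, "a"), Record(2, "b")))

    // Append the RDD to an existing Hive table via the DataFrame API...
    rdd.toDF().insertInto("hivetable")

    // ...or register it as a temporary table and insert through SQL.
    rdd.toDF().registerTempTable("mydata")
    hc.sql("INSERT INTO TABLE hivetable SELECT id, name FROM mydata")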
Unable to submit spark job to mesos cluster
Hi, I have a cluster running on CDH5.2.1 and I have a Mesos cluster (version 0.18.1). Through a Oozie java action I'm want to submit a Spark job to mesos cluster. Before configuring it as Oozie job I'm testing the java action from command line and getting exception as below. While running I'm pointing the classpath to CDH Home/jars folder. What is going wrong? Is there any additional configuration to be done which I'm missing? [ERROR] [03/04/2015 17:00:49.968] [main] [Remoting] Remoting error: [Startup timed out] [ akka.remote.RemoteTransportException: Startup timed out at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129) at akka.remote.Remoting.start(Remoting.scala:191) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588) at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) at akka.actor.ActorSystem$.apply(ActorSystem.scala:104) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156) at org.apache.spark.SparkContext.init(SparkContext.scala:203) at com.algofusion.reconciliation.execution.utils.ExecutionUtils.clinit(ExecutionUtils.java:130) at com.algofusion.reconciliation.execution.ReconExecutionController.initialize(ReconExecutionController.java:257) at com.algofusion.reconciliation.execution.ReconExecutionController.main(ReconExecutionController.java:105) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at akka.remote.Remoting.start(Remoting.scala:173) ... 
18 more ] Exception in thread main java.lang.ExceptionInInitializerError at com.algofusion.reconciliation.execution.ReconExecutionController.initialize(ReconExecutionController.java:257) at com.algofusion.reconciliation.execution.ReconExecutionController.main(ReconExecutionController.java:105) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at akka.remote.Remoting.start(Remoting.scala:173) at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184) at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579) at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577) at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588) at akka.actor.ActorSystem$.apply(ActorSystem.scala:111) at akka.actor.ActorSystem$.apply(ActorSystem.scala:104) at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450) at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156) at org.apache.spark.SparkContext.init(SparkContext.scala:203) at com.algofusion.reconciliation.execution.utils.ExecutionUtils.clinit(ExecutionUtils.java:130) ... 2 more Regards, Sarath.
RE: Does SparkSQL support ..... having count (fieldname) in SQL statement?
I think the problem is that you are using an alias in the having clause. I am not able to try just now, but see if HAVING count(*) > 2 works (i.e. don't use cnt). Sent on the new Sprint Network from my Samsung Galaxy S®4. -------- Original message -------- From: shahab shahab.mok...@gmail.com Date: 03/04/2015 7:22 AM (GMT-05:00) To: user@spark.apache.org Subject: Does SparkSQL support ..... having count (fieldname) in SQL statement? Hi, It seems that SparkSQL, even with the HiveContext, does not support SQL statements like: SELECT category, count(1) AS cnt FROM products GROUP BY category HAVING cnt > 10; I get this exception: Error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: CAST(('cnt > 2), BooleanType), tree: I couldn't find anywhere in the documentation whether the having keyword is supported or not. If this is the case, what would be the workaround? Using two nested select statements? best, /Shahab
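A minimal sketch of that suggestion, repeating the aggregate expression in HAVING instead of referencing the alias; it assumes an existing SparkContext named sc and uses the table and threshold from the original question.

    import org.apache.spark.sql.hive.HiveContext

    val hc = new HiveContext(sc) // assumes an existing SparkContext `sc`

    // Repeat count(1) in the HAVING clause rather than referring to the alias `cnt`.
    val result = hc.sql(
      "SELECT category, count(1) AS cnt FROM products " +
      "GROUP BY category HAVING count(1) > 10")

    result.collect().foreach(println)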
Re: TreeNodeException: Unresolved attributes
Why don't you formulate the query string first (by appending strings) before you pass it in? Also, the hql function is deprecated; you should use sql instead. http://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext On Wed, Mar 4, 2015 at 6:15 AM, Anusha Shamanur anushas...@gmail.com wrote: Hi, I am trying to run a simple select query on a table. val restaurants = hiveCtx.hql("select * from TableName where column like '%SomeString%'") This gives an error as below: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: How do I solve this? -- Regards, Anusha -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
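For example, a minimal sketch of that suggestion, reusing the hiveCtx, table and column names from the question; the search term is a placeholder.

    // Build the query string first, then pass it to sql() (hql() is deprecated).
    val pattern = "SomeString" // placeholder search term
    val query = "SELECT * FROM TableName WHERE column LIKE '%" + pattern + "%'"
    val restaurants = hiveCtx.sql(query)
    restaurants.collect().foreach(println)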
Re: Unable to submit spark job to mesos cluster
Looks like you are having 2 netty jars in the classpath. Thanks Best Regards On Wed, Mar 4, 2015 at 5:14 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: From the lines pointed in the exception log, I figured out that my code is unable to get the spark context. To isolate the problem, I've written a small code as below - *import org.apache.spark.SparkConf;* *import org.apache.spark.SparkContext;* *public class Test {* *public static void main(String[] args) throws Exception {* *SparkConf sparkConf = new SparkConf().setMaster(mesos://node2.algofusiontech.com:5050 http://node2.algofusiontech.com:5050).setAppName(test);* *SparkContext context = new SparkContext(sparkConf);* *}* *}* When I run this code as - *java -cp .:/opt/cloudera/parcels/CDH/jars/* Test* I'm getting the below exception dump. Please help. *1[sparkDriver-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]* *java.lang.NoSuchMethodError: org.jboss.netty.channel.socket.nio.NioWorkerPool.init(Ljava/util/concurrent/Executor;I)V* * at akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)* * at akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)* * at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)* * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)* * at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)* * at scala.util.Try$.apply(Try.scala:161)* * at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)* * at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)* * at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)* * at scala.util.Success.flatMap(Try.scala:200)* * at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)* * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)* * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)* * at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)* * at scala.collection.Iterator$class.foreach(Iterator.scala:727)* * at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)* * at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)* * at scala.collection.AbstractIterable.foreach(Iterable.scala:54)* * at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)* * at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)* * at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)* * at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)* * at akka.actor.ActorCell.invoke(ActorCell.scala:456)* * at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)* * at akka.dispatch.Mailbox.run(Mailbox.scala:219)* * at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)* * at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)* * at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)* * at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)* * at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)* *[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error: [Startup timed out] [* *akka.remote.RemoteTransportException: Startup timed out* * at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)* * at akka.remote.Remoting.start(Remoting.scala:191)* * at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)* * at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)* * at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)* * at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)* * at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)* * at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)* * at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)* * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)* * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)* * at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)* * at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)* * at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)* *
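One quick way to check is sketched below: run it with exactly the same classpath as the failing job, and it prints which jar (or directory) the netty 3.x class named in the NoSuchMethodError is loaded from. If that turns out to be an older copy than the one Spark and Akka expect, removing or shading the extra jar, rather than putting the whole CDH jars directory on the classpath, is the usual fix.

    object WhichNetty {
      def main(args: Array[String]): Unit = {
        // Print the location of the org.jboss.netty class from the stack trace above.
        val cls = Class.forName("org.jboss.netty.channel.socket.nio.NioWorkerPool")
        println(cls.getProtectionDomain.getCodeSource.getLocation)
      }
    }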
Re: Unable to submit spark job to mesos cluster
You can try increasing the Akka time out in the config, you can set the following in your config. spark.core.connection.ack.wait.timeout: 600 spark.akka.timeout: 1000 (In secs) spark.akka.frameSize:50 On Wed, Mar 4, 2015 at 5:14 PM, Sarath Chandra sarathchandra.jos...@algofusiontech.com wrote: From the lines pointed in the exception log, I figured out that my code is unable to get the spark context. To isolate the problem, I've written a small code as below - *import org.apache.spark.SparkConf;* *import org.apache.spark.SparkContext;* *public class Test {* *public static void main(String[] args) throws Exception {* *SparkConf sparkConf = new SparkConf().setMaster(mesos://node2.algofusiontech.com:5050 http://node2.algofusiontech.com:5050).setAppName(test);* *SparkContext context = new SparkContext(sparkConf);* *}* *}* When I run this code as - *java -cp .:/opt/cloudera/parcels/CDH/jars/* Test* I'm getting the below exception dump. Please help. *1[sparkDriver-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem [sparkDriver]* *java.lang.NoSuchMethodError: org.jboss.netty.channel.socket.nio.NioWorkerPool.init(Ljava/util/concurrent/Executor;I)V* * at akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:282)* * at akka.remote.transport.netty.NettyTransport.init(NettyTransport.scala:239)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)* * at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)* * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)* * at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)* * at scala.util.Try$.apply(Try.scala:161)* * at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)* * at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)* * at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)* * at scala.util.Success.flatMap(Try.scala:200)* * at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)* * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)* * at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)* * at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)* * at scala.collection.Iterator$class.foreach(Iterator.scala:727)* * at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)* * at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)* * at scala.collection.AbstractIterable.foreach(Iterable.scala:54)* * at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)* * at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)* * at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)* * at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)* * at akka.actor.ActorCell.invoke(ActorCell.scala:456)* * at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)* * at akka.dispatch.Mailbox.run(Mailbox.scala:219)* * at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)* * at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)* * at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)* * at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)* * at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)* *[ERROR] [03/04/2015 17:13:23.745] [main] [Remoting] Remoting error: [Startup timed out] [* *akka.remote.RemoteTransportException: Startup timed out* * at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)* * at akka.remote.Remoting.start(Remoting.scala:191)* * at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)* * at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)* * at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)* * at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)* * at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)* * at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)* * at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)* * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)* * at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)* * at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)* * at
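If you go that route, a minimal sketch of setting those properties on the SparkConf before creating the context is below, reusing the master and app name from the earlier post; the values are the ones suggested above, not tuned recommendations.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("mesos://node2.algofusiontech.com:5050")
      .setAppName("test")
      .set("spark.core.connection.ack.wait.timeout", "600") // seconds
      .set("spark.akka.timeout", "1000")                    // seconds
      .set("spark.akka.frameSize", "50")                    // MB
    val sc = new SparkContext(conf)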
Re: Speed Benchmark
Sorry for the confusion. All are running Hadoop services. Node 1 is the namenode whereas Nodes 2 and 3 are datanodes. Best, Guillaume Guy +1 919 - 972 - 8750 On Sat, Feb 28, 2015 at 1:09 AM, Sean Owen so...@cloudera.com wrote: Is machine 1 the only one running an HDFS data node? You describe it as one running Hadoop services. On Feb 27, 2015 9:44 PM, Guillaume Guy guillaume.c@gmail.com wrote: Hi Jason: Thanks for your feedback. Besides the information I mentioned above, there are 3 machines in the cluster. 1st one: Driver + a bunch of Hadoop services. 32GB of RAM, 8 cores (2 used). 2nd + 3rd: 16GB of RAM, 4 cores (2 used each). I hope this helps clarify. Thx. GG Best, Guillaume Guy +1 919 - 972 - 8750 On Fri, Feb 27, 2015 at 9:06 AM, Jason Bell jaseb...@gmail.com wrote: How many machines are on the cluster? And what is the configuration of those machines (Cores/RAM)? Small cluster is a very subjective statement. Guillaume Guy wrote: Dear Spark users: I want to see if anyone has an idea of the performance for a small cluster.