Re: How to list all dataframes and RDDs available in current session?
You can get the list of all the persisted RDDs using the Spark context... On Aug 21, 2015 12:06 AM, Rishitesh Mishra rishi80.mis...@gmail.com wrote: I am not sure if you can view all RDDs in a session. Tables are maintained in a catalogue, hence it's easier. However, you can see the DAG representation, which lists all the RDDs in a job, in the Spark UI. On 20 Aug 2015 22:34, Dhaval Patel dhaval1...@gmail.com wrote: Apologies, I accidentally included the Spark User DL on BCC. The actual email message is below. = Hi: I have been working on a few examples using Zeppelin. I have been trying to find a command that would list all *dataframes/RDDs* that have been created in the current session. Does anyone know if any such command is available? Something similar to the Spark SQL command to list all temp tables: show tables; Thanks, Dhaval
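For the persisted-RDD part, the SparkContext exposes this directly. A minimal sketch in Scala, assuming an active SparkContext `sc` (note that `rdd.name` is null unless it was set explicitly):

```scala
// getPersistentRDDs returns the RDDs marked as persisted in this context, keyed by RDD id
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id: name=${rdd.name} storage=${rdd.getStorageLevel.description}")
}
```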
Re: spark streaming 1.3 kafka error
It comes at the start of each task when new data is inserted into Kafka (the amount of data inserted is very small). The Kafka topic has 300 partitions and the data inserted is ~10 MB. Tasks fail and are retried; the retries succeed, but after a certain number of failed tasks it kills the job. On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das ak...@sigmoidanalytics.com wrote: That looks like you are choking your Kafka machine. Do a top on the Kafka machines and check the workload; it may be that you are spending too much time on disk I/O etc. On Aug 21, 2015 7:32 AM, Cody Koeninger c...@koeninger.org wrote: Sounds like that's happening consistently, not an occasional network problem? Look at the Kafka broker logs. Make sure you've configured the correct Kafka broker hosts / ports (note that the direct stream does not use the zookeeper host / port). Make sure that host / port is reachable from your driver and worker nodes, i.e. telnet or netcat to it. It looks like your driver can reach it (since there's partition info in the logs), but that doesn't mean the workers can. Use lsof / netstat to see what's going on with those ports while the job is running, or tcpdump if you need to. If you can't figure out what's going on from a networking point of view, post a minimal reproducible code sample that demonstrates the issue, so it can be tested in a different environment. On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, Getting the below error in Spark Streaming 1.3 while consuming from Kafka using a direct Kafka stream. A few tasks are failing in each run. What is the reason for / solution to this error? 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348) 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 - 4718 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties Thanks
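For reference, a minimal direct-stream setup in the 1.3 API, showing where the broker list (not ZooKeeper) goes. The broker string and topic name are placeholders, and `ssc` is assumed to be the StreamingContext:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// the direct stream takes Kafka brokers via metadata.broker.list, not a ZooKeeper quorum
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("test_hbrealtimeevents"))
```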
Re: How can I save the RDD result as Orcfile with spark1.3?
Hi Ted, thanks for your reply. Is there any other way to do this with Spark 1.3, such as writing the ORC file manually in a foreachPartition method? On Sat, Aug 22, 2015 at 12:19 PM, Ted Yu yuzhih...@gmail.com wrote: ORC support was added in Spark 1.4. See SPARK-2883 On Fri, Aug 21, 2015 at 7:36 PM, dong.yajun dongt...@gmail.com wrote: Hi list, Is there a way to save the RDD result as an ORC file in Spark 1.3? Due to some reasons we can't upgrade our Spark version to 1.4 now. -- *Ric Dong* -- *Ric Dong*
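One possible direction is writing ORC files directly with the Hive writer API from foreachPartition. The sketch below is untested: the output path and Record type are placeholders, it assumes hive-exec (0.13+) is on the classpath, and whether the reflection object inspector handles Scala case classes should be verified:

```scala
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.OrcFile
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions

case class Record(id: Long, name: String) // placeholder row type

rdd.foreachPartition { rows =>
  val conf = new Configuration()
  val inspector = ObjectInspectorFactory.getReflectionObjectInspector(
    classOf[Record], ObjectInspectorOptions.JAVA)
  // one ORC file per partition; the output path is an assumption
  val path = new Path(s"hdfs:///tmp/orc-out/part-${UUID.randomUUID()}.orc")
  val writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).inspector(inspector))
  try rows.foreach(writer.addRow) finally writer.close()
}
```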
Re: Random Forest and StringIndexer in pyspark ML Pipeline
The ML package aims to provide machine-learning pipelines that let users build workflows more efficiently, and it's more general for StringIndexer to chain with any kind of Estimator. I think the StringIndexer step and its reverse can be made automatic in the future. If you want to recover your original labels, you can use IndexToString. 2015-08-11 6:56 GMT+08:00 pkphlam pkph...@gmail.com: Hi, If I understand the RandomForest model in the ML Pipeline implementation in the ml package correctly, I have to first run my outcome label variable through the StringIndexer, even if my labels are numeric. The StringIndexer then converts the labels into numeric indices based on frequency of the label. This could create situations where, if I'm classifying binary outcomes and my original labels are simply 0 and 1, the StringIndexer may actually flip my labels such that 0s become 1s and 1s become 0s if my original 1s were more frequent. This transformation would then extend itself to the predictions. In the old mllib implementation, the RF does not require the labels to be changed and I could use 0/1 labels without worrying about them being transformed. I was wondering: 1. Why is this the default implementation for the Pipeline RF? This seems like it could cause a lot of confusion in cases like the one I outlined above. 2. Is there a way to avoid this by either controlling how the indices are created in StringIndexer or bypassing StringIndexer altogether? 3. If 2 is not possible, is there an easy way to see how my original labels mapped onto the indices so that I can revert the predictions back to the original labels rather than the transformed labels? I suppose I could do this by counting the original labels and mapping by frequency, but it seems like there should be a more straightforward way to get it out of the StringIndexer. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-and-StringIndexer-in-pyspark-ML-Pipeline-tp24200.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
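A short sketch of the mapping and the reverse step, shown in Scala (the `df` with a string "label" column and the `predictions` DataFrame are assumptions; IndexToString is available from Spark 1.5):

```scala
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val indexer = new StringIndexer().setInputCol("label").setOutputCol("labelIndex")
val indexerModel = indexer.fit(df)
// labels(i) is the original label assigned to index i, ordered by descending frequency
println(indexerModel.labels.mkString(", "))

// map numeric predictions back to the original labels
val converter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(indexerModel.labels)
val restored = converter.transform(predictions)
```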
DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.
Hi, I'm using Spark 1.3.1 built against Hadoop 1.0.4 and Java 1.7, and I'm trying to save my data frame to Parquet. The issue I'm stuck on looks like serialization tries to do a pretty weird thing: it tries to write to an empty array. The last (through stack trace) line of Spark code that leads to the exception is in method SerializationDebugger.visitSerializable(o: Object, stack: List[String]): List[String]: desc.getObjFieldValues(finalObj, objFieldValues) The reason it does so is that finalObj is org.apache.spark.sql.execution.Project and objFieldValues is an empty array! As a result there are two fields to read from the Project instance object (happens in java.io.ObjectStreamClass), but there is an empty array to read into. A little bit of code with debug info: private def visitSerializable(o: Object, stack: List[String]): List[String] = { val (finalObj, desc) = findObjectAndDescriptor(o) //finalObj: Project, desc: org.apache.spark.sql.execution.Project val slotDescs = desc.getSlotDescs //java.io.ObjectStreamClass[2] [0: SparkPlan, 1: Project] var i = 0 //i: 0 while (i < slotDescs.length) { val slotDesc = slotDescs(i) //slotDesc: org.apache.spark.sql.execution.SparkPlan if (slotDesc.hasWriteObjectMethod) { // TODO: Handle classes that specify writeObject method. } else { val fields: Array[ObjectStreamField] = slotDesc.getFields //fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled] val objFieldValues: Array[Object] = new Array[Object](slotDesc.getNumObjFields) //objFieldValues: java.lang.Object[0] val numPrims = fields.length - objFieldValues.length //numPrims: 1 desc.getObjFieldValues(finalObj, objFieldValues) //leads to exception So it looks like it gets the objFieldValues array from the SparkPlan object, but uses it to receive values from the Project object.
Here is the schema of my data frame root |-- Id: long (nullable = true) |-- explodes: struct (nullable = true) ||-- Identifiers: array (nullable = true) |||-- element: struct (containsNull = true) ||||-- Type: array (nullable = true) |||||-- element: string (containsNull = true) |-- Identifiers: struct (nullable = true) ||-- Type: array (nullable = true) |||-- element: string (containsNull = true) |-- Type2: string (nullable = true) |-- Type: string (nullable = true) Actual stack trace is: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635) at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84) at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887) at com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) at org.junit.runners.ParentRunner.run(ParentRunner.java:309) at org.junit.runner.JUnitCore.run(JUnitCore.java:160) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68) Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at
Is long running Spark batch job in fine-grained mode deprecated?
Hi all, We are trying to run Spark on top of Mesos using fine-grained mode. While talking to a few people I came to know that running a Spark job using fine-grained mode on Mesos is not a good idea. I could not find anything about fine-grained mode being deprecated, nor whether coarse-grained mode is the default choice for long-running Spark batch jobs. Thanks, -- Regards, Akash Mishra. Its not our abilities that make us, but our decisions.--Albus Dumbledore
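For reference, in the 1.x line coarse-grained mode is opt-in via spark.mesos.coarse (fine-grained is the default). A minimal sketch, with a placeholder Mesos master URL:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("mesos://zk://host:2181/mesos") // placeholder Mesos master URL
  .set("spark.mesos.coarse", "true")         // opt in to coarse-grained mode
```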
Spark streaming multi-tasking during I/O
Hi, My scenario goes like this: I have an algorithm running in Spark Streaming mode on a 4-core virtual machine. The majority of the time, the algorithm does disk I/O and database I/O. The question is: during the I/O, when the CPU is not considerably loaded, is it possible to run any other task/thread so as to efficiently utilize the CPU? Note that one DStream of the algorithm runs completely on a single CPU. Thank you, Sateesh
Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?
This is something of a wild guess, but I find that when executors start disappearing for no obvious reason, it is usually because the YARN node-managers have decided that the containers are using too much memory and have terminated the executors. Unfortunately, to see evidence of this, one needs to carefully review the YARN node-manager logs on the workers -- it doesn't seem to show up in the UI. What I generally do is some combination of: 1) increasing executor memory (often also decreasing the number of executors) 2) decreasing the number of cores per executor 3) increasing the executor memory overhead. Good luck. -Mike From: Sandy Ryza sandy.r...@cloudera.com To: Umesh Kacha umesh.ka...@gmail.com Cc: user@spark.apache.org user@spark.apache.org Sent: Thursday, August 20, 2015 5:21 PM Subject: Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data? GC wouldn't necessarily result in errors - it could just be slowing down your job and causing the executor JVMs to stall. If you click on a stage in the UI, you should end up on a page with all the metrics concerning the tasks that ran in that stage. GC Time is one of these task metrics. -Sandy On Thu, Aug 20, 2015 at 8:54 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi, where do I see GC time in the UI? I have set spark.yarn.executor.memoryOverhead to 3500, which seems to be good enough, I believe. So you mean only GC could be the reason behind the timeout? I checked the YARN logs and did not see any GC error there. Please guide. Thanks much. On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Moving this back onto user@ Regarding GC, can you look in the web UI and see whether the GC time metric dominates the amount of time spent on each task (or at least the tasks that aren't completing)? Also, have you tried bumping your spark.yarn.executor.memoryOverhead? YARN may be killing your executors for using too much off-heap space. You can see whether this is happening by looking in the Spark AM or YARN NodeManager logs. -Sandy On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi, thanks much for the response. Yes, I tried the default setting of 0.2 too; it was also running into the timeout. If it is spending time in GC, then why is it not throwing a GC error? I don't see any such error. YARN logs are not helpful at all. What is Tungsten and how do I use it? Spark is doing great; I believe my job runs successfully and 60% of tasks complete; only after the first executor gets lost do things start messing up. On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote: What sounds most likely is that you're hitting heavy garbage collection. Did you hit issues when the shuffle memory fraction was at its default of 0.2? A potential danger with setting the shuffle storage to 0.7 is that it allows shuffle objects to get into the GC old generation, which triggers more stop-the-world garbage collections. Have you tried enabling Tungsten / unsafe? Unfortunately, Spark is still not that great at dealing with heavily-skewed shuffle data, because its reduce-side aggregation still operates on Java objects instead of binary data. -Sandy On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Sandy, thanks much for the response.
I am using Spark 1.4.1 and I have set spark.shuffle.storage to 0.7, as my spark job involves 4 group-by queries executed using hiveContext.sql. My data set is skewed, so there will be more shuffling, I believe. I don't know what's wrong; the spark job runs fine for almost an hour, and when the shuffle read / shuffle write column in the UI starts to show more than 10 GB, executors start getting lost because of timeouts, and slowly the other executors start getting lost too. Please guide. On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote: What version of Spark are you using? Have you set any shuffle configs? On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com wrote: I have one Spark job which seems to run fine, but after one hour or so executors start getting lost because of a timeout, with something like the following error: cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds 60 seconds and because of the above error a couple of chained errors start to come, like FetchFailedException, Rpc client disassociated, Connection reset by peer, IOException etc. Please see the following UI page. I have noticed that when shuffle read/write starts to increase to more than 10 GB, executors start getting lost because of timeouts. How do I clear this stacked memory of 10 GB in the shuffle read/write section? I don't cache anything; why is Spark not clearing that memory? Please guide. IMG_20150819_231418358.jpg http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
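A sketch of the knobs discussed in this thread, with illustrative values (not recommendations; tune against your own workload):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "3072") // MB of off-heap headroom per executor
  .set("spark.shuffle.memoryFraction", "0.4")        // rebalance shuffle vs. storage memory
  .set("spark.storage.memoryFraction", "0.4")
```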
RE: Transformation not happening for reduceByKey or GroupByKey
I believe spark-shell -i scriptFile is there. We also use it, at least in Spark 1.3.1. dse spark will just wrap the spark-shell command; underneath it is just invoking spark-shell. I don't know too much about the original problem though. Yong Date: Fri, 21 Aug 2015 18:19:49 +0800 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: zjf...@gmail.com To: jsatishchan...@gmail.com CC: robin.e...@xense.co.uk; user@spark.apache.org Hi Satish, I don't see where spark supports -i, so I suspect it is provided by DSE. In that case, it might be a bug of DSE. On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, it is DSE but the issue is related to Spark only Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, the below-mentioned piece of code works fine in the Spark shell, but the same, when placed in a script file and executed with -i file name, creates an empty RDD scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28 scala> pairs.reduceByKey((x,y) => x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand I am missing something here due to which my final RDD does not have the required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28 scala> pairs.reduceByKey((x,y) => x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in an RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on values for each key Code: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73 res: Array[(Int,Int)] = Array() Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish -- Best Regards Jeff Zhang
Re: SPARK sql: Need JSON back instead of row
2015-08-21 3:17 GMT-07:00 smagadi sudhindramag...@fico.com: teenagers.toJSON gives the JSON but it does not preserve the parent ids, meaning if the input was {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"},"age":20} val x = sqlContext.sql("SELECT name, address.city, address.state, age FROM people WHERE age > 19 AND age <= 30").toJSON x.collect().foreach(println) This returns the following, missing address: {"name":"Yin","city":"Columbus","state":"Ohio","age":20} Is this a bug? You're not including it in the query, so of course it's not there. Try sqlContext.sql("SELECT * FROM people").toJSON.collect().foreach(println) instead. I get {"address":{"city":"Columbus","state":"Ohio"},"name":"Yin"} R.
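In other words, to keep the nesting while still filtering, select the struct column itself rather than its leaf fields. A minimal sketch:

```scala
// selecting the whole address struct preserves the nesting in the emitted JSON
sqlContext.sql("SELECT name, address, age FROM people WHERE age > 19 AND age <= 30")
  .toJSON
  .collect()
  .foreach(println)
// {"name":"Yin","address":{"city":"Columbus","state":"Ohio"},"age":20}
```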
Re: Transformation not happening for reduceByKey or GroupByKey
Hi All, Any inputs on the actual problem statement? Regards, Satish On Fri, Aug 21, 2015 at 5:57 PM, Jeff Zhang zjf...@gmail.com wrote: Yong, Thanks for your reply. I tried spark-shell -i script-file; it works fine for me. Not sure what the difference is with dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile On Fri, Aug 21, 2015 at 7:01 PM, java8964 java8...@hotmail.com wrote: I believe spark-shell -i scriptFile is there. We also use it, at least in Spark 1.3.1. dse spark will just wrap the spark-shell command; underneath it is just invoking spark-shell. I don't know too much about the original problem though. Yong -- Date: Fri, 21 Aug 2015 18:19:49 +0800 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: zjf...@gmail.com To: jsatishchan...@gmail.com CC: robin.e...@xense.co.uk; user@spark.apache.org Hi Satish, I don't see where spark supports -i, so I suspect it is provided by DSE. In that case, it might be a bug of DSE. On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, it is DSE but the issue is related to Spark only Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, the below-mentioned piece of code works fine in the Spark shell, but the same, when placed in a script file and executed with -i file name, creates an empty RDD scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28 scala> pairs.reduceByKey((x,y) => x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile I understand I am missing something here due to which my final RDD does not have the required output Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me: scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28 scala> pairs.reduceByKey((x,y) => x + y).collect res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40)) On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in an RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on values for each key Code: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73 res: Array[(Int,Int)] = Array() Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish -- Best Regards Jeff Zhang -- Best Regards Jeff Zhang
Re: Failed stages and dropped executors when running implicit matrix factorization/ALS
I've been able to almost halve my memory usage with no instability issues. I lowered my storage.memoryFraction and increased my shuffle.memoryFraction (essentially swapping them). I set spark.yarn.executor.memoryOverhead to 6GB. And I lowered executor-cores in case other jobs are using the available cores. I'm not sure which of these fixed the issue, but things are much more stable now. On Fri, Jun 26, 2015 at 11:26 AM, Xiangrui Meng men...@gmail.com wrote: Please see my comments inline. It would be helpful if you can attach the full stack trace. -Xiangrui On Fri, Jun 26, 2015 at 7:18 AM, Ravi Mody rmody...@gmail.com wrote: 1. These are my settings: rank = 100 iterations = 12 users = ~20M items = ~2M training examples = ~500M-1B (I'm running into the issue even with 500M training examples) Did you set number of blocks? If you didn't, could you check how many partitions you have in the ratings RDD? Setting a large number of blocks would increase shuffle size. If you have enough RAM, try to set number of blocks to the number of CPU cores or less. 2. The memory storage never seems to go too high. The user blocks may go up to ~10Gb, and each executor will have a few GB used out of 30 free GB. Everything seems small compared to the amount of memory I'm using. This looks correct. 3. I think I have a lot of disk space - is this on the executors or the driver? Is there a way to know if the error is coming from disk space. You can see the shuffle data size for each iteration from the WebUI. Usually, it should throw an out of disk space exception instead of the message you posted. But it is worth checking. 4. I'm not changing checkpointing settings, but I think checkpointing defaults to every 10 iterations? One notable thing is the crashes often start on or after the 9th iteration, so it may be related to checkpointing. But this could just be a coincidence. If you didn't set checkpointDir in SparkContext, the checkpointInterval setting in ALS has no effect. Thanks! On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat ayman.fara...@yahoo.com wrote: was there any resolution to that problem? I am also having that with Pyspark 1.4 380 Million observations 100 factors and 5 iterations Thanks Ayman On Jun 23, 2015, at 6:20 PM, Xiangrui Meng men...@gmail.com wrote: It shouldn't be hard to handle 1 billion ratings in 1.3. Just need more information to guess what happened: 1. Could you share the ALS settings, e.g., number of blocks, rank and number of iterations, as well as number of users/items in your dataset? 2. If you monitor the progress in the WebUI, how much data is stored in memory and how much data is shuffled per iteration? 3. Do you have enough disk space for the shuffle files? 4. Did you set checkpointDir in SparkContext and checkpointInterval in ALS? Best, Xiangrui On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody rmody...@gmail.com wrote: Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on fairly large datasets (1+ billion input records). As I grow my dataset I often run into issues with a lot of failed stages and dropped executors, ultimately leading to the whole application failing. The errors are like org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 19 and org.apache.spark.shuffle.FetchFailedException: Failed to connect to These occur during flatMap, mapPartitions, and aggregate stages. I know that increasing memory fixes this issue, but most of the time my executors are only using a tiny portion of the their allocated memory (10%). 
Often, the stages run fine until the last iteration or two of ALS, but this could just be a coincidence. I've tried tweaking a lot of settings, but it's time-consuming to do this through guess-and-check. Right now I have these set: spark.shuffle.memoryFraction = 0.3 spark.storage.memoryFraction = 0.65 spark.executor.heartbeatInterval = 60 I'm sure these settings aren't optimal - any idea of what could be causing my errors, and what direction I can push these settings in to get more out of my memory? I'm currently using 240 GB of memory (on 7 executors) for a 1 billion record dataset, which seems like too much. Thanks!
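For reference, a sketch of the ALS settings discussed in this thread, using the MLlib API. The checkpoint directory, block count, and `ratings` RDD are assumptions, and setCheckpointInterval is only available in recent Spark versions:

```scala
import org.apache.spark.mllib.recommendation.ALS

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints") // required for checkpointInterval to take effect
val model = new ALS()
  .setRank(100)
  .setIterations(12)
  .setImplicitPrefs(true)
  .setBlocks(64)             // fewer blocks means smaller shuffles, given enough RAM
  .setCheckpointInterval(10) // truncate the lineage every 10 iterations
  .run(ratings)              // ratings: RDD[Rating]
```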
Re: Transformation not happening for reduceByKey or GroupByKey
You had: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) => x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in an RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on values for each key Code: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73 res: Array[(Int,Int)] = Array() Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Re: spark streaming 1.3 kafka error
Sounds like that's happening consistently, not an occasional network problem? Look at the Kafka broker logs. Make sure you've configured the correct Kafka broker hosts / ports (note that the direct stream does not use the zookeeper host / port). Make sure that host / port is reachable from your driver and worker nodes, i.e. telnet or netcat to it. It looks like your driver can reach it (since there's partition info in the logs), but that doesn't mean the workers can. Use lsof / netstat to see what's going on with those ports while the job is running, or tcpdump if you need to. If you can't figure out what's going on from a networking point of view, post a minimal reproducible code sample that demonstrates the issue, so it can be tested in a different environment. On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, Getting the below error in Spark Streaming 1.3 while consuming from Kafka using a direct Kafka stream. A few tasks are failing in each run. What is the reason for / solution to this error? 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:376) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 16348 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348) 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 - 4718 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties Thanks
Want to install lz4 compression
Hi all, I am using pre-compiled Spark with Hadoop 2.6. The LZ4 codec is not in Hadoop's native libraries, so I am not able to use it. Can anyone suggest how to proceed? Hopefully I won't have to recompile Hadoop. I tried changing the --driver-library-path to point directly at the LZ4 standalone package libraries, but of course it didn't work. Thanks Saif
Re: Want to install lz4 compression
Have you read this? http://stackoverflow.com/questions/22716346/how-to-use-lz4-compression-in-linux-3-11 On Aug 21, 2015, at 6:57 AM, saif.a.ell...@wellsfargo.com wrote: Hi all, I am using pre-compiled Spark with Hadoop 2.6. The LZ4 codec is not in Hadoop's native libraries, so I am not able to use it. Can anyone suggest how to proceed? Hopefully I won't have to recompile Hadoop. I tried changing the --driver-library-path to point directly at the LZ4 standalone package libraries, but of course it didn't work. Thanks Saif
Tungsten and sun.misc.Unsafe
Hello, I attended the Tungsten-related presentations at Spark Summit (by Josh Rosen) and at Big Data Scala (by Matei Zaharia). Needless to say, this project holds great promise for major performance improvements. At Josh's talk, I heard about the use of sun.misc.Unsafe as a way of achieving some of these optimizations (e.g. slides 11-17 of Josh's presentation: http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen). I have no problems with the use of Unsafe in the code itself (I've done it before myself, too); however, I think there is a considerable risk associated with beginning the use of Unsafe now, because Oracle is determined to limit access to APIs such as Unsafe starting in Java 9. JEP 260 (http://openjdk.java.net/jeps/260) was filed specifically to limit access to internal JDK APIs that were never intended for external use, including sun.misc.*. The JEP does say that the functionality of sun.misc.Unsafe is to remain available even as other internal APIs are blocked for non-JDK use; however, it also says that the functionality of many methods of this class is now available via variable handles (JEP 193, http://openjdk.java.net/jeps/193). If direct access to sun.misc.Unsafe is blocked and only the variable-handles access remains, this may mean more than just a need for code refactoring - functionality such as doing malloc from Spark core may be restricted. JEP 260 has evolved quite a bit over time, and the wording available now (after the Aug. 4, 2015 update) seems more reasonable than before. Nevertheless, Hazelcast and other companies whose technologies depend on the availability of Unsafe started a Google doc here: https://docs.google.com/document/d/1GDm_cAxYInmoHMor-AkStzWvwE9pw6tnz_CebJQxuUE/edit#heading=h.brct71tr6e13 . I doubt that Oracle would want to make life difficult for everyone. In addition to Spark's code base, projects such as Akka, Cassandra, Hibernate, Netty, Neo4j and Spring (among many others) depend on Unsafe. Still, there are tons of posts about this issue in the Java community (e.g. a Hazelcast interview here: https://jaxenter.com/hazelcast-on-java-unsafe-class-119286.html, also from Aug. 3, the day before the latest update to JEP 260). There are tons of concerned posts on the blogosphere, too (e.g. http://blog.dripstat.com/removal-of-sun-misc-unsafe-a-disaster-in-the-making/). Have the leaders of the Spark community been following these Unsafe-related developments, and if so, what's Spark's plan for handling whatever Oracle throws our way? Marek
Re: Transformation not happening for reduceByKey or GroupByKey
Hi Abhishek, I have even tried that, but rdd2 is empty. Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) => x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in an RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on values for each key Code: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73 res: Array[(Int,Int)] = Array() Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
RE: Transformation not happening for reduceByKey or GroupByKey
What version of Spark are you using, or does it come with DSE 4.7? We just cannot reproduce it in Spark:

yzhang@localhost$ more test.spark
val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs.reduceByKey((x,y) => x + y).collect
yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
Welcome to Spark version 1.3.1
Using Scala version 2.10.4
Spark context available as sc.
SQL context available as sqlContext.
Loading test.spark...
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:21
15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Yong Date: Fri, 21 Aug 2015 19:24:09 +0530 Subject: Re: Transformation not happening for reduceByKey or GroupByKey From: jsatishchan...@gmail.com To: abhis...@tetrationanalytics.com CC: user@spark.apache.org Hi Abhishek, I have even tried that, but rdd2 is empty. Regards, Satish On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: You had: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Maybe try: rdd2 = RDD.reduceByKey((x,y) => x+y) rdd2.take(3) -Abhishek- On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in an RDD as mentioned below: RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40)) I am expecting output as Array((0,3),(1,50),(2,40)), just a sum function on values for each key Code: RDD.reduceByKey((x,y) => x+y) RDD.take(3) Result in console: RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73 res: Array[(Int,Int)] = Array() Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile Please let me know what is missing in my code, as my resultant Array is empty Regards, Satish
Spark ec2 launch problem
Hi All, I am trying to launch a spark ec2 cluster by running spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster but getting the following message endlessly. Please help. Warning: SSH connection error. (This could be temporary.) Host: SSH return code: 255 SSH output: ssh: Could not resolve hostname : Name or service not known
Re: build spark 1.4.1 with JDK 1.6
Thanks Sean. So how is PySpark supported? I thought PySpark needed JDK 1.6. Chen On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote: Spark 1.4 requires Java 7. On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote: I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error, [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 --- java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637) at java.lang.ClassLoader.defineClass(ClassLoader.java:621) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) I know that is due to the hadoop jar for cdh5.4.0 being built with JDK 7. Has anyone done this before? Thanks, -- Chen Song -- Chen Song
Re: build spark 1.4.1 with JDK 1.6
That was only true until Spark 1.3. Spark 1.4 can be built with JDK7 and pyspark will still work. On Fri, Aug 21, 2015 at 8:29 AM, Chen Song chen.song...@gmail.com wrote: Thanks Sean. So how is PySpark supported? I thought PySpark needed JDK 1.6. Chen On Fri, Aug 21, 2015 at 11:16 AM, Sean Owen so...@cloudera.com wrote: Spark 1.4 requires Java 7. On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote: I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error, [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 --- java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637) at java.lang.ClassLoader.defineClass(ClassLoader.java:621) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) I know that is due to the hadoop jar for cdh5.4.0 being built with JDK 7. Has anyone done this before? Thanks, -- Chen Song -- Chen Song -- Marcelo
build spark 1.4.1 with JDK 1.6
I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error, [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 --- java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637) at java.lang.ClassLoader.defineClass(ClassLoader.java:621) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) I know that is due to the hadoop jar for cdh5.4.0 being built with JDK 7. Has anyone done this before? Thanks, -- Chen Song
Finding the number of executors.
Is there any reliable way to find out the number of executors programmatically - regardless of how the job is run? A method that preferably works for Spark standalone, YARN, and Mesos, regardless of whether the code runs from the shell or not? Things that I tried that don't work: - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell, does not work if the task is submitted via spark-submit - sparkContext.getConf.getInt("spark.executor.instances", 1) - doesn't work unless explicitly configured - a call to http://master:8080/json (this used to work, but doesn't anymore?) I guess I could parse the output HTML from the Spark UI... but that seems dumb. Is there really no better way? Thanks, Virgil.
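One hedged workaround for the spark-submit case: the first approach often fails because executors have not yet registered when the code runs, so the registration race can be sidestepped by polling until the count stabilizes. A sketch (getExecutorMemoryStatus includes the driver, hence the -1):

```scala
import org.apache.spark.SparkContext

// polls until at least `atLeast` executors have registered, or the timeout passes
def waitForExecutors(sc: SparkContext, atLeast: Int, timeoutMs: Long = 30000L): Int = {
  val deadline = System.currentTimeMillis + timeoutMs
  var n = sc.getExecutorMemoryStatus.size - 1
  while (n < atLeast && System.currentTimeMillis < deadline) {
    Thread.sleep(500)
    n = sc.getExecutorMemoryStatus.size - 1
  }
  n
}
```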
RE: Spark ec2 launch problem
No, the message never ends. I have to ctrl-c out of it. Garry From: shahid ashraf [mailto:sha...@trialx.com] Sent: Friday, August 21, 2015 11:13 AM To: Garry Chen g...@cornell.edu Cc: user@spark.apache.org Subject: Re: Spark ec2 launch problem Does the cluster work at the end? On Fri, Aug 21, 2015 at 8:25 PM, Garry Chen g...@cornell.edu wrote: Hi All, I am trying to launch a spark ec2 cluster by running spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster but getting the following message endlessly. Please help. Warning: SSH connection error. (This could be temporary.) Host: SSH return code: 255 SSH output: ssh: Could not resolve hostname : Name or service not known -- with Regards Shahid Ashraf
Re: build spark 1.4.1 with JDK 1.6
Spark 1.4 requires Java 7. On Fri, Aug 21, 2015, 3:12 PM Chen Song chen.song...@gmail.com wrote: I tried to build Spark 1.4.1 on cdh 5.4.0. Because we need to support PySpark, I used JDK 1.6. I got the following error, [INFO] --- scala-maven-plugin:3.2.0:testCompile (scala-test-compile-first) @ spark-streaming_2.10 --- java.lang.UnsupportedClassVersionError: org/apache/hadoop/io/LongWritable : Unsupported major.minor version 51.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637) at java.lang.ClassLoader.defineClass(ClassLoader.java:621) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) I know that is due to the hadoop jar for cdh5.4.0 is built with JDK 7. Anyone has done this before? Thanks, -- Chen Song
Re: SparkSQL concerning materials
Have you seen the Spark SQL paper?: https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf On Thu, Aug 20, 2015 at 11:35 PM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, thanks for answers. I have read answers you provided, but I rather look for some materials on the internals. E.g how the optimizer works, how the query is translated into rdd operations etc. The API I am quite familiar with. A good starting point for me was: Spark DataFrames: Simple and Fast Analysis of Structured Data https://www.brighttalk.com/webcast/12891/166495?utm_campaign=child-community-webcasts-feedutm_content=Big+Data+and+Data+Managementutm_source=brighttalk-portalutm_medium=webutm_term= 2015-08-20 18:29 GMT+02:00 Dhaval Patel dhaval1...@gmail.com: Or if you're a python lover then this is a good place - https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html# On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote: See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package Cheers On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com wrote: Hi Dawid The best pace to get started is the Spark SQL Guide from Apache http://spark.apache.org/docs/latest/sql-programming-guide.html Regards Muhammad On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, I would like to dip into SparkSQL. Get to know better the architecture, good practices, some internals. Could you advise me some materials on this matter? Regards Dawid
Re: Spark ec2 launch problem
Does the cluster work at the end? On Fri, Aug 21, 2015 at 8:25 PM, Garry Chen g...@cornell.edu wrote: Hi All, I am trying to launch a spark ec2 cluster by running spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster but getting the following message endlessly. Please help. Warning: SSH connection error. (This could be temporary.) Host: SSH return code: 255 SSH output: ssh: Could not resolve hostname : Name or service not known -- with Regards Shahid Ashraf
Re: Data locality with HDFS not being seen
Hi Sunil, Have you seen this fix in Spark 1.5 that may fix the locality issue? https://issues.apache.org/jira/browse/SPARK-4352 On Thu, Aug 20, 2015 at 4:09 AM, Sunil sdhe...@gmail.com wrote: Hello. I am seeing some unexpected issues with achieving HDFS data locality. I expect the tasks to be executed only on the node which has the data, but this is not happening (of course, unless the node is busy, in which case I understand tasks can go to some other node). Could anyone clarify what's wrong with the way I am trying, or what I should do instead? Below are the cluster configuration and the experiments that I have tried. Any help will be appreciated. If you would like to recreate the below scenario, you may use the JavaWordCount.java example given within Spark. *Cluster configuration:* 1. spark-1.4.0 and hadoop-2.7.1 2. Machines -- master node (master) and 6 worker nodes (node1 to node6) 3. master acts as -- spark master, HDFS name node and secondary name node, YARN resource manager 4. Each of the 6 worker nodes acts as -- spark worker node, HDFS data node, node manager *Data on HDFS:* A 20 MB text file is stored in a single block. With the replication factor of 3, the text file is stored on nodes 2, 3, and 4. *Test-1 (Spark stand alone mode):* The application being run is the standard Java word count example with the above text file in HDFS as input. On job submission, I see in the Spark web UI that stage-0 (i.e. mapToPair) is being run on random nodes (i.e. node1, node2, node6, etc.). By random I mean that stage 0 executes on the very first worker node that gets registered to the application (this can be seen from the event timeline graph). Rather, I am expecting stage-0 to be run only on any one of the three nodes 2, 3, or 4. *Test-2 (Yarn cluster mode):* Same as above. No data locality seen. *Additional info:* No other spark applications are running and I have even tried setting spark.locality.wait to 10s, but still no difference. Thanks and regards, Sunil -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-locality-with-HDFS-not-being-seen-tp24361.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Convert mllib.linalg.Matrix to Breeze
Hi Naveen, As I mentioned before, the code is private and therefore not accessible. Just copy and use the snippet that I sent. Copying it here again: https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270 Best, Burak On Thu, Aug 20, 2015 at 9:08 PM, Naveen nav...@formcept.com wrote: Hi, Thanks for the reply. I tried Matrix.toBreeze(), which returns the following error: *method toBreeze in trait Matrix cannot be accessed in org.apache.spark.mllib.linalg.Matrix* On Thursday 20 August 2015 07:50 PM, Burak Yavuz wrote: Matrix.toBreeze is a private method. MLlib matrices have the same structure as Breeze matrices. Just create a new Breeze matrix like this https://github.com/apache/spark/blob/43e0135421b2262cbb0e06aae53523f663b4f959/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L270. Best, Burak On Thu, Aug 20, 2015 at 3:28 AM, Yanbo Liang yblia...@gmail.com wrote: You can use Matrix.toBreeze() https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala#L56 . 2015-08-20 18:24 GMT+08:00 Naveen nav...@formcept.com: Hi All, Is there any way to convert an mllib matrix to a dense matrix of Breeze? Any leads are appreciated. Thanks, Naveen
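For convenience, a sketch that mirrors the private toBreeze logic (it ignores the isTransposed flag for brevity; verify against the linked source for your Spark version):

```scala
import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix, SparseMatrix}

// mirrors the private Matrix.toBreeze conversions for the non-transposed case
def toBreeze(m: Matrix): BM[Double] = m match {
  case dm: DenseMatrix  => new BDM[Double](dm.numRows, dm.numCols, dm.values)
  case sm: SparseMatrix => new BSM[Double](sm.values, sm.numRows, sm.numCols, sm.colPtrs, sm.rowIndices)
}
```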
Re: Spark streaming multi-tasking during I/O
Hi Sateesh, It is interesting to know how you determined that the DStream runs on a single core. Did you mean receivers? Coming back to your question, could you not start the disk I/O in a separate thread, so that the scheduler can go ahead and assign other tasks? On 21 Aug 2015 16:06, Sateesh Kavuri sateesh.kav...@gmail.com wrote: Hi, My scenario goes like this: I have an algorithm running in Spark Streaming mode on a 4-core virtual machine. The majority of the time, the algorithm does disk I/O and database I/O. The question is: during the I/O, when the CPU is not considerably loaded, is it possible to run any other task/thread so as to efficiently utilize the CPU? Note that one DStream of the algorithm runs completely on a single CPU. Thank you, Sateesh
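A sketch of that separate-thread idea, overlapping blocking I/O with a thread pool inside the task. Here ioWrite is a hypothetical stand-in for the disk/database call and `dstream` is the stream in question:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

def ioWrite(rec: String): Unit = { /* hypothetical blocking disk/DB write */ }

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // start the blocking writes on a separate pool so the task's CPU slot is freed up
    val pending = records.map(r => Future(ioWrite(r))).toList
    // block only at the end, so the task does not complete before the writes do
    pending.foreach(f => Await.result(f, 10.minutes))
  }
}
```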
RE: SparkR csv without headers
You could also rename them with names(). Unfortunately the API docs don't show an example of that: https://spark.apache.org/docs/latest/api/R/index.html On Thu, Aug 20, 2015 at 7:43 PM -0700, Sun, Rui rui@intel.com wrote: Hi, You can create a DataFrame using read.df() with a specified schema. Something like: schema <- structType(structField("a", "string"), structField("b", "integer"), ...) read.df(..., schema = schema) From: Franc Carter [mailto:franc.car...@rozettatech.com] Sent: Wednesday, August 19, 2015 1:48 PM To: user@spark.apache.org Subject: SparkR csv without headers Hi, Does anyone have an example of how to create a DataFrame in SparkR which specifies the column names - the csv files I have do not have column names in the first row. I can read a csv nicely with com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2, C3 etc. thanks -- Franc Carter | Systems Architect | RoZetta Technology L4. 55 Harrington Street, THE ROCKS, NSW, 2000 PO Box H58, Australia Square, Sydney NSW, 1215, AUSTRALIA T +61 2 8355 2515 | www.rozettatechnology.com DISCLAIMER: The contents of this email, inclusive of attachments, may be legally privileged and confidential. Any unauthorised use of the contents is expressly prohibited.
Re: Spark streaming multi-tasking during I/O
You can look at spark.streaming.concurrentJobs; by default it runs a single job. If you set it to 2, it can run 2 jobs in parallel. It's an experimental flag, but go ahead and give it a try. On Aug 21, 2015 3:36 AM, Sateesh Kavuri sateesh.kav...@gmail.com wrote: Hi, My scenario goes like this: I have an algorithm running in Spark Streaming mode on a 4-core virtual machine. The majority of the time, the algorithm does disk I/O and database I/O. The question is: during the I/O, when the CPU is not considerably loaded, is it possible to run any other task/thread so as to efficiently utilize the CPU? Note that one DStream of the algorithm runs completely on a single CPU. Thank you, Sateesh
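A minimal sketch of setting that flag (the batch interval is a placeholder):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// experimental, undocumented flag: lets up to N output jobs of a batch run concurrently
val conf = new SparkConf().set("spark.streaming.concurrentJobs", "2")
val ssc = new StreamingContext(conf, Seconds(10))
```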
Re: Set custom worker id?
You can try adding a human-readable entry in the /etc/hosts file of the worker machine, and then set SPARK_LOCAL_IP to point to this hostname in that machine's spark-env.sh file. On Aug 21, 2015 11:57 AM, saif.a.ell...@wellsfargo.com wrote: Hi, Is it possible in standalone mode to set worker ID names, to avoid the worker-19248891237482379-ip..-port? Thanks, Saif
Re: DataFrame. SparkPlan / Project serialization issue: ArrayIndexOutOfBounds.
You've probably hit this bug: https://issues.apache.org/jira/browse/SPARK-7180 It's fixed in Spark 1.4.1+. Try setting spark.serializer.extraDebugInfo to false and see if it goes away. On Fri, Aug 21, 2015 at 3:37 AM, Eugene Morozov evgeny.a.moro...@gmail.com wrote: Hi, I'm using spark 1.3.1 built against hadoop 1.0.4 and java 1.7, and I'm trying to save my data frame to parquet. The issue I'm stuck on looks like serialization is trying to do a pretty weird thing: write into an empty array. The last (through the stack trace) line of spark code that leads to the exception is in the method SerializationDebugger.visitSerializable(o: Object, stack: List[String]): List[String]:

desc.getObjFieldValues(finalObj, objFieldValues)

The reason it does so is that finalObj is org.apache.spark.sql.execution.Project and objFieldValues is an empty array! As a result there are two fields to read from the Project instance object (happens in java.io.ObjectStreamClass), but there is an empty array to read into. A little bit of code with debug info:

private def visitSerializable(o: Object, stack: List[String]): List[String] = {
  val (finalObj, desc) = findObjectAndDescriptor(o) // finalObj: Project, desc: org.apache.spark.sql.execution.Project
  val slotDescs = desc.getSlotDescs // java.io.ObjectStreamClass[2] [0: SparkPlan, 1: Project]
  var i = 0 // i: 0
  while (i < slotDescs.length) {
    val slotDesc = slotDescs(i) // slotDesc: org.apache.spark.sql.execution.SparkPlan
    if (slotDesc.hasWriteObjectMethod) {
      // TODO: Handle classes that specify writeObject method.
    } else {
      val fields: Array[ObjectStreamField] = slotDesc.getFields // fields: java.io.ObjectStreamField[1] [0: Z codegenEnabled]
      val objFieldValues: Array[Object] = new Array[Object](slotDesc.getNumObjFields) // objFieldValues: java.lang.Object[0]
      val numPrims = fields.length - objFieldValues.length // numPrims: 1
      desc.getObjFieldValues(finalObj, objFieldValues) // leads to the exception

So it looks like it gets the objFieldValues array from the SparkPlan descriptor, but uses it to receive values from the Project object.
Here is the schema of my data frame:

root
 |-- Id: long (nullable = true)
 |-- explodes: struct (nullable = true)
 |    |-- Identifiers: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Type: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |-- Identifiers: struct (nullable = true)
 |    |-- Type: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- Type2: string (nullable = true)
 |-- Type: string (nullable = true)

Actual stack trace is:

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
at com.reltio.analytics.data.application.DataAccessTest.testEntities_NestedAttribute(DataAccessTest.java:199)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:68)
Caused by:
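For reference, a one-line way to apply the workaround suggested above while staying on 1.3.1. It only disables the SerializationDebugger that SPARK-7180 crashes in; the original "Task not serializable" problem still has to be diagnosed by other means:

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.serializer.extraDebugInfo", "false")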
Re: Spark ec2 launch problem
It may happen that the version of the spark-ec2 script you are using is buggy, or sometimes AWS has problems provisioning machines. On Aug 21, 2015 7:56 AM, Garry Chen g...@cornell.edu wrote: Hi All, I am trying to launch a spark ec2 cluster by running spark-ec2 --key-pair=key --identity-file=my.pem --vpc-id=myvpc --subnet-id=subnet-011 --spark-version=1.4.1 launch spark-cluster, but I get the following message endlessly. Please help. Warning: SSH connection error. (This could be temporary.) Host: SSH return code: 255 SSH output: ssh: Could not resolve hostname : Name or service not known
How frequently should we expect full GC
In the test job I am running on Spark 1.3.1 in our staging cluster, I can see the following information on the application stage page:

Metric    | Min   | 25th percentile | Median  | 75th percentile | Max
Duration  | 0 ms  | 1.1 min         | 1.5 min | 1.7 min         | 3.4 min
GC Time   | 11 s  | 16 s            | 21 s    | 25 s            | 54 s

From the GC output log, I can see a full GC in the executor about every minute, like below. My question: the committed heap is more than 14G, with -XX:MaxPermSize=128m, and in this case the heap usage maxes out around 10G, so why did a full GC happen every minute? The job runs fine; I just want to know what expectations you guys normally have for full GC in Spark jobs. Thanks, Yong

2015-08-21T16:53:59.561-0400: [Full GC [PSYoungGen: 328038K->0K(3728384K)] [ParOldGen: 10359817K->5856671K(11185152K)] 10687855K->5856671K(14913536K) [PSPermGen: 57214K->57214K(57856K)], 8.6951450 secs] [Times: user=140.72 sys=0.18, real=8.69 secs]
2015-08-21T16:54:09.605-0400: [GC [PSYoungGen: 1864192K->251539K(3728384K)] 7720863K->6108211K(14913536K), 0.1217750 secs] [Times: user=2.12 sys=0.01, real=0.12 secs]
2015-08-21T16:54:11.131-0400: [GC [PSYoungGen: 2115731K->163448K(3728384K)] 7972404K->6197142K(14913536K), 0.1802910 secs] [Times: user=3.19 sys=0.01, real=0.18 secs]
2015-08-21T16:54:12.832-0400: [GC [PSYoungGen: 2027640K->144369K(3728384K)] 8061339K->6314232K(14913536K), 0.1816010 secs] [Times: user=3.03 sys=0.00, real=0.19 secs]
2015-08-21T16:54:14.547-0400: [GC [PSYoungGen: 2008561K->121478K(3728384K)] 8178424K->6435609K(14913536K), 0.1411160 secs] [Times: user=2.50 sys=0.00, real=0.14 secs]
2015-08-21T16:54:15.931-0400: [GC [PSYoungGen: 1985670K->114489K(3728384K)] 8299801K->6550508K(14913536K), 0.1285300 secs] [Times: user=2.13 sys=0.00, real=0.13 secs]
2015-08-21T16:54:17.323-0400: [GC [PSYoungGen: 1978681K->219811K(3792896K)] 8414700K->6769504K(14978048K), 0.1649230 secs] [Times: user=2.89 sys=0.01, real=0.17 secs]
2015-08-21T16:54:18.878-0400: [GC [PSYoungGen: 2148515K->425173K(3728384K)] 8698218K->6974876K(14913536K), 0.3130360 secs] [Times: user=5.56 sys=0.00, real=0.31 secs]
2015-08-21T16:54:20.596-0400: [GC [PSYoungGen: 2353877K->313071K(3985408K)] 8903582K->7071556K(15170560K), 0.2423240 secs] [Times: user=4.30 sys=0.00, real=0.24 secs]
2015-08-21T16:54:22.695-0400: [GC [PSYoungGen: 2608367K->371370K(3902464K)] 9366852K->7338548K(15087616K), 0.2647510 secs] [Times: user=4.48 sys=0.00, real=0.26 secs]
2015-08-21T16:54:24.747-0400: [GC [PSYoungGen: 266K->459392K(4174336K)] 9633844K->7528652K(15359488K), 0.3564370 secs] [Times: user=6.36 sys=0.00, real=0.35 secs]
2015-08-21T16:54:26.951-0400: [GC [PSYoungGen: 3116160K->445880K(4075008K)] 10185420K->7746897K(15260160K), 0.2853880 secs] [Times: user=5.07 sys=0.00, real=0.29 secs]
2015-08-21T16:54:29.340-0400: [GC [PSYoungGen: 3102648K->286176K(4314112K)] 10403665K->7809242K(15499264K), 0.2534940 secs] [Times: user=4.48 sys=0.01, real=0.25 secs]
2015-08-21T16:54:31.979-0400: [GC [PSYoungGen: 3269600K->122064K(4261888K)] 10792666K->7863493K(15447040K), 0.2035800 secs] [Times: user=3.41 sys=0.00, real=0.20 secs]
2015-08-21T16:54:34.737-0400: [GC [PSYoungGen: 3105488K->555850K(4373504K)] 10846917K->8297279K(15558656K), 0.2401510 secs] [Times: user=4.14 sys=0.00, real=0.24 secs]
2015-08-21T16:54:38.015-0400: [GC [PSYoungGen: 3675978K->1146062K(4266496K)] 11417409K->8887493K(15451648K), 0.4298600 secs] [Times: user=7.65 sys=0.00, real=0.43 secs]
2015-08-21T16:54:41.492-0400: [GC [PSYoungGen: 4266190K->1326063K(3565056K)] 12007627K->9231644K(14750208K), 0.5542100 secs] [Times: user=9.90 sys=0.01, real=0.55 secs]
2015-08-21T16:54:43.797-0400: [GC [PSYoungGen: 3565039K->1587981K(3827200K)] 11470620K->9612725K(15012352K), 0.5359080 secs] [Times: user=9.57 sys=0.00, real=0.54 secs]
2015-08-21T16:54:45.856-0400: [GC [PSYoungGen: 3826957K->1047737K(3629568K)] 11851701K->9914434K(14814720K), 0.7787060 secs] [Times: user=13.91 sys=0.00, real=0.78 secs]
2015-08-21T16:54:48.174-0400: [GC [PSYoungGen: 2911929K->459808K(3728384K)] 11778626K->10058483K(14913536K), 0.5953360 secs] [Times: user=10.62 sys=0.03, real=0.60 secs]
2015-08-21T16:54:50.217-0400: [GC [PSYoungGen: 2324000K->102928K(3740160K)] 11922675K->10159967K(14925312K), 0.3191560 secs] [Times: user=5.68 sys=0.01, real=0.32 secs]
2015-08-21T16:54:51.951-0400: [GC [PSYoungGen: 1978896K->296227K(3728384K)] 12035935K->10456136K(14913536K), 0.1809970 secs] [Times: user=3.02 sys=0.00, real=0.18 secs]
2015-08-21T16:54:53.550-0400: [GC [PSYoungGen: 2172195K->316636K(3866624K)] 12332104K->10720591K(15051776K), 0.2545970 secs] [Times: user=4.43 sys=0.00, real=0.25 secs]
2015-08-21T16:54:55.340-0400: [GC [PSYoungGen: 2390748K->336907K(3800064K)] 12794703K->11043658K(14985216K), 0.3550330 secs] [Times: user=6.28 sys=0.00, real=0.35 secs]
2015-08-21T16:54:55.695-0400: [Full GC [PSYoungGen: 336907K->0K(3800064K)] [ParOldGen: 10706750K->5725402K(11185152K)] 11043658K->5725402K(14985216K) [PSPermGen: 57214K->57214K(57856K)], 9.5623960 secs] [Times: user=150.15
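For anyone who wants to capture executor GC logs like the above: the post doesn't show which flags were used, but output in this format comes from the standard HotSpot options, which can be passed to the executors roughly like this (a sketch):

spark-submit \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps" \
  ...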
Re: Aggregate to array (or 'slice by key') with DataFrames
Raghavendra, Thanks for the quick reply! I don’t think I included enough information in my question. I am hoping to get fields that are not directly part of the aggregation. Imagine a dataframe representing website views with a userID, datetime, and a webpage address. How could I find the oldest or newest webpage address that a user visited? As I understand it, you can only access fields that are part of the aggregation itself. Thanks, Impact On Aug 21, 2015, at 11:11 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Impact, You can group by the data and then sort it by timestamp and take max to select the oldest value. On Aug 21, 2015 11:15 PM, Impact nat...@skone.org wrote: I am also looking for a way to achieve the reducebykey functionality on data frames. In my case I need to select one particular row (the oldest, based on a timestamp column value) by key. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Aggregate to array (or 'slice by key') with DataFrames
Did you try sorting it by datetime and doing a groupBy on the userID? On Aug 21, 2015 12:47 PM, Nathan Skone nat...@skone.org wrote: Raghavendra, Thanks for the quick reply! I don’t think I included enough information in my question. I am hoping to get fields that are not directly part of the aggregation. Imagine a dataframe representing website views with a userID, datetime, and a webpage address. How could I find the oldest or newest webpage address that an user visited? As I understand it you can only access fields that are part of the aggregation itself. Thanks, Impact On Aug 21, 2015, at 11:11 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Impact, You can group by the data and then sort it by timestamp and take max to select the oldest value. On Aug 21, 2015 11:15 PM, Impact nat...@skone.org wrote: I am also looking for a way to achieve the reducebykey functionality on data frames. In my case I need to select one particular row (the oldest, based on a timestamp column value) by key. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Finding the number of executors.
Hi Akhil, I'm using spark 1.4.1. The number of executors is not in the command line, not in the getExecutorMemoryStatus (I already mentioned that I tried that; it works in spark-shell but not when executed via spark-submit). I tried looking at defaultParallelism too; it's 112 (7 executors * 16 cores) when run via spark-shell, but just 2 when run via spark-submit. But the scheduler obviously knows this information. It *must* know it. How can I access it? Other than parsing the HTML of the WebUI, that is... that's pretty much guaranteed to work, and maybe I'll do that, but it's extremely convoluted. Regards, Virgil. On Fri, Aug 21, 2015 at 11:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Which version of spark are you using? There was a discussion about this over here: http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote: Is there any reliable way to find out the number of executors programmatically - regardless of how the job is run? A method that preferably works for spark-standalone, yarn, mesos, regardless of whether the code runs from the shell or not? Things that I tried and don't work: - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell, does not work if the task is submitted via spark-submit - sparkContext.getConf.getInt("spark.executor.instances", 1) - doesn't work unless explicitly configured - a call to http://master:8080/json (this used to work, but doesn't anymore?) I guess I could parse the output html from the Spark UI... but that seems dumb. Is there really no better way? Thanks, Virgil.
Re: Finding the number of executors.
Which version of spark are you using? There was a discussion about this over here: http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote: Is there any reliable way to find out the number of executors programmatically - regardless of how the job is run? A method that preferably works for spark-standalone, yarn, mesos, regardless of whether the code runs from the shell or not? Things that I tried and don't work: - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell, does not work if the task is submitted via spark-submit - sparkContext.getConf.getInt("spark.executor.instances", 1) - doesn't work unless explicitly configured - a call to http://master:8080/json (this used to work, but doesn't anymore?) I guess I could parse the output html from the Spark UI... but that seems dumb. Is there really no better way? Thanks, Virgil.
Re: Worker Machine running out of disk for Long running Streaming process
Could you periodically (say every 10 mins) run System.gc() on the driver? Cleaning up shuffles is tied to garbage collection. On Fri, Aug 21, 2015 at 2:59 AM, gaurav sharma sharmagaura...@gmail.com wrote: Hi All, I have a 24x7 running Streaming Process, which runs on 2 hour windowed data. The issue I am facing is that my worker machines are running OUT OF DISK space. I checked that the SHUFFLE FILES are not getting cleaned up:

/log/spark-2b875d98-1101-4e61-86b4-67c9e71954cc/executor-5bbb53c1-cee9-4438-87a2-b0f2becfac6f/blockmgr-c905b93b-c817-4124-a774-be1e706768c1//00/shuffle_2739_5_0.data

Ultimately the machine runs out of disk space. I read about the *spark.cleaner.ttl* config param, which, from what I can understand from the documentation, cleans up all the metadata beyond the time limit. I went through https://issues.apache.org/jira/browse/SPARK-5836 - it says resolved, but there is no code commit. Can anyone please throw some light on the issue?
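A minimal sketch of that suggestion - a single-threaded scheduler on the driver that forces a GC every 10 minutes, giving the ContextCleaner's weak references a chance to fire and drop shuffle files whose RDDs have gone out of scope:

import java.util.concurrent.{Executors, TimeUnit}

// Runs on the driver, alongside the streaming application.
val gcScheduler = Executors.newSingleThreadScheduledExecutor()
gcScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()
}, 10, 10, TimeUnit.MINUTES)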
Re: Spark Sql behaves strangely with tables with a lot of partitions
@Cheng, Hao: Physical plans show that it got stuck on scanning S3! (The table is partitioned by date_prefix and hour.)

explain select count(*) from test_table where date_prefix='20150819' and hour='00';

TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)]
   Scan ParquetRelation[ .. about 1000 partition paths go here ]

Why does spark have to scan all partitions when the query only concerns 1 partition? Doesn't it defeat the purpose of partitioning? Thanks! On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it? On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you do some more profiling? I am wondering if the driver is busy with scanning HDFS / S3, like: jstack <pid of driver process> Also, it will be great if you can paste the physical plan for the simple query. *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 1:46 PM *To:* Cheng, Hao *Cc:* Philip Weaver; user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I cloned from TOT after the 1.5.0 cut-off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions; I've made sure that those CLs are included, but it's still very slow. On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try setting spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 12:16 PM *To:* Philip Weaver *Cc:* user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I guess the question is why does spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off? On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem. On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
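For reference, the flag Hao mentions in the thread above can be set directly on the SQLContext (a sketch; whether it actually helps depends on the Spark version in use):

// Disable Parquet partition discovery, per the suggestion above.
sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")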
Re: Spark Sql behaves strangely with tables with a lot of partitions
Did you try with hadoop version 2.7.1 .. It is known that s3a works really well with parquet which is available in 2.7. They fixed lot of issues related to metadata reading there... On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Cheng, Hao : Physical plans show that it got stuck on scanning S3! (table is partitioned by date_prefix and hour) explain select count(*) from test_table where date_prefix='20150819' and hour='00'; TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)] TungstenExchange SinglePartition TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)] Scan ParquetRelation[ .. about 1000 partition paths go here ] Why does spark have to scan all partitions when the query only concerns with 1 partitions? Doesn't it defeat the purpose of partitioning? Thanks! On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it? On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you make some more profiling? I am wondering if the driver is busy with scanning the HDFS / S3. Like jstack pid of driver process And also, it’s will be great if you can paste the physical plan for the simple query. *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 1:46 PM *To:* Cheng, Hao *Cc:* Philip Weaver; user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions, I've made sure that those CLs are included but it's still very slow On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 12:16 PM *To:* Philip Weaver *Cc:* user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I guess the question is why does spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off? On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem. On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
Re: Tungsten and sun.misc.Unsafe
Thanks Reynold, that helps a lot. I'm glad you're involved with that Google Doc community effort. I think it's because of that doc that the JEP's wording and scope changed for the better since it was originally introduced. Marek On Fri, Aug 21, 2015 at 11:18 AM, Reynold Xin r...@databricks.com wrote: I'm actually somewhat involved with the Google Docs you linked to. I don't think Oracle will remove Unsafe in JVM 9. As you said, JEP 260 already proposes making Unsafe available. Given the widespread use of Unsafe for performance and advanced functionality, I don't think Oracle can just remove it in one release. If they do, there will be a strong backlash, and the act would significantly undermine the credibility of the JVM as a long-term platform. Note that for Spark itself, we move pretty fast and can replace all the use of Unsafe with a newer alternative in one release if absolutely necessary (the actual coding takes only a day or two). On Fri, Aug 21, 2015 at 5:29 AM, Marek Kolodziej mkolod@gmail.com wrote: Hello, I attended the Tungsten-related presentations at Spark Summit (by Josh Rosen) and at Big Data Scala (by Matei Zaharia). Needless to say, this project holds great promise for major performance improvements. At Josh's talk, I heard about the use of sun.misc.Unsafe as a way of achieving some of these optimizations (e.g. slides 11-17 of Josh's presentation: http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen). I have no problem with the use of Unsafe in the code itself (I've done it before myself, too); however, I think there is a considerable risk associated with beginning the use of Unsafe now, because Oracle is determined to limit access to APIs such as Unsafe starting in Java 9. JEP 260 http://openjdk.java.net/jeps/260 was filed specifically to limit access to internal JDK APIs that were never intended for external use, including sun.misc.* The JEP does say that the functionality of sun.misc.Unsafe is to remain available even as other internal APIs are blocked for non-JDK use; however, it also says that the functionality of many methods of this class is now available via *variable handles (JEP 193 http://openjdk.java.net/jeps/193)*. If direct access to sun.misc.Unsafe is blocked and only the variable-handles access remains, this may mean more than just a need for code refactoring - functionality such as doing malloc from Spark core may be restricted. JEP 260 has evolved quite a bit over time, and the wording available now (after Aug. 4, 2015) seems more reasonable than before. Nevertheless, Hazelcast and other companies whose technologies depend on the availability of Unsafe started a Google doc here: https://docs.google.com/document/d/1GDm_cAxYInmoHMor-AkStzWvwE9pw6tnz_CebJQxuUE/edit#heading=h.brct71tr6e13 I doubt that Oracle would want to make life difficult for everyone. In addition to Spark's code base, projects such as Akka, Cassandra, Hibernate, Netty, Neo4j and Spring (among many others) depend on Unsafe. Still, there are tons of posts about this issue in the Java community (e.g. here is a Hazelcast interview, also from Aug. 3, the day before the latest update to JEP 260: https://jaxenter.com/hazelcast-on-java-unsafe-class-119286.html). There are tons of concerned posts on the blogosphere, too (e.g. http://blog.dripstat.com/removal-of-sun-misc-unsafe-a-disaster-in-the-making/ ).
Have the leaders of the Spark community been following these Unsafe-related developments, and if so, what's Spark's plan for handling whatever Oracle throws our way? Marek
Re: Aggregate to array (or 'slice by key') with DataFrames
Impact, You can group by the data and then sort it by timestamp and take max to select the oldest value. On Aug 21, 2015 11:15 PM, Impact nat...@skone.org wrote: I am also looking for a way to achieve the reducebykey functionality on data frames. In my case I need to select one particular row (the oldest, based on a timestamp column value) by key. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Remoting warning when submitting to cluster
I believe this was caused by some network configuration on my machines. After installing VirtualBox, some new network interfaces were installed on the machines, and the Akka software was binding to one of the VirtualBox interfaces and not the interface that belonged to my Ethernet card. Once I disabled the VirtualBox internet interfaces, things worked more reliably. -Javier From: sumit.anvekar [via Apache Spark User List] [mailto:ml-node+s1001560n24377...@n3.nabble.com] Sent: Thursday, August 20, 2015 9:32 PM To: Javier Delgadillo Subject: Re: Remoting warning when submitting to cluster Were you able to figure out what the issue is? I am also facing the same issue, but with an EC2 (1 master, 2 worker) setup. Also, I am trying to create an RDD with data from remote Cassandra. My program jar has all the dependencies needed. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Remoting-warning-when-submitting-to-cluster-tp22733p24397.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Set custom worker id?
Hi, Is it possible in standalone mode to set up worker ID names, to avoid the worker-19248891237482379-ip..-port? Thanks, Saif
Re: Aggregate to array (or 'slice by key') with DataFrames
I am also looking for a way to achieve the reducebykey functionality on data frames. In my case I need to select one particular row (the oldest, based on a timestamp column value) by key. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Aggregate to array (or 'slice by key') with DataFrames
Nathan, I achieve this using rowNumber. Here is a Python DataFrame example:

from pyspark.sql.window import Window
from pyspark.sql.functions import desc, rowNumber

yourOutputDF = (
    yourInputDF
    .withColumn("first", rowNumber().over(Window.partitionBy("userID").orderBy("datetime")))
    .withColumn("last", rowNumber().over(Window.partitionBy("userID").orderBy(desc("datetime"))))
)

You can get the first url like this:

yourOutputDF.filter("first=1").select("userID", "url")

...and the last like this:

yourOutputDF.filter("last=1").select("userID", "url")

If you wanted the first and last url as columns with one row per userID, you could do a groupBy and take the max of a when column that returns the url if last is 1, or null otherwise. (You would need a similar column where first is 1.) Not sure if this makes sense, but I don't have time now to provide a code example. Regards, Dan On Fri, Aug 21, 2015 at 4:09 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try sorting it by datetime and doing a groupBy on the userID? On Aug 21, 2015 12:47 PM, Nathan Skone nat...@skone.org wrote: Raghavendra, Thanks for the quick reply! I don’t think I included enough information in my question. I am hoping to get fields that are not directly part of the aggregation. Imagine a dataframe representing website views with a userID, datetime, and a webpage address. How could I find the oldest or newest webpage address that a user visited? As I understand it, you can only access fields that are part of the aggregation itself. Thanks, Impact On Aug 21, 2015, at 11:11 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Impact, You can group by the data and then sort it by timestamp and take max to select the oldest value. On Aug 21, 2015 11:15 PM, Impact nat...@skone.org wrote: I am also looking for a way to achieve the reducebykey functionality on data frames. In my case I need to select one particular row (the oldest, based on a timestamp column value) by key. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-to-array-or-slice-by-key-with-DataFrames-tp23636p24399.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
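A sketch of the groupBy-plus-when idea Dan describes, in Scala (column names as in his example, with first/last computed as above): when(...) yields the url only on the flagged row and null elsewhere, and max(...) then picks out that single non-null value per user.

import org.apache.spark.sql.functions.{col, max, when}

val perUser = yourOutputDF.groupBy("userID").agg(
  max(when(col("first") === 1, col("url"))).as("firstUrl"),
  max(when(col("last") === 1, col("url"))).as("lastUrl")
)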
Re: Spark Sql behaves strangely with tables with a lot of partitions
Is there a workaround without updating Hadoop? Would really appreciate if someone can explain what spark is trying to do here and what is an easy way to turn this off. Thanks all! On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Did you try with hadoop version 2.7.1 .. It is known that s3a works really well with parquet which is available in 2.7. They fixed lot of issues related to metadata reading there... On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com wrote: @Cheng, Hao : Physical plans show that it got stuck on scanning S3! (table is partitioned by date_prefix and hour) explain select count(*) from test_table where date_prefix='20150819' and hour='00'; TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)] TungstenExchange SinglePartition TungstenAggregate(key=[], value=[(count(1),mode=Partial,isDistinct=false)] Scan ParquetRelation[ .. about 1000 partition paths go here ] Why does spark have to scan all partitions when the query only concerns with 1 partitions? Doesn't it defeat the purpose of partitioning? Thanks! On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before, and I couldn't find much information about it online. What does it mean exactly to disable it? Are there any negative consequences to disabling it? On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com wrote: Can you make some more profiling? I am wondering if the driver is busy with scanning the HDFS / S3. Like jstack pid of driver process And also, it’s will be great if you can paste the physical plan for the simple query. *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 1:46 PM *To:* Cheng, Hao *Cc:* Philip Weaver; user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I cloned from TOT after 1.5.0 cut off. I noticed there were a couple of CLs trying to speed up spark sql with tables with a huge number of partitions, I've made sure that those CLs are included but it's still very slow On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com wrote: Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled to false. BTW, which version are you using? Hao *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com] *Sent:* Thursday, August 20, 2015 12:16 PM *To:* Philip Weaver *Cc:* user *Subject:* Re: Spark Sql behaves strangely with tables with a lot of partitions I guess the question is why does spark have to do partition discovery with all partitions when the query only needs to look at one partition? Is there a conf flag to turn this off? On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver philip.wea...@gmail.com wrote: I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of this problem. On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I did a simple experiment with Spark SQL. I created a partitioned parquet table with only one partition (date=20140701). A simple `select count(*) from table where date=20140701` would run very fast (0.1 seconds). However, as I added more partitions the query takes longer and longer. When I added about 10,000 partitions, the query took way too long. 
I feel like querying for a single partition should not be affected by having more partitions. Is this a known behaviour? What does spark try to do here? Thanks, Jerrick
Re: Finding the number of executors.
Following is a method that retrieves the list of executors registered to a spark context. It worked perfectly with spark-submit in standalone mode for my project.

/**
 * A simplified method that just returns the current active/registered executors
 * excluding the driver.
 * @param sc
 *   The spark context to retrieve registered executors.
 * @return
 *   A list of executors each in the form of host:port.
 */
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
  val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
  val driverHost: String = sc.getConf.get("spark.driver.host")
  allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
}

On Friday, August 21, 2015 1:53 PM, Virgil Palanciuc virg...@gmail.com wrote: Hi Akhil, I'm using spark 1.4.1. The number of executors is not in the command line, not in the getExecutorMemoryStatus (I already mentioned that I tried that; it works in spark-shell but not when executed via spark-submit). I tried looking at defaultParallelism too; it's 112 (7 executors * 16 cores) when run via spark-shell, but just 2 when run via spark-submit. But the scheduler obviously knows this information. It *must* know it. How can I access it? Other than parsing the HTML of the WebUI, that is... that's pretty much guaranteed to work, and maybe I'll do that, but it's extremely convoluted. Regards, Virgil. On Fri, Aug 21, 2015 at 11:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Which version of spark are you using? There was a discussion about this over here: http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote: Is there any reliable way to find out the number of executors programmatically - regardless of how the job is run? A method that preferably works for spark-standalone, yarn, mesos, regardless of whether the code runs from the shell or not? Things that I tried and don't work: - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell, does not work if the task is submitted via spark-submit - sparkContext.getConf.getInt("spark.executor.instances", 1) - doesn't work unless explicitly configured - a call to http://master:8080/json (this used to work, but doesn't anymore?) I guess I could parse the output html from the Spark UI... but that seems dumb. Is there really no better way? Thanks, Virgil.
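A hypothetical usage of the method above, recovering the executor count Virgil was after:

// Count of registered executors, excluding the driver.
val numExecutors = currentActiveExecutors(sc).size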
Having Clause with variance and stddev
Hi, An exception is thrown when using a HAVING clause with VARIANCE or STDDEV. It works perfectly when using other aggregate functions (like SUM, COUNT, MIN, MAX...).

SELECT SUM(1) AS `sum_number_of_records_ok` FROM `some_db`.`some_table` `some_table` GROUP BY 1 HAVING (STDDEV(1) > 0)

SELECT SUM(1) AS `sum_number_of_records_ok` FROM `some_db`.`some_table` `some_table` GROUP BY 1 HAVING (VARIANCE(1) > 0)

Could you please share any other way of writing this kind of query? Regards, Ravi
Re: spark kafka partitioning
When I send messages from a kafka topic having three partitions, Spark will listen for the messages when I say KafkaUtils.createStream or createDirectStream with local[4]. Now I want to see whether spark will create partitions when it receives messages from kafka using a dstream - how and where, and which method of the spark api do I have to look at to find out? On 8/21/15, Gaurav Agarwal gaurav130...@gmail.com wrote: Hello, Regarding Spark Streaming and Kafka Partitioning: when I send messages on a kafka topic with 3 partitions and listen on a kafka receiver with local[4], how will I come to know in Spark Streaming that different DStreams are created according to the partitions of the kafka messages? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
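For the direct stream specifically, the mapping is 1:1 - each Kafka partition becomes one partition of every batch RDD - and you can verify this from the offset ranges. A sketch (ssc, the broker address and the topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("mytopic"))

stream.foreachRDD { rdd =>
  // One OffsetRange per Kafka partition; rdd.partitions.size equals the
  // number of Kafka partitions of the topic.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
}

The receiver-based createStream behaves differently: it does not preserve Kafka's partitioning, and the partitioning of its RDDs is governed by the block interval instead.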
Re: SparkSQL concerning materials
Hi, thanks for the answers. I have read the answers you provided, but I'm rather looking for some materials on the internals, e.g. how the optimizer works, how a query is translated into RDD operations, etc. The API I am quite familiar with. A good starting point for me was: Spark DataFrames: Simple and Fast Analysis of Structured Data https://www.brighttalk.com/webcast/12891/166495?utm_campaign=child-community-webcasts-feedutm_content=Big+Data+and+Data+Managementutm_source=brighttalk-portalutm_medium=webutm_term= 2015-08-20 18:29 GMT+02:00 Dhaval Patel dhaval1...@gmail.com: Or if you're a python lover then this is a good place - https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html# On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu yuzhih...@gmail.com wrote: See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package Cheers On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif muhammadatif...@gmail.com wrote: Hi Dawid, The best place to get started is the Spark SQL Guide from Apache: http://spark.apache.org/docs/latest/sql-programming-guide.html Regards, Muhammad On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: Hi, I would like to dip into SparkSQL and get to know the architecture, good practices, and some internals better. Could you advise me on some materials on this matter? Regards, Dawid
ClassCastException when saving a DataFrame to parquet file (saveAsParquetFile, Spark 1.3.1) using Scala
Hi, I was trying to programmatically specify a schema and apply it to an RDD of Rows, then save the resulting DataFrame as a parquet file. Here's what I did:

1. Created an RDD of Rows from RDD[Array[String]]:

val gameId = Long.valueOf(line(0))
val accountType = Long.valueOf(line(1))
val worldId = Long.valueOf(line(2))
val dtEventTime = line(3)
val iEventId = line(4)
val vVersionId = line(5)
val vUin = line(6)
val vClientIp = line(7)
val vZoneId = line(8)
val dtCreateTime = line(9)
val iFeeFlag = Long.valueOf(line(10))
val vLoginWay = line(11)
return Row(gameId, accountType, worldId, dtEventTime, iEventId, vVersionId, vUin, vClientIp, vZoneId, dtCreateTime, vZoneId, dtCreateTime, iFeeFlag, vLoginWay)
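The message breaks off after step 1; for context, the remaining steps it describes would look roughly like this against the Spark 1.3 API (field list abbreviated, and the StructType must match the Row's arity and types exactly - note that the Row above passes vZoneId and dtCreateTime twice):

import org.apache.spark.sql.types._

// Sketch: one StructField per Row field, in order.
val schema = StructType(Seq(
  StructField("gameId", LongType, nullable = true),
  StructField("accountType", LongType, nullable = true),
  // ... one field per remaining column ...
  StructField("vLoginWay", StringType, nullable = true)
))
val df = sqlContext.createDataFrame(rowRDD, schema) // rowRDD: the RDD[Row] built in step 1
df.saveAsParquetFile("/path/to/output")             // Spark 1.3.x API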
Re: what determines the task size?
The OP wants to understand what determines the size of the task code that is shipped to each executor so it can run the task. I don't know the answer but would be interested to know too. Sent from my iPhone On 21 Aug 2015, at 08:26, oubrik [via Apache Spark User List] ml-node+s1001560n24380...@n3.nabble.com wrote: Hi, You mean user code? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/what-determine-the-task-size-tp24363p24384.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: PySpark concurrent jobs using single SparkContext
It seems like you want simultaneous processing of multiple jobs, but at the same time serialization of a few tasks within those jobs. I don't know how to achieve that in Spark. But why would you bother about the interweaved processing when the data being aggregated in different jobs is per customer per day? Is it that save_aggregate depends on the results of other customers and/or other days? I also don't understand how you would achieve that with yarn, because interweaving of tasks of separately submitted jobs may happen with dynamic executor allocation as well. Hemant On Thu, Aug 20, 2015 at 7:04 PM, Mike Sukmanowsky mike.sukmanow...@gmail.com wrote: Hi all, We're using Spark 1.3.0 via a small YARN cluster to do some log processing. The jobs are pretty simple: for a number of customers and a number of days, fetch some event log data, build aggregates and store those aggregates in a data store. The way our script is written right now does something akin to:

with SparkContext() as sc:
    for customer in customers:
        for day in days:
            logs = sc.textFile(get_logs(customer, day))
            aggregate = make_aggregate(logs)
            # This function contains the action saveAsNewAPIHadoopFile which
            # triggers a save
            save_aggregate(aggregate)

So we have a Spark job per customer, per day. I tried doing some parallel job submission with something similar to:

def make_and_save_aggregate(customer, day, spark_context):
    # Without a separate threading.Lock() here or better yet, one guarding the
    # Spark context, multiple customer/day transformations and actions could
    # be interweaved
    sc = spark_context
    logs = sc.textFile(get_logs(customer, day))
    aggregate = make_aggregate(logs)
    save_aggregate(aggregate)

with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
    for customer in customers:
        for day in days:
            executor.submit(make_and_save_aggregate, customer, day, sc)

The problem is, with no locks on a SparkContext except during initialization https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241 and shutdown https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307, operations on the context could (if I understand correctly) be interweaved, leading to a DAG which contains transformations out of order and from different customer/day periods. One solution is instead to launch multiple Spark jobs via spark-submit and let YARN/Spark's dynamic executor allocation take care of fair scheduling. In practice, this doesn't seem to yield very fast computation, perhaps due to some additional overhead with YARN. Is there any safe way to launch concurrent jobs like this using a single PySpark context? -- Mike Sukmanowsky Aspiring Digital Carpenter *e*: mike.sukmanow...@gmail.com LinkedIn http://www.linkedin.com/profile/view?id=10897143 | github https://github.com/msukmanowsky
Re: SPARK sql: Need JSON back instead of row
please try DataFrame.toJSON, it will give you an RDD of JSON strings. At 2015-08-21 15:59:43, smagadi sudhindramag...@fico.com wrote: val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") I need teenagers to be a JSON object rather than a simple row. How can we get that done? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-sql-Need-JSON-back-isntead-of-roq-tp24381.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
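A short sketch of that suggestion (the people table is as in the question; each element of the resulting RDD[String] is one JSON document):

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val asJson = teenagers.toJSON  // RDD[String], one JSON string per row
asJson.collect().foreach(println)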
Re: Transformation not happening for reduceByKey or GroupByKey
Yes, DSE 4.7 Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, the below-mentioned piece of code works fine in the Spark Shell, but when placed in a script file and executed with -i <file name> it creates an empty RDD

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not have the required output. Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in an RDD as mentioned below:

RDD : Array[(Int),(Int)] = Array((0,1),(0,2),(1,20),(1,30),(2,40))

I am expecting output as Array((0,3),(1,50),(2,40)) - just a sum function on values for each key

Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()

Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

Please let me know what is missing in my code, as my resultant Array is empty. Regards, Satish
Re: Transformation not happening for reduceByKey or GroupByKey
HI Robin, Yes, it is DSE, but the issue is related to Spark only. Regards, Satish Chandra On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it’s part of DataStax Enterprise right? On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: HI Robin, Yes, the below-mentioned piece of code works fine in the Spark Shell, but when placed in a script file and executed with -i <file name> it creates an empty RDD

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not have the required output. Regards, Satish Chandra On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: HI All, I have data in an RDD as mentioned below:

RDD : Array[(Int),(Int)] = Array((0,1),(0,2),(1,20),(1,30),(2,40))

I am expecting output as Array((0,3),(1,50),(2,40)) - just a sum function on values for each key

Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()

Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

Please let me know what is missing in my code, as my resultant Array is empty. Regards, Satish
Re: SPARK sql: Need JSON back instead of row
teenagers.toJSON gives the json, but it does not preserve the parent ids. Meaning, if the input was

{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"},"age":20}

val x = sqlContext.sql("SELECT name, address.city, address.state, age FROM people WHERE age > 19 AND age <= 30").toJSON
x.collect().foreach(println)

this returns the following, missing the address nesting:

{"name":"Yin","city":"Columbus","state":"Ohio","age":20}

Is this a bug? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-sql-Need-JSON-back-isntead-of-roq-tp24381p24386.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
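This is expected behavior rather than a bug: selecting address.city and address.state promotes those fields to top-level columns, and toJSON serializes whatever schema the resulting DataFrame actually has. A sketch that keeps the nesting by selecting the struct column itself:

val y = sqlContext.sql("SELECT name, address, age FROM people WHERE age > 19 AND age <= 30").toJSON
y.collect().foreach(println)
// e.g. {"name":"Yin","address":{"city":"Columbus","state":"Ohio"},"age":20}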
Worker Machine running out of disk for Long running Streaming process
Hi All, I have a 24x7 running Streaming Process, which runs on 2 hour windowed data. The issue I am facing is that my worker machines are running OUT OF DISK space. I checked that the SHUFFLE FILES are not getting cleaned up:

/log/spark-2b875d98-1101-4e61-86b4-67c9e71954cc/executor-5bbb53c1-cee9-4438-87a2-b0f2becfac6f/blockmgr-c905b93b-c817-4124-a774-be1e706768c1//00/shuffle_2739_5_0.data

Ultimately the machine runs out of disk space. I read about the *spark.cleaner.ttl* config param, which, from what I can understand from the documentation, cleans up all the metadata beyond the time limit. I went through https://issues.apache.org/jira/browse/SPARK-5836 - it says resolved, but there is no code commit. Can anyone please throw some light on the issue?
Re: Transformation not happening for reduceByKey or GroupByKey
Hi Satish, I don't see where Spark supports -i, so I suspect it is provided by DSE. In that case, it might be a bug in DSE.

On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com wrote: Hi Robin, Yes, it is DSE, but the issue is related to Spark only. Regards, Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote: Not sure, never used dse - it's part of DataStax Enterprise, right?

On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote: Hi Robin, Yes, the piece of code mentioned below works fine in the Spark shell, but the same code, when placed in a script file and executed with -i <file name>, creates an empty RDD:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not have the required output. Regards, Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: Hi All, I have data in an RDD as mentioned below:

RDD: Array[(Int, Int)] = Array((0,1),(0,2),(1,20),(1,30),(2,40))

I am expecting the output Array((0,3),(1,50),(2,40)), just a sum over the values for each key.

Code:
RDD.reduceByKey((x,y) => x + y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()

Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

Please let me know what is missing in my code, as my resultant Array is empty. Regards, Satish

--
Best Regards, Jeff Zhang
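As a cross-check that takes the -i code path out of the picture entirely, the same logic can be packaged as a small standalone app and submitted with spark-submit. A sketch, with the object and jar names invented for illustration:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone version of the snippet; run with something like:
//   spark-submit --class ReduceByKeyCheck --master local reducebykey-check.jar
object ReduceByKeyCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeyCheck"))
    val pairs = sc.makeRDD(Seq((0, 1), (0, 2), (1, 20), (1, 30), (2, 40)))
    pairs.reduceByKey(_ + _).collect().foreach(println)  // expect (0,3), (1,50), (2,40)
    sc.stop()
  }
}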
spark streaming 1.3 kafka error
Hi, I am getting the below error in Spark Streaming 1.3 while consuming from Kafka using a direct Kafka stream. A few tasks fail in each run. What is the reason for, and solution to, this error?

15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 - 4718
15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties

Thanks
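The EOF surfaces in the executor-side SimpleConsumer fetch, so it is worth double-checking how the direct stream is pointed at the brokers. A sketch of the Spark 1.3 direct-stream setup, with the broker addresses and batch interval as placeholders; note that metadata.broker.list takes Kafka broker host:port pairs, not the ZooKeeper address:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))  // illustrative batch interval
// Placeholder broker list; must be reachable from every worker, not just the driver
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("test_hbrealtimeevents")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)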
Re: Transformation not happening for reduceByKey or GroupByKey
Hi Robin, Yes, the piece of code mentioned below works fine in the Spark shell, but the same code, when placed in a script file and executed with -i <file name>, creates an empty RDD:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

I understand I am missing something here, due to which my final RDD does not have the required output. Regards, Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote: This works for me:

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:28
scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote: Hi All, I have data in an RDD as mentioned below:

RDD: Array[(Int, Int)] = Array((0,1),(0,2),(1,20),(1,30),(2,40))

I am expecting the output Array((0,3),(1,50),(2,40)), just a sum over the values for each key.

Code:
RDD.reduceByKey((x,y) => x + y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at <console>:73
res: Array[(Int,Int)] = Array()

Command as mentioned: dse spark --master local --jars postgresql-9.4-1201.jar -i ScriptFile

Please let me know what is missing in my code, as my resultant Array is empty. Regards, Satish
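Since the subject line also mentions groupByKey, the same sum can be expressed that way as a cross-check on whether both transformations show the empty-result behavior. A sketch; reduceByKey remains preferable for sums because it combines values map-side before the shuffle:

// Equivalent aggregation via groupByKey + mapValues, for comparison only
val pairs = sc.makeRDD(Seq((0, 1), (0, 2), (1, 20), (1, 30), (2, 40)))
val summed = pairs.groupByKey().mapValues(_.sum)
summed.collect().foreach(println)  // expect (0,3), (1,50), (2,40)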
Re: Creating Spark DataFrame from large pandas DataFrame
The easiest option I found is to put the jars in the SPARK_CLASSPATH. On 21 Aug 2015 06:20, Burak Yavuz brk...@gmail.com wrote: If you would like to try using spark-csv, please use `pyspark --packages com.databricks:spark-csv_2.11:1.2.0`. You're missing a dependency. Best, Burak

On Thu, Aug 20, 2015 at 1:08 PM, Charlie Hack charles.t.h...@gmail.com wrote: Hi, I'm new to Spark and am trying to create a Spark df from a pandas df with ~5 million rows, using Spark 1.4.1. When I type:

df = sqlContext.createDataFrame(pandas_df.where(pd.notnull(pandas_df), None))

(the df.where is a hack I found on the Spark JIRA to avoid a problem with NaN values producing mixed column types)

I get:

TypeError: cannot create an RDD from type: type 'list'

Converting a smaller pandas dataframe (~2000 rows) works fine. Has anyone had this issue? This is already a workaround; ideally I'd like to read the Spark dataframe from a Hive table, but this is currently not an option for my setup. I also tried reading the data into Spark from a CSV using spark-csv, but haven't been able to make this work as yet. I launch:

$ pyspark --jars path/to/spark-csv_2.11-1.2.0.jar

and when I attempt to read the CSV I get:

Py4JJavaError: An error occurred while calling o22.load. : java.lang.NoClassDefFoundError: org/apache/commons/csv/CSVFormat ...

Other options I can think of:
- Convert my CSV to JSON (use Pig?) and read it into Spark
- Read it in using a JDBC connection from Postgres

But I want to make sure I'm not misusing Spark or missing something obvious. Thanks! Charlie
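The --packages suggestion matters because spark-csv depends on commons-csv (the class named in the NoClassDefFoundError), which --jars with the single spark-csv jar does not pull in. A sketch of the read itself, in Scala with the path and options as placeholders; the same format/option calls apply in PySpark:

// Launch with: --packages com.databricks:spark-csv_2.11:1.2.0
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")       // assumes the file has a header row
  .option("inferSchema", "true")  // optional: scan the data to guess types
  .load("path/to/data.csv")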
ClassCastException when saving a DataFrame to parquet file (saveAsParquetFile, Spark 1.3.1) using Scala
Hi, I was trying to programmatically specify a schema, apply it to an RDD of Rows, and save the resulting DataFrame as a parquet file, but I got java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long on the last step. Here's what I did:

1. Created an RDD of Rows from an RDD[Array[String]]:

val gameId = Long.valueOf(line(0))
val accountType = Long.valueOf(line(1))
val worldId = Long.valueOf(line(2))
val dtEventTime = line(3)
val iEventId = line(4)
return Row(gameId, accountType, worldId, dtEventTime, iEventId)

2. Generated the schema:

return StructType(Array(
  StructField("GameId", LongType, true),
  StructField("AccountType", LongType, true),
  StructField("WorldId", LongType, true),
  StructField("dtEventTime", StringType, true),
  StructField("iEventId", StringType, true)))

3. Applied the schema to the RDD of Rows:

val schemaRdd = sqlContext.createDataFrame(rowRdd, schema)

4. Saved schemaRdd as a parquet file:

schemaRdd.saveAsParquetFile(dst + "/" + tableName + ".parquet")

However, it gave me a ClassCastException on step 4 (the DataFrame, i.e. schemaRdd, can be correctly printed out according to the specified schema). Thank you for your help! Best, Emma

Stack trace of the exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 12, 10-4-28-24): java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:88)
at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:357)
at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:338)
at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:324)
at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:120)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:671)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
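The exception means some Row ended up holding a String in a slot the schema declares as LongType. A hedged line of defense is to validate the numeric fields while building the Rows and drop records that do not parse (a stray header or malformed line is one plausible culprit); a sketch, assuming rawRdd is the RDD[Array[String]] from step 1:

import org.apache.spark.sql.Row

// Sketch: keep only lines whose first three fields parse as Long, so no
// String can reach the parquet writer in a LongType column
val rowRdd = rawRdd.flatMap { line =>
  try {
    Some(Row(line(0).toLong, line(1).toLong, line(2).toLong, line(3), line(4)))
  } catch {
    case _: NumberFormatException => None  // drop malformed records
  }
}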
SPARK SQL support for XML
Does Spark SQL support XML the same way it supports JSON?
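There is no built-in XML source analogous to jsonFile at this point. One workaround sketch, assuming each line of the input is a self-contained XML record and with element names invented for illustration, is to parse records with scala.xml and build a DataFrame from the result:

import scala.xml.XML

// Assumes one record per line, e.g. <person><name>Yin</name><age>20</age></person>
case class Person(name: String, age: Int)
val people = sc.textFile("people.xml").map { line =>
  val node = XML.loadString(line)
  Person((node \ "name").text, (node \ "age").text.toInt)
}
import sqlContext.implicits._
val df = people.toDF()
df.registerTempTable("people")  // queryable like a JSON-backed table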
Re: Spark-Cassandra-connector
Have you considered asking this question on https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user ? Cheers

On Thu, Aug 20, 2015 at 10:57 PM, Samya samya.ma...@amadeus.com wrote: Hi All, I need to write an RDD to Cassandra using the spark-cassandra-connector from DataStax. My application is using Yarn.

Some basic questions:
1. Will a call to saveToCassandra(...) use the same connection object across all tasks in a given executor? I.e., is there one connection object per executor that is shared between tasks?
2. If the answer to the above is YES, is there a way to create a connection pool for each executor, so that multiple tasks can write data to Cassandra in parallel?

Regards, Samya
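For explicit control, the connector exposes CassandraConnector, which caches Cluster/Session objects per executor JVM, so tasks on the same executor share connections. A sketch of per-partition session use, with the keyspace, table, and data shapes invented for illustration:

import com.datastax.spark.connector.cql.CassandraConnector

// Placeholder data; in practice this is the RDD to be written
val rdd = sc.parallelize(Seq((1, "a"), (2, "b")))
val connector = CassandraConnector(sc.getConf)
rdd.foreachPartition { rows =>
  // withSessionDo borrows a session from the connector's per-JVM cache
  connector.withSessionDo { session =>
    rows.foreach { case (id, value) =>
      session.execute("INSERT INTO ks.tbl (id, value) VALUES (?, ?)",
        Int.box(id), value)
    }
  }
}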