Re: Custom Kryo serializer
I was able to get this working by extending KryoRegistrator and setting the spark.kryo.registrator property.

On Thu, Feb 12, 2015 at 12:31 PM, Corey Nolet cjno...@gmail.com wrote:
I'm trying to register a custom class that extends Kryo's Serializer interface. I can't tell exactly what Class the registerKryoClasses() function on the SparkConf is looking for. How do I register the Serializer class?
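For anyone finding this thread later, a minimal sketch of that approach. The Point class and its serializer are hypothetical stand-ins; registerKryoClasses() only takes plain Class objects, which is why a registrator is needed to attach a custom Serializer:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical domain class used for illustration.
    case class Point(x: Double, y: Double)

    // A custom Kryo serializer for Point.
    class PointSerializer extends Serializer[Point] {
      override def write(kryo: Kryo, output: Output, p: Point): Unit = {
        output.writeDouble(p.x)
        output.writeDouble(p.y)
      }
      override def read(kryo: Kryo, input: Input, t: Class[Point]): Point =
        Point(input.readDouble(), input.readDouble())
    }

    // The registrator ties the class to its serializer.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit =
        kryo.register(classOf[Point], new PointSerializer)
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")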
Re: Master dies after program finishes normally
How many nodes do you have in your cluster, how many cores, and how much memory?

On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar manasdebashis...@gmail.com wrote:
Hi Arush, Mine is CDH 5.3 with Spark 1.2. The only changes to my Spark programs are -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000. ..Manas

On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote:
What is your cluster configuration? Did you try looking at the Web UI? There are many tips here: http://spark.apache.org/docs/1.2.0/tuning.html Did you try these?

On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com wrote:
Hi, I have a Hidden Markov Model running with 200 MB of data. Once the program finishes (i.e. all stages/jobs are done), it hangs for 20 minutes or so before the master is killed. The following log appears in the Spark master:

2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster] java.lang.OutOfMemoryError: GC overhead limit exceeded ... at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726) at org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675) at org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653) at org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)

Can anyone help? ..Manas

Arush Kharbanda || Technical Teamlead | Sigmoid Analytics | ar...@sigmoidanalytics.com | www.sigmoidanalytics.com
Re: Concurrent batch processing
I've been experimenting with my configuration for a couple of days and gained quite a bit of performance through small optimizations, but it may very well be something crazy I'm doing that is causing this problem.

To give a little background, I am in the early stages of a project that consumes a stream of data on the order of 100,000 events per second and requires processing over a sliding window of one day (ideally a week). Spark Streaming is a good candidate, but I want to make sure I squash any performance issues ahead of time before I commit.

With a 5-second batch size, after 40 minutes the processing time is also 5 seconds, and I see the CPU spike for about two seconds out of every five. I assume the sliding window operation is very expensive in this case and is the root cause of this effect.

I should've done a little more research before I posted; I just came across a post about an undocumented property, spark.streaming.concurrentJobs, that I am about to try. I'm still confused how exactly this works with a sliding window where the result of one batch depends on the other. I assume the concurrency can only be achieved up until the window action is executed. Either way, I am going to give this a try and post back here if that doesn't work. Thanks!

On Thu, Feb 12, 2015 at 2:55 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote:
It could depend on the nature of your application, but Spark Streaming uses Spark internally, so concurrency should be there. What is your use case? Are you sure that your configuration is good?

On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro matus.f...@kik.com wrote:
Hi, Please correct me if I'm wrong: in Spark Streaming, the next batch will not start processing until the previous batch has completed. Is there any way to start processing the next batch if the previous batch is taking longer to process than the batch interval? ...
Concurrent batch processing
Hi, Please correct me if I'm wrong: in Spark Streaming, the next batch will not start processing until the previous batch has completed. Is there any way to start processing the next batch if the previous batch is taking longer to process than the batch interval?

The problem I am facing is that I don't see a hardware bottleneck in my Spark cluster, but Spark is not able to handle the amount of data I am pumping through (batch processing time is longer than the batch interval). What I'm seeing are spikes of CPU, network, and disk IO usage, which I assume are due to different stages of a job, but on average the hardware is underutilized. Concurrency in batch processing would allow the average batch processing time to be greater than the batch interval while fully utilizing the hardware.

Any ideas on what can be done? One option I can think of is to split the application into multiple applications running concurrently, dividing the initial stream of data between them. However, I would then lose the benefits of having a single application.

Thank you, Matus
Re: can we insert and update with spark sql
I thought more on it... can we provide access to the IndexedRDD through the Thrift server API and let mapPartitions query the API? I am not sure if the Thrift server is as performant as opening up an API using other akka-based frameworks (like Play or Spray)... Any pointers will be really helpful. Neither Play nor Spray is being used in Spark right now, so they bring extra dependencies, and we already know about the akka conflicts... The Thrift server, on the other hand, is already integrated for JDBC access.

On Tue, Feb 10, 2015 at 3:43 PM, Debasish Das debasish.da...@gmail.com wrote:
Also, I wanted to run get() and set() from mapPartitions (from Spark workers, not the master)... To be able to do that I think I have to create a separate Spark context for the cache... But I am not sure how the SparkContext from job1 can access the SparkContext from job2!

On Tue, Feb 10, 2015 at 3:25 PM, Debasish Das debasish.da...@gmail.com wrote:
Thanks... this is what I was looking for... It will be great if Ankur can give brief details about it. Basically, how does it contrast with memcached, for example?

On Tue, Feb 10, 2015 at 2:32 PM, Michael Armbrust mich...@databricks.com wrote:
You should look at https://github.com/amplab/spark-indexedrdd

On Tue, Feb 10, 2015 at 2:27 PM, Debasish Das debasish.da...@gmail.com wrote:
Hi Michael, I want to cache an RDD and define get() and set() operators on it, basically like memcached. Is it possible to build a memcached-like distributed cache using Spark SQL? If not, what do you suggest we use for such operations? Thanks. Deb

On Fri, Jul 18, 2014 at 1:00 PM, Michael Armbrust mich...@databricks.com wrote:
You can do insert into. As with other SQL-on-HDFS systems, there is no updating of data.

On Jul 17, 2014 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
Is this what you are looking for? https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/sql/parquet/InsertIntoParquetTable.html According to the doc: "Operator that acts as a sink for queries on RDDs and can be used to store the output inside a directory of Parquet files. This operator is similar to Hive's INSERT INTO TABLE operation in the sense that one can choose to either overwrite or append to a directory. Note that consecutive insertions to the same table must have compatible (source) schemas." Thanks, Best Regards

On Thu, Jul 17, 2014 at 11:42 AM, Hu, Leo leo.h...@sap.com wrote:
Hi, As for Spark 1.0, can we insert and update a table with Spark SQL, and how? Thanks, Best Regards
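For readers landing here later, a rough sketch of what IndexedRDD usage looks like, based on the project's README at the time; the package path and method names may differ between versions, so treat this as an assumption to verify against the repo:

    import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
    import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

    // Build an IndexedRDD from an RDD of key-value pairs keyed by Long.
    val pairs = sc.parallelize((1L to 1000000L).map(x => (x, 0)))
    val indexed = IndexedRDD(pairs).cache()

    // Point lookup and functional update, without scanning the whole RDD.
    indexed.get(42L)                  // => Some(0)
    val updated = indexed.put(42L, 1)
    updated.get(42L)                  // => Some(1)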
Master dies after program finishes normally
Hi, I have a Hidden Markov Model running with 200 MB of data. Once the program finishes (i.e. all stages/jobs are done), it hangs for 20 minutes or so before the master is killed. The following log appears in the Spark master:

2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster] java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.immutable.List$.newBuilder(List.scala:396) at scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69) at scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105) at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58) at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53) at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239) at scala.collection.TraversableLike$class.map(TraversableLike.scala:243) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26) at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.json4s.MonadicJValue.org$json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22) at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16) at org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450) at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726) at org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675) at org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653) at org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399)

Can anyone help? ..Manas
Re: Master dies after program finishes normally
What is your cluster configuration? Did you try looking at the Web UI? There are many tips here: http://spark.apache.org/docs/1.2.0/tuning.html Did you try these?

On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com wrote:
Hi, I have a Hidden Markov Model running with 200 MB of data. Once the program finishes (i.e. all stages/jobs are done), it hangs for 20 minutes or so before the master is killed. The following log appears in the Spark master: 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster] java.lang.OutOfMemoryError: GC overhead limit exceeded ... at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726) ... Can anyone help? ..Manas

Arush Kharbanda || Technical Teamlead | Sigmoid Analytics | ar...@sigmoidanalytics.com | www.sigmoidanalytics.com
correct way to broadcast a variable
Suppose I have an object to broadcast and then use in a mapper function, something like the following (Python code):

    obj2share = sc.broadcast("Some object here")
    someRdd.map(createMapper(obj2share)).collect()

The createMapper function will create a mapper function using the shared object's value. Another way to do this is:

    someRdd.map(createMapper(obj2share.value)).collect()

Here createMapper directly uses the shared object's value to create the mapper function. Is there a difference on the Spark side between the two methods? If there is no difference at all, I'd prefer the second, because it hides Spark from the createMapper function. Thanks.
Re: Master dies after program finishes normally
I have 5 workers, each with 8 GB of executor memory. My driver memory is 8 GB as well. They are all 8-core machines. To answer Imran's question, my configuration is: executor_total_max_heapsize = 18GB. This problem happens at the end of my program. I don't have to run a lot of jobs to see this behaviour, and I can see my output correctly in HDFS and all. I will give it one more try after increasing the master's memory (from the default 296 MB to 512 MB). ..manas

On Thu, Feb 12, 2015 at 2:14 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote:
How many nodes do you have in your cluster, how many cores, and how much memory?

On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar manasdebashis...@gmail.com wrote:
Hi Arush, Mine is CDH 5.3 with Spark 1.2. The only changes to my Spark programs are -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000. ..Manas

On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote:
What is your cluster configuration? Did you try looking at the Web UI? There are many tips here: http://spark.apache.org/docs/1.2.0/tuning.html Did you try these?

On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com wrote:
Hi, I have a Hidden Markov Model running with 200 MB of data. Once the program finishes (i.e. all stages/jobs are done), it hangs for 20 minutes or so before the master is killed. The following log appears in the Spark master: 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster] java.lang.OutOfMemoryError: GC overhead limit exceeded ... at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726) ... Can anyone help? ..Manas

Arush Kharbanda || Technical Teamlead | Sigmoid Analytics | ar...@sigmoidanalytics.com | www.sigmoidanalytics.com
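Side note for anyone trying that last step: in a standalone cluster the master daemon's heap is set via SPARK_DAEMON_MEMORY in conf/spark-env.sh on the master host. A sketch (the 1g value is an arbitrary example):

    # conf/spark-env.sh on the master host
    export SPARK_DAEMON_MEMORY=1g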
Re: spark, reading from s3
The thing is that my time is perfectly valid...

On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
It's the timezone, actually. You can either use NTP to maintain an accurate system clock, or you can adjust your system time to match AWS's. You can check AWS's time with:

    telnet s3.amazonaws.com 80
    GET / HTTP/1.0

Thanks, Best Regards

On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim kane.ist...@gmail.com wrote:
I'm getting this warning when using s3 input: 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in response to RequestTimeTooSkewed error. Local machine and S3 server disagree on the time by approximately 0 seconds. Retrying connection. After that there are tons of 403/forbidden errors and then the job fails. It's sporadic, so sometimes I get this error and sometimes not. What could be the issue? I think it could be related to network connectivity?
Re: Why can't Spark find the classes in this Jar?
Hi Abe, I'm new to Spark as well, so someone else could answer better. A few thoughts which may or may not be the right line of thinking:

1) Spark properties can be set on the SparkConf and with flags in spark-submit, but settings on SparkConf take precedence. I think your jars flag for spark-submit may be redundant.

2) Is there a chance that stanford-corenlp-3.5.0.jar relies on other dependencies? I could be wrong, but perhaps, if there is no other reason not to, try building your application as an uber-jar with a build tool like Maven, which will package all transitive dependencies. You can find stanford-corenlp on Maven Central. I think you would add the dependencies below to your pom.xml. After building simple-project-1.0.jar with these dependencies, you would not set jars on the SparkConf or the --jars flag on spark-submit.

    <dependencies>
      <dependency>
        <groupId>edu.stanford.nlp</groupId>
        <artifactId>stanford-corenlp</artifactId>
        <version>3.5.0</version>
      </dependency>
      <dependency>
        <groupId>edu.stanford.nlp</groupId>
        <artifactId>stanford-corenlp</artifactId>
        <version>3.5.0</version>
        <classifier>models</classifier>
      </dependency>
    </dependencies>

HTH. Deb

On Tue, Feb 10, 2015 at 1:12 PM, Abe Handler akh2...@gmail.com wrote:
I am new to Spark. I am trying to compile and run a Spark application that requires classes from an (external) jar file on my local machine. If I open the jar (on ~/Desktop) I can see the missing class in the local jar, but when I run Spark I get NoClassDefFoundError: edu/stanford/nlp/ie/AbstractSequenceClassifier

I add the jar to the Spark context like this:

    String[] jars = {"/home/pathto/Desktop/stanford-corenlp-3.5.0.jar"};
    SparkConf conf = new SparkConf().setAppName("Simple Application").setJars(jars);

Then I try to run a submit script like this:

    /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
      --class SimpleApp \
      --master local[4] \
      target/simple-project-1.0.jar \
      --jars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar

and hit the NoClassDefFoundError. I get that this means the worker threads can't find the class from the jar, but I am not sure what I am doing wrong. I have tried different syntaxes for the last line (below) but none works:

    --addJars local[4] /home/abe/Desktop/stanford-corenlp-3.5.0.jar
    --addJars local:/home/abe/Desktop/stanford-corenlp-3.5.0.jar

How can I fix this error?
Re: Why can't Spark find the classes in this Jar?
What version of Java are you using? CoreNLP dropped support for Java 7 in its 3.5.0 release. Also, the correct command-line option is --jars, not --addJars.

On Thu, Feb 12, 2015 at 12:03 PM, Deborah Siegel deborah.sie...@gmail.com wrote:
Hi Abe, I'm new to Spark as well, so someone else could answer better. A few thoughts which may or may not be the right line of thinking... ...
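Putting the two fixes together, the submit command would look something like this (a sketch using the poster's paths; the key points are that --jars is the right flag, that it takes a comma-separated list of jar paths, and that all flags go before the application jar):

    /home/me/Downloads/spark-1.2.0-bin-hadoop2.4/bin/spark-submit \
      --class SimpleApp \
      --master local[4] \
      --jars /home/abe/Desktop/stanford-corenlp-3.5.0.jar \
      target/simple-project-1.0.jar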
Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi Patrick, Really glad to get your reply. Yes, we are doing group-by operations for our work, and we know that growTable is common when processing large data sets. The question actually comes down to: is there any way to set the initialCapacity ourselves, specifically for our application? Does Spark provide a config for achieving that? We know that this is tricky to get working; we just want to know how this could be resolved, or through another channel we have not covered. Expecting your kind advice. Thanks, Sun. fightf...@163.com

From: Patrick Wendell
Date: 2015-02-12 16:12
To: fightf...@163.com
CC: user; dev
Subject: Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
The map will start with a capacity of 64, but will grow to accommodate new data. Are you using the groupBy operator in Spark or are you using Spark SQL's group by? This usually happens if you are grouping or aggregating in a way that doesn't sufficiently condense the data created from each input partition. - Patrick

On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com wrote:
Hi, We really have no adequate solution for this issue yet. Expecting any available analytical rules or hints. Thanks, Sun.

From: fightf...@163.com
Date: 2015-02-09 11:56
To: user; dev
Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi, The problem still exists. Would any experts take a look at this? Thanks, Sun.

From: fightf...@163.com
Date: 2015-02-06 17:54
To: user; dev
Subject: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
Hi all, Recently we hit performance issues when using Spark 1.2.0 to read data from HBase and do some summary work. Our scenario is: read large data sets from HBase (maybe a 100G+ file), form an hbaseRDD, transform it to a SchemaRDD, group by and aggregate the data to get a smaller set of summary data, then load that into HBase (Phoenix). Our major issue is that aggregating the large data sets into summary data sets takes too long (1 hour+), which should not be such bad performance. We got the dump file attached and a stacktrace from jstack like the following. From the stacktrace and dump file we can identify that processing large data sets causes frequent AppendOnlyMap growth, leading to a huge map entry size. We looked at the source code of org.apache.spark.util.collection.AppendOnlyMap and found that the map is initialized with a capacity of 64, which would be too small for our use case. So the questions are: has anyone encountered such issues before, and how were they resolved? We cannot find any JIRA issues for such problems; if someone has seen them, please kindly let us know. More specifically: is there any way for the user to set the map capacity in Spark? If so, please tell us how to achieve that. Best Thanks, Sun.
Thread 22432: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame) Thread 22431: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) -
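To make Patrick's point about condensing data concrete: groupBy-style aggregation buffers every value for a key in the shuffle's AppendOnlyMap, while reduceByKey (or aggregateByKey) combines values map-side, so far less data reaches that map. A hedged sketch, with toy data standing in for the HBase scan:

    // Toy stand-in for the (key, value) pairs parsed from the HBase RDD.
    val rows = sc.parallelize(Seq(("a", 1L), ("a", 2L), ("b", 3L)))

    // Buffers all values per key before summing them:
    val viaGroup = rows.groupByKey().mapValues(_.sum)

    // Combines values map-side, so the per-partition map stays small:
    val viaReduce = rows.reduceByKey(_ + _)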
Re: OutOfMemoryError: Java heap space
Thanks, Kelvin :) The error seems to disappear after I decreased both spark.storage.memoryFraction and spark.shuffle.memoryFraction to 0.2, and increased the driver memory a bit. Best, Yifan LI

On 10 Feb 2015, at 18:58, Kelvin Chu 2dot7kel...@gmail.com wrote:
Since the stacktrace shows Kryo is being used, maybe you could also try increasing spark.kryoserializer.buffer.max.mb. Hope this helps. Kelvin

On Tue, Feb 10, 2015 at 1:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
You could try increasing the driver memory. Also, can you be more specific about the data volume? Thanks, Best Regards

On Mon, Feb 9, 2015 at 3:30 PM, Yifan LI iamyifa...@gmail.com wrote:
Hi, I just found the following errors during computation (GraphX). Anyone have ideas on this? Thanks so much! (I think the memory is sufficient: spark.executor.memory 30GB)

15/02/09 00:37:12 ERROR Executor: Exception in task 162.0 in stage 719.0 (TID 7653) java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23) at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29) at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27) at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37) at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)

15/02/09 00:37:12 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-15,5,main] java.lang.OutOfMemoryError: Java heap space at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:410) at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:113) ...
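For reference, here are the settings discussed in this thread as they would appear in conf/spark-defaults.conf. The 0.2 values are what worked for Yifan; the last two values are hypothetical placeholders, since the thread only says "some increase":

    # Spark 1.x property names; the 0.2 values are from this thread,
    # the last two values are hypothetical placeholders.
    spark.storage.memoryFraction        0.2
    spark.shuffle.memoryFraction        0.2
    spark.driver.memory                 8g
    spark.kryoserializer.buffer.max.mb  512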
Task not serializable problem in the multi-thread SQL query
I am trying to use multiple threads to issue Spark SQL queries. Some sample code looks like this:

    val sqlContext = new SQLContext(sc)
    val rdd_query = sc.parallelize(data, part)
    rdd_query.registerTempTable("MyTable")
    sqlContext.cacheTable("MyTable")

    val serverPool = Executors.newFixedThreadPool(3)
    val loopCnt = 10
    for (i <- 1 to loopCnt) {
      serverPool.execute(new Runnable() {
        override def run() {
          if (someCondition) {  // someCondition stands in for the real branch logic
            sqlContext.sql("SELECT * FROM ...").collect().foreach(println)
          } else {
            // some other query
          }
        }
      })
    }

This throws a Task not serializable exception; if I do not use multiple threads, it works well. Since no object involved should be unserializable, what is the problem?

java.lang.Error: org.apache.spark.SparkException: Task not serializable at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.lang.Thread.run(Thread.java:853) Caused by: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112) ... Caused by: java.lang.NullPointerException at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) ...

Best Wishes!
Re: Spark Streaming distributed batch locking
* "We have an inbound stream of sensor data for millions of devices (which have unique identifiers)." Spark Streaming can handle events in the ballpark of 100-500K records/sec/node, so you need to size a cluster accordingly. And it is scalable.

* "We need to perform aggregation of this stream on a per device level. The aggregation will read data that has already been processed (and persisted) in previous batches." You need to do stateful stream processing; Spark Streaming allows you to do that. Check out updateStateByKey (a sketch follows at the end of this message): http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html

* "Key point: When we process data for a particular device we need to ensure that no other processes are processing data for that particular device... Effectively we need a distributed lock." You can make the source device the key and then updateStateByKey in Spark using that key.

* "In addition the event device data needs to be processed in the order that the events occurred." You would need to implement this in your code, adding a timestamp as a data item. Spark Streaming doesn't ensure in-order delivery of your events.

On Thu, Feb 12, 2015 at 4:51 PM, Legg John john.l...@axonvibe.com wrote:
Hi, After doing lots of reading and building a POC for our use case, we are still unsure whether Spark Streaming can handle it:
* We have an inbound stream of sensor data for millions of devices (which have unique identifiers).
* We need to perform aggregation of this stream on a per device level. The aggregation will read data that has already been processed (and persisted) in previous batches.
* Key point: When we process data for a particular device we need to ensure that no other processes are processing data for that particular device. This is because the outcome of our processing will affect the downstream processing for that device. Effectively we need a distributed lock.
* In addition the event device data needs to be processed in the order that the events occurred.
Essentially we can't have two batches for the same device being processed at the same time. Can Spark handle our use case? Any advice appreciated. Regards, John
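A minimal sketch of the updateStateByKey pattern suggested above, with the device ID as the key; the input source, field layout, and aggregate (a running sum) are hypothetical stand-ins:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("/tmp/checkpoints")  // required for stateful operations

    // Hypothetical input: "deviceId,reading" lines arriving on a socket.
    val readings = ssc.socketTextStream("localhost", 9999)
      .map(_.split(","))
      .map(a => (a(0), a(1).toDouble))

    // All of a batch's values for one key are handed to a single update
    // call, so per-device state is only ever modified by one task at a time.
    val totals = readings.updateStateByKey[Double] {
      (newValues: Seq[Double], state: Option[Double]) =>
        Some(state.getOrElse(0.0) + newValues.sum)
    }
    totals.print()

    ssc.start()
    ssc.awaitTermination()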
Re: How to log using log4j to local file system inside a Spark application that runs on YARN?
Marcelo and Burak, Thank you very much for your explanations. Now I'm able to see my logs.

On Wed, Feb 11, 2015 at 7:52 PM, Marcelo Vanzin van...@cloudera.com wrote:
For YARN, you need to upload your log4j.properties separately from your app's jar, because of some internal issues that are too boring to explain here. :-) Basically:

    spark-submit --master yarn --files log4j.properties blah blah blah

Having to keep it outside your app jar is sub-optimal, and I think there's a bug filed to fix this, but so far no one has really spent time looking at it. -- Marcelo

On Wed, Feb 11, 2015 at 4:29 AM, Emre Sevinc emre.sev...@gmail.com wrote:
Hello, I'm building an Apache Spark Streaming application and cannot make it log to a file on the local filesystem when running it on YARN. How can I achieve this? I've set up my log4j.properties file so that it successfully writes to a log file in the /tmp directory on the local file system (shown below partially):

    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.File=/tmp/application.log
    log4j.appender.file.append=false
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

When I run my Spark application locally by using the following command:

    spark-submit --class myModule.myClass --master local[2] --deploy-mode client myApp.jar

it runs fine and I can see that log messages are written to /tmp/application.log on my local file system. But when I run the same application via YARN, e.g.

    spark-submit --class myModule.myClass --master yarn-client --name myModule --total-executor-cores 1 --executor-memory 1g myApp.jar

or

    spark-submit --class myModule.myClass --master yarn-cluster --name myModule --total-executor-cores 1 --executor-memory 1g myApp.jar

I cannot see any /tmp/application.log on the local file system of the machine that runs YARN. What am I missing? -- Emre Sevinç
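For completeness: a FileAppender config like the one above only takes effect if the root logger references it, which is part of what the poster elided. A sketch of the full file; the rootCategory line is an assumption about the missing part:

    # log4j.properties (sketch)
    log4j.rootCategory=INFO, file
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.File=/tmp/application.log
    log4j.appender.file.append=false
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n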
Re: OutOfMemoryError with random forest and small training dataset
OK, I would suggest adding SPARK_DRIVER_MEMORY to spark-env.sh, with a larger amount of memory than the default 512m.
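Concretely, that suggestion would look like this in conf/spark-env.sh (the 4g value is an arbitrary example, not a recommendation):

    # conf/spark-env.sh
    export SPARK_DRIVER_MEMORY=4g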
Re: spark, reading from s3
Check that your timezone is correct as well; an incorrect timezone can make it look like your time is correct when it is actually skewed. Cheers

On Fri, Feb 13, 2015 at 5:51 AM, Kane Kim kane.ist...@gmail.com wrote:
The thing is that my time is perfectly valid... ...

Franc Carter | Systems Architect | Rozetta Technology | franc.car...@rozettatech.com | www.rozettatechnology.com | Tel: +61 2 8355 2515 | Level 4, 55 Harrington St, The Rocks NSW 2000 | PO Box H58, Australia Square, Sydney NSW 1215, AUSTRALIA
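One hedged way to check both things from a shell on the affected node (command availability varies by distro; ntpdate is just one of several NTP clients):

    # Show the node's idea of UTC time and its configured timezone.
    date -u
    date +%Z

    # One-shot clock sync against a public NTP pool (requires root).
    sudo ntpdate pool.ntp.org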
Re: How to do broadcast join in SparkSQL
Hello, Has Spark implemented computing statistics for Parquet files? Or is there any other way I can enable broadcast joins between Parquet file RDDs in Spark SQL? Thanks, Dima
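Not an answer to the statistics question, but worth knowing while investigating: Spark SQL 1.x decides to broadcast a table when its estimated size falls below spark.sql.autoBroadcastJoinThreshold, so one hedged experiment is to raise that threshold and see whether the planner switches to a broadcast join:

    // Threshold in bytes below which a table is broadcast in a join
    // (the 1.x default was on the order of 10 MB; 50 MB here is arbitrary).
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)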
Re: No executors allocated on yarn with latest master branch
The NM logs only seem to contain lines like the following; nothing else in the same time range. Any help?

2015-02-12 20:47:31,245 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_02
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_12
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_22
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_32
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_42
2015-02-12 21:24:30,515 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: FINISH_APPLICATION sent to absent application application_1422406067005_0053

On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:
It seems unlikely to me that it would be a 2.2 issue, though not entirely impossible. Are you able to find any of the container logs? Is the NodeManager launching containers and reporting some exit code? -Sandy

On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote:
No, not submitting from Windows, from a Debian distribution. Had a quick look at the RM logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC Sandy, sounds like it could possibly be a 2.2 issue then, or what do you think? Thanks, Anders

On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:
This is tricky to debug. Check the logs of YARN's node and resource managers to see if you can trace the error. In the past I have had to look closely at the arguments getting passed to the YARN container (they get logged before attempting to launch containers). If I still didn't get a clue, I had to check the script generated by YARN to execute the container, and even run it manually to trace at what line the error occurred. BTW, are you submitting the job from Windows?

On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote:
Interesting to hear that it works for you. Are you using YARN 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. It does not seem to work in yarn-client mode either, failing with the exception below.

Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) at org.apache.spark.SparkContext.<init>(SparkContext.scala:370) at com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8) at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42) at com.spotify.analytics.DataSampler.main(DataSampler.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

/Anders

On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote:
Hi Anders, I just tried this out and was able to successfully acquire executors. Any strange log messages or additional color you can provide on your setup? Does yarn-client mode work? -Sandy

On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg
Re: Concurrent batch processing
So you have come across spark.streaming.concurrentJobs already :) Yeah, that is an undocumented feature that does allow multiple output operations to submitted in parallel. However, this is not made public for the exact reasons that you realized - the semantics in case of stateful operations is not clear. It is semantically safe to enable, how ever it may cause redundant computations, as next batches jobs may recompute some RDDs twice rather than using the cached values of another RDDs. In general, only in a very few cases is it useful to increase this concurrency. If batch processing times batch interval, then you need to use more resources, and parallelize the ingestion and processing enough to utilize those resources efficiently. The spikes that you see despite average hardware utilization is low probably indicates that the parallelization of the Spark Streaming jobs is insufficient. There are bunch of optimizations that can be done. http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch If you have already done this, can you tell me more about what sort of utilization and psike do you see, and what sort of parallelization you have already done? TD On Thu, Feb 12, 2015 at 12:09 PM, Matus Faro matus.f...@kik.com wrote: I've been experimenting with my configuration for couple of days and gained quite a bit of power through small optimizations, but it may very well be something I'm doing crazy that is causing this problem. To give a little bit of a background, I am in the early stages of a project that consumes a stream of data in the order of 100,000 per second that requires processing over a sliding window over one day (ideally a week). Spark Streaming is a good candidate but I want to make sure I squash any performance issues ahead of time before I commit. With a 5 second batch size, in 40 minutes, the processing time is also 5 seconds. I see the CPU spikes over two seconds out of five. I assume the sliding window operation is very expensive in this case and that's the root cause of this effect. I should've done a little bit more research before I posted, I just came across a post about an undocumented property spark.streaming.concurrentJobs that I am about to try. I'm still confused how exactly this works with a sliding window where the result of one batch depends on the other. I assume the concurrency can only be achieved up until the window action is executed. Either way, I am going to give this a try and post back here if that doesn't work. Thanks! On Thu, Feb 12, 2015 at 2:55 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: It could depend on the nature of your application but spark streaming would use spark internally and concurrency should be there what is your use case? Are you sure that your configuration is good? On Fri, Feb 13, 2015 at 1:17 AM, Matus Faro matus.f...@kik.com wrote: Hi, Please correct me if I'm wrong, in Spark Streaming, next batch will not start processing until the previous batch has completed. Is there any way to be able to start processing the next batch if the previous batch is taking longer to process than the batch interval? The problem I am facing is that I don't see a hardware bottleneck in my Spark cluster, but Spark is not able to handle the amount of data I am pumping through (batch processing time is longer than batch interval). What I'm seeing is spikes of CPU, network and disk IO usage which I assume are due to different stages of a job, but on average, the hardware is under utilized. 
Concurrency in batch processing would allow the average batch processing time to be greater than the batch interval while fully utilizing the hardware. Any ideas on what can be done? One option I can think of is to split the application into multiple applications running concurrently and dividing the initial stream of data between those applications. However, I would lose the benefits of having a single application. Thank you, Matus - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
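For reference, setting the property discussed in this thread looks like the sketch below. spark.streaming.concurrentJobs is undocumented, so this assumes it keeps behaving as TD describes; the app name and batch interval are illustrative.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("StreamingApp")                  // illustrative app name
    .set("spark.streaming.concurrentJobs", "2")  // allow two batch jobs to run at once
  val ssc = new StreamingContext(conf, Seconds(5))

As noted above, this is only semantically safe when a batch's output does not depend on the results of the previous batch.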
RE: PySpark 1.2 Hadoop version mismatch
I looked at the environment which I ran the spark-submit command in, and it looks like there is nothing that could be messing with the classpath. Just to be sure, I checked the web UI which says the classpath contains: - The two jars I added: /path/to/avro-mapred-1.7.4-hadoop2.jar and lib/spark-examples-1.2.0-hadoop2.0.0-cdh4.7.0.jar - The spark assembly jar in the same location: /path/to/spark/lib/spark-assembly-1.2.0-hadoop2.0.0-cdh4.7.0.jar - The conf folder: /path/to/spark/conf - The python script I was running From: Sean Owen [so...@cloudera.com] Sent: Thursday, February 12, 2015 12:13 AM To: Akhil Das Cc: Michael Nazario; user@spark.apache.org Subject: Re: PySpark 1.2 Hadoop version mismatch No, mr1 should not be the issue here, and I think that would break other things. The OP is not using mr1. client 4 / server 7 means roughly: client is Hadoop 1.x, server is Hadoop 2.0.x. Normally, I'd say I think you are packaging Hadoop code in your app by bringing in Spark and its deps. Your app shouldn't have any of this code. If you're running straight examples though, I'm less sure. If anything, your client is later than your server. I wonder if you have anything else set on your local classpath via env variables? On Thu, Feb 12, 2015 at 6:52 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you have a look at http://spark.apache.org/docs/1.2.0/building-spark.html ? I think you can simply download the source and build for your hadoop version as: mvn -Dhadoop.version=2.0.0-mr1-cdh4.7.0 -DskipTests clean package Thanks Best Regards On Thu, Feb 12, 2015 at 11:45 AM, Michael Nazario mnaza...@palantir.com wrote: I also forgot some other information. I have made this error go away by making my pyspark application use spark-1.1.1-bin-cdh4 for the driver, but communicate with a spark 1.2 master and worker. It's not a good workaround, so I would like to have the driver also be spark 1.2 Michael From: Michael Nazario Sent: Wednesday, February 11, 2015 10:13 PM To: user@spark.apache.org Subject: PySpark 1.2 Hadoop version mismatch Hi Spark users, I seem to be having this consistent error which I have been trying to reproduce and narrow down the problem. I've been running a PySpark application on Spark 1.2 reading avro files from Hadoop. I was consistently seeing the following error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile. : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 After some searching, I noticed that this most likely meant my hadoop versions were mismatched. I had the following versions at the time: Hadoop: hadoop-2.0.0-cdh4.7.0 Spark: spark-1.2.0-bin-cdh4.2.0 In the past, I've never had a problem with this setup for Spark 1.1.1 or Spark 1.0.2. I figured it was worth me rebuilding Spark in case I was wrong about versions. To rebuild my Spark, I ran this command on the v1.2.0 tag: ./make-distribution.sh -Dhadoop.version=2.0.0-cdh4.7.0 I then retried my previously mentioned application with this new build of Spark. Same error. To narrow down the problem some more, I figured I should try out the example which comes with spark which allows you to load an avro file.
I ran the below command (I know it uses a deprecated way of passing jars to the driver classpath): SPARK_CLASSPATH=/path/to/avro-mapred-1.7.4-hadoop2.jar:lib/spark-examples-1.2.0-hadoop2.0.0-cdh4.7.0.jar:$SPARK_CLASSPATH bin/spark-submit ./examples/src/main/python/avro_inputformat.py hdfs://localhost:8020/path/to/file.avro I ended up with the same error. The full stacktrace is below. Traceback (most recent call last): File "/git/spark/dist/./examples/src/main/python/avro_inputformat.py", line 77, in <module> conf=conf) File "/git/spark/dist/python/pyspark/context.py", line 503, in newAPIHadoopFile jconf, batchSize) File "/git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile. : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1113) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229) at com.sun.proxy.$Proxy8.getProtocolVersion(Unknown Source) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
Re: Easy way to partition an RDD into chunks like Guava's Iterables.partition
So I tried this: .mapPartitions(itr => { itr.grouped(300).flatMap(items => { myFunction(items) }) }) and I tried this: .mapPartitions(itr => { itr.grouped(300).flatMap(myFunction) }) I tried making myFunction a method, a function val, and even moving it into a singleton object. The closure cleaner throws Task not serializable exceptions referencing an outer class whenever I do this. Just to test, I tried this: .flatMap(it => myFunction(Seq(it))) And it worked just fine. What am I doing wrong here? Also, my function is a little more complicated and it does take arguments that depend on the class actually manipulating the RDD - but why would it work fine with a single flatMap and not with mapPartitions? I am somewhat new to Scala and maybe I'm missing something here. On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra m...@clearstorydata.com wrote: No, only each group should need to fit. On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet cjno...@gmail.com wrote: Doesn't iter still need to fit entirely into memory? On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra m...@clearstorydata.com wrote: rdd.mapPartitions { iter => val grouped = iter.grouped(batchSize) for (group <- grouped) { ... } } On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet cjno...@gmail.com wrote: I think the word partition here is a tad different than the term partition that we use in Spark. Basically, I want something similar to Guava's Iterables.partition [1], that is, if I have an RDD[People] and I want to run an algorithm that can be optimized by working on 30 people at a time, I'd like to be able to say: val rdd: RDD[People] = . val partitioned: RDD[Seq[People]] = rdd.partition(30) I also don't want any shuffling - everything can still be processed locally. [1] http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)
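For reference, a minimal sketch of the Guava-style chunking being discussed, runnable in spark-shell (so it assumes an existing SparkContext sc; the element type and chunk size are illustrative):

  import org.apache.spark.rdd.RDD

  val rdd: RDD[Int] = sc.parallelize(1 to 1000, 4)
  // grouped(30) yields an Iterator[Seq[Int]]; no shuffle occurs, and a chunk
  // never crosses a partition boundary, so data stays local
  val chunked: RDD[Seq[Int]] = rdd.mapPartitions(_.grouped(30))

As Mark notes above, only each 30-element group needs to fit in memory; the partition iterator itself is never materialized.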
Re: No executors allocated on yarn with latest master branch
It seems unlikely to me that it would be a 2.2 issue, though not entirely impossible. Are you able to find any of the container logs? Is the NodeManager launching containers and reporting some exit code? -Sandy On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote: No, not submitting from windows, from a debian distribution. Had a quick look at the rm logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet from the logs (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC Sandy, sounds like it could possibly be a 2.2 issue then, or what do you think? Thanks, Anders On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: This is tricky to debug. Check the logs of the node and resource managers of YARN to see if you can trace the error. In the past I have had to look closely at the arguments getting passed to the YARN container (they get logged before attempting to launch containers). If I still didn't get a clue, I had to check the script generated by YARN to execute the container and even run it manually to trace at what line the error occurred. BTW are you submitting the job from windows? On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote: Interesting to hear that it works for you. Are you using Yarn 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. Does not seem to work in yarn-client mode either, failing with the exception below. Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) at org.apache.spark.SparkContext.<init>(SparkContext.scala:370) at com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8) at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42) at com.spotify.analytics.DataSampler.main(DataSampler.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) /Anders On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Anders, I just tried this out and was able to successfully acquire executors. Any strange log messages or additional color you can provide on your setup? Does yarn-client mode work? -Sandy On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote: Hi, Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2 and failed executing jobs in yarn-cluster mode for that build.
Works successfully with spark 1.2 (and also master from 2015-01-16), so something has changed since then that prevents the job from receiving any executors on the cluster. Basic symptoms are that the job fires up the AM, but after examining the executors page in the web ui, only the driver is listed, no executors are ever received, and the driver keeps waiting forever. Has anyone seen similar problems? Thanks for any insights, Anders
Re: Can spark job server be used to visualize streaming data?
You would probably write to hdfs or check out https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html You might be able to retrofit it to your use case. --- Original Message --- From: Su She suhsheka...@gmail.com Sent: February 11, 2015 10:55 PM To: Felix C felixcheun...@hotmail.com Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hello Felix, I am already streaming in very simple data using Kafka (few messages / second, each record only has 3 columns...really simple, but looking to scale once I connect everything). I am processing it in Spark Streaming and am currently writing word counts to hdfs. So the part where I am confused is... Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data -> Spark Word Count -> *How do I visualize?* Is there a viz tool that I can set up to visualize JavaPairDStreams? Or do I have to write to hbase/hdfs first? Thanks! On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com wrote: What kind of data do you have? Kafka is a popular source to use with spark streaming. But, spark streaming also supports reading from a file. It's called a basic source https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers --- Original Message --- From: Su She suhsheka...@gmail.com Sent: February 11, 2015 10:23 AM To: Felix C felixcheun...@hotmail.com Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Thank you Felix and Kelvin. I think I'll def be using the k-means tools in mllib. It seems the best way to stream data is by storing it in hbase and then using an api in my viz to extract the data? Does anyone have any thoughts on this? Thanks! On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com wrote: Checkout https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html In there are links to how that is done. --- Original Message --- From: Kelvin Chu 2dot7kel...@gmail.com Sent: February 10, 2015 12:48 PM To: Su She suhsheka...@gmail.com Cc: user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hi Su, Out of the box, no. But, I know people integrate it with Spark Streaming to do real-time visualization. It will take some work though. Kelvin On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I was reading this blog post: http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/ and was wondering if this approach can be taken to visualize streaming data...not just historical data? Thank you! -Suh
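For reference, a minimal Scala sketch of the write-out step that most visualization setups hang off of. This assumes wordCounts is a Scala DStream (the question uses JavaPairDStream, where the idea is the same), and the HDFS path is illustrative:

  wordCounts.foreachRDD { (rdd, time) =>
    // persist each micro-batch; a dashboard or viz tool can then poll
    // HDFS (or HBase, after an equivalent write) for newly arrived batches
    rdd.saveAsTextFile(s"hdfs:///viz/wordcounts-${time.milliseconds}")
  }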
Re: Is there a fast way to do fast top N product recommendations for all users
Not now, but see https://issues.apache.org/jira/browse/SPARK-3066 As an aside, it's quite expensive to make recommendations for all users. IMHO this is not something to do, if you can avoid it architecturally. For example, consider precomputing recommendations only for users whose probability of needing recommendations soon is not very small. Usually, only a small number of users are active. On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi, I wonder if there is a way to do fast top N product recommendations for all users in training using mllib's ALS algorithm. I am currently calling the public Rating[] recommendProducts(int user, int num) method in MatrixFactorizationModel for users one by one, and it is quite slow since it does not operate on RDD input. I also tried to generate all possible user-product pairs and use public JavaRDD<Rating> predict(JavaPairRDD<Integer, Integer> usersProducts) to fill out the matrix. Since I have a large number of users and products, the job gets stuck transforming all pairs. I wonder if there is a better way to do this. Thanks, Crystal. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is there a fast way to do fast top N product recommendations for all users
Thanks, Sean! Glad to know it will be in a future release. On Thu, Feb 12, 2015 at 2:45 PM, Sean Owen so...@cloudera.com wrote: Not now, but see https://issues.apache.org/jira/browse/SPARK-3066 As an aside, it's quite expensive to make recommendations for all users. IMHO this is not something to do, if you can avoid it architecturally. For example, consider precomputing recommendations only for users whose probability of needing recommendations soon is not very small. Usually, only a small number of users are active. On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi, I wonder if there is a way to do fast top N product recommendations for all users in training using mllib's ALS algorithm. I am currently calling the public Rating[] recommendProducts(int user, int num) method in MatrixFactorizationModel for users one by one, and it is quite slow since it does not operate on RDD input. I also tried to generate all possible user-product pairs and use public JavaRDD<Rating> predict(JavaPairRDD<Integer, Integer> usersProducts) to fill out the matrix. Since I have a large number of users and products, the job gets stuck transforming all pairs. I wonder if there is a better way to do this. Thanks, Crystal.
Re: Task not serializable problem in the multi-thread SQL query
It looks to me like perhaps your SparkContext has shut down due to too many failures. I'd look in the logs of your executors for more information. On Thu, Feb 12, 2015 at 2:34 AM, lihu lihu...@gmail.com wrote: I try to use multiple threads to run Spark SQL queries. Some sample code, just like this: val sqlContext = new SQLContext(sc) val rdd_query = sc.parallelize(data, part) rdd_query.registerTempTable("MyTable") sqlContext.cacheTable("MyTable") val serverPool = Executors.newFixedThreadPool(3) val loopCnt = 10 for (i <- 1 to loopCnt) { serverPool.execute(new Runnable() { override def run() { if (some condition) { sqlContext.sql("SELECT * from ...").collect().foreach(println) } else { // some other query } } }) } This will throw a Task not serializable exception; if I do not use multiple threads, it works well. There is no object that is not serializable, so what is the problem? java.lang.Error: org.apache.spark.SparkException: Task not serializable at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.lang.Thread.run(Thread.java:853) Caused by: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.lang.Thread.run(Thread.java:853) Caused by: java.lang.NullPointerException at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.lang.Thread.run(Thread.java:853) -- *Best Wishes!*
Re: Master dies after program finishes normally
The important thing here is the master's memory; that's where you're getting the GC overhead limit. The master is updating its UI to include your finished app when your app finishes, which would cause a spike in memory usage. I wouldn't expect the master to need a ton of memory just to serve the UI for a modest number of small apps, but maybe some of your apps have a lot of jobs, stages, or tasks. And there is always lots of overhead from the jvm, so bumping it up might help. On Thu, Feb 12, 2015 at 1:25 PM, Manas Kar manasdebashis...@gmail.com wrote: I have 5 workers, each with executor-memory 8GB. My driver memory is 8GB as well. They are all 8 core machines. To answer Imran's question, my configurations are thus: executor_total_max_heapsize = 18GB This problem happens at the end of my program. I don't have to run a lot of jobs to see this behaviour. I can see my output correctly in HDFS and all. I will give it one more try after increasing the master's memory (which defaults to 296MB) to 512MB. ..manas On Thu, Feb 12, 2015 at 2:14 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: How many nodes do you have in your cluster, how many cores, what is the size of the memory? On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar manasdebashis...@gmail.com wrote: Hi Arush, Mine is a CDH5.3 with Spark 1.2. The only change to my spark programs are -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000. ..Manas On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: What is your cluster configuration? Did you try looking at the Web UI? There are many tips here http://spark.apache.org/docs/1.2.0/tuning.html Did you try these? On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com wrote: Hi, I have a Hidden Markov Model running with 200MB data. Once the program finishes (i.e. all stages/jobs are done) the program hangs for 20 minutes or so before killing the master. In the spark master the following log appears.
2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster] java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.immutable.List$.newBuilder(List.scala:396) at scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69) at scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105) at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58) at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53) at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239) at scala.collection.TraversableLike$class.map(TraversableLike.scala:243) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26) at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.json4s.MonadicJValue.org $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22) at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16) at org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450) at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at
Is there a fast way to do fast top N product recommendations for all users
Hi, I wonder if there is a way to do fast top N product recommendations for all users in training using mllib's ALS algorithm. I am currently calling the public Rating[] recommendProducts(int user, int num) method (http://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/mllib/recommendation/Rating.html) in MatrixFactorizationModel for users one by one, and it is quite slow since it does not operate on RDD input. I also tried to generate all possible user-product pairs and use public JavaRDD<Rating> predict(JavaPairRDD<Integer, Integer> usersProducts) (http://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/api/java/JavaRDD.html, http://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/api/java/JavaPairRDD.html) to fill out the matrix. Since I have a large number of users and products, the job gets stuck transforming all pairs. I wonder if there is a better way to do this. Thanks, Crystal.
Re: Is it possible to expose SchemaRDD’s from thrift server?
You can start a JDBC server with an existing context. See my answer here: http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-td20197.html On Thu, Feb 12, 2015 at 7:24 AM, Todd Nist tsind...@gmail.com wrote: I have a question with regards to accessing SchemaRDD’s and Spark SQL temp tables via the thrift server. It appears that a SchemaRDD, when created, is only available in the local namespace / context and is unavailable to external services accessing Spark through the thrift server via ODBC; is this correct? Does the same apply to temp tables? If we process data on Spark, how is it exposed to the thrift server for access by third party BI applications via ODBC? Does one need to have two spark contexts, one for processing, then dump it to the metastore from which a third party application can fetch the data, or is it possible to expose the resulting SchemaRDD via the thrift server? I am trying to do this with Tableau, Spark SQL Connector. From what I can see I need the spark context for processing and then dump to metastore. Is it possible to access the resulting SchemaRDD from doing something like this: create temporary table test using org.apache.spark.sql.json options (path '/data/json/*'); cache table test; I am using Spark 1.2.1. If not available now, will it be in 1.3.x? Or is the only way to achieve this to store into the metastore, and does this imply hive? -Todd
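For reference, the approach in the linked answer boils down to something like the sketch below. This assumes a HiveContext and that HiveThriftServer2.startWithContext is available in your Spark build (it landed after 1.2, so treat its availability as an assumption to verify); the table name and path are illustrative:

  import org.apache.spark.sql.hive.HiveContext
  import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

  val hiveContext = new HiveContext(sc)
  hiveContext.sql("CREATE TEMPORARY TABLE test USING org.apache.spark.sql.json OPTIONS (path '/data/json/*')")
  hiveContext.sql("CACHE TABLE test")
  // temp tables registered on this context become visible over JDBC/ODBC
  HiveThriftServer2.startWithContext(hiveContext)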
Re: spark, reading from s3
Looks like my clock is in sync: -bash-4.1$ date; curl -v s3.amazonaws.com Thu Feb 12 21:40:18 UTC 2015 * About to connect() to s3.amazonaws.com port 80 (#0) * Trying 54.231.12.24... connected * Connected to s3.amazonaws.com (54.231.12.24) port 80 (#0) GET / HTTP/1.1 User-Agent: curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/ 3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2 Host: s3.amazonaws.com Accept: */* HTTP/1.1 307 Temporary Redirect x-amz-id-2: sl8Tg81ZnBj3tD7Q9f2KFBBZKC83TbAUieHJu9IA3PrBibvB3M7NpwAlfTi/Tdwg x-amz-request-id: 48C14DF82BE1A970 Date: Thu, 12 Feb 2015 21:40:19 GMT Location: http://aws.amazon.com/s3/ Content-Length: 0 Server: AmazonS3 On Thu, Feb 12, 2015 at 12:26 PM, Franc Carter franc.car...@rozettatech.com wrote: Check that your timezone is correct as well; an incorrect timezone can make it look like your time is correct when it is skewed. cheers On Fri, Feb 13, 2015 at 5:51 AM, Kane Kim kane.ist...@gmail.com wrote: The thing is that my time is perfectly valid... On Tue, Feb 10, 2015 at 10:50 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It's with the timezone actually; you can either use NTP to maintain an accurate system clock or you can adjust your system time to match the AWS one. You can do it as: telnet s3.amazonaws.com 80 GET / HTTP/1.0 Thanks Best Regards On Wed, Feb 11, 2015 at 6:43 AM, Kane Kim kane.ist...@gmail.com wrote: I'm getting this warning when using s3 input: 15/02/11 00:58:37 WARN RestStorageService: Adjusted time offset in response to RequestTimeTooSkewed error. Local machine and S3 server disagree on the time by approximately 0 seconds. Retrying connection. After that there are tons of 403/forbidden errors and then the job fails. It's sporadic, so sometimes I get this error and sometimes not. What could be the issue? I think it could be related to network connectivity? -- *Franc Carter* | Systems Architect | Rozetta Technology franc.car...@rozettatech.com | www.rozettatechnology.com Tel: +61 2 8355 2515 Level 4, 55 Harrington St, The Rocks NSW 2000 PO Box H58, Australia Square, Sydney NSW 1215 AUSTRALIA
Re: How to do broadcast join in SparkSQL
In Spark 1.3, parquet tables that are created through the datasources API will automatically calculate the sizeInBytes, which is used to broadcast. On Thu, Feb 12, 2015 at 12:46 PM, Dima Zhiyanov dimazhiya...@hotmail.com wrote: Hello Has Spark implemented computing statistics for Parquet files? Or is there any other way I can enable broadcast joins between parquet file RDDs in Spark Sql? Thanks Dima -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-do-broadcast-join-in-SparkSQL-tp15298p21632.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
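For reference, the knob that governs this is the broadcast threshold: a table whose known sizeInBytes falls below it is broadcast for the join. A sketch (the 10MB value is illustrative):

  // tables with a computed sizeInBytes below this many bytes get broadcast
  sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)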
Re: Question about mllib als's implicit training
Hi Sean, I am reading the paper on implicit training, Collaborative Filtering for Implicit Feedback Datasets http://labs.yahoo.com/files/HuKorenVolinsky-ICDM08.pdf It mentions: To this end, let us introduce a set of binary variables p_ui, which indicates the preference of user u to item i. The p_ui values are derived by binarizing the r_ui values: p_ui = 1 if r_ui > 0 and p_ui = 0 if r_ui = 0. If I do not include user-item pairs without interactions in the training data, all the r_ui will be > 0 and all the p_ui will always be 1? Or does MLlib's implementation automatically take care of those no-interaction user-product pairs? On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen so...@cloudera.com wrote: Where there is no user-item interaction, you provide no interaction, not an interaction with strength 0. Otherwise your input is fully dense. On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi, I have some implicit rating data, such as purchasing data. I read the paper about the implicit training algorithm used in spark and it mentioned that for user-product pairs which do not have implicit rating data, such as no purchase, we need to provide the value as 0. This is different from explicit training where, when we provide training data, for a user-product pair without a rating, we just do not have it in the training data instead of adding a user-product pair with rating 0. Do I understand this correctly? Or for the implicit training implementation in spark, will the missing data be automatically filled out as zero so we do not need to add it to the training data set? Thanks, Crystal.
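For reference, a minimal sketch of the input layout Sean describes: only observed interactions go into the RDD, and trainImplicit treats every absent pair as the zero-preference case. The purchases RDD and its field names are illustrative (assumed to be an RDD[(Int, Int, Int)] of user, item, purchase count):

  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  val ratings = purchases.map { case (user, item, count) =>
    Rating(user, item, count.toDouble)  // strength of the observed interaction, not a 0/1 label
  }
  // rank = 10, iterations = 10, lambda = 0.01, alpha = 1.0 (all illustrative)
  val model = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)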
Predicting Class Probability with Gradient Boosting/Random Forest
We are using Gradient Boosting/Random Forests, which I have found provide the best results for our recommendations. My issue is that I need the probability of the 0/1 label, and not the predicted label. In the Spark Scala API, I see that the predict method also has an option to provide the probability. Can you provide any pointers to documentation that I can reference for implementing this? Thanks! -Nilesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Predicting-Class-Probability-with-Gradient-Boosting-Random-Forest-tp21633.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
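For reference, one workaround people use with MLlib's GBT is to recover the raw margin from the ensemble and squash it through a sigmoid. This is a sketch, not documented API: the trees and treeWeights fields are public on the model, but the -2.0 scaling below is an assumption tied to a model trained with log loss, so validate the resulting probabilities before relying on them.

  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

  def classOneProbability(model: GradientBoostedTreesModel, features: Vector): Double = {
    // weighted sum of per-tree outputs = the raw margin F(x)
    val margin = model.trees.zip(model.treeWeights)
      .map { case (tree, w) => w * tree.predict(features) }
      .sum
    1.0 / (1.0 + math.exp(-2.0 * margin))  // assumed log-loss convention
  }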
RE: spark left outer join with java.lang.UnsupportedOperationException: empty collection
OK. I think I have to use None instead of null; then it works. Still switching from Java. I can also just use the field name as I assumed. Great experience. From: java8...@hotmail.com To: user@spark.apache.org Subject: spark left outer join with java.lang.UnsupportedOperationException: empty collection Date: Thu, 12 Feb 2015 18:06:43 -0500 Hi, I am using Spark 1.2.0 with Hadoop 2.2. Now I have 2 csv files, both with 8 fields. I know that the first field in both files is an ID. I want to find all the IDs that exist in the first file, but NOT in the 2nd file. I came up with the following code in spark-shell. case class origAsLeft(id: String) case class newAsRight(id: String) val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0)))) val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0)))) val output = OrigData.leftOuterJoin(NewData).filter { case (k, v) => v._2 == null } From what I understand, after OrigData is left outer joined with NewData, it will use the id as the key, and a tuple with (leftObject, RightObject). Since I want to find out all the IDs that exist in the first file, but not in the 2nd one, the output RDD will be the one I want, as it keeps only the rows where there is no newAsRight object from NewData. Then I run output.first Spark does start to run, but gives me the following error message: 15/02/12 16:43:38 INFO scheduler.DAGScheduler: Job 4 finished: first at <console>:21, took 78.303549 s java.lang.UnsupportedOperationException: empty collection at org.apache.spark.rdd.RDD.first(RDD.scala:1095) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:21) at $iwC$$iwC$$iwC.<init>(<console>:26) at $iwC$$iwC.<init>(<console>:28) at $iwC.<init>(<console>:30) at <init>(<console>:32) at .<init>(<console>:36) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55) at java.lang.reflect.Method.invoke(Method.java:619) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55) at java.lang.reflect.Method.invoke(Method.java:619) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Did I do anything wrong? What is the way to find all the ids in the first file, but not in the 2nd file? The second question is how I can use an object field to do the comparison in this case. For example, if I define: case class origAsLeft(id: String, name: String) case class newAsRight(id: String, name: String) val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0), r(1)))) val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0), r(1)))) // in this case, I want to list all the data in the first file which has the same ID as in the 2nd file, but with a different value in name; I want to do something like below: val output = OrigData.join(NewData).filter { case (k, v) => v._1.name != v._2.name } But what is the syntax to use the field in the case class I defined?
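For reference, a sketch of both filters, building on the case classes above. The key point from the reply at the top of this thread is that leftOuterJoin yields an Option on the right side, so the idiomatic null-free test is isEmpty, and field access works by destructuring the value tuple:

  // IDs present in the first file but not the second
  val missing = OrigData.leftOuterJoin(NewData)
    .filter { case (_, (_, right)) => right.isEmpty }

  // IDs present in both files but with different names (the second question)
  val changed = OrigData.join(NewData)
    .filter { case (_, (left, right)) => left.name != right.name }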
Re: exception with json4s render
Any ideas on how to figure out what is going on when using json4s 3.2.11? I have a need to use 3.2.11 and, just to see if things work, I downgraded to 3.2.10 and things started working. On Wed, Feb 11, 2015 at 11:45 AM, Charles Feduke charles.fed...@gmail.com wrote: I was having a similar problem to this trying to use the Scala Jackson module yesterday. I tried setting `spark.files.userClassPathFirst` to true but I was still having problems due to the older version of Jackson that Spark has a dependency on. (I think it's an old org.codehaus version.) I ended up solving my problem by using Spray JSON ( https://github.com/spray/spray-json) which has no dependency on Jackson and has great control over the JSON rendering process. http://engineering.ooyala.com/blog/comparing-scala-json-libraries - based on that I looked for something that didn't rely on Jackson. Now that I see that there is some success with json4s on Spark 1.2.x I'll have to give that a try. On Wed Feb 11 2015 at 2:32:59 PM Jonathan Haddad j...@jonhaddad.com wrote: Actually, yes, I was using 3.2.11. I thought I would need the UUID encoder that seems to have been added in that version, but I'm not using it. I've downgraded to 3.2.10 and it seems to work. I searched through the spark repo and it looks like it's got 3.2.10 in a pom. I don't know the first thing about how dependencies are resolved but I'm guessing it's related? On Wed Feb 11 2015 at 11:20:42 AM Mohnish Kodnani mohnish.kodn...@gmail.com wrote: I was getting a similar error after I upgraded to spark 1.2.1 from 1.1.1. Are you by any chance using json4s 3.2.11? I downgraded to 3.2.10 and that seemed to have worked. But I didn't try to spend much time debugging the issue beyond that. On Wed, Feb 11, 2015 at 11:13 AM, Jonathan Haddad j...@jonhaddad.com wrote: I'm trying to use the json4s library in a spark job to push data back into kafka. Everything was working fine when I was hard coding a string, but now that I'm trying to render a string from a simple map it's failing. The code works in sbt console. working console code: https://gist.github.com/rustyrazorblade/daa50bf05ff0d48ac6af failing spark job line: https://github.com/rustyrazorblade/killranalytics/blob/master/spark/src/main/scala/RawEventProcessing.scala#L114 exception: https://gist.github.com/rustyrazorblade/1e220d87d41cfcad2bb9 I've seen examples of using render / compact when I searched the ML archives, so I'm kind of at a loss here. Thanks in advance for any help. Jon
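For reference, since Spark 1.2.x pins json4s 3.2.10 in its pom, the simple fix the thread converges on is to match that version in your own build; a build.sbt sketch:

  // build.sbt: align with the json4s version Spark 1.2.x ships with
  libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.10"

If you genuinely need 3.2.11, shading json4s in your assembly jar is the usual alternative, at the cost of extra build complexity.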
Re: Task not serializable problem in the multi-thread SQL query
Thanks very much, you're right. I called sc.stop() before the executor pool was shut down. On Fri, Feb 13, 2015 at 7:04 AM, Michael Armbrust mich...@databricks.com wrote: It looks to me like perhaps your SparkContext has shut down due to too many failures. I'd look in the logs of your executors for more information. On Thu, Feb 12, 2015 at 2:34 AM, lihu lihu...@gmail.com wrote: I try to use multiple threads to run Spark SQL queries. Some sample code, just like this: val sqlContext = new SQLContext(sc) val rdd_query = sc.parallelize(data, part) rdd_query.registerTempTable("MyTable") sqlContext.cacheTable("MyTable") val serverPool = Executors.newFixedThreadPool(3) val loopCnt = 10 for (i <- 1 to loopCnt) { serverPool.execute(new Runnable() { override def run() { if (some condition) { sqlContext.sql("SELECT * from ...").collect().foreach(println) } else { // some other query } } }) } This will throw a Task not serializable exception; if I do not use multiple threads, it works well. There is no object that is not serializable, so what is the problem? java.lang.Error: org.apache.spark.SparkException: Task not serializable at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1182) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.lang.Thread.run(Thread.java:853) Caused by: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.lang.Thread.run(Thread.java:853) Caused by: java.lang.NullPointerException at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at org.apache.spark.sql.columnar.InMemoryColumnarTableScan.execute(InMemoryColumnarTableScan.scala:105) at org.apache.spark.sql.execution.Filter.execute(basicOperators.scala:57) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85) at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438) at RDDRelation$$anonfun$main$1$$anon$1.run(RDDRelation.scala:112) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1176) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.lang.Thread.run(Thread.java:853) -- *Best Wishes!* -- *Best Wishes!*
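For reference, a sketch of the orderly-shutdown fix described above: drain the query threads before tearing down the SparkContext.

  import java.util.concurrent.{Executors, TimeUnit}

  val serverPool = Executors.newFixedThreadPool(3)
  // ... submit the query Runnables as in the code above ...
  serverPool.shutdown()                              // stop accepting new work
  serverPool.awaitTermination(10, TimeUnit.MINUTES)  // let in-flight queries finish
  sc.stop()                                          // only now tear down the SparkContext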
Re: Question about mllib als's implicit training
Where there is no user-item interaction, you provide no interaction, not an interaction with strength 0. Otherwise your input is fully dense. On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi, I have some implicit rating data, such as purchasing data. I read the paper about the implicit training algorithm used in spark and it mentioned that for user-product pairs which do not have implicit rating data, such as no purchase, we need to provide the value as 0. This is different from explicit training where, when we provide training data, for a user-product pair without a rating, we just do not have it in the training data instead of adding a user-product pair with rating 0. Do I understand this correctly? Or for the implicit training implementation in spark, will the missing data be automatically filled out as zero so we do not need to add it to the training data set? Thanks, Crystal. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Easy way to partition an RDD into chunks like Guava's Iterables.partition
The more I'm thinking about this - I may try this instead: val myChunkedRDD: RDD[List[Event]] = inputRDD.mapPartitions(_.grouped(300).map(_.toList)) I wonder if this would work. I'll try it when I get back to work tomorrow. Yuyhao, I tried your approach too but it seems to be somehow moving all the data to a single partition (no matter what window I set) and it seems to lock up my jobs. I waited for 15 minutes for a stage that usually takes about 15 seconds and I finally just killed the job in yarn. On Thu, Feb 12, 2015 at 4:40 PM, Corey Nolet cjno...@gmail.com wrote: So I tried this: .mapPartitions(itr => { itr.grouped(300).flatMap(items => { myFunction(items) }) }) and I tried this: .mapPartitions(itr => { itr.grouped(300).flatMap(myFunction) }) I tried making myFunction a method, a function val, and even moving it into a singleton object. The closure cleaner throws Task not serializable exceptions referencing an outer class whenever I do this. Just to test, I tried this: .flatMap(it => myFunction(Seq(it))) And it worked just fine. What am I doing wrong here? Also, my function is a little more complicated and it does take arguments that depend on the class actually manipulating the RDD - but why would it work fine with a single flatMap and not with mapPartitions? I am somewhat new to Scala and maybe I'm missing something here. On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra m...@clearstorydata.com wrote: No, only each group should need to fit. On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet cjno...@gmail.com wrote: Doesn't iter still need to fit entirely into memory? On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra m...@clearstorydata.com wrote: rdd.mapPartitions { iter => val grouped = iter.grouped(batchSize) for (group <- grouped) { ... } } On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet cjno...@gmail.com wrote: I think the word partition here is a tad different than the term partition that we use in Spark. Basically, I want something similar to Guava's Iterables.partition [1], that is, if I have an RDD[People] and I want to run an algorithm that can be optimized by working on 30 people at a time, I'd like to be able to say: val rdd: RDD[People] = . val partitioned: RDD[Seq[People]] = rdd.partition(30) I also don't want any shuffling - everything can still be processed locally. [1] http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)
RE: Is there a fast way to do fast top N product recommendations for all users
Hi all - I've spent a while playing with this. Two significant sources of speed-up that I've achieved are 1) Manually multiplying the feature vectors and caching either the user or product vector 2) By doing so, if one of the RDDs is made global, it becomes possible to parallelize this step by running it in a thread and submitting multiple threads to the YARN engine. Doing so I've achieved an over 75x speed-up compared with the packaged version inside MLlib. Sent with Good (www.good.com) -Original Message- From: Sean Owen [so...@cloudera.commailto:so...@cloudera.com] Sent: Thursday, February 12, 2015 05:47 PM Eastern Standard Time To: Crystal Xing Cc: user@spark.apache.org Subject: Re: Is there a fast way to do fast top N product recommendations for all users Not now, but see https://issues.apache.org/jira/browse/SPARK-3066 As an aside, it's quite expensive to make recommendations for all users. IMHO this is not something to do, if you can avoid it architecturally. For example, consider precomputing recommendations only for users whose probability of needing recommendations soon is not very small. Usually, only a small number of users are active. On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi, I wonder if there is a way to do fast top N product recommendations for all users in training using mllib's ALS algorithm. I am currently calling the public Rating[] recommendProducts(int user, int num) method in MatrixFactorizationModel for users one by one, and it is quite slow since it does not operate on RDD input. I also tried to generate all possible user-product pairs and use public JavaRDD<Rating> predict(JavaPairRDD<Integer, Integer> usersProducts) to fill out the matrix. Since I have a large number of users and products, the job gets stuck transforming all pairs. I wonder if there is a better way to do this. Thanks, Crystal. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
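For reference, a sketch of the manual approach described above: collect and broadcast one factor matrix, then score and rank locally per user. This is an illustrative sketch, not the poster's exact code, and it assumes the product factors fit in executor memory:

  import org.apache.spark.SparkContext._
  import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
  import org.apache.spark.rdd.RDD

  def recommendAll(model: MatrixFactorizationModel, n: Int): RDD[(Int, Array[(Int, Double)])] = {
    val products = model.productFeatures.collect()   // assumes this fits in memory
    val bcProducts = model.userFeatures.sparkContext.broadcast(products)
    model.userFeatures.mapValues { userVec =>
      bcProducts.value.map { case (pid, pVec) =>
        // plain dot product of the two factor vectors
        var dot = 0.0
        var i = 0
        while (i < userVec.length) { dot += userVec(i) * pVec(i); i += 1 }
        (pid, dot)
      }.sortBy(-_._2).take(n)
    }
  }

Caching model.userFeatures beforehand, as suggested above, avoids recomputing the factors if this runs more than once.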
Re: Question about mllib als's implicit training
This all describes how the implementation operates, logically. The matrix P is never formed, for sure, certainly not by the caller. The implementation actually extends to handle negative values in R too, but it's all taken care of by the implementation. On Thu, Feb 12, 2015 at 11:29 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi Sean, I am reading the paper on implicit training, Collaborative Filtering for Implicit Feedback Datasets It mentions: To this end, let us introduce a set of binary variables p_ui, which indicates the preference of user u to item i. The p_ui values are derived by binarizing the r_ui values: p_ui = 1 if r_ui > 0 and p_ui = 0 if r_ui = 0. If I do not include user-item pairs without interactions in the training data, all the r_ui will be > 0 and all the p_ui will always be 1? Or does MLlib's implementation automatically take care of those no-interaction user-product pairs? On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen so...@cloudera.com wrote: Where there is no user-item interaction, you provide no interaction, not an interaction with strength 0. Otherwise your input is fully dense. On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi, I have some implicit rating data, such as purchasing data. I read the paper about the implicit training algorithm used in spark and it mentioned that for user-product pairs which do not have implicit rating data, such as no purchase, we need to provide the value as 0. This is different from explicit training where, when we provide training data, for a user-product pair without a rating, we just do not have it in the training data instead of adding a user-product pair with rating 0. Do I understand this correctly? Or for the implicit training implementation in spark, will the missing data be automatically filled out as zero so we do not need to add it to the training data set? Thanks, Crystal. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Question about mllib als's implicit training
Many thanks! On Thu, Feb 12, 2015 at 3:31 PM, Sean Owen so...@cloudera.com wrote: This all describes how the implementation operates, logically. The matrix P is never formed, for sure, certainly not by the caller. The implementation actually extends to handle negative values in R too, but it's all taken care of by the implementation. On Thu, Feb 12, 2015 at 11:29 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi Sean, I am reading the paper on implicit training, Collaborative Filtering for Implicit Feedback Datasets It mentions: To this end, let us introduce a set of binary variables p_ui, which indicates the preference of user u to item i. The p_ui values are derived by binarizing the r_ui values: p_ui = 1 if r_ui > 0 and p_ui = 0 if r_ui = 0. If I do not include user-item pairs without interactions in the training data, all the r_ui will be > 0 and all the p_ui will always be 1? Or does MLlib's implementation automatically take care of those no-interaction user-product pairs? On Thu, Feb 12, 2015 at 3:13 PM, Sean Owen so...@cloudera.com wrote: Where there is no user-item interaction, you provide no interaction, not an interaction with strength 0. Otherwise your input is fully dense. On Thu, Feb 12, 2015 at 11:09 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi, I have some implicit rating data, such as purchasing data. I read the paper about the implicit training algorithm used in spark and it mentioned that for user-product pairs which do not have implicit rating data, such as no purchase, we need to provide the value as 0. This is different from explicit training where, when we provide training data, for a user-product pair without a rating, we just do not have it in the training data instead of adding a user-product pair with rating 0. Do I understand this correctly? Or for the implicit training implementation in spark, will the missing data be automatically filled out as zero so we do not need to add it to the training data set? Thanks, Crystal.
Question about mllib als's implicit training
Hi, I have some implicit rating data, such as purchasing data. I read the paper about the implicit training algorithm used in spark and it mentioned that for user-product pairs which do not have implicit rating data, such as no purchase, we need to provide the value as 0. This is different from explicit training where, when we provide training data, for a user-product pair without a rating, we just do not have it in the training data instead of adding a user-product pair with rating 0. Do I understand this correctly? Or for the implicit training implementation in spark, will the missing data be automatically filled out as zero so we do not need to add it to the training data set? Thanks, Crystal.
spark left outer join with java.lang.UnsupportedOperationException: empty collection
Hi, I am using Spark 1.2.0 with Hadoop 2.2. Now I have 2 csv files, both with 8 fields. I know that the first field in both files is an ID. I want to find all the IDs that exist in the first file, but NOT in the 2nd file. I came up with the following code in spark-shell. case class origAsLeft(id: String) case class newAsRight(id: String) val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0)))) val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0)))) val output = OrigData.leftOuterJoin(NewData).filter { case (k, v) => v._2 == null } From what I understand, after OrigData is left outer joined with NewData, it will use the id as the key, and a tuple with (leftObject, RightObject). Since I want to find out all the IDs that exist in the first file, but not in the 2nd one, the output RDD will be the one I want, as it keeps only the rows where there is no newAsRight object from NewData. Then I run output.first Spark does start to run, but gives me the following error message: 15/02/12 16:43:38 INFO scheduler.DAGScheduler: Job 4 finished: first at <console>:21, took 78.303549 s java.lang.UnsupportedOperationException: empty collection at org.apache.spark.rdd.RDD.first(RDD.scala:1095) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:21) at $iwC$$iwC$$iwC.<init>(<console>:26) at $iwC$$iwC.<init>(<console>:28) at $iwC.<init>(<console>:30) at <init>(<console>:32) at .<init>(<console>:36) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55) at java.lang.reflect.Method.invoke(Method.java:619) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:94) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55) at java.lang.reflect.Method.invoke(Method.java:619) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Did I do anything wrong? What is the way to find all the IDs in the first file, but not in the 2nd file? My second question is how I can compare on a field of the object in this case. For example, if I define:

case class origAsLeft(id: String, name: String)
case class newAsRight(id: String, name: String)
val OrigData = sc.textFile("hdfs://firstfile").map(_.split(",")).map(r => (r(0), origAsLeft(r(0), r(1))))
val NewData = sc.textFile("hdfs://secondfile").map(_.split(",")).map(r => (r(0), newAsRight(r(0), r(1))))
// in this case, I want to list all the data in the first file which has the same ID as in the 2nd file, but with a different value in name, so I want to do something like below:
val output = OrigData.join(NewData).filter{ case (k, v) => v._1.name != v._2.name }

But what is the syntax to use the field in the case class I defined? Thanks Yong
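For what it's worth, Spark's leftOuterJoin yields (K, (V, Option[W])) rather than a nullable right side, so the missing-ID filter needs an Option test instead of a null check. A minimal sketch against the case classes above (paths and field layout assumed as in the question):

import org.apache.spark.SparkContext._   // pair RDD functions (needed before Spark 1.3)

// IDs present in the first file but absent from the second
val onlyInOrig = OrigData.leftOuterJoin(NewData)
  .filter { case (_, (_, right)) => right.isEmpty }   // right is an Option[newAsRight]
  .keys

// same ID in both files but a different name (second question, two-field variant)
val changedName = OrigData.join(NewData)
  .filter { case (_, (l, r)) => l.name != r.name }    // plain field access on the case classes

The Option-based filter also explains the empty collection: comparing an Option to null never matches, so the filtered RDD has no elements and first() throws.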
Re: Is it possible to expose SchemaRDD’s from thrift server?
Thanks Michael. I will give it a try. On Thu, Feb 12, 2015 at 6:00 PM, Michael Armbrust mich...@databricks.com wrote: You can start a JDBC server with an existing context. See my answer here: http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-td20197.html On Thu, Feb 12, 2015 at 7:24 AM, Todd Nist tsind...@gmail.com wrote: I have a question with regards to accessing SchemaRDD’s and Spark SQL temp tables via the thrift server. It appears that a SchemaRDD, when created, is only available in the local namespace / context and is unavailable to external services accessing Spark through the thrift server via ODBC; is this correct? Does the same apply to temp tables? If we process data on Spark, how is it exposed to the thrift server for access by third-party BI applications via ODBC? Does one need to have two Spark contexts, one for processing, then dump the result to the metastore from which a third-party application can fetch the data, or is it possible to expose the resulting SchemaRDD via the thrift server? I am trying to do this with Tableau's Spark SQL Connector. From what I can see, I need the Spark context for processing and then dump to the metastore. Is it possible to access the resulting SchemaRDD from doing something like this: create temporary table test using org.apache.spark.sql.json options (path '/data/json/*'); cache table test; I am using Spark 1.2.1. If it is not available now, will it be in 1.3.x? Or is the only way to achieve this to store into the metastore, and does that imply Hive? -Todd
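For reference, the approach in Michael's link amounts to starting the Thrift server inside the same application that registers the temp tables, so JDBC/ODBC clients share that one context. A minimal sketch, assuming the hive-thriftserver module is on the classpath (table name and path hypothetical):

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)
hiveContext.jsonFile("/data/json/").registerTempTable("test")  // SchemaRDD as a temp table
HiveThriftServer2.startWithContext(hiveContext)                // temp tables now visible over JDBC/ODBC

Because the server and the processing share one HiveContext, no round trip through the metastore is required for temp tables.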
Re: obtain cluster assignment in K-means
KMeans.train actually returns a KMeansModel, so you can use the predict() method of the model, e.g. clusters.predict(pointToPredict) or clusters.predict(pointsToPredict). The first takes a single Vector, the 2nd an RDD[Vector]. Robin On 12 Feb 2015, at 06:37, Shi Yu shiyu@gmail.com wrote: Hi there, I am new to Spark. When training a model using K-means with the following code, how do I obtain the cluster assignment in the next step? val clusters = KMeans.train(parsedData, numClusters, numIterations) I searched around many examples but they mostly calculate the WSSSE. I am still confused. Thanks! Eilian
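Spelling out Robin's answer as a minimal sketch (MLlib 1.2, with parsedData assumed to be the RDD[Vector] used for training):

import org.apache.spark.mllib.clustering.KMeans

val clusters = KMeans.train(parsedData, numClusters, numIterations)
// cluster index for every training point, paired back with the point itself
val assignments = parsedData.map(point => (point, clusters.predict(point)))
// or, for the whole RDD at once: val indices = clusters.predict(parsedData)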
Re: PySpark 1.2 Hadoop version mismatch
No, mr1 should not be the issue here, and I think that would break other things. The OP is not using mr1. client 4 / server 7 means roughly client is Hadoop 1.x, server is Hadoop 2.0.x. Normally, I'd say I think you are packaging Hadoop code in your app by bringing in Spark and its deps. Your app shouldn't have any of this code. If you're running straight examples though, I'm less sure. If anything, your client is older than your server. I wonder if you have anything else set on your local classpath via env variables? On Thu, Feb 12, 2015 at 6:52 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you have a look at http://spark.apache.org/docs/1.2.0/building-spark.html I think you can simply download the source and build for your hadoop version as: mvn -Dhadoop.version=2.0.0-mr1-cdh4.7.0 -DskipTests clean package Thanks Best Regards On Thu, Feb 12, 2015 at 11:45 AM, Michael Nazario mnaza...@palantir.com wrote: I also forgot some other information. I have made this error go away by making my pyspark application use spark-1.1.1-bin-cdh4 for the driver, but communicate with a Spark 1.2 master and worker. It's not a good workaround, so I would like the driver to also be Spark 1.2. Michael From: Michael Nazario Sent: Wednesday, February 11, 2015 10:13 PM To: user@spark.apache.org Subject: PySpark 1.2 Hadoop version mismatch Hi Spark users, I have been seeing a consistent error which I have been trying to reproduce and narrow down. I've been running a PySpark application on Spark 1.2 reading avro files from Hadoop. I was consistently seeing the following error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile. : org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 After some searching, I noticed that this most likely meant my hadoop versions were mismatched. I had the following versions at the time: Hadoop: hadoop-2.0.0-cdh4.7.0 Spark: spark-1.2.0-bin-cdh4.2.0 In the past, I've never had a problem with this setup for Spark 1.1.1 or Spark 1.0.2. I figured it was worth rebuilding Spark in case I was wrong about versions. To rebuild my Spark, I ran this command on the v1.2.0 tag: ./make-distribution.sh -Dhadoop.version=2.0.0-cdh4.7.0 I then retried my previously mentioned application with this new build of Spark. Same error. To narrow down the problem some more, I figured I should try out the example which comes with spark which allows you to load an avro file. I ran the below command (I know it uses a deprecated way of passing jars to the driver classpath): SPARK_CLASSPATH=/path/to/avro-mapred-1.7.4-hadoop2.jar:lib/spark-examples-1.2.0-hadoop2.0.0-cdh4.7.0.jar:$SPARK_CLASSPATH bin/spark-submit ./examples/src/main/python/avro_inputformat.py hdfs://localhost:8020/path/to/file.avro I ended up with the same error. The full stacktrace is below. Traceback (most recent call last): File /git/spark/dist/./examples/src/main/python/avro_inputformat.py, line 77, in <module> conf=conf) File /git/spark/dist/python/pyspark/context.py, line 503, in newAPIHadoopFile jconf, batchSize) File /git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /git/spark/dist/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopFile.
: org.apache.hadoop.ipc.RemoteException: Server IPC version 7 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1113) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229) at com.sun.proxy.$Proxy8.getProtocolVersion(Unknown Source) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62) at com.sun.proxy.$Proxy8.getProtocolVersion(Unknown Source) at org.apache.hadoop.ipc.RPC.checkVersion(RPC.java:422) at org.apache.hadoop.hdfs.DFSClient.createNamenode(DFSClient.java:183) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:281) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:245) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:100) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1446) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:67) at
Re: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets
The map will start with a capacity of 64, but will grow to accommodate new data. Are you using the groupBy operator in Spark or are you using Spark SQL's group by? This usually happens if you are grouping or aggregating in a way that doesn't sufficiently condense the data created from each input partition. - Patrick On Wed, Feb 11, 2015 at 9:37 PM, fightf...@163.com fightf...@163.com wrote: Hi, We still have no adequate solution for this issue. Expecting any available analytical rules or hints. Thanks, Sun. fightf...@163.com From: fightf...@163.com Date: 2015-02-09 11:56 To: user; dev Subject: Re: Sort Shuffle performance issues about using AppendOnlyMap for large data sets Hi, The problem still exists. Would any experts take a look at this? Thanks, Sun. fightf...@163.com From: fightf...@163.com Date: 2015-02-06 17:54 To: user; dev Subject: Sort Shuffle performance issues about using AppendOnlyMap for large data sets Hi, all Recently we ran into performance issues when using Spark 1.2.0 to read data from HBase and do some summary work. Our scenario is: read large data sets from HBase (maybe a 100G+ file), form an hbaseRDD, transform it to a SchemaRDD, group by and aggregate the data into a much smaller set of new summary data sets, then load the summary data into HBase (Phoenix). Our major issue is that aggregating the large data sets into summary data sets takes too long (1+ hour), which should not be such bad performance. We got the dump file attached and a stacktrace from jstack like the following: From the stacktrace and dump file we can identify that processing large data sets causes frequent AppendOnlyMap growing, leading to a huge map entry size. We referenced the source code of org.apache.spark.util.collection.AppendOnlyMap and found that the map is initialized with a capacity of 64. That would be too small for our use case. So the question is: Has anyone encountered such issues before? How were they resolved? I cannot find any JIRA issues for such problems, and if someone has seen them, please kindly let us know. A more specific question: is there any possibility for the user to define the map capacity in Spark? If so, please tell us how to achieve that. Best Thanks, Sun.
Thread 22432: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=169, line=68 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext) @bci=2, line=41 (Interpreted frame) - org.apache.spark.scheduler.Task.run(long) @bci=77, line=56 (Interpreted frame) - org.apache.spark.executor.Executor$TaskRunner.run() @bci=310, line=196 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) @bci=95, line=1145 (Interpreted frame) - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5, line=615 (Interpreted frame) - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame) Thread 22431: (state = IN_JAVA) - org.apache.spark.util.collection.AppendOnlyMap.growTable() @bci=87, line=224 (Compiled frame; information may be imprecise) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable() @bci=1, line=38 (Interpreted frame) - org.apache.spark.util.collection.AppendOnlyMap.incrementSize() @bci=22, line=198 (Compiled frame) - org.apache.spark.util.collection.AppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=201, line=145 (Compiled frame) - org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(java.lang.Object, scala.Function2) @bci=3, line=32 (Compiled frame) - org.apache.spark.util.collection.ExternalSorter.insertAll(scala.collection.Iterator) @bci=141, line=205 (Compiled frame) - org.apache.spark.shuffle.sort.SortShuffleWriter.write(scala.collection.Iterator) @bci=74, line=58 (Interpreted frame) - org.apache.spark.scheduler.ShuffleMapTask.runTask(org.apache.spark.TaskContext)
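One common way to condense the data from each input partition, per Patrick's point, is to replace a group-then-aggregate with a combining aggregation so the shuffle map never materializes whole groups. A minimal sketch of the idea (record fields hypothetical):

import org.apache.spark.SparkContext._   // pair RDD functions (pre-1.3)

// groupBy buffers every value per key in the AppendOnlyMap before aggregating:
//   hbaseRdd.groupBy(_.rowKey).mapValues(_.map(_.metric).sum)

// reduceByKey combines map-side, so each key holds one running value instead of a buffer:
val summary = hbaseRdd
  .map(rec => (rec.rowKey, rec.metric))
  .reduceByKey(_ + _)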
Re: No executors allocated on yarn with latest master branch
Interesting to hear that it works for you. Are you using Yarn 2.2 as well? There are no strange log messages during startup, and I can't see any other log messages since no executor gets launched. It does not seem to work in yarn-client mode either, failing with the exception below. Exception in thread main org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) at org.apache.spark.SparkContext.init(SparkContext.scala:370) at com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8) at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42) at com.spotify.analytics.DataSampler.main(DataSampler.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) /Anders On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Anders, I just tried this out and was able to successfully acquire executors. Any strange log messages or additional color you can provide on your setup? Does yarn-client mode work? -Sandy On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote: Hi, I compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2 and failed to execute jobs in yarn-cluster mode with that build. It works successfully with Spark 1.2 (and also master from 2015-01-16), so something has changed since then that prevents the job from receiving any executors on the cluster. The basic symptoms are that the job fires up the AM, but after examining the executors page in the web UI, only the driver is listed; no executors are ever received, and the driver keeps waiting forever. Has anyone seen similar problems? Thanks for any insights, Anders
Re: Can't access remote Hive table from spark
When you log in, you have root access. Then you can do “su hdfs” or switch to any other account. Then you can create the hdfs directory and change permissions, etc. Thanks Zhan Zhang On Feb 11, 2015, at 11:28 PM, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi Zhan, Yes, I found there is a hdfs account, which is created by Ambari, but what's the password for this account? How can I log in under this account? Can I just change the password for the hdfs account? Regards, -- Original -- From: Zhan Zhang zzh...@hortonworks.com; Send time: Thursday, Feb 12, 2015 2:00 AM To: guxiaobo1...@qq.com; Cc: user@spark.apache.org; Cheng Lian lian.cs@gmail.com; Subject: Re: Can't access remote Hive table from spark You need to have the right hdfs account, e.g., hdfs, to create the directory and assign permissions. Thanks. Zhan Zhang On Feb 11, 2015, at 4:34 AM, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi Zhan, My Single Node Cluster of Hadoop is installed by Ambari 1.7.0. I tried to create the /user/xiaobogu directory in hdfs, but it failed with both user xiaobogu and root: [xiaobogu@lix1 current]$ hadoop dfs -mkdir /user/xiaobogu DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. mkdir: Permission denied: user=xiaobogu, access=WRITE, inode=/user:hdfs:hdfs:drwxr-xr-x [root@lix1 bin]# hadoop dfs -mkdir /user/xiaobogu DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. mkdir: Permission denied: user=root, access=WRITE, inode=/user:hdfs:hdfs:drwxr-xr-x I notice there is a hdfs account created by Ambari, but what's the password for it? Should I use the hdfs account to create the directory? -- Original -- From: Zhan Zhang zzh...@hortonworks.com; Send time: Sunday, Feb 8, 2015 4:11 AM To: guxiaobo1...@qq.com; Cc: user@spark.apache.org; Cheng Lian lian.cs@gmail.com; Subject: Re: Can't access remote Hive table from spark Yes. You need to create xiaobogu under /user and give the right permissions to xiaobogu. Thanks. Zhan Zhang On Feb 7, 2015, at 8:15 AM, guxiaobo1982 guxiaobo1...@qq.com wrote: Hi Zhan Zhang, With the pre-built version 1.2.0 of Spark against the yarn cluster installed by Ambari 1.7.0, I get the following errors: [xiaobogu@lix1 spark]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10 Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/02/08 00:11:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 15/02/08 00:11:54 INFO client.RMProxy: Connecting to ResourceManager at lix1.bh.com/192.168.100.3:8050http://lix1.bh.com/192.168.100.3:8050 15/02/08 00:11:56 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers 15/02/08 00:11:57 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (4096 MB per container) 15/02/08 00:11:57 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 15/02/08 00:11:57 INFO yarn.Client: Setting up container launch context for our AM 15/02/08 00:11:57 INFO yarn.Client: Preparing resources for our AM container 15/02/08 00:11:58 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. Exception in thread main org.apache.hadoop.security.AccessControlException: Permission denied: user=xiaobogu, access=WRITE, inode=/user:hdfs:hdfs:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238) at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:179) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6515) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6497) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6449) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4251) at
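To answer the password question directly: the Ambari-created hdfs account normally has no login password, but root can switch to it without one and fix up the home directory. A minimal sketch of the commands (username from the thread):

su - hdfs                                        # from root, no password prompt
hdfs dfs -mkdir -p /user/xiaobogu
hdfs dfs -chown xiaobogu:xiaobogu /user/xiaobogu
exit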
Is there a separate mailing list for Spark Developers ?
d...@spark.apache.org http://apache-spark-developers-list.1001551.n3.nabble.com/ mentioned on http://spark.apache.org/community.html seems to be bouncing. Is there another one?
Re: How to broadcast a variable read from a file in yarn-cluster mode?
Can you give me the whole logs? TD On Tue, Feb 10, 2015 at 10:48 AM, Jon Gregg jonrgr...@gmail.com wrote: OK that worked and getting close here ... the job ran successfully for a bit and I got output for the first couple buckets before getting a java.lang.Exception: Could not compute split, block input-0-1423593163000 not found error. So I bumped up the memory at the command line from 2 gb to 5 gb, ran it again ... this time I got around 8 successful outputs before erroring. Bumped up the memory from 5 gb to 10 gb ... got around 15 successful outputs before erroring. I'm not persisting or caching anything except for the broadcast IP table and another broadcast small user agents list used for the same type of filtering, and both files are tiny. The Hadoop cluster is nearly empty right now and has more than enough available memory to handle this job. I am connecting to Kafka as well and so there's a lot of data coming through as my index is trying to catch up to the current date, but yarn-client mode has several times in the past few weeks been able to catch up to the current date and run successfully for days without issue. My guess is memory isn't being cleared after each bucket? Relevant portion of the log below. 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593134400 on phd40010023.na.com:1 in memory (size: 50.1 MB, free: 10.2 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593135400 on phd40010023.na.com:1 in memory (size: 24.9 MB, free: 10.2 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593136400 on phd40010023.na.com:1 in memory (size: 129.0 MB, free: 10.3 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137000 on phd40010023.na.com:1 in memory (size: 112.4 MB, free: 10.4 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137200 on phd40010023.na.com:1 in memory (size: 481.0 B, free: 10.4 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593137800 on phd40010023.na.com:1 in memory (size: 44.6 MB, free: 10.5 GB) 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 754 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138400 on phd40010023.na.com:1 in memory (size: 95.8 MB, free: 10.6 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 754 15/02/10 13:34:54 INFO MapPartitionsRDD: Removing RDD 753 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138600 on phd40010023.na.com:1 in memory (size: 123.2 MB, free: 10.7 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593138800 on phd40010023.na.com:1 in memory (size: 5.2 KB, free: 10.7 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 753 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 750 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139600 on phd40010023.na.com:1 in memory (size: 106.4 MB, free: 10.8 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593139800 on phd40010023.na.com:1 in memory (size: 107.0 MB, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-142359314 on phd40010023.na.com:1 in memory (size: 59.5 MB, free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 750 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 749 from persistence list 15/02/10 13:34:54 INFO DAGScheduler: Missing parents: List(Stage 117, Stage 114, Stage 115, Stage 116) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140200 on phd40010023.na.com:1 in memory (size: 845.0 B, 
free: 10.9 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140600 on phd40010023.na.com:1 in memory (size: 19.2 MB, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593140800 on phd40010023.na.com:1 in memory (size: 492.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593141000 on phd40010023.na.com:1 in memory (size: 1018.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 749 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142400 on phd40010023.na.com:1 in memory (size: 48.6 MB, free: 11.0 GB) 15/02/10 13:34:54 INFO MappedRDD: Removing RDD 767 from persistence list 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142600 on phd40010023.na.com:1 in memory (size: 4.9 KB, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593142800 on phd40010023.na.com:1 in memory (size: 780.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593143800 on phd40010023.na.com:1 in memory (size: 847.0 B, free: 11.0 GB) 15/02/10 13:34:54 INFO BlockManager: Removing RDD 767 15/02/10 13:34:54 INFO BlockManagerInfo: Removed input-0-1423593144400 on phd40010023.na.com:1 in memory (size: 43.7 MB,
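For the thread's subject question, the usual pattern is to read the reference file once on the driver (from HDFS, since a yarn-cluster driver has no fixed local filesystem) and broadcast the parsed result to the executors. A minimal sketch (path and record handling hypothetical):

// on the driver, when the streaming job is set up
val ipTable = ssc.sparkContext
  .textFile("hdfs:///ref/ip_table.txt")   // hypothetical HDFS path
  .collect()
  .toSet
val ipTableBc = ssc.sparkContext.broadcast(ipTable)

// executors then read the broadcast value inside transformations, e.g.
// dstream.filter(rec => ipTableBc.value.contains(rec))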
Re: Streaming scheduling delay
1. Can you try count()? take often does not force the entire computation. 2. Can you give the full log? From the log it seems that the blocks are added to two nodes but the tasks seem to be launched on different nodes. I don't see any message removing the blocks, so I need the whole log to debug this. TD On Feb 12, 2015 8:52 PM, Tim Smith secs...@gmail.com wrote: 1) Yes, if I disable writing out to kafka and replace it with some very lightweight action like rdd.take(1), the app is stable. 2) The partitions I spoke of in the previous mail are the number of partitions I create from each dStream. But yes, since I do processing and writing out per partition, each dStream partition ends up getting written to a kafka partition. The flow is, broadly: 5 Spark/Kafka receivers -> split each dStream into 30 partitions (150 partitions) -> apply some transformation logic to each partition -> write out each partition to kafka (kafka has 23 partitions). Let me increase the number of partitions on the kafka side and see if that helps. Here's what the main block of code looks like (I added persistence back):

val kInStreams = (1 to otherConf("inStreamCount").toInt).map { _ =>
  KafkaUtils.createStream(ssc, otherConf("kafkaConsumerZk").toString, otherConf("kafkaConsumerGroupId").toString, Map(otherConf("kafkaConsumerTopic").toString -> 1), StorageLevel.MEMORY_AND_DISK_SER)
}

if (!configMap.keySet.isEmpty) {
  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition, this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams) {
    // Re-partition stream from each receiver across all compute nodes to spread out processing load and allow per partition processing
    // and, set persistence level to spill to disk along with serialization
    val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
    val outdata = kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
    // Write each transformed partition to Kafka via the writer object
    outdata.foreachRDD(rdd => rdd.foreachPartition(rec => {
      val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
      writer.output(rec)
    }))
  }
}

Here's the life-cycle of a lost block: 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB) 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB) 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-22-acme.com:42084 (size: 164.7 KB) 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-23-acme.com:34526 (size: 164.7 KB) 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 1] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0 (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 2] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0 (TID
1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 3] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0 (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 4] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0 (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 5] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0 (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 6] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0 (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 7] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0 (TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found)
Re: Streaming scheduling delay
Hi Tim, I think this code will still introduce shuffle even when you call repartition on each input stream. Actually, this style of implementation will generate more jobs (one job per input stream) than unioning into one stream with DStream.union(), and union normally has no special overhead as I understand it. Also, as Cody said, creating a Producer per partition could be a potential overhead; a producer pool or sharing the Producer per executor might be better :).

// Process stream from each receiver separately
// (do not attempt to merge all streams and then re-partition, this causes un-necessary and high amount of shuffle in the job)
for (k <- kInStreams) {
  // Re-partition stream from each receiver across all compute nodes to spread out processing load and allows per partition processing
  // and, set persistence level to spill to disk along with serialization
  val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt).

2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

outdata.foreachRDD(rdd => rdd.foreachPartition(rec => {
  val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
  writer.output(rec)
}))

So this is creating a new kafka producer for every new output partition, right? Have you tried pooling the producers? On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote: 1) Yes, if I disable writing out to kafka and replace it with some very lightweight action like rdd.take(1), the app is stable. 2) The partitions I spoke of in the previous mail are the number of partitions I create from each dStream. But yes, since I do processing and writing out per partition, each dStream partition ends up getting written to a kafka partition. The flow is, broadly: 5 Spark/Kafka receivers -> split each dStream into 30 partitions (150 partitions) -> apply some transformation logic to each partition -> write out each partition to kafka (kafka has 23 partitions). Let me increase the number of partitions on the kafka side and see if that helps.
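A minimal sketch of the producer-sharing idea Cody and Saisai raise, constructing one writer per executor JVM and reusing it across partitions (KafkaOutputService is the thread's own class; the topic name is hypothetical and the properties are assumed to be resolvable on the executors):

object WriterHolder {
  // lazily initialized once per executor JVM, then reused by every partition
  lazy val writer = new KafkaOutputService("outputTopic" /* hypothetical */, propsMap)
}

outdata.foreachRDD { rdd =>
  rdd.foreachPartition { rec =>
    WriterHolder.writer.output(rec)   // no per-partition producer construction
  }
}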
Re: Can spark job server be used to visualize streaming data?
Apache Zeppelin also has a scheduler, so you can reload your chart periodically. Check it out: http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito silvio.fior...@granturing.com wrote: One method I’ve used is to publish each batch to a message bus or queue with a custom UI listening on the other end, displaying the results in d3.js or some other app. As far as I’m aware there isn’t a tool that will directly take a DStream. Spark Notebook seems to have some support for updating graphs periodically. I haven’t used it myself yet so I'm not sure how well it works. See here: https://github.com/andypetrella/spark-notebook From: Su She Date: Thursday, February 12, 2015 at 1:55 AM To: Felix C Cc: Kelvin Chu, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hello Felix, I am already streaming in very simple data using Kafka (a few messages / second, each record only has 3 columns... really simple, but looking to scale once I connect everything). I am processing it in Spark Streaming and am currently writing word counts to hdfs. So the part where I am confused is: Kafka publishes data -> Kafka consumer/Spark Streaming receives data -> Spark word count -> *how do I visualize?* Is there a viz tool that I can set up to visualize JavaPairDStreams, or do I have to write to hbase/hdfs first? Thanks! On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com wrote: What kind of data do you have? Kafka is a popular source to use with spark streaming. But spark streaming also supports reading from a file. It's called a basic source https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers --- Original Message --- From: Su She suhsheka...@gmail.com Sent: February 11, 2015 10:23 AM To: Felix C felixcheun...@hotmail.com Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Thank you Felix and Kelvin. I think I'll definitely be using the k-means tools in MLlib. It seems the best way to stream data is to store it in hbase and then use an API in my viz tool to extract the data? Does anyone have any thoughts on this? Thanks! On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com wrote: Check out https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html In there are links to how that is done. --- Original Message --- From: Kelvin Chu 2dot7kel...@gmail.com Sent: February 10, 2015 12:48 PM To: Su She suhsheka...@gmail.com Cc: user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hi Su, Out of the box, no. But I know people integrate it with Spark Streaming to do real-time visualization. It will take some work though. Kelvin On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I was reading this blog post: http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/ and was wondering if this approach can be taken to visualize streaming data... not just historical data? Thank you! -Suh
Re: Spark streaming job throwing ClassNotFound exception when recovering from checkpointing
Could you come up with a minimal example through which I can reproduce the problem? On Tue, Feb 10, 2015 at 12:30 PM, conor fennell.co...@gmail.com wrote: I am getting the following error when I kill the spark driver and restart the job: 15/02/10 17:31:05 INFO CheckpointReader: Attempting to load checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk 15/02/10 17:31:05 WARN CheckpointReader: Error reading checkpoint from file hdfs://hdfs-namenode.vagrant:9000/reporting/SampleJob$0.1.0/checkpoint-142358910.bk java.io.IOException: java.lang.ClassNotFoundException: com.example.spark.streaming.reporting.live.jobs.Bucket at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:988) at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) The Spark version is 1.2.0. The streaming job executes every 10 seconds with the following steps: 1. Consume JSON from a kafka topic called journeys and convert it to case classes 2. Filter the resulting journeys stream based on a time attribute being set 3. FlatMap journeys into separate time buckets of 15, 60 and 1440 minutes, e.g. ("HOUR1234569000", ActiveState("HOUR", 1234569000, hyperLogLog(journey id), 360)) 4. ReduceByKey, adding hyperloglogs 5. UpdateStateByKey to add to the previous state's hyperloglog 6. Then output results to Cassandra I have pasted in a sample app below to mimic the problem and put all classes into one file; it is also attached here SampleJob.scala http://apache-spark-user-list.1001560.n3.nabble.com/file/n21582/SampleJob.scala To get around the issue for the moment, I have removed the Bucket class and stopped passing a bucket array to the ActiveJourney class; instead I hard-code all the time buckets I need in the ActiveJourney class. This approach works and recovers from checkpointing but is not extensible. Can the Spark gurus explain why I get that ClassNotFound exception? If you need any more information, please let me know.
Much thanks, Conor

package com.example.spark.streaming.reporting.live.jobs

import java.util.Date
import scala.Array.canBuildFrom
import scala.collection.mutable.MutableList
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jvalue2extractable
import org.json4s.string2JsonInput
import com.example.spark.streaming.utils.MilliSecondUtils
import com.example.spark.streaming.utils.constants.ColumnFamilies
import com.example.spark.streaming.utils.constants.Constants
import com.example.spark.streaming.utils.constants.Milliseconds
import com.example.spark.streaming.utils.constants.SparkConfig
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming.toDStreamFunctions
import com.datastax.spark.connector.toNamedColumnRef
import com.twitter.algebird.HLL
import com.twitter.algebird.HyperLogLogMonoid

// Json parsing classes
case class Journey(o: Option[JourneyCommand], o2: Option[JourneyDetails])
case class JourneyDetails(_id: String)
case class JourneyCommand($set: Option[JourneySet])
case class JourneySet(awayAt: Date)

// Class not found bucket
case class Bucket(val bucketType: String, val roundDown: (Long) => Long, val columnFamily: String, val size: Long, val maxIntervals: Int)

// used for updateStateByKey
case class ActiveState(var bucketType: String, var time: Long, var hyperLogLog: HLL, var ttl: Int)

object SampleJob {

  private final val Name = this.getClass().getSimpleName()

  def main(args: Array[String]) {
    if (args.length < 8) {
      System.err.println(s"Usage: $Name environment zkQuorum group topics numThreads hdfsUri cassandra intervalSeconds")
      System.exit(1)
    }
    System.out.print(args)
    val Array(environment, zkQuorum, group, topics, numThreads, hdfsUri, cassandra, intervalSeconds) = args
    val checkpointDirectory = hdfsUri + "/reporting/" + Name + getClass().getPackage().getImplementationVersion()

    def functionToCreateContext(): StreamingContext = {
      // how many buckets
      val fifteen = Bucket("QUARTER_HOUR", MilliSecondUtils.roundDownToNearestQuarterHour, ColumnFamilies.Visits_15, Milliseconds.FifteenMinutes, 90)
      val hour = Bucket("HOUR", MilliSecondUtils.roundDownToNearestHour, ColumnFamilies.Visits_60, Milliseconds.Hour, 360)
      val day
Re: HiveContext in SparkSQL - concurrency issues
Hi, I've been reading about Spark SQL, and people suggest that using HiveContext is better. So can anyone please suggest a solution to the above problem? This is stopping me from moving forward with HiveContext. Thanks Harika -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-in-SparkSQL-concurrency-issues-tp21491p21636.html
Using Spark SQL for temporal data
I have a temporal data set which I'd like to be able to query using Spark SQL. The dataset is actually in Accumulo, and I've already written a CatalystScan implementation and RelationProvider[1] to register with the SQLContext so that I can apply my SQL statements. With my current implementation, the start and stop time ranges are set on the RelationProvider (so ultimately they become a per-table setting). I'd much rather be able to register the table without the time ranges and just specify them through the SQL query string itself (perhaps an expression in the WHERE clause?) [1] https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala
Re: Using Spark SQL for temporal data
Hi Corey, I would not recommend using CatalystScan for this. It's lower level and not stable across releases. You should be able to do what you want with PrunedFilteredScan https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L155, though. The filters https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala that it pushes down are already normalized, so you can easily look for range predicates on the start/end columns you care about.

val start = filters.collectFirst {
  case GreaterThan("start", startDate: String) => DateTime.parse(startDate).toDate
}.getOrElse(minPossibleStartDate)

val end = filters.collectFirst {
  case LessThan("end", endDate: String) => DateTime.parse(endDate).toDate
}.getOrElse(maxPossibleDate)
...

Filters are advisory, so you can ignore ones that aren't start/end. Michael
Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]
Thank you. That worked. 2015-02-12 20:03 GMT+04:00 Imran Rashid iras...@cloudera.com: You need to import the implicit conversions to PairRDDFunctions with import org.apache.spark.SparkContext._ (note that this requirement will go away in 1.3: https://issues.apache.org/jira/browse/SPARK-4397) On Thu, Feb 12, 2015 at 9:36 AM, Vladimir Protsenko protsenk...@gmail.com wrote: Hi. I am stuck on how to save a file to hdfs from spark. I have written MyOutputFormat extends FileOutputFormat[String, MyObject], then in spark I call this: rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String], classOf[MyObject], classOf[MyOutputFormat]) where rddres is an RDD[(String, MyObject)] from further up the transformation pipeline. The compilation error is: value saveAsHadoopFile is not a member of org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]. Could someone give me insights on what could be done here to make it work? Why is it not a member? Because of wrong types? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html
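In other words, before Spark 1.3 the pair-RDD methods live behind an implicit conversion, so the fix is one import at the top of the file. A minimal sketch:

import org.apache.spark.SparkContext._   // brings PairRDDFunctions into scope

rddres.saveAsHadoopFile("hdfs://localhost/output",
  classOf[String], classOf[MyObject], classOf[MyOutputFormat])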
Re: HiveContext in SparkSQL - concurrency issues
Your earlier call stack clearly states that it fails because the Derby metastore has already been started by another instance, so I think that is explained by your attempt to run these concurrently. Are you running Spark standalone? Do you have a cluster? You should be able to run Spark in yarn-client mode against the Hive metastore service. That should give you the ability to run multiple applications concurrently. Be sure to copy hive-site.xml to SPARK_HOME/conf --- Original Message --- From: Harika matha.har...@gmail.com Sent: February 12, 2015 8:22 PM To: user@spark.apache.org Subject: Re: HiveContext in SparkSQL - concurrency issues
Re: Streaming scheduling delay
Hi Gerard, Great write-up and really good guidance in there. I have to be honest, I don't know why, but setting the # of partitions for each dStream to a low number (5-10) just causes the app to choke/crash. Setting it to 20 gets the app going, but with not-so-great delays. Bump it up to 30 and I start winning the war, where processing time is consistently below the batch time window (20 seconds), except that every few batches the compute time spikes to 10x the usual. Following your guide, I took out some logInfo statements I had in the app, but that didn't seem to make much difference :( With a higher time window (20 seconds), I got the app to run stably for a few hours but then ran into the dreaded java.lang.Exception: Could not compute split, block input-0-1423761240800 not found. I wonder if I need to add RDD persistence back? Also, I am reaching out to Virdata with some ProServ inquiries. Thanks On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi Tim, From this: There are 5 kafka receivers and each incoming stream is split into 40 partitions I suspect that you're creating too many tasks for Spark to process on time. Could you try some of the 'knobs' I describe here to see if that would help? http://www.virdata.com/tuning-spark/ -kr, Gerard. On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote: Just read the thread Are these numbers abnormal for spark streaming? and I think I am seeing similar results - that is - increasing the window seems to be the trick here. I will have to monitor for a few hours/days before I can conclude (there are so many knobs/dials). On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote: On Spark 1.2 (have been seeing this behaviour since 1.0), I have a streaming app that consumes data from Kafka and writes it back to Kafka (different topic). My big problem has been Total Delay. While execution time is usually within the window size (in seconds), the total delay ranges from minutes to hours (and keeps going up). For a little while, I thought I had solved the issue by bumping up the driver memory. Then I expanded my Kafka cluster to add more nodes and the issue came up again. I tried a few things to smoke out the issue and something tells me the driver is the bottleneck again: 1) From my app, I took out the entire write-out-to-kafka piece. Sure enough, execution, scheduling delay and hence total delay fell to sub-second. This assured me that whatever processing I do before writing back to kafka isn't the bottleneck. 2) In my app, I had RDD persistence set at different points but my code wasn't really re-using any RDDs, so I took out all explicit persist() statements. And added spar...unpersist set to true in the context. After this, it doesn't seem to matter how much memory I give my executor; the total delay seems to be in the same range. I tried per-executor memory from 2G to 12G with no change in total delay, so executors aren't memory starved. Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB used when per-executor memory is set to 2GB, for example. 3) Input rate in the kafka consumer restricts spikes in incoming data. 4) Tried FIFO and FAIR but it didn't make any difference. 5) Adding executors beyond a certain point seems useless (I guess the excess ones just sit idle). At any given point in time, the SparkUI shows only one batch pending processing. So with just one batch pending processing, why would the scheduling delay run into minutes/hours if execution time is within the batch window duration?
There aren't any failed stages or jobs. Right now, I have 100 executors (I have tried setting executors from 50-150), each with 2GB and 4 cores, and the driver running with 16GB. There are 5 kafka receivers and each incoming stream is split into 40 partitions. Per receiver, the input rate is restricted to 2 messages per second. Can anyone help me with clues or areas to look into for troubleshooting the issue? One nugget I found buried in the code says: The scheduler delay includes the network delay to send the task to the worker machine and to send back the result (but not the time to fetch the task result, if it needed to be fetched from the block manager on the worker). https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala Could this be an issue with the driver being a bottleneck? All the executors posting their logs/stats to the driver? Thanks, Tim
Re: Extract hour from Timestamp in Spark SQL
This looks like your executors aren't running a version of spark with hive support compiled in. On Feb 12, 2015 7:31 PM, Wush Wu w...@bridgewell.com wrote: Dear Michael, After using the org.apache.spark.sql.hive.HiveContext, the exception java.util.NoSuchElementException: key not found: hour during SQL planning is gone. However, I got another error; the complete stacktrace is shown below. I am working on this now. Best, Wush Stacktrace: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.HiveSimpleUdf at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:65) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:274) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:362) at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
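A minimal sketch of the fix Michael is pointing at, assuming a Spark 1.2-era build compiled with Hive support (the "events" table and "ts" column are illustrative, not from the thread); note his point that the executors, not just the driver, must run the Hive-enabled build:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  object HourFromTimestamp {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("HourFromTimestamp"))
      val hiveCtx = new HiveContext(sc)
      // hour() is a Hive UDF, so it resolves under HiveContext but not
      // under the plain SQLContext.
      val hours = hiveCtx.sql("SELECT hour(ts) FROM events")
      hours.collect().foreach(println)
    }
  }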
Re: Is there a separate mailing list for Spark Developers ?
dev@spark is active, e.g. see: http://search-hadoop.com/m/JW1q5zQ1Xw/renaming+SchemaRDD+-%253E+DataFramesubj=renaming+SchemaRDD+gt+DataFrame Cheers On Thu, Feb 12, 2015 at 8:09 PM, Manoj Samel manojsamelt...@gmail.com wrote: d...@spark.apache.org http://apache-spark-developers-list.1001551.n3.nabble.com/ mentioned on http://spark.apache.org/community.html seems to be bouncing. Is there another one?
Re: streaming joining multiple streams
Sorry for the late response. With the amount of data you are planning to join, any system would take time. However, between Hive's MapReduce joins, Spark's basic shuffle, and Spark SQL's join, the latter wins hands down. Furthermore, with the APIs of Spark and Spark Streaming, you will have to do strictly less work to set up the infrastructure that you want to build. Yes, Spark Streaming currently does not support providing your own timer, because the logic to handle delays etc. is pretty complex and specific to each application. Usually that logic can be implemented on top of the windowing solution that Spark Streaming already provides. TD On Thu, Feb 5, 2015 at 7:37 AM, Zilvinas Saltys zilvinas.sal...@gmail.com wrote: The challenge I have is this. There are two streams of data where an event might look like this in stream1: (time, hashkey, foo1) and in stream2: (time, hashkey, foo2) The result after joining should be (time, hashkey, foo1, foo2) .. The join happens on hashkey and the time difference can be ~30 mins between events. The amount of data is enormous .. hundreds of billions of events per month. I need to not only join the existing history data but also continue to do so with incoming data (it comes in batches, not really streamed). For now I was thinking to implement this in MapReduce and sliding windows .. I'm wondering if spark can actually help me with this sort of challenge? How would a join of two huge streams of historic data actually happen internally within spark, and would it be more efficient than, let's say, a hive map reduce stream join of two big tables? I also saw spark streaming has windowing support but it seems you cannot provide your own timer? As in, I cannot make the time be derived from the events themselves rather than having an actual clock running. Thanks,
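As a rough illustration of the windowed-join shape TD alludes to, here is a hedged sketch: the sources, field layout, and 30-minute window are assumptions taken from the question above, and event-time handling is still subject to the timer limitation TD mentions:

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._

  object TwoStreamJoin {
    def main(args: Array[String]): Unit = {
      val ssc = new StreamingContext(new SparkConf().setAppName("TwoStreamJoin"), Seconds(60))
      // Hypothetical text sources emitting "time,hashkey,fooN" lines;
      // a real deployment would use Kafka or file-based streams.
      val stream1 = ssc.socketTextStream("host1", 9999, StorageLevel.MEMORY_AND_DISK_SER)
        .map(_.split(",")).map(f => (f(1), f(2)))   // (hashkey, foo1)
      val stream2 = ssc.socketTextStream("host2", 9999, StorageLevel.MEMORY_AND_DISK_SER)
        .map(_.split(",")).map(f => (f(1), f(2)))   // (hashkey, foo2)
      // Window both sides by 30 minutes so events up to ~30 min apart can meet.
      val joined = stream1.window(Minutes(30)).join(stream2.window(Minutes(30)))
      joined.print()
      ssc.start()
      ssc.awaitTermination()
    }
  }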
Re: Streaming scheduling delay
Hey Tim, Let me get the key points. 1. If you are not writing back to Kafka, the delay is stable? That is, instead of foreachRDD { // write to kafka } if you do dstream.count, then the delay is stable. Right? 2. If so, then Kafka is the bottleneck. Is it the number of partitions, that you spoke of in the second mail, that determines the parallelism in writes? Is it stable with 30 partitions? Regarding the block exception, could you give me a trace of info level logging that leads to this error? Basically I want to trace the lifecycle of the block. On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote: Hi Gerard, Great write-up and really good guidance in there. I have to be honest, I don't know why but setting # of partitions for each dStream to a low number (5-10) just causes the app to choke/crash. Setting it to 20 gets the app going but with not so great delays. Bump it up to 30 and I start winning the war where processing time is consistently below the batch time window (20 seconds), except for a batch every few batches where the compute time spikes 10x the usual. Following your guide, I took out some logInfo statements I had in the app but that didn't seem to make much difference :( With a higher time window (20 seconds), I got the app to run stably for a few hours but then ran into the dreaded java.lang.Exception: Could not compute split, block input-0-1423761240800 not found. Wonder if I need to add RDD persistence back? Also, I am reaching out to Virdata with some ProServ inquiries. Thanks On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com wrote: Hi Tim, From this: There are 5 kafka receivers and each incoming stream is split into 40 partitions I suspect that you're creating too many tasks for Spark to process on time. Could you try some of the 'knobs' I describe here to see if that would help? http://www.virdata.com/tuning-spark/ -kr, Gerard. On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote: Just read the thread Are these numbers abnormal for spark streaming? and I think I am seeing similar results - that is - increasing the window seems to be the trick here. I will have to monitor for a few hours/days before I can conclude (there are so many knobs/dials). On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote: On Spark 1.2 (have been seeing this behaviour since 1.0), I have a streaming app that consumes data from Kafka and writes it back to Kafka (different topic). My big problem has been Total Delay. While execution time is usually window size (in seconds), the total delay ranges from minutes to hour(s) and keeps going up. For a little while, I thought I had solved the issue by bumping up the driver memory. Then I expanded my Kafka cluster to add more nodes and the issue came up again. I tried a few things to smoke out the issue and something tells me the driver is the bottleneck again: 1) From my app, I took out the entire write-out-to-kafka piece. Sure enough, execution, scheduling delay and hence total delay fell to sub second. This assured me that whatever processing I do before writing back to kafka isn't the bottleneck. 2) In my app, I had RDD persistence set at different points but my code wasn't really re-using any RDDs so I took out all explicit persist() statements. And added, spar...unpersist to true in the context. After this, it doesn't seem to matter how much memory I give my executor, the total delay seems to be in the same range.
I tried per executor memory from 2G to 12G with no change in total delay, so executors aren't memory starved. Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB used when per executor memory is set to 2GB, for example. 3) Input rate in the kafka consumer restricts spikes in incoming data. 4) Tried FIFO and FAIR but it didn't make any difference. 5) Adding executors beyond a certain point seems useless (I guess the excess ones just sit idle). At any given point in time, the SparkUI shows only one batch pending processing. So with just one batch pending processing, why would the scheduling delay run into minutes/hours if execution time is within the batch window duration? There aren't any failed stages or jobs. Right now, I have 100 executors (I have tried setting executors from 50-150), each with 2GB and 4 cores, and the driver running with 16GB. There are 5 kafka receivers and each incoming stream is split into 40 partitions. Per receiver, input rate is restricted to 2 messages per second. Can anyone help me with clues or areas to look into, for troubleshooting the issue? One nugget I found buried in the code says: The scheduler delay includes the network delay to send the task to the worker machine and to send back the result (but not the time to fetch the task result, if it needed to be fetched from the block manager on the worker).
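A sketch of the isolation test TD proposes in point 1, assuming a dStream of transformed records like the outdata stream that appears later in this thread; the cheap action takes the Kafka producer out of the loop so the two delay profiles can be compared:

  // If total delay stays flat with this in place of the Kafka write,
  // the write path is the bottleneck rather than the upstream processing.
  outdata.foreachRDD(rdd => println("records in batch: " + rdd.count()))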
Re: Streaming scheduling delay
1) Yes, if I disable writing out to kafka and replace it with some very lightweight action like rdd.take(1), the app is stable. 2) The partitions I spoke of in the previous mail are the number of partitions I create from each dStream. But yes, since I do the processing and writing out per partition, each dStream partition ends up getting written to a kafka partition. The flow is, broadly: 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150 partitions) -> Apply some transformation logic to each partition -> write out each partition to kafka (kafka has 23 partitions). Let me increase the number of partitions on the kafka side and see if that helps. Here's what the main block of code looks like (I added persistence back): val kInStreams = (1 to otherConf("inStreamCount").toInt).map{ _ => KafkaUtils.createStream(ssc, otherConf("kafkaConsumerZk").toString, otherConf("kafkaConsumerGroupId").toString, Map(otherConf("kafkaConsumerTopic").toString -> 1), StorageLevel.MEMORY_AND_DISK_SER) } if (!configMap.keySet.isEmpty) { // Process stream from each receiver separately // (do not attempt to merge all streams and then re-partition, this causes un-necessary and high amount of shuffle in the job) for (k <- kInStreams) { // Re-partition stream from each receiver across all compute nodes to spread out processing load and allows per partition processing // and, set persistence level to spill to disk along with serialization val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER) val outdata = kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER) // Write each transformed partition to Kafka via the writer object outdata.foreachRDD( rdd => rdd.foreachPartition(rec => { val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap) writer.output(rec) }) ) } } Here's the life-cycle of a lost block: 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB) 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB) 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-22-acme.com:42084 (size: 164.7 KB) 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-23-acme.com:34526 (size: 164.7 KB) 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 1] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0 (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 2] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0 (TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 3] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0 (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 4] 15/02/12
16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0 (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 5] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0 (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 6] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0 (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 7] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0 (TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 8] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage 16291.0 (TID 1042606) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 9] 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage 16291.0 (TID 1042609) on executor
Re: Streaming scheduling delay
outdata.foreachRDD( rdd => rdd.foreachPartition(rec => { val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap) writer.output(rec) }) ) So this is creating a new kafka producer for every new output partition, right? Have you tried pooling the producers? On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote: [...]
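A minimal sketch of the pooling Cody suggests, assuming one shared writer per executor JVM is acceptable; KafkaOutputService, otherConf, and propsMap are Tim's own names from the thread, PooledWriter is a hypothetical wrapper introduced here, and the writer is assumed to be thread-safe across concurrently running tasks:

  // One lazily created writer per executor JVM, reused across all
  // partitions and batches instead of being constructed per partition.
  object PooledWriter {
    private var writer: KafkaOutputService = _
    def get(mk: => KafkaOutputService): KafkaOutputService = synchronized {
      if (writer == null) writer = mk
      writer
    }
  }

  // Usage inside the job: borrow the shared writer per partition.
  outdata.foreachRDD(rdd => rdd.foreachPartition { rec =>
    val writer = PooledWriter.get(
      new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap))
    writer.output(rec)
  })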
Re: No executors allocated on yarn with latest master branch
No, not submitting from windows, from a debian distribution. Had a quick look at the rm logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet from the logs (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC Sandy, sounds like it could possibly be a 2.2 issue then, or what do you think? Thanks, Anders On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: This is tricky to debug. Check the logs of the YARN node and resource managers to see if you can trace the error. In the past I have had to closely look at the arguments getting passed to the YARN container (they get logged before attempting to launch containers). If I still didn't get a clue, I had to check the script generated by YARN to execute the container and even run it manually to trace at what line the error occurred. BTW, are you submitting the job from windows? On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote: Interesting to hear that it works for you. Are you using Yarn 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. It does not seem to work in yarn-client mode either, failing with the exception below. Exception in thread main org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) at org.apache.spark.SparkContext.init(SparkContext.scala:370) at com.spotify.analytics.AnalyticsSparkContext.init(AnalyticsSparkContext.scala:8) at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42) at com.spotify.analytics.DataSampler.main(DataSampler.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) /Anders On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Anders, I just tried this out and was able to successfully acquire executors. Any strange log messages or additional color you can provide on your setup? Does yarn-client mode work? -Sandy On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote: Hi, Compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2 and failed executing jobs in yarn-cluster mode for that build. Works successfully with spark 1.2 (and also master from 2015-01-16), so something has changed since then that prevents the job from receiving any executors on the cluster.
Basic symptoms are that the job fires up the AM, but after examining the executors page in the web ui, only the driver is listed; no executors are ever received, and the driver keeps waiting forever. Has anyone seen similar problems? Thanks for any insights, Anders
Re: Can spark job server be used to visualize streaming data?
One method I’ve used is to publish each batch to a message bus or queue with a custom UI listening on the other end, displaying the results in d3.js or some other app. As far as I’m aware there isn’t a tool that will directly take a DStream. Spark Notebook seems to have some support for updating graphs periodically. I haven’t used it myself yet so I'm not sure how well it works. See here: https://github.com/andypetrella/spark-notebook From: Su She Date: Thursday, February 12, 2015 at 1:55 AM To: Felix C Cc: Kelvin Chu, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hello Felix, I am already streaming in very simple data using Kafka (few messages / second, each record only has 3 columns... really simple, but looking to scale once I connect everything). I am processing it in Spark Streaming and am currently writing word counts to hdfs. So the part where I am confused is... Kafka Publishes Data -> Kafka Consumer/Spark Streaming Receives Data -> Spark Word Count -> How do I visualize? Is there a viz tool that I can set up to visualize JavaPairDStreams? Or do I have to write to hbase/hdfs first? Thanks! On Wed, Feb 11, 2015 at 10:39 PM, Felix C felixcheun...@hotmail.com wrote: What kind of data do you have? Kafka is a popular source to use with spark streaming. But spark streaming also supports reading from a file; it's called a basic source https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers --- Original Message --- From: Su She suhsheka...@gmail.com Sent: February 11, 2015 10:23 AM To: Felix C felixcheun...@hotmail.com Cc: Kelvin Chu 2dot7kel...@gmail.com, user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Thank you Felix and Kelvin. I think I'll def be using the k-means tools in mlib. It seems the best way to stream data is by storing in hbase and then using an api in my viz to extract data? Does anyone have any thoughts on this? Thanks! On Tue, Feb 10, 2015 at 11:45 PM, Felix C felixcheun...@hotmail.com wrote: Checkout https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html In there are links to how that is done. --- Original Message --- From: Kelvin Chu 2dot7kel...@gmail.com Sent: February 10, 2015 12:48 PM To: Su She suhsheka...@gmail.com Cc: user@spark.apache.org Subject: Re: Can spark job server be used to visualize streaming data? Hi Su, Out of the box, no. But I know people integrate it with Spark Streaming to do real-time visualization. It will take some work though. Kelvin On Mon, Feb 9, 2015 at 5:04 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I was reading this blog post: http://homes.esat.kuleuven.be/~bioiuser/blog/a-d3-visualisation-from-spark-as-a-service/ and was wondering if this approach can be taken to visualize streaming data... not just historical data? Thank you! -Suh
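A hedged sketch of the publish-each-batch pattern Silvio describes, using the word counts from Su She's pipeline; publishToUI is a hypothetical stand-in for whatever transport (Kafka producer, websocket, REST post) the d3.js front end listens to:

  // Assumed: wordCounts is a DStream[(String, Long)] of per-batch counts.
  def publishToUI(word: String, count: Long): Unit = {
    // hypothetical transport: e.g. an HTTP POST or a producer send
  }

  wordCounts.foreachRDD { rdd =>
    // Keep the payload small: ship only the top rows of each micro-batch.
    rdd.take(25).foreach { case (word, count) => publishToUI(word, count) }
  }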
Re: Master dies after program finishes normally
Increasing your driver memory might help. Thanks Best Regards On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com wrote: Hi, I have a Hidden Markov Model running with 200MB data. Once the program finishes (i.e. all stages/jobs are done) the program hangs for 20 minutes or so before killing master. In the spark master the following log appears. 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster] java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.immutable.List$.newBuilder(List.scala:396) at scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69) at scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105) at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58) at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53) at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239) at scala.collection.TraversableLike$class.map(TraversableLike.scala:243) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26) at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.json4s.MonadicJValue.org $json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22) at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16) at org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450) at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69) at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55) at org.apache.spark.deploy.master.Master.rebuildSparkUI(Master.scala:726) at org.apache.spark.deploy.master.Master.removeApplication(Master.scala:675) at org.apache.spark.deploy.master.Master.finishApplication(Master.scala:653) at org.apache.spark.deploy.master.Master$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$29.apply(Master.scala:399) Can anyone help? ..Manas
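For context, the OOM above occurs inside the standalone Master itself while it replays event logs to rebuild the finished application's UI (Master.rebuildSparkUI in the trace), so it is the Master daemon's heap, not only the driver's, that may need raising. A hedged sketch in conf/spark-env.sh, with an arbitrary example value:

  # Heap for the standalone Master/Worker daemons
  export SPARK_DAEMON_MEMORY=4g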
Why are there different parts in my CSV?
Hello Everyone, I am writing simple word counts to hdfs using messages.saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class, String.class, (Class) TextOutputFormat.class); 1) However, every 2 seconds I am getting a new directory that is titled as a csv. So I'll have test.csv, which will be a directory that has two files inside of it called part-0 and part-1 (something like that). This obviously makes it very hard for me to read the data stored in the csv files. I am wondering if there is a better way to store the JavaPairReceiverDStream and JavaPairDStream? 2) I know there is a copy/merge hadoop api for merging files... can this be done inside java? I am not sure of the logic behind this api if I am using spark streaming, which is constantly making new files. Thanks a lot for the help!
An interesting and serious problem I encountered
Hi folks, My Spark cluster has 8 machines, each of which has 377GB physical memory, and thus the total maximum memory that can be used for Spark is more than 2400GB. In my program, I have to deal with 1 billion (key, value) pairs, where the key is an integer and the value is an integer array with 43 elements. Therefore, the memory cost of this raw dataset is [(1+43) * 10^9 * 4] / (1024 * 1024 * 1024) = 164GB. Since I have to use this dataset repeatedly, I have to cache it in memory. Some key parameter settings are: spark.storage.fraction=0.6 spark.driver.memory=30GB spark.executor.memory=310GB. But it failed on running a simple countByKey() and the error message is java.lang.OutOfMemoryError: Java heap space. Does this mean a Spark cluster of 2400+GB memory cannot keep 164GB raw data in memory? The code of my program is as follows: def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf()); val rdd = sc.parallelize(0 until 1000000000, 25600).map(i => (i, new Array[Int](43))).cache(); println("The number of keys is " + rdd.countByKey()); //some other operations following here ... } To figure out the issue, I evaluated the memory cost of the key-value pairs using SizeOf.jar. The code is as follows: val arr = new Array[Int](43); println(SizeOf.humanReadable(SizeOf.deepSizeOf(arr))); val tuple = (1, arr.clone); println(SizeOf.humanReadable(SizeOf.deepSizeOf(tuple))); The output is: 192.0b 992.0b Hard to believe, but it is true!! This result means that, to store a key-value pair, Tuple2 needs more than 5 times the memory of the simplest method with an array. Even though it may take 5+ times the memory, its size is less than 1000GB, which is still much less than the total memory size of my cluster, i.e., 2400+GB. I really do not understand why this happened. BTW, if the number of pairs is 1 million, it works well. If the arr contains only 1 integer, Tuple2 needs around 10 times the memory to store a pair. So I have some questions: 1. Why does Spark choose such a poor data structure, Tuple2, for key-value pairs? Is there any better data structure for storing (key, value) pairs with less memory cost? 2. Given a dataset of size M, in general how many times that memory does Spark need to handle it? Best, Landmark
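A quick sanity check of the arithmetic in the post, as a sketch in plain local Scala (the 992b/192b ratio is the per-pair overhead measured above):

  // 10^9 pairs, each an Int key (4 bytes) plus 43 Ints of payload.
  val bytesPerPair = (1 + 43) * 4L                     // 176 bytes of raw data
  val rawGB = bytesPerPair * 1000000000L / math.pow(1024, 3)
  println(f"raw dataset: $rawGB%.0f GB")               // ~164 GB, as stated
  // Scaling by the measured ~5x Tuple2/boxing overhead (992b vs 192b):
  println(f"cached estimate: ${rawGB * 992 / 192}%.0f GB")  // under 1000 GB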
Re: Streaming scheduling delay
I replaced the writeToKafka statements with a rdd.count() and sure enough, I have a stable app with total delay well within the batch window (20 seconds). Here's the total delay lines from the driver log: 15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time 142380806 ms (execution: 6.404 s) 15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time 142380808 ms (execution: 42.338 s) 15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time 142380810 ms (execution: 59.483 s) 15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time 142380812 ms (execution: 18.363 s) 15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time 142380814 ms (execution: 10.100 s) 15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time 142380816 ms (execution: 6.209 s) 15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time 142380818 ms (execution: 9.854 s) 15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time 142380820 ms (execution: 7.038 s) 15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time 142380822 ms (execution: 8.039 s) 15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time 142380824 ms (execution: 5.213 s) 15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time 142380826 ms (execution: 5.767 s) 15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time 142380828 ms (execution: 6.858 s) 15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time 142380830 ms (execution: 8.556 s) 15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time 142380832 ms (execution: 5.583 s) 15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time 142380834 ms (execution: 4.791 s) 15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time 142380836 ms (execution: 4.422 s) 15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time 142380838 ms (execution: 5.733 s) 15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time 142380840 ms (execution: 4.701 s) 15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time 142380842 ms (execution: 4.782 s) 15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time 142380844 ms (execution: 4.678 s) 15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time 142380846 ms (execution: 4.064 s) 15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time 142380848 ms (execution: 4.514 s) 15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time 142380850 ms (execution: 3.954 s) 15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time 142380852 ms (execution: 4.309 s) 15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time 142380854 ms (execution: 4.667 s) 15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time 142380856 ms (execution: 4.681 s) 15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time 142380858 ms (execution: 7.816 s) 15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time 142380860 ms (execution: 8.383 s) 15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time 142380862 ms (execution: 3.814 s) 15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time 142380864 ms (execution: 3.892 s) 15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time 142380866 ms (execution: 3.767 s) 15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time 142380868 ms (execution: 3.845 s) 15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time 142380870 ms (execution: 3.510 s) 
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time 142380872 ms (execution: 6.989 s) 15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time 142380874 ms (execution: 3.594 s) 15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time 142380876 ms (execution: 3.383 s) 15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time 142380878 ms (execution: 3.897 s) 15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time 142380880 ms (execution: 3.596 s) 15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time 142380882 ms (execution: 3.861 s) 15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time 142380884 ms (execution: 4.026 s) On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith secs...@gmail.com wrote: [...]
How to sum up the values in the columns of a dataset in Scala?
I am new to Scala. I have a dataset with many columns, and each column has a column name. Given several column names (these column names are not fixed; they are generated dynamically), I need to sum up the values of these columns. Is there an efficient way of doing this? I worked out a way using a for loop, but I don't think it is efficient: val AllLabels = List("ID", "val1", "val2", "val3", "val4") val lbla = List("val1", "val3", "val4") val index_lbla = lbla.map(x => AllLabels.indexOf(x)) val dataRDD = sc.textFile("../test.csv").map(_.split(",")) dataRDD.map(x => { var sum = 0.0 for (i <- index_lbla) sum = sum + x(i).toDouble sum }).collect The test.csv looks like below (without column names): ID, val1, val2, val3, val4 A, 123, 523, 534, 893 B, 536, 98, 1623, 98472 C, 537, 89, 83640, 9265 D, 7297, 98364, 9, 735 ... Your help is very much appreciated!
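A slightly more idiomatic take on the same computation, as a hedged sketch (the trim handles the spaces after the commas shown in the sample file):

  // Look up the wanted column indices once, then sum those fields per row.
  val sums = dataRDD.map(row => index_lbla.map(i => row(i).trim.toDouble).sum)
  sums.collect()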
Re: Streaming scheduling delay
Yes, you can try it. For example, if you have a cluster of 10 executors and 60 Kafka partitions, you can try to choose 10 receivers * 2 consumer threads, so each thread will ideally consume 3 partitions; if you increase the threads to 6, each thread will ideally consume 1 partition. What I think is important is that each executor will have a receiver, so the data will be distributed to each executor. If you have a large cluster where the number of executors is even greater than the Kafka partitions, maybe you need to increase the Kafka partitions to increase the parallelism, otherwise some of the computation resources may be idle. Besides, if executors * consumers > Kafka partitions, the consumers beyond the partition count will be idle; each partition can only be consumed by one consumer. We have an in-house benchmark cluster with such deploy criteria. I'm not sure if it works for you, you can try it. Thanks Saisai 2015-02-13 15:19 GMT+08:00 Tim Smith secs...@gmail.com: [...]
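A sketch of Saisai's sizing rule, (receivers) * (threads per receiver) <= Kafka partitions, using his 10-receiver example; ssc, zkQuorum, groupId, and topic are assumed to exist, and the Kafka receiver comes from the spark-streaming-kafka artifact:

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.kafka.KafkaUtils

  // 10 receivers * 2 consumer threads = 20 consumers; against a 60-partition
  // topic each thread ideally ends up with 3 partitions, and every executor
  // that hosts a receiver gets a share of the incoming blocks.
  val streams = (1 to 10).map { _ =>
    KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 2),
      StorageLevel.MEMORY_AND_DISK_SER)
  }
  val unified = ssc.union(streams)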
Re: Why are there different parts in my CSV?
For a streaming application, every batch interval it will create a new directory and put the data in it. If you don't want multiple part- files inside the directory, then you can do a repartition before the saveAs* call. messages.repartition(1).saveAsHadoopFiles("hdfs://user/ec2-user/", "csv", String.class, String.class, (Class) TextOutputFormat.class); Thanks Best Regards On Fri, Feb 13, 2015 at 11:59 AM, Su She suhsheka...@gmail.com wrote: [...]
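On question 2 of the original mail, a hedged sketch of the Hadoop copy/merge API, usable as a post-processing step once a batch directory is complete (paths are hypothetical, and since streaming keeps producing new directories this complements rather than replaces the repartition approach):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  // Merge all part-* files of one finished batch directory into a single file.
  FileUtil.copyMerge(fs, new Path("/user/ec2-user/test.csv"),
    fs, new Path("/user/ec2-user/test-merged.csv"),
    false /* keep the source */, conf, null)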
Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]
Thanks for the reply. I solved the problem by importing org.apache.spark.SparkContext._ as Imran Rashid suggested. For the sake of interest, is JavaPairRDD intended for use from Java? What is the purpose of this class? Is my rdd implicitly converted to it in some circumstances? 2015-02-12 19:42 GMT+04:00 Ted Yu yuzhih...@gmail.com: You can use JavaPairRDD, which has: override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) Cheers On Thu, Feb 12, 2015 at 7:36 AM, Vladimir Protsenko protsenk...@gmail.com wrote: Hi. I am stuck with how to save a file to hdfs from spark. I have written MyOutputFormat extends FileOutputFormat[String, MyObject], then in spark I am calling this: rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String], classOf[MyObject], classOf[MyOutputFormat]) where rddres is an RDD[(String, MyObject)] from up the transformation pipeline. The compilation error is: value saveAsHadoopFile is not a member of org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]. Could someone give me insights on what could be done here to make it work? Why is it not a member? Because of wrong types?
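A condensed sketch of the resolution, for the archive: in pre-1.3 Spark, saveAsHadoopFile lives in PairRDDFunctions, and the implicit conversion that adds it to an RDD of pairs only comes into scope with the SparkContext._ import:

  // Brings rddToPairRDDFunctions (and friends) into scope, which is what
  // makes saveAsHadoopFile visible on an RDD[(String, MyObject)].
  import org.apache.spark.SparkContext._

  rddres.saveAsHadoopFile("hdfs://localhost/output",
    classOf[String], classOf[MyObject], classOf[MyOutputFormat])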
Re: Using Spark SQL for temporal data
I haven't been paying close attention to the JIRA tickets for PrunedFilteredScan but I noticed some weird behavior around the filters being applied when OR expressions were used in the WHERE clause. From what I was seeing, it looks like it could be possible that the start and end ranges you are proposing to place in the WHERE clause could actually never be pushed down to the PrunedFilteredScan if there's an OR expression in there, like: (start > 2014-12-01 and end < 2015-02-12) or (...). I haven't done a unit test for this case yet, but I did file SPARK-5296 because of the behavior I was seeing. I'm requiring a time range in the services I'm writing because without it, the full Accumulo table would be scanned - and that's no good. Ah, I see. Right now we only split up and pass down conjunctive (and) predicates that can be expressed in the limited set of filters so far. We can easily add OR if it works for your use case. It'll be up to the data source, however, to recurse down the ORs and either pass multiple time ranges to Accumulo or union multiple RDDs together to return them. Let's discuss more on the JIRA. Are there any plans on making the CatalystScan public in the near future (possibly once SparkSQL reaches the proposed stability in 1.3)? No, it'll remain public so people can experiment with it, but it is unlikely it'll ever have the same stability guarantees that the Spark public API does. This is primarily due to its dependence on the whole catalyst expression hierarchy. Instead I'd like to add to the other scan filters / interfaces that can provide useful information to the data sources.
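For readers of the archive, a minimal sketch of the interface under discussion, assuming the Spark 1.3-style external data source API; all class and column names are illustrative, not the actual Accumulo source's code:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, LessThan, PrunedFilteredScan}
  import org.apache.spark.sql.types.{LongType, StructField, StructType}

  class TimeRangeRelation(override val sqlContext: SQLContext)
      extends BaseRelation with PrunedFilteredScan {

    override def schema: StructType =
      StructType(StructField("start", LongType) :: StructField("end", LongType) :: Nil)

    override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
      // Only conjunctive predicates arrive here as Filter objects; an OR of
      // two ranges is simply absent, and Spark re-applies it after the scan.
      val lower = filters.collectFirst { case GreaterThan("start", v: Long) => v }
      val upper = filters.collectFirst { case LessThan("end", v: Long) => v }
      println(s"pushed-down bounds: $lower .. $upper")
      // A real source would scan the store with these bounds; emit nothing here.
      sqlContext.sparkContext.emptyRDD[Row]
    }
  }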
Re: 8080 port password protection
Just to add to what Arush said, you can go through these links: http://stackoverflow.com/questions/1162375/apache-port-proxy http://serverfault.com/questions/153229/password-protect-and-serve-apache-site-by-port-number Thanks Best Regards On Thu, Feb 12, 2015 at 10:43 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: You could apply a password using a servlet filter, though this doesn't look like the right group for the question. It can be done for Spark too, for the Spark UI. On Thu, Feb 12, 2015 at 10:19 PM, MASTER_ZION (Jairo Linux) master.z...@gmail.com wrote: Hi everyone, I'm creating a development machine in AWS and I would like to protect port 8080 using a password. Is it possible? Best Regards Jairo Moreno -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
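A hedged sketch of the servlet-filter route for the Spark UI side of this: Spark's web UIs honor the spark.ui.filters property, which names a javax.servlet.Filter class to run in front of the UI. The basic-auth filter below is illustrative only (hard-coded credentials, Java 8 Base64 assumed), and the class must be on the daemon's classpath:

  import javax.servlet.{Filter, FilterChain, FilterConfig, ServletRequest, ServletResponse}
  import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
  import java.util.Base64

  class BasicAuthFilter extends Filter {
    // Illustrative credentials; a real filter would load these from config.
    private val expected = "Basic " + Base64.getEncoder.encodeToString("admin:secret".getBytes)
    override def init(conf: FilterConfig): Unit = {}
    override def destroy(): Unit = {}
    override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
      val request = req.asInstanceOf[HttpServletRequest]
      if (expected == request.getHeader("Authorization")) {
        chain.doFilter(req, res)  // credentials match: let the UI handle it
      } else {
        val response = res.asInstanceOf[HttpServletResponse]
        response.setHeader("WWW-Authenticate", "Basic realm=\"spark\"")
        response.sendError(HttpServletResponse.SC_UNAUTHORIZED)
      }
    }
  }

Enabled, under these assumptions, with --conf spark.ui.filters=BasicAuthFilter when launching.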
Re: Can spark job server be used to visualize streaming data?
Thanks Kevin for the link. I have had issues trying to install Zeppelin, as I believe it is not yet supported for CDH 5.3 and Spark 1.2. Please correct me if I am mistaken. On Thu, Feb 12, 2015 at 7:33 PM, Kevin (Sangwoo) Kim kevin...@apache.org wrote: Apache Zeppelin also has a scheduler, so you can reload your chart periodically. Check it out: http://zeppelin.incubator.apache.org/docs/tutorial/tutorial.html On Fri Feb 13 2015 at 7:29:00 AM Silvio Fiorito silvio.fior...@granturing.com wrote: [...]
Re: Streaming scheduling delay
Hi Tim, I think maybe you can try this way: create a Receiver per executor and specify a thread count for each topic larger than 1, so the total number of consumer threads will be: total consumers = (receiver number) * (thread number); make sure this total is less than or equal to the Kafka partition number. In this case, I think the parallelism is enough and received blocks are distributed to each executor, so you don't need to repartition to increase the parallelism. One caveat: with Kafka's high-level API, Kafka partitions may not be equally distributed to all the receivers, so some tasks may process more data than other tasks. Another way is to try DirectKafkaInputDStream in Spark 1.3; that will be more balanced because each Kafka partition maps to a Spark partition. Besides, does set partition count to 1 for each dStream mean dstream.repartition(1)? If so, I think it will still introduce a shuffle and move all the data into one partition. Thanks Saisai 2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com: TD - I will try count() and report back. Meanwhile, attached is the entire driver log that includes the error logs about missing blocks. Cody - Let me research a bit about how to do connection pooling. Sorry, I am not really a programmer. I did see the connection pooling advice in the Spark Streaming Programming guide as an optimization but wasn't sure how to implement it. But do you think it will have a significant impact on performance? Saisai - I think, ideally, I'd rather not do any dStream partitioning. Instead, have 1 receiver for each kafka partition (so in this case 23 receivers for 23 kafka partitions) and then have as many or more executors to handle processing of the dStreams. Right? Trouble is, I tried this approach and it didn't work. Even if I set 23 receivers and set the partition count to 1 for each dStream (effectively, no stream splitting), my performance is extremely poor/laggy. Should I modify my code to remove dStream partitioning altogether and then try setting as many receivers as kafka partitions? On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Tim, I think this code will still introduce a shuffle even when you call repartition on each input stream. Actually this style of implementation will generate more jobs (a job per input stream) than unioning into one stream via DStream.union(), and union normally has no special overhead as I understand it. Also, as Cody said, creating a Producer per partition could be a potential overhead; a producer pool or sharing the Producer for one executor might be better :). // Process stream from each receiver separately // (do not attempt to merge all streams and then re-partition, this causes un-necessary and high amount of shuffle in the job) for (k <- kInStreams) { // Re-partition stream from each receiver across all compute nodes to spread out processing load and allows per partition processing // and, set persistence level to spill to disk along with serialization val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt). 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org: [...]
Re: Streaming scheduling delay
Hi Saisai, If I understand correctly, you are suggesting that control parallelism by having number of consumers/executors at least 1:1 for number of kafka partitions. For example, if I have 50 partitions for a kafka topic then either have: - 25 or more executors, 25 receivers, each receiver set to 2 consumer threads per topic, or, - 50 or more executors, 50 receivers, each receiver set to 1 consumer thread per topic Actually, both executors and total consumers can be more than the number of kafka partitions (some will probably sit idle). But do away with dStream partitioning altogether. Right? Thanks, - Tim On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Tim, I think maybe you can try this way: create Receiver per executor and specify thread for each topic large than 1, and the total number of consumer thread will be: total consumer = (receiver number) * (thread number), and make sure this total consumer is less than or equal to Kafka partition number. In this case, I think the parallelism is enough, received blocks are distributed to each executor. So you don't need to repartition to increase the parallelism. Besides for Kafka's high-level API, Kafka partitions may not be equally distributed to all the receivers, so some tasks may process more data than other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3, that will be more balanced because each Kafka partition mapping to Spark partition. Besides set partition count to 1 for each dStream means dstream.repartition(1) ? If so I think it will still introduce shuffle and move all the data into one partition. Thanks Saisai 2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com: TD - I will try count() and report back. Meanwhile, attached is the entire driver log that includes the error logs about missing blocks. Cody - Let me research a bit about how to do connection pooling. Sorry, I am not really a programmer. I did see the connection pooling advise in the Spark Streaming Programming guide as an optimization but wasn't sure how to implement it. But do you think it will have a significant impact on performance? Saisai - I think, ideally, I'd rather not do any dStream partitioning. Instead have 1 receiver for each kafka partition (so in this case 23 receivers for 23 kafka partitions) and then have as many or more executors to handle processing of the dStreams. Right? Trouble is, I tried this approach and didn't work. Even If I set 23 receivers, and set partition count to 1 for each dStream (effectively, no stream splitting), my performance is extremely poor/laggy. Should I modify my code to remove dStream partitioning altogether and then try setting as many receivers as kafka partitions? On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Tim, I think this code will still introduce shuffle even when you call repartition on each input stream. Actually this style of implementation will generate more jobs (job per each input stream) than union into one stream as called DStream.union(), and union normally has no special overhead as I understood. Also as Cody said, creating Producer per partition could be a potential overhead, producer pool or sharing the Producer for one executor might be better :). 
// Process stream from each receiver separately // (do not attempt to merge all streams and then re-partition, this causes un-necessary and high amount of shuffle in the job) for (k <- kInStreams) { // Re-partition stream from each receiver across all compute nodes to spread out processing load and allow per-partition processing // and, set persistence level to spill to disk along with serialization val kInMsgParts = k.repartition(otherConf(dStreamPartitions).toInt). 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org: outdata.foreachRDD( rdd => rdd.foreachPartition(rec => { val writer = new KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap) writer.output(rec) }) ) So this is creating a new kafka producer for every new output partition, right? Have you tried pooling the producers? On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote: 1) Yes, if I disable writing out to kafka and replace it with some very lightweight action like rdd.take(1), the app is stable. 2) The partitions I spoke of in the previous mail are the number of partitions I create from each dStream. But yes, since I do processing and writing out, per partition, each dStream partition ends up getting written to a kafka partition. Flow is, broadly: 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150 partitions) -> Apply some transformation logic to each partition -> write out each partition to kafka (kafka has 23 partitions). Let me increase the number of
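A minimal sketch of the producer pooling suggested above, assuming the poster's own KafkaOutputService class and that the topic name and producer properties are reachable on the executors (hard-coded here purely for illustration); a Scala object gives one lazily created producer per executor JVM instead of one per partition:

object ProducerHolder {
  // Assumptions for the sketch: in a real job these might come from a
  // broadcast variable or constants compiled into the application jar.
  private val topic = "out-topic"
  private val props = new java.util.Properties()
  lazy val writer = new KafkaOutputService(topic, props)   // created once per JVM
}

outdata.foreachRDD { rdd =>
  rdd.foreachPartition { rec =>
    ProducerHolder.writer.output(rec)   // every partition reuses the shared producer
  }
}

And a sketch of the receiver layout Saisai describes (zkQuorum, groupId and topic are placeholders): R receivers with T consumer threads each, with R * T no larger than the Kafka partition count, unioned rather than repartitioned:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 25
val threadsPerReceiver = 2   // 25 * 2 = 50 consumers for 50 Kafka partitions
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId,
    Map(topic -> threadsPerReceiver), StorageLevel.MEMORY_AND_DISK_SER)
}
val unioned = ssc.union(streams)   // one DStream, no explicit repartition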
Re: OutOfMemoryError with ramdom forest and small training dataset
Looking at the script, I'm not sure whether --driver-memory is supposed to work in standalone client mode. It's too late to set the driver's memory if the driver is what's already running. The script does specially handle the case where the value comes from the environment config, though. Not sure; this might be on purpose. On Thu, Feb 12, 2015 at 1:16 PM, poiuytrez guilla...@databerries.com wrote: Very interesting. It works. When I set SPARK_DRIVER_MEMORY=83971m in spark-env.sh or spark-defaults.conf it works. However, when I set the --driver-memory option with spark-submit, the memory is not allocated to the spark master. (The web UI shows the correct value of spark.driver.memory (83971m), but the memory is not correctly allocated, as we can see on the web UI executors page.) I am going to file an issue in the bug tracker. Thank you for your help.
apply function to all the elements of a rowMatrix
Hi, I need to apply a function to all the elements of a rowMatrix. How can I do that? Here is a more detailed question: http://stackoverflow.com/questions/28438908/spark-mllib-apply-function-to-all-the-elements-of-a-rowmatrix Thanks a lot!
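One way to do this, sketched under the assumption of Spark 1.x MLlib (RowMatrix has no elementwise map of its own): transform the underlying RDD of row vectors and rebuild the matrix.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Applies f to every element of mat; densifies each row in the process.
def mapElements(mat: RowMatrix, f: Double => Double): RowMatrix =
  new RowMatrix(mat.rows.map(v => Vectors.dense(v.toArray.map(f))))

val squared = mapElements(mat, x => x * x)   // mat: an existing RowMatrix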
Invoking updateStateByKey twice on the same RDD
Can I invoke updateStateByKey twice on the same RDD? My requirement is as follows. 1. Get the event stream from Kafka 2. updateStateByKey to aggregate and filter events based on timestamp 3. Do some processing and save results to Cassandra DB 4. updateStateByKey to remove keys based on logout eventType. I tried doing it by assigning the result of step 2 to a var and reassigning it to the updated value in step 4, but it seems it does not work that way. I am new to Spark and not sure how this kind of behaviour is possible. Appreciate any help.
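One pattern that may fit this requirement, sketched with a hypothetical Event type and update logic: fold steps 2 and 4 into a single update function, since returning None from updateStateByKey removes a key, then run the Cassandra write on the resulting stream.

import org.apache.spark.streaming.StreamingContext._   // pair-DStream functions in Spark 1.x

case class Event(eventType: String, timestamp: Long)

// Drop the key on a logout event; otherwise aggregate and filter by timestamp
// (a one-hour window is assumed purely for the sketch).
def updateState(newEvents: Seq[Event], state: Option[Seq[Event]]): Option[Seq[Event]] =
  if (newEvents.exists(_.eventType == "logout")) None   // None removes the key
  else {
    val cutoff = System.currentTimeMillis() - 60 * 60 * 1000L
    Some((state.getOrElse(Seq.empty) ++ newEvents).filter(_.timestamp >= cutoff))
  }

// events: DStream[(String, Event)] read from Kafka
val sessions = events.updateStateByKey(updateState _)
sessions.foreachRDD(rdd => rdd.foreach(println))   // replace println with the Cassandra write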
Re: No executors allocated on yarn with latest master branch
This is tricky to debug. Check the logs of the YARN node manager and resource manager to see if you can trace the error. In the past I have had to look closely at the arguments getting passed to the YARN container (they get logged before attempting to launch containers). If I still didn't get a clue, I had to check the script generated by YARN to execute the container, and even run it manually to trace at what line the error occurred. BTW, are you submitting the job from Windows? On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote: Interesting to hear that it works for you. Are you using Yarn 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. It does not seem to work in yarn-client mode either, failing with the exception below. Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) at org.apache.spark.SparkContext.<init>(SparkContext.scala:370) at com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8) at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42) at com.spotify.analytics.DataSampler.main(DataSampler.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) /Anders On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Anders, I just tried this out and was able to successfully acquire executors. Any strange log messages or additional color you can provide on your setup? Does yarn-client mode work? -Sandy On Wed, Feb 11, 2015 at 1:28 PM, Anders Arpteg arp...@spotify.com wrote: Hi, I compiled the latest master of Spark yesterday (2015-02-10) for Hadoop 2.2 and failed executing jobs in yarn-cluster mode with that build. It works successfully with Spark 1.2 (and also master from 2015-01-16), so something has changed since then that prevents the job from receiving any executors on the cluster. Basic symptoms are that the job fires up the AM, but after examining the executors page in the web UI, only the driver is listed, no executors are ever received, and the driver keeps waiting forever. Has anyone seen similar problems? Thanks for any insights, Anders
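For the log-digging step above, one commonly used command (assuming YARN log aggregation is enabled; the application id is a placeholder) pulls all container logs for the finished application:

yarn logs -applicationId application_1423765396389_0001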
Re: OutOfMemoryError with ramdom forest and small training dataset
Very interesting. It works. When I set SPARK_DRIVER_MEMORY=83971m in spark-env.sh or spark-defaults.conf it works. However, when I set the --driver-memory option with spark-submit, the memory is not allocated to the spark master. (The web UI shows the correct value of spark.driver.memory (83971m), but the memory is not correctly allocated, as we can see on the web UI executors page.) I am going to file an issue in the bug tracker. Thank you for your help.
Re: pyspark: Java null pointer exception when accessing broadcast variables
Hi again, I narrowed down the issue a bit more -- it seems to have to do with the Kryo serializer. When I use it, then this results in a null pointer exception: rdd = sc.parallelize(range(10)) d = {} from random import random for i in range(10): d[i] = random() rdd.map(lambda x: d[x]).collect() --------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) <ipython-input-97-7cd5df24206c> in <module>() ----> 1 rdd.map(lambda x: d[x]).collect() /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/pyspark/rdd.pyc in collect(self) 674 675 with SCCallSiteSync(self.context) as css: --> 676 bytesInJava = self._jrdd.collect().iterator() 677 return list(self._collect_iterator_through_file(bytesInJava)) 678 /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, --> 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: /cluster/home/roskarr/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. --> 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o768.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 87, e1305.hpc-lca.ethz.ch): java.lang.NullPointerException at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:590) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:233) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(PythonRDD.scala:229) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:229) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460) at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) If I use a dictionary with fewer items, then it works fine: In [98]: rdd = sc.parallelize(range(10)) d = {} from random import random for i in range(1): d[i] = random() In [99]: rdd.map(lambda x: d[x]).collect() Out[99]:
Re: Shuffle on joining two RDDs
Hi, I believe that partitionBy will use the same (default) partitioner on both RDDs. On 2015-02-12 17:12, Sean Owen wrote: Doesn't this require that both RDDs have the same partitioner? On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid iras...@cloudera.com wrote: Hi Karlson, I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized and cached somewhere? E.g., if you just did this: val rddA = someData.partitionBy(N) val rddB = someOtherData.partitionBy(N) val joinedRdd = rddA.join(rddB) joinedRdd.count() // or any other action then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without shuffling, joinedRdd.count() will trigger the evaluation of rddA and rddB, which will require shuffles. Note that even if you have some intervening action on rddA and rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join. If this doesn't help explain things, for debugging try joinedRdd.getPartitions.foreach{println} -- this is getting into the weeds, but at least this will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?) Hope this helps, Imran On Thu, Feb 12, 2015 at 9:25 AM, Karlson ksonsp...@siberie.de wrote: Hi All, using PySpark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value). I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex). Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040 however reveals that that assumption is false. In fact I am seeing shuffle writes of about 200MB and reads of about 50MB. What's the explanation for that behaviour? Where am I wrong with my assumption? Thanks in advance, Karlson
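A sketch of Imran's point, reusing the names from his example: give both sides the same partitioner object and persist the partitioned RDDs, then materialize them before the join so the join itself sees narrow dependencies.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD functions in Spark 1.x

val partitioner = new HashPartitioner(100)
val rddA = someData.partitionBy(partitioner).persist()
val rddB = someOtherData.partitionBy(partitioner).persist()
rddA.count(); rddB.count()        // force the shuffle once, up front
val joinedRdd = rddA.join(rddB)   // co-partitioned inputs: no further shuffle
joinedRdd.count()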
Test
Sent from my iPhone
Use of nscala-time within spark-shell
Hi All, Thanks in advance for your help. I have a timestamp which I need to convert to a datetime using Scala. A folder contains the three needed jar files: joda-convert-1.5.jar joda-time-2.4.jar nscala-time_2.11-1.8.0.jar Using the Scala REPL and adding the jars: scala -classpath *.jar I can use nscala-time like the following: scala> import com.github.nscala_time.time.Imports._ import com.github.nscala_time.time.Imports._ scala> import org.joda._ import org.joda._ scala> DateTime.now res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00 But when I try to use spark-shell: ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar /usr/local/spark/bin/spark-shell --master local --driver-memory 2g --executor-memory 2g --executor-cores 1 It successfully imports the jars: scala> import com.github.nscala_time.time.Imports._ import com.github.nscala_time.time.Imports._ scala> import org.joda._ import org.joda._ but fails using them: scala> DateTime.now java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; at com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69) at com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20) at com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61) at com.github.nscala_time.time.Imports$.<init>(Imports.scala:20) at com.github.nscala_time.time.Imports$.<clinit>(Imports.scala) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:28) at $iwC$$iwC$$iwC.<init>(<console>:30) at $iwC$$iwC.<init>(<console>:32) at $iwC.<init>(<console>:34) at <init>(<console>:36) at .<init>(<console>:40) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Your help is very appreciated. Regards, Hammam
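A likely explanation, offered as a guess rather than a verified fix: a NoSuchMethodError on scala.Predef$.$conforms is the classic symptom of a Scala binary-version mismatch -- nscala-time_2.11 is the Scala 2.11 build, while pre-built Spark 1.x runs on Scala 2.10. Swapping in the matching artifact may resolve it:

ADD_JARS=/home/scala_test_class/nscala-time_2.10-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar /usr/local/spark/bin/spark-shell --master local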
saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]
Hi. I am stuck with how to save a file to HDFS from Spark. I have written MyOutputFormat extends FileOutputFormat[String, MyObject], then in Spark I am calling this: rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String], classOf[MyObject], classOf[MyOutputFormat]) where rddres is RDD[(String, MyObject)] from upstream in the transformation pipeline. The compilation error is: value saveAsHadoopFile is not a member of org.apache.spark.rdd.RDD[(String, vlpr.MyObject)]. Could someone give me insights on what could be done here to make it work? Why is it not a member? Because of wrong types?
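One thing worth checking, as a sketch: saveAsHadoopFile lives on PairRDDFunctions and, in Spark prior to 1.3, only becomes available on an RDD[(String, MyObject)] through an implicit conversion brought in by the import below. The output format must also extend the old-API org.apache.hadoop.mapred.FileOutputFormat; a subclass of the new org.apache.hadoop.mapreduce version needs saveAsNewAPIHadoopFile instead.

import org.apache.spark.SparkContext._   // implicit RDD[(K, V)] => PairRDDFunctions

rddres.saveAsHadoopFile("hdfs://localhost/output",
  classOf[String], classOf[MyObject], classOf[MyOutputFormat])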
Is it possible to expose SchemaRDD’s from thrift server?
I have a question with regards to accessing SchemaRDDs and Spark SQL temp tables via the thrift server. It appears that a SchemaRDD, when created, is only available in the local namespace / context, and is unavailable to external services accessing Spark through the thrift server via ODBC; is this correct? Does the same apply to temp tables? If we process data on Spark, how is it exposed to the thrift server for access by third-party BI applications via ODBC? Does one need to have two Spark contexts, one for processing, then dump it to the metastore from which a third-party application can fetch the data, or is it possible to expose the resulting SchemaRDD via the thrift server? I am trying to do this with Tableau and its Spark SQL Connector. From what I can see, I need the Spark context for processing and then dump to the metastore. Is it possible to access the resulting SchemaRDD from doing something like this: create temporary table test using org.apache.spark.sql.json options (path '/data/json/*'); cache table test; I am using Spark 1.2.1. If not available now, will it be in 1.3.x? Or is the only way to achieve this to store into the metastore, and does that imply Hive? -Todd
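One approach that may achieve this, sketched with assumptions (the JSON path is the poster's; everything runs in a single JVM): start the thrift server against the same HiveContext used for processing, so temp tables registered on it become visible to JDBC/ODBC clients such as Tableau.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)              // sc: the existing SparkContext
val schemaRdd = hiveContext.jsonFile("/data/json/")
schemaRdd.registerTempTable("test")
hiveContext.cacheTable("test")
HiveThriftServer2.startWithContext(hiveContext)    // temp tables now reachable over ODBC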
Shuffle on joining two RDDs
Hi All, using PySpark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value). I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex). Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040 however reveals that that assumption is false. In fact I am seeing shuffle writes of about 200MB and reads of about 50MB. What's the explanation for that behaviour? Where am I wrong with my assumption? Thanks in advance, Karlson
spark mllib error when predict on linear regression model
Hi, I have a model and I am trying to predict on regPoints. Here is the code that I have used. A more detailed question is available at http://stackoverflow.com/questions/28482476/spark-mllib-predict-error-with-map scala> model res26: org.apache.spark.mllib.regression.LinearRegressionModel = (weights=[-4.00245512323736E-15,-7.110058964543731E-15,2.0790436644401968E-15,1.7497510523275056E-15,6.593638326021273E-15], intercept=0.0) scala> regPoints res27: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MappedRDD[32] at map at <console>:54 // ERROR scala> val y_predicted = regPoints map (point => model.predict(point.features)) 15/02/12 16:14:45 INFO BlockManager: Removing broadcast 285 15/02/12 16:14:45 INFO BlockManager: Removing block broadcast_285_piece0 15/.. Thanks a lot, Luca
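If the closure version keeps failing, one variant worth trying (a sketch, not a confirmed fix for this exact error): the model also exposes an RDD overload of predict, which avoids calling the model inside a map closure.

val y_predicted = model.predict(regPoints.map(_.features))   // returns RDD[Double]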
Re: 8080 port password protection
You could apply a password using a servlet filter on the web server. Though this doesn't look like the right group for the question, it can be done for Spark too, for the Spark UI (via the spark.ui.filters property). On Thu, Feb 12, 2015 at 10:19 PM, MASTER_ZION (Jairo Linux) master.z...@gmail.com wrote: Hi everyone, I'm creating a development machine in AWS and I would like to protect the port 8080 using a password. Is it possible? Best Regards *Jairo Moreno* -- Sigmoid Analytics http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
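A minimal sketch of that filter idea (the class name, user, and password are placeholders): a javax.servlet filter performing HTTP basic auth, which can be attached to the Spark web UIs via the spark.ui.filters property.

import javax.servlet._
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import javax.xml.bind.DatatypeConverter   // Base64 encoding, bundled with Java 6+

class BasicAuthFilter extends Filter {
  private var expected: String = _

  override def init(conf: FilterConfig): Unit = {
    val user = conf.getInitParameter("user")
    val pass = conf.getInitParameter("password")
    expected = "Basic " + DatatypeConverter.printBase64Binary((user + ":" + pass).getBytes("UTF-8"))
  }

  override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
    val hreq = req.asInstanceOf[HttpServletRequest]
    val hres = res.asInstanceOf[HttpServletResponse]
    if (expected == hreq.getHeader("Authorization")) chain.doFilter(req, res)
    else {
      hres.setHeader("WWW-Authenticate", "Basic realm=\"Spark\"")
      hres.sendError(HttpServletResponse.SC_UNAUTHORIZED)
    }
  }

  override def destroy(): Unit = ()
}

Wired up with something like the following (the parameter naming follows the pattern documented for spark.ui.filters; the package is up to you): --conf spark.ui.filters=BasicAuthFilter --conf spark.BasicAuthFilter.params=user=admin,password=secret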