Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
Hi,

Thank you for this confirmation. Coalescing is what we do now. It creates, however, very big partitions.

Guillaume

Hey,

I am not 100% sure, but from my understanding accumulators are per partition (so per task, as it's the same thing) and are sent back to the driver with the task result and merged. When a task needs to be run n times (multiple RDDs depend on this one, some partition is lost later in the chain, etc.), the accumulator will count the values from that task n times. So in short, I don't think you'd win from using an accumulator over what you are doing right now.

You could maybe coalesce your RDD to num-executors without a shuffle and then update the sketches. You should end up with 1 partition per executor, thus 1 sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently.

Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

Hi,

I'm trying to figure out the smartest way to implement a global count-min sketch on accumulators. For now, we are doing that with RDDs. It works well, but with one sketch per partition, merging takes too long.

As you probably know, a count-min sketch is a big mutable array of arrays of ints. To distribute it, all sketches must have the same size. Since it can be big, and since merging is not free, I would like to minimize the number of sketches and maximize reuse and concurrent use of the sketches. Ideally, I would like to just have one sketch per worker.

I think accumulables might be the right structures for that, but it seems that they are not shared between executors, or even between tasks.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala (289)

    /**
     * This thread-local map holds per-task copies of accumulators; it is used to collect the set
     * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
     * this map is cleared by `Accumulators.clear()` (see Executor.scala).
     */
    private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
      override protected def initialValue() = Map[Long, Accumulable[_, _]]()
    }

localAccums stores an accumulator for each task (it's thread-local, so I assume each task has a unique thread on executors).

If I understand correctly, each time a task starts, an accumulator is initialized locally, updated, then sent back to the driver for merging? So I guess accumulators may not be the way to go after all.

Any advice?

Guillaume

--
eXenSa
*Guillaume PITEL, Président*
+33(0)626 222 431
eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705
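For readers following the thread, here is a minimal count-min sketch in plain Python that shows why all sketches must share the same dimensions and why merging is not free (it touches every cell). It is only an illustration under stated assumptions, not the implementation used by anyone in this thread: the class name, the md5-based row hashing, and the depth/width parameters are all made up for the example.

```python
import hashlib


class CountMinSketch:
    """Illustrative count-min sketch: depth rows of width integer counters."""

    def __init__(self, depth, width):
        self.depth = depth
        self.width = width
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, row, item):
        # Assumption: one md5 hash per row, salted with the row number.
        digest = hashlib.md5(f"{row}:{item}".encode("utf-8")).hexdigest()
        return int(digest, 16) % self.width

    def add(self, item, count=1):
        for row in range(self.depth):
            self.table[row][self._index(row, item)] += count

    def estimate(self, item):
        # The minimum over rows never underestimates the true count.
        return min(self.table[row][self._index(row, item)]
                   for row in range(self.depth))

    def merge(self, other):
        # Merging is a cell-by-cell sum, so both sketches need the same shape
        # and the cost is proportional to depth * width.
        if (self.depth, self.width) != (other.depth, other.width):
            raise ValueError("sketches must have the same dimensions")
        for row in range(self.depth):
            mine, theirs = self.table[row], other.table[row]
            for col in range(self.width):
                mine[col] += theirs[col]
        return self
```

Because merging is a plain sum, a sketch merged from two partitions is identical to one built over both partitions together, which is what makes "one sketch per partition, merged later" correct, if slow.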
Re: kafka spark streaming working example
You have .setMaster("local"); set it to "local[2]" or "local[*]". The receiver occupies one thread, so with plain "local" no thread is left to process the received data.

Thanks
Best Regards

On Thu, Jun 18, 2015 at 5:59 PM, Bartek Radziszewski bar...@scalaric.com wrote:

hi,

I'm trying to run a simple kafka spark streaming example over spark-shell:

    sc.stop
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._
    import kafka.serializer.DefaultDecoder
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.storage.StorageLevel

    val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val kafkaParams = Map[String, String]("zookeeper.connect" -> "127.0.0.1:2181", "group.id" -> "test")
    val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)

    messages.foreachRDD { pairRDD =>
      println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
      println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
      pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
    }

    ssc.start()
    ssc.awaitTermination()

In the spark output I'm able to find only the following println log:

    println(s"DataListener.listen() [pairRDD = ${pairRDD}]")

but unfortunately I can't find the output of:

    println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")

and

    println(s"DataListener.listen() [row = ${row}]")

it's my spark-shell full output - http://pastebin.com/sfxbYYga

any ideas what i'm doing wrong? thanks!
Re: RE: Spark or Storm
I am wondering how the direct stream API ensures end-to-end exactly-once semantics.

I think there are two things involved:
1. On the Spark Streaming end, the driver will replay the offset range when it goes down and is restarted, which means that the new tasks will process some already-processed data.
2. On the user end, since tasks may process already-processed data, the user code should detect that some data has already been processed, e.g. by using some unique ID.

Not sure if I have understood correctly.

bit1...@163.com

From: prajod.vettiyat...@wipro.com
Date: 2015-06-18 16:56
To: jrpi...@gmail.com; eshi...@gmail.com
CC: wrbri...@gmail.com; asoni.le...@gmail.com; guha.a...@gmail.com; user@spark.apache.org; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com
Subject: RE: Spark or Storm

"not being able to read from Kafka using multiple nodes"
"Kafka is plenty capable of doing this.."

I faced the same issue before Spark 1.3 was released. The issue was not with Kafka, but with Spark Streaming's Kafka connector. Before the Spark 1.3.0 release, one Spark worker would get all the streamed messages. We had to re-partition to distribute the processing.

From the Spark 1.3.0 release, the Spark Direct API for Kafka supports parallel reads from Kafka streamed to Spark workers. See "Approach 2: Direct Approach" on this page: http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note that it also mentions zero data loss and exactly-once semantics for Kafka integration.

Prajod

From: Jordan Pilat [mailto:jrpi...@gmail.com]
Sent: 18 June 2015 03:57
To: Enno Shioji
Cc: Will Briggs; asoni.le...@gmail.com; ayan guha; user; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
Subject: Re: Spark or Storm

"not being able to read from Kafka using multiple nodes"

Kafka is plenty capable of doing this, by clustering together multiple consumer instances into a consumer group.
If your topic is sufficiently partitioned, the consumer group can consume the topic in a parallelized fashion. If it isn't, you still have the fault tolerance associated with clustering the consumers.

OK
JRP

On Jun 17, 2015 1:27 AM, Enno Shioji eshi...@gmail.com wrote:

We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm.

Some of the important drawbacks are:
- Spark has no back pressure (the receiver rate limit can alleviate this to a certain point, but it's far from ideal).
- There are also no exactly-once semantics (updateStateByKey can achieve these semantics, but it is not practical if you have any significant amount of state, because it does so by dumping the entire state on every checkpoint).
- There are also some minor drawbacks that I'm sure will be fixed quickly, like no task timeout, not being able to read from Kafka using multiple nodes, and a data loss hazard with Kafka.

It's also not possible to attain very low latency in Spark, if that's what you need.

The pro for Spark is the concise and, IMO, more intuitive syntax, especially if you compare it with Storm's Java API.

I admit I might be a bit biased towards Storm tho, as I'm more familiar with it.

Also, you can do some processing with Kinesis. If all you need to do is a straightforward transformation and you are reading from Kinesis to begin with, it might be an easier option to just do the transformation in Kinesis.

On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:

Whatever you write in bolts would be the logic you want to apply to your events. In Spark, that logic would be coded in map() or similar transformations and/or actions. Spark doesn't enforce a structure for capturing your processing logic like Storm does.

Regards
Sab

Probably overloading the question a bit. In Storm, Bolts have the functionality of getting triggered on events. Is that kind of functionality possible with Spark Streaming?
During each phase of the data processing, the transformed data is stored to the database, and this transformed data should then be sent to a new pipeline for further processing. How can this be achieved using Spark?

On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote:

I have a use case where a stream of incoming events has to be aggregated and joined to create complex events. The aggregation will have to happen at an interval of 1 minute (or less).

The pipeline is:

    Upstream services --(send events)--> KAFKA --(enrich event)--> Event Stream Processor --> Complex Event Processor --> Elastic Search

From what I understand, Storm will make a very good ESP and Spark Streaming will make a good CEP. But we are also evaluating Storm with Trident. How does Spark Streaming compare with Storm with Trident?

Sridhar Chellappa

On Wednesday, 17 June 2015 10:02 AM, ayan guha guha.a...@gmail.com wrote:

I
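The "detect already-processed data with a unique ID" idea raised at the top of this thread can be sketched in a few lines of plain Python. This is only an in-memory illustration under stated assumptions: IdempotentSink and process_batch are hypothetical names, and the record ID format (topic-partition-offset) is just one possible choice. In a real job the seen IDs and the totals would presumably have to live in an external store that is updated atomically.

```python
class IdempotentSink:
    """Keeps per-key totals and ignores records whose unique ID was already applied."""

    def __init__(self):
        self.totals = {}
        self.seen_ids = set()

    def apply(self, record_id, key, amount):
        # A replayed record carries the same ID, so it is skipped.
        if record_id in self.seen_ids:
            return False
        self.seen_ids.add(record_id)
        self.totals[key] = self.totals.get(key, 0) + amount
        return True


def process_batch(sink, records):
    """Apply (record_id, key, amount) tuples; return how many were new."""
    return sum(1 for record in records if sink.apply(*record))


if __name__ == "__main__":
    sink = IdempotentSink()
    batch = [("events-0-41", "clicks", 1), ("events-0-42", "clicks", 1)]
    process_batch(sink, batch)
    # Simulate the driver replaying the same offset range after a restart.
    process_batch(sink, batch)
    print(sink.totals)
```

Replaying the same offset range leaves the totals unchanged, which is the property the user code has to provide when the framework only guarantees that data may be reprocessed.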
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
I was thinking exactly the same. I'm going to try it. It doesn't really matter if I lose an executor, since its sketch will be lost, but then re-executed somewhere else.

And anyway, it's an approximate data structure, and what matters are ratios, not exact values.

I mostly need to take care of the concurrency problem for my sketch.

Guillaume

Yeah, that's the problem. There is probably some perfect number of partitions that provides the best balance between partition size, memory, and merge overhead. Though it's not an ideal solution :(

There could be another way, but very hacky... For example, you could store one sketch in a singleton per JVM (so per executor). Do a first pass over your data and update those. Then trigger some other dummy operation that will just retrieve the sketches. That's kind of a hack, but it should work.

Note that if you lose an executor in between, then that doesn't work anymore. You could probably detect it and recompute the sketches, but it would become overcomplicated.
Best way to randomly distribute elements
Hello, In the context of a machine learning algorithm, I need to be able to randomly distribute the elements of a large RDD across partitions (i.e., essentially assign each element to a random partition). How could I achieve this? I have tried to call repartition() with the current number of partitions - but it seems to me that this moves only some of the elements, and in a deterministic way. I know this will be an expensive operation but I only need to perform it every once in a while. Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-randomly-distribute-elements-tp23391.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Best way to randomly distribute elements
How about generating the key using some one-way hashing like md5?

On Thu, Jun 18, 2015 at 9:59 PM, Guillaume Pitel guillaume.pi...@exensa.com wrote:

I think you can randomly reshuffle your elements just by emitting a random key (mapping a PairRDD's key triggers a reshuffle, IIRC):

    yourrdd.map { x => (rand(), x) }

There is obviously a risk that rand() will give the same sequence of numbers in each partition, so you may need to use mapPartitionsWithIndex first and seed your rand with the partition id (or compute your rand from a seed based on x).

Guillaume

--
Best Regards,
Ayan Guha
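The md5 suggestion above can be illustrated without Spark. The helper below is a hypothetical sketch (md5_partition is not a Spark function) that derives a partition number from an element's md5 digest; it assumes the element has a stable repr().

```python
import hashlib


def md5_partition(element, num_partitions):
    """Map an element to a partition number derived from its md5 digest."""
    # Assumption: repr(element) is stable across processes for the data at hand.
    digest = hashlib.md5(repr(element).encode("utf-8")).digest()
    # The first 8 bytes are plenty to spread elements over a few partitions.
    return int.from_bytes(digest[:8], "big") % num_partitions


if __name__ == "__main__":
    counts = [0, 0, 0, 0]
    for element in range(1000):
        counts[md5_partition(element, 4)] += 1
    print(counts)
```

One design consequence worth noting: this is deterministic rather than random, so the same element always maps to the same partition and identical elements end up together. Whether that is acceptable depends on the algorithm.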
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
Hey,

I am not 100% sure, but from my understanding accumulators are per partition (so per task, as it's the same thing) and are sent back to the driver with the task result and merged. When a task needs to be run n times (multiple RDDs depend on this one, some partition is lost later in the chain, etc.), the accumulator will count the values from that task n times. So in short, I don't think you'd win from using an accumulator over what you are doing right now.

You could maybe coalesce your RDD to num-executors without a shuffle and then update the sketches. You should end up with 1 partition per executor, thus 1 sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently.

Eugen
Re: deployment options for Spark and YARN w/ many app jar library dependencies
Thank you, Sandy! I'll investigate use of the extraClassPath variable. Both options are helpful.

Thanks,
Matt

On Jun 17, 2015, at 8:01 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

Hi Matt,

If you place your jars on HDFS in a public location, YARN will cache them on each node after the first download. You can also use the spark.executor.extraClassPath config to point to them.

-Sandy

On Wed, Jun 17, 2015 at 4:47 PM, Sweeney, Matt mswee...@fourv.com wrote:

Hi folks,

I'm looking to deploy Spark on YARN and I have read through the docs (https://spark.apache.org/docs/latest/running-on-yarn.html). One question that I still have is whether there is an alternate means of including your own app jars, as opposed to the process in the "Adding Other Jars" section of the docs. The app jars and dependencies that I need to include are significant in size (100s of MBs), and I'd rather deploy them in advance onto the cluster nodes' disks so that I don't incur that network overhead for each spark-submit that is executed.

Thanks in advance for your help!

Matt
Re: Spark and Google Cloud Storage
I believe it is available here: https://cloud.google.com/hadoop/google-cloud-storage-connector

2015-06-18 15:31 GMT+02:00 Klaus Schaefers klaus.schaef...@ligatus.com:

Hi,

is there some kind of adapter to use Google Cloud Storage with Spark?

Cheers,
Klaus

--
Klaus Schaefers
Senior Optimization Manager
Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln
Tel.: +49 (0) 221 / 56939 -784
Fax: +49 (0) 221 / 56 939 - 599
E-Mail: klaus.schaef...@ligatus.com
Web: www.ligatus.de
HRB Köln 56003
Geschäftsführung: Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann, Dipl.-Wirtschaftsingenieur Arne Wolter
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
BTW, I suggest this instead of using thread locals, as I am not sure in which situations Spark will or will not reuse them. For example, if an error happens inside a thread, will Spark create a new one, or is the error caught inside the thread, preventing it from stopping? In short, does Spark guarantee that the threads are started at the beginning and last until the end of the JVM?

2015-06-18 15:32 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:

2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

"I was thinking exactly the same. I'm going to try it. It doesn't really matter if I lose an executor, since its sketch will be lost, but then re-executed somewhere else."

I mean that between the action that updates the sketches and the action that collects/merges them, you can lose an executor. So it is outside of Spark's control. But it's probably an acceptable risk.

"And anyway, it's an approximate data structure, and what matters are ratios, not exact values. I mostly need to take care of the concurrency problem for my sketch."

I think you could do something like:
- Have this singleton that holds N sketch instances (one for each executor core).
- Inside an operation over partitions (like foreachPartition/mapPartitions):
  - at the beginning, ask the singleton to provide you with one instance to use, in a synchronized fashion, and pick it out from the N available instances or mark it as in use;
  - when the iterator over the partition has no more elements, release this sketch.
- Then you can do something like sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla), but you will have to make sure that this is executed on each executor (I am not sure that a dataset sized to the number of executors will actually run a task on every executor).

Please let me know what you end up doing, sounds interesting :)

Eugen
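The borrow/release scheme Eugen outlines in this message can be sketched in plain Python, with threads standing in for the concurrent tasks of one executor. This is a sketch under stated assumptions, not Spark code: SketchPool, process_partition and run are hypothetical names, and a collections.Counter stands in for a real count-min sketch.

```python
import queue
import threading
from collections import Counter
from contextlib import contextmanager


class SketchPool:
    """Holds a fixed number of sketches; each one is used by one task at a time."""

    def __init__(self, size, factory):
        self._sketches = [factory() for _ in range(size)]
        self._free = queue.Queue()
        for sketch in self._sketches:
            self._free.put(sketch)

    @contextmanager
    def borrow(self):
        sketch = self._free.get()  # blocks until some sketch is free
        try:
            yield sketch
        finally:
            self._free.put(sketch)  # released when the partition is exhausted

    def all_sketches(self):
        return list(self._sketches)


def process_partition(pool, partition):
    # One borrow per partition, mirroring a mapPartitions-style operation.
    with pool.borrow() as sketch:
        for item in partition:
            sketch[item] += 1


def run(partitions, pool_size=2):
    pool = SketchPool(pool_size, Counter)
    threads = [threading.Thread(target=process_partition, args=(pool, part))
               for part in partitions]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # The "dummy operation" of the thread: collect and merge the pooled sketches.
    merged = Counter()
    for sketch in pool.all_sketches():
        merged.update(sketch)
    return merged
```

Since each sketch is held by at most one task at a time, the sketch itself needs no internal locking; the only synchronized step is taking a sketch from the pool and returning it.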
Re: Best way to randomly distribute elements
I think you can randomly reshuffle your elements just by emitting a random key (mapping a PairRDD's key triggers a reshuffle, IIRC):

    yourrdd.map { x => (rand(), x) }

There is obviously a risk that rand() will give the same sequence of numbers in each partition, so you may need to use mapPartitionsWithIndex first and seed your rand with the partition id (or compute your rand from a seed based on x).

Guillaume
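The seeding concern above can be made concrete with a small plain-Python simulation, where a list of lists stands in for an RDD's partitions. The function name, the seed value and the seed formula are all assumptions made for the illustration. Note also that, as far as I know, a plain map in Spark is a narrow transformation that does not move data by itself, so the random key would need to be followed by a partitioning step (for example partitionBy on the key) for elements to actually change partition.

```python
import random


def assign_random_partitions(partitions, num_partitions, seed=42):
    """Redistribute elements so each lands in a randomly chosen target partition.

    partitions: list of lists, standing in for the current partitions.
    Returns a new list of num_partitions lists.
    """
    out = [[] for _ in range(num_partitions)]
    for index, part in enumerate(partitions):
        # Seed per source partition, so partitions do not all replay
        # the same random sequence.
        rng = random.Random(seed * 1000003 + index)
        for element in part:
            out[rng.randrange(num_partitions)].append(element)
    return out


if __name__ == "__main__":
    source = [list(range(0, 5)), list(range(5, 10)), list(range(10, 15))]
    print(assign_random_partitions(source, 3))
```

In PySpark the per-partition seeding would presumably go inside a function passed to mapPartitionsWithIndex, with the seed changed on every call if a different distribution is wanted each time.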
Re: connect mobile app with Spark backend
Why not have your mobile app push data to your webserver, which pushes the data to Kafka, Cassandra, or any other database, and have a Spark Streaming job running all the time that operates on the incoming data and pushes the calculated values back? This way, you don't have to start a Spark job for every user request (which would end up awful if you have thousands of user requests per second).

Thanks
Best Regards

On Thu, Jun 18, 2015 at 4:33 PM, Ralph Bergmann ra...@dasralph.de wrote:

Hi,

I'm new to Spark and need some architecture tips :-)

I need a way to connect the mobile app with the Spark backend, to upload data to and download data from the Spark backend.

The use case is that the user does something with the app. These changes are uploaded to the backend. Spark calculates something. When the user uses the app again, it downloads the newly calculated data.

My plan is that the mobile app talks with a Jersey-Tomcat server, and this Jersey-Tomcat server loads the data into Spark and starts the jobs.

But what is the best way to upload the data to Spark and to start the job? Currently Jersey, Tomcat and Spark are on the same machine.

I found this spark-jobserver [1], but I'm not sure if it is the right choice. The mobile app uploads JSON. Jersey converts it into POJOs to do something with it. And then it converts it to JSON again to load it into Spark, which converts it to POJOs.

I also thought about Spark Streaming. But does this mean that this streaming stuff runs 24/7?

[1] https://github.com/spark-jobserver/spark-jobserver

--
Ralph Bergmann
www http://www.dasralph.de | http://www.the4thFloor.eu
mail ra...@dasralph.de
skype dasralph
facebook https://www.facebook.com/dasralph
google+ https://plus.google.com/+RalphBergmann
xing https://www.xing.com/profile/Ralph_Bergmann3
linkedin https://www.linkedin.com/in/ralphbergmann
gulp https://www.gulp.de/Profil/RalphBergmann.html
github https://github.com/the4thfloor
pgp key id 0x421F9B78
pgp fingerprint CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi,

We switched from ParallelGC to CMS, and the symptom is gone.

On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG zhangj...@gmail.com wrote:

Hi,

I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and this setting can be seen in the web UI's environment tab. But it still eats memory, i.e. -Xmx is set to 512M but RES grows to 1.5G in half a day.

On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

Could you set spark.shuffle.io.preferDirectBufs to false to turn off the off-heap allocation of netty?

Best Regards,
Shixiong Zhu

2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com:

Hi,

Thanks for your information. I'll give spark1.4 a try when it's released.

On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote:

Could you try it out with Spark 1.4 RC3?

Also pinging Cloudera folks, they may be aware of something.

BTW, the way I have debugged memory leaks in the past is as follows.

Run with a small driver memory, say 1 GB. Periodically (maybe with a script), take snapshots of the histogram and also do memory dumps, say every hour. Then compare the difference between two histos/dumps that are a few hours apart (the more the better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM; it will show the diff in the objects that got added in the later dump. That makes it easy to debug what is not getting cleaned.

TD

On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

Hi,

Thanks for your reply.
Here are the top 30 entries of the jmap -histo:live result:

     num     #instances         #bytes  class name
    ----------------------------------------------
       1:         40802      145083848  [B
       2:         99264       12716112  methodKlass
       3:         99264       12291480  constMethodKlass
       4:          8472        9144816  constantPoolKlass
       5:          8472        7625192  instanceKlassKlass
       6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
       7:          7045        4804832  constantPoolCacheKlass
       8:        139168        4453376  java.util.HashMap$Entry
       9:          9427        3542512  methodDataKlass
      10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
      11:        135491        3251784  java.lang.Long
      12:         26192        2765496  [C
      13:           813        1140560  [Ljava.util.HashMap$Entry;
      14:          8997        1061936  java.lang.Class
      15:         16022         851384  [[I
      16:         16447         789456  java.util.zip.Inflater
      17:         13855         723376  [S
      18:         17282         691280  java.lang.ref.Finalizer
      19:         25725         617400  java.lang.String
      20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
      21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
      22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
      23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
      24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
      25:         16447         394728  java.util.zip.ZStreamRef
      26:           565         370080  [I
      27:           508         272288  objArrayKlassKlass
      28:         16233         259728  java.lang.Object
      29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
      30:          2524         192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data. I can't find a way to figure out what is in there.

Besides, I did some extra experiments, i.e. I ran the same program in different environments to test whether it has the off-heap memory issue:

    spark1.0 + standalone = no
    spark1.0 + yarn = no
    spark1.3 + standalone = no
    spark1.3 + yarn = yes

I'm using CDH5.1, so the spark1.0 is provided by cdh, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
I could use spark1.0 + yarn, but I can't find a way to handle the logs (level and rolling), so they would fill up the hard drive. Currently I'll stick to spark1.0 + standalone, until our ops team decides to upgrade cdh.

On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:
While you are running, is it possible for you to log in to the YARN node and get histograms of live objects using jmap -histo:live? That may reveal something.

On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:
Hi, Unfortunately, they're still growing, both driver and executors. When I run the same job in local mode, everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
Can you replace your counting part with this?

    logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))

Thanks Best Regards

On Thu, May 28,
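
The histogram-diffing step suggested earlier in this thread could be scripted roughly as follows. This is only a sketch: `parse_histo` and `diff_histos` are hypothetical helper names, and the parser assumes the `num #instances #bytes class name` column layout of `jmap -histo` output.

```python
import re

# Matches one data row of `jmap -histo` output, e.g. "  12:  26192  2765496  [C"
ROW = re.compile(r"^\s*\d+:\s+(\d+)\s+(\d+)\s+(\S+)")


def parse_histo(text):
    """Return {class_name: (instances, bytes)} for every row that parses."""
    table = {}
    for line in text.splitlines():
        match = ROW.match(line)
        if match:
            table[match.group(3)] = (int(match.group(1)), int(match.group(2)))
    return table


def diff_histos(older, newer, top=10):
    """List the classes whose byte usage grew the most between two snapshots."""
    growth = []
    for name, (instances, size) in newer.items():
        old_instances, old_size = older.get(name, (0, 0))
        if size > old_size:
            growth.append((size - old_size, instances - old_instances, name))
    growth.sort(reverse=True)  # largest byte growth first
    return growth[:top]
```

Feeding it two snapshots taken a few hours apart gives a short list of (byte growth, instance growth, class name) tuples. Note that this only covers on-heap objects, so it would not by itself explain off-heap growth.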
Issue with PySpark UDF on a column of Vectors
I am having trouble using a UDF on a column of Vectors in PySpark, which can be illustrated here:

    from pyspark import SparkContext
    from pyspark.sql import Row
    from pyspark.sql.types import DoubleType
    from pyspark.sql.functions import udf
    from pyspark.mllib.linalg import Vectors

    FeatureRow = Row('id', 'features')
    data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
                           (1, Vectors.dense([2.25, -11.1, 123.2])),
                           (2, Vectors.dense([-7.2, 1.0, -3.2]))])
    df = data.map(lambda r: FeatureRow(*r)).toDF()
    vector_udf = udf(lambda vector: sum(vector), DoubleType())
    df.withColumn('feature_sums', vector_udf(df.features)).first()

This fails with the following stack trace:

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
        process()
      File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/Users/colin/src/spark/python/pyspark/sql/functions.py", line 469, in <lambda>
        func = lambda _, it: map(lambda x: f(*x), it)
      File "/Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py", line 143, in <lambda>
    TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'

Looking at what gets passed to the UDF, there seems to be something strange. The argument passed should be a Vector, but instead the UDF receives a Python tuple like this:

    (1, None, None, [9.7, 1.0, -3.2])

Is it not possible to use UDFs on DataFrame columns of Vectors?
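
One possible workaround, sketched here under an assumption: the 4-tuple looks like a serialized form of the vector laid out as (type, size, indices, values), so the UDF body could read the entries from the last field whenever it is handed a tuple. `vector_sum` is a hypothetical helper, not a PySpark API, and the tuple layout is inferred from the single example above rather than confirmed.

```python
def vector_sum(vector):
    """Sum a vector's entries whether it arrives as a vector-like object or a raw tuple."""
    # Assumed serialized layout: (type, size, indices, values).
    if isinstance(vector, tuple) and len(vector) == 4:
        values = vector[3]
    else:
        # Anything that can be iterated entry by entry, e.g. a dense vector or a list.
        values = vector
    return float(sum(float(v) for v in values))
```

It would be registered the same way as the original lambda, e.g. `udf(vector_sum, DoubleType())`. Returning a plain Python float also keeps the result compatible with DoubleType.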
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-PySpark-UDF-on-a-column-of-Vectors-tp23393.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Best way to randomly distribute elements
Hi A bellet,

You can try RDD.randomSplit(weights), where weights is an array giving the relative weight of each split. For example, RDD.randomSplit(Array(0.7, 0.3)) returns two RDDs, one holding roughly 70% of the data and the other roughly 30%, with the elements selected at random. RDD.randomSplit(Array(0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1)) returns 10 RDDs of randomly selected elements with equal weights.

Thank you
Himanshu

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-randomly-distribute-elements-tp23391p23392.html
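
To illustrate how weights translate into split sizes, here is a small local sketch of weighted random splitting. It is plain Python, not Spark's implementation; `random_split` is a hypothetical name, and the approach (one uniform draw per element compared against cumulative normalized weights) is an assumption chosen for clarity.

```python
import random
from bisect import bisect_right
from itertools import accumulate


def random_split(items, weights, seed=None):
    """Assign each item to one of len(weights) splits with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds of each split's share of [0, 1).
    bounds = list(accumulate(w / total for w in weights))
    splits = [[] for _ in weights]
    for item in items:
        # Clamp guards against rounding leaving the last bound slightly below 1.0.
        index = min(bisect_right(bounds, rng.random()), len(weights) - 1)
        splits[index].append(item)
    return splits
```

With weights [0.7, 0.3] the first split ends up with about 70% of the items, but the sizes are only approximate because each element is assigned independently.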
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
Yeah, that's the problem. There is probably some ideal number of partitions that gives the best balance between partition size, memory, and merge overhead, though it's not an ideal solution :(

There could be another way, but it is very hacky: for example, you could store one sketch in a singleton per JVM (so per executor). Do a first pass over your data and update those sketches. Then trigger some other dummy operation that just retrieves the sketches. That's kind of a hack, but it should work.

Note that if you lose an executor in between, this doesn't work anymore. You could probably detect that and recompute the sketches, but it would become overcomplicated.

2015-06-18 14:27 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:
Hi, Thank you for this confirmation. Coalescing is what we do now. It creates, however, very big partitions. Guillaume

Hey, I am not 100% sure, but from my understanding accumulators are per partition (so per task, as it's the same) and are sent back to the driver with the task result and merged. When a task needs to be run n times (multiple RDDs depend on this one, some partition loss later in the chain, etc.), the accumulator will count the values from that task n times. So in short, I don't think you'd win from using an accumulator over what you are doing right now. You could maybe coalesce your RDD to num-executors without a shuffle and then update the sketches. You should end up with 1 partition per executor, thus 1 sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently. Eugen
Accumulators / Accumulables : thread-local, task-local, executor-local ?
Hi,

I'm trying to figure out the smartest way to implement a global count-min sketch on accumulators. For now, we are doing that with RDDs. It works well, but with one sketch per partition, merging takes too long.

As you probably know, a count-min sketch is a big mutable array of arrays of ints. To distribute it, all sketches must have the same size. Since it can be big, and since merging is not free, I would like to minimize the number of sketches and maximize reuse and concurrent use of the sketches. Ideally, I would like to have just one sketch per worker.

I think accumulables might be the right structures for that, but it seems that they are not shared between executors, or even between tasks.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala (289)

    /**
     * This thread-local map holds per-task copies of accumulators; it is used to collect the set
     * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
     * this map is cleared by `Accumulators.clear()` (see Executor.scala).
     */
    private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
      override protected def initialValue() = Map[Long, Accumulable[_, _]]()
    }

localAccums stores an accumulator for each task (it's thread-local, so I assume each task has a unique thread on executors).

If I understand correctly, each time a task starts, an accumulator is initialized locally, updated, then sent back to the driver for merging? So I guess accumulators may not be the way to go after all.

Any advice?

Guillaume
-- eXenSa *Guillaume PITEL, Président* +33(0)626 222 431 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)184 163 677 / Fax +33(0)972 283 705
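
For readers unfamiliar with the structure, a minimal count-min sketch might look like the following. This is an illustrative sketch, not the implementation used in the thread: the class name, the default dimensions, and the MD5-based row hashing are all assumptions. It shows why every sketch must have the same size (merging is a cell-by-cell addition) and why merging many sketches is costly (it touches depth × width cells each time).

```python
import hashlib


class CountMinSketch:
    """A depth x width table of counters; each row uses a different hash of the item."""

    def __init__(self, depth=4, width=1024):
        self.depth = depth
        self.width = width
        self.rows = [[0] * width for _ in range(depth)]

    def _index(self, row, item):
        # Hypothetical hashing scheme: salt the item with the row number.
        digest = hashlib.md5(("%d:%s" % (row, item)).encode("utf-8")).hexdigest()
        return int(digest, 16) % self.width

    def add(self, item, count=1):
        for r in range(self.depth):
            self.rows[r][self._index(r, item)] += count

    def estimate(self, item):
        # Collisions can only inflate a counter, so the minimum is the best estimate.
        return min(self.rows[r][self._index(r, item)] for r in range(self.depth))

    def merge(self, other):
        """Add another sketch into this one; both must share dimensions and hashing."""
        if (self.depth, self.width) != (other.depth, other.width):
            raise ValueError("sketches must have identical dimensions")
        for mine, theirs in zip(self.rows, other.rows):
            for i, value in enumerate(theirs):
                mine[i] += value
        return self
```

Estimates never undercount, which is why ratios between estimates stay meaningful even when a few collisions occur.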
Spark and Google Cloud Storage
Hi, is there some kind of adapter to use Google Cloud Storage with Spark? Cheers, Klaus -- -- Klaus Schaefers Senior Optimization Manager Ligatus GmbH Hohenstaufenring 30-32 D-50674 Köln Tel.: +49 (0) 221 / 56939 -784 Fax: +49 (0) 221 / 56 939 - 599 E-Mail: klaus.schaef...@ligatus.com Web: www.ligatus.de HRB Köln 56003 Geschäftsführung: Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann, Dipl.-Wirtschaftsingenieur Arne Wolter
kafka spark streaming working example
hi, I'm trying to run a simple Kafka Spark Streaming example in spark-shell:

    sc.stop
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._
    import kafka.serializer.DefaultDecoder
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.storage.StorageLevel

    val sparkConf = new SparkConf().setAppName("Summarizer").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val kafkaParams = Map[String, String]("zookeeper.connect" -> "127.0.0.1:2181", "group.id" -> "test")
    val messages = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
      ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)

    messages.foreachRDD { pairRDD =>
      println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
      println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
      pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
    }

    ssc.start()
    ssc.awaitTermination()

In the Spark output I can find only the log from the first println:

    println(s"DataListener.listen() [pairRDD = ${pairRDD}]")

but unfortunately I can't find the output of:

    println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}])

and

    println(s"DataListener.listen() [row = ${row}]")

Here is my full spark-shell output: http://pastebin.com/sfxbYYga

Any ideas what I'm doing wrong? Thanks!
Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?
2015-06-18 15:17 GMT+02:00 Guillaume Pitel guillaume.pi...@exensa.com:

I was thinking exactly the same. I'm going to try it. It doesn't really matter if I lose an executor, since its sketch will be lost, but then recomputed somewhere else.

I mean that between the action that updates the sketches and the action that collects/merges them, you can lose an executor. That is outside of Spark's control. But it's probably an acceptable risk.

And anyway, it's an approximate data structure, and what matters are ratios, not exact values. I mostly need to take care of the concurrency problem for my sketch.

I think you could do something like:
- Have this singleton hold N sketch instances (one for each executor core).
- Inside an operation over partitions (like foreachPartition/mapPartitions):
  - at the beginning, ask the singleton, in a synchronized fashion, to provide you with one instance to use, picking it out of the N available instances or marking it as in use;
  - when the iterator over the partition has no more elements, release this sketch.
- Then you can do something like sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla), but you will have to make sure that this is executed on every executor (not sure if a dataset larger than the number of executors will trigger an action on every executor).

Please let me know what you end up doing, sounds interesting :)

Eugen
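
The pool-of-sketches idea from this reply can be sketched as below. It is written in Python only for brevity; in the JVM setting discussed here the pool would live in a singleton object inside each executor. `SketchPool`, `borrow`, `snapshot`, and `update_partition` are hypothetical names, and a plain dict stands in for a real sketch.

```python
import queue
from contextlib import contextmanager


class SketchPool:
    """Process-wide pool that lends each sketch to at most one caller at a time."""

    def __init__(self, factory, size):
        self._all = [factory() for _ in range(size)]
        self._free = queue.Queue()
        for sketch in self._all:
            self._free.put(sketch)

    @contextmanager
    def borrow(self):
        sketch = self._free.get()  # blocks until some sketch is free
        try:
            yield sketch
        finally:
            self._free.put(sketch)  # always release, even if the task fails

    def snapshot(self):
        """Return every sketch in the pool, e.g. for a final collect/merge pass."""
        return list(self._all)


def update_partition(pool, records):
    """Body of a per-partition operation: borrow one sketch and feed it the partition."""
    with pool.borrow() as sketch:
        for record in records:
            sketch[record] = sketch.get(record, 0) + 1
```

Because each borrowed sketch has a single writer, the sketch itself needs no internal locking; the only synchronized step is taking it from and returning it to the queue. The later retrieval pass would still have to reach every executor, which is the open question raised in the reply.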
Got the exception when joining RDD with spark streamRDD
Hi, I am writing a PySpark streaming program. I have a training data set to compute the regression model, and I want to use the stream data set to test the model. So I join the RDD with the stream RDD, but I get an exception. My source code and the exception follow. Any help is appreciated.

Thanks
Regards, Afancy

    from __future__ import print_function
    import sys,os,datetime
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.sql.context import SQLContext
    from pyspark.resultiterable import ResultIterable
    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
    import numpy as np
    import statsmodels.api as sm

    def splitLine(line, delimiter='|'):
        values = line.split(delimiter)
        st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
        return (values[0],st.hour), values[2:]

    def reg_m(y, x):
        ones = np.ones(len(x[0]))
        X = sm.add_constant(np.column_stack((x[0], ones)))
        for ele in x[1:]:
            X = sm.add_constant(np.column_stack((ele, X)))
        results = sm.OLS(y, X).fit()
        return results

    def train(line):
        y,x = [],[]
        y, x = [],[[],[],[],[],[],[]]
        reading_tmp,temp_tmp = [],[]
        i = 0
        for reading, temperature in line[1]:
            if i%4==0 and len(reading_tmp)==4:
                y.append(reading_tmp.pop())
                x[0].append(reading_tmp.pop())
                x[1].append(reading_tmp.pop())
                x[2].append(reading_tmp.pop())
                temp = float(temp_tmp[0])
                del temp_tmp[:]
                x[3].append(temp-20.0 if temp>20.0 else 0.0)
                x[4].append(16.0-temp if temp<16.0 else 0.0)
                x[5].append(5.0-temp if temp<5.0 else 0.0)
            reading_tmp.append(float(reading))
            temp_tmp.append(float(temperature))
            i = i + 1
        return str(line[0]),reg_m(y, x).params.tolist()

    if __name__ == "__main__":
        if len(sys.argv) != 4:
            print("Usage: regression.py checkpointDir trainingDataDir streamDataDir", file=sys.stderr)
            exit(-1)
        checkpoint, trainingInput, streamInput = sys.argv[1:]
        sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")
        trainingLines = sc.textFile(trainingInput)
        modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
            .groupByKey()\
            .map(lambda line: train(line))\
            .cache()
        ssc = StreamingContext(sc, 2)
        ssc.checkpoint(checkpoint)
        lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))
        testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), line[1])).transform(lambda rdd: rdd.leftOuterJoin(modelRDD))
        testRDD.pprint(20)
        ssc.start()
        ssc.awaitTermination()

The exception:

    15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 6 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
    Traceback (most recent call last):
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 90, in dumps
        return bytearray(self.serializer.dumps((func.func, func.deserializers)))
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
        return cloudpickle.dumps(obj, 2)
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
        cp.dump(obj)
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
        return Pickler.dump(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
        self.save_function_tuple(obj)
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
        save((code, closure, base_globals))
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
        save(x)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
Re: [Spark Streaming] Iterative programming on an ordered spark stream using Java?
Hi,

UpdateStateByKey: if you can briefly describe the issue you are facing with it, that would be great.

Regarding not keeping the whole dataset in memory, you can tweak the remember parameter so that it checkpoints at an appropriate time.

Thanks
Twinkle

On Thursday, June 18, 2015, Nipun Arora nipunarora2...@gmail.com wrote:
Hi All, I am updating my question to give more detail. I have also created a Stack Overflow question: http://stackoverflow.com/questions/30904244/iterative-programming-on-an-ordered-spark-stream-using-java-in-spark-streaming

Is there any way in Spark Streaming to keep data across multiple micro-batches of a sorted DStream, where the stream is sorted using timestamps? (Assuming monotonically arriving data.) Can anyone make suggestions on how to keep data across iterations, where each iteration is an RDD being processed in a JavaDStream?

*What does iteration mean?* I first sort the DStream using timestamps, assuming that data has arrived with monotonically increasing timestamps (no out-of-order). I need a global HashMap X, which I would like to be updated using values with timestamp t1, and then subsequently t1+1. Since the state of X itself impacts the calculations, it needs to be a linear operation. Hence the operation at t1+1 depends on HashMap X, which depends on data at and before t1.

*Application* This is especially the case when one is trying to update a model, compare two sets of RDDs, or keep a global history of certain events etc. which will impact operations in future iterations. I would like to keep some accumulated history to make calculations: not the entire dataset, but certain persisted events which can be used in future DStream RDDs.

Thanks Nipun

On Wed, Jun 17, 2015 at 11:15 PM, Nipun Arora nipunarora2...@gmail.com wrote:
Hi Silvio, Thanks for your response. I should clarify. I would like to do updates on a structure iteratively. I am not sure if updateStateByKey meets my criteria. In the current situation, I can run some map reduce tasks and generate a JavaPairDStream<Key,Value>; after this my algorithm is necessarily sequential, i.e. I have sorted the data using the timestamp (within the messages), and I would like to iterate over it and maintain a state where I can update a model. I tried using foreach/foreachRDD and collect to do this, but I can't seem to propagate values across micro-batches/RDDs. Any suggestions? Thanks Nipun

On Wed, Jun 17, 2015 at 10:52 PM, Silvio Fiorito silvio.fior...@granturing.com wrote:
Hi, just answered in your other thread as well... Depending on your requirements, you can look at the updateStateByKey API.

From: Nipun Arora
Date: Wednesday, June 17, 2015 at 10:51 PM
To: user@spark.apache.org
Subject: Iterative Programming by keeping data across micro-batches in spark-streaming?

Hi, Is there any way in Spark Streaming to keep data across multiple micro-batches? Like in a HashMap or something? Can anyone make suggestions on how to keep data across iterations, where each iteration is an RDD being processed in a JavaDStream? This is especially the case when I am trying to update a model, compare two sets of RDDs, or keep a global history of certain events etc. which will impact operations in future iterations. I would like to keep some accumulated history to make calculations: not the entire dataset, but certain persisted events which can be used in future JavaDStream RDDs.

Thanks Nipun
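
To make the updateStateByKey suggestion concrete, the following plain-Python sketch mimics its per-batch semantics locally, without Spark. The function names are hypothetical; the behaviour modelled (the update function sees the new values for a key plus that key's previous state, and returning None drops the key) is how the API is documented, but this is a simulation, not Spark code.

```python
def update_state_by_key(state, batch, update_func):
    """Apply one micro-batch of (key, value) pairs to the running per-key state."""
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)
    new_state = {}
    # Keys with existing state are revisited even when the batch has no values for them.
    for key in set(state) | set(grouped):
        result = update_func(grouped.get(key, []), state.get(key))
        if result is not None:  # returning None removes the key from the state
            new_state[key] = result
    return new_state


def running_count(new_values, previous):
    """Example update function: keep a running total per key."""
    return (previous or 0) + sum(new_values)
```

Feeding batches in arrival order carries the state from t1 into t1+1, which is the sequential dependency described in the question. State here is partitioned by key, so a single global map would have to be modelled as the state of one constant key.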
Re: Executor memory allocations
It would be the 40%, although it's probably better to think of it as shuffle vs. data cache, with the remainder going to tasks. The documentation for the shuffle memory fraction setting makes clear that increasing it comes at the expense of the storage/data cache fraction:

spark.shuffle.memoryFraction (default: 0.2): Fraction of Java heap to use for aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At any given time, the collective size of all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will begin to spill to disk. If spills are often, consider increasing this value at the expense of spark.storage.memoryFraction.

On Wed, Jun 17, 2015 at 6:02 PM, Corey Nolet cjno...@gmail.com wrote:
So I've seen in the documentation that (after the overhead memory is subtracted), the memory allocations of each executor are as follows (assume default settings):
60% for cache
40% for tasks to process data
Reading about how Spark implements shuffling, I've also seen it say that 20% of executor memory is utilized for shuffles. Does this 20% cut into the 40% for tasks to process data or the 60% for the data cache?
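
As a worked example of the arithmetic in the reply, the split can be written out as below. `executor_memory_split` is a hypothetical helper; the 0.6 and 0.2 defaults are the figures quoted in this thread, and the sketch deliberately ignores any additional safety margins Spark may apply internally.

```python
def executor_memory_split(heap_mb, storage_fraction=0.6, shuffle_fraction=0.2):
    """Split an executor heap into data cache, shuffle, and remaining task memory (in MB)."""
    storage = heap_mb * storage_fraction
    shuffle = heap_mb * shuffle_fraction
    return {
        "storage": storage,                     # data cache
        "shuffle": shuffle,                     # in-memory maps used during shuffles
        "tasks": heap_mb - storage - shuffle,   # whatever is left for task execution
    }
```

For a 1000 MB heap with the defaults this gives roughly 600 MB for the cache, 200 MB for shuffles, and 200 MB for everything else, which matches the reading that the shuffle share comes out of the 40% rather than the 60%.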
Re: Matrix Multiplication and mllib.recommendation
Thanks Sabarish and Nick. Would you happen to have some code snippets that you can share? Best, Ayman

On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:
Nick is right. I too have implemented it this way and it works just fine. In my case, there can be even more products. You simply broadcast blocks of products to userFeatures.mapPartitions() and BLAS multiply in there to get recommendations. In my case 10K products form one block. Note that you would then have to union your recommendations. And if there are lots of product blocks, you might also want to checkpoint once every few times. Regards Sab

On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com wrote:
One issue is that you broadcast the product vectors and then do a dot product one-by-one with the user vector. You should try forming a matrix of the item vectors and doing the dot product as a matrix-vector multiply, which will make things a lot faster. Another optimisation that is available in 1.4 is a recommendProducts method that blockifies the factors to make use of level 3 BLAS (i.e. matrix-matrix multiply). I am not sure if this is available in the Python API yet. But you can do a version yourself by using mapPartitions over user factors, blocking the factors into sub-matrices and doing a matrix multiply with the item factor matrix to get scores on a block-by-block basis. Also, as Ilya says, more parallelism can help. I don't think it's so necessary to do LSH with 30,000 items.

On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
I actually talk about this exact thing in a blog post here http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/. Keep in mind, you're actually doing a ton of math. Even with proper caching and use of broadcast variables this will take a while, depending on the size of your cluster. To get real results you may want to look into locality sensitive hashing to limit your search space, and definitely look into spinning up multiple threads to process your product features in parallel to increase resource utilization on the cluster. Thank you, Ilya Ganelin

-Original Message-
From: afarahat [ayman.fara...@yahoo.com]
Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Matrix Multiplication and mllib.recommendation

Hello; I am trying to get predictions after running the ALS model. The model works fine. In the prediction/recommendation step, I have about 30,000 products and 90 million users. When I try predictAll, it fails. I have been trying to formulate the problem as a matrix multiplication where I first get the product features, broadcast them, and then do a dot product. It's still very slow. Any idea why? Here is some sample code:

    def doMultiply(x):
        a = []
        # multiply by
        mylen = len(pf.value)
        for i in range(mylen):
            myprod = numpy.dot(x, pf.value[i][1])
            a.append(myprod)
        return a

    myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
    # I need to select which products to broadcast but lets try all
    m1 = myModel.productFeatures().sample(False, 0.001)
    pf = sc.broadcast(m1.collect())
    uf = myModel.userFeatures()
    f1 = uf.map(lambda x: (x[0], doMultiply(x[1])))

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html

-- Architect - Big Data Ph: +91 99805 99458 Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan India ICT) +++
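
The block-by-block scoring that Nick and Sabarish describe can be sketched as follows. This is a pure-Python illustration so that it runs without extra dependencies; `score_block` and `top_k` are hypothetical names, and in practice the nested loops would be replaced by a single matrix-matrix product (for example with numpy) inside a function passed to mapPartitions, with the item factors supplied through a broadcast variable.

```python
import heapq


def score_block(user_block, item_factors):
    """Score every user in a block against every item: one row of dot products per user."""
    return [
        (user_id, [sum(u * p for u, p in zip(user_vec, item_vec))
                   for _, item_vec in item_factors])
        for user_id, user_vec in user_block
    ]


def top_k(user_block, item_factors, k):
    """Keep only the k best-scoring items per user so the output stays small."""
    item_ids = [item_id for item_id, _ in item_factors]
    results = []
    for user_id, scores in score_block(user_block, item_factors):
        best = heapq.nlargest(k, zip(scores, item_ids))
        results.append((user_id, [(item_id, score) for score, item_id in best]))
    return results
```

Keeping only the top k scores per user matters at this scale: 90 million users times 30,000 products is far too many scores to materialize, whichever way the multiplication is done.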
Re: [Spark Streaming] Iterative programming on an ordered spark stream using Java?
Hi All, I appreciate the help :)

Here is some sample code where I am trying to keep the data of the previous RDD and the current RDD in a foreachRDD in Spark Streaming. I do not know if the code below technically works, as I cannot compile it, but I am trying to keep a historical reference to the last RDD in this scenario. This is the furthest I got. You can imagine another scenario where I keep a historical list: if I get a certain order of events, I store them.

    sortedtsStream.foreach(new ABC()); // error here: cannot be referenced from static context; this call is within static main()

    class ABC implements Function<JavaPairRDD<Tuple2<Long, Integer>, Integer>, Void> {
        @Override
        public Void call(JavaPairRDD<Tuple2<Long, Integer>, Integer> tuple2IntegerJavaPairRDD) throws Exception {
            List<Tuple2<Tuple2<Long, Integer>, Integer>> list = tuple2IntegerJavaPairRDD.collect();
            if (Type4ViolationChecker.this.prevlist != null && currentlist != null) {
                prevlist = currentlist;
                currentlist = list;
            } else {
                currentlist = list;
                prevlist = list;
            }
            System.out.println("Printing previous");
            for (Tuple2<Tuple2<Long, Integer>, Integer> tuple : prevlist) {
                Date date = new Date(tuple._1._1);
                int pattern = tuple._1._2;
                int count = tuple._2;
                System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count);
            }
            System.out.println("Printing current");
            for (Tuple2<Tuple2<Long, Integer>, Integer> tuple : currentlist) {
                Date date = new Date(tuple._1._1);
                int pattern = tuple._1._2;
                int count = tuple._2;
                System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count);
            }
            return null;
        }
    }

Thanks Nipun

On Thu, Jun 18, 2015 at 11:26 AM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:
Hi, UpdateStateByKey: if you can briefly describe the issue you are facing with it, that would be great. Regarding not keeping the whole dataset in memory, you can tweak the remember parameter so that it checkpoints at an appropriate time.
Thanks Twinkle On Thursday, June 18, 2015, Nipun Arora nipunarora2...@gmail.com wrote: Hi All, I am updating my question so that I give more detail. I have also created a stackexchange question: http://stackoverflow.com/questions/30904244/iterative-programming-on-an-ordered-spark-stream-using-java-in-spark-streaming Is there anyway in spark streaming to keep data across multiple micro-batches of a sorted dstream, where the stream is sorted using timestamps? (Assuming monotonically arriving data) Can anyone make suggestions on how to keep data across iterations where each iteration is an RDD being processed in JavaDStream? *What does iteration mean?* I first sort the dstream using timestamps, assuming that data has arrived in a monotonically increasing timestamp (no out-of-order). I need a global HashMap X, which I would like to be updated using values with timestamp t1, and then subsequently t1+1. Since the state of X itself impacts the calculations it needs to be a linear operation. Hence operation at t1+1 depends on HashMap X, which depends on data at and before t1. *Application* This is especially the case when one is trying to update a model or compare two sets of RDD's, or keep a global history of certain events etc which will impact operations in future iterations? I would like to keep some accumulated history to make calculations.. not the entire dataset, but persist certain events which can be used in future DStream RDDs? Thanks Nipun On Wed, Jun 17, 2015 at 11:15 PM, Nipun Arora nipunarora2...@gmail.com wrote: Hi Silvio, Thanks for your response. I should clarify. I would like to do updates on a structure iteratively. I am not sure if updateStateByKey meets my criteria. In the current situation, I can run some map reduce tasks and generate a JavaPairDStreamKey,Value, after this my algorithm is necessarily sequential ... i.e. 
I have sorted the data using the timestamp(within the messages), and I would like to iterate over it, and maintain a state where I can update a model. I tried using foreach/foreachRDD, and collect to do this, but I can't seem to propagate values across microbatches/RDD's. Any suggestions? Thanks Nipun On Wed, Jun 17, 2015 at 10:52 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Hi, just answered in your other thread as well... Depending on your requirements, you can look at the updateStateByKey API From: Nipun Arora Date: Wednesday, June 17, 2015 at 10:51 PM To: user@spark.apache.org Subject: Iterative Programming by keeping data across micro-batches in spark-streaming? Hi, Is there anyway in spark streaming to keep data across multiple micro-batches? Like in a HashMap or something? Can anyone make
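For what it's worth, the previous/current bookkeeping in the posted class can be sketched without Spark at all. The snippet below is only an illustration in plain Python: `BatchHistory` and `update` are hypothetical names, and the assumption is that something like it would live on the driver and be fed the collected contents of each micro-batch (for example from inside a foreachRDD callback). It is not a claim about how the original job should be structured.

```python
class BatchHistory(object):
    """Driver-side holder for the previous and current micro-batch contents."""

    def __init__(self):
        self.prev = None
        self.current = None

    def update(self, batch):
        # First batch: previous and current both point at it,
        # mirroring the else branch of the posted Java code.
        if self.current is None:
            self.prev = batch
            self.current = batch
        else:
            # Later batches: shift current to previous, then store the new one.
            self.prev = self.current
            self.current = batch
        return self.prev, self.current


history = BatchHistory()
for collected in ([("t1", 3)], [("t2", 5)], [("t3", 1)]):
    prev, current = history.update(collected)
    print("previous:", prev, "current:", current)
```

Because the holder is an ordinary object rather than a static reference, it sidesteps the "cannot be referenced from static context" style of problem, but it only works for state small enough to collect to the driver.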
Re: Matrix Multiplication and mllib.recommendation
Also not sure how threading helps here, because Spark assigns a partition to each core. Each core may run multiple threads if you are using Intel Hyper-Threading, but I would let Spark handle the threading.

On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com wrote:

We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS dgemm based calculation.

On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat ayman.fara...@yahoo.com.invalid wrote:

Thanks Sabarish and Nick. Would you happen to have some code snippets that you can share?

Best
Ayman

On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:

Nick is right. I too have implemented it this way and it works just fine. In my case, there can be even more products. You simply broadcast blocks of products to userFeatures.mapPartitions() and BLAS multiply in there to get recommendations. In my case 10K products form one block. Note that you would then have to union your recommendations. And if there are lots of product blocks, you might also want to checkpoint once every few times.

Regards
Sab

On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com wrote:

One issue is that you broadcast the product vectors and then do a dot product one-by-one with the user vector. You should try forming a matrix of the item vectors and doing the dot product as a matrix-vector multiply, which will make things a lot faster.

Another optimisation that is available in 1.4 is a recommendProducts method that blockifies the factors to make use of level 3 BLAS (i.e. matrix-matrix multiply). I am not sure if this is available in the Python API yet. But you can do a version yourself by using mapPartitions over user factors, blocking the factors into sub-matrices and doing a matrix multiply with the item factor matrix to get scores on a block-by-block basis.

Also, as Ilya says, more parallelism can help. I don't think it's so necessary to do LSH with 30,000 items.

On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

I actually talk about this exact thing in a blog post here: http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/. Keep in mind, you're actually doing a ton of math. Even with proper caching and use of broadcast variables this will take a while, depending on the size of your cluster. To get real results you may want to look into locality sensitive hashing to limit your search space, and definitely look into spinning up multiple threads to process your product features in parallel to increase resource utilization on the cluster.

Thank you,
Ilya Ganelin

-Original Message-
*From: *afarahat [ayman.fara...@yahoo.com]
*Sent: *Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
*To: *user@spark.apache.org
*Subject: *Matrix Multiplication and mllib.recommendation

Hello; I am trying to get predictions after running the ALS model. The model works fine. In the prediction/recommendation, I have about 30,000 products and 90 million users. When I try predictAll, it fails. I have been trying to formulate the problem as a matrix multiplication where I first get the product features, broadcast them and then do a dot product. It's still very slow. Any reason why? Here is some sample code:

    import numpy
    from pyspark.mllib.recommendation import MatrixFactorizationModel

    def doMultiply(x):
        a = []
        # multiply the user vector by each broadcast product vector, one at a time
        mylen = len(pf.value)
        for i in range(mylen):
            myprod = numpy.dot(x, pf.value[i][1])
            a.append(myprod)
        return a

    myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
    # I need to select which products to broadcast but lets try all
    m1 = myModel.productFeatures().sample(False, 0.001)
    pf = sc.broadcast(m1.collect())
    uf = myModel.userFeatures()
    f1 = uf.map(lambda x: (x[0], doMultiply(x[1])))

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

-- The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited.
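Since code snippets were asked for in this thread, here is a rough sketch of the blocked approach Nick and Sab describe, written as plain NumPy so it can be tried without a cluster. The function name `recommend_partition`, its parameters and the default block size are my own assumptions, not anything from MLlib; the idea is that a function of this shape could be handed to mapPartitions over the user factors, with `item_ids` and `item_matrix` taken from a broadcast variable.

```python
import numpy as np


def recommend_partition(user_rows, item_ids, item_matrix, k=10, block_size=4096):
    """Yield (user_id, [(item_id, score), ...]) for an iterator of (user_id, factors).

    item_matrix has one row per item, so each block of users is scored
    against all items with a single matrix-matrix multiply.
    """
    def flush(ids, vecs):
        scores = np.asarray(vecs).dot(item_matrix.T)   # shape: (block, n_items)
        top = np.argsort(-scores, axis=1)[:, :k]       # best k item columns per user
        for r, uid in enumerate(ids):
            yield uid, [(item_ids[j], float(scores[r, j])) for j in top[r]]

    ids, vecs = [], []
    for uid, vec in user_rows:
        ids.append(uid)
        vecs.append(vec)
        if len(ids) == block_size:
            for out in flush(ids, vecs):
                yield out
            ids, vecs = [], []
    if ids:  # score the final, possibly smaller, block
        for out in flush(ids, vecs):
            yield out


# Tiny local check with made-up factors (rank 2, three items, three users).
item_ids = [10, 20, 30]
item_matrix = np.array([[2.0, 0.0], [0.0, 3.0], [1.0, 1.0]])
users = [(1, [1.0, 0.0]), (2, [0.0, 1.0]), (3, [1.0, 2.0])]
results = list(recommend_partition(iter(users), item_ids, item_matrix, k=2, block_size=2))
```

Compared with the per-product numpy.dot loop in the original post, this does one multiply per block of users, which is where the BLAS speed-up comes from. A full argsort is used here only for brevity; with many items a partial selection would probably be cheaper.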
Re: Matrix Multiplication and mllib.recommendation
We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS dgemm based calculation.
Re: Matrix Multiplication and mllib.recommendation
Also in my experiments, it's much faster to do blocked BLAS through cartesian rather than doing sc.union. Here are the details on the experiments: https://issues.apache.org/jira/browse/SPARK-4823
Re: Spark-sql(yarn-client) java.lang.NoClassDefFoundError: org/apache/spark/deploy/yarn/ExecutorLauncher
btw, the user list would be a better place for this thread.

On Thu, Jun 18, 2015 at 8:19 AM, Yin Huai yh...@databricks.com wrote:

Is it the full stack trace?

On Thu, Jun 18, 2015 at 6:39 AM, Sea 261810...@qq.com wrote:

Hi, all: I want to run spark sql on yarn (yarn-client), but ... I already set spark.yarn.jar and spark.jars in conf/spark-defaults.conf.

    ./bin/spark-sql -f game.sql --executor-memory 2g --num-executors 100 game.txt

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/deploy/yarn/ExecutorLauncher
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.ExecutorLauncher
        at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    Could not find the main class: org.apache.spark.deploy.yarn.ExecutorLauncher. Program will exit.

Can anyone help?
Re: RE: Spark or Storm
That general description is accurate, but not really a specific issue of the direct stream. It applies to anything consuming from Kafka (or, as Matei already said, any streaming system really). You can't have exactly-once semantics unless you know something more about how you're storing results. For some unique id, topic, partition and offset is usually the obvious choice, which is why it's important that the direct stream gives you access to the offsets.

See https://github.com/koeninger/kafka-exactly-once for more info.

On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.com bit1...@163.com wrote:

I am wondering how the direct stream API ensures end-to-end exactly-once semantics. I think there are two things involved:

1. From the Spark Streaming end, the driver will replay the offset range when it's down and restarted, which means that the new tasks will process some already processed data.
2. From the user end, since tasks may process already processed data, the user end should detect that some data has already been processed, e.g. by using some unique ID.

Not sure if I have understood correctly.

-- bit1...@163.com

*From:* prajod.vettiyat...@wipro.com
*Date:* 2015-06-18 16:56
*To:* jrpi...@gmail.com; eshi...@gmail.com
*CC:* wrbri...@gmail.com; asoni.le...@gmail.com; guha.a...@gmail.com; user@spark.apache.org; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com
*Subject:* RE: Spark or Storm

"not being able to read from Kafka using multiple nodes"
"Kafka is plenty capable of doing this.."

I faced the same issue before Spark 1.3 was released. The issue was not with Kafka, but with Spark Streaming's Kafka connector. Before the Spark 1.3.0 release, one Spark worker would get all the streamed messages. We had to re-partition to distribute the processing. From the Spark 1.3.0 release, the Spark Direct API for Kafka supports parallel reads from Kafka streamed to Spark workers. See "Approach 2: Direct Approach" on this page: http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note that it also mentions zero data loss and exactly-once semantics for Kafka integration.

Prajod

*From:* Jordan Pilat [mailto:jrpi...@gmail.com]
*Sent:* 18 June 2015 03:57
*To:* Enno Shioji
*Cc:* Will Briggs; asoni.le...@gmail.com; ayan guha; user; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
*Subject:* Re: Spark or Storm

"not being able to read from Kafka using multiple nodes"

Kafka is plenty capable of doing this, by clustering together multiple consumer instances into a consumer group. If your topic is sufficiently partitioned, the consumer group can consume the topic in a parallelized fashion. If it isn't, you still have the fault tolerance associated with clustering the consumers.

OK
JRP

On Jun 17, 2015 1:27 AM, Enno Shioji eshi...@gmail.com wrote:

We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm. Some of the important drawbacks are:

Spark has no back pressure (the receiver rate limit can alleviate this to a certain point, but it's far from ideal).

There are also no exactly-once semantics. (updateStateByKey can achieve these semantics, but is not practical if you have any significant amount of state, because it does so by dumping the entire state on every checkpoint.)

There are also some minor drawbacks that I'm sure will be fixed quickly, like no task timeout, not being able to read from Kafka using multiple nodes, and a data loss hazard with Kafka.

It's also not possible to attain very low latency in Spark, if that's what you need.

The pro for Spark is the concise and IMO more intuitive syntax, especially if you compare it with Storm's Java API. I admit I might be a bit biased towards Storm tho, as I'm more familiar with it.

Also, you can do some processing with Kinesis. If all you need to do is a straightforward transformation and you are reading from Kinesis to begin with, it might be an easier option to just do the transformation in Kinesis.

On Wed, Jun 17, 2015 at 7:15 AM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:

Whatever you write in bolts would be the logic you want to apply on your events. In Spark, that logic would be coded in map() or similar such transformations and/or actions. Spark doesn't enforce a structure for capturing your processing logic like Storm does.

Regards
Sab

Probably overloading the question a bit. In Storm, Bolts have the functionality of getting triggered on events. Is that kind of functionality possible with Spark Streaming? During each phase of the data processing, the transformed data is stored to the database, and this transformed data should then be sent to a new pipeline for further processing. How can this be achieved using Spark?

On Wed, Jun 17, 2015 at 10:10 AM, Spark Enthusiast sparkenthusi...@yahoo.in wrote:

I have a use-case where a stream of
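To make the point about unique IDs a bit more concrete, below is a small sketch in plain Python (no Spark or Kafka involved). `IdempotentSink` and `write` are made-up names for illustration; the assumption is simply that results are stored under a key built from topic, partition and offset, so a replayed offset range cannot be applied twice. A real job would presumably rely on the datastore's own unique-key or transaction support instead of an in-memory dict.

```python
class IdempotentSink(object):
    """Stores each result under (topic, partition, offset) so replays are no-ops."""

    def __init__(self):
        self.rows = {}

    def write(self, topic, partition, offset, value):
        key = (topic, partition, offset)
        if key in self.rows:
            # Already stored by an earlier attempt: ignore the replayed record.
            return False
        self.rows[key] = value
        return True


sink = IdempotentSink()
batch = [("events", 0, 41, "a"), ("events", 0, 42, "b")]

first_attempt = [sink.write(*record) for record in batch]
# Simulate the driver restarting and replaying the same offset range.
replayed = [sink.write(*record) for record in batch]
```

After the replay the sink still holds exactly two rows, which is the "exactly once" effect as seen from the output side even though the records were processed twice.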
Re: Loading lots of parquet files into dataframe from s3
You can do something like this:

    ObjectListing objectListing;
    do {
        objectListing = s3Client.listObjects(listObjectsRequest);
        for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
            // keep .log objects last modified after dayBefore and no later than dayAfter
            if ((objectSummary.getLastModified().compareTo(dayBefore) > 0)
                    && (objectSummary.getLastModified().compareTo(dayAfter) < 1)
                    && objectSummary.getKey().contains(".log"))
                FileNames.add(objectSummary.getKey());
        }
        listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    // build one comma-separated list of s3n:// paths
    String concatName = "";
    for (String fName : FileNames) {
        if (FileNames.indexOf(fName) == (FileNames.size() - 1)) {
            concatName += "s3n://" + s3_bucket + "/" + fName;
        } else {
            concatName += "s3n://" + s3_bucket + "/" + fName + ",";
        }
    }

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Loading-lots-of-parquet-files-into-dataframe-from-s3-tp23127p23394.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
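The same filter-then-join step can be sketched in a few lines of Python, which also avoids the repeated indexOf lookup in the concatenation loop. This is only an illustration: `build_input_paths` is a hypothetical helper, the input is assumed to be already-listed (key, last_modified) pairs rather than a live S3 listing, and the window bounds (after `day_before`, up to and including `day_after`) are my reading of the comparison in the snippet above.

```python
from datetime import datetime


def build_input_paths(objects, bucket, day_before, day_after, suffix=".log"):
    """Return a comma-separated list of s3n:// paths for keys inside the time window."""
    keys = [
        key
        for key, modified in objects
        if day_before < modified <= day_after and suffix in key
    ]
    # str.join puts the separator only between items, so no last-element check is needed.
    return ",".join("s3n://%s/%s" % (bucket, key) for key in keys)


listing = [
    ("logs/a.log", datetime(2015, 6, 17, 12, 0)),
    ("logs/b.txt", datetime(2015, 6, 17, 13, 0)),   # wrong suffix
    ("logs/c.log", datetime(2015, 6, 17, 23, 0)),
    ("logs/d.log", datetime(2015, 6, 19, 0, 0)),    # outside the window
]
paths = build_input_paths(listing, "my-bucket", datetime(2015, 6, 17), datetime(2015, 6, 18))
```

The resulting string has the same shape as concatName above and could be passed wherever a comma-separated path list is accepted.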
Re: Submitting Spark Applications using Spark Submit
Hi,

To bundle your dependency jars into the jar you submit, you should create an uber jar. Please refer to the following: https://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23395.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Issue with PySpark UDF on a column of Vectors
This is a known issue. See https://issues.apache.org/jira/browse/SPARK-7902

-Xiangrui

On Thu, Jun 18, 2015 at 6:41 AM, calstad colin.als...@gmail.com wrote:

I am having trouble using a UDF on a column of Vectors in PySpark, which can be illustrated here:

    from pyspark import SparkContext
    from pyspark.sql import Row
    from pyspark.sql.types import DoubleType
    from pyspark.sql.functions import udf
    from pyspark.mllib.linalg import Vectors

    FeatureRow = Row('id', 'features')
    data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
                           (1, Vectors.dense([2.25, -11.1, 123.2])),
                           (2, Vectors.dense([-7.2, 1.0, -3.2]))])
    df = data.map(lambda r: FeatureRow(*r)).toDF()
    vector_udf = udf(lambda vector: sum(vector), DoubleType())
    df.withColumn('feature_sums', vector_udf(df.features)).first()

This fails with the following stack trace:

    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
        process()
      File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/Users/colin/src/spark/python/pyspark/sql/functions.py", line 469, in <lambda>
        func = lambda _, it: map(lambda x: f(*x), it)
      File "/Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py", line 143, in <lambda>
    TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'

Looking at what gets passed to the UDF, there seems to be something strange. The argument passed should be a Vector, but instead it gets passed a Python tuple like this:

    (1, None, None, [9.7, 1.0, -3.2])

Is it not possible to use UDFs on DataFrame columns of Vectors?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-PySpark-UDF-on-a-column-of-Vectors-tp23393.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [Spark Streaming] Iterative programming on an ordered spark stream using Java?
@Twinkle - what did you mean by "Regarding not keeping whole dataset in memory, you can tweak the parameter of remember, such that it does checkpoint at appropriate time"?
different schemas per row with DataFrames
I am reading JSON data that has different schemas for every record. That is, for a given field that would have a null value, it's simply absent from that record (and therefore, its schema). I would like to use the DataFrame API to select specific fields from this data, and for fields that are missing from a record, to default to null or an empty string. Is this possible or can DataFrames only handle a single consistent schema throughout the data? One thing I noticed is that the schema of the DataFrame is the superset of all the records in it, so if record A has field X, but record B does not, it will show up in B as null because it's part of the DataFrame's schema (because A has it). But if none of the records have field X, then referencing that field will result in an error about not being able to resolve that column. If I know the schema of all possible fields and the order in which they occur, it may be possible to get the RDD from the DataFrame and build my own DataFrame with createDataFrame and passing it my fabricated super-schema. However, this is brittle, as the super-schema is not in my control and may change in the future. Thanks for any suggestions, Alex.
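One possible way to get the "default to null" behaviour, sketched in plain Python: project every record onto the list of fields you want before a DataFrame is ever built, so a field that no record contains still shows up as null. The helper name `project` and the sample field names are assumptions for illustration; in a Spark job the same function could presumably be mapped over the raw JSON lines, with the wanted-field list kept under your own control rather than tied to the full super-schema.

```python
import json


def project(record, fields, default=None):
    """Return a dict with exactly the requested fields, filling gaps with a default."""
    return {name: record.get(name, default) for name in fields}


raw_lines = [
    '{"id": 1, "x": "foo"}',
    '{"id": 2}',                 # field x is absent here
]
wanted = ["id", "x", "y"]        # y never appears in any record

rows = [project(json.loads(line), wanted) for line in raw_lines]
```

Since only the fields you actually select are listed, new fields appearing upstream do not break anything, which addresses the brittleness concern to some extent.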
Hivecontext going out-of-sync issue
Hi All. I have a partitioned table in Hive. The use case is to drop one of the partitions before inserting new data every time the Spark process runs. I am using the HiveContext to read and write (dynamic partitions) and also to alter the table to drop the partition before the insert. Everything runs fine on the first run (when the partition being inserted didn't exist before). However, if the partition existed and was dropped by the alter table command in the same process, then the insert fails with the error FileNotFoundException: File does not exist : <hdfs table location>/part_col=val1/part-0. When the program is rerun as-is, it succeeds, because the partition no longer exists when it starts up. Spark 1.3.0 on CDH5.4.0.
Things I have tried:
- Put a pause of up to 1 min between the alter table and the insert, to give any possibly pending async task in the background time to finish.
- Create a new HiveContext object and call insert with it (call drop partition and insert using separate HiveContext objects). The idea was that a new HiveContext might be created with the correct state of the Hive metastore at that moment and should work.
- Create a new SparkContext and a HiveContext (more of a stone thrown in the dark): create a new set of contexts after the alter table to try to reload the state at that point in time.
None of these have worked so far. Any ideas, suggestions or experiences along similar lines? -- Regards, Ranadip Chatterjee
[Spark Streaming] Runtime Error in call to max function for JavaPairRDD
Hi, I have the following piece of code, where I am trying to transform a Spark stream and add the min and max of each RDD to it. However, I get an error at run-time saying the max call does not exist (it compiles properly). I am using spark-1.4. I have added the question to Stack Overflow as well: http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796 Any help is greatly appreciated :) Thanks, Nipun
JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> sortedtsStream = transformedMaxMintsStream.transformToPair(new Sort2());
sortedtsStream.foreach(
    new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>, Void>() {
        @Override
        public Void call(JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
            // Bring the (small) per-batch result back to the driver and print it.
            List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> templist = tuple2Tuple3JavaPairRDD.collect();
            for (Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple : templist) {
                Date date = new Date(tuple._1._1);
                int pattern = tuple._1._2;
                int count = tuple._2._1();
                Date maxDate = new Date(tuple._2._2());
                Date minDate = new Date(tuple._2._3());
                System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count + " Max: " + maxDate.toString() + " Min: " + minDate.toString());
            }
            return null;
        }
    }
);
Error:
15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)
15/06/18 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000
Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
    at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
    at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonf
problem with pants building
Hi, We use pants to build a Python project into an executable Python file (pex). But we cannot run the pex file until we add all the necessary library paths to PYTHONPATH and use pip to install the necessary packages specifically for $SPARK_HOME/python/lib/pyspark.zip. Since we have already added the unzipped pyspark folder to our project for development, is there any way to make it refer to the pyspark folder inside our project instead? Thanks, lpx - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: understanding on the waiting batches and scheduling delay in Streaming UI
Also, could you give a screenshot of the streaming UI? Even better, could you run it on Spark 1.4, which has a new streaming UI, and then use that for debugging/screenshots? TD
On Thu, Jun 18, 2015 at 3:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
Which version of Spark? And what is your data source? For some reason, your processing delay is exceeding the batch duration. And it's strange that you are not seeing any scheduling delay. Thanks Best Regards
On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang chyfan...@gmail.com wrote:
Hi, I have a Spark Streaming program that has been running for ~25 hrs. When I check the Streaming UI tab, I find that “Waiting batches” is 144, but “scheduling delay” is 0. I am a bit confused. If “waiting batches” is 144, does that mean many batches are waiting in the queue to be processed? If so, the scheduling delay should be high rather than 0. Am I missing anything? Thanks, Mike
Re: Latency between the RDD in Streaming
Why do you need to uniquely identify the message? All you need is the time when the message was inserted by the receiver and the time when it is processed, isn't it?
On Thu, Jun 18, 2015 at 2:28 PM, anshu shukla anshushuk...@gmail.com wrote:
Thanks a lot. But I have already tried the second way. The problem with it is how to identify a particular RDD from source to sink (as we can do by passing a msg id in Storm). For that I just updated the RDD and added a msgID (as a static variable), but while dumping them to a file some of the tuples of the RDD fail or go missing (approx. 3000, at a data rate of approx. 1500 tuples/sec).
On Fri, Jun 19, 2015 at 2:50 AM, Tathagata Das t...@databricks.com wrote:
Couple of ways.
1. Easy but approximate way: Find the scheduling delay and processing time using the StreamingListener interface, and then calculate end-to-end delay = 0.5 * batch interval + scheduling delay + processing time. The 0.5 * batch interval term is the approximate average batching delay across all the records in the batch.
2. Hard but precise way: You could build a custom receiver that embeds the current timestamp in the records, and then compare it with the timestamp at the final step of processing the records. Assuming the executor and driver clocks are reasonably in sync, this will measure the latency between the time a record is received by the system and the time the result from that record is available.
On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com wrote:
Sorry, I missed the word LATENCY. For a large streaming query, how do I find the time taken by a particular RDD to travel from the initial DStream to the final/last DStream? Help please!!
On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote:
It's not clear what you are asking. Find what among RDDs?
On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote:
Is there any fixed way to find among RDD in stream processing systems, in the distributed set-up?
-- Thanks Regards, Anshu Shukla
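The approximate formula from this thread (end-to-end delay = 0.5 * batch interval + scheduling delay + processing time) can be worked through with a small example. This is only an arithmetic sketch: the class `LatencyEstimate`, its method name and the sample numbers are made up for illustration, and in a real application the two delays would presumably be read from the StreamingListener callbacks mentioned in the thread.

```java
// Hypothetical helper illustrating the approximation quoted in the thread.
class LatencyEstimate {

    // end-to-end delay ~= 0.5 * batch interval + scheduling delay + processing time
    // (all values in milliseconds). The 0.5 factor is the average time a record
    // waits for its batch to close, assuming records arrive evenly over the interval.
    static double endToEndMillis(long batchIntervalMs, long schedulingDelayMs, long processingTimeMs) {
        return 0.5 * batchIntervalMs + schedulingDelayMs + processingTimeMs;
    }

    public static void main(String[] args) {
        // Made-up numbers: 2 s batches, 100 ms scheduling delay, 500 ms processing.
        // 0.5 * 2000 + 100 + 500 = 1600
        System.out.println(endToEndMillis(2000, 100, 500) + " ms"); // prints "1600.0 ms"
    }
}
```

Note that this is an average over the batch: a record arriving right after a batch opens waits almost the full interval, while one arriving just before it closes waits almost nothing.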
NaiveBayes for MLPipeline is absent
Hello, Currently there is no NaiveBayes implementation for ML Pipelines. I couldn't find a JIRA ticket related to it either (or maybe I missed it). Is there a plan to implement it? If no one has the bandwidth, I can work on it. Thanks. Justin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NaiveBayes-for-MLPipeline-is-absent-tp23402.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: createDirectStream and Stats
Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Spark-sql(yarn-client) java.lang.NoClassDefFoundError: org/apache/spark/deploy/yarn/ExecutorLauncher
Hi all, I want to run Spark SQL on YARN (yarn-client), but ... I already set spark.yarn.jar and spark.jars in conf/spark-defaults.conf.
./bin/spark-sql -f game.sql --executor-memory 2g --num-executors 100 game.txt
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/deploy/yarn/ExecutorLauncher
Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.ExecutorLauncher
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Could not find the main class: org.apache.spark.deploy.yarn.ExecutorLauncher. Program will exit.
Can anyone help?
Re: SparkSubmit with Ivy jars is very slow to load with no internet access
Hey Nathan, I like the first idea better. Let's see what others think. I'd be happy to review your PR afterwards! Best, Burak
On Thu, Jun 18, 2015 at 9:53 PM, Nathan McCarthy nathan.mccar...@quantium.com.au wrote:
Hey, Spark Submit adds Maven Central and the Spark Bintray repo to the ChainResolver before it adds any external resolvers. https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L821 When running on a cluster without internet access, this means the Spark shell takes forever to launch, as it tries these two remote repos before the ones specified in the --repositories list. In our case we have a proxy that the cluster can access, and we supply it via --repositories. This is also a problem for users who maintain a proxy for Maven/Ivy repos with something like Nexus/Artifactory. I see two options for a fix:
- Change the order in which repos are added to the ChainResolver, so that the repos supplied via --repositories come before anything else. https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L843
- Have a config option (like spark.jars.ivy.useDefaultRemoteRepos, default true) which, when false, won't add Maven Central and Bintray to the ChainResolver.
Happy to do a PR now for this if someone can give me a recommendation on which option would be better. JIRA here: https://issues.apache.org/jira/browse/SPARK-8475 Cheers, Nathan
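The two options proposed above can be sketched as a resolver-ordering function. This is a plain-Java illustration and not SparkSubmit's actual code: the class `ResolverOrder`, the repository labels and the boolean parameter (which stands in for the proposed spark.jars.ivy.useDefaultRemoteRepos setting) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the two proposals; repositories are modelled as labels.
class ResolverOrder {

    // Stand-ins for the two default remote repositories named in the thread.
    static final List<String> DEFAULT_REMOTE_REPOS = Arrays.asList("maven-central", "spark-bintray");

    // Option 1: user-supplied repositories always come first in the chain.
    // Option 2: the defaults are appended only when useDefaultRemoteRepos is true.
    static List<String> chain(List<String> userRepos, boolean useDefaultRemoteRepos) {
        List<String> ordered = new ArrayList<>(userRepos);
        if (useDefaultRemoteRepos) {
            ordered.addAll(DEFAULT_REMOTE_REPOS);
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<String> userRepos = Arrays.asList("internal-proxy");
        System.out.println(chain(userRepos, true));  // [internal-proxy, maven-central, spark-bintray]
        System.out.println(chain(userRepos, false)); // [internal-proxy]
    }
}
```

With the first option alone, an offline cluster would still fall through to the remote repositories whenever an artifact is missing from the proxy; the second option removes that fallback entirely, so the two are complementary rather than exclusive.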
[SparkSQL]. MissingRequirementError when creating dataframe from RDD (new error in 1.4)
Since upgrading to Spark 1.4, I'm getting a scala.reflect.internal.MissingRequirementError when creating a DataFrame from an RDD. The error references a case class in the application (the RDD's type parameter), which has been verified to be present. Items of note:
1) This is running on AWS EMR (YARN). I do not get this error running locally (standalone).
2) Reverting to Spark 1.3.1 makes the problem go away.
3) The jar file containing the referenced class (the app assembly jar) is not listed in the classpath expansion dumped in the error message.
I have seen SPARK-5281, and am guessing that this is the root cause, especially since the code added there is involved in the stacktrace. That said, my grasp of Scala reflection isn't strong enough to make sense of the change and say for sure. It certainly looks, though, as if in this scenario the current thread's context classloader may not be what we think it is (given #3 above). Any ideas?
App code:
def registerTable[A <: Product : TypeTag](name: String, rdd: RDD[A])(implicit hc: HiveContext) = {
  val df = hc.createDataFrame(rdd)
  df.registerTempTable(name)
}
Stack trace:
scala.reflect.internal.MissingRequirementError: class comMyClass in JavaMirror with sun.misc.Launcher$AppClassLoader@d16e5d6 of type class sun.misc.Launcher$AppClassLoader with classpath [ lots and lots of paths and jars, but not the app assembly jar] not found
    at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
    at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
    at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
    at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
    at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
    at com.ipcoop.spark.sql.SqlEnv$$typecreator1$1.apply(SqlEnv.scala:87)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
    ... app code...
P.S. It looks as though I am not the only one facing this issue. A colleague ran into it independently, and it has also been reported here: https://www.mail-archive.com/user@spark.apache.org/msg30302.html
Re: Does MLLib has attribute importance?
Running an L1-regularized model and picking the non-zero coefficients gives a good estimate of the interesting features as well...
On Jun 17, 2015 4:51 PM, Xiangrui Meng men...@gmail.com wrote:
We don't have it in MLlib. The closest would be the ChiSqSelector, which works for categorical data. -Xiangrui
On Thu, Jun 11, 2015 at 4:33 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote:
What would be the closest equivalent in MLlib to Oracle Data Miner's Attribute Importance mining function? http://docs.oracle.com/cd/B28359_01/datamine.111/b28129/feature_extr.htm#i1005920 Attribute importance is a supervised function that ranks attributes according to their significance in predicting a target. Best regards, Ruslan Dautkhanov
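The L1 suggestion above boils down to inspecting the fitted weight vector and keeping the indices whose coefficient did not shrink to zero. A minimal sketch in plain Java, assuming the weights have already been extracted from a trained model as a `double[]`; the class name, method name and tolerance parameter are hypothetical and not part of MLlib.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: picks the features an L1-regularized model kept.
class NonZeroCoefficients {

    // Returns the indices whose absolute weight exceeds the tolerance.
    // A small tolerance guards against weights that are numerically, but not
    // exactly, zero after optimization.
    static List<Integer> selectedFeatures(double[] weights, double tolerance) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            if (Math.abs(weights[i]) > tolerance) {
                selected.add(i);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Made-up weights: features 1 and 2 survive the L1 penalty.
        double[] weights = {0.0, 1.5, -0.2, 0.0};
        System.out.println(selectedFeatures(weights, 1e-9)); // [1, 2]
    }
}
```

This gives a selection rather than a ranking; sorting the surviving indices by absolute weight would only approximate a ranking if the features are on comparable scales.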
Re: createDirectStream and Stats
Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: HiveContext saveAsTable create wrong partition
Are you writing to an existing Hive ORC table?
On Wed, Jun 17, 2015 at 3:25 PM, Cheng Lian lian.cs@gmail.com wrote:
Thanks for reporting this. Would you mind helping to create a JIRA for this?
On 6/16/15 2:25 AM, patcharee wrote:
I found that if I move the partition columns in schemaString and in Row to the end of the sequence, then it works correctly...
On 16. juni 2015 11:14, patcharee wrote:
Hi, I am using Spark 1.4 and HiveContext to append data into a partitioned Hive table. I found that the data inserted into the table is correct, but the partition (folder) created is totally wrong. Below is my code snippet:
---
val schemaString = "zone z year month date hh x y height u v w ph phb t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl"
val schema = StructType(
  schemaString.split(" ").map(fieldName =>
    if (fieldName.equals("zone") || fieldName.equals("z") || fieldName.equals("year") || fieldName.equals("month") || fieldName.equals("date") || fieldName.equals("hh") || fieldName.equals("x") || fieldName.equals("y"))
      StructField(fieldName, IntegerType, true)
    else
      StructField(fieldName, FloatType, true)
  ))
val pairVarRDD = sc.parallelize(Seq((Row(2,42,2009,3,1,0,218,365,9989.497.floatValue(),29.627113.floatValue(),19.071793.floatValue(),0.11982734.floatValue(),3174.6812.floatValue(), 97735.2.floatValue(),16.389032.floatValue(),-96.62891.floatValue(),25135.365.floatValue(),2.6476808E-5.floatValue(),0.0.floatValue(),13195.351.floatValue(), 0.0.floatValue(),0.1.floatValue(),0.0.floatValue())) ))
val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema)
partitionedTestDF2.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
  .mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("test4DimBySpark")
---
The table contains 23 columns (longer than the maximum Tuple length), so I use a Row object to store the raw data, not a Tuple. Here are some messages from Spark when it saved the data:
15/06/16 10:39:22 INFO metadata.Hive: Renaming src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest: hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true
15/06/16 10:39:22 INFO metadata.Hive: New loading path = hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0 with partSpec {zone=13195, z=0, year=0, month=0}
From the raw data (pairVarRDD), zone = 2, z = 42, year = 2009, month = 3. But Spark created a partition {zone=13195, z=0, year=0, month=0}. When I queried from Hive:
hive> select * from test4dimBySpark;
OK
2 42 2009 3 1.0 0.0 218.0 365.0 9989.497 29.627113 19.071793 0.11982734 -3174.6812 97735.2 16.389032 -96.62891 25135.365 2.6476808E-5 0.0 13195 0 0 0
hive> select zone, z, year, month from test4dimBySpark;
OK
13195 0 0 0
hive> dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*;
Found 2 items
-rw-r--r-- 3 patcharee hdfs 1411 2015-06-16 10:39 /apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1
The data stored in the table is correct (zone = 2, z = 42, year = 2009, month = 3), but the partition created was wrong: zone=13195/z=0/year=0/month=0. Is this a bug, or what could be wrong? Any suggestion is appreciated. BR, Patcharee
RE: RE: Spark or Storm
More details on the Direct API of Spark 1.3 are on the Databricks blog: https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html Note the use of checkpoints to persist the Kafka offsets in Spark Streaming itself, and not in ZooKeeper. Also note this statement: “... This allows one to build a Spark Streaming + Kafka pipelines with end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional).”
From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: 18 June 2015 19:38
To: bit1...@163.com
Cc: Prajod S Vettiyattil (WT01 - BAS); jrpi...@gmail.com; eshi...@gmail.com; wrbri...@gmail.com; asoni.le...@gmail.com; ayan guha; user; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com
Subject: Re: RE: Spark or Storm
That general description is accurate, but not really a specific issue of the direct stream. It applies to anything consuming from Kafka (or, as Matei already said, any streaming system really). You can't have exactly-once semantics unless you know something more about how you're storing results. For some unique id, topic-partition and offset is usually the obvious choice, which is why it's important that the direct stream gives you access to the offsets. See https://github.com/koeninger/kafka-exactly-once for more info.
On Thu, Jun 18, 2015 at 6:47 AM, bit1...@163.com wrote:
I am wondering how the direct stream API ensures end-to-end exactly-once semantics. I think there are two things involved:
1. On the Spark Streaming end, the driver will replay the offset range when it goes down and is restarted, which means that the new tasks will process some already processed data.
2. On the user end, since tasks may process already processed data, the user should detect that some data has already been processed, e.g. by using some unique ID.
Not sure if I have understood correctly. bit1...@163.com
From: prajod.vettiyat...@wipro.com
Date: 2015-06-18 16:56
To: jrpi...@gmail.com; eshi...@gmail.com
CC: wrbri...@gmail.com; asoni.le...@gmail.com; guha.a...@gmail.com; user@spark.apache.org; sateesh.kav...@gmail.com; sparkenthusi...@yahoo.in; sabarish.sasidha...@manthan.com
Subject: RE: Spark or Storm
“not being able to read from Kafka using multiple nodes” “Kafka is plenty capable of doing this..” I faced the same issue before Spark 1.3 was released. The issue was not with Kafka, but with Spark Streaming's Kafka connector. Before the Spark 1.3.0 release, one Spark worker would get all the streamed messages. We had to repartition to distribute the processing. As of the Spark 1.3.0 release, the Spark Direct API for Kafka supports parallel reads from Kafka streamed to Spark workers. See “Approach 2: Direct Approach” on this page: http://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html. Note that it also mentions zero data loss and exactly-once semantics for Kafka integration. Prajod
From: Jordan Pilat [mailto:jrpi...@gmail.com]
Sent: 18 June 2015 03:57
To: Enno Shioji
Cc: Will Briggs; asoni.le...@gmail.com; ayan guha; user; Sateesh Kavuri; Spark Enthusiast; Sabarish Sasidharan
Subject: Re: Spark or Storm
“not being able to read from Kafka using multiple nodes” Kafka is plenty capable of doing this, by clustering together multiple consumer instances into a consumer group. If your topic is sufficiently partitioned, the consumer group can consume the topic in a parallelized fashion. If it isn't, you still have the fault tolerance associated with clustering the consumers. OK JRP
On Jun 17, 2015 1:27 AM, Enno Shioji eshi...@gmail.com wrote:
We've evaluated Spark Streaming vs. Storm and ended up sticking with Storm. Some of the important drawbacks are:
Spark has no back pressure (the receiver rate limit can alleviate this to a certain point, but it's far from ideal).
There are also no exactly-once semantics. (updateStateByKey can achieve these semantics, but is not practical if you have any significant amount of state, because it does so by dumping the entire state on every checkpoint.)
There are also some minor drawbacks that I'm sure will be fixed quickly, like no task timeout, not being able to read from Kafka using multiple nodes, and a data loss hazard with Kafka.
It's also not possible to attain very low latency in Spark, if that's what you need.
The pro for Spark is the concise and IMO more intuitive syntax, especially if you compare it with Storm's Java API. I admit I might be a bit biased towards
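The point made in this thread, that replayed offsets are harmless as long as the downstream update is idempotent and keyed by topic, partition and offset, can be sketched without any Kafka or Spark dependency. Everything below is illustrative: `IdempotentSink` is a hypothetical in-memory stand-in for a real store, and the string id format is an assumption made for the example.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory sink that ignores records it has already applied.
class IdempotentSink {

    private final Set<String> applied = new HashSet<>();
    private long total = 0;

    // Applies the value only the first time a (topic, partition, offset) triple
    // is seen; returns false when the record is a replay and was skipped.
    boolean apply(String topic, int partition, long offset, long value) {
        String id = topic + "/" + partition + "/" + offset;
        if (!applied.add(id)) {
            return false; // already processed, e.g. after a driver restart replayed the range
        }
        total += value;
        return true;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.apply("events", 0, 42L, 10L);
        sink.apply("events", 0, 42L, 10L); // replayed record, ignored
        System.out.println(sink.total()); // 10
    }
}
```

In a real system the id set and the running result would have to be written to the same store in one transaction (or the write itself would have to be naturally idempotent); otherwise a crash between the two updates brings the duplicate problem back.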
Re: HiveContext saveAsTable create wrong partition
If you are writing to an existing Hive table, our insert-into operator follows Hive's requirement, which is that *the dynamic partition columns must be specified last among the columns in the SELECT statement and in the same order in which they appear in the PARTITION() clause*. You can find the requirement at https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions. If you use select to reorder the columns, I think it should work. Also, since the table is an existing Hive table, you do not need to specify the format, because we will use the format of the existing table. By the way, please feel free to open a JIRA about removing this requirement for inserting into an existing Hive table. Thanks, Yin
On Thu, Jun 18, 2015 at 9:39 PM, Yin Huai yh...@databricks.com wrote:
Are you writing to an existing Hive ORC table?
On Wed, Jun 17, 2015 at 3:25 PM, Cheng Lian lian.cs@gmail.com wrote:
Thanks for reporting this. Would you mind helping to create a JIRA for this?
On 6/16/15 2:25 AM, patcharee wrote:
I found that if I move the partition columns in schemaString and in Row to the end of the sequence, then it works correctly...
On 16. juni 2015 11:14, patcharee wrote:
Hi, I am using Spark 1.4 and HiveContext to append data into a partitioned Hive table. I found that the data inserted into the table is correct, but the partition (folder) created is totally wrong. Below is my code snippet:
---
val schemaString = "zone z year month date hh x y height u v w ph phb t p pb qvapor qgraup qnice qnrain tke_pbl el_pbl"
val schema = StructType(
  schemaString.split(" ").map(fieldName =>
    if (fieldName.equals("zone") || fieldName.equals("z") || fieldName.equals("year") || fieldName.equals("month") || fieldName.equals("date") || fieldName.equals("hh") || fieldName.equals("x") || fieldName.equals("y"))
      StructField(fieldName, IntegerType, true)
    else
      StructField(fieldName, FloatType, true)
  ))
val pairVarRDD = sc.parallelize(Seq((Row(2,42,2009,3,1,0,218,365,9989.497.floatValue(),29.627113.floatValue(),19.071793.floatValue(),0.11982734.floatValue(),3174.6812.floatValue(), 97735.2.floatValue(),16.389032.floatValue(),-96.62891.floatValue(),25135.365.floatValue(),2.6476808E-5.floatValue(),0.0.floatValue(),13195.351.floatValue(), 0.0.floatValue(),0.1.floatValue(),0.0.floatValue())) ))
val partitionedTestDF2 = sqlContext.createDataFrame(pairVarRDD, schema)
partitionedTestDF2.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
  .mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("test4DimBySpark")
---
The table contains 23 columns (longer than the maximum Tuple length), so I use a Row object to store the raw data, not a Tuple. Here are some messages from Spark when it saved the data:
15/06/16 10:39:22 INFO metadata.Hive: Renaming src:hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0/part-1;dest: hdfs://service-10-0.local:8020/apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1;Status:true
15/06/16 10:39:22 INFO metadata.Hive: New loading path = hdfs://service-10-0.local:8020/tmp/hive-patcharee/hive_2015-06-16_10-39-21_205_8768669104487548472-1/-ext-1/zone=13195/z=0/year=0/month=0 with partSpec {zone=13195, z=0, year=0, month=0}
From the raw data (pairVarRDD), zone = 2, z = 42, year = 2009, month = 3. But Spark created a partition {zone=13195, z=0, year=0, month=0}. When I queried from Hive:
hive> select * from test4dimBySpark;
OK
2 42 2009 3 1.0 0.0 218.0 365.0 9989.497 29.627113 19.071793 0.11982734 -3174.6812 97735.2 16.389032 -96.62891 25135.365 2.6476808E-5 0.0 13195 0 0 0
hive> select zone, z, year, month from test4dimBySpark;
OK
13195 0 0 0
hive> dfs -ls /apps/hive/warehouse/test4dimBySpark/*/*/*/*;
Found 2 items
-rw-r--r-- 3 patcharee hdfs 1411 2015-06-16 10:39 /apps/hive/warehouse/test4dimBySpark/zone=13195/z=0/year=0/month=0/part-1
The data stored in the table is correct (zone = 2, z = 42, year = 2009, month = 3), but the partition created was wrong: zone=13195/z=0/year=0/month=0. Is this a bug, or what could be wrong? Any suggestion is appreciated. BR, Patcharee
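The Hive requirement quoted in this thread (dynamic partition columns last, in PARTITION() clause order) suggests computing the column order before selecting. The following plain-Java sketch only produces the reordered list of column names; the class and method names are hypothetical, and feeding the result into a DataFrame select is left out because it is not shown in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: moves partition columns to the end of the column list.
class PartitionColumnsLast {

    // Keeps the data columns in their original order, then appends the partition
    // columns in the order given by partitionColumns (the PARTITION() clause order).
    static List<String> reorder(List<String> columns, List<String> partitionColumns) {
        List<String> ordered = new ArrayList<>();
        for (String column : columns) {
            if (!partitionColumns.contains(column)) {
                ordered.add(column);
            }
        }
        ordered.addAll(partitionColumns);
        return ordered;
    }

    public static void main(String[] args) {
        // Shortened version of the schema from the thread.
        List<String> columns = Arrays.asList("zone", "z", "year", "month", "date", "hh", "x", "y");
        List<String> partitions = Arrays.asList("zone", "z", "year", "month");
        System.out.println(reorder(columns, partitions));
        // [date, hh, x, y, zone, z, year, month]
    }
}
```

This matches the workaround reported earlier in the thread, where moving the partition columns to the end of schemaString and of the Row produced the correct partition.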
Re: Error when connecting to Spark SQL via Hive JDBC driver
Hello, I am not sure what is wrong. But in my case, I followed the instructions from http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/HiveJDBCDriver.html. It worked fine with SQuirreL SQL Client (http://squirrel-sql.sourceforge.net/) and SQL Workbench/J (http://www.sql-workbench.net/). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-connecting-to-Spark-SQL-via-Hive-JDBC-driver-tp23397p23403.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [SparkSQL]. MissingRequirementError when creating dataframe from RDD (new error in 1.4)
Thanks for reporting. Filed as: https://issues.apache.org/jira/browse/SPARK-8470 On Thu, Jun 18, 2015 at 5:35 PM, Adam Lewandowski adam.lewandow...@gmail.com wrote: Since upgrading to Spark 1.4, I'm getting a scala.reflect.internal.MissingRequirementError when creating a DataFrame from an RDD. The error references a case class in the application (the RDD's type parameter), which has been verified to be present. Items of note: 1) This is running on AWS EMR (YARN). I do not get this error running locally (standalone). 2) Reverting to Spark 1.3.1 makes the problem go away 3) The jar file containing the referenced class (the app assembly jar) is not listed in the classpath expansion dumped in the error message. I have seen SPARK-5281, and am guessing that this is the root cause, especially since the code added there is involved in the stacktrace. That said, my grasp on scala reflection isn't strong enough to make sense of the change to say for sure. It certainly looks, though, that in this scenario the current thread's context classloader may not be what we think it is (given #3 above). Any ideas? 
App code:
def registerTable[A <: Product : TypeTag](name: String, rdd: RDD[A])(implicit hc: HiveContext) = {
  val df = hc.createDataFrame(rdd)
  df.registerTempTable(name)
}
Stack trace:
scala.reflect.internal.MissingRequirementError: class comMyClass in JavaMirror with sun.misc.Launcher$AppClassLoader@d16e5d6 of type class sun.misc.Launcher$AppClassLoader with classpath [ lots and lots of paths and jars, but not the app assembly jar] not found
    at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
    at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
    at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
    at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
    at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
    at com.ipcoop.spark.sql.SqlEnv$$typecreator1$1.apply(SqlEnv.scala:87)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
    ... app code...
P.S. It looks as though I am not the only one facing this issue. A colleague ran into it independently, and it has also been reported here: https://www.mail-archive.com/user@spark.apache.org/msg30302.html
Re: confusing ScalaReflectionException with DataFrames in 1.4
I saw another report, so I already filed it as: https://issues.apache.org/jira/browse/SPARK-8470 On Thu, Jun 18, 2015 at 4:07 PM, Chad Urso McDaniel cha...@gmail.com wrote: We're using the normal command line: --- bin/spark-submit --properties-file ./spark-submit.conf --class com.rr.data.visits.VisitSequencerRunner ./mvt-master-SNAPSHOT-jar-with-dependencies.jar --- Our jar contains both com.rr.data.visits.orc.OrcReadWrite (which you can see in the stack trace) and the unfound com.rr.data.Visit. I'll open a Jira ticket. On Thu, Jun 18, 2015 at 3:26 PM Michael Armbrust mich...@databricks.com wrote: How are you adding com.rr.data.Visit to Spark? With --jars? It is possible we are using the wrong classloader. Could you open a JIRA? On Thu, Jun 18, 2015 at 2:56 PM, Chad Urso McDaniel cha...@gmail.com wrote: We are seeing class exceptions when converting to a DataFrame. Anyone out there with some suggestions on what is going on? Our original intention was to use a HiveContext to write ORC; we saw the error there and have narrowed it down.
This is an example of our code: --- def saveVisitsAsOrcFile(sqlContext: SQLContext, rdd: RDD[Visit], outputDir: String) { // works!: println(rdd count: + rdd.map(_.clicks.size).sum) import sqlContext.implicits._ // scala.ScalaReflectionException: class com.rr.data.Visit print(rdd.toDF.count: + rdd .toDF() .count()) --- This runs locally, but when using spark-submit with 1.4 we get: Exception in thread main scala.ScalaReflectionException: class com.rr.data.Visit in JavaMirror with sun.misc.Launcher$AppClassLoader@5c647e05 of type class sun.misc.Launcher$AppClassLoader with classpath [file:/home/candiru/tewfik/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/conf/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/spark-assembly-1.4.0-hadoop2.0.0-mr1-cdh4.2.0.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-api-jdo-3.2.6.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-core-3.2.10.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-rdbms-3.2.9.jar] and parent being sun.misc.Launcher$ExtClassLoader@1c79d093 of type class sun.misc.Launcher$ExtClassLoader with classpath [file:/usr/java/jdk1.8.0_05/jre/lib/ext/cldrdata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/dnsns.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/jfxrt.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/localedata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/nashorn.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunec.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunjce_provider.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunpkcs11.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/zipfs.jar] and parent being primordial classloader with boot classpath 
[/usr/java/jdk1.8.0_05/jre/lib/resources.jar:/usr/java/jdk1.8.0_05/jre/lib/rt.jar:/usr/java/jdk1.8.0_05/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_05/jre/lib/jsse.jar:/usr/java/jdk1.8.0_05/jre/lib/jce.jar:/usr/java/jdk1.8.0_05/jre/lib/charsets.jar:/usr/java/jdk1.8.0_05/jre/lib/jfr.jar:/usr/java/jdk1.8.0_05/jre/classes] not found. at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22) at com.rr.data.visits.orc.OrcReadWrite$$typecreator2$1.apply(OrcReadWrite.scala:36) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232) at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28) at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410) at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335) at com.rr.data.visits.orc.OrcReadWrite$.saveVisitsAsOrcFile(OrcReadWrite.scala:36) at com.rr.data.visits.VisitSequencerRunner$.main(VisitSequencerRunner.scala:43) at com.rr.data.visits.VisitSequencerRunner.main(VisitSequencerRunner.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at
Build spark application into uber jar
Hi Sparks, I have a Spark Streaming application that is a Maven project, and I would like to build it into an uber jar and run it on the cluster. I have found two options for building the uber jar, but each has its shortcomings, so I would like to ask how you do it. Thanks.

1. Use the Maven shade plugin, with the Spark-related dependencies marked as provided in pom.xml, like:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

With this, the uber jar builds, but when I run the application locally it complains that the Spark classes are missing. That is not surprising, because dependencies marked as provided are not included at runtime.

2. Instead of marking the Spark dependencies as provided, I configure the Maven shade plugin to exclude them as follows, but many things still end up in the jar:

    <executions>
      <execution>
        <phase>package</phase>
        <goals>
          <goal>shade</goal>
        </goals>
        <configuration>
          <artifactSet>
            <excludes>
              <exclude>junit:junit</exclude>
              <exclude>log4j:log4j:jar:</exclude>
              <exclude>org.scala-lang:scala-library:jar:</exclude>
              <exclude>org.apache.spark:spark-core_2.10</exclude>
              <exclude>org.apache.spark:spark-sql_2.10</exclude>
              <exclude>org.apache.spark:spark-streaming_2.10</exclude>
            </excludes>
          </artifactSet>
        </configuration>
      </execution>
    </executions>

Has anyone built an uber jar for a Spark application? I would like to see how you do it, thanks! bit1...@163.com
Interaction between StringIndexer feature transformer and CrossValidator
Hi, I encountered errors fitting a model using a CrossValidator. The training set contained a feature which was initially a String with many unique values. I used a StringIndexer to transform this feature column into label indices. Fitting a model with a regular pipeline worked fine, but I ran into the following error when I introduced the CrossValidator: 15/06/18 16:30:18 ERROR Executor: Exception in task 1.0 in stage 70.0 (TID 156) org.apache.spark.SparkException: Unseen label: 2456. at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:120) at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:115) at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:71) at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70) at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960) I think the pipeline with cross validation is applying the StringIndexer transformation to the training folds but not the test fold. When the pipeline encounters a previously unseen label in the test fold, it breaks down. When I whittled down the feature set to only contain low-cardinality categorical features, the pipeline behaved. Is this behavior desired? If I'm understanding this correctly, it would be great to have some more graceful error handling. My code is at https://gist.github.com/chelseaz/7ead2c0f25e2dd7fe5d9 Thanks, Chelsea
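The failure mode described above can be reproduced without Spark. The following is a minimal plain-Python sketch, not Spark's StringIndexer: the helper names (fit_string_indexer, transform, unseen_labels_per_fold) are hypothetical and exist only for illustration. It assumes the index is fitted on the training folds only, which is what the poster suspects, and reports which labels of each held-out fold would then be unseen.

```python
import random


def fit_string_indexer(values):
    """Map each distinct string to an index, most frequent label first."""
    counts = {}
    for v in values:
        counts[v] = counts.get(v, 0) + 1
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: idx for idx, label in enumerate(ordered)}


def transform(index, values):
    """Apply a fitted index; fail on labels that were absent during fitting."""
    out = []
    for v in values:
        if v not in index:
            raise ValueError("Unseen label: %s" % v)
        out.append(index[v])
    return out


def unseen_labels_per_fold(values, num_folds, seed=0):
    """For each held-out fold, list labels missing from the other folds."""
    rng = random.Random(seed)
    shuffled = list(values)
    rng.shuffle(shuffled)
    folds = [shuffled[i::num_folds] for i in range(num_folds)]
    report = []
    for i, held_out in enumerate(folds):
        training = [v for j, f in enumerate(folds) if j != i for v in f]
        index = fit_string_indexer(training)
        report.append(sorted(set(held_out) - set(index)))
    return report
```

With a high-cardinality column (many values that occur once), almost every held-out fold contains labels the training folds never saw, which matches the "Unseen label" error; with a low-cardinality column the report is typically empty, which matches the observation that the pipeline behaved after the feature set was reduced.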
RE: If not stop StreamingContext gracefully, will checkpoint data be consistent?
Akhil, From my test, I can see that the files in the last batch will always be reprocessed upon restarting from a checkpoint, even after a graceful shutdown. I think a file is usually expected to be processed only once. Maybe this is a bug in fileStream? Or do you know any approach to work around it? Many thanks! From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, June 16, 2015 3:26 PM To: Haopu Wang Cc: user Subject: Re: If not stop StreamingContext gracefully, will checkpoint data be consistent? Good question. With fileStream or textFileStream, it basically only takes in the files whose timestamp is the current timestamp https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L172 and when checkpointing is enabled https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala#L324 it would restore the latest filenames from the checkpoint directory, which I believe will reprocess some files. Thanks Best Regards On Mon, Jun 15, 2015 at 2:49 PM, Haopu Wang hw...@qilinsoft.com wrote: Akhil, thank you for the response. I want to explore more. Suppose the application is just monitoring an HDFS folder and writing the word count of each streaming batch to HDFS as well. When I kill the application _before_ Spark takes a checkpoint, after recovery, Spark will resume the processing from the timestamp of the latest checkpoint. That means some files will be processed twice and duplicate results are generated. Please correct me if the understanding is wrong, thanks again! From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, June 15, 2015 3:48 PM To: Haopu Wang Cc: user Subject: Re: If not stop StreamingContext gracefully, will checkpoint data be consistent? I think it should be fine, that's the whole point of check-pointing (in case of driver failure etc).
Thanks Best Regards On Mon, Jun 15, 2015 at 6:54 AM, Haopu Wang hw...@qilinsoft.com wrote: Hi, can someone help to confirm the behavior? Thank you! -Original Message- From: Haopu Wang Sent: Friday, June 12, 2015 4:57 PM To: user Subject: If not stop StreamingContext gracefully, will checkpoint data be consistent? This is a quick question about checkpoints. The question is: if the StreamingContext is not stopped gracefully, will the checkpoint be consistent? Or should I always shut down the application gracefully in order to use the checkpoint? Thank you very much!
Re: Shuffle produces one huge partition and many tiny partitions
Sorry Du, repartition means coalesce(shuffle = true) as per [1]. They are the same operation. Coalescing with shuffle = false means you are specifying the maximum number of partitions after the coalesce (if there are fewer partitions, you will end up with the smaller number). [1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L341 On Thu, Jun 18, 2015 at 7:55 PM, Du Li l...@yahoo-inc.com.invalid wrote: repartition() means coalesce(shuffle=false) On Thursday, June 18, 2015 4:07 PM, Corey Nolet cjno...@gmail.com wrote: Doesn't repartition call coalesce(shuffle=true)? On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote: I got the same problem with rdd.repartition() in my streaming app, which generated a few huge partitions and many tiny partitions. The resulting high data skew makes the processing time of a batch unpredictable, and it often exceeds the batch interval. I eventually solved the problem by using rdd.coalesce() instead, which however is expensive as it yields a lot of shuffle traffic and also takes a long time. Du On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com wrote: Thanks for the suggestion. Repartition didn't help us unfortunately. It still puts everything into the same partition. We did manage to improve the situation by making a new partitioner that extends HashPartitioner. It treats certain exception keys differently. These keys, which are known to appear very often, are assigned random partitions instead of using the existing partitioning mechanism.
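The custom partitioner mentioned at the end of this thread can be sketched as a plain Python partition function. This is only an illustration under stated assumptions, not the poster's Scala code: make_skew_aware_partitioner and hot_keys are hypothetical names, and a CRC32 of the key's repr stands in for the regular hash partitioning.

```python
import random
import zlib


def make_skew_aware_partitioner(num_partitions, hot_keys, seed=None):
    """Return a function key -> partition id.

    Keys listed in hot_keys (known to appear very often) are spread over
    random partitions; every other key is hashed deterministically.
    """
    rng = random.Random(seed)
    hot = frozenset(hot_keys)

    def partition(key):
        if key in hot:
            # Hot key: pick any partition so its records are spread out.
            return rng.randrange(num_partitions)
        # Regular key: stable hash, so equal keys share a partition.
        return zlib.crc32(repr(key).encode("utf-8")) % num_partitions

    return partition
```

In PySpark a function of this shape could be passed as the partitionFunc argument of rdd.partitionBy(numPartitions, partitionFunc). The trade-off is that records of a hot key no longer land in a single partition, so any per-key aggregation over those keys needs a second merge step.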
Coalescing with shuffle = false in imbalanced cluster
I'm confused about this. The comment on the function seems to indicate that there is absolutely no shuffle or network IO, but it also states that it assigns an even number of parent partitions to each final partition group. I'm having trouble seeing how this can be guaranteed without some data being passed between nodes. For instance, let's say I have 5 machines and 10 partitions, laid out so that machines 1, 2, and 3 each have 3 partitions, machine 4 has only 1 partition, and machine 5 has none. Am I to assume that coalesce(4, false) will merge the 3 partitions on each of nodes 1, 2, and 3 into 1 partition, while node 4 will just keep its 1 partition? Thanks.
how to change /tmp folder for spark ut use sbt
Hi all, if I want to change the /tmp folder to some other folder for the Spark unit tests (UT) run with sbt, how can I do that?
SparkSubmit with Ivy jars is very slow to load with no internet access
Hey, Spark Submit adds Maven Central and the Spark Bintray repo to the ChainResolver before it adds any external resolvers. https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L821 When running on a cluster without internet access, this means the Spark shell takes forever to launch, as it tries these two remote repos before the ones specified in the --repositories list. In our case we have a proxy which the cluster can access, and we supply it via --repositories. This is also a problem for users who maintain a proxy for Maven/Ivy repos with something like Nexus/Artifactory. I see two options for a fix: * Change the order in which repos are added to the ChainResolver, making the repos supplied via --repositories come before anything else. https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L843 * Have a config option (like spark.jars.ivy.useDefaultRemoteRepos, default true) which, when false, won't add Maven Central and Bintray to the ChainResolver. Happy to do a PR now for this if someone can give me a recommendation on which option would be better. JIRA here: https://issues.apache.org/jira/browse/SPARK-8475 Cheers, Nathan
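To make the two proposed fixes concrete, here is a small plain-Python simulation of a resolver chain. It is not Ivy or SparkSubmit code: build_resolver_chain, resolve, the repository names and the use_default_remote_repos flag are all hypothetical stand-ins for the ideas in the message above (user repositories first, and an option to drop the default remote repos).

```python
def build_resolver_chain(user_repos, use_default_remote_repos=True):
    """Order repositories so user-supplied ones are tried first."""
    defaults = ["central", "spark-packages"]  # placeholder names
    chain = list(user_repos)
    if use_default_remote_repos:
        chain.extend(d for d in defaults if d not in chain)
    return chain


def resolve(artifact, chain, repo_contents, reachable):
    """Walk the chain in order; return (repo that served it, repos tried)."""
    attempts = []
    for repo in chain:
        attempts.append(repo)
        if repo not in reachable:
            # In a real resolver this is where a network timeout is paid.
            continue
        if artifact in repo_contents.get(repo, ()):
            return repo, attempts
    return None, attempts
```

With the user repository first, an artifact served by an internal proxy is found on the first attempt and the unreachable defaults are never contacted; with the flag set to false they are not in the chain at all.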
createDirectStream and Stats
Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: [Spark Streaming] Runtime Error in call to max function for JavaPairRDD
Hi Tathagata, When you say "please mark spark-core and spark-streaming as dependencies", what do you mean? I have installed the pre-built spark-1.4 for Hadoop 2.6 from the Spark downloads page. In my Maven pom.xml, I am using version 1.4 as described. Please let me know how I can fix that. Thanks Nipun On Thu, Jun 18, 2015 at 4:22 PM, Tathagata Das t...@databricks.com wrote: I think you may be including a different version of Spark Streaming in your assembly. Please mark spark-core and spark-streaming as provided dependencies. Any installation of Spark will automatically provide Spark in the classpath so you do not have to bundle it. On Thu, Jun 18, 2015 at 8:44 AM, Nipun Arora nipunarora2...@gmail.com wrote: Hi, I have the following piece of code, where I am trying to transform a Spark stream and add the min and max of each RDD to it. However, I get an error at run-time saying the max call does not exist (it compiles properly). I am using spark-1.4. I have added the question to Stack Overflow as well: http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796 Any help is greatly appreciated :) Thanks Nipun

    JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> sortedtsStream =
        transformedMaxMintsStream.transformToPair(new Sort2());

    sortedtsStream.foreach(
        new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>, Void>() {
          @Override
          public Void call(JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
            List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>> templist = tuple2Tuple3JavaPairRDD.collect();
            for (Tuple2<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple : templist) {
              Date date = new Date(tuple._1._1);
              int pattern = tuple._1._2;
              int count = tuple._2._1();
              Date maxDate = new Date(tuple._2._2());
              // third element of the Tuple3 (the original post read _2() twice)
              Date minDate = new Date(tuple._2._3());
              System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count + " Max: " + maxDate.toString() + " Min: " + minDate.toString());
            }
            return null;
          }
        }
    );

Error:

    15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)
    15/06/18 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000
    Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
      at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
      at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
      at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
      at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
      at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonf
Specify number of partitions with which to run DataFrame.join?
Hi everyone, I'm looking into switching raw RDD operations to DataFrame operations. When I used JavaPairRDD.join(), I had the option to specify the number of partitions with which to do the join. However, I don't see an equivalent option in DataFrame.join(). Is there a way to specify the partitioning for a DataFrame join operation as it is being computed? Or do I have to compute the join and repartition separately afterwards? Thanks, -Matt Cheah
Re: Spark Streming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Glad to hear that. :) On Thu, Jun 18, 2015 at 6:25 AM, Ji ZHANG zhangj...@gmail.com wrote: Hi, We switched from ParallelGC to CMS, and the symptom is gone. On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I set spark.shuffle.io.preferDirectBufs to false in SparkConf and this setting can be seen in web ui's environment tab. But, it still eats memory, i.e. -Xmx set to 512M but RES grows to 1.5G in half a day. On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote: Could you set spark.shuffle.io.preferDirectBufs to false to turn off the off-heap allocation of netty? Best Regards, Shixiong Zhu 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com: Hi, Thanks for you information. I'll give spark1.4 a try when it's released. On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote: Could you try it out with Spark 1.4 RC3? Also pinging, Cloudera folks, they may be aware of something. BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe a script), take snapshots of histogram and also do memory dumps. Say every hour. And then compare the difference between two histo/dumps that are few hours separated (more the better). Diffing histo is easy. Diff two dumps can be done in JVisualVM, it will show the diff in the objects that got added in the later dump. That makes it easy to debug what is not getting cleaned. TD On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, Thanks for you reply. 
Here are the top 30 entries of the jmap -histo:live result:

     num   #instances       #bytes  class name
    ------------------------------------------
       1:       40802    145083848  [B
       2:       99264     12716112  methodKlass
       3:       99264     12291480  constMethodKlass
       4:        8472      9144816  constantPoolKlass
       5:        8472      7625192  instanceKlassKlass
       6:         186      6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
       7:        7045      4804832  constantPoolCacheKlass
       8:      139168      4453376  java.util.HashMap$Entry
       9:        9427      3542512  methodDataKlass
      10:      141312      3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
      11:      135491      3251784  java.lang.Long
      12:       26192      2765496  [C
      13:         813      1140560  [Ljava.util.HashMap$Entry;
      14:        8997      1061936  java.lang.Class
      15:       16022       851384  [[I
      16:       16447       789456  java.util.zip.Inflater
      17:       13855       723376  [S
      18:       17282       691280  java.lang.ref.Finalizer
      19:       25725       617400  java.lang.String
      20:         320       570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
      21:       16066       514112  java.util.concurrent.ConcurrentHashMap$HashEntry
      22:       12288       491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
      23:       13343       426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
      24:       12288       396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
      25:       16447       394728  java.util.zip.ZStreamRef
      26:         565       370080  [I
      27:         508       272288  objArrayKlassKlass
      28:       16233       259728  java.lang.Object
      29:         771       209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
      30:        2524       192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data. I can't find a way to figure out what is in there. Besides, I did some extra experiments, i.e. I ran the same program in different environments to test whether it has the off-heap memory issue:

    spark1.0 + standalone = no
    spark1.0 + yarn       = no
    spark1.3 + standalone = no
    spark1.3 + yarn       = yes

I'm using CDH5.1, so the spark1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
I could use spark1.0 + yarn, but I can't find a way to handle the logs, level and rolling, so it'll explode the harddrive. Currently I'll stick to spark1.0 + standalone, until our ops team decides to upgrade cdh. On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote: While you are running is it possible for you login into the YARN node and get histograms of live objects using jmap -histo:live. That may reveal something. On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote: Hi, Unfortunately, they're still growing, both driver and executors. I run the same job with local mode, everything is fine. On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you replace your counting part with this?
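The debugging routine Tathagata describes in this thread (take jmap histograms periodically, then compare two that are a few hours apart) can be automated with a short script. The sketch below assumes the usual jmap -histo row layout of "num: #instances #bytes class name"; parse_histo and diff_histos are hypothetical helper names. As the thread notes, this only covers on-heap objects, so it would not by itself explain off-heap growth.

```python
import re

# One histogram row: "  12:   26192   2765496  [C"
_ROW = re.compile(r"^\s*\d+:\s+(\d+)\s+(\d+)\s+(\S.*?)\s*$")


def parse_histo(text):
    """Parse jmap -histo output into {class name: (instances, bytes)}."""
    stats = {}
    for line in text.splitlines():
        m = _ROW.match(line)
        if m:
            stats[m.group(3)] = (int(m.group(1)), int(m.group(2)))
    return stats


def diff_histos(before, after, top=10):
    """Return (byte growth, instance growth, class) rows, largest growth first."""
    old, new = parse_histo(before), parse_histo(after)
    rows = []
    for name in set(old) | set(new):
        old_inst, old_bytes = old.get(name, (0, 0))
        new_inst, new_bytes = new.get(name, (0, 0))
        rows.append((new_bytes - old_bytes, new_inst - old_inst, name))
    rows.sort(key=lambda r: (-r[0], r[2]))
    return rows[:top]
```

Feeding it two snapshots taken some hours apart lists the classes whose retained bytes grew the most, which is the "what is not getting cleaned" signal the advice is after.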
Spark-sql versus Impala versus Hive
I just published the results of my findings here: https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/
Re: Serial batching with Spark Streaming
I haven't tried with 1.4, but I tried with 1.3 a while ago and could not get the serialized behavior with the default scheduler when there is a failure and retry, so I created a customized stream like this:

    class EachSeqRDD[T: ClassTag] (
        parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
      ) extends DStream[Unit](parent.ssc) {

      override def slideDuration: Duration = parent.slideDuration

      override def dependencies: List[DStream[_]] = List(parent)

      override def compute(validTime: Time): Option[RDD[Unit]] = None

      override private[streaming] def generateJob(time: Time): Option[Job] = {
        val pendingJobs = ssc.scheduler.getPendingTimes().size
        logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
        // do not generate new RDD if there is pending job
        if (pendingJobs == 0) {
          parent.getOrCompute(time) match {
            case Some(rdd) => {
              val jobFunc = () => {
                ssc.sparkContext.setCallSite(creationSite)
                eachSeqFunc(rdd, time)
              }
              Some(new Job(time, jobFunc))
            }
            case None => None
          }
        } else {
          None
        }
      }
    }

    object DStreamEx {
      implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
        def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
          // because the DStream is reachable from the outer object here, and because
          // DStreams can't be serialized with closures, we can't proactively check
          // it for serializability and so we pass the optional false to SparkContext.clean
          new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
        }
      }
    }

-Binh On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com wrote: Tathagata, thanks for your response. You are right! Everything seems to work as expected. Could you please help me understand why the time for processing all jobs for a batch is always less than 4 seconds? Please see my playground code below. The last modified time of the input (lines) RDD dump files seems to match the Thread.sleep delays (20s or 5s) in the transform operation or the batching interval (10s): 20s, 5s, 10s.
However, neither the batch processing time in the Streaming tab nor the last modified time of the output (words) RDD dump files reflects the Thread.sleep delays.

    07:20   3240 001_lines_...    07:21    117 001_words_...
    07:41  37224 002_lines_...    07:43    252 002_words_...
    08:00  37728 003_lines_...    08:02    504 003_words_...
    08:20  38952 004_lines_...    08:22    756 004_words_...
    08:40  38664 005_lines_...    08:42    999 005_words_...
    08:45  38160 006_lines_...    08:47   1134 006_words_...
    08:50   9720 007_lines_...    08:51   1260 007_words_...
    08:55   9864 008_lines_...    08:56   1260 008_words_...
    09:00  10656 009_lines_...    09:01   1395 009_words_...
    09:05  11664 010_lines_...    09:06   1395 010_words_...
    09:11  10935 011_lines_...    09:11   1521 011_words_...
    09:16  11745 012_lines_...    09:16   1530 012_words_...
    09:21  12069 013_lines_...    09:22   1656 013_words_...
    09:27  10692 014_lines_...    09:27   1665 014_words_...
    09:32  10449 015_lines_...    09:32   1791 015_words_...
    09:37  11178 016_lines_...    09:37   1800 016_words_...
    09:45  17496 017_lines_...    09:45   1926 017_words_...
    09:55  22032 018_lines_...    09:56   2061 018_words_...
    10:05  21951 019_lines_...    10:06   2196 019_words_...
    10:15  21870 020_lines_...    10:16   2322 020_words_...
    10:25  21303 021_lines_...    10:26   2340 021_words_...

    final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
    try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {
      context.checkpoint("/tmp/checkpoint");
      final JavaDStream<String> lines = context.union(
          context.receiverStream(new GeneratorReceiver()),
          ImmutableList.of(
              context.receiverStream(new GeneratorReceiver()),
              context.receiverStream(new GeneratorReceiver())));
      lines.print();
      final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
      lines.foreachRDD(rdd -> {
        lineRddIndex.add(1);
        final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
          rdd.collect().forEach(s -> out.println(s));
        }
        return null;
      });
      final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
      final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
      final JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
      final Accumulator<Integer> sleep =
Latency between the RDD in Streaming
Is there any standard way to find the latency between RDDs in stream processing systems in a distributed setup? -- Thanks Regards, Anshu Shukla
Re: Matrix Multiplication and mllib.recommendation
Thanks all for the help. It turned out that using NumPy matrix multiplication made a huge difference in performance. I suspect that NumPy already uses BLAS-optimized code. Here is the Python code:

    # This is where I load and directly test the predictions
    myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
    m1 = myModel.productFeatures().sample(False, 1.00)
    m2 = m1.map(lambda (user, feature): feature).collect()
    m3 = matrix(m2).transpose()
    pf = sc.broadcast(m3)
    uf = myModel.userFeatures()
    f1 = uf.map(lambda (userID, features): (userID, squeeze(asarray(matrix(array(features)) * pf.value))))
    dog = f1.count()

On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com wrote: Also in my experiments, it's much faster to do blocked BLAS through cartesian rather than doing sc.union. Here are the details on the experiments: https://issues.apache.org/jira/browse/SPARK-4823 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com wrote: Also not sure how threading helps here, because Spark assigns a partition to each core. On each core there may be multiple threads if you are using Intel hyperthreading, but I will let Spark handle the threading. On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com wrote: We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS dgemm based calculation. On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat ayman.fara...@yahoo.com.invalid wrote: Thanks Sabarish and Nick. Would you happen to have some code snippets that you can share? Best Ayman On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: Nick is right. I too have implemented it this way and it works just fine. In my case, there can be even more products. You simply broadcast blocks of products to userFeatures.mapPartitions() and BLAS multiply in there to get recommendations. In my case 10K products form one block. Note that you would then have to union your recommendations.
And if there are lots of product blocks, you might also want to checkpoint once every few times. Regards Sab On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com wrote: One issue is that you broadcast the product vectors and then do a dot product one-by-one with the user vector. You should try forming a matrix of the item vectors and doing the dot product as a matrix-vector multiply, which will make things a lot faster. Another optimisation that is available in 1.4 is a recommendProducts method that blockifies the factors to make use of level 3 BLAS (i.e. matrix-matrix multiply). I am not sure if this is available in the Python API yet. But you can do a version yourself by using mapPartitions over user factors, blocking the factors into sub-matrices and doing a matrix multiply with the item factor matrix to get scores on a block-by-block basis. Also, as Ilya says, more parallelism can help. I don't think it's so necessary to do LSH with 30,000 items. — Sent from Mailbox On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: I actually talk about this exact thing in a blog post here http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/. Keep in mind, you're actually doing a ton of math. Even with proper caching and use of broadcast variables this will take a while, depending on the size of your cluster. To get real results you may want to look into locality sensitive hashing to limit your search space, and definitely look into spinning up multiple threads to process your product features in parallel to increase resource utilization on the cluster. Thank you, Ilya Ganelin -Original Message- From: afarahat [ayman.fara...@yahoo.com] Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time To: user@spark.apache.org Subject: Matrix Multiplication and mllib.recommendation Hello; I am trying to get predictions after running the ALS model. The model works fine.
In the prediction/recommendation step, I have about 30,000 products and 90 million users. When I try predictAll it fails. I have been trying to formulate the problem as a matrix multiplication where I first get the product features, broadcast them, and then do a dot product. It's still very slow. Any reason why? Here is some sample code:

    def doMultiply(x):
        a = []
        # multiply by
        mylen = len(pf.value)
        for i in range(mylen):
            myprod = numpy.dot(x, pf.value[i][1])
            a.append(myprod)
        return a

    myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
    # I need to select which products to broadcast but let's try all
    m1 = myModel.productFeatures().sample(False, 0.001)
    pf = sc.broadcast(m1.collect())
    uf = myModel.userFeatures()
    f1 = uf.map(lambda x: (x[0], doMultiply(x[1])))
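The blocked scoring that Nick and Sabarish describe in this thread can be sketched in plain Python, without Spark or NumPy, so that it runs anywhere. This is only an illustration under assumptions: `dot`, `score_partition`, `top_k`, and the toy factors are hypothetical names and data, not code from the thread. In a real job the inner loops would be a single NumPy matrix product inside `userFeatures().mapPartitions(...)`, with the item block read from a broadcast variable.

```python
def dot(u, v):
    """Dot product of two equal-length sequences."""
    return sum(a * b for a, b in zip(u, v))


def score_partition(user_rows, item_block):
    """Score every user in one partition against one block of items.

    user_rows  -- iterable of (user_id, feature_list) pairs, shaped like
                  one partition of userFeatures()
    item_block -- list of (item_id, feature_list) pairs; in Spark this
                  would be the value of a broadcast variable (assumption)
    Yields (user_id, [(item_id, score), ...]).
    """
    for user_id, user_vec in user_rows:
        scores = [(item_id, dot(user_vec, item_vec))
                  for item_id, item_vec in item_block]
        yield user_id, scores


def top_k(scores, k):
    """Keep the k highest-scoring (item_id, score) pairs."""
    return sorted(scores, key=lambda pair: pair[1], reverse=True)[:k]


# Toy data standing in for rank-2 ALS factors (hypothetical values).
users = [(1, [1.0, 0.0]), (2, [0.5, 0.5])]
items = [(10, [2.0, 0.0]), (11, [0.0, 4.0]), (12, [1.0, 1.0])]

recommendations = {uid: top_k(scores, 2)
                   for uid, scores in score_partition(users, items)}
```

With several item blocks, the per-block results for each user would be merged and trimmed with `top_k` again, which is the "union your recommendations" step mentioned above.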
Re: Got the exception when joining RDD with spark streamRDD
This seems to be a bug; could you file a JIRA for it? An RDD should be serializable for a Streaming job.

On Thu, Jun 18, 2015 at 4:25 AM, Groupme grou...@gmail.com wrote:

Hi, I am writing a PySpark streaming program. I have a training data set to compute the regression model. I want to use the stream data set to test the model. So I join the RDD with the StreamRDD, but I got the exception. Following are my source code and the exception I got. Any help is appreciated. Thanks Regards, Afancy

    from __future__ import print_function
    import sys,os,datetime
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.sql.context import SQLContext
    from pyspark.resultiterable import ResultIterable
    from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
    import numpy as np
    import statsmodels.api as sm

    def splitLine(line, delimiter='|'):
        values = line.split(delimiter)
        st = datetime.datetime.strptime(values[1], '%Y-%m-%d %H:%M:%S')
        return (values[0],st.hour), values[2:]

    def reg_m(y, x):
        ones = np.ones(len(x[0]))
        X = sm.add_constant(np.column_stack((x[0], ones)))
        for ele in x[1:]:
            X = sm.add_constant(np.column_stack((ele, X)))
        results = sm.OLS(y, X).fit()
        return results

    def train(line):
        y,x = [],[]
        y, x = [],[[],[],[],[],[],[]]
        reading_tmp,temp_tmp = [],[]
        i = 0
        for reading, temperature in line[1]:
            if i%4==0 and len(reading_tmp)==4:
                y.append(reading_tmp.pop())
                x[0].append(reading_tmp.pop())
                x[1].append(reading_tmp.pop())
                x[2].append(reading_tmp.pop())
                temp = float(temp_tmp[0])
                del temp_tmp[:]
                x[3].append(temp-20.0 if temp>20.0 else 0.0)
                x[4].append(16.0-temp if temp<16.0 else 0.0)
                x[5].append(5.0-temp if temp<5.0 else 0.0)
            reading_tmp.append(float(reading))
            temp_tmp.append(float(temperature))
            i = i + 1
        return str(line[0]),reg_m(y, x).params.tolist()

    if __name__ == "__main__":
        if len(sys.argv) != 4:
            print("Usage: regression.py checkpointDir trainingDataDir streamDataDir", file=sys.stderr)
            exit(-1)

        checkpoint, trainingInput, streamInput = sys.argv[1:]
        sc = SparkContext("local[2]", appName="BenchmarkSparkStreaming")

        trainingLines = sc.textFile(trainingInput)
        modelRDD = trainingLines.map(lambda line: splitLine(line, "|"))\
                                .groupByKey()\
                                .map(lambda line: train(line))\
                                .cache()

        ssc = StreamingContext(sc, 2)
        ssc.checkpoint(checkpoint)
        lines = ssc.textFileStream(streamInput).map(lambda line: splitLine(line, "|"))
        testRDD = lines.groupByKeyAndWindow(4,2).map(lambda line:(str(line[0]), line[1])).transform(lambda rdd: rdd.leftOuterJoin(modelRDD))
        testRDD.pprint(20)

        ssc.start()
        ssc.awaitTermination()

    15/06/18 12:25:37 INFO FileInputDStream: Duration for remembering RDDs set to 6 ms for org.apache.spark.streaming.dstream.FileInputDStream@15b81ee6
    Traceback (most recent call last):
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 90, in dumps
        return bytearray(self.serializer.dumps((func.func, func.deserializers)))
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
        return cloudpickle.dumps(obj, 2)
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
        cp.dump(obj)
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
        return Pickler.dump(self, obj)
      File "/usr/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function
        self.save_function_tuple(obj)
      File "/opt/spark-1.4.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
        save((code, closure, base_globals))
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 600, in save_list
        self._batch_appends(iter(obj))
      File "/usr/lib/python2.7/pickle.py", line
Re: Is there programmatic way running Spark job on Yarn cluster without using spark-submit script ?
"This is not an independent programmatic way of running a Spark job on a YARN cluster." The example I created simply demonstrates how to wire up the classpath so that spark-submit can be called programmatically. For my use case, I wanted to hold open a connection so I could send tasks to the executors on demand. If you were to submit this via yarn-cluster mode, it would only require that any extra files be placed on the executors, if needed.

On Wed, Jun 17, 2015 at 9:01 PM, Elkhan Dadashov elkhan8...@gmail.com wrote:

This is not an independent programmatic way of running a Spark job on a YARN cluster. That example demonstrates running in *yarn-client* mode, and it also depends on Jetty. Users writing Spark programs do not want to depend on that. I found this SparkLauncher class, introduced in Spark 1.4 ( https://github.com/apache/spark/tree/master/launcher), which allows running Spark jobs programmatically. SparkLauncher exists in the Java and Scala APIs, but I could not find it in the Python API. I did not try it yet, but it seems promising. Example:

    import org.apache.spark.launcher.SparkLauncher;

    public class MyLauncher {
      public static void main(String[] args) throws Exception {
        // launch() starts the application as a child process
        Process spark = new SparkLauncher()
          .setAppResource("/my/app.jar")
          .setMainClass("my.spark.app.Main")
          .setMaster("local")
          .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
          .launch();
        spark.waitFor();
      }
    }

On Wed, Jun 17, 2015 at 5:51 PM, Corey Nolet cjno...@gmail.com wrote:

An example of being able to do this is provided in the Spark Jetty Server project [1] [1] https://github.com/calrissian/spark-jetty-server

On Wed, Jun 17, 2015 at 8:29 PM, Elkhan Dadashov elkhan8...@gmail.com wrote:

Hi all, Is there any way of running a Spark job programmatically on a YARN cluster without using the spark-submit script?
I cannot include Spark jars in my Java application (due to dependency conflicts and other reasons), so I'll be shipping the Spark assembly uber jar (spark-assembly-1.3.1-hadoop2.3.0.jar) to the YARN cluster, and then execute the job (Python or Java) in yarn-cluster mode. So is there any way of running a Spark job implemented in a Python file/Java class without calling it through the spark-submit script? Thanks. -- Best regards, Elkhan Dadashov
Settings for K-Means Clustering in MLlib for large data set
Hi All, I am trying to run KMeans clustering on a large data set with 12,000 points and 80,000 dimensions. I have a Spark cluster on EC2 in standalone mode with 8 workers running on 2 slaves with 160 GB RAM and 40 vCPUs. My code is as follows:

    def convert_into_sparse_vector(A):
        # Keep every entry that is not NaN (zeros are kept too)
        non_nan_indices = np.nonzero(~np.isnan(A))
        non_nan_values = A[non_nan_indices]
        dictionary = dict(zip(non_nan_indices[0], non_nan_values))
        return Vectors.sparse(len(A), dictionary)

    X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
    sc = SparkContext(appName="parallel_kmeans")
    data = sc.parallelize(X, 10)
    model = KMeans.train(data, 1000, initializationMode="k-means||")

where complete_dataframe is a pandas data frame that has my data. I get the error:

    Py4JNetworkError: An error occurred while trying to connect to the Java server.

The error trace is as follows:

    Exception happened during processing of request from ('127.0.0.1', 41360)
    Traceback (most recent call last):
      File "/usr/lib64/python2.6/SocketServer.py", line 283, in _handle_request_noblock
        self.process_request(request, client_address)
      File "/usr/lib64/python2.6/SocketServer.py", line 309, in process_request
        self.finish_request(request, client_address)
      File "/usr/lib64/python2.6/SocketServer.py", line 322, in finish_request
        self.RequestHandlerClass(request, client_address, self)
      File "/usr/lib64/python2.6/SocketServer.py", line 617, in __init__
        self.handle()
      File "/root/spark/python/pyspark/accumulators.py", line 235, in handle
        num_updates = read_int(self.rfile)
      File "/root/spark/python/pyspark/serializers.py", line 544, in read_int
        raise EOFError
    EOFError

--- Py4JNetworkError Traceback (most recent call last) ipython-input-13-3dd00c2c5e93 in module() 1 model = KMeans.train(data, 1000, initializationMode=k-means||) /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon) 134 Train a k-means clustering model.
135 model = callMLlibFunc(trainKMeansModel, rdd.map(_convert_to_vector), k, maxIterations, -- 136 runs, initializationMode, seed, initializationSteps, epsilon) 137 centers = callJavaFunc(rdd.context, model.clusterCenters) 138 return KMeansModel([c.toArray() for c in centers]) /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args) 126 sc = SparkContext._active_spark_context 127 api = getattr(sc._jvm.PythonMLLibAPI(), name) -- 128 return callJavaFunc(sc, api, *args) 129 130 /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args) 119 Call Java Function 120 args = [_py2java(sc, a) for a in args] -- 121 return _java2py(sc, func(*args)) 122 123 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 534 END_COMMAND_PART 535 -- 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, 538 self.target_id, self.name) /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry) 367 if retry: 368 #print_exc() -- 369 response = self.send_command(command) 370 else: 371 response = ERROR /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry) 360 the Py4J protocol. 
361 -- 362 connection = self._get_connection() 363 try: 364 response = connection.send_command(command) /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _get_connection(self) 316 connection = self.deque.pop() 317 except Exception: -- 318 connection = self._create_connection() 319 return connection 320 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _create_connection(self) 323 connection = GatewayConnection(self.address, self.port, 324 self.auto_close, self.gateway_property) -- 325 connection.start() 326 return connection 327 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in start(self) 430 'server' 431 logger.exception(msg) -- 432 raise Py4JNetworkError(msg) 433 434 def close(self): Py4JNetworkError: An error occurred while trying to connect to the Java server Is there any specific setting that I am missing ,
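The `convert_into_sparse_vector` helper in the post keeps every non-NaN entry of a row. A dependency-free sketch of the same idea follows. `to_sparse_entries` is a hypothetical name, and the `(size, dict)` pair it returns simply mirrors the two arguments the post passes to `Vectors.sparse`. As in the original, zeros are kept and only NaNs are dropped.

```python
import math


def to_sparse_entries(row):
    """Return (size, {index: value}) for the non-NaN entries of a row."""
    entries = {i: v for i, v in enumerate(row) if not math.isnan(v)}
    return len(row), entries


# Hypothetical 5-dimensional row with two missing values.
row = [float("nan"), 1.5, 0.0, float("nan"), 3.0]
size, entries = to_sparse_entries(row)
```

Doing this conversion inside an RDD transformation, rather than building the whole list on the driver before `parallelize`, would keep the 12,000 vectors from passing through the driver process. Whether that has anything to do with the error above is not established in this thread.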
Re: Matrix Multiplication and mllib.recommendation
Yup, numpy calls into BLAS for matrix multiply. Sent from my iPad On 18 Jun 2015, at 8:54 PM, Ayman Farahat ayman.fara...@yahoo.com wrote: Thanks all for the help. It turned out that using the bumpy matrix multiplication made a huge difference in performance. I suspect that Numpy already uses BLAS optimized code. Here is Python code #This is where i load and directly test the predictions myModel = MatrixFactorizationModel.load(sc, FlurryModelPath) m1 = myModel.productFeatures().sample(False, 1.00) m2 = m1.map(lambda (user,feature) : feature).collect() m3 = matrix(m2).transpose() pf = sc.broadcast(m3) uf = myModel.userFeatures() f1 = uf.map(lambda (userID, features): (userID, squeeze(asarray(matrix(array(features)) * pf.value dog = f1.count() On Jun 18, 2015, at 8:42 AM, Debasish Das debasish.da...@gmail.com wrote: Also in my experiments, it's much faster to blocked BLAS through cartesian rather than doing sc.union. Here are the details on the experiments: https://issues.apache.org/jira/browse/SPARK-4823 On Thu, Jun 18, 2015 at 8:40 AM, Debasish Das debasish.da...@gmail.com wrote: Also not sure how threading helps here because Spark puts a partition to each core. On each core may be there are multiple threads if you are using intel hyperthreading but I will let Spark handle the threading. On Thu, Jun 18, 2015 at 8:38 AM, Debasish Das debasish.da...@gmail.com wrote: We added SPARK-3066 for this. In 1.4 you should get the code to do BLAS dgemm based calculation. On Thu, Jun 18, 2015 at 8:20 AM, Ayman Farahat ayman.fara...@yahoo.com.invalid wrote: Thanks Sabarish and Nick Would you happen to have some code snippets that you can share. Best Ayman On Jun 17, 2015, at 10:35 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: Nick is right. I too have implemented this way and it works just fine. In my case, there can be even more products. You simply broadcast blocks of products to userFeatures.mapPartitions() and BLAS multiply in there to get recommendations. 
In my case 10K products form one block. Note that you would then have to union your recommendations. And if there lots of product blocks, you might also want to checkpoint once every few times. Regards Sab On Thu, Jun 18, 2015 at 10:43 AM, Nick Pentreath nick.pentre...@gmail.com wrote: One issue is that you broadcast the product vectors and then do a dot product one-by-one with the user vector. You should try forming a matrix of the item vectors and doing the dot product as a matrix-vector multiply which will make things a lot faster. Another optimisation that is avalailable on 1.4 is a recommendProducts method that blockifies the factors to make use of level 3 BLAS (ie matrix-matrix multiply). I am not sure if this is available in The Python api yet. But you can do a version yourself by using mapPartitions over user factors, blocking the factors into sub-matrices and doing matrix multiply with item factor matrix to get scores on a block-by-block basis. Also as Ilya says more parallelism can help. I don't think it's so necessary to do LSH with 30,000 items. — Sent from Mailbox On Thu, Jun 18, 2015 at 6:01 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Actually talk about this exact thing in a blog post here http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/. Keep in mind, you're actually doing a ton of math. Even with proper caching and use of broadcast variables this will take a while defending on the size of your cluster. To get real results you may want to look into locality sensitive hashing to limit your search space and definitely look into spinning up multiple threads to process your product features in parallel to increase resource utilization on the cluster. 
Thank you, Ilya Ganelin -Original Message- From: afarahat [ayman.fara...@yahoo.com] Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time To: user@spark.apache.org Subject: Matrix Multiplication and mllib.recommendation Hello; I am trying to get predictions after running the ALS model. The model works fine. In the prediction/recommendation , I have about 30 ,000 products and 90 Millions users. When i try the predict all it fails. I have been trying to formulate the problem as a Matrix multiplication where I first get the product features, broadcast them and then do a dot product. Its still very slow. Any reason why here is a sample code def doMultiply(x): a = [] #multiply by mylen = len(pf.value) for i in range(mylen) : myprod = numpy.dot(x,pf.value[i][1]) a.append(myprod) return a myModel = MatrixFactorizationModel.load(sc, FlurryModelPath) #I need to select which products to broadcast but lets try all m1 =
MLIB-KMEANS: Py4JNetworkError: An error occurred while trying to connect to the Java server , on a huge data set
Hi All, I am trying to run KMeans clustering on a large data set with 12,000 points and 80,000 dimensions. I have a spark cluster in Ec2 stand alone mode with 8 workers running on 2 slaves with 160 GB Ram and 40 VCPU. *My Code is as Follows:* def convert_into_sparse_vector(A): non_nan_indices=np.nonzero(~np.isnan(A) ) non_nan_values=A[non_nan_indices] dictionary=dict(zip(non_nan_indices[0],non_nan_values)) return Vectors.sparse (len(A),dictionary) X=[convert_into_sparse_vector(A) for A in complete_dataframe.values ] sc=SparkContext(appName=parallel_kmeans) data=sc.parallelize(X,10) model = KMeans.train(data, 1000, initializationMode=k-means||) where complete_dataframe is a pandas data frame that has my data. I get the error: Py4JNetworkError: An error occurred while trying to connect to the Java server. / The error trace is as follows: Exception happened during processing of request from ('127.0.0.1', 41360) Traceback (most recent call last): File /usr/lib64/python2.6/SocketServer.py, line 283, in _handle_request_noblock self.process_request(request, client_address) File /usr/lib64/python2.6/SocketServer.py, line 309, in process_request self.finish_request(request, client_address) File /usr/lib64/python2.6/SocketServer.py, line 322, in finish_request self.RequestHandlerClass(request, client_address, self) File /usr/lib64/python2.6/SocketServer.py, line 617, in __init__ self.handle() File /root/spark/python/pyspark/accumulators.py, line 235, in handle num_updates = read_int(self.rfile) File /root/spark/python/pyspark/serializers.py, line 544, in read_int raise EOFError EOFError --- Py4JNetworkError Traceback (most recent call last) ipython-input-13-3dd00c2c5e93 in module() 1 model = KMeans.train(data, 1000, initializationMode=k-means||) /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon) 134 Train a k-means clustering model. 
135 model = callMLlibFunc(trainKMeansModel, rdd.map(_convert_to_vector), k, maxIterations, -- 136 runs, initializationMode, seed, initializationSteps, epsilon) 137 centers = callJavaFunc(rdd.context, model.clusterCenters) 138 return KMeansModel([c.toArray() for c in centers]) /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args) 126 sc = SparkContext._active_spark_context 127 api = getattr(sc._jvm.PythonMLLibAPI(), name) -- 128 return callJavaFunc(sc, api, *args) 129 130 /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args) 119 Call Java Function 120 args = [_py2java(sc, a) for a in args] -- 121 return _java2py(sc, func(*args)) 122 123 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 534 END_COMMAND_PART 535 -- 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, 538 self.target_id, self.name) /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry) 367 if retry: 368 #print_exc() -- 369 response = self.send_command(command) 370 else: 371 response = ERROR /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry) 360 the Py4J protocol. 
361 -- 362 connection = self._get_connection() 363 try: 364 response = connection.send_command(command) /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _get_connection(self) 316 connection = self.deque.pop() 317 except Exception: -- 318 connection = self._create_connection() 319 return connection 320 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _create_connection(self) 323 connection = GatewayConnection(self.address, self.port, 324 self.auto_close, self.gateway_property) -- 325 connection.start() 326 return connection 327 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in start(self) 430 'server' 431 logger.exception(msg) -- 432 raise Py4JNetworkError(msg) 433 434 def close(self): Py4JNetworkError: An error occurred while trying to connect to the Java server/ Please let me know if I am missing
Re: [Spark Streaming] Runtime Error in call to max function for JavaPairRDD
I think you may be including a different version of Spark Streaming in your assembly. Please mark spark-core and spark-streaming as provided dependencies. Any installation of Spark will automatically provide Spark in the classpath, so you do not have to bundle it.

On Thu, Jun 18, 2015 at 8:44 AM, Nipun Arora nipunarora2...@gmail.com wrote:

Hi, I have the following piece of code, where I am trying to transform a Spark stream and add the min and max of each RDD to it. However, I get an error at run-time saying the max call does not exist (it compiles properly). I am using spark-1.4. I have added the question to stackoverflow as well: http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796 Any help is greatly appreciated :) Thanks Nipun

    JavaPairDStream<Tuple2<Long, Integer>, Tuple3<Integer,Long,Long>> sortedtsStream =
        transformedMaxMintsStream.transformToPair(new Sort2());

    sortedtsStream.foreach(
        new Function<JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>>, Void>() {
          @Override
          public Void call(JavaPairRDD<Tuple2<Long, Integer>, Tuple3<Integer, Long, Long>> tuple2Tuple3JavaPairRDD) throws Exception {
            List<Tuple2<Tuple2<Long, Integer>, Tuple3<Integer,Long,Long>>> templist = tuple2Tuple3JavaPairRDD.collect();
            for (Tuple2<Tuple2<Long,Integer>, Tuple3<Integer,Long,Long>> tuple : templist) {
              Date date = new Date(tuple._1._1);
              int pattern = tuple._1._2;
              int count = tuple._2._1();
              Date maxDate = new Date(tuple._2._2());
              // as posted; the min value is presumably meant to come from tuple._2._3()
              Date minDate = new Date(tuple._2._2());
              System.out.println("TimeSlot: " + date.toString() + " Pattern: " + pattern + " Count: " + count + " Max: " + maxDate.toString() + " Min: " + minDate.toString());
            }
            return null;
          }
        }
    );

Error:

    15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)
    15/06/18 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000
    Exception in thread "JobGenerator" java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
        at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
        at org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1.apply(JavaDStreamLike.scala:361)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonf
Re: Serial batching with Spark Streaming
Tathagata, thanks for your response. You are right! Everything seems to work as expected. Could you please help me understand why the time for processing of all jobs for a batch is always less than 4 seconds? Please see my playground code below. The last modified time of the input (lines) RDD dump files seems to match the Thread.sleep delays (20s or 5s) in the transform operation or the batching interval (10s): 20s, 5s, 10s. However, neither the batch processing time in the Streaming tab nor the last modified time of the output (words) RDD dump files reflects the Thread.sleep delays.

    07:20   3240 001_lines_...
    07:21    117 001_words_...
    07:41  37224 002_lines_...
    07:43    252 002_words_...
    08:00  37728 003_lines_...
    08:02    504 003_words_...
    08:20  38952 004_lines_...
    08:22    756 004_words_...
    08:40  38664 005_lines_...
    08:42    999 005_words_...
    08:45  38160 006_lines_...
    08:47   1134 006_words_...
    08:50   9720 007_lines_...
    08:51   1260 007_words_...
    08:55   9864 008_lines_...
    08:56   1260 008_words_...
    09:00  10656 009_lines_...
    09:01   1395 009_words_...
    09:05  11664 010_lines_...
    09:06   1395 010_words_...
    09:11  10935 011_lines_...
    09:11   1521 011_words_...
    09:16  11745 012_lines_...
    09:16   1530 012_words_...
    09:21  12069 013_lines_...
    09:22   1656 013_words_...
    09:27  10692 014_lines_...
    09:27   1665 014_words_...
    09:32  10449 015_lines_...
    09:32   1791 015_words_...
    09:37  11178 016_lines_...
    09:37   1800 016_words_...
    09:45  17496 017_lines_...
    09:45   1926 017_words_...
    09:55  22032 018_lines_...
    09:56   2061 018_words_...
    10:05  21951 019_lines_...
    10:06   2196 019_words_...
    10:15  21870 020_lines_...
    10:16   2322 020_words_...
    10:25  21303 021_lines_...
    10:26   2340 021_words_...
    final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
    try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {
        context.checkpoint("/tmp/checkpoint");

        final JavaDStream<String> lines = context.union(
            context.receiverStream(new GeneratorReceiver()),
            ImmutableList.of(
                context.receiverStream(new GeneratorReceiver()),
                context.receiverStream(new GeneratorReceiver())));
        lines.print();

        // Dump each input RDD to a numbered file
        final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
        lines.foreachRDD(rdd -> {
            lineRddIndex.add(1);
            final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
            try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
                rdd.collect().forEach(s -> out.println(s));
            }
            return null;
        });

        final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
        final JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
        final JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

        // Sleep 20s for the first five batches, then 5s
        final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
        final JavaPairDStream<String, Integer> wordCounts2 = JavaPairDStream.fromJavaDStream(
            wordCounts.transform((rdd) -> {
                sleep.add(1);
                Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
                return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
            }));

        final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
            (values, state) -> {
                Integer newSum = state.or(0);
                for (final Integer value : values) {
                    newSum += value;
                }
                return Optional.of(newSum);
            };

        final List<Tuple2<String, Integer>> tuples = ImmutableList.<Tuple2<String, Integer>>of();
        final JavaPairRDD<String, Integer> initialRDD = context.sparkContext().parallelizePairs(tuples);

        final JavaPairDStream<String, Integer> wordCountsState = wordCounts2.updateStateByKey(
            updateFunction,
            new HashPartitioner(context.sparkContext().defaultParallelism()),
            initialRDD);
        wordCountsState.print();

        // Dump each output RDD to a numbered file
        final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
        wordCountsState.foreachRDD(rdd -> {
            rddIndex.add(1);
            final String prefix = "/tmp/" + String.format("%03d", rddIndex.localValue()) + "_words_";
            try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
                rdd.collect().forEach(s -> out.println(s));
            }
            return null;
        });

        context.start();
        context.awaitTermination();
    }

On 17 June 2015 at 17:25, Tathagata Das t...@databricks.com wrote:

The default behavior should be that batch X + 1 starts processing only after batch X completes. If you are using Spark 1.4.0, could you show us a screenshot of the streaming
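The `updateFunction` in the playground code keeps a running count per word. Its logic, separated from Spark, can be sketched as below. `update_count` and `apply_batch` are hypothetical names; `apply_batch` only imitates, under simplifying assumptions, what `updateStateByKey` does for one batch and is not Spark's implementation.

```python
def update_count(new_values, state):
    """Add this batch's values for one key to the previous state (None if absent)."""
    return (state or 0) + sum(new_values)


def apply_batch(state_by_key, batch_pairs):
    """Imitate one updateStateByKey step over a batch of (key, value) pairs."""
    grouped = {}
    for key, value in batch_pairs:
        grouped.setdefault(key, []).append(value)
    # Keys already in the state are updated even when the batch brings no new values.
    keys = set(state_by_key) | set(grouped)
    return {k: update_count(grouped.get(k, []), state_by_key.get(k)) for k in keys}


state = {}
state = apply_batch(state, [("a", 1), ("b", 1), ("a", 1)])  # first batch
state = apply_batch(state, [("b", 1)])                      # second batch
```

Because every known key is revisited on each batch, the state grows with the vocabulary, which is why the words dump files in the listing above get steadily larger.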
Re: Does MLLib has attribute importance?
ChiSqSelector takes an RDD of labeled points, where the label is the target. See https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala#L120

On Wed, Jun 17, 2015 at 10:22 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote:

Thank you Xiangrui. Oracle's attribute importance mining function has a target variable. Attribute importance is a supervised function that ranks attributes according to their significance in predicting a target. MLlib's ChiSqSelector does not have a target variable. -- Ruslan Dautkhanov

On Wed, Jun 17, 2015 at 5:50 PM, Xiangrui Meng men...@gmail.com wrote:

We don't have it in MLlib. The closest would be the ChiSqSelector, which works for categorical data. -Xiangrui

On Thu, Jun 11, 2015 at 4:33 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote:

What would be the closest equivalent in MLlib to Oracle Data Miner's Attribute Importance mining function? http://docs.oracle.com/cd/B28359_01/datamine.111/b28129/feature_extr.htm#i1005920 Attribute importance is a supervised function that ranks attributes according to their significance in predicting a target. Best regards, Ruslan Dautkhanov

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
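To make the role of the label concrete: a chi-squared feature test scores a categorical feature by how far its joint counts with the label depart from what independence would predict. The sketch below computes that statistic in plain Python. It illustrates the statistic only and is not MLlib's implementation; `chi_squared` is a hypothetical helper and the toy data is made up.

```python
from collections import Counter


def chi_squared(feature_values, labels):
    """Pearson chi-squared statistic between a categorical feature and a label."""
    n = len(labels)
    joint = Counter(zip(feature_values, labels))
    feature_totals = Counter(feature_values)
    label_totals = Counter(labels)
    stat = 0.0
    for f, f_count in feature_totals.items():
        for l, l_count in label_totals.items():
            # Count expected in this cell if feature and label were independent
            expected = f_count * l_count / float(n)
            observed = joint.get((f, l), 0)
            stat += (observed - expected) ** 2 / expected
    return stat


labels = [0, 0, 1, 1]
informative = chi_squared([0, 0, 1, 1], labels)    # feature tracks the label
uninformative = chi_squared([0, 1, 0, 1], labels)  # feature independent of the label
```

A selector built on this idea would rank features by the statistic and keep the top few, which is a supervised ranking in the sense Ruslan asks about.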
Re: Latency between the RDD in Streaming
It's not clear what you are asking. Find what among RDDs?

On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote:

Is there any fixed way to find among RDD in stream processing systems, in the distributed set-up. -- Thanks Regards, Anshu Shukla
Re: Serial batching with Spark Streaming
Tathagata, could you please confirm that batches are not processed in parallel during retries in Spark 1.4? See Binh's email copied below. Any pointers for workarounds if necessary? Thanks!

On 18 June 2015 at 14:29, Binh Nguyen Van binhn...@gmail.com wrote:

I haven't tried with 1.4, but I tried with 1.3 a while ago and I could not get the serialized behavior with the default scheduler when there is a failure and retry, so I created a customized stream like this.

    class EachSeqRDD[T: ClassTag] (
        parent: DStream[T],
        eachSeqFunc: (RDD[T], Time) => Unit
      ) extends DStream[Unit](parent.ssc) {

      override def slideDuration: Duration = parent.slideDuration

      override def dependencies: List[DStream[_]] = List(parent)

      override def compute(validTime: Time): Option[RDD[Unit]] = None

      override private[streaming] def generateJob(time: Time): Option[Job] = {
        val pendingJobs = ssc.scheduler.getPendingTimes().size
        logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
        // do not generate new RDD if there is pending job
        if (pendingJobs == 0) {
          parent.getOrCompute(time) match {
            case Some(rdd) => {
              val jobFunc = () => {
                ssc.sparkContext.setCallSite(creationSite)
                eachSeqFunc(rdd, time)
              }
              Some(new Job(time, jobFunc))
            }
            case None => None
          }
        } else {
          None
        }
      }
    }

    object DStreamEx {
      implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
        def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
          // because the DStream is reachable from the outer object here, and because
          // DStreams can't be serialized with closures, we can't proactively check
          // it for serializability and so we pass the optional false to SparkContext.clean
          new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
        }
      }
    }

-Binh

On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia mici...@gmail.com wrote:

Tathagata, thanks for your response. You are right! Everything seems to work as expected. Could you please help me understand why the time for processing of all jobs for a batch is always less than 4 seconds?
Please see my playground code below. The last modified time of the input (lines) RDD dump files seems to match the Thread.sleep delays (20s or 5s) in the transform operation or the batching interval (10s): 20s, 5s, 10s. However, neither the batch processing time in the Streaming tab nor the last modified time of the output (words) RDD dump files reflect the Thread.sleep delays. 07:20 3240 001_lines_... 07:21 117 001_words_... 07:41 37224 002_lines_... 07:43 252 002_words_... 08:00 37728 003_lines_... 08:02 504 003_words_... 08:20 38952 004_lines_... 08:22 756 004_words_... 08:40 38664 005_lines_... 08:42 999 005_words_... 08:45 38160 006_lines_... 08:47 1134 006_words_... 08:50 9720 007_lines_... 08:51 1260 007_words_... 08:55 9864 008_lines_... 08:56 1260 008_words_... 09:00 10656 009_lines_... 09:01 1395 009_words_... 09:05 11664 010_lines_... 09:06 1395 010_words_... 09:11 10935 011_lines_... 09:11 1521 011_words_... 09:16 11745 012_lines_... 09:16 1530 012_words_... 09:21 12069 013_lines_... 09:22 1656 013_words_... 09:27 10692 014_lines_... 09:27 1665 014_words_... 09:32 10449 015_lines_... 09:32 1791 015_words_... 09:37 11178 016_lines_... 09:37 1800 016_words_... 09:45 17496 017_lines_... 09:45 1926 017_words_... 09:55 22032 018_lines_... 09:56 2061 018_words_... 10:05 21951 019_lines_... 10:06 2196 019_words_... 10:15 21870 020_lines_... 10:16 2322 020_words_... 10:25 21303 021_lines_... 10:26 2340 021_words_... 
    final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
    try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {
      context.checkpoint("/tmp/checkpoint");
      final JavaDStream<String> lines = context.union(
          context.receiverStream(new GeneratorReceiver()),
          ImmutableList.of(
              context.receiverStream(new GeneratorReceiver()),
              context.receiverStream(new GeneratorReceiver())));
      lines.print();
      final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
      lines.foreachRDD( rdd -> {
        lineRddIndex.add(1);
        final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
          rdd.collect().forEach(s -> out.println(s));
        }
        return null;
      });
      final JavaDStream<String> words =
Re: Latency between the RDD in Streaming
Sorry, I missed the word LATENCY. For a large streaming query, how can I find the time taken by a particular RDD to travel from the initial DStream to the final DStream? Please help!

On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote:

It's not clear what you are asking. Find what among RDD?

On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote:

Is there any fixed way to find among RDD in stream processing systems, in the distributed set-up?

-- Thanks & Regards, Anshu Shukla
Re: Does MLLib has attribute importance?
Got it. Thanks!

-- Ruslan Dautkhanov

On Thu, Jun 18, 2015 at 1:02 PM, Xiangrui Meng men...@gmail.com wrote:

ChiSqSelector takes an RDD of labeled points, where the label is the target. See https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala#L120

On Wed, Jun 17, 2015 at 10:22 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote:

Thank you Xiangrui. Oracle's attribute importance mining function has a target variable. Attribute importance is a supervised function that ranks attributes according to their significance in predicting a target. MLlib's ChiSqSelector does not have a target variable.

-- Ruslan Dautkhanov

On Wed, Jun 17, 2015 at 5:50 PM, Xiangrui Meng men...@gmail.com wrote:

We don't have it in MLlib. The closest would be the ChiSqSelector, which works for categorical data. -Xiangrui

On Thu, Jun 11, 2015 at 4:33 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote:

What would be the closest equivalent in MLlib to Oracle Data Miner's Attribute Importance mining function? http://docs.oracle.com/cd/B28359_01/datamine.111/b28129/feature_extr.htm#i1005920 Attribute importance is a supervised function that ranks attributes according to their significance in predicting a target.

Best regards, Ruslan Dautkhanov
Re: Latency between the RDD in Streaming
Thanks a lot. But I have already tried the second way. The problem with it is how to identify a particular RDD from source to sink (as we can do by passing a msg id in Storm). For that I just updated the RDD and added a msgID (as a static variable), but while dumping them to file some of the tuples of the RDD are failed/missed (approx. 3000, at a data rate of approx. 1500 tuples/sec).

On Fri, Jun 19, 2015 at 2:50 AM, Tathagata Das t...@databricks.com wrote:

Couple of ways.

1. Easy but approximate way: Find the scheduling delay and processing time using the StreamingListener interface, and then calculate end-to-end delay = 0.5 * batch interval + scheduling delay + processing time. The 0.5 * batch interval is the approximate average batching delay across all the records in the batch.

2. Hard but precise way: You could build a custom receiver that embeds the current timestamp in the records, and then compare them with the timestamp at the final step of the records. Assuming the executor and driver clocks are reasonably in sync, this will measure the latency between the time a record is received by the system and the time the result from the record is available.

On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com wrote:

Sorry, I missed the word LATENCY. For a large streaming query, how can I find the time taken by a particular RDD to travel from the initial DStream to the final DStream? Please help!

On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote:

It's not clear what you are asking. Find what among RDD?

On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote:

Is there any fixed way to find among RDD in stream processing systems, in the distributed set-up?

-- Thanks & Regards, Anshu Shukla
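The precise approach described above can be sketched without any Spark API. This is only a minimal, hypothetical illustration in which every record carries its own id and ingest timestamp: the names `tag_record`, `latency_at_sink`, `msg_id` and `ingest_ts` are made up for this sketch, and the injected `clock` stands in for the receiver and sink clocks, which are assumed to be reasonably in sync. Carrying the id inside each record avoids depending on a static variable, which exists once per JVM and is not shared between the driver and the executors.

```python
import time
import uuid


def tag_record(payload, clock=time.time):
    """Receiver side: wrap a payload with a unique id and its ingest timestamp."""
    return {"msg_id": str(uuid.uuid4()), "ingest_ts": clock(), "payload": payload}


def latency_at_sink(record, clock=time.time):
    """Sink side: seconds elapsed since the record was tagged at ingest."""
    return clock() - record["ingest_ts"]


if __name__ == "__main__":
    record = tag_record("hello")
    # ... the record would flow through the streaming pipeline here ...
    print(record["msg_id"], latency_at_sink(record))
```

In a real job the tagging would happen in the custom receiver and the latency would be computed in the last output operation; logging `msg_id` at both ends is one way to see which records went missing.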
The Initial job has not accepted any resources error; can't seem to set
Hi,

I'm running Spark Standalone on a single node with 16 cores. Master and 4 workers are running. I'm trying to submit two applications via spark-submit and am getting the following error when submitting the second one:

    Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources.

The Web UI shows the first job taking up all the cores. I have tried setting spark.deploy.defaultCores, or spark.cores.max, or both, to the value of 2:

    spark-submit \
      --conf spark.deploy.defaultCores=2 spark.cores.max=2 \
      ...

or

    spark-submit \
      --conf spark.deploy.defaultCores=2 \
      ...

This doesn't seem to get propagated. Or perhaps this is not the way to pass this in? Does spark.executor.cores play into this? I have it set to 2 in spark-defaults.conf.

Thanks.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-Initial-job-has-not-accepted-any-resources-error-can-t-seem-to-set-tp23398.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: The Initial job has not accepted any resources error; can't seem to set
I just realized that --conf needs to be one key-value pair per line. And somehow I needed

    --conf spark.cores.max=2 \

However, when it was

    --conf spark.deploy.defaultCores=2 \

then one job would take up all 16 cores on the box.

What's the actual model here? We've got 10 apps we want to submit. These are apps that consume, directly, out of Kafka topics. Now with max=2 I'm lacking a few cores. What should the actual strategy be here? How do the below parameters affect this strategy and each other?

"Set this (max) lower on a shared cluster to prevent users from grabbing the whole cluster by default." But why tie a consumer to 1 or 2 cores only? Isn't the idea to split RDDs into partitions and send them to multiple workers?

spark.cores.max (default: not set)
When running on a standalone deploy cluster or a Mesos cluster in coarse-grained sharing mode, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos.

spark.executor.cores (default: 1 in YARN mode, all the available cores on the worker in standalone mode)
The number of cores to use on each executor. For YARN and standalone mode only. In standalone mode, setting this parameter allows an application to run multiple executors on the same worker, provided that there are enough cores on that worker. Otherwise, only one executor per application will run on each worker.

spark.deploy.defaultCores (default: infinite)
Default number of cores to give to applications in Spark's standalone mode if they don't set spark.cores.max. If not set, applications always get all available cores unless they configure spark.cores.max themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default.
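As a rough back-of-the-envelope check of the numbers in this message (16 cores, 10 applications, spark.cores.max=2), the sketch below assumes a simplified model in which every application holds exactly its spark.cores.max cores for its whole lifetime. The function name is hypothetical, and real standalone scheduling has more moving parts than this.

```python
def apps_running_at_once(total_cores, cores_max_per_app, num_apps):
    """Number of apps that can hold their full core allotment simultaneously."""
    return min(num_apps, total_cores // cores_max_per_app)


total_cores = 16   # single node from the post
num_apps = 10      # applications to submit
cores_max = 2      # spark.cores.max for each application

running = apps_running_at_once(total_cores, cores_max, num_apps)
missing_cores = max(0, num_apps * cores_max - total_cores)
print(running, num_apps - running, missing_cores)
```

Under this model 8 of the 10 applications fit at once and 4 cores are missing, which matches the "lacking a few cores" observation; lowering the cap to 1 core per application would let all 10 run, at the cost of parallelism inside each one.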
Re: Latency between the RDD in Streaming
Couple of ways.

1. Easy but approximate way: Find the scheduling delay and processing time using the StreamingListener interface, and then calculate end-to-end delay = 0.5 * batch interval + scheduling delay + processing time. The 0.5 * batch interval is the approximate average batching delay across all the records in the batch.

2. Hard but precise way: You could build a custom receiver that embeds the current timestamp in the records, and then compare them with the timestamp at the final step of the records. Assuming the executor and driver clocks are reasonably in sync, this will measure the latency between the time a record is received by the system and the time the result from the record is available.

On Thu, Jun 18, 2015 at 2:12 PM, anshu shukla anshushuk...@gmail.com wrote:

Sorry, I missed the word LATENCY. For a large streaming query, how can I find the time taken by a particular RDD to travel from the initial DStream to the final DStream? Please help!

On Fri, Jun 19, 2015 at 12:40 AM, Tathagata Das t...@databricks.com wrote:

It's not clear what you are asking. Find what among RDD?

On Thu, Jun 18, 2015 at 11:24 AM, anshu shukla anshushuk...@gmail.com wrote:

Is there any fixed way to find among RDD in stream processing systems, in the distributed set-up?

-- Thanks & Regards, Anshu Shukla
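The approximate formula from the first option is plain arithmetic, so it can be written down directly. The function name and the sample numbers below are hypothetical; in practice the scheduling delay and processing time would come from the StreamingListener callbacks or the Streaming tab.

```python
def estimate_end_to_end_delay(batch_interval_s, scheduling_delay_s, processing_time_s):
    """Approximate average end-to-end latency of a record, in seconds."""
    # On average a record waits half a batch interval before its batch closes.
    average_batching_delay = 0.5 * batch_interval_s
    return average_batching_delay + scheduling_delay_s + processing_time_s


# Hypothetical numbers: 10 s batches, 0.2 s scheduling delay, 3.5 s processing.
print(estimate_end_to_end_delay(10.0, 0.2, 3.5))
```

This is an average over the records in a batch, not a per-record measurement: a record that arrives just before the batch closes sees almost no batching delay, and one that arrives just after the previous batch closed sees nearly a full interval.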
Re: Spark-sql versus Impala versus Hive
I would also love to see a more recent version of Spark SQL. There have been a lot of performance improvements between 1.2 and 1.4 :)

On Thu, Jun 18, 2015 at 3:18 PM, Steve Nunez snu...@hortonworks.com wrote:

Interesting. What were the Hive settings? Specifically, it would be useful to know if this was Hive on Tez. - Steve

From: Sanjay Subramanian Reply-To: Sanjay Subramanian Date: Thursday, June 18, 2015 at 11:08 To: user@spark.apache.org Subject: Spark-sql versus Impala versus Hive

I just published results of my findings here https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/
Re: confusing ScalaReflectionException with DataFrames in 1.4
How are you adding com.rr.data.Visit to Spark? With --jars? It is possible we are using the wrong classloader. Could you open a JIRA?

On Thu, Jun 18, 2015 at 2:56 PM, Chad Urso McDaniel cha...@gmail.com wrote:

We are seeing class exceptions when converting to a DataFrame. Anyone out there with some suggestions on what is going on? Our original intention was to use a HiveContext to write ORC; we saw the error there and have narrowed it down. This is an example of our code:

---
def saveVisitsAsOrcFile(sqlContext: SQLContext, rdd: RDD[Visit], outputDir: String) {
  // works!:
  println("rdd count: " + rdd.map(_.clicks.size).sum)
  import sqlContext.implicits._
  // scala.ScalaReflectionException: class com.rr.data.Visit
  print("rdd.toDF.count: " + rdd.toDF().count())
---

This runs locally, but when using spark-submit with 1.4 we get:

Exception in thread "main" scala.ScalaReflectionException: class com.rr.data.Visit in JavaMirror with sun.misc.Launcher$AppClassLoader@5c647e05 of type class sun.misc.Launcher$AppClassLoader with classpath [file:/home/candiru/tewfik/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/conf/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/spark-assembly-1.4.0-hadoop2.0.0-mr1-cdh4.2.0.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-api-jdo-3.2.6.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-core-3.2.10.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-rdbms-3.2.9.jar] and parent being sun.misc.Launcher$ExtClassLoader@1c79d093 of type class sun.misc.Launcher$ExtClassLoader with classpath
[file:/usr/java/jdk1.8.0_05/jre/lib/ext/cldrdata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/dnsns.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/jfxrt.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/localedata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/nashorn.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunec.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunjce_provider.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunpkcs11.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/zipfs.jar] and parent being primordial classloader with boot classpath [/usr/java/jdk1.8.0_05/jre/lib/resources.jar:/usr/java/jdk1.8.0_05/jre/lib/rt.jar:/usr/java/jdk1.8.0_05/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_05/jre/lib/jsse.jar:/usr/java/jdk1.8.0_05/jre/lib/jce.jar:/usr/java/jdk1.8.0_05/jre/lib/charsets.jar:/usr/java/jdk1.8.0_05/jre/lib/jfr.jar:/usr/java/jdk1.8.0_05/jre/classes] not found. at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22) at com.rr.data.visits.orc.OrcReadWrite$$typecreator2$1.apply(OrcReadWrite.scala:36) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232) at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28) at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410) at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335) at com.rr.data.visits.orc.OrcReadWrite$.saveVisitsAsOrcFile(OrcReadWrite.scala:36) at com.rr.data.visits.VisitSequencerRunner$.main(VisitSequencerRunner.scala:43) at com.rr.data.visits.VisitSequencerRunner.main(VisitSequencerRunner.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Re: Shuffle produces one huge partition and many tiny partitions
I got the same problem with rdd.repartition() in my streaming app, which generated a few huge partitions and many tiny partitions. The resulting high data skew makes the processing time of a batch unpredictable, often exceeding the batch interval. I eventually solved the problem by using rdd.coalesce() instead, which however is expensive as it yields a lot of shuffle traffic and also takes a long time.

Du

On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com wrote:

Thanks for the suggestion. Repartition didn't help us unfortunately. It still puts everything into the same partition. We did manage to improve the situation by making a new partitioner that extends HashPartitioner. It treats certain exception keys differently. These keys, which are known to appear very often, are assigned random partitions instead of using the existing partitioning mechanism.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
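The partitioner idea quoted above (hash most keys, but scatter a known set of very frequent keys at random) can be illustrated in a few lines of plain Python. This is a sketch of the general technique, not Al M's actual code: the class name, the `hot_keys` parameter and the sample keys are all hypothetical, and no Spark API is involved.

```python
import random


class HotKeyPartitioner:
    """Hash-partition ordinary keys; spread known hot keys at random."""

    def __init__(self, num_partitions, hot_keys, seed=None):
        self.num_partitions = num_partitions
        self.hot_keys = frozenset(hot_keys)
        self._rng = random.Random(seed)

    def get_partition(self, key):
        if key in self.hot_keys:
            # Hot key: pick any partition so its records are spread out.
            return self._rng.randrange(self.num_partitions)
        # Ordinary key: deterministic, so equal keys share a partition.
        return hash(key) % self.num_partitions


partitioner = HotKeyPartitioner(num_partitions=8, hot_keys={"null"}, seed=42)
hot_spread = {partitioner.get_partition("null") for _ in range(1000)}
print(len(hot_spread), partitioner.get_partition("user-17"))
```

The trade-off is that records sharing a hot key no longer land in one partition, so any per-key aggregation over those keys needs a second merge step afterwards.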
confusing ScalaReflectionException with DataFrames in 1.4
We are seeing class exceptions when converting to a DataFrame. Anyone out there with some suggestions on what is going on? Our original intention was to use a HiveContext to write ORC; we saw the error there and have narrowed it down. This is an example of our code:

---
def saveVisitsAsOrcFile(sqlContext: SQLContext, rdd: RDD[Visit], outputDir: String) {
  // works!:
  println("rdd count: " + rdd.map(_.clicks.size).sum)
  import sqlContext.implicits._
  // scala.ScalaReflectionException: class com.rr.data.Visit
  print("rdd.toDF.count: " + rdd.toDF().count())
---

This runs locally, but when using spark-submit with 1.4 we get:

Exception in thread "main" scala.ScalaReflectionException: class com.rr.data.Visit in JavaMirror with sun.misc.Launcher$AppClassLoader@5c647e05 of type class sun.misc.Launcher$AppClassLoader with classpath [file:/home/candiru/tewfik/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/conf/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/spark-assembly-1.4.0-hadoop2.0.0-mr1-cdh4.2.0.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-api-jdo-3.2.6.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-core-3.2.10.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-rdbms-3.2.9.jar] and parent being sun.misc.Launcher$ExtClassLoader@1c79d093 of type class sun.misc.Launcher$ExtClassLoader with classpath [file:/usr/java/jdk1.8.0_05/jre/lib/ext/cldrdata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/dnsns.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/jfxrt.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/localedata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/nashorn.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunec.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunjce_provider.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunpkcs11.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/zipfs.jar] and parent being primordial classloader with boot classpath
[/usr/java/jdk1.8.0_05/jre/lib/resources.jar:/usr/java/jdk1.8.0_05/jre/lib/rt.jar:/usr/java/jdk1.8.0_05/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_05/jre/lib/jsse.jar:/usr/java/jdk1.8.0_05/jre/lib/jce.jar:/usr/java/jdk1.8.0_05/jre/lib/charsets.jar:/usr/java/jdk1.8.0_05/jre/lib/jfr.jar:/usr/java/jdk1.8.0_05/jre/classes] not found. at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22) at com.rr.data.visits.orc.OrcReadWrite$$typecreator2$1.apply(OrcReadWrite.scala:36) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232) at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28) at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410) at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335) at com.rr.data.visits.orc.OrcReadWrite$.saveVisitsAsOrcFile(OrcReadWrite.scala:36) at com.rr.data.visits.VisitSequencerRunner$.main(VisitSequencerRunner.scala:43) at com.rr.data.visits.VisitSequencerRunner.main(VisitSequencerRunner.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Re: Spark-sql versus Impala versus Hive
Interesting. What were the Hive settings? Specifically, it would be useful to know if this was Hive on Tez. - Steve

From: Sanjay Subramanian Reply-To: Sanjay Subramanian Date: Thursday, June 18, 2015 at 11:08 To: user@spark.apache.org Subject: Spark-sql versus Impala versus Hive

I just published results of my findings here https://bigdatalatte.wordpress.com/2015/06/18/spark-sql-versus-impala-versus-hive/
Re: confusing ScalaReflectionException with DataFrames in 1.4
We're using the normal command line:

---
bin/spark-submit --properties-file ./spark-submit.conf --class com.rr.data.visits.VisitSequencerRunner ./mvt-master-SNAPSHOT-jar-with-dependencies.jar
---

Our jar contains both com.rr.data.visits.orc.OrcReadWrite (which you can see in the stack trace) and the unfound com.rr.data.Visit. I'll open a Jira ticket.

On Thu, Jun 18, 2015 at 3:26 PM Michael Armbrust mich...@databricks.com wrote:

How are you adding com.rr.data.Visit to Spark? With --jars? It is possible we are using the wrong classloader. Could you open a JIRA?

On Thu, Jun 18, 2015 at 2:56 PM, Chad Urso McDaniel cha...@gmail.com wrote:

We are seeing class exceptions when converting to a DataFrame. Anyone out there with some suggestions on what is going on? Our original intention was to use a HiveContext to write ORC; we saw the error there and have narrowed it down. This is an example of our code:

---
def saveVisitsAsOrcFile(sqlContext: SQLContext, rdd: RDD[Visit], outputDir: String) {
  // works!:
  println("rdd count: " + rdd.map(_.clicks.size).sum)
  import sqlContext.implicits._
  // scala.ScalaReflectionException: class com.rr.data.Visit
  print("rdd.toDF.count: " + rdd.toDF().count())
---

This runs locally, but when using spark-submit with 1.4 we get:

Exception in thread "main" scala.ScalaReflectionException: class com.rr.data.Visit in JavaMirror with sun.misc.Launcher$AppClassLoader@5c647e05 of type class sun.misc.Launcher$AppClassLoader with classpath [file:/home/candiru/tewfik/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/conf/,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/spark-assembly-1.4.0-hadoop2.0.0-mr1-cdh4.2.0.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-api-jdo-3.2.6.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-core-3.2.10.jar,file:/home/candiru/tewfik/spark-1.4.0-bin-tewfik-spark/lib/datanucleus-rdbms-3.2.9.jar] and parent being sun.misc.Launcher$ExtClassLoader@1c79d093 of type
class sun.misc.Launcher$ExtClassLoader with classpath [file:/usr/java/jdk1.8.0_05/jre/lib/ext/cldrdata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/dnsns.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/jfxrt.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/localedata.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/nashorn.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunec.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunjce_provider.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/sunpkcs11.jar,file:/usr/java/jdk1.8.0_05/jre/lib/ext/zipfs.jar] and parent being primordial classloader with boot classpath [/usr/java/jdk1.8.0_05/jre/lib/resources.jar:/usr/java/jdk1.8.0_05/jre/lib/rt.jar:/usr/java/jdk1.8.0_05/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_05/jre/lib/jsse.jar:/usr/java/jdk1.8.0_05/jre/lib/jce.jar:/usr/java/jdk1.8.0_05/jre/lib/charsets.jar:/usr/java/jdk1.8.0_05/jre/lib/jfr.jar:/usr/java/jdk1.8.0_05/jre/classes] not found. at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123) at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22) at com.rr.data.visits.orc.OrcReadWrite$$typecreator2$1.apply(OrcReadWrite.scala:36) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232) at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232) at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28) at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410) at org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335) at com.rr.data.visits.orc.OrcReadWrite$.saveVisitsAsOrcFile(OrcReadWrite.scala:36) at com.rr.data.visits.VisitSequencerRunner$.main(VisitSequencerRunner.scala:43) at com.rr.data.visits.VisitSequencerRunner.main(VisitSequencerRunner.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Re: Shuffle produces one huge partition and many tiny partitions
Doesn't repartition call coalesce(shuffle=true)?

On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

I got the same problem with rdd.repartition() in my streaming app, which generated a few huge partitions and many tiny partitions. The resulting high data skew makes the processing time of a batch unpredictable, often exceeding the batch interval. I eventually solved the problem by using rdd.coalesce() instead, which however is expensive as it yields a lot of shuffle traffic and also takes a long time.

Du

On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com wrote:

Thanks for the suggestion. Repartition didn't help us unfortunately. It still puts everything into the same partition. We did manage to improve the situation by making a new partitioner that extends HashPartitioner. It treats certain exception keys differently. These keys, which are known to appear very often, are assigned random partitions instead of using the existing partitioning mechanism.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
Re: Submitting Spark Applications using Spark Submit
You can specify the jars of your application to be included with spark-submit with the --jars switch. Otherwise, are you sure that your newly compiled Spark assembly jar is in assembly/target/scala-2.10/?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352p23400.html
Re: Settings for K-Means Clustering in Mlib for large data set
With 80,000 features and 1000 clusters, you need 80,000,000 doubles to store the cluster centers. That is ~600MB. If there are 10 partitions, you might need 6GB on the driver to collect updates from workers. I guess the driver died. Did you specify driver memory with spark-submit? -Xiangrui

On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey rogers.john2...@gmail.com wrote:

Hi All,

I am trying to run KMeans clustering on a large data set with 12,000 points and 80,000 dimensions. I have a Spark cluster in EC2 standalone mode with 8 workers running on 2 slaves with 160 GB RAM and 40 vCPUs. My code is as follows:

    def convert_into_sparse_vector(A):
        non_nan_indices = np.nonzero(~np.isnan(A))
        non_nan_values = A[non_nan_indices]
        dictionary = dict(zip(non_nan_indices[0], non_nan_values))
        return Vectors.sparse(len(A), dictionary)

    X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
    sc = SparkContext(appName="parallel_kmeans")
    data = sc.parallelize(X, 10)
    model = KMeans.train(data, 1000, initializationMode="k-means||")

where complete_dataframe is a pandas data frame that has my data. I get the error:

    Py4JNetworkError: An error occurred while trying to connect to the Java server.
The error trace is as follows: Exception happened during processing of request from ('127.0.0.1', 41360) Traceback (most recent call last): File /usr/lib64/python2.6/SocketServer.py, line 283, in _handle_request_noblock self.process_request(request, client_address) File /usr/lib64/python2.6/SocketServer.py, line 309, in process_request self.finish_request(request, client_address) File /usr/lib64/python2.6/SocketServer.py, line 322, in finish_request self.RequestHandlerClass(request, client_address, self) File /usr/lib64/python2.6/SocketServer.py, line 617, in __init__ self.handle() File /root/spark/python/pyspark/accumulators.py, line 235, in handle num_updates = read_int(self.rfile) File /root/spark/python/pyspark/serializers.py, line 544, in read_int raise EOFError EOFError --- Py4JNetworkError Traceback (most recent call last) ipython-input-13-3dd00c2c5e93 in module() 1 model = KMeans.train(data, 1000, initializationMode=k-means||) /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon) 134 Train a k-means clustering model. 
135 model = callMLlibFunc(trainKMeansModel, rdd.map(_convert_to_vector), k, maxIterations, -- 136 runs, initializationMode, seed, initializationSteps, epsilon) 137 centers = callJavaFunc(rdd.context, model.clusterCenters) 138 return KMeansModel([c.toArray() for c in centers]) /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args) 126 sc = SparkContext._active_spark_context 127 api = getattr(sc._jvm.PythonMLLibAPI(), name) -- 128 return callJavaFunc(sc, api, *args) 129 130 /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args) 119 Call Java Function 120 args = [_py2java(sc, a) for a in args] -- 121 return _java2py(sc, func(*args)) 122 123 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 534 END_COMMAND_PART 535 -- 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, 538 self.target_id, self.name) /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry) 367 if retry: 368 #print_exc() -- 369 response = self.send_command(command) 370 else: 371 response = ERROR /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry) 360 the Py4J protocol. 361 -- 362 connection = self._get_connection() 363 try: 364 response = connection.send_command(command) /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _get_connection(self) 316 connection = self.deque.pop() 317 except Exception: -- 318 connection = self._create_connection() 319 return connection 320 /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _create_connection(self) 323 connection = GatewayConnection(self.address, self.port, 324 self.auto_close, self.gateway_property) -- 325 connection.start() 326 return connection 327
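The memory estimate in the reply at the top of this message can be reproduced with simple arithmetic. The sketch below assumes dense centers stored as 8-byte doubles and, as a rough upper bound, one centers-sized update per partition arriving at the driver; the function names are hypothetical.

```python
BYTES_PER_DOUBLE = 8


def centers_size_bytes(num_features, num_clusters):
    """Bytes needed to hold dense cluster centers as 64-bit doubles."""
    return num_features * num_clusters * BYTES_PER_DOUBLE


def driver_collect_bytes(num_features, num_clusters, num_partitions):
    """Rough upper bound if each partition sends back a centers-sized update."""
    return centers_size_bytes(num_features, num_clusters) * num_partitions


one_copy = centers_size_bytes(80000, 1000)
all_updates = driver_collect_bytes(80000, 1000, 10)
print(one_copy / 2**20, "MiB per copy")
print(all_updates / 2**30, "GiB for 10 partitions")
```

One copy is 640,000,000 bytes (about 610 MiB) and ten of them come to roughly 6 GiB, consistent with the ~600MB and 6GB figures quoted, so a driver left at a small default heap would be a plausible point of failure.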
Re: Shuffle produces one huge partition and many tiny partitions
repartition() means coalesce(shuffle=false) On Thursday, June 18, 2015 4:07 PM, Corey Nolet cjno...@gmail.com wrote: Doesn't repartition call coalesce(shuffle=true)?On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote: I got the same problem with rdd,repartition() in my streaming app, which generated a few huge partitions and many tiny partitions. The resulting high data skew makes the processing time of a batch unpredictable and often exceeding the batch interval. I eventually solved the problem by using rdd.coalesce() instead, which however is expensive as it yields a lot of shuffle traffic and also takes a long time. Du On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com wrote: Thanks for the suggestion. Repartition didn't help us unfortunately. It still puts everything into the same partition. We did manage to improve the situation by making a new partitioner that extends HashPartitioner. It treats certain exception keys differently. These keys that are known to appear very often are assigned random partitions instead of using the existing partitioning mechanism. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Settings for K-Means Clustering in Mlib for large data set
I am submitting the application from a Python notebook. I am launching pyspark as follows:

    SPARK_PUBLIC_DNS=ec2-54-165-202-17.compute-1.amazonaws.com SPARK_WORKER_CORES=8 SPARK_WORKER_MEMORY=15g SPARK_MEM=30g OUR_JAVA_MEM=30g SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" IPYTHON=1 PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1 ./spark/bin/pyspark --master spark://54.165.202.17.compute-1.amazonaws.com:7077 --deploy-mode client

I guess I should add an extra argument, --conf spark.driver.memory=15g. Is that correct?

Regards,
Rogers Jeffrey L

On Thu, Jun 18, 2015 at 7:50 PM, Xiangrui Meng men...@gmail.com wrote:

With 80,000 features and 1000 clusters, you need 80,000,000 doubles to store the cluster centers. That is ~600MB. If there are 10 partitions, you might need 6GB on the driver to collect updates from workers. I guess the driver died. Did you specify driver memory with spark-submit?

-Xiangrui

On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey rogers.john2...@gmail.com wrote:

Hi All, I am trying to run KMeans clustering on a large data set with 12,000 points and 80,000 dimensions. I have a Spark cluster on EC2 in standalone mode, with 8 workers running on 2 slaves with 160 GB RAM and 40 vCPUs. My code is as follows:

    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    from pyspark.mllib.linalg import Vectors

    def convert_into_sparse_vector(A):
        # Keep only the non-NaN entries of the row as the sparse vector's values.
        non_nan_indices = np.nonzero(~np.isnan(A))
        non_nan_values = A[non_nan_indices]
        dictionary = dict(zip(non_nan_indices[0], non_nan_values))
        return Vectors.sparse(len(A), dictionary)

    X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
    sc = SparkContext(appName="parallel_kmeans")
    data = sc.parallelize(X, 10)
    model = KMeans.train(data, 1000, initializationMode="k-means||")

where complete_dataframe is a pandas data frame that has my data. I get the error:

Py4JNetworkError: An error occurred while trying to connect to the Java server.
The error trace is as follows:

    Exception happened during processing of request from ('127.0.0.1', 41360)
    Traceback (most recent call last):
      File "/usr/lib64/python2.6/SocketServer.py", line 283, in _handle_request_noblock
        self.process_request(request, client_address)
      File "/usr/lib64/python2.6/SocketServer.py", line 309, in process_request
        self.finish_request(request, client_address)
      File "/usr/lib64/python2.6/SocketServer.py", line 322, in finish_request
        self.RequestHandlerClass(request, client_address, self)
      File "/usr/lib64/python2.6/SocketServer.py", line 617, in __init__
        self.handle()
      File "/root/spark/python/pyspark/accumulators.py", line 235, in handle
        num_updates = read_int(self.rfile)
      File "/root/spark/python/pyspark/serializers.py", line 544, in read_int
        raise EOFError
    EOFError
    ---
    Py4JNetworkError                          Traceback (most recent call last)
    <ipython-input-13-3dd00c2c5e93> in <module>()
          1 model = KMeans.train(data, 1000, initializationMode="k-means||")

    /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon)
        134         """Train a k-means clustering model."""
        135         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
    --> 136                               runs, initializationMode, seed, initializationSteps, epsilon)
        137         centers = callJavaFunc(rdd.context, model.clusterCenters)
        138         return KMeansModel([c.toArray() for c in centers])

    /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
        126     sc = SparkContext._active_spark_context
        127     api = getattr(sc._jvm.PythonMLLibAPI(), name)
    --> 128     return callJavaFunc(sc, api, *args)
        129
        130

    /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
        119     """ Call Java Function """
        120     args = [_py2java(sc, a) for a in args]
    --> 121     return _java2py(sc, func(*args))
        122
        123

    /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
        534             END_COMMAND_PART
        535
    --> 536         answer = self.gateway_client.send_command(command)
        537         return_value = get_return_value(answer, self.gateway_client,
        538                 self.target_id, self.name)

    /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry)
        367             if retry:
        368                 #print_exc()
    --> 369                 response = self.send_command(command)
        370             else:
        371                 response = ERROR

    /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry)
        360
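The driver-memory estimate quoted in this thread (80,000 features, 1000 clusters, 10 partitions) can be reproduced with a few lines of arithmetic. The sketch below assumes dense centers stored as 8-byte doubles and one full copy of the centers per partition arriving at the driver, which is how the estimate in the thread reads; the function name center_memory_bytes is made up for illustration.

```python
def center_memory_bytes(num_features, num_clusters, bytes_per_double=8):
    """Bytes needed to hold one dense copy of all cluster centers."""
    return num_features * num_clusters * bytes_per_double


if __name__ == "__main__":
    features, clusters, partitions = 80000, 1000, 10

    one_copy = center_memory_bytes(features, clusters)
    # Assumption: the driver holds one update of this size per partition.
    driver_total = one_copy * partitions

    print("doubles per copy: %d" % (features * clusters))
    print("one copy:   %.1f MiB" % (one_copy / float(2 ** 20)))
    print("on driver:  %.2f GiB" % (driver_total / float(2 ** 30)))
```

One copy comes to 640,000,000 bytes, which matches the "~600MB" figure, and ten copies come to about 6 GB.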
RE: Machine Learning on GraphX
What is GraphX:

- It can be viewed as a kind of distributed, parallel graph database
- It can be viewed as a graph data structure (Data Structures 101 from your CS course)
- It features some off-the-shelf algorithms for graph processing and navigation (Algos and Data Structures 101), and their implementation takes advantage of the distributed, parallel nature of GraphX

Any of the MLlib algorithms can be applied to ANY data structure, from time series to graph to matrix/tabular etc. It is up to your needs and imagination.

As an example, take clustering: you can apply it to a graph data structure, BUT you may also leverage the graph's inherent connection/clustering properties and the graph algorithms that take advantage of them, instead of e.g. the run-of-the-mill K-Means, which is OK for e.g. time series, matrix etc. data structures.

From: Timothée Rebours [mailto:t.rebo...@gmail.com]
Sent: Thursday, June 18, 2015 10:44 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: Machine Learning on GraphX

Thanks for the quick answer. I've already followed this tutorial, but it doesn't use GraphX at all. My goal would be to work directly on the graph, rather than extracting edges and vertices from the graph as standard RDDs and then working on those with the standard MLlib ALS, which is of no interest. That's why I tried the other implementation, but it's not optimized at all. I might have gone in the wrong direction with ALS, but I'd like to see what is possible with MLlib on GraphX. Any ideas?

2015-06-18 11:19 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:

This might give you a good start: http://ampcamp.berkeley.edu/big-data-mini-course/movie-recommendation-with-mllib.html It's a bit old, though.

Thanks
Best Regards

On Thu, Jun 18, 2015 at 2:33 PM, texol t.rebo...@gmail.com wrote:

Hi, I'm new to GraphX and I'd like to use machine learning algorithms on top of it. I wanted to write a simple program implementing MLlib's ALS on a bipartite graph (a simple movie recommendation), but didn't succeed. I found an implementation of ALS on GraphX for Spark 1.1.x (https://github.com/ankurdave/spark/blob/GraphXALS/graphx/src/main/scala/org/apache/spark/graphx/lib/ALS.scala), but it is painfully slow compared to the standard implementation, and it uses the PregelVertex class, which is deprecated in the current version. Do we expect a new implementation? Is there a smarter way to do this?

Thanks,
Regards,
Timothée Rebours.

View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Machine-Learning-on-GraphX-tp23388.html

--
Timothée Rebours
13, rue Georges Bizet
78380 BOUGIVAL
Re: *Metrics API is odd in MLLib
Firstly, apologies for the header of my email containing some junk; I believe it's due to a copy-and-paste error on a smartphone.

Thanks for your response. I will indeed make the PR you suggest, though glancing at the code I realize it's not just a case of making these public, since the types are also private. Then there is certain functionality I will be exposing, which ought to be tested, e.g. that every bin except potentially the last has an equal number of data points in it*. I'll get round to it at some point.

As for BinaryClassificationMetrics using Double for labels, thanks for the explanation. If I were to make a PR to encapsulate the underlying implementation (which uses LabeledPoint) and change the type to Boolean, what would be the impact on versioning (since I'd be changing a public API)? An alternative would be to create a new wrapper class, say BinaryClassificationMeasures, and deprecate the old one with the intention of migrating all the code into the new class.

* Maybe some other part of the code base tests this, since this assumption must hold in order to average across folds in cross-validation?

On Thu, Jun 18, 2015 at 1:02 AM, Xiangrui Meng men...@gmail.com wrote:

LabeledPoint was used for both classification and regression, where the label type is Double for simplicity. So in BinaryClassificationMetrics, we still use Double for labels. We compute the confusion matrix at each threshold internally, but this is not exposed to users (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala#L127). Feel free to submit a PR to make it public.

-Xiangrui

On Mon, Jun 15, 2015 at 7:13 AM, Sam samthesav...@gmail.com wrote:

According to https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics the constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles. This seems odd; shouldn't it be Boolean? Similarly for MultilabelMetrics (i.e. it should be RDD[(Array[Double], Array[Boolean])]), and for MulticlassMetrics, shouldn't the type of both be generic?

Additionally, it would be good if either the ROC output type was changed or another method was added that returned confusion matrices, so that the hard integer values can be obtained before the divisions. E.g.

```
case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int) {
  // bunch of methods for each of the things in the table here
  // https://en.wikipedia.org/wiki/Receiver_operating_characteristic
}

...

def confusions(): RDD[Confusion]
```
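To make the proposal above concrete, here is a small local sketch of what "raw confusion counts per threshold" means. It is plain Python over an in-memory list, not the MLlib implementation and not an RDD API. The names Confusion and confusions mirror the ones in Sam's snippet, and the rule "predict positive when score >= threshold" is an assumption made for this example.

```python
from collections import namedtuple

# Raw integer counts, kept before any division into rates.
Confusion = namedtuple("Confusion", ["tp", "fp", "fn", "tn"])


def confusions(score_and_labels, thresholds):
    """Count TP/FP/FN/TN at each threshold.

    score_and_labels: iterable of (score, label); label may be a bool or
    1.0/0.0, since only its truthiness is used.
    Assumption: a record is predicted positive when score >= threshold.
    """
    pairs = list(score_and_labels)
    result = []
    for t in thresholds:
        tp = sum(1 for s, y in pairs if s >= t and y)
        fp = sum(1 for s, y in pairs if s >= t and not y)
        fn = sum(1 for s, y in pairs if s < t and y)
        tn = sum(1 for s, y in pairs if s < t and not y)
        result.append(Confusion(tp, fp, fn, tn))
    return result


if __name__ == "__main__":
    data = [(0.9, True), (0.8, False), (0.4, True), (0.1, False)]
    for t, c in zip([0.5, 0.0], confusions(data, [0.5, 0.0])):
        print(t, c)
```

With the integer counts in hand, rates such as the true positive rate, tp / (tp + fn), can be derived afterwards, which is the point of asking for the counts before the divisions.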
Re: understanding on the waiting batches and scheduling delay in Streaming UI
Which version of Spark are you using, and what is your data source? For some reason, your processing delay is exceeding the batch duration. And it's strange that you are not seeing any scheduling delay.

Thanks
Best Regards

On Thu, Jun 18, 2015 at 7:29 AM, Mike Fang chyfan...@gmail.com wrote:

Hi, I have a Spark Streaming program that has been running for ~25 hrs. When I check the Streaming UI tab, I find that “Waiting batches” is 144, but the “scheduling delay” is 0. I am a bit confused. If “waiting batches” is 144, does that mean many batches are waiting in the queue to be processed? If so, the scheduling delay should be high rather than 0. Am I missing anything?

Thanks,
Mike
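The expectation in this thread, that queued batches and scheduling delay should rise together, can be illustrated with a toy queue model. This is only a sketch under simplifying assumptions, not Spark's scheduler: batches arrive at a fixed interval, are processed one at a time in FIFO order, and each takes the same processing time. All function names are made up for illustration.

```python
def schedule(batch_interval, processing_time, num_batches):
    """Return (arrival, start) times for batches processed one at a time."""
    times = []
    free_at = 0  # time at which the previous batch finishes
    for i in range(num_batches):
        arrival = i * batch_interval
        start = max(arrival, free_at)  # wait if the previous batch is running
        times.append((arrival, start))
        free_at = start + processing_time
    return times


def scheduling_delays(batch_interval, processing_time, num_batches):
    """Time each batch spends queued before its processing starts."""
    return [start - arrival
            for arrival, start in schedule(batch_interval, processing_time, num_batches)]


def waiting_batches(batch_interval, processing_time, num_batches, at_time):
    """Number of batches that have arrived but not yet started at at_time."""
    return sum(1 for arrival, start
               in schedule(batch_interval, processing_time, num_batches)
               if arrival <= at_time < start)


if __name__ == "__main__":
    # Processing slower than the batch interval: the queue and the delay grow.
    print(scheduling_delays(10, 12, 5))
    print(waiting_batches(10, 25, 5, at_time=40))
    # Processing faster than the batch interval: nothing ever waits.
    print(scheduling_delays(10, 8, 5))
```

In this model a non-empty queue always implies a non-zero scheduling delay for the batches behind it, which is why a UI showing 144 waiting batches together with a scheduling delay of 0 looks inconsistent.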
Fwd: mllib from sparkR
Hi, I was wondering if it is possible to use MLlib functions inside SparkR, as outlined at the Spark Summit East 2015 Warmup meetup: http://www.meetup.com/Spark-NYC/events/220850389/ Are there any examples available? Thank you! Elena