Running in local mode as SQL engine - what to optimize?
Hi all,

For several reasons which I won't elaborate on (yet), we're using Spark in local mode as an in-memory SQL engine for data we retrieve from Cassandra: we execute SQL queries and return the results to the client - so no cluster, no worker nodes. I'm well aware that local mode has always been considered a testing mode, but it does fit our purposes at the moment. We're on Spark 2.0.0.

I'm finding several challenges on which I would like some comments if possible:

1 - For group-by based SQL queries I'm finding shuffle disk spills constantly happening, to the point where after a couple of days I have 9GB of disk filled in the block manager folder with broadcast files. My understanding is that disk spills should only exist during the lifetime of an RDD: once the RDD is gone from memory, so should the files be, but this doesn't seem to be happening. Is there any way to completely disable the disk spills? I've tweaked the memory fraction configuration to maximize execution memory and avoid the disk spills, but it doesn't seem to have done much to avoid them...

2 - GC overhead is overwhelming - when refreshing a DataFrame (even with empty data!) and executing one group-by query every second on around 1MB of data, Young Gen usage goes up to 2GB every 10 seconds. I've started profiling the JVM and can see a considerable number of hashmap objects which I assume are created internally by Spark.

3 - I'm really looking for low-latency multithreaded refreshes and collection of DataFrames - query execution and collection of data within this local node on the order of milliseconds - and I'm afraid this goes against the nature of Spark. Spark partitions all data as blocks and uses the scheduler for its multi-node design, and that's great for multi-node execution.
For local-node execution this adds considerable overhead. I'm aware of this constraint; the hope is that we could optimize it to the point where this kind of usage becomes a possibility - an efficient in-memory SQL engine within the same JVM where the data lives.

Any suggestions are very welcome! Thanks in advance,

Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-in-local-mode-as-SQL-engine-what-to-optimize-tp27815.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
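For what it's worth, the settings we've been experimenting with are set when building the session. A minimal sketch, with illustrative values rather than recommendations - all keys are standard Spark 2.0 configuration:

```scala
import org.apache.spark.sql.SparkSession

// Minimal local-mode session tuned for small, latency-sensitive queries.
// spark.sql.shuffle.partitions defaults to 200, which is far too many
// partitions for ~1MB of data in a single JVM and amplifies shuffle cost.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("local-sql-engine")
  .config("spark.sql.shuffle.partitions", "4")
  .config("spark.memory.fraction", "0.8")     // favor execution memory to reduce spills
  .config("spark.shuffle.compress", "false")  // skip compression CPU on tiny shuffles
  .getOrCreate()
```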
Re: Spark on Apache Ignite?
Although I haven't worked explicitly with either, they do seem to differ in design and consequently in usage scenarios.

Ignite claims to be a pure in-memory distributed database. With Ignite, updating existing keys is self-managed, compared with Tachyon: in Tachyon, once a value is created for a given key it becomes immutable, so you either delete and insert again, or need to manage/update the Tachyon keys yourself.

Also, Tachyon's resilience design is based on the underlying file system (typically Hadoop), which means that if a node goes down, the lost data can only be recovered if it was first persisted to the corresponding file partition. With Ignite there is no master dependency like with Tachyon - my understanding is that in Tachyon, API calls will depend on the master's availability. I believe Ignite has some options for replication, which would be more aligned with an in-memory datastore.

If you are looking to persist some RDD's output into an in-memory store and query it outside of Spark, on paper Ignite sounds like the better solution. Since you are asking about Ignite's benefits, that was the focus of my response. Tachyon has its own benefits, like the community support and the Spark lineage persistence integration. If you are doing batch-based processing and want to persist Spark RDDs fast, Tachyon is your friend.

Hope this helps. Tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Apache-Ingnite-tp25884p25933.html
Re: Scala 2.11 and Akka 2.4.0
Hi Manas,

Thanks for the reply. I've done that. The problem lies with the Spark + Akka 2.4.0 build. It seems the Maven shade plugin is altering some class files and breaking the Akka runtime, and the Spark build on Scala 2.11 using SBT is broken. I'm getting build errors using sbt due to the issues found in the thread below from July of this year.

https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E

So I went back to Maven and decided to risk building Spark on Akka 2.3.11 and forcing the Akka 2.4.0 jars onto the server's classpath. I find this a temporary solution while I cannot get a proper runnable Akka 2.4.0 build. If anyone has managed to get it working, please let me know.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scala-2-11-and-Akka-2-4-0-tp25535p25618.html
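For anyone attempting the sbt route, the usual way to force a newer Akka onto the classpath is a dependency override. A sketch - the exact set of Akka artifacts to pin depends on what your build actually pulls in transitively:

```scala
// build.sbt: force Akka 2.4.0 over whatever Spark's transitive deps request
dependencyOverrides ++= Set(
  "com.typesafe.akka" %% "akka-actor"  % "2.4.0",
  "com.typesafe.akka" %% "akka-remote" % "2.4.0"
)
```

This only pins versions; it does not fix binary incompatibilities between Akka 2.3.x and 2.4.x if Spark's code relies on changed APIs.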
Spark Streaming - Minimizing batch interval
I've been given a feature requirement that means processing events at a latency lower than 0.25ms, meaning I would have to make sure that Spark Streaming gets new events from the messaging layer within that period of time. Has anyone achieved such numbers using a Spark cluster? Would this even be possible, assuming we don't use the write-ahead logs?

tnks in advance! Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Minimizing-batch-interval-tp7.html
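For reference, the batch interval is fixed at context construction time. A sketch - sub-second intervals are accepted by the API, though in practice intervals much below roughly 500ms are generally considered the floor for stable scheduling:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

val conf = new SparkConf().setAppName("low-latency-stream")

// 250ms batches: the scheduler must fully process each batch within
// this window, or scheduling delay will accumulate batch after batch.
val ssc = new StreamingContext(conf, Milliseconds(250))
```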
Re: Spark Streaming checkpoint recovery causes IO re-execution
Hi Hannes,

Good to know I'm not alone in the boat. Sorry about not posting back; I haven't been on the user list in a while. It's on my agenda to get past this issue, as it will be very important for our recovery implementation.

I have done an internal proof of concept, but without any conclusions so far. The main approach is to have full control over offsets: upon each processed batch we need to persist the last processed event (I'm using Kafka btw) and keep the offset somewhere, so that upon recovery we only start the streaming from the last processed one. This somewhat conflicts with the new ReliableReceiver implementation, where that control is taken away from the processing layer...

When recovering Spark Streaming, we need to control the recovered batches so that only internal state gets updated and no IO gets executed. For this we need to make internal changes to Spark Streaming: I exposed a function that identifies how many batches are being recovered. Then I passed that info upfront to the workers, and with a counter they are aware of how many batches were recomputed, thus avoiding IO re-execution. This is very much at an embryonic stage, so I can't actually help you much right now...
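The offset-tracking idea above can be sketched roughly like this. Note `process` and `offsetStore` are hypothetical - the store would be whatever durable system you have at hand (Cassandra, ZooKeeper, Kafka's own offset storage), and the receiver wiring is omitted:

```scala
// Hypothetical sketch of per-batch offset bookkeeping.
// messageStream carries (offset, payload) pairs from a custom Kafka receiver.
messageStream.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    val lastOffset = rdd.map(_._1).max()
    process(rdd)                              // hypothetical batch processing
    offsetStore.save("my-topic", lastOffset)  // hypothetical durable store; resume point after a crash
  }
}
```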
This is the function I've created inside the JobGenerator class to access the recovered batches:

  def getDownTimes(): Seq[Time] = {
    if (ssc.isCheckpointPresent) {
      val batchDuration = ssc.graph.batchDuration
      // Batches when the master was down, that is,
      // between the checkpoint and the current restart time
      val checkpointTime = ssc.initialCheckpoint.checkpointTime
      val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
      val downTimes = checkpointTime.until(restartTime, batchDuration)
      logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", "))
      downTimes
    } else {
      Seq[Time]()
    }
  }

It has been a while since I last visited this issue, so I'm probably not able to give you too many details right now, but I expect to reach a concrete solution which ultimately I could push as a proposal to the Spark dev team. I will definitely notify people on this thread at least.

Tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p21265.html
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

What are your thoughts on keeping this solution (or not), considering that Spark Streaming v1.2 will have built-in recoverability of the received data? https://issues.apache.org/jira/browse/SPARK-1647

I'm concerned about the added complexity of this solution and the performance overhead of writing big amounts of data into HDFS on a small batch interval. https://docs.google.com/document/d/1vTCB5qVfyxQPlHuv8rit9-zjdttlgaSrMgfCDQlCJIM/edit?pli=1# http://apache-spark-user-list.1001560.n3.nabble.com/file/n20181/spark_streaming_v.png

I think the whole solution is well designed and thought out, but I'm not sure it really fits all needs with checkpoint-based technologies like Flume or Kafka, since it hides the management of the offset away from the user code. If, instead of saving received data into HDFS, the ReceiverHandler saved some metadata (such as the offset, in the case of Kafka) specified by the custom receiver passed into the StreamingContext, then upon driver restart that metadata could be used by the custom receiver to recover the point from which it should start receiving data once more.

Anyone's comments will be greatly appreciated.

Tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20181.html
Dynamically switching Nr of allocated core
Hi all,

I can't seem to find a clear answer in the documentation. Does the standalone cluster support dynamic assignment of the number of allocated cores to an application once another app stops? I'm aware that we can have core sharing between active applications if we use Mesos, depending on the number of parallel tasks, but I believe my question is slightly simpler. For example:

1 - There are 12 cores available in the cluster
2 - I start app A with 2 cores - gets 2
3 - I start app B - gets the remaining 10
4 - If I stop app A, app B *does not* get the now-available 2 cores.

Should I expect Mesos to have this scenario working?

Also, the same question applies when we add more cores to a cluster. Let's say ideally I want 12 cores for my app, although there are only 10. As I add more workers, they should get assigned to my app dynamically. I haven't tested this in a while, but I think the app will not even start and will complain about not having enough resources...

Would very much appreciate any knowledge sharing on this!

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Dynamically-switching-Nr-of-allocated-core-tp17955.html
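For context, the scenario above uses the standard core cap per application on standalone; the configuration keys are real, and the open question is only whether freed cores are re-offered to a running app:

```scala
import org.apache.spark.SparkConf

// App A: hard cap at 2 cores on the standalone cluster
val confA = new SparkConf()
  .setAppName("app-A")
  .set("spark.cores.max", "2")

// App B: no cap set, so it takes whatever the cluster has left
// (10 of the 12 cores in the example above)
val confB = new SparkConf().setAppName("app-B")
```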
Re: NullPointerException on reading checkpoint files
Hi TD,

This (recovery of shared variables) is actually an important requirement for us, as we need to spread some referential data across the Spark nodes on application startup. I just bumped into this issue on Spark version 1.0.1, and I assume the latest one also doesn't include this capability. Are there any plans to do so? If not, could you give me your opinion on how difficult it would be to implement? If it's nothing too complex, I could consider contributing at that level.

BTW, regarding recovery, I have posted a topic on which I would very much appreciate your comments: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-on-reading-checkpoint-files-tp7306p14882.html
Re: RDD data checkpoint cleaning
Hi TD,

tnks for getting back on this. Yes, that's what I was experiencing - data checkpoints were being recovered from considerably before the last data checkpoint, probably since the beginning of the first writes; I would have to confirm.

I have some development on this though. These results show up when I run the application from my Windows laptop, where I have IntelliJ, while the HDFS file system is on a Linux box (with a very reasonable latency!). I couldn't find any exception in the Spark logs, and I did see metadata checkpoints being recycled in the HDFS folder. Upon recovery I could see the usual Spark Streaming timestamp prints on the console jumping from one data checkpoint moment to the next very slowly.

Once I moved the app to the Linux box where I had HDFS, the problem seemed to go away. If this issue only happens when running from Windows I won't be so concerned and can go back to testing everything on Linux. My only concern is whether, because of substantial HDFS latency to the Spark app, there is some kind of race condition between writes and cleanups of HDFS files that could have led to this finding.

Hope this description helps. tnks again, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847p14935.html
Re: RDD data checkpoint cleaning
Just a follow-up. To make sure about the RDDs not being cleaned up, I replayed the app both on the remote Windows laptop and then on the Linux machine, while observing the RDD folders in HDFS at the same time. This confirmed the observed behavior: running on the laptop I could see the RDDs continuously increasing; when I ran on Linux, only two RDD folders were there, continuously being recycled. Metadata checkpoints were being cleaned in both scenarios.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-tp14847p14939.html
Re: Spark Streaming checkpoint recovery causes IO re-execution
Hi Yana,

You are correct. What needs to be added is that besides RDDs being checkpointed, metadata representing the execution of computations is also checkpointed in Spark Streaming. Upon driver recovery, the last batches (the ones already executed and the ones that should have been executed while shut down) are recomputed. This is very good if we just want to recover state and we don't have any other component or data store depending on Spark's output. In the case where we do have that requirement (which is my case), all the nodes will re-execute all that IO, provoking overall system inconsistency, as the outside systems were not expecting events from the past.

We need some way of making Spark aware of which computations are recomputations and which are not, so we can empower Spark developers to introduce specific logic if they need to.

Let me know if any of this doesn't make sense.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13205.html
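To illustrate the kind of hook I mean - a hypothetical sketch where the application is told which batch times are recoveries. `recoveredBatchTimes` does not exist in Spark today; it is precisely the capability being asked for:

```scala
// Hypothetical: a set of batch times known to be recomputations after recovery
val recoveredBatchTimes: Set[Time] = ???

stream.foreachRDD { (rdd, time) =>
  val result = rdd.map(compute)  // always re-run to rebuild internal state
  if (!recoveredBatchTimes(time)) {
    // side effects reach external systems only on first execution,
    // never on post-recovery recomputation
    result.foreachPartition(writeToExternalSystem)
  }
}
```

The `(rdd, time)` variant of foreachRDD is real; `compute` and `writeToExternalSystem` are placeholders for the application's logic.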
Re: Low Level Kafka Consumer for Spark
Just a comment on the recovery part. Is it correct to say that the current Spark Streaming recovery design does not consider re-computations (upon metadata lineage recovery) that depend on blocks of data of the received stream? https://issues.apache.org/jira/browse/SPARK-1647

Just to illustrate a real use case (mine): we have object states, each with a Duration field which is incremented on every batch interval; this object state is also reset to 0 upon incoming state-changing events. Let's suppose there is at least one event since the last data checkpoint. This will lead to inconsistency upon driver recovery: the Duration field will get incremented from the data checkpoint version until the recovery moment, but the state-change event will never be re-processed... so in the end we have the old state with the wrong Duration value. To make things worse, let's imagine we're dumping the Duration increases somewhere... which means we're spreading the problem across our system.

Re-computation awareness is something I've commented on in another thread and would rather treat separately: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-td12568.html#a13205

Re-computations do occur, but the only RDDs that are recovered are the ones from the data checkpoint. This is what we've seen. It is not enough by itself to ensure recovery of computed data, and this partial recovery leads to inconsistency in some cases.

Roger - I share the same question with you. I'm just not sure whether the replicated data really gets persisted on every batch. The execution lineage is checkpointed, but if we have big chunks of data being consumed by the receiver node on, let's say, a per-second basis, then having it persisted to HDFS every second could be a big challenge for JVM performance - maybe that could be the reason why it's not really implemented... assuming it isn't.
Dibyendu made a great effort with the offset-controlling code, but generally consistent state recovery feels to me like another big issue to address. I plan on diving into the Streaming code and trying to at least contribute some ideas. Any further insight from anyone on the dev team will be very appreciated.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13208.html
Re: Spark Streaming checkpoint recovery causes IO re-execution
Hi Yana,

The fact is that the DB writing is happening at the node level and not at the driver level. One of the benefits of the distributed computing nature of Spark is enabling IO distribution as well: for example, it is much faster to have the nodes write to Cassandra than to have everything collected at the driver and the writes sent from there. The problem is the node computations that get redone upon recovery. If these lambda functions send events to other systems, those events would get resent upon re-computation, causing overall system instability.

Hope this helps you understand the problem.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13043.html
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

My colleague has taken a look at the Spark Kafka consumer on the GitHub link you provided and started experimenting. We found that somehow, when Spark fails after a data checkpoint, the expected re-computations corresponding to the metadata checkpoints are not recovered, so we lose Kafka messages and RDD computations in Spark. The impression is that this code replaces quite a bit of the Spark Kafka Streaming code, where maybe (not sure) metadata checkpoints are done every batch interval.

Was it on purpose to depend solely on the Kafka commit to recover data and recomputations between data checkpoints? If so, how do we make this work?

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html
Re: Running a task once on each executor
Hi Christopher,

I am also in need of having a single function call at the node level. Your suggestion makes sense as a solution to the requirement, but it still feels like a workaround: the check will get called on every row... Also, having static members and methods, especially in a multi-threaded environment, is a bad code smell. It would be nice to have a way of exposing the nodes that would allow simply invoking a function from the driver on the nodes, without having to do any transformation and loop through every record. That would be more efficient and more flexible from a user's perspective.

Tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p11908.html
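For reference, the workaround under discussion looks roughly like this - a sketch relying on JVM-level lazy evaluation, so the body runs at most once per executor JVM even though the access happens on every partition:

```scala
// One-time per-executor initialization via a singleton's lazy val.
object ExecutorSetup {
  lazy val init: Unit = {
    // runs at most once per executor JVM,
    // e.g. open a connection pool or load referential data
  }
}

rdd.mapPartitions { iter =>
  ExecutorSetup.init  // touched on every partition, executed only once per JVM
  iter
}
```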
Re: streaming window not behaving as advertised (v1.0.1)
Hi TD,

I've also been fighting this issue, only to find the exact same solution you are suggesting. Too bad I didn't find either the post or the issue sooner. I'm using a 1-second batch with N Kafka events (1-to-1 with the state objects) per batch, and only calling the updateStateByKey function.

This is my interpretation, please correct me if needed: because of Spark's lazy computation, the RDDs weren't being updated as expected on each batch interval execution. The assumption was that as long as I have a streaming batch run (with or without new messages), I should get updated RDDs, which was not happening. We only get updateStateByKey calls for objects which got events, or which are forced to compute through an output function. I have not made further tests to confirm this, but that's the impression given. This doesn't fit our requirements, as we want to do duration updates based on the batch interval execution... so I had to force the computation of all the objects through the foreachRDD function.

I would also appreciate it if the priority of the issue could be increased. I assume the foreachRDD is additional unnecessary resource allocation (although I'm not sure how much), as opposed to doing it somehow by default on batch interval execution.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11168.html
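Concretely, the forcing I ended up with amounts to something like this - a sketch; any output action that touches every partition of the state RDD should have the same effect:

```scala
// Without an output action, the state RDDs produced by updateStateByKey are
// only computed lazily. Registering a foreachRDD forces every state RDD to
// materialize each batch interval, so all keys get their update call.
val state = events.updateStateByKey(updateFunc)
state.foreachRDD { rdd =>
  rdd.count()  // cheap full traversal that triggers the computation
}
```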
Re: UpdatestateByKey assumptions
Just an update on this. Looking into the Spark logs, it seems that some partitions are not found and are recomputed, giving the impression that those are related to the delayed updateStateByKey calls. I'm seeing something like:

log line 1 - Partition rdd_132_1 not found, computing it
log line N - Found block rdd_132_1 locally
log line N+1 - Goes into the updateStateByKey X times, as many as there are objects with a delayed update
log line M - Done Checkpointing RDD 126 to hdfs://

This happens for Y partitions, as many seconds as the updateStateByKey call is delayed.

Tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/UpdatestateByKey-assumptions-tp10858p10859.html
Re: Cassandra driver Spark question
Tnks to both for the comments and the debugging suggestion, which I will try to use.

Regarding your comment: yes, I do agree the earlier solution was not efficient, but to use the saveToCassandra method I needed an RDD, thus the parallelize method. I was finally directed by Piotr to use the CassandraConnector and got this fixed in the meantime. Bottom line: I started using the new Cassandra Spark driver with async calls, prepared statements and batch executions inside the node transformations, and performance improved greatly.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177p9990.html
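For anyone landing on this thread, the CassandraConnector approach looks roughly like this - a sketch against the spark-cassandra-connector API, with illustrative keyspace/table/column names:

```scala
import com.datastax.spark.connector.cql.CassandraConnector

// The connector serializes its configuration, so it can be referenced
// safely inside closures that run on the worker nodes.
val connector = CassandraConnector(sc.getConf)

rdd.foreachPartition { partition =>
  connector.withSessionDo { session =>
    val stmt = session.prepare("INSERT INTO my_ks.my_table (k, v) VALUES (?, ?)")
    partition.foreach { case (k, v) =>
      session.executeAsync(stmt.bind(k, v))  // async writes issued per partition
    }
  }
}
```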
Cassandra driver Spark question
Hi all,

I am currently trying to save to Cassandra after some Spark Streaming computation. I call myDStream.foreachRDD so that I can collect each RDD in the driver app runtime, and inside I do something like this:

  myDStream.foreachRDD(rdd => {
    var someCol = Seq[MyType]()
    rdd.foreach(kv => {
      someCol = someCol :+ kv._2  // I only want the RDD value and not the key
    })
    val collectionRDD = sc.parallelize(someCol) // THIS IS WHY IT FAILS TRYING TO RUN THE WORKER
    collectionRDD.saveToCassandra(...)
  })

I get a NotSerializableException while trying to run on the node (I also tried someCol as a shared variable). I believe this happens because the myDStream doesn't exist yet when the code is pushed to the node, so the parallelize doesn't have any structure to relate to. Inside this foreachRDD I should only make RDD calls which relate to other RDDs - I guess this was just a desperate attempt.

So I have a question about the Cassandra Spark driver: can we only write to Cassandra from an RDD? In my case I only want to write once all the computation is finished, in a single batch from the driver app.

tnks in advance. Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177.html
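For what it's worth, the connector can save directly from the RDDs inside foreachRDD, avoiding the collect/parallelize round trip entirely - a sketch, with illustrative keyspace, table and column names:

```scala
import com.datastax.spark.connector._  // adds saveToCassandra to RDDs

myDStream.foreachRDD { rdd =>
  // Drop the keys and let each worker write its own partition directly.
  rdd.map(_._2).saveToCassandra("my_ks", "my_table", SomeColumns("value"))
}
```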
Re: Cassandra driver Spark question
Hi Luis,

Yes, it's actually an output of the previous RDD. Have you ever used the Cassandra Spark driver from the driver app? I believe these limitations come from the fact that it's designed to save RDDs from the nodes.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177p9187.html
RE: range partitioner with updateStateByKey
Hi TD,

I have the same question: I need the workers to process events in arrival order, since each state update is based on the previous one.

tnks in advance. Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/range-partitioner-with-updateStateByKey-tp5190p7123.html
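To make the concern concrete: the update function receives all of a key's values for the batch as a Seq, and the worry is whether that Seq preserves arrival order. A sketch - MyEvent, MyState and applyEvent are illustrative names, not real APIs:

```scala
// updateStateByKey signature: (Seq[V], Option[S]) => Option[S], per key per batch.
// If the Seq is not in arrival order, order-sensitive folds like this break.
val state = events.updateStateByKey[MyState] {
  (newEvents: Seq[MyEvent], prev: Option[MyState]) =>
    newEvents.foldLeft(prev) { (s, e) => Some(applyEvent(s, e)) }  // assumes ordered input
}
```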
Re: NoSuchElementException: key not found
Hi Tathagata,

I'm seeing the same issue on an overnight load run with Kafka streaming (6000 msgs/sec) and a 500ms batch size. Again occasional, and I believe only happening after a few hours. I'm using updateStateByKey. Any comment will be very welcome.

tnks, Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchElementException-key-not-found-tp6743p7157.html