Stopping StreamingContext
Hey, I have a Spark Streaming application processing some events. Sometimes, I want to stop the application if I get a specific event. I collect the executors' results in the driver and, based on those results, I kill the StreamingContext using StreamingContext.stop(stopSparkContext=true). When I do that, I can see in the logs that the app is shutting down, closing receivers etc. But when I go to the master's web UI, I can still see the app under "Running Applications", and if I click it, it says the endpoint doesn't exist. When I check the open processes on the machine, I can see that the job's process is still running. Am I closing the application wrong?

These are the logs once I call the stop() method:

2018-03-28 11:59:04 INFO KafkaProducer:615 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-03-28 11:59:04 INFO ReceiverTracker:54 - Sent stop signal to all 1 receivers
2018-03-28 11:59:05 INFO BlockManagerInfo:54 - Added input-0-1522238344750 in memory on i-va-spark1:59059 (size: 1632.0 B, free: 579.2 MB)
2018-03-28 11:59:05 ERROR ReceiverTracker:70 - Deregistered receiver for stream 0: Stopped by driver
2018-03-28 11:59:05 INFO BlockManagerInfo:54 - Added input-0-1522238345000 in memory on i-va-spark1:59059 (size: 272.0 B, free: 579.2 MB)
2018-03-28 11:59:05 INFO TaskSetManager:54 - Finished task 0.0 in stage 2.0 (TID 70) in 30213 ms on i-va-spark1 (executor 0) (1/1)
2018-03-28 11:59:05 INFO TaskSchedulerImpl:54 - Removed TaskSet 2.0, whose tasks have all completed, from pool
2018-03-28 11:59:05 INFO DAGScheduler:54 - ResultStage 2 (start at UserLocationHistoryJob.scala:38) finished in 30.213 s
2018-03-28 11:59:05 INFO ReceiverTracker:54 - All of the receivers have deregistered successfully
2018-03-28 11:59:05 INFO ReceiverTracker:54 - ReceiverTracker stopped
2018-03-28 11:59:05 INFO JobGenerator:54 - Stopping JobGenerator immediately
2018-03-28 11:59:05 INFO RecurringTimer:54 - Stopped timer for JobGenerator after time 152223834
2018-03-28 11:59:05 INFO JobGenerator:54 - Stopped JobGenerator
2018-03-28 11:59:07 INFO JobScheduler:54 - Stopped JobScheduler
2018-03-28 11:59:07 INFO StreamingContext:54 - StreamingContext stopped successfully
2018-03-28 11:59:07 INFO BlockManagerInfo:54 - Removed broadcast_5_piece0 on 10.0.0.243:41976 in memory (size: 2.4 KB, free: 488.4 MB)
2018-03-28 11:59:07 INFO BlockManagerInfo:54 - Removed broadcast_5_piece0 on i-va-spark1:59059 in memory (size: 2.4 KB, free: 579.2 MB)
2018-03-28 11:59:07 INFO BlockManagerInfo:54 - Removed broadcast_4_piece0 on 10.0.0.243:41976 in memory (size: 23.9 KB, free: 488.4 MB)
2018-03-28 11:59:07 INFO BlockManagerInfo:54 - Removed broadcast_4_piece0 on i-va-spark1:59059 in memory (size: 23.9 KB, free: 579.2 MB)
2018-03-28 11:59:37 WARN QueuedThreadPool:178 - SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop Thread[SparkUI-171-selector-ServerConnectorManager@478b3e9/2,5,main]
2018-03-28 11:59:37 WARN QueuedThreadPool:178 - SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop Thread[SparkUI-172-selector-ServerConnectorManager@478b3e9/3,5,main]
2018-03-28 11:59:37 WARN QueuedThreadPool:178 - SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop Thread[SparkUI-169-selector-ServerConnectorManager@478b3e9/0,5,main]
2018-03-28 11:59:37 WARN QueuedThreadPool:178 - SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop Thread[SparkUI-170-selector-ServerConnectorManager@478b3e9/1,5,main]
2018-03-28 13:22:01 INFO DiskBlockManager:54 - Shutdown hook called
2018-03-28 13:22:01 INFO ShutdownHookManager:54 - Shutdown hook called
2018-03-28 13:22:01 INFO ShutdownHookManager:54 - Deleting directory /data/spark/scratch/spark-69a3a8a6-5504-4153-a4c1-059676861581
2018-03-28 13:22:01 INFO ShutdownHookManager:54 - Deleting directory /data/spark/scratch/spark-69a3a8a6-5504-4153-a4c1-059676861581/userFiles-8a970eec-da41-442b-9ccf-1621b9e9e045
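For reference, a minimal sketch of the shutdown pattern in question, in PySpark. The explicit exit at the end is an assumption of mine, not from the original post, for the case where lingering non-daemon threads (like the SparkUI pool in the WARN lines above) keep the driver process alive after both contexts have stopped:

    import sys
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="event-processor")  # hypothetical app name
    ssc = StreamingContext(sc, 1)                 # 1-second batches
    # ... build the DStream graph and call ssc.start() ...

    def stop_app():
        # Stop the receivers, the job scheduler and the underlying SparkContext,
        # as in the log lines above ("StreamingContext stopped successfully").
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
        # If non-daemon threads keep the process alive after the contexts are
        # down, an explicit exit guarantees the process terminates, so the
        # master can move the app out of "Running Applications".
        sys.exit(0)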
RE: [PySpark] - running processes and computing time
To initialize it per executor, I used a class with only class attributes and class methods (like an `object` in Scala), but because I was using PySpark, it was actually being initialized per process ☹ What I went for was the broadcast variable, but there is still something suspicious with my application: the processing time of each batch. In my logs, I see that when I process a partition, it takes under a second. But in the Spark UI I see that a task takes between 3 and 6 seconds. Shouldn't the partition processing time and the task computing time be the same?

My code:

    def process_func(obj, records):
        start = time.time()
        processed_records = ...  # Some processing
        # This logs very small numbers (around 0.05 seconds)
        logger.info("It took {0} seconds to handle records".format(time.time() - start))
        return processed_records

    def handle_rdd(rdd):
        start_time = time.time()
        rdd.foreachPartition(lambda records: process_func(object_broadcast.value, records))
        # This logs much bigger numbers (around 3-6 seconds)
        logger.info("Handle RDD took: {0} seconds".format(time.time() - start_time))

    # Keep only the values and cast them to TextAnalysis
    ssc.union(*streams) \
        .filter(lambda x: x[1] is not None) \
        .map(lambda x: x[1]) \
        .foreachRDD(handle_rdd)

    ssc.start()
    ssc.awaitTermination()

Each RDD has at most 10 partitions, which means it should take around 0.5 seconds for all the tasks to be processed. Does anyone know what happens here? The time difference is too big for it to be networking, right?

From: Sudev A C [mailto:sudev...@go-mmt.com]
Sent: Monday, July 3, 2017 7:48 PM
To: Sidney Feiner <sidney.fei...@startapp.com>; user@spark.apache.org
Subject: Re: [PySpark] - running processes

You might want to do the initialization per partition (I'm not sure how you are achieving the per-executor initialization in your code). To initialize something per partition, you may use something like rdd.foreachPartition. Or, if you want something global, like a variable for further processing, you might want to initialize it once as a broadcast variable and access the data structure through the broadcast variable. AFAIK, a Python process will be initiated for per-partition tasks.

On Mon, 3 Jul 2017 at 5:23 PM, Sidney Feiner <sidney.fei...@startapp.com> wrote:

In my Spark Streaming application, I need to build a graph from a file, and initializing that graph takes between 5 and 10 seconds. So I tried initializing it once per executor so it would be initialized only once. After running the application, I noticed that it gets initialized much more than once per executor, every time with a different process id (every process has its own logger). Doesn't every executor have its own JVM and its own process? Or is that only relevant when I develop in JVM languages like Scala/Java? Do executors in PySpark spawn new processes for new tasks? And if they do, how can I make sure that my graph object will really only be initialized once?

Thanks :)
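For reference, the usual PySpark workaround for the behaviour described in this thread is a module-level cache: each Python worker process builds the heavy object at most once, on first use, and spark.python.worker.reuse (true by default) keeps those processes alive between tasks so the cache pays off. A minimal sketch, with build_graph and analyze as placeholder names, not from the thread:

    _graph = None  # cached once per Python worker process

    def get_graph(path):
        global _graph
        if _graph is None:
            _graph = build_graph(path)        # hypothetical expensive constructor
        return _graph

    def process_partition(records):
        graph = get_graph("/data/graph.bin")  # hypothetical path
        for record in records:
            analyze(graph, record)            # hypothetical per-record work

    rdd.foreachPartition(process_partition)

As for the 3-6 second gap above: a task's time in the UI can also include scheduler delay, task and result serialization, and shipping records between the JVM and the Python worker, which a timer inside process_func may not capture.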
[PySpark] - running processes
In my Spark Streaming application, I need to build a graph from a file, and initializing that graph takes between 5 and 10 seconds. So I tried initializing it once per executor so it would be initialized only once. After running the application, I noticed that it gets initialized much more than once per executor, every time with a different process id (every process has its own logger). Doesn't every executor have its own JVM and its own process? Or is that only relevant when I develop in JVM languages like Scala/Java? Do executors in PySpark spawn new processes for new tasks? And if they do, how can I make sure that my graph object will really only be initialized once?

Thanks :)
RE: Message getting lost in Kafka + Spark Streaming
Are you sure that every message gets processed? It could be that some messages failed to pass the decoder. And during the processing, are you maybe putting the events into a map? That way, events with the same key could override each other, and you'd end up with fewer final events.

-----Original Message-----
From: Vikash Pareek [mailto:vikash.par...@infoobjects.com]
Sent: Tuesday, May 30, 2017 4:00 PM
To: user@spark.apache.org
Subject: Message getting lost in Kafka + Spark Streaming

I am facing an issue related to Spark Streaming with Kafka. My use case is as follows:

1. A Spark Streaming (DirectStream) application reads data/messages from a Kafka topic and processes them.
2. On the basis of the processed message, the app writes the processed message to different Kafka topics: e.g., if the message is harmonized, it is written to the harmonized topic, otherwise to the unharmonized topic.

The problem is that during the streaming we are somehow losing some messages, i.e., not all the incoming messages are written to the harmonized or unharmonized topics. For example, if the app receives 30 messages in one batch, sometimes it writes all the messages to the output topics (the expected behaviour), but sometimes it writes only 27 (3 messages are lost; this number can change).

Versions are as follows: Spark 1.6.0, Kafka 0.9.

Kafka topic configuration is as follows:

    # of brokers: 3
    # replication factor: 3
    # of partitions: 3

The following are the properties we are using for Kafka:

    val props = new Properties()
    props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
    props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
    props.put("group.id", properties.getProperty("group.id"))
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
    props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
    props.put("acks", "all");
    props.put("retries", "5");
    props.put("request.required.acks", "-1")

The following is the piece of code where we write processed messages to Kafka:

    val schemaRdd2 = finalHarmonizedDF.toJSON
    schemaRdd2.foreachPartition { partition =>
      val producerConfig = new ProducerConfig(props)
      val producer = new Producer[String, String](producerConfig)

      partition.foreach { row =>
        if (debug) println(row.mkString)
        val keyedMessage = new KeyedMessage[String, String](
          props.getProperty("outTopicHarmonized"), null, row.toString())
        producer.send(keyedMessage)
      }
      // hack, should be done with the flush
      Thread.sleep(1000)
      producer.close()
    }

We explicitly added sleep(1000) for testing purposes, but this is also not solving the problem :(

Any suggestion would be appreciated.
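On the "hack, should be done with the flush" comment: newer producer clients do expose a blocking flush. A minimal per-partition sketch using the kafka-python client (a substitution for the old Scala producer API in the quoted code; the broker address and topic name are illustrative):

    from kafka import KafkaProducer

    def send_partition(rows):
        producer = KafkaProducer(bootstrap_servers="broker1:9092",  # hypothetical
                                 acks="all", retries=5)
        for row in rows:
            producer.send("harmonized", row.encode("utf-8"))
        # Block until every buffered record is acknowledged, instead of
        # sleeping and hoping the buffer drained.
        producer.flush()
        producer.close()

    finalHarmonizedDF.toJSON().foreachPartition(send_partition)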
RE: [Spark Streaming] - Killing application from within code
Instead of setting up an additional mechanism, would it be "clean" to catch the error back in the driver and use SparkContext.stop() there? And because the SparkContext can't be serialized, I can't catch the error inside the rdd.foreach function. What I did eventually, and it worked:

    ssc.union(uniStreams).foreachRDD { rdd =>
      val futures = rdd.mapValues(event => handleEvent(event)).collect().map(_._2)
      Future.sequence(futures.toList).onFailure {
        case ex: Throwable =>
          LoggerManager.getInstance().getLogger.error(
            s"Unhandled Error caught in job, stopping SparkContext. Error: ${ExceptionUtils.getStackTrace(ex)}")
          sc.stop()
      }
    }

It collects all the futures into the driver and checks if one of them failed. If it's not a recommended way of doing it, I'm all ears ☺

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Wednesday, May 3, 2017 10:25 PM
To: Sidney Feiner <sidney.fei...@startapp.com>
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] - Killing application from within code

There isn't a clean programmatic way to kill the application running in the driver from the executor. You will have to set up an additional RPC mechanism to explicitly send a signal from the executors to the application/driver to quit.

On Wed, May 3, 2017 at 8:44 AM, Sidney Feiner <sidney.fei...@startapp.com> wrote:

Hey, I'm using connections to Elasticsearch from within my Spark Streaming application. I'm using Futures to maximize performance when it sends network requests to the ES cluster. Basically, I want my app to crash if any one of the executors fails to connect to ES. The exception gets caught and returned in my Future as a Failure(ex: NoNodeAvailableException), but when I handle it, I can't seem to kill my app. I tried using:

    fut andThen { case Failure(ex: NoNodeAvailableException) => throw ex }
    fut andThen { case Failure(ex: NoNodeAvailableException) => System.exit(-1) }
    fut onFailure { case ex: NoNodeAvailableException => throw ex }
    fut onFailure { case ex: NoNodeAvailableException => System.exit(-1) }

But none of them seem to kill my app. The System.exit(-1) kills my executor, but that doesn't seem like the correct way to do it. And no matter what I try, the driver stays alive. Is there a way to programmatically kill the application from within one of the workers?

Thanks a lot ☺
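For anyone doing the same from PySpark, a rough translation of the collect-and-check pattern at the top of this message. The names are placeholders, and failures are returned as values rather than raised, so they survive the collect(); as in the Scala snippet, this stops the context from the driver, and whether that counts as "clean" is exactly the open question above:

    def handle_event_safe(event):
        # Hypothetical wrapper: return the error instead of raising inside
        # the executor, so the driver can inspect it after collect().
        try:
            return (handle_event(event), None)
        except Exception as ex:
            return (None, str(ex))

    def check_and_stop(rdd):
        results = rdd.map(lambda kv: handle_event_safe(kv[1])).collect()
        errors = [err for _, err in results if err is not None]
        if errors:
            logger.error("Unhandled error caught in job, stopping SparkContext: %s", errors[0])
            ssc.stop(stopSparkContext=True, stopGraceFully=False)

    ssc.union(*streams).foreachRDD(check_and_stop)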
[Spark Streaming] - Killing application from within code
Hey, I'm using connections to Elasticsearch from within my Spark Streaming application. I'm using Futures to maximize performance when it sends network requests to the ES cluster. Basically, I want my app to crash if any one of the executors fails to connect to ES. The exception gets caught and returned in my Future as a Failure(ex: NoNodeAvailableException), but when I handle it, I can't seem to kill my app. I tried using:

    fut andThen { case Failure(ex: NoNodeAvailableException) => throw ex }
    fut andThen { case Failure(ex: NoNodeAvailableException) => System.exit(-1) }
    fut onFailure { case ex: NoNodeAvailableException => throw ex }
    fut onFailure { case ex: NoNodeAvailableException => System.exit(-1) }

But none of them seem to kill my app. The System.exit(-1) kills my executor, but that doesn't seem like the correct way to do it. And no matter what I try, the driver stays alive. Is there a way to programmatically kill the application from within one of the workers?

Thanks a lot :)
RE: How to run a spark on Pycharm
Hey, I once found an article about that: https://mengdong.github.io/2016/08/08/fully-armed-pyspark-with-ipython-and-jupyter/

And I once managed to set it up in PyCharm as well. What I had to do was add /path/to/spark to a system variable called "PYTHONPATH". Try that one, it might help ☺

From: Anahita Talebi [mailto:anahita.t.am...@gmail.com]
Sent: Friday, March 3, 2017 5:05 PM
To: Pushkar.Gujar
Cc: User
Subject: Re: How to run a spark on Pycharm

Hi, thanks for your answer. Sorry, I am a complete beginner at running code in Spark. Could you please tell me in a bit more detail how to do that? I installed IPython and Jupyter notebook on my local machine. But how can I run the code using them? Before, I tried to run the code with PyCharm and failed.

Thanks, Anahita

On Fri, Mar 3, 2017 at 3:48 PM, Pushkar.Gujar wrote:

Jupyter notebook/IPython can be connected to Apache Spark.

Thank you, Pushkar Gujar

On Fri, Mar 3, 2017 at 9:43 AM, Anahita Talebi wrote:

Hi everyone, I am trying to run Spark code in PyCharm. I tried to give the path of Spark as an environment variable in PyCharm's run configuration. Unfortunately, I get an error. Does anyone know how I can run Spark code in PyCharm? It doesn't necessarily have to be PyCharm; if you know any other software, it would be nice to tell me.

Thanks a lot, Anahita
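A minimal sketch of what that PYTHONPATH fix amounts to inside a script, for a plain PyCharm run configuration. The paths are assumptions (the py4j zip name in particular varies by Spark version):

    import os
    import sys

    spark_home = os.environ.get("SPARK_HOME", "/path/to/spark")
    sys.path.insert(0, os.path.join(spark_home, "python"))
    # py4j ships inside the Spark distribution; adjust the version to yours
    sys.path.insert(0, os.path.join(spark_home, "python", "lib", "py4j-0.10.4-src.zip"))

    from pyspark import SparkContext

    sc = SparkContext(master="local[*]", appName="pycharm-test")
    print(sc.parallelize(range(10)).sum())  # quick smoke test: prints 45
    sc.stop()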
RE: pyspark in intellij
Yes, I got it working once, but I can't exactly remember how. I think what I did was the following:

* To the environment variables, add a variable named PYTHONPATH with the path to your PySpark python directory (in my case, C:\spark-2.1.0-bin-hadoop2.7\python)
* Add the same path as above to the PATH environment variable

Hope these work ☺

From: Stephen Boesch [mailto:java...@gmail.com]
Sent: Sunday, February 26, 2017 3:56 AM
To: user <user@spark.apache.org>
Subject: pyspark in intellij

Anyone have this working - either in 1.X or 2.X? Thanks
RE: How to query a query with not contain, not start_with, not end_with condition effective?
Chanh wants to return user_ids that don't have any record with a url containing "sell". Without a subquery/join, you can only filter per record, without knowing anything about the rest of that user_id's records.

From: Yong Zhang [mailto:java8...@hotmail.com]
Sent: Tuesday, February 21, 2017 4:10 PM
To: Chanh Le <giaosu...@gmail.com>; user @spark <user@spark.apache.org>
Subject: Re: How to query a query with not contain, not start_with, not end_with condition effective?

Not sure if I misunderstand your question, but what's wrong with doing it this way?

    scala> spark.version
    res6: String = 2.0.2

    scala> val df = Seq((1,"lao.com/sell"), (2, "lao.com/buy")).toDF("user_id", "url")
    df: org.apache.spark.sql.DataFrame = [user_id: int, url: string]

    scala> df.registerTempTable("data")
    warning: there was one deprecation warning; re-run with -deprecation for details

    scala> spark.sql("select user_id from data where url not like '%sell%'").show
    +-------+
    |user_id|
    +-------+
    |      2|
    +-------+

Yong

From: Chanh Le <giaosu...@gmail.com>
Sent: Tuesday, February 21, 2017 4:56 AM
To: user @spark
Subject: How to query a query with not contain, not start_with, not end_with condition effective?

Hi everyone, I am working on a dataset like this:

    user_id  url
    1        lao.com/buy
    2        bao.com/sell
    2        cao.com/market
    1        lao.com/sell
    3        vui.com/sell

I have to find all user_ids with no url containing "sell". That means I need to query all user_ids that do contain "sell", put them into a set, and then do another query to find all user_ids not in that set:

    SELECT user_id FROM data
    WHERE user_id NOT IN (
        SELECT user_id FROM data WHERE url LIKE '%sell%'
    );

My data is about 20 million records and it's growing. When I tried this in Zeppelin, I needed to set spark.sql.crossJoin.enabled = true. Then I ran the query, the driver got extremely high CPU usage, the process got stuck, and I had to kill it. I am running in client mode, submitting to a Mesos cluster. I am using Spark 2.0.2 and my data is stored in HDFS in Parquet format.

Any advice for me in this situation? Thank you in advance!

Regards, Chanh
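One way to get the per-user semantics without the NOT IN subquery (and without enabling cross joins) is a distinct-plus-anti-join. A sketch in PySpark, assuming a DataFrame df with the user_id and url columns above; the "left_anti" join type is available from Spark 2.0:

    from pyspark.sql import functions as F

    # user_ids that have at least one url containing "sell"
    sellers = df.filter(F.col("url").contains("sell")) \
                .select("user_id").distinct()

    # The anti join keeps only the left rows with no match on the right:
    # user_ids that never had a "sell" url.
    result = df.select("user_id").distinct() \
               .join(sellers, on="user_id", how="left_anti")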
RE: Jars directory in Spark 2.0
Ok, good to know ☺ Shading every Spark app it is then… Thanks!

From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Wednesday, February 1, 2017 7:41 PM
To: Sidney Feiner <sidney.fei...@startapp.com>
Cc: Koert Kuipers <ko...@tresata.com>; user@spark.apache.org
Subject: Re: Jars directory in Spark 2.0

Spark has never shaded dependencies (in the sense of renaming the classes), with a couple of exceptions (Guava and Jetty). So that behavior is nothing new. Spark's dependencies themselves have a lot of other dependencies, so doing that would have limited benefits anyway.

On Tue, Jan 31, 2017 at 11:23 PM, Sidney Feiner <sidney.fei...@startapp.com> wrote:

Is this done on purpose? Because it really makes it hard to deploy applications. Is there a reason they didn't shade the jars they use to begin with?

From: Koert Kuipers [mailto:ko...@tresata.com]
Sent: Tuesday, January 31, 2017 7:26 PM
To: Sidney Feiner <sidney.fei...@startapp.com>
Cc: user@spark.apache.org
Subject: Re: Jars directory in Spark 2.0

You basically have to keep your versions of dependencies in line with Spark's, or shade your own dependencies. You cannot just replace the jars in Spark's jars folder. If you want to update them, you have to build Spark yourself with updated dependencies and confirm it compiles, passes tests, etc.

On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner <sidney.fei...@startapp.com> wrote:

Hey, while migrating to Spark 2.X from 1.6, I've had many issues with jars that come preloaded with Spark in the "jars/" directory, and I had to shade most of my packages. Can I replace the jars in this folder with more up-to-date versions? Are those jars used for anything internal in Spark, which would mean I can't blindly replace them?

Thanks ☺

-- Marcelo
RE: Jars directory in Spark 2.0
Is this done on purpose? Because it really makes it hard to deploy applications. Is there a reason they didn't shade the jars they use to begin with?

From: Koert Kuipers [mailto:ko...@tresata.com]
Sent: Tuesday, January 31, 2017 7:26 PM
To: Sidney Feiner <sidney.fei...@startapp.com>
Cc: user@spark.apache.org
Subject: Re: Jars directory in Spark 2.0

You basically have to keep your versions of dependencies in line with Spark's, or shade your own dependencies. You cannot just replace the jars in Spark's jars folder. If you want to update them, you have to build Spark yourself with updated dependencies and confirm it compiles, passes tests, etc.

On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner <sidney.fei...@startapp.com> wrote:

Hey, while migrating to Spark 2.X from 1.6, I've had many issues with jars that come preloaded with Spark in the "jars/" directory, and I had to shade most of my packages. Can I replace the jars in this folder with more up-to-date versions? Are those jars used for anything internal in Spark, which would mean I can't blindly replace them?

Thanks ☺
Jars directory in Spark 2.0
Hey, while migrating to Spark 2.X from 1.6, I've had many issues with jars that come preloaded with Spark in the "jars/" directory, and I had to shade most of my packages. Can I replace the jars in this folder with more up-to-date versions? Are those jars used for anything internal in Spark, which would mean I can't blindly replace them?

Thanks :)
RE: [PySpark 2.1.0] - SparkContext not properly initialized by SparkConf
I think I'm getting close to finding the reason. When I initialize the SparkContext, the following code is executed:

    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                 conf, jsc, profiler_cls):
        self.environment = environment or {}
        # java gateway must have been launched at this point.
        if conf is not None and conf._jconf is not None:
            # conf has been initialized in JVM properly, so use conf directly. This represents the
            # scenario that JVM has been launched before SparkConf is created (e.g. SparkContext is
            # created and then stopped, and we create a new SparkConf and new SparkContext again)
            self._conf = conf
        else:
            self._conf = SparkConf(_jvm=SparkContext._jvm)

So I can see that the only way my SparkConf will be used is if it also has a _jvm object. I've used spark-submit to submit my job and printed the _jvm object, but it is null, which explains why my SparkConf object is ignored. I've tried running exactly the same thing on Spark 2.0.1 and it worked! My SparkConf object had a valid _jvm object.

Does anybody know what changed? Or if I got something wrong?

Thanks :)

From: Sidney Feiner
Sent: Thursday, January 26, 2017 9:26 AM
To: user@spark.apache.org
Subject: [PySpark 2.1.0] - SparkContext not properly initialized by SparkConf

Hey, I'm pasting a question I asked on Stack Overflow without getting any answers (:() I hope somebody here knows the answer, thanks in advance!

Link to post: https://stackoverflow.com/questions/41847113/pyspark-2-1-0-sparkcontext-not-properly-initialized-by-sparkconf

I'm migrating from Spark 1.6 to 2.1.0 and I've run into a problem migrating my PySpark application. I'm dynamically setting up my SparkConf object based on configurations in a file, and when I was on Spark 1.6, the app would run with the correct configs. But now, when I open the Spark UI, I can see that NONE of those configs are loaded into the SparkContext. Here's my code:

    spark_conf = SparkConf().setAll(
        filter(lambda x: x[0].startswith('spark.'), conf_dict.items())
    )
    sc = SparkContext(conf=spark_conf)

I've also added a print before initializing the SparkContext to make sure the SparkConf has all the relevant configs:

    [print("{0}: {1}".format(key, value)) for (key, value) in spark_conf.getAll()]

And this outputs all the configs I need:

* spark.app.name: MyApp
* spark.akka.threads: 4
* spark.driver.memory: 2G
* spark.streaming.receiver.maxRate: 25
* spark.streaming.backpressure.enabled: true
* spark.executor.logs.rolling.maxRetainedFiles: 7
* spark.executor.memory: 3G
* spark.cores.max: 24
* spark.executor.cores: 4
* spark.streaming.blockInterval: 350ms
* spark.memory.storageFraction: 0.2
* spark.memory.useLegacyMode: false
* spark.memory.fraction: 0.8
* spark.executor.logs.rolling.time.interval: daily

I submit my job with the following:

    /usr/local/spark/bin/spark-submit --conf spark.driver.host=i-${HOSTNAME} --master spark://i-${HOSTNAME}:7077 /path/to/main/file.py /path/to/config/file

Does anybody know why my SparkContext doesn't get initialized with my SparkConf?

Thanks :)
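A workaround sketch, under the assumption that properties handed to spark-submit itself always reach the JVM: build the --conf flags from the config file in a small launcher instead of inside the driver script. The paths and names are illustrative:

    import subprocess

    def launch(conf_dict, master, main_file, config_file):
        # Turn every spark.* entry from the config file into a --conf flag,
        # so the JVM is started with them instead of relying on SparkConf
        # set from inside the already-running driver script.
        args = ["/usr/local/spark/bin/spark-submit", "--master", master]
        for key, value in conf_dict.items():
            if key.startswith("spark."):
                args += ["--conf", "{0}={1}".format(key, value)]
        args += [main_file, config_file]
        subprocess.check_call(args)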
[PySpark 2.1.0] - SparkContext not properly initialized by SparkConf
Hey, I'm pasting a question I asked on Stack Overflow without getting any answers (:() I hope somebody here knows the answer, thanks in advance!

Link to post: https://stackoverflow.com/questions/41847113/pyspark-2-1-0-sparkcontext-not-properly-initialized-by-sparkconf

I'm migrating from Spark 1.6 to 2.1.0 and I've run into a problem migrating my PySpark application. I'm dynamically setting up my SparkConf object based on configurations in a file, and when I was on Spark 1.6, the app would run with the correct configs. But now, when I open the Spark UI, I can see that NONE of those configs are loaded into the SparkContext. Here's my code:

    spark_conf = SparkConf().setAll(
        filter(lambda x: x[0].startswith('spark.'), conf_dict.items())
    )
    sc = SparkContext(conf=spark_conf)

I've also added a print before initializing the SparkContext to make sure the SparkConf has all the relevant configs:

    [print("{0}: {1}".format(key, value)) for (key, value) in spark_conf.getAll()]

And this outputs all the configs I need:

* spark.app.name: MyApp
* spark.akka.threads: 4
* spark.driver.memory: 2G
* spark.streaming.receiver.maxRate: 25
* spark.streaming.backpressure.enabled: true
* spark.executor.logs.rolling.maxRetainedFiles: 7
* spark.executor.memory: 3G
* spark.cores.max: 24
* spark.executor.cores: 4
* spark.streaming.blockInterval: 350ms
* spark.memory.storageFraction: 0.2
* spark.memory.useLegacyMode: false
* spark.memory.fraction: 0.8
* spark.executor.logs.rolling.time.interval: daily

I submit my job with the following:

    /usr/local/spark/bin/spark-submit --conf spark.driver.host=i-${HOSTNAME} --master spark://i-${HOSTNAME}:7077 /path/to/main/file.py /path/to/config/file

Does anybody know why my SparkContext doesn't get initialized with my SparkConf?

Thanks :)
Re: Spark-submit: where do --files go?
Every executor creates a directory with your submitted files, and you can get any file's absolute path with the following:

    val fullFilePath = SparkFiles.get(fileName)

On Jan 19, 2017 19:35, jeff saremi wrote:

I'd like to know how -- from within Java/Spark -- I can access the dependent files which I deploy using the "--files" option on the command line?
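The same lookup as a usage sketch in PySpark (the file name here is an example, not from the thread):

    from pyspark import SparkFiles

    # submitted with: spark-submit --files /local/path/lookup.csv app.py
    path = SparkFiles.get("lookup.csv")  # absolute path on the node running this code
    with open(path) as f:
        lookup_data = f.read()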
RE: [PySpark - 1.6] - Avoid object serialization
Thanks everybody, but I've found another way of doing it. Because I didn't actually need an instance of my class, I created a "static" class: all variables get initialized as class variables and all methods are class methods. Thanks a lot anyway, hope my answer will also help someone one day ☺

From: Holden Karau [mailto:hol...@pigscanfly.ca]
Sent: Thursday, December 29, 2016 8:54 PM
To: Chawla,Sumit <sumitkcha...@gmail.com>; Eike von Seggern <eike.segg...@sevenval.com>
Cc: Sidney Feiner <sidney.fei...@startapp.com>; user@spark.apache.org
Subject: Re: [PySpark - 1.6] - Avoid object serialization

Alternatively, using the broadcast functionality can also help with this.

On Thu, Dec 29, 2016 at 3:05 AM Eike von Seggern <eike.segg...@sevenval.com> wrote:

2016-12-28 20:17 GMT+01:00 Chawla,Sumit <sumitkcha...@gmail.com>:

Would this work for you?

    def processRDD(rdd):
        analyzer = ShortTextAnalyzer(root_dir)
        rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))

    ssc.union(*streams).filter(lambda x: x[1] != None) \
        .foreachRDD(lambda rdd: processRDD(rdd))

I think this will still send an analyzer to every executor where the RDD's partitions are stored. Maybe you can work around this with `RDD.foreachPartition()`:

    def processRDD(rdd):
        def partition_func(records):
            analyzer = ShortTextAnalyzer(root_dir)
            for record in records:
                analyzer.analyze_short_text_event(record[1])
        rdd.foreachPartition(partition_func)

This will create one analyzer per partition and RDD.

Best, Eike
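A minimal sketch of the "static class" workaround described at the top of this message, as I understand it; load_graph and the lookup are placeholders, not the actual analyzer:

    class ShortTextAnalyzer(object):
        _graph = None  # class attribute: no instance state to pickle

        @classmethod
        def init(cls, root_dir):
            # Runs the expensive load at most once per Python worker process.
            if cls._graph is None:
                cls._graph = load_graph(root_dir)   # hypothetical loader

        @classmethod
        def analyze_short_text_event(cls, event):
            cls.init("/path/to/root_dir")           # idempotent
            return cls._graph.lookup(event)         # hypothetical analysis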
[PySpark - 1.6] - Avoid object serialization
Hey, I just posted this question on Stack Overflow (link here: http://stackoverflow.com/questions/41362314/pyspark-streaming-job-avoid-object-serialization) and decided to try my luck here as well :)

I'm writing a PySpark job, but I ran into some performance issues. Basically, all it does is read events from Kafka and log the transformations made. Thing is, the transformation is calculated based on an object's function, and that object is pretty heavy, as it contains a graph and an inner cache which gets automatically updated as it processes RDDs. So when I write the following piece of code:

    analyzer = ShortTextAnalyzer(root_dir)
    logger.info("Start analyzing the documents from kafka")
    ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(
        lambda rdd: rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1])))

it serializes my analyzer, which takes a lot of time because of the graph, and as it is copied to the executor, the cache is only relevant for that specific RDD.

If the job were written in Scala, I could have written an object, which would exist once in every executor, and then my object wouldn't have to be serialized each time. I've read in a post (http://www.spark.tc/deserialization-in-pyspark-storage/) that prior to PySpark 2.0, objects are always serialized. So does that mean that I have no way to avoid the serialization?

I'd love to hear about a way to avoid serialization in PySpark, if one exists: a way to have my object created once for each executor, so it could skip the serialization process, save time, and actually have a working cache system.

Thanks in advance :)