Accessing log for lost executors
Hi all, I'm trying to troubleshoot an ExecutorLostFailure issue. I noticed that the Executors tab in the Spark UI only lists active executors. Is there any way I can see the logs of dead executors so that I can find out why they were lost? I'm using Spark 1.5.2 on YARN 2.7.1. Thanks! Nisrina
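If YARN log aggregation is enabled, the logs of finished or killed containers (which includes lost executors) can usually still be pulled with the yarn CLI after the application's containers die. A hedged sketch; the application id, container id, and node address below are placeholders, not values from this thread:

```shell
# Fetch aggregated logs for the whole application (requires
# yarn.log-aggregation-enable=true in yarn-site.xml):
yarn logs -applicationId application_1453112345678_0042 > app.log

# Narrow down to the lost executor's container; in Hadoop 2.7 the
# -containerId option also needs the NodeManager address, both of which
# appear in the driver/AM log lines announcing the lost executor:
yarn logs -applicationId application_1453112345678_0042 \
  -containerId container_1453112345678_0042_01_000003 \
  -nodeAddress somenode:8041
```

If aggregation is disabled, the same logs may still sit on the worker's local disk under the NodeManager's `yarn.nodemanager.log-dirs` until they are cleaned up.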
Client process memory usage
Hi all, I have a Python Spark application that I'm running with spark-submit in yarn-cluster mode. If I run ps -aux | grep on the submitter node, I can find the client process that submitted the application, usually using around 300-600 MB of memory (%MEM around 1.0-2.0 on a node with 30 GB of memory). Is there anything I can do to make this smaller? Also, as far as I know, in yarn-cluster mode the client does nothing after the application is launched, so what is the memory used for? Thank you, Nisrina.
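One hedged option, assuming Spark 1.4 or later: in yarn-cluster mode the local spark-submit JVM mostly just polls YARN for application status, and it can be told not to wait at all, so the process exits right after submission and frees that memory entirely. The application file name is a placeholder:

```shell
# spark.yarn.submit.waitAppCompletion=false makes spark-submit return as
# soon as the application is accepted by YARN, instead of polling until
# the application finishes:
spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.submit.waitAppCompletion=false \
  my_app.py
```

The trade-off is that the client no longer reports the application's final status; that then has to come from the YARN UI or `yarn application -status`.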
Spark SQL - udf with entire row as parameter
Hi all, I'm using Spark SQL in Python and want to write a UDF that takes an entire Row as the argument. I tried something like:

def functionName(row):
    ...
    return a_string

udfFunctionName = udf(functionName, StringType())
df.withColumn('columnName', udfFunctionName('*'))

but this gives an error message:

Traceback (most recent call last):
  File "", line 1, in
  File "/home/nina/Downloads/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 1311, in withColumn
    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
  File "/home/nina/Downloads/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/home/nina/Downloads/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"unresolved operator 'Project [address#0,name#1,PythonUDF#functionName(*) AS columnName#26];"

Does anyone know how this can be done, or whether this is possible? Thank you, Nisrina.
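A workaround that is commonly reported to work in Spark 1.5/1.6: instead of passing '*', pack all columns into a single struct column with pyspark.sql.functions.struct, so the UDF receives one Row-like argument. A hedged sketch; the column names name/address are only inferred from the error message above, and the body of the function is illustrative:

```python
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def function_name(row):
    # row behaves like a Row here: fields are accessible by name
    return "%s lives at %s" % (row.name, row.address)

udf_function_name = udf(function_name, StringType())

# Pass every column of df as one struct; the analyzer can resolve this
# where it could not resolve the bare '*' argument.
df = df.withColumn(
    'columnName',
    udf_function_name(struct(*[df[c] for c in df.columns])))
```

If only a few columns are actually needed, listing just those in the struct keeps less data moving through the Python worker.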
Re: Write to S3 with server side encryption in KMS mode
Ah, alright then, it looks like that's the case. Thank you for the info. I'm probably going to try to use the S3-managed encryption instead; from what I read this is supported by setting the fs.s3a.server-side-encryption-algorithm parameter. Thanks! Nisrina On Tue, Jan 26, 2016 at 11:55 PM, Ewan Leith wrote: > Hi Nisrina, I'm not aware of any support for KMS keys in s3n, s3a or the > EMR specific EMRFS s3 driver. > > If you're using EMRFS with Amazon's EMR, you can use KMS keys with > client-side encryption: > > http://docs.aws.amazon.com/kms/latest/developerguide/services-emr.html#emrfs-encrypt > > If this has changed, I'd love to know, but I'm pretty sure it hasn't. > > The alternative is to write to HDFS, then copy the data across in bulk. > > Thanks, > > Ewan > > *From:* Nisrina Luthfiyati [mailto:nisrina.luthfiy...@gmail.com] > *Sent:* 26 January 2016 10:42 > *To:* user > *Subject:* Write to S3 with server side encryption in KMS mode > > Hi all, > > I'm trying to save a Spark application's output to a bucket in S3. The data > is supposed to be encrypted with S3's server side encryption using KMS > mode, which typically (using the Java API/CLI) would require us to pass the > SSE-KMS key when writing the data. I currently have not found a way to do > this using the Spark Hadoop config. Would anyone have any idea how this can be > done or whether this is possible? > > Thanks, > > Nisrina. > > -- Nisrina Luthfiyati - Ilmu Komputer Fasilkom UI 2010 http://www.facebook.com/nisrina.luthfiyati http://id.linkedin.com/in/nisrina
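For the S3-managed (SSE-S3) fallback mentioned above, a hedged sketch of how the s3a property can be set from PySpark; the bucket name and output path are placeholders, and at the time of this thread s3a only accepted the AES256 algorithm (S3-managed keys), not SSE-KMS:

```python
# Set the Hadoop configuration on the live SparkContext so that s3a
# writes request server-side encryption with S3-managed keys:
sc._jsc.hadoopConfiguration().set(
    "fs.s3a.server-side-encryption-algorithm", "AES256")

# Subsequent writes through s3a are then encrypted at rest by S3:
df.write.parquet("s3a://my-bucket/output/")
```

The same key can also go in core-site.xml or be passed with `--conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256` at submit time.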
Write to S3 with server side encryption in KMS mode
Hi all, I'm trying to save a Spark application's output to a bucket in S3. The data is supposed to be encrypted with S3's server-side encryption using KMS mode, which typically (using the Java API/CLI) would require us to pass the SSE-KMS key when writing the data. I currently have not found a way to do this using the Spark Hadoop config. Would anyone have any idea how this can be done, or whether this is possible? Thanks, Nisrina.
Re: In yarn-client mode, is it the driver or application master that issue commands to executors?
Hi Jacek, thank you for your answer. I looked at TaskSchedulerImpl and TaskSetManager and it does look like tasks are sent directly to executors. I'd also love to be corrected if mistaken, as I have little knowledge of Spark internals and am very new to Scala. On Tue, Dec 1, 2015 at 1:16 AM, Jacek Laskowski wrote: > On Fri, Nov 27, 2015 at 12:12 PM, Nisrina Luthfiyati < > nisrina.luthfiy...@gmail.com> wrote: > >> Hi all, >> I'm trying to understand how yarn-client mode works and found these two >> diagrams: >> >> In the first diagram, it looks like the driver running in client directly >> communicates with executors to issue application commands, while in the >> second diagram it looks like application commands is sent to application >> master first and then forwarded to executors. >> > > My limited understanding tells me that regardless of deploy mode (local, > standalone, YARN or mesos), drivers (using TaskSchedulerImpl) sends > TaskSets to executors once they're launched. YARN and Mesos are only used > until they offer resources (CPU and memory) and once executors start, these > cluster managers are not engaged in the communication (driver and executors > communicate using RPC over netty since 1.6-SNAPSHOT or akka before). > > I'd love being corrected if mistaken. Thanks. > > Jacek
Re: In yarn-client mode, is it the driver or application master that issue commands to executors?
Hi Mich, thank you for the answer. Regarding the diagrams, I'm specifically referring to the direct line between the Spark YARN client and the Spark executor in the first diagram, which implies direct communication with executors when issuing application commands, and the 'Application commands' & 'Issue application commands' lines in the second diagram, which imply that the Spark driver in the client communicates with executors via the YARN application master (correct me if I'm wrong in these interpretations). Would you happen to know how the Spark driver communicates with executors in yarn-client mode, or whether both can be true under different circumstances? Thanks again, Nisrina. On Fri, Nov 27, 2015 at 6:22 PM, Mich Talebzadeh wrote: > Hi, > > In general YARN is used as the resource scheduler regardless of the > execution engine, whether it is MapReduce or Spark. > > Yarn will create a resource container for the submitted job (that is the > Spark client) and will execute it in the default engine (in this case > Spark). There will be a job scheduler and one or more Spark Executors > depending on the cluster. So as far as I can see both diagrams are correct. > > HTH > > Mich Talebzadeh > > http://talebzadehmich.wordpress.com > > *From:* Nisrina Luthfiyati [mailto:nisrina.luthfiy...@gmail.com] > *Sent:* 27 November 2015 11:12 > *To:* user@spark.apache.org > *Subject:* In yarn-client mode, is it the driver or application master > that issue commands to executors? > > Hi all, > > I'm trying to understand how yarn-client mode works and found these two > diagrams: > > In the first diagram, it looks like the driver running in client directly > communicates with executors to issue application commands, while in the > second diagram it looks like application commands is sent to application > master first and then forwarded to executors. > > Would anyone knows which case is true or is there any other interpretation > to these diagrams? > > Thanks! > > Nisrina
In yarn-client mode, is it the driver or application master that issue commands to executors?
Hi all, I'm trying to understand how yarn-client mode works and found these two diagrams: In the first diagram, it looks like the driver running in the client directly communicates with executors to issue application commands, while in the second diagram it looks like application commands are sent to the application master first and then forwarded to executors. Would anyone know which case is true, or is there another interpretation of these diagrams? Thanks! Nisrina
Re: Is the resources specified in configuration shared by all jobs?
Got it. Thanks! On Nov 5, 2015 12:32 AM, "Sandy Ryza" wrote: > Hi Nisrina, > > The resources you specify are shared by all jobs that run inside the > application. > > -Sandy > > On Wed, Nov 4, 2015 at 9:24 AM, Nisrina Luthfiyati < > nisrina.luthfiy...@gmail.com> wrote: > >> Hi all, >> >> I'm running some spark jobs in java on top of YARN by submitting one >> application jar that starts multiple jobs. >> My question is, if I'm setting some resource configurations, either when >> submitting the app or in spark-defaults.conf, would this configs apply to >> each job or the entire application? >> >> For example if I lauch it with: >> >> spark-submit --class org.some.className \ >> --master yarn-client \ >> --num-executors 3 \ >> --executor-memory 5g \ >> someJar.jar \ >> >> , would the 3 executor x 5G memory be allocated to each job or would all >> jobs share the resources? >> >> Thank you! >> Nisrina >> >> >
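Given Sandy's answer that the specified resources are shared by every job inside the application, one hedged way to at least control how concurrently submitted jobs split those shared executors is Spark's fair scheduler pools. A sketch; the pool name is made up:

```python
# Enable in spark-defaults.conf or at submit time:
#   spark.scheduler.mode=FAIR
# Jobs triggered from this thread are then assigned to a named pool, and
# pools share the application's executors fairly between running jobs.
sc.setLocalProperty("spark.scheduler.pool", "etl_pool")
rdd.count()                                         # scheduled in etl_pool

sc.setLocalProperty("spark.scheduler.pool", None)   # revert to default pool
```

This does not carve out dedicated executors per job; it only changes how the one shared pool of 3 x 5g executors is scheduled among concurrent jobs.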
Is the resources specified in configuration shared by all jobs?
Hi all, I'm running some Spark jobs in Java on top of YARN by submitting one application jar that starts multiple jobs. My question is, if I'm setting some resource configurations, either when submitting the app or in spark-defaults.conf, would these configs apply to each job or to the entire application? For example, if I launch it with:

spark-submit --class org.some.className \
    --master yarn-client \
    --num-executors 3 \
    --executor-memory 5g \
    someJar.jar

would the 3 executors x 5 GB of memory be allocated to each job, or would all jobs share the resources? Thank you! Nisrina
Re: Spark Streaming: Change Kafka topics on runtime
Hi Cody, by start/stopping, do you mean the streaming context or the app entirely? From what I understand, once a streaming context has been stopped it cannot be restarted, but I also haven't found a way to stop the app programmatically. The batch duration will probably be around 1-10 seconds; I think this is small enough not to make it a batch job? Thanks again On Thu, Aug 13, 2015 at 10:15 PM, Cody Koeninger wrote: > The current kafka stream implementation assumes the set of topics doesn't > change during operation. > > You could either take a crack at writing a subclass that does what you > need; stop/start; or if your batch duration isn't too small, you could run > it as a series of RDDs (using the existing KafkaUtils.createRDD) where the > set of topics is determined before each rdd. > > On Thu, Aug 13, 2015 at 4:38 AM, Nisrina Luthfiyati < > nisrina.luthfiy...@gmail.com> wrote: > >> Hi all, >> >> I want to write a Spark Streaming program that listens to Kafka for a >> list of topics. >> The list of topics that I want to consume is stored in a DB and might >> change dynamically. I plan to periodically refresh this list of topics in >> the Spark Streaming app. >> >> My question is is it possible to add/remove a Kafka topic that is >> consumed by a stream, or probably create a new stream at runtime? >> Would I need to stop/start the program or is there any other way to do >> this? >> >> Thanks! >> Nisrina >> > >
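A hedged sketch of Cody's "series of RDDs" suggestion in PySpark: decide the topic list before each iteration and read a fixed offset range per topic with KafkaUtils.createRDD (the Python createRDD appeared around Spark 1.5; in 1.4 and earlier this shape is Scala/Java only). Everything below except the KafkaUtils/OffsetRange API is an assumption: the broker address is a placeholder, and load_topics_from_db, start_offset, end_offset, and process are hypothetical helpers:

```python
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

kafka_params = {"metadata.broker.list": "broker1:9092"}  # placeholder broker

while True:
    # Refresh the topic list from the DB before every "batch":
    topics = load_topics_from_db()

    # One OffsetRange(topic, partition, fromOffset, untilOffset) per topic;
    # partition 0 only here for brevity, real code would enumerate partitions:
    ranges = [OffsetRange(t, 0, start_offset(t), end_offset(t))
              for t in topics]

    rdd = KafkaUtils.createRDD(sc, kafka_params, ranges)
    process(rdd)  # whatever the streaming job would have done per batch
```

The offsets consumed each round would need to be persisted (the DB again, or ZooKeeper) so the next iteration's start_offset picks up where this one ended.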
Spark Streaming: Change Kafka topics on runtime
Hi all, I want to write a Spark Streaming program that listens to Kafka for a list of topics. The list of topics I want to consume is stored in a DB and might change dynamically; I plan to periodically refresh this list of topics in the Spark Streaming app. My question is: is it possible to add/remove a Kafka topic that is consumed by a stream, or perhaps create a new stream at runtime? Would I need to stop/start the program, or is there any other way to do this? Thanks! Nisrina
Re: Grouping and storing unordered time series data stream to HDFS
Hi Ayan and Helena, I've considered using Cassandra/HBase but ended up opting to save to the workers' HDFS because I want to take advantage of data locality, since the data will often be loaded into Spark for further processing. I was also under the impression that saving to a filesystem (instead of a DB) is the better option for intermediate data. Definitely going to read up some more and reconsider, given the time series nature of the data, though. This might be a bit off topic, but in your experience, is it common to store intermediate data that will be loaded into Spark many times in the future in Cassandra? Regarding how late a data point can be, I might be able to set a limit. Would you know if it's possible to combine RDDs from different intervals in Spark Streaming? Or would I need to write to file first, then group the data by time dimension in another batch process? Thanks in advance! Nisrina. On May 16, 2015 7:26 PM, "Helena Edelson" wrote: > Consider using Cassandra with Spark Streaming and timeseries; Cassandra > has been doing time series for years. > Here's some snippets with Kafka streaming and writing/reading the data > back: > > https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/KafkaStreamingActor.scala#L62-L64 > > or write in the stream, read back: > > https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/KafkaStreamingJson2.scala#L53-L61 > > or more detailed reads back: > > https://github.com/killrweather/killrweather/blob/master/killrweather-app/src/main/scala/com/datastax/killrweather/TemperatureActor.scala#L65-L69 > > A CassandraInputDStream is coming, I'm working on it now. > > Helena > @helenaedelson > > On May 15, 2015, at 9:59 AM, ayan guha wrote: > > Hi > > Do you have a cut-off time, like how "late" an event can be?
Else, you may > consider a different persistent storage like Cassandra/HBase and delegate > the "update" part to them. > > On Fri, May 15, 2015 at 8:10 PM, Nisrina Luthfiyati < > nisrina.luthfiy...@gmail.com> wrote: > >> >> Hi all, >> I have a stream of data from Kafka that I want to process and store in >> hdfs using Spark Streaming. >> Each data has a date/time dimension and I want to write data within the >> same time dimension to the same hdfs directory. The data stream might be >> unordered (by time dimension). >> >> I'm wondering what are the best practices in grouping/storing time series >> data stream using Spark Streaming? >> >> I'm considering grouping each batch of data in Spark Streaming per time >> dimension and then saving each group to different hdfs directories. However >> since it is possible for data with the same time dimension to be in >> different batches, I would need to handle "update" in case the hdfs >> directory already exists. >> >> Is this a common approach? Are there any other approaches that I can try? >> >> Thank you! >> Nisrina. >> > > -- > Best Regards, > Ayan Guha
Grouping and storing unordered time series data stream to HDFS
Hi all, I have a stream of data from Kafka that I want to process and store in HDFS using Spark Streaming. Each record has a date/time dimension, and I want to write data within the same time dimension to the same HDFS directory. The data stream might be unordered (by time dimension). I'm wondering what the best practices are for grouping/storing time series data streams using Spark Streaming? I'm considering grouping each batch of data in Spark Streaming per time dimension and then saving each group to a different HDFS directory. However, since it is possible for data with the same time dimension to be in different batches, I would need to handle "updates" in case the HDFS directory already exists. Is this a common approach? Are there any other approaches I can try? Thank you! Nisrina.
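One hedged sketch of the approach described above: extract the time dimension per record in each streaming batch, then save each group under its own date directory, using a unique per-batch subdirectory. That sidesteps the "directory already exists" problem by always appending new subdirectories instead of updating existing ones. The path layout is an assumption and parse_date is a hypothetical helper that returns something like '2015-05-15':

```python
import time

def save_by_date(rdd):
    # Find which dates appear in this batch (assumed to be few per batch):
    dates = rdd.map(parse_date).distinct().collect()
    batch_id = int(time.time())
    for d in dates:
        # d=d pins the loop variable inside the closure; saveAsTextFile
        # refuses to overwrite, so each batch writes a fresh subdirectory.
        (rdd.filter(lambda rec, d=d: parse_date(rec) == d)
            .saveAsTextFile("hdfs:///events/dt=%s/batch-%d" % (d, batch_id)))

stream.foreachRDD(save_by_date)
```

Downstream jobs can then read a whole day with a glob like hdfs:///events/dt=2015-05-15/*, regardless of how many batches contributed to it.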
Performance advantage by loading data from local node over S3.
Hi all, I'm new to Spark, so I'm sorry if the question is too vague. I'm currently trying to deploy a Spark cluster using YARN on an Amazon EMR cluster. For data storage I'm currently using S3, but would loading the data from HDFS on local nodes give a considerable performance advantage over loading from S3? Would the reduced latency in data loading affect the runtime much, considering most of the computation is done in memory? Thank you, Nisrina.
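If locality does turn out to matter for a given workload, the S3 data can be staged into the cluster's HDFS once up front and read from there. A hedged sketch; the bucket and paths are placeholders:

```shell
# Plain Hadoop distcp works from any Hadoop cluster with s3a configured:
hadoop distcp s3a://my-bucket/input/ hdfs:///data/input/

# On EMR specifically, s3-dist-cp is Amazon's S3-optimized equivalent:
s3-dist-cp --src s3://my-bucket/input/ --dest hdfs:///data/input/
```

A reasonable way to decide is to time the same Spark job against both storage locations on a sample of the data before committing to the extra copy step.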