Re: Spark on Mesos - Weird behavior
Oh, sorry, I missed that you use Spark without dynamic allocation. Anyway, I don't know whether these parameters work without dynamic allocation.

On Wed, Jul 11, 2018 at 5:11 PM Thodoris Zois wrote:

> Hello,
>
> Yeah, you are right, but I think that works only if you use Spark dynamic
> allocation. Am I wrong?
>
> -Thodoris
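For the archive: a sketch of how the settings discussed in this thread fit together on the submit command line. The property names come from the Spark configuration docs; the master URL and application are placeholders, and whether the dynamicAllocation bounds have any effect without dynamic allocation enabled is exactly the open question above.

```
spark-submit \
  --master mesos://zk://host:2181/mesos \
  --conf spark.executor.cores=10 \
  --conf spark.executor.memory=2g \
  --conf spark.cores.max=30 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=3 \
  --conf spark.dynamicAllocation.maxExecutors=3 \
  app.py
```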
Re: Spark on Mesos - Weird behavior
Hi Thodoris,
You can configure resources per executor and manipulate the number of executors instead of using spark.cores.max. I think the spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors configuration values can help you.

On Tue, Jul 10, 2018 at 5:07 PM Thodoris Zois wrote:

> Actually, after some experiments we figured out that spark.cores.max /
> spark.executor.cores is the upper bound for the executors. Spark apps will
> run even if only one executor can be launched.
>
> Is there any way to also specify the lower bound? It is a bit annoying
> that we seemingly can't control the resource usage of an application. By
> the way, we are not using dynamic allocation.
>
> - Thodoris
Re: Spark on Mesos - Weird behavior
Hello Thodoris!
Have you checked the following:
- Does the Mesos cluster have available resources?
- Does Spark have tasks waiting in the queue for longer than the spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
- And have you checked that Mesos sends the Spark framework offers with at least 10 cores and 2 GB RAM?

If Mesos has no offers with 10 cores but does have offers with 8 or 9, you can use smaller executors, for example with 4 cores and 1 GB RAM, to better fit the available resources on the nodes.

Cheers,
Pavel

On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois wrote:

> Hello list,
>
> We are running Apache Spark on a Mesos cluster and we face a weird
> behavior of executors. When we submit an app with e.g. 10 cores, 2 GB of
> memory, and max cores 30, we expect to see 3 executors running on the
> cluster. However, sometimes there are only 2... Spark applications are not
> the only ones that run on the cluster. I guess that Spark starts executors
> on the available offers even if they do not satisfy our needs. Is there any
> configuration that we can use in order to prevent Spark from starting when
> there are no resource offers for the total number of executors?
>
> Thank you
> - Thodoris
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
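The sizing trade-off in this answer can be made concrete with a toy model. This is only an illustration of the packing arithmetic, not Spark's actual scheduler code, and the offer list is invented:

```python
def executors_fitting(offers, exec_cores, exec_mem_gb):
    """Count how many executors of a given size fit into a list of
    (cores, mem_gb) Mesos offers, packing greedily inside each offer."""
    total = 0
    for cores, mem in offers:
        total += min(cores // exec_cores, mem // exec_mem_gb)
    return total

# Hypothetical offers: no single node has 10 free cores.
offers = [(8, 16), (9, 16), (12, 32)]
print(executors_fitting(offers, 10, 2))  # 1: only the 12-core node fits a 10-core executor
print(executors_fitting(offers, 4, 1))   # 7: smaller executors use the 8- and 9-core offers too
```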
Re: Spark declines Mesos offers
Michael Gummelt, thanks! I forgot about debug logging!

On Mon, Apr 24, 2017 at 9:30 PM Michael Gummelt wrote:

> Have you run with debug logging? There are some hints in the debug logs:
> https://github.com/apache/spark/blob/branch-2.1/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L316
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
Spark declines Mesos offers
Hi, everyone! I run Spark 2.1.0 jobs on top of a Mesos cluster in coarse-grained mode with dynamic resource allocation. Sometimes the Spark Mesos scheduler declines Mesos offers even though not all available resources are used (I have fewer workers than the possible maximum), the maximum threshold in the Spark configuration is not reached, and the queue has a lot of pending tasks.

Maybe I have a wrong Spark or Mesos configuration? Does anyone have the same problem?
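For later readers: the debug hints Michael pointed at live in the Mesos scheduler backend, so one way to surface them (a sketch; the logger name follows the package path in the linked file, and Spark 2.x reads conf/log4j.properties) is:

```
# conf/log4j.properties
log4j.logger.org.apache.spark.scheduler.cluster.mesos=DEBUG
```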
Re: Spark runs out of memory with small file
Hi Henry,
In the first example the dict d always contains only one value because the_id is the same; in the second case the dict grows very quickly. So I suggest first applying a map function to split your file string into rows, then repartitioning, and then applying your custom logic.

Example:

def splitf(s):
    return s.split("\n")

rdd.flatMap(splitf).repartition(1000).map(your_function)

Best,
Pavel

On Mon, 27 Feb 2017, 06:28 Henry Tremblay wrote:

> Not sure where you want me to put yield. My first try caused an error in
> Spark that it could not pickle generator objects.
>
> On 02/26/2017 03:25 PM, ayan guha wrote:
>
> Hi
>
> We are doing similar stuff, but with a large number of small-ish files. What
> we do is write a function to parse a complete file, similar to your parse
> file. But we use yield instead of return, and flatMap on top of it. Can you
> give it a try and let us know if it works?
>
> On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers wrote:
>
> Using wholeTextFiles to process formats that cannot be split per line is not
> "old", and there are plenty of problems for which RDD is still better suited
> than Dataset or DataFrame currently (this might change in the near future
> when Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta wrote:
>
> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. That is a VERY old way of solving problems using Spark.
> The core engine of Spark, which even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using DataFrames and try using Spark 2.1.
>
> In other words, it may be of tremendous benefit if you were learning to
> solve problems which exist rather than problems which do not exist any
> more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay wrote:
>
> The file is so small that a standalone Python script, independent of
> Spark, can process the file in under a second.
>
> Also, the following fails:
>
> 1. Read the whole file in with wholeTextFiles.
> 2. Use flatMap to get 50,000 rows that look like: Row(id="path", line="line").
> 3. Save the results as CSV to HDFS.
> 4. Read the files (there are 20) from HDFS into a df using sqlContext.read.csv().
> 5. Convert the df to an rdd.
> 6. Create key-value pairs with the key being the file path and the value being the line.
> 7. Iterate through values.
>
> What happens is Spark either runs out of memory or, in my last try with a
> slight variation, just hangs for 12 hours.
>
> Henry
>
> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>
> Hi, Tremblay.
> Your file is .gz format, which is not splittable for Hadoop. Perhaps the
> file is loaded by only one executor.
> How many executors do you start?
> Perhaps the repartition method could solve it, I guess.
>
> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay wrote:
>
> I am reading in a single small file from Hadoop with wholeTextFiles. If I
> process each line and create a row with two cells, the first cell equal to
> the name of the file, the second cell equal to the line, that code runs
> fine. But if I just add two lines of code and change the first cell based on
> parsing a line, Spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non-Spark application fails?
>
> Thanks!
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
> In [2]: rdd1.count()
> Out[2]: 1
>
> In [4]: def process_file(s):
>    ...:     text = s[1]
>    ...:     the_id = s[0]
>    ...:     d = {}
>    ...:     l = text.split("\n")
>    ...:     final = []
>    ...:     for line in l:
>    ...:         d[the_id] = line
>    ...:         final.append(Row(**d))
>    ...:     return final
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
>     ...:     text = s[1]
>     ...:     d = {}
>     ...:     l = text.split("\n")
>     ...:     final = []
>     ...:     the_id = "init"
>     ...:     for line in l:
>     ...:         if line[0:15] == 'WARC-Record-ID:':
>     ...:             the_id = line[15:]
>     ...:         d[the_id] = line
>     ...:         final.append(Row(**d))
>     ...:     return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on ip-172-31-41-89.us-west-2.compute.
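Pavel's diagnosis above can be reproduced without Spark. In the first version of process_file the dict d keeps a single key, so every Row has one column; in the second version each new WARC-Record-ID adds a key that is then copied into every subsequent Row, so row width grows linearly and the total stored cells grow quadratically. A pure-Python illustration, with Row(**d) simulated by a plain dict copy:

```python
def rows_one_key(lines, the_id="file:///path"):
    # First version of process_file: the_id never changes, so the dict is
    # overwritten in place and every emitted "row" has exactly one column.
    d = {}
    final = []
    for line in lines:
        d[the_id] = line
        final.append(dict(d))  # stands in for Row(**d)
    return final

def rows_growing_keys(lines):
    # Second version: each record introduces a fresh key (as each
    # WARC-Record-ID does), so row i carries i+1 columns.
    d = {}
    final = []
    for i, line in enumerate(lines):
        d["record-%d" % i] = line
        final.append(dict(d))
    return final

lines = ["line"] * 1000
assert all(len(r) == 1 for r in rows_one_key(lines))
widths = [len(r) for r in rows_growing_keys(lines)]
print(widths[0], widths[-1], sum(widths))  # 1 1000 500500: quadratic blow-up
```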
Re: Launching an Spark application in a subset of machines
Hi Alvaro,
You can create different clusters using the standalone cluster manager and then manage subsets of machines by submitting applications to different masters. Or you can use Mesos attributes to mark a subset of workers and specify it in spark.mesos.constraints.

On Tue, Feb 7, 2017 at 1:21 PM Alvaro Brandon wrote:

> Hello all:
>
> I have the following scenario.
> - I have a cluster of 50 machines with Hadoop and Spark installed on them.
> - I want to launch one Spark application through spark-submit. However, I
> want this application to run on only a subset of these machines,
> disregarding data locality (e.g. 10 machines).
>
> Is this possible? Is there any option in the standalone scheduler, YARN,
> or Mesos that allows such a thing?
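A sketch of the Mesos-attributes option (the attribute name and value here are invented; spark.mesos.constraints is documented in the Running Spark on Mesos guide):

```
# On each agent in the chosen subset, start it with a custom attribute:
#   mesos-agent ... --attributes="sparkpool:batch"
# Then submit with a matching constraint, so offers from other agents are declined:
spark-submit \
  --master mesos://zk://host:2181/mesos \
  --conf spark.mesos.constraints="sparkpool:batch" \
  app.py
```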
Re: physical memory usage keep increasing for spark app on Yarn
Hi Yang!
I don't know exactly why this happens, but I think either GC can't work fast enough, or the size of the data together with the additional objects created during computation is too big for the executor. And I found that this problem appears only if you do some data manipulation. You can cache your data first and after that write it in one partition. For example:

val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
dropDF.cache()

or

dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)
val dropDF = spark.read.parquet(temppath)

and then

dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)

Best,

On Sun, Jan 22, 2017 at 12:31 PM Yang Cao wrote:

> Also, do you know why this happens?
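Why repartition(1) concentrates the pressure can be seen with back-of-the-envelope arithmetic: the biggest task holds roughly total/partitions records, so one output partition means a single task materializes everything. This is an illustration only, not Spark's actual memory accounting:

```python
def peak_records_per_task(total_records, num_partitions):
    # Assuming an even split, the largest task holds about this many records.
    return -(-total_records // num_partitions)  # ceiling division

total = 1_000_000
print(peak_records_per_task(total, 1))    # 1000000: one task holds everything
print(peak_records_per_task(total, 200))  # 5000: spread across many tasks
```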
Re: physical memory usage keep increasing for spark app on Yarn
Hi Yang,
I have faced the same problem on Mesos, and to circumvent this issue I usually increase the partition number. On the last step in your code you reduce the number of partitions to 1; try to set a bigger value, maybe that solves the problem.

Cheers,
Pavel

On Fri, Jan 20, 2017 at 12:35 PM Yang Cao wrote:

> Hi all,
>
> I am running a Spark application in YARN-client mode with 6 executors
> (each with 4 cores, executor memory = 6 GB, and overhead = 4 GB; Spark
> version: 1.6.3 / 2.1.0). I find that my executor memory keeps increasing
> until it gets killed by the node manager, which tells me to boost
> spark.yarn.executor.memoryOverhead.
> I know that this param mainly controls the size of memory allocated
> off-heap, but I don't know when and how the Spark engine will use this part
> of memory. Also, increasing that part of memory does not always solve my
> problem; sometimes it works, sometimes not. It tends to be useless when the
> input data is large.
>
> FYI, my app's logic is quite simple. It combines the small files
> generated in one single day (one directory per day) into a single one and
> writes back to HDFS. Here is the core code:
>
> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d =
> ${ts.day}").coalesce(400)
>
> val dropDF =
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>
> The source file may have hundreds to thousands of partitions. And the
> total parquet file is around 1 to 5 GB. Also, I find that in the step that
> shuffle-reads data from different machines, the size of the shuffle read is
> about 4 times larger than the input size, which is weird, or maybe some
> principle I don't know.
>
> Anyway, I have done some searching myself on this problem. Some articles
> say that it's the direct buffer memory (I don't set that myself). Some
> articles say that people solve it with more frequent full GC.
Also I find one > people on SO with very similar situation: > http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn > This guy claimed that it’s a bug with parquet but comment questioned him. > People in this mail list may also receive an email hours ago from > blondowski who described this problem while writing json: > http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none > > So it looks like to be common question for different output format. I hope > someone with experience about this problem could make an explanation about > this issue. Why this happen and what is a reliable way to solve this > problem. > > Best, > > >
Re: Spark partition size tuning
Hi,
Maybe sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize) helps you.

Best regards,
Pavel

On Tue, Jan 26, 2016 at 7:13 AM Jia Zou wrote:

> Dear all,
>
> First, an update: the local file system data partition size can be
> tuned by:
> sc.hadoopConfiguration().setLong("fs.local.block.size", blocksize)
>
> However, I also need to tune the Spark data partition size for input data
> that is stored in Tachyon (the default is 512 MB), but the above method
> doesn't work for Tachyon data.
>
> Do you have any suggestions? Thanks very much!
>
> Best Regards,
> Jia
>
> ---------- Forwarded message ----------
> From: Jia Zou
> Date: Thu, Jan 21, 2016 at 10:05 PM
> Subject: Spark partition size tuning
> To: "user @spark"
>
> Dear all!
>
> When using Spark to read from the local file system, the default partition
> size is 32 MB. How can I increase the partition size to 128 MB, to reduce
> the number of tasks?
>
> Thank you very much!
>
> Best Regards,
> Jia
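The effect being tuned here is just block arithmetic: one input task per block, rounded up. A toy check of the numbers in Jia's question (illustrative only; real split sizing also involves the min/max split settings):

```python
def num_tasks(file_size_mb, block_size_mb):
    # One input task (partition) per block, rounding the last partial block up.
    return -(-file_size_mb // block_size_mb)  # ceiling division

size_mb = 1024  # a hypothetical 1 GB input
print(num_tasks(size_mb, 32))   # 32 tasks at the 32 MB default
print(num_tasks(size_mb, 128))  # 8 tasks after raising the block size
```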
Re: Parquet write optimization by row group size config
I get about 25 separate gzipped log files per hour. File sizes are very different, from 10 MB to 50 MB of gzipped JSON data. So I convert this data to Parquet each hour. The code is very simple in Python:

text_file = sc.textFile(src_file)
df = sqlCtx.jsonRDD(text_file.map(lambda x: x.split('\t')[2]).map(json.loads).flatMap(flatting_events).map(specific_keys_types_wrapper).map(json.dumps))
df.write.parquet(out_file, mode='overwrite')

The JSON in the log files is not clean, and I need to do some preparation via RDD. The output Parquet files are very small, about 35 MB for the largest source files. The source log files are converted one by one. It is cool that all converting transformations are executed quickly on a lot of machine cores, but when I run htop on my machines I find that they mostly use only one core, which is very strange.

First thought: create a lot of Spark contexts, one for each input file (or group of files), and allocate each only 2 cores; then the job would use all the servers' power. But this solution looks ugly, and it eliminates all the beauty of Spark in this case; maybe this case is not for Spark. I found that in the first seconds the job uses all available cores but then starts working on one, and it is not an IO problem (file sizes are too small for a RAID over SSD). So, second thought: the problem is in the Parquet files. After reading some docs, I understand that Parquet has a lot of levels of parallelism, and I should look for a solution there.

On Thu, Jan 21, 2016 at 10:35 AM Jörn Franke wrote:

> What is your data size, the algorithm, and the expected time?
> Depending on this, the group can recommend optimizations or tell you
> that the expectations are wrong.
Re: Parquet write optimization by row group size config
Thanks, Akhil! It helps, but these jobs are still not fast enough; maybe I missed something.

Regards,
Pavel

On Wed, Jan 20, 2016 at 9:51 AM Akhil Das wrote:

> Did you try re-partitioning the data before doing the write?
>
> Thanks
> Best Regards
Re: Can I configure Spark on multiple nodes using local filesystem on each node?
Hi,
I'm using Spark in standalone mode without HDFS, and a shared folder is mounted on the nodes via NFS. It looks like each node writes data as if to a local file system.

Regards,
Pavel

On Tue, Jan 19, 2016 at 5:39 PM Jia Zou wrote:

> Dear all,
>
> Can I configure Spark on multiple nodes without HDFS, so that output data
> will be written to the local file system on each node?
>
> I guess there is no such feature in Spark, but I just want to confirm.
>
> Best Regards,
> Jia
Parquet write optimization by row group size config
Hello,
I'm using Spark on some machines in standalone mode; data storage is mounted on these machines via NFS. I have an input data stream, and when I try to store all the data for an hour in Parquet, the job executes mostly on one core and the hourly data takes 40-50 minutes to be stored. It is very slow! And it is not an IO problem. After researching how Parquet files work, I found that writing can be parallelized at the row-group abstraction level.
I think the row group for my files is too large; how can I change it? When I create a too-big DataFrame, it divides into parts very well and writes quickly!

Thanks,
Pavel
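For later readers of this thread: the row-group size of Parquet files Spark writes is controlled by the Hadoop setting parquet.block.size (in bytes; commonly 128 MB by default). A sketch of lowering it from PySpark. Note this is an assumption-laden fragment: sc is an existing SparkContext, sc._jsc is internal API, and whether this alone restores write parallelism is the open question here; repartitioning the DataFrame before the write is the other lever.

```python
# Shrink the Parquet writer's row-group size to 16 MB (untested sketch).
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 16 * 1024 * 1024)
df.write.parquet(out_file, mode='overwrite')
```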