Re: Spark on Mesos - Weird behavior

2018-07-11 Thread Pavel Plotnikov
Oh, sorry, I missed that you use Spark without dynamic allocation. Anyway,
I don't know whether these parameters work without dynamic allocation.

On Wed, Jul 11, 2018 at 5:11 PM Thodoris Zois  wrote:

> Hello,
>
> Yeah you are right, but I think that works only if you use Spark dynamic
> allocation. Am I wrong?
>
> -Thodoris
>
> On 11 Jul 2018, at 17:09, Pavel Plotnikov 
> wrote:
>
> Hi, Thodoris
> You can configure resources per executor and control the number of
> executors instead of using spark.cores.max. I think the
> spark.dynamicAllocation.minExecutors and
> spark.dynamicAllocation.maxExecutors configuration values can help you.
>
> On Tue, Jul 10, 2018 at 5:07 PM Thodoris Zois  wrote:
>
>> Actually, after some experiments we figured out that spark.cores.max /
>> spark.executor.cores is the upper bound for the number of executors. Spark
>> apps will run even if only one executor can be launched.
>>
>> Is there any way to also specify the lower bound? It is a bit annoying
>> that it seems we can't control the resource usage of an application. By
>> the way, we are not using dynamic allocation.
>>
>> - Thodoris
>>
>>
>> On 10 Jul 2018, at 14:35, Pavel Plotnikov 
>> wrote:
>>
>> Hello Thodoris!
>> Have you checked the following:
>>  - does the Mesos cluster have available resources?
>>  - does Spark have tasks waiting in the queue longer than the
>> spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
>>  - and have you checked that Mesos sends offers to the Spark app's Mesos
>> framework with at least 10 cores and 2 GB RAM?
>>
>> If Mesos has no available offers with 10 cores, but does have offers with
>> 8 or 9, you can use smaller executors (for example 4 cores and 1 GB RAM)
>> to better fit the resources available on the nodes.
>>
>> Cheers,
>> Pavel
>>
>> On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois  wrote:
>>
>>> Hello list,
>>>
>>> We are running Apache Spark on a Mesos cluster and we face a weird
>>> behavior of executors. When we submit an app with e.g. 10 cores, 2 GB of
>>> memory, and max cores 30, we expect to see 3 executors running on the
>>> cluster. However, sometimes there are only 2... Spark applications are not
>>> the only ones that run on the cluster. I guess that Spark starts executors
>>> on the available offers even if they do not satisfy our needs. Is there any
>>> configuration that we can use in order to prevent Spark from starting when
>>> there are no resource offers for the total number of executors?
>>>
>>> Thank you
>>> - Thodoris
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>


Re: Spark on Mesos - Weird behavior

2018-07-11 Thread Pavel Plotnikov
Hi, Thodoris
You can configure resources per executor and control the number of
executors instead of using spark.cores.max. I think the
spark.dynamicAllocation.minExecutors and
spark.dynamicAllocation.maxExecutors configuration values can help you.
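
For illustration, a minimal PySpark sketch of what I mean (the numbers are
examples only, and it assumes dynamic allocation plus the external shuffle
service are enabled in your setup):

from pyspark import SparkConf, SparkContext

# Bound the executor count instead of capping total cores (illustrative values).
conf = (SparkConf()
        .setAppName("bounded-executors")
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.shuffle.service.enabled", "true")  # required by dynamic allocation
        .set("spark.executor.cores", "2")
        .set("spark.executor.memory", "2g")
        .set("spark.dynamicAllocation.minExecutors", "3")
        .set("spark.dynamicAllocation.maxExecutors", "10"))
sc = SparkContext(conf=conf)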

On Tue, Jul 10, 2018 at 5:07 PM Thodoris Zois  wrote:

> Actually, after some experiments we figured out that spark.cores.max /
> spark.executor.cores is the upper bound for the number of executors. Spark
> apps will run even if only one executor can be launched.
>
> Is there any way to also specify the lower bound? It is a bit annoying
> that it seems we can't control the resource usage of an application. By
> the way, we are not using dynamic allocation.
>
> - Thodoris
>
>
> On 10 Jul 2018, at 14:35, Pavel Plotnikov 
> wrote:
>
> Hello Thodoris!
> Have you checked the following:
>  - does the Mesos cluster have available resources?
>  - does Spark have tasks waiting in the queue longer than the
> spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
>  - and have you checked that Mesos sends offers to the Spark app's Mesos
> framework with at least 10 cores and 2 GB RAM?
>
> If Mesos has no available offers with 10 cores, but does have offers with
> 8 or 9, you can use smaller executors (for example 4 cores and 1 GB RAM)
> to better fit the resources available on the nodes.
>
> Cheers,
> Pavel
>
> On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois  wrote:
>
>> Hello list,
>>
>> We are running Apache Spark on a Mesos cluster and we face a weird
>> behavior of executors. When we submit an app with e.g. 10 cores, 2 GB of
>> memory, and max cores 30, we expect to see 3 executors running on the
>> cluster. However, sometimes there are only 2... Spark applications are not
>> the only ones that run on the cluster. I guess that Spark starts executors
>> on the available offers even if they do not satisfy our needs. Is there any
>> configuration that we can use in order to prevent Spark from starting when
>> there are no resource offers for the total number of executors?
>>
>> Thank you
>> - Thodoris
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark on Mesos - Weird behavior

2018-07-10 Thread Pavel Plotnikov
Hello Thodoris!
Have you checked the following:
 - does the Mesos cluster have available resources?
 - does Spark have tasks waiting in the queue longer than the
spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
 - and have you checked that Mesos sends offers to the Spark app's Mesos
framework with at least 10 cores and 2 GB RAM?

If Mesos has no available offers with 10 cores, but does have offers with
8 or 9, you can use smaller executors (for example 4 cores and 1 GB RAM)
to better fit the resources available on the nodes.
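
For example, a rough sketch of the smaller-executor configuration in PySpark
(the numbers are just the ones from this thread; tune them to your offers):

from pyspark import SparkConf, SparkContext

# Smaller executors fit into smaller Mesos offers; with 30 max cores this
# allows more (smaller) executors. Values are illustrative.
conf = (SparkConf()
        .setAppName("smaller-executors")
        .set("spark.executor.cores", "4")
        .set("spark.executor.memory", "1g")
        .set("spark.cores.max", "30"))
sc = SparkContext(conf=conf)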

Cheers,
Pavel

On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois  wrote:

> Hello list,
>
> We are running Apache Spark on a Mesos cluster and we face a weird
> behavior of executors. When we submit an app with e.g. 10 cores, 2 GB of
> memory, and max cores 30, we expect to see 3 executors running on the
> cluster. However, sometimes there are only 2... Spark applications are not
> the only ones that run on the cluster. I guess that Spark starts executors
> on the available offers even if they do not satisfy our needs. Is there any
> configuration that we can use in order to prevent Spark from starting when
> there are no resource offers for the total number of executors?
>
> Thank you
> - Thodoris
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark declines Mesos offers

2017-04-26 Thread Pavel Plotnikov
Michael Gummelt, thanks!!! I forgot about debug logging!
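
For anyone else hitting this, a rough sketch of one way to turn the logging
up from PySpark (note that setLogLevel raises the level for the whole
application, not just the Mesos scheduler):

from pyspark import SparkContext

sc = SparkContext(appName="mesos-offer-debug")  # assumes the master is set elsewhere
# Make the scheduler's "declined offer" hints visible in the driver logs.
sc.setLogLevel("DEBUG")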

On Mon, Apr 24, 2017 at 9:30 PM Michael Gummelt 
wrote:

> Have you run with debug logging?  There are some hints in the debug logs:
> https://github.com/apache/spark/blob/branch-2.1/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L316
>
> On Mon, Apr 24, 2017 at 4:53 AM, Pavel Plotnikov <
> pavel.plotni...@team.wrike.com> wrote:
>
>> Hi, everyone! I run Spark 2.1.0 jobs on top of a Mesos cluster in
>> coarse-grained mode with dynamic resource allocation. Sometimes the Spark
>> Mesos scheduler declines Mesos offers despite the fact that not all
>> available resources are used (I have fewer workers than the possible
>> maximum), the maximum threshold in the Spark configuration is not
>> reached, and the queue has a lot of pending tasks.
>>
>> Maybe I have a wrong Spark or Mesos configuration? Does anyone have the
>> same problem?
>>
>
>
>
> --
> Michael Gummelt
> Software Engineer
> Mesosphere
>


Spark declines Mesos offers

2017-04-24 Thread Pavel Plotnikov
Hi, everyone! I run Spark 2.1.0 jobs on top of a Mesos cluster in
coarse-grained mode with dynamic resource allocation. Sometimes the Spark
Mesos scheduler declines Mesos offers despite the fact that not all
available resources are used (I have fewer workers than the possible
maximum), the maximum threshold in the Spark configuration is not
reached, and the queue has a lot of pending tasks.

Maybe I have a wrong Spark or Mesos configuration? Does anyone have the
same problem?


Re: Spark runs out of memory with small file

2017-02-26 Thread Pavel Plotnikov
Hi, Henry

In the first example the dict d always contains only one value because
the_id is always the same; in the second case the dict grows very quickly.
So I suggest first applying a map function to split your file string into
rows, then repartitioning, and then applying your custom logic.

Example:

def splitf(s):
    # s is the whole file content as a single string; split it into lines
    return s.split("\n")

# your_function is a placeholder for the custom per-line logic
rdd.flatMap(splitf).repartition(1000).map(your_function)
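
A slightly fuller sketch of how I imagine the whole flow (it reuses the
WARC-Record-ID parsing from your second function; parse_lines and the paths
are placeholders):

from pyspark import SparkContext
from pyspark.sql import Row

sc = SparkContext(appName="warc-lines")  # or reuse the shell's sc

def parse_lines(pair):
    # pair is (path, whole file content) from wholeTextFiles
    path, text = pair
    the_id = "init"
    for line in text.split("\n"):
        if line.startswith("WARC-Record-ID:"):
            the_id = line[len("WARC-Record-ID:"):]
        # emit one small Row per line instead of growing a dict per file
        yield Row(id=the_id, line=line)

rows = (sc.wholeTextFiles("/mnt/temp")
          .flatMap(parse_lines)       # generator functions work with flatMap
          .repartition(1000))
print(rows.count())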

Best,
Pavel

On Mon, 27 Feb 2017, 06:28 Henry Tremblay,  wrote:

> Not sure where you want me to put yield. My first try caused an error in
> Spark that it could not pickle generator objects.
>
> On 02/26/2017 03:25 PM, ayan guha wrote:
>
> Hi
>
> We are doing similar stuff, but with a large number of small-ish files.
> What we do is write a function to parse a complete file, similar to your
> parse function, but we use yield instead of return and flatMap on top of
> it. Can you give it a try and let us know if it works?
>
> On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers  wrote:
>
> using wholeTextFiles to process formats that cannot be split per line is
> not "old"
>
> and there are plenty of problems for which RDD is still better suited than
> Dataset or DataFrame currently (this might change in near future when
> Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
> using SPARK.
>
> The core engine of SPARK, which even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using dataframes and try using SPARK 2.1.
>
> In other words, it may be of tremendous benefit if you were learning to
> solve problems which exist rather than problems which do not exist any
> more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay 
> wrote:
>
> The file is so small that a stand alone python script, independent of
> spark, can process the file in under a second.
>
> Also, the following fails:
>
> 1. Read the whole file in with wholeTextFiles
>
> 2. Use flatMap to get 50,000 rows that look like: Row(id="path",
> line="line")
>
> 3. Save the results as CSV to HDFS
>
> 4. Read the files (there are 20) from HDFS into a df using
> sqlContext.read.csv()
>
> 5. Convert the df to an rdd.
>
> 6. Create key-value pairs with the key being the file path and the value
> being the line.
>
> 7. Iterate through the values.
>
> What happens is Spark either runs out of memory, or, in my last try with a
> slight variation, just hangs for 12 hours.
>
> Henry
>
> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>
> Hi, Tremblay.
> Your file is in .gz format, which is not splittable for Hadoop. Perhaps
> the file is loaded by only one executor.
> How many executors do you start?
> Perhaps the repartition method could solve it, I guess.
>
>
> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
> wrote:
>
> I am reading in a single small file from Hadoop with wholeTextFiles. I
> process each line and create a row with two cells, the first cell equal to
> the name of the file and the second cell equal to the line. That code runs
> fine.
>
> But if I just add two lines of code and change the first cell based on
> parsing a line, Spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non-Spark application fails?
>
> Thanks!
>
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096
> /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
> In [2]: rdd1.count()
> Out[2]: 1
>
>
> In [4]: def process_file(s):
>    ...:     text = s[1]
>    ...:     the_id = s[0]
>    ...:     d = {}
>    ...:     l = text.split("\n")
>    ...:     final = []
>    ...:     for line in l:
>    ...:         d[the_id] = line
>    ...:         final.append(Row(**d))
>    ...:     return final
>    ...:
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us
> -west-2.compute.internal:8020/mnt/temp/CC-MAIN-2017011609512
> 3-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
>     ...:     text = s[1]
>     ...:     d = {}
>     ...:     l = text.split("\n")
>     ...:     final = []
>     ...:     the_id = "init"
>     ...:     for line in l:
>     ...:         if line[0:15] == 'WARC-Record-ID:':
>     ...:             the_id = line[15:]
>     ...:         d[the_id] = line
>     ...:         final.append(Row(**d))
>     ...:     return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
> ip-172-31-41-89.us-west-2.compute.

Re: Launching a Spark application on a subset of machines

2017-02-07 Thread Pavel Plotnikov
Hi, Alvaro
You can create different clusters using the standalone cluster manager and
then manage subsets of machines by submitting applications to different
masters. Or you can use Mesos attributes to mark a subset of workers and
specify them in spark.mesos.constraints.
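
For example, a minimal sketch of the Mesos route (the attribute name and
value "sparkpool:analytics" are made up; it assumes the target agents were
started with a matching --attributes flag):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("mesos://zk://zk1:2181/mesos")  # placeholder master URL
        .setAppName("subset-only-app")
        # only accept offers from agents carrying this (hypothetical) attribute
        .set("spark.mesos.constraints", "sparkpool:analytics"))
sc = SparkContext(conf=conf)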


On Tue, Feb 7, 2017 at 1:21 PM Alvaro Brandon 
wrote:

> Hello all:
>
> I have the following scenario.
> - I have a cluster of 50 machines with Hadoop and Spark installed on them.
> - I want to launch one Spark application through spark-submit. However, I
> want this application to run on only a subset of these machines,
> disregarding data locality (e.g. 10 machines).
>
> Is this possible? Is there any option in the standalone scheduler, YARN,
> or Mesos that allows such a thing?
>
>
>


Re: physical memory usage keeps increasing for Spark app on YARN

2017-01-23 Thread Pavel Plotnikov
Hi Yang!

I don't know exactly why this happens, but I think either GC can't keep up,
or the size of the data plus the additional objects created during
computation is too big for the executor.
I also found this problem appears only when you do some data manipulations.
You can cache your data first and, after that, write it to one partition.
For example:

val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")

dropDF.cache()

or

dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)

val dropDF = spark.read.parquet(temppath)

and then

dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)

Best,

On Sun, Jan 22, 2017 at 12:31 PM Yang Cao  wrote:

> Also, do you know why this happens?
>
> On 2017年1月20日, at 18:23, Pavel Plotnikov 
> wrote:
>
> Hi Yang,
> I have faced the same problem on Mesos, and to circumvent this issue I
> usually increase the partition number. On the last step in your code you
> reduce the number of partitions to 1; try to set a bigger value, maybe it
> will solve this problem.
>
> Cheers,
> Pavel
>
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao  wrote:
>
> Hi all,
>
> I am running a Spark application in YARN-client mode with 6 executors
> (each with 4 cores, executor memory = 6G and overhead = 4G, Spark version:
> 1.6.3 / 2.1.0). I find that my executor memory keeps increasing until it
> gets killed by the node manager, which tells me to boost
> spark.yarn.executor.memoryOverhead.
> I know that this param mainly controls the size of memory allocated
> off-heap. But I don't know when and how the Spark engine will use this part
> of memory. Also, increasing that part of memory does not always solve my
> problem: sometimes it works, sometimes not. It tends to be useless when the
> input data is large.
>
> FYI, my app's logic is quite simple. It combines the small files
> generated in one single day (one directory per day) into a single file and
> writes it back to HDFS. Here is the core code:
>
> val df = spark.read.parquet(originpath)
>   .filter(s"m = ${ts.month} AND d = ${ts.day}")
>   .coalesce(400)
>
> val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>
> The source data may have hundreds to thousands of partitions, and the
> total parquet size is around 1 to 5 gigs. I also find that in the step that
> shuffle-reads data from different machines, the size of the shuffle read is
> about 4 times larger than the input size, which is weird, or follows some
> principle I don't know.
>
> Anyway, I have done some searching myself for this problem. Some articles
> said that it's in the direct buffer memory (which I don't set myself). Some
> articles said that people solve it with more frequent full GC. I also found
> one person on SO with a very similar situation:
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> This guy claimed that it's a bug with Parquet, but a comment questioned him.
> People on this mailing list may also have received an email a few hours ago
> from blondowski, who described this problem while writing JSON:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>
> So it looks like a common question across different output formats. I hope
> someone with experience of this problem could explain this issue: why does
> this happen, and what is a reliable way to solve it?
>
> Best,
>
>
>


Re: physical memory usage keeps increasing for Spark app on YARN

2017-01-20 Thread Pavel Plotnikov
Hi Yang,
I have faced the same problem on Mesos, and to circumvent this issue I
usually increase the partition number. On the last step in your code you
reduce the number of partitions to 1; try to set a bigger value, maybe it
will solve this problem.
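
For example, on the last step something like this (a PySpark sketch for
illustration; drop_df and target_path mirror the names in your code, and 200
is an arbitrary number to tune):

# Keep more partitions on the final write instead of collapsing to 1;
# compact the output afterwards if a single file is really required.
(drop_df
    .repartition(200)     # illustrative value
    .write
    .mode("error")        # same semantics as SaveMode.ErrorIfExists
    .parquet(target_path))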

Cheers,
Pavel

On Fri, Jan 20, 2017 at 12:35 PM Yang Cao  wrote:

> Hi all,
>
> I am running a Spark application in YARN-client mode with 6 executors
> (each with 4 cores, executor memory = 6G and overhead = 4G, Spark version:
> 1.6.3 / 2.1.0). I find that my executor memory keeps increasing until it
> gets killed by the node manager, which tells me to boost
> spark.yarn.executor.memoryOverhead.
> I know that this param mainly controls the size of memory allocated
> off-heap. But I don't know when and how the Spark engine will use this part
> of memory. Also, increasing that part of memory does not always solve my
> problem: sometimes it works, sometimes not. It tends to be useless when the
> input data is large.
>
> FYI, my app's logic is quite simple. It combines the small files
> generated in one single day (one directory per day) into a single file and
> writes it back to HDFS. Here is the core code:
>
> val df = spark.read.parquet(originpath)
>   .filter(s"m = ${ts.month} AND d = ${ts.day}")
>   .coalesce(400)
>
> val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>
> The source data may have hundreds to thousands of partitions, and the
> total parquet size is around 1 to 5 gigs. I also find that in the step that
> shuffle-reads data from different machines, the size of the shuffle read is
> about 4 times larger than the input size, which is weird, or follows some
> principle I don't know.
>
> Anyway, I have done some searching myself for this problem. Some articles
> said that it's in the direct buffer memory (which I don't set myself). Some
> articles said that people solve it with more frequent full GC. I also found
> one person on SO with a very similar situation:
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> This guy claimed that it's a bug with Parquet, but a comment questioned him.
> People on this mailing list may also have received an email a few hours ago
> from blondowski, who described this problem while writing JSON:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>
> So it looks like a common question across different output formats. I hope
> someone with experience of this problem could explain this issue: why does
> this happen, and what is a reliable way to solve it?
>
> Best,
>
>
>


Re: Spark partition size tuning

2016-01-26 Thread Pavel Plotnikov
Hi,
Maybe sc.hadoopConfiguration.setInt("dfs.blocksize", blockSize) helps you.
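
A rough PySpark sketch of the same idea (it goes through the underlying JVM
context, since PySpark has no public hadoopConfiguration accessor; 128MB and
the path are placeholders):

block_size = 128 * 1024 * 1024  # 128MB, illustrative

# sc is an existing SparkContext; _jsc is the wrapped JavaSparkContext
sc._jsc.hadoopConfiguration().setInt("dfs.blocksize", block_size)
rdd = sc.textFile("tachyon://master:19998/path/to/data")  # placeholder URI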

Best Regards,
Pavel

On Tue, Jan 26, 2016 at 7:13 AM Jia Zou  wrote:

> Dear all,
>
> First to update that the local file system data partition size can be
> tuned by:
> sc.hadoopConfiguration().setLong("fs.local.block.size", blocksize)
>
> However, I also need to tune the Spark data partition size for input data
> that is stored in Tachyon (the default is 512MB), but the above method
> doesn't work for Tachyon data.
>
> Do you have any suggestions? Thanks very much!
>
> Best Regards,
> Jia
>
>
> -- Forwarded message --
> From: Jia Zou 
> Date: Thu, Jan 21, 2016 at 10:05 PM
> Subject: Spark partition size tuning
> To: "user @spark" 
>
>
> Dear all!
>
> When using Spark to read from the local file system, the default partition
> size is 32MB. How can I increase the partition size to 128MB to reduce the
> number of tasks?
>
> Thank you very much!
>
> Best Regards,
> Jia
>
>


Re: Parquet write optimization by row group size config

2016-01-21 Thread Pavel Plotnikov
I have got about 25 separate gzipped log files per hour. File sizes are
very different, from 10MB to 50MB of gzipped JSON data. So I convert this
data to Parquet each hour. The code is very simple, in Python:

text_file = sc.textFile(src_file)
df = sqlCtx.jsonRDD(text_file.map(lambda x: x.split('\t')[2])
                    .map(json.loads)
                    .flatMap(flatting_events)
                    .map(specific_keys_types_wrapper)
                    .map(json.dumps))
df.write.parquet(out_file, mode='overwrite')

The JSON in the log files is not clean, and I need to do some preparation via
RDD. The output Parquet files are very small, about 35MB for the largest
source files. These source log files are converted one by one. It is cool
that all the converting transformations are executed quickly on a lot of
machine cores, but when I run htop on my machines I found that they mostly
use only one core, which is very strange.
First thought: create a separate Spark context for each input file (or group
of files) and allocate it only 2 cores, so that all of the servers' power is
used. But this solution looks ugly and eliminates all the beauty of Spark in
this case; maybe this case is not for Spark.
I found that in the first seconds the job uses all available cores but then
starts working on one, and it is not an IO problem (the file sizes are too
small for a RAID over SSD). So, second thought: the problem is in the Parquet
files. After some reading of the docs, I understand that Parquet has a lot of
levels of parallelism, and I should look for a solution there.
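
For reference, the knob I am thinking of (my understanding is that parquet-mr
takes the row group size from the parquet.block.size Hadoop setting; 32MB is
just an example value):

row_group_bytes = 32 * 1024 * 1024  # illustrative target row group size

# set it on the Hadoop configuration the writers use (via the JVM context)
sc._jsc.hadoopConfiguration().setInt("parquet.block.size", row_group_bytes)
df.write.parquet(out_file, mode='overwrite')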

On Thu, Jan 21, 2016 at 10:35 AM Jörn Franke  wrote:

> What is your data size, the algorithm, and the expected time?
> Depending on this, the group can recommend optimizations or tell you that
> the expectations are wrong.
>
> On 20 Jan 2016, at 18:24, Pavel Plotnikov 
> wrote:
>
> Thanks, Akhil! It helps, but these jobs are still not fast enough; maybe I
> missed something.
>
> Regards,
> Pavel
>
> On Wed, Jan 20, 2016 at 9:51 AM Akhil Das 
> wrote:
>
>> Did you try re-partitioning the data before doing the write?
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Jan 19, 2016 at 6:13 PM, Pavel Plotnikov <
>> pavel.plotni...@team.wrike.com> wrote:
>>
>>> Hello,
>>> I'm using Spark on some machines in standalone mode; the data storage is
>>> mounted on these machines via NFS. I have an input data stream, and when
>>> I try to store all the data for an hour in Parquet, the job executes
>>> mostly on one core and the hourly data is stored in 40-50 minutes. It is
>>> very slow! And it is not an IO problem. After researching how the Parquet
>>> format works, I found that it can be parallelized at the row group
>>> abstraction level.
>>> I think the row group for my files is too large; how can I change it?
>>> When I create a too-big DataFrame, it divides into parts very well and
>>> writes quickly!
>>>
>>> Thanks,
>>> Pavel
>>>
>>
>>


Re: Parquet write optimization by row group size config

2016-01-20 Thread Pavel Plotnikov
Thanks, Akhil! It helps, but these jobs are still not fast enough; maybe I
missed something.

Regards,
Pavel

On Wed, Jan 20, 2016 at 9:51 AM Akhil Das 
wrote:

> Did you try re-partitioning the data before doing the write?
>
> Thanks
> Best Regards
>
> On Tue, Jan 19, 2016 at 6:13 PM, Pavel Plotnikov <
> pavel.plotni...@team.wrike.com> wrote:
>
>> Hello,
>> I'm using Spark on some machines in standalone mode; the data storage is
>> mounted on these machines via NFS. I have an input data stream, and when I
>> try to store all the data for an hour in Parquet, the job executes mostly
>> on one core and the hourly data is stored in 40-50 minutes. It is very
>> slow! And it is not an IO problem. After researching how the Parquet format
>> works, I found that it can be parallelized at the row group abstraction
>> level.
>> I think the row group for my files is too large; how can I change it?
>> When I create a too-big DataFrame, it divides into parts very well and
>> writes quickly!
>>
>> Thanks,
>> Pavel
>>
>
>


Re: Can I configure Spark on multiple nodes using local filesystem on each node?

2016-01-19 Thread Pavel Plotnikov
Hi,

I'm using Spark in standalone mode without HDFS, and a shared folder is
mounted on the nodes via NFS. It looks like each node writes data as it
would to a local file system.
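
For example (a sketch; it assumes /mnt/shared is the NFS mount present at
the same path on every node):

# Every worker resolves the same file:// path through its own NFS mount,
# so reads and writes behave like local file system access.
rdd = sc.textFile("file:///mnt/shared/input/*.log")    # placeholder paths
rdd.saveAsTextFile("file:///mnt/shared/output/run1")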

Regards,
Pavel

On Tue, Jan 19, 2016 at 5:39 PM Jia Zou  wrote:

> Dear all,
>
> Can I configure Spark on multiple nodes without HDFS, so that output data
> will be written to the local file system on each node?
>
> I guess there is no such feature in Spark, but just want to confirm.
>
> Best Regards,
> Jia
>


Parquet write optimization by row group size config

2016-01-19 Thread Pavel Plotnikov
Hello,
I'm using Spark on some machines in standalone mode; the data storage is
mounted on these machines via NFS. I have an input data stream, and when I
try to store all the data for an hour in Parquet, the job executes mostly on
one core and the hourly data is stored in 40-50 minutes. It is very slow!
And it is not an IO problem. After researching how the Parquet format works,
I found that it can be parallelized at the row group abstraction level.
I think the row group for my files is too large; how can I change it?
When I create a too-big DataFrame, it divides into parts very well and
writes quickly!

Thanks,
Pavel