data size exceeds the total ram

2022-02-11 Thread frakass

Hello

I have three nodes with total memory 128G x 3 = 384GB
But the input data is about 1TB.
How can spark handle this case?

Thanks.




Re: data size exceeds the total ram

2022-02-11 Thread Mich Talebzadeh
Well, one experiment is worth many times more than asking what-if scenario
questions.


   1. Try running it first to see how Spark handles it.
   2. Go to the Spark GUI (on port 4044), look at the Storage tab and see
   what it says.
   3. Unless you explicitly persist the data, Spark will read it using an
   appropriate number of partitions for the given memory size and cluster
   size. As long as there is sufficient disk space (not memory), Spark will
   handle files larger than the available memory. However, if you do persist,
   you will get an Out of Memory error (see the sketch below).
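
A minimal sketch of point 3 (the path, header option and app name are assumptions, not from the original post):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("larger-than-memory").getOrCreate()

# The ~1 TB file is read as ~128 MB input splits; each core processes one
# partition at a time, so the full data set never has to fit in RAM.
df = spark.read.option("header", True).csv("hdfs:///data/input_1tb.csv")
print(df.count())

# If a cache is really needed, a disk-backed storage level lets blocks spill
# to local disk rather than being held entirely in memory.
df.persist(StorageLevel.MEMORY_AND_DISK)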

HTH







On Fri, 11 Feb 2022 at 09:23, frakass  wrote:

> Hello
>
> I have three nodes with total memory 128G x 3 = 384GB
> But the input data is about 1TB.
> How can spark handle this case?
>
> Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: data size exceeds the total ram

2022-02-11 Thread Gourav Sengupta
Hi,

Just so that we understand the problem first:

What is the source data (is it JSON, CSV, Parquet, etc)? Where are you
reading it from (JDBC, file, etc)? What is the compression format (GZ,
BZIP, etc)? What is the SPARK version that you are using?


Thanks and Regards,
Gourav Sengupta

On Fri, Feb 11, 2022 at 9:39 AM Mich Talebzadeh 
wrote:

> Well one experiment is worth many times more than asking what/if scenario
> question.
>
>
>1. Try running it first to see how spark handles it
>2. Go to spark GUI (on port 4044) and look at the storage tab and see
>what it says
>3. Unless you explicitly persist the data, Spark will read the data
>using appropriate partitions given the memory size and cluster count. As
>long as there is sufficient disk space (not memory), Spark will handle
>files larger than the available memory. However, If you do persist,
>you will get an Out of Memory error
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 11 Feb 2022 at 09:23, frakass  wrote:
>
>> Hello
>>
>> I have three nodes with total memory 128G x 3 = 384GB
>> But the input data is about 1TB.
>> How can spark handle this case?
>>
>> Thanks.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: data size exceeds the total ram

2022-02-11 Thread frakass




On 2022/2/11 6:16, Gourav Sengupta wrote:
What is the source data (is it JSON, CSV, Parquet, etc)? Where are you 
reading it from (JDBC, file, etc)? What is the compression format (GZ, 
BZIP, etc)? What is the SPARK version that you are using?


It's a well-formed CSV file (not compressed) stored in HDFS.
Spark 3.2.0.

Thanks.




Re: data size exceeds the total ram

2022-02-11 Thread frakass

Hello list

I have imported the data into Spark and I found there is disk I/O on 
every node. The memory didn't overflow.


But this query is quite slow:

>>> df.groupBy("rvid").agg({'rate':'avg','rvid':'count'}).show()


May I ask:
1. Since I have 3 nodes (i.e. 3 executors?), are there 3 
partitions for each job?

2. Can I increase the number of partitions by hand to improve performance?

Thanks



On 2022/2/11 6:22, frakass wrote:



On 2022/2/11 6:16, Gourav Sengupta wrote:
What is the source data (is it JSON, CSV, Parquet, etc)? Where are you 
reading it from (JDBC, file, etc)? What is the compression format (GZ, 
BZIP, etc)? What is the SPARK version that you are using?


it's a well built csv file (no compressed) stored in HDFS.
spark 3.2.0

Thanks.




Re: data size exceeds the total ram

2022-02-11 Thread Mich Talebzadeh
check this

https://sparkbyexamples.com/spark/spark-partitioning-understanding/
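
As a quick illustration against the query from the earlier post (a sketch; the path is a placeholder and 600 is an arbitrary number):

df = spark.read.option("header", True).csv("hdfs:///path/to/data")
print(df.rdd.getNumPartitions())   # partitions come from the input splits, not from the node count

# raise the number of shuffle partitions used by the aggregation (default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "600")

# or repartition explicitly before the groupBy
df.repartition(600, "rvid").groupBy("rvid").agg({"rate": "avg", "rvid": "count"}).show()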







On Fri, 11 Feb 2022 at 11:09, frakass  wrote:

> Hello list
>
> I have imported the data into spark and I found there is disk IO in
> every node. The memory didn't get overflow.
>
> But such query is quite slow:
>
>  >>> df.groupBy("rvid").agg({'rate':'avg','rvid':'count'}).show()
>
>
> May I ask:
> 1. since I have 3 nodes (as known as 3 executors?), are there 3
> partitions for each job?
> 2. can I expand the partition by hand to increase the performance?
>
> Thanks
>
>
>
> On 2022/2/11 6:22, frakass wrote:
> >
> >
> > On 2022/2/11 6:16, Gourav Sengupta wrote:
> >> What is the source data (is it JSON, CSV, Parquet, etc)? Where are you
> >> reading it from (JDBC, file, etc)? What is the compression format (GZ,
> >> BZIP, etc)? What is the SPARK version that you are using?
> >
> > it's a well built csv file (no compressed) stored in HDFS.
> > spark 3.2.0
> >
> > Thanks.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Chris Coutinho
Hello,

We have a spark structured streaming job that includes a stream-static join
and a Pandas UDF, streaming to/from delta tables. The primary key of the
static table is non-unique, meaning that the streaming join results in
multiple records per input record - in our case 100x increase. The Pandas
UDF is then applied to the resulting stream-static join and stored in a
table. To avoid OOM errors on the executors, we need to start with very
small (~10MB) partitions to account for the expansion. Currently this only
seems possible by explicitly repartitioning the data, incurring the perf
cost associated with the shuffle. Is it possible to force spark to read
parquet files into 10MB partitions without explicitly repartitioning?

The documentation regarding Performance Tuning [0] suggests that it should
be possible to control how spark reads files into partitions - we're
assuming this accounts for structured streaming jobs as well. Based on our
understanding of the page, we used the following to configure spark into
reading a stream of 10GB per trigger into 1000 partitions 10 MB each.

spark.sql.files.openCostInBytes 128MB
spark.sql.files.maxPartitionBytes 10MB
spark.sql.files.minPartitionNum 1000

Unfortunately we still see a large number of empty partitions and a small
number containing the rest of the data (see median vs max number of input
records).

[image: image.png]

Any help would be much appreciated

Chris


how to classify column

2022-02-11 Thread frakass

Hello

I have a column whose value (Int type, a score) is from 0 to 5.
I want to query it so that when the score is > 3 it is classified as "good", 
else classified as "bad".

How do I implement that? A UDF, something like this?

scala> implicit class Foo(i:Int) {
 |   def classAs(f:Int=>String) = f(i)
 | }
class Foo

scala> 4.classAs { x => if (x > 3) "good" else "bad" }
val res13: String = good

scala> 2.classAs { x => if (x > 3) "good" else "bad" }
val res14: String = bad
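
The same classification can also be expressed with the built-in when/otherwise column functions instead of a UDF (a PySpark sketch, assuming the column is named "score"):

from pyspark.sql import functions as F

df = df.withColumn("class", F.when(F.col("score") > 3, "good").otherwise("bad"))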


Thank you.




Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Adam Binford
The smallest unit of work you can do on a parquet file (under the delta
hood) is based on the parquet row group size, which by default is 128mb. If
you specify maxPartitionBytes of 10mb, what that will basically do is
create a partition for each 10mb of a file, but whatever partition covers
the part of the file where the row group starts will consume the entire row
group. That's why you're seeing a lot of empty partitions and a small
number with the rest of the actual data.
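
One way to see this row-group layout directly is to read the Parquet footer, for example with pyarrow (a sketch; the path is a placeholder):

import pyarrow.parquet as pq

md = pq.ParquetFile("/path/to/table/part-00000.parquet").metadata
print(md.num_row_groups)
for i in range(md.num_row_groups):
    rg = md.row_group(i)
    print(i, rg.num_rows, rg.total_byte_size)   # rows and compressed bytes per row group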

Can't think of any solution other than repartitioning (or rewriting the
input Delta table with a much smaller row group size which wouldn't be
ideal performance wise).

Adam

On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho 
wrote:

> Hello,
>
> We have a spark structured streaming job that includes a stream-static
> join and a Pandas UDF, streaming to/from delta tables. The primary key of
> the static table is non-unique, meaning that the streaming join results in
> multiple records per input record - in our case 100x increase. The Pandas
> UDF is then applied to the resulting stream-static join and stored in a
> table. To avoid OOM errors on the executors, we need to start with very
> small (~10MB) partitions to account for the expansion. Currently this only
> seems possible by explicitly repartitioning the data, incurring the perf
> cost associated with the shuffle. Is it possible to force spark to read
> parquet files into 10MB partitions without explicitly repartitioning?
>
> The documentation regarding Performance Tuning [0] suggests that it should
> be possible to control how spark reads files into partitions - we're
> assuming this accounts for structured streaming jobs as well. Based on our
> understanding of the page, we used the following to configure spark into
> reading a stream of 10GB per trigger into 1000 partitions 10 MB each.
>
> spark.sql.files.openCostInBytes 128MB
> spark.sql.files.maxPartitionBytes 10MB
> spark.sql.files.minPartitionNum 1000
>
> Unfortunately we still see a large number of empty partitions and a small
> number containing the rest of the data (see median vs max number of input
> records).
>
> [image: image.png]
>
> Any help would be much appreciated
>
> Chris
>


-- 
Adam Binford


Repartitioning dataframe by file wite size and preserving order

2022-02-11 Thread Danil Suetin
Hello,

I want to be able to write a dataframe with a set average file size, using 
ORC or Parquet. Preserving the dataframe's sort order is also important. The task is 
that I receive a dataframe I know nothing about as an argument, and I need to 
write ORC or Parquet files of roughly constant size with minimal variance.

Most repartitioning methods work either with a number of partitions or a maximum 
number of rows per file. Both can be calculated from the average row size, 
by sampling the dataframe, serializing it with compression, and finding its size. 
That is of course not perfect, due to the compression of the row/columnar form, but 
as an estimate it might just do.
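
A rough version of that estimate could look like this (a sketch; the sample fraction, scratch path and 100 MB target are assumptions, and the Hadoop FileSystem is reached through the JVM gateway, a private API):

sample_path = "/tmp/size_probe"                      # hypothetical scratch location
sample = df.sample(fraction=0.001)
sample.write.mode("overwrite").parquet(sample_path)

# measure the compressed on-disk size of the sample via the Hadoop FileSystem API
jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
sample_bytes = fs.getContentSummary(jvm.org.apache.hadoop.fs.Path(sample_path)).getLength()

avg_row_bytes = sample_bytes / max(sample.count(), 1)
rows_per_file = max(int(100 * 1024 * 1024 / avg_row_bytes), 1)   # target ~100 MB files

The rows_per_file figure could then feed maxRecordsPerFile or a row-count based repartitioning.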

As for repartitioning itself, then I have tried:

- df.repartition - Can split the data into equally sized partitions, but due to hash 
partitioning the sorting is not preserved.
- df.repartitionByRange.sortWithinPartitions - Can preserve sorting if the original 
sort keys are known (I might not know them), although some say repartitionByRange 
might not always preserve sorting. And if the keys are not uniformly distributed, 
the file sizes will vary a lot.

- df.coalesce - Sorting seems to be preserved, although some say that is not 
always the case for every version of Spark. Also, partition sizes may vary a 
lot, and it can only decrease the number of partitions.

- df.write.option("maxRecordsPerFile", 1) - Not sure about sorting 
preservation. There also still seems to be a problem with small files, since there is 
no minimum on records per file. Merging to one partition and then using 
maxRecordsPerFile won't work since the data might not fit in one partition.

What I am trying to solve seems to be a complex bin packing problem, which 
should also parallelize. As a simple approach, I thought I might do it by 
counting the rows in each partition and creating a new dataframe with tasks 
roughly this way:
Let's say I want 100M sized files with 2M (just for example) as the average row 
size, which means 50 rows per file/partition.
[repartition.png]

But I am not quite sure if it's the best way of doing that. Also, a lot of 
optimization of task and partition locality is going to be needed in order to 
reduce network load.

Are there any built-in features or libraries that might help me solve this? Or 
has anyone had the same issue?

Thanks in advance
Danil

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Chris Coutinho
Hi Adam,

Thanks for the explanation on the empty partitions.

We have the freedom to adjust how the source table is written, so if there
are any improvements we can implement on the source side we'd be happy to
look into that.

It's not yet clear to me how you can reduce the row group size of the
parquet files. I see some mention of `parquet.block.size` online, as well
as various MapReduce settings regarding file splitting (SO:
mapred-min-split-size-in-hdfs);
however, I don't quite understand the link between the splitting settings,
row group configuration, and the resulting number of records when reading from
a delta table.

For more specifics: we're running Spark 3.1.2 using ADLS as cloud storage.

Best,
Chris

On Fri, Feb 11, 2022 at 1:40 PM Adam Binford  wrote:

> The smallest unit of work you can do on a parquet file (under the delta
> hood) is based on the parquet row group size, which by default is 128mb. If
> you specify maxPartitionBytes of 10mb, what that will basically do is
> create a partition for each 10mb of a file, but whatever partition covers
> the part of the file where the row group starts will consume the entire row
> group. That's why you're seeing a lot of empty partitions and a small
> number with the rest of the actual data.
>
> Can't think of any solution other than repartitioning (or rewriting the
> input Delta table with a much smaller row group size which wouldn't be
> ideal performance wise).
>
> Adam
>
> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho 
> wrote:
>
>> Hello,
>>
>> We have a spark structured streaming job that includes a stream-static
>> join and a Pandas UDF, streaming to/from delta tables. The primary key of
>> the static table is non-unique, meaning that the streaming join results in
>> multiple records per input record - in our case 100x increase. The Pandas
>> UDF is then applied to the resulting stream-static join and stored in a
>> table. To avoid OOM errors on the executors, we need to start with very
>> small (~10MB) partitions to account for the expansion. Currently this only
>> seems possible by explicitly repartitioning the data, incurring the perf
>> cost associated with the shuffle. Is it possible to force spark to read
>> parquet files into 10MB partitions without explicitly repartitioning?
>>
>> The documentation regarding Performance Tuning [0] suggests that it
>> should be possible to control how spark reads files into partitions - we're
>> assuming this accounts for structured streaming jobs as well. Based on our
>> understanding of the page, we used the following to configure spark into
>> reading a stream of 10GB per trigger into 1000 partitions 10 MB each.
>>
>> spark.sql.files.openCostInBytes 128MB
>> spark.sql.files.maxPartitionBytes 10MB
>> spark.sql.files.minPartitionNum 1000
>>
>> Unfortunately we still see a large number of empty partitions and a small
>> number containing the rest of the data (see median vs max number of input
>> records).
>>
>> [image: image.png]
>>
>> Any help would be much appreciated
>>
>> Chris
>>
>
>
> --
> Adam Binford
>


Re: data size exceeds the total ram

2022-02-11 Thread Gourav Sengupta
Hi,

I am in a meeting, but you can look out for a setting that tells spark how
many bytes to read from a file at one go.
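
(The setting alluded to here is most likely spark.sql.files.maxPartitionBytes, 
128m by default, which caps how many bytes of a file end up in one read 
partition, e.g. spark.conf.set("spark.sql.files.maxPartitionBytes", "64m"), 
where 64m is only an illustrative value.)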

I use SQL, which is far better in case you are using dataframes.

As we still do not know which SPARK version you are using, it may cause
issues around skew, and there are different ways to manage that depending
on the SPARK version.



Thanks and Regards,
Gourav Sengupta

On Fri, Feb 11, 2022 at 11:09 AM frakass  wrote:

> Hello list
>
> I have imported the data into spark and I found there is disk IO in
> every node. The memory didn't get overflow.
>
> But such query is quite slow:
>
>  >>> df.groupBy("rvid").agg({'rate':'avg','rvid':'count'}).show()
>
>
> May I ask:
> 1. since I have 3 nodes (as known as 3 executors?), are there 3
> partitions for each job?
> 2. can I expand the partition by hand to increase the performance?
>
> Thanks
>
>
>
> On 2022/2/11 6:22, frakass wrote:
> >
> >
> > On 2022/2/11 6:16, Gourav Sengupta wrote:
> >> What is the source data (is it JSON, CSV, Parquet, etc)? Where are you
> >> reading it from (JDBC, file, etc)? What is the compression format (GZ,
> >> BZIP, etc)? What is the SPARK version that you are using?
> >
> > it's a well built csv file (no compressed) stored in HDFS.
> > spark 3.2.0
> >
> > Thanks.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Sean Owen
It should just be parquet.block.size indeed.
spark.write.option("parquet.block.size", "16m").parquet(...)
This is an issue in how you write, not read, the parquet.

On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho 
wrote:

> Hi Adam,
>
> Thanks for the explanation on the empty partitions.
>
> We have the freedom to adjust how the source table is written, so if there
> are any improvements we can implement on the source side we'd be happy to
> look into that.
>
> It's not yet clear to me how you can reduce the row group size of the
> parquet files, I see some mention of `parquet.block.size` online , as well
> as various map reduce settings regarding file splitting (SO:
> mapred-min-split-size-in-hdfs
> );
> however, I don't quite understand the link between the splitting settings,
> row group configuration, and resulting number of records when reading from
> a delta table.
>
> For more specifics: we're running Spark 3.1.2 using ADLS as cloud storage.
>
> Best,
> Chris
>
> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford  wrote:
>
>> The smallest unit of work you can do on a parquet file (under the delta
>> hood) is based on the parquet row group size, which by default is 128mb. If
>> you specify maxPartitionBytes of 10mb, what that will basically do is
>> create a partition for each 10mb of a file, but whatever partition covers
>> the part of the file where the row group starts will consume the entire row
>> group. That's why you're seeing a lot of empty partitions and a small
>> number with the rest of the actual data.
>>
>> Can't think of any solution other than repartitioning (or rewriting the
>> input Delta table with a much smaller row group size which wouldn't be
>> ideal performance wise).
>>
>> Adam
>>
>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho 
>> wrote:
>>
>>> Hello,
>>>
>>> We have a spark structured streaming job that includes a stream-static
>>> join and a Pandas UDF, streaming to/from delta tables. The primary key of
>>> the static table is non-unique, meaning that the streaming join results in
>>> multiple records per input record - in our case 100x increase. The Pandas
>>> UDF is then applied to the resulting stream-static join and stored in a
>>> table. To avoid OOM errors on the executors, we need to start with very
>>> small (~10MB) partitions to account for the expansion. Currently this only
>>> seems possible by explicitly repartitioning the data, incurring the perf
>>> cost associated with the shuffle. Is it possible to force spark to read
>>> parquet files into 10MB partitions without explicitly repartitioning?
>>>
>>> The documentation regarding Performance Tuning [0] suggests that it
>>> should be possible to control how spark reads files into partitions - we're
>>> assuming this accounts for structured streaming jobs as well. Based on our
>>> understanding of the page, we used the following to configure spark into
>>> reading a stream of 10GB per trigger into 1000 partitions 10 MB each.
>>>
>>> spark.sql.files.openCostInBytes 128MB
>>> spark.sql.files.maxPartitionBytes 10MB
>>> spark.sql.files.minPartitionNum 1000
>>>
>>> Unfortunately we still see a large number of empty partitions and a
>>> small number containing the rest of the data (see median vs max number of
>>> input records).
>>>
>>> [image: image.png]
>>>
>>> Any help would be much appreciated
>>>
>>> Chris
>>>
>>
>>
>> --
>> Adam Binford
>>
>


Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Chris Coutinho
I tried re-writing the table with the updated block size but it doesn't
appear to have an effect on the row group size.

```pyspark
df = spark.read.format("delta").load("/path/to/source1")

df.write \
.format("delta") \
.mode("overwrite") \
.options(**{
  "parquet.block.size": "1m",
}) \
.partitionBy("date") \
.save("/path/to/source2")
```

The files created by this job are about 20m in size. Using `parquet-tools`
I can inspect a single file and see that the following 12m file contains a
single row group - not the expected 12 based on the block size:

$ parquet-tools inspect /path/to/source2/date=.../part-.parquet
 file meta data 
created_by: parquet-mr version 1.10.1-databricks9 (build
cf6c823f85c3b69d49e1573e48e236148c709e82)
num_columns: 19
num_rows: 369483
num_row_groups: 1
format_version: 1.0
serialized_size: 6364

 Columns 
...

Chris

On Fri, Feb 11, 2022 at 3:37 PM Sean Owen  wrote:

> It should just be parquet.block.size indeed.
> spark.write.option("parquet.block.size", "16m").parquet(...)
> This is an issue in how you write, not read, the parquet.
>
> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho 
> wrote:
>
>> Hi Adam,
>>
>> Thanks for the explanation on the empty partitions.
>>
>> We have the freedom to adjust how the source table is written, so if
>> there are any improvements we can implement on the source side we'd be
>> happy to look into that.
>>
>> It's not yet clear to me how you can reduce the row group size of the
>> parquet files, I see some mention of `parquet.block.size` online , as well
>> as various map reduce settings regarding file splitting (SO:
>> mapred-min-split-size-in-hdfs
>> );
>> however, I don't quite understand the link between the splitting settings,
>> row group configuration, and resulting number of records when reading from
>> a delta table.
>>
>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud storage.
>>
>> Best,
>> Chris
>>
>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford  wrote:
>>
>>> The smallest unit of work you can do on a parquet file (under the delta
>>> hood) is based on the parquet row group size, which by default is 128mb. If
>>> you specify maxPartitionBytes of 10mb, what that will basically do is
>>> create a partition for each 10mb of a file, but whatever partition covers
>>> the part of the file where the row group starts will consume the entire row
>>> group. That's why you're seeing a lot of empty partitions and a small
>>> number with the rest of the actual data.
>>>
>>> Can't think of any solution other than repartitioning (or rewriting the
>>> input Delta table with a much smaller row group size which wouldn't be
>>> ideal performance wise).
>>>
>>> Adam
>>>
>>> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho 
>>> wrote:
>>>
 Hello,

 We have a spark structured streaming job that includes a stream-static
 join and a Pandas UDF, streaming to/from delta tables. The primary key of
 the static table is non-unique, meaning that the streaming join results in
 multiple records per input record - in our case 100x increase. The Pandas
 UDF is then applied to the resulting stream-static join and stored in a
 table. To avoid OOM errors on the executors, we need to start with very
 small (~10MB) partitions to account for the expansion. Currently this only
 seems possible by explicitly repartitioning the data, incurring the perf
 cost associated with the shuffle. Is it possible to force spark to read
 parquet files into 10MB partitions without explicitly repartitioning?

 The documentation regarding Performance Tuning [0] suggests that it
 should be possible to control how spark reads files into partitions - we're
 assuming this accounts for structured streaming jobs as well. Based on our
 understanding of the page, we used the following to configure spark into
 reading a stream of 10GB per trigger into 1000 partitions 10 MB each.

 spark.sql.files.openCostInBytes 128MB
 spark.sql.files.maxPartitionBytes 10MB
 spark.sql.files.minPartitionNum 1000

 Unfortunately we still see a large number of empty partitions and a
 small number containing the rest of the data (see median vs max number of
 input records).

 [image: image.png]

 Any help would be much appreciated

 Chris

>>>
>>>
>>> --
>>> Adam Binford
>>>
>>


Re: Spark 3.1.2 full thread dumps

2022-02-11 Thread Lalwani, Jayesh
You can probably tune writing to elastic search by

  1.  Increasing the number of partitions so you are writing smaller batches of 
rows to Elasticsearch
  2.  Using Elasticsearch’s bulk API
  3.  Scaling up the number of hot nodes on the Elasticsearch cluster to support 
writing in parallel.

You want to minimize long-running tasks, and not just to avoid the “thread dump”. 
A large number of shorter-running tasks is better than a small number of long-running 
tasks, because you can scale up your processing by throwing hardware at it. This is 
subject to the law of diminishing returns; i.e., at some point making your tasks 
smaller will start slowing you down. You need to find the sweet spot.

Generally for Elasticsearch, the sweet spot is that each task writes around 
10MB of data using the bulk API. Writing 10MB of data per task should take on the 
order of a few seconds. You won’t get the dreaded thread dump if your tasks are 
only taking a few seconds.
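
A sketch of what that could look like from PySpark, using the elasticsearch-py bulk helper inside foreachPartition (the host, index name and partition count are assumptions):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

def write_partition(rows):
    es = Elasticsearch("http://es-host:9200")   # one client per task
    actions = ({"_index": "my_index", "_source": r.asDict(True)} for r in rows)
    bulk(es, actions)

# enough partitions that each task carries roughly 10 MB of documents
df.repartition(1000).rdd.foreachPartition(write_partition)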

From: Maksim Grinman 
Date: Thursday, February 10, 2022 at 7:21 PM
To: "Lalwani, Jayesh" 
Cc: Mich Talebzadeh , Holden Karau 
, Sean Owen , "user @spark" 

Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps




That's fair, but I do get the same thread dump at the last step of the spark 
job, where we write the final dataframe into an elasticsearch index. It's a 
df.rdd.map(lambda r: r.asDict(True)).foreachPartition operation which takes a 
while and we usually get a thread dump during that as well.

On Mon, Feb 7, 2022 at 11:24 AM Lalwani, Jayesh 
mailto:jlalw...@amazon.com>> wrote:
Probably not the answer you are looking for, but the best thing to do is to 
avoid making Spark code sleep. Is there a way you can predict how big your 
autoscaling group needs to be without looking at all the data? Are you using 
fixed number of Spark executors or are you have some way of scaling your 
executors? I am guessing that the size of your autoscaling group is 
proportional to the number of Spark executors. You can probably measure how 
many executors each can support. Then you can tie in the size of your 
autoscaling group to the number of executors.

Alternatively, you can build your service so a) it autoscales as load increases 
b) throttle requests when the load is higher than it can manage now. This means 
that when Spark executors start hitting your nodes, your service will throttle 
many of the requests, and start autoscaling up. Note that this is an 
established pattern in the cloud. This is how most services on AWS work. The 
end result is that initially there will be higher latency due to cold start, 
but the system will catch up eventually

From: Maksim Grinman mailto:m...@resolute.ai>>
Date: Friday, February 4, 2022 at 9:35 PM
To: Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>>
Cc: Holden Karau mailto:hol...@pigscanfly.ca>>, Sean Owen 
mailto:sro...@gmail.com>>, "user @spark" 
mailto:user@spark.apache.org>>
Subject: RE: [EXTERNAL] Spark 3.1.2 full thread dumps




Not that this discussion is not interesting (it is), but this has strayed 
pretty far from my original question. Which was: How do I prevent spark from 
dumping huge Java Full Thread dumps when an executor appears to not be doing 
anything (in my case, there's a loop where it sleeps waiting for a service to 
come up). The service happens to be set up using an auto-scaling group, a 
coincidental and unimportant detail that seems to have derailed the 
conversation.

On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
OK basically, do we have a scenario where Spark or for that matter any cluster 
manager can deploy a new node (after the loss of  an existing node) with the 
view of running the failed tasks on the new executor(s) deployed on that newly 
spun node?




 Error! Filename not specified.  view my Linkedin 
profile



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Sat, 5 Feb 2022 at 00:00, Holden Karau 
mailto:hol...@pigscanfly.ca>> wrote:
We don’t block scaling up after node failure in classic Spark if that’s the 
question.

On Fri, Feb 4, 2022 at 6:30 PM Mich Talebzadeh 
mailto:mich.talebza...@gmail.com>> wrote:
From what I can see in auto scaling setup, you will always need a min of two 
worker nodes as primary. It also states and I quote "Scaling primary workers is 
not recommended due to HD

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Adam Binford
Writing to Delta might not support the write.option method. We set
spark.hadoop.parquet.block.size in our spark config for writing to Delta.
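
Roughly like this (a sketch; the ~10 MB value is only an example, and the setting has to be in place before the session is created):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.hadoop.parquet.block.size", str(10 * 1024 * 1024))   # ~10 MB row groups
         .getOrCreate())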

Adam

On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho 
wrote:

> I tried re-writing the table with the updated block size but it doesn't
> appear to have an effect on the row group size.
>
> ```pyspark
> df = spark.read.format("delta").load("/path/to/source1")
>
> df.write \
> .format("delta") \
> .mode("overwrite") \
> .options(**{
>   "parquet.block.size": "1m",
> }) \
> .partitionBy("date") \
> .save("/path/to/source2")
> ```
>
> The files created by this job are about 20m in size. Using `parquet-tools`
> I can inspect a single file and see the following 12m file contains a
> single row group - not the expected 12 based on the block size:
>
> $ parquet-tools inspect /path/to/source2/date=.../part-.parquet
>  file meta data 
> created_by: parquet-mr version 1.10.1-databricks9 (build
> cf6c823f85c3b69d49e1573e48e236148c709e82)
> num_columns: 19
> num_rows: 369483
> num_row_groups: 1
> format_version: 1.0
> serialized_size: 6364
>
>  Columns 
> ...
>
> Chris
>
> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen  wrote:
>
>> It should just be parquet.block.size indeed.
>> spark.write.option("parquet.block.size", "16m").parquet(...)
>> This is an issue in how you write, not read, the parquet.
>>
>> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho 
>> wrote:
>>
>>> Hi Adam,
>>>
>>> Thanks for the explanation on the empty partitions.
>>>
>>> We have the freedom to adjust how the source table is written, so if
>>> there are any improvements we can implement on the source side we'd be
>>> happy to look into that.
>>>
>>> It's not yet clear to me how you can reduce the row group size of the
>>> parquet files, I see some mention of `parquet.block.size` online , as well
>>> as various map reduce settings regarding file splitting (SO:
>>> mapred-min-split-size-in-hdfs
>>> );
>>> however, I don't quite understand the link between the splitting settings,
>>> row group configuration, and resulting number of records when reading from
>>> a delta table.
>>>
>>> For more specifics: we're running Spark 3.1.2 using ADLS as cloud
>>> storage.
>>>
>>> Best,
>>> Chris
>>>
>>> On Fri, Feb 11, 2022 at 1:40 PM Adam Binford  wrote:
>>>
 The smallest unit of work you can do on a parquet file (under the delta
 hood) is based on the parquet row group size, which by default is 128mb. If
 you specify maxPartitionBytes of 10mb, what that will basically do is
 create a partition for each 10mb of a file, but whatever partition covers
 the part of the file where the row group starts will consume the entire row
 group. That's why you're seeing a lot of empty partitions and a small
 number with the rest of the actual data.

 Can't think of any solution other than repartitioning (or rewriting the
 input Delta table with a much smaller row group size which wouldn't be
 ideal performance wise).

 Adam

 On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <
 chrisbcouti...@gmail.com> wrote:

> Hello,
>
> We have a spark structured streaming job that includes a stream-static
> join and a Pandas UDF, streaming to/from delta tables. The primary key of
> the static table is non-unique, meaning that the streaming join results in
> multiple records per input record - in our case 100x increase. The Pandas
> UDF is then applied to the resulting stream-static join and stored in a
> table. To avoid OOM errors on the executors, we need to start with very
> small (~10MB) partitions to account for the expansion. Currently this only
> seems possible by explicitly repartitioning the data, incurring the perf
> cost associated with the shuffle. Is it possible to force spark to read
> parquet files into 10MB partitions without explicitly repartitioning?
>
> The documentation regarding Performance Tuning [0] suggests that it
> should be possible to control how spark reads files into partitions - 
> we're
> assuming this accounts for structured streaming jobs as well. Based on our
> understanding of the page, we used the following to configure spark into
> reading a stream of 10GB per trigger into 1000 partitions 10 MB each.
>
> spark.sql.files.openCostInBytes 128MB
> spark.sql.files.maxPartitionBytes 10MB
> spark.sql.files.minPartitionNum 1000
>
> Unfortunately we still see a large number of empty partitions and a
> small number containing the rest of the data (see median vs max number of
> input records).
>
> [image: image.png]
>
> Any help would be much appreciated
>
> Chris
>


 --
 Adam Binford

>>>


determine week of month from date in spark3

2022-02-11 Thread Appel, Kevin
Previously in Spark 2 we could use the Spark function date_format with the "W" 
flag and it would return the week of month for that date.  In Spark 3, trying 
this gives an error back:


* org.apache.spark.SparkUpgradeException: You may get a different 
result due to the upgrading of Spark 3.0: Fail to recognize 'W' pattern in the 
DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY 
to restore the behavior before Spark 3.0. 2) You can form a valid datetime 
pattern with the guide from 
https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html


* Caused by: java.lang.IllegalArgumentException: All week-based 
patterns are unsupported since Spark 3.0, detected: W, Please use the SQL 
function EXTRACT instead

If I use the first solution and set the policy to LEGACY (currently it is 
EXCEPTION), then the code runs through:

spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

df1 = spark.createDataFrame(
[
(1, date(2014, 3, 7)),
(2, date(2014, 3, 8)),
(3, date(2014, 3, 30)),
(4, date(2014, 3, 31)),
(5, date(2015, 3, 7)),
(6, date(2015, 3, 8)),
(7, date(2015, 3, 30)),
(8, date(2015, 3, 31)),
],
schema="a long, b date",
)
df1 = df1.withColumn("WEEKOFMONTH", F.date_format(F.col("b"), "W"))
df1.show()

+---+--+---+
|  a| b|WEEKOFMONTH|
+---+--+---+
|  1|2014-03-07|  2|
|  2|2014-03-08|  2|
|  3|2014-03-30|  6|
|  4|2014-03-31|  6|
|  5|2015-03-07|  1|
|  6|2015-03-08|  2|
|  7|2015-03-30|  5|
|  8|2015-03-31|  5|
+---+--+---+

Trying to explore the latter options, in both EXTRACT and the datetime 
patterns that are listed, I don't see a "W" option or an equivalent way to 
produce this.
The datetime link mentions: In Spark 3.0, we define our own pattern strings in 
Datetime Patterns for Formatting and Parsing, which is implemented via 
DateTimeFormatter under the hood.
If I follow the link into the DateTimeFormatter then I see the W existing there:
   W   week-of-month   number4

My first question is: with Spark 3 and without setting the policy to LEGACY, is 
there no longer a way to compute this using the Spark built-in functions?

The second question is: if we set the policy to LEGACY, are there any 
caveats or items to be aware of that might bite us later? For example, is this 
option going to be deprecated in a future Spark 3.3.x?

This was an item that we ran into during the Spark 2 to Spark 3 conversion, and we 
are trying to see how best to handle it.

Thanks for your feedback,

Kevin







Re: Using Avro file format with SparkSQL

2022-02-11 Thread Gourav Sengupta
Hi Anna,

Avro libraries should be inbuilt in SPARK in case I am not wrong. Any
particular reason why you are using a deprecated or soon to be deprecated
version of SPARK?

SPARK 3.2.1 is fantastic.

Please do let us know about your set up if possible.


Regards,
Gourav Sengupta
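
For reference, the dependency quoted below can also be declared through the session configuration instead of the spark-submit arguments (a sketch in PySpark; whether this helps the Java application depends on how it is launched):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.0")
         .getOrCreate())

spark.createDataFrame([(1, "a")], ["id", "value"]) \
     .write.format("avro").mode("overwrite").save("/tmp/avro_out")   # hypothetical output path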

On Thu, Feb 10, 2022 at 3:35 AM Karanika, Anna  wrote:

> Hello,
>
> I have been trying to use spark SQL’s operations that are related to the
> Avro file format,
> e.g., stored as, save, load, in a Java class but they keep failing with
> the following stack trace:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:  Failed
> to find data source: avro. Avro is built-in but external data source module
> since Spark 2.4. Please deploy the application as per the deployment
> section of "Apache Avro Data Source Guide".
> at
> org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindAvroDataSourceError(QueryCompilationErrors.scala:1032)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
> at
> org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
> at
> org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
> at
> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> at xsys.fileformats.SparkSQLvsAvro.main(SparkSQLvsAvro.java:57)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:564)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> For context, I am invoking spark-submit and adding arguments --packages
> org.apache.spark:spark-avro_2.12:3.2.0.
> Yet, Spark responds as if the dependency was not added.
> I am running spark-v3.2.0 (Scala 2.12).
>
> On the other hand, everything works great with spark-shell or spark-sql.
>
> I would appreciate any advice or feedback to get this running.
>
> Thank you,
> Anna
>
>


Re: determine week of month from date in spark3

2022-02-11 Thread Sean Owen
Here is some back-story: https://issues.apache.org/jira/browse/SPARK-32683
I think the answer may be: use "F"?

On Fri, Feb 11, 2022 at 12:43 PM Appel, Kevin 
wrote:

> Previously in Spark2 we could use the spark function date_format with the
> “W” flag and it will provide back the week of month of that date.  In
> Spark3 when trying this there is an error back:
>
>
>
> · org.apache.spark.SparkUpgradeException: You may get a different
> result due to the upgrading of Spark 3.0: Fail to recognize 'W' pattern in
> the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to
> LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid
> datetime pattern with the guide from
> https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
>
>
>
> · Caused by: java.lang.IllegalArgumentException: All week-based
> patterns are unsupported since Spark 3.0, detected: W, Please use the SQL
> function EXTRACT instead
>
>
>
> If I use the first solution and set the policy to LEGACY, currently it is
> EXCEPTION, then the code runs through
>
>
>
> spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")
>
>
>
> df1 = spark.createDataFrame(
>
> [
>
> (1, date(2014, 3, 7)),
>
> (2, date(2014, 3, 8)),
>
> (3, date(2014, 3, 30)),
>
> (4, date(2014, 3, 31)),
>
> (5, date(2015, 3, 7)),
>
> (6, date(2015, 3, 8)),
>
> (7, date(2015, 3, 30)),
>
> (8, date(2015, 3, 31)),
>
> ],
>
> schema="a long, b date",
>
> )
>
> df1 = df1.withColumn("WEEKOFMONTH", F.date_format(F.col("b"), "W"))
>
> df1.show()
>
>
>
> +---+--+---+
>
>
> |  a| b|WEEKOFMONTH|
>
> +---+--+---+
>
> |  1|2014-03-07|  2|
>
> |  2|2014-03-08|  2|
>
> |  3|2014-03-30|  6|
>
> |  4|2014-03-31|  6|
>
> |  5|2015-03-07|  1|
>
> |  6|2015-03-08|  2|
>
> |  7|2015-03-30|  5|
>
> |  8|2015-03-31|  5|
>
> +---+--+---+
>
>
>
> Trying to explore the latter options, in both the EXTRACT and the datetime
> patterns that are listed, I don’t see that there is the “W” option or
> equivalent way to produce this.
>
> The datetime link mentions: In Spark 3.0, we define our own pattern
> strings in Datetime Patterns for Formatting and Parsing
> ,
> which is implemented via DateTimeFormatter
> 
>  under
> the hood.
>
> If I follow the link into the DateTimeFormatter then I see the W existing
> there:
>
>W   week-of-month   number4
>
>
>
> My first question is, with the Spark3 and not setting the policy to
> LEGACY, is there no longer a way to compute this using the spark built in
> functions?
>
>
>
> The second question is, if we are setting the policy to LEGACY, is there
> any caveats or items to be aware of that might get us later? For example in
> a future Spark 3.3.X is this option going to be deprecated
>
>
>
> This was an item that we ran into from Spark2 to Spark3 conversion and
> trying to see how to best handle this
>
>
>
> Thanks for your feedback,
>
>
>
> Kevin
>
>
>
>
>
>
>
>
> --
> This message, and any attachments, is for the intended recipient(s) only,
> may contain information that is privileged, confidential and/or proprietary
> and subject to important terms and conditions available at
> http://www.bankofamerica.com/emaildisclaimer. If you are not the intended
> recipient, please delete this message.
>


Re: Spark 3.1.2 full thread dumps

2022-02-11 Thread Maksim Grinman
Thanks for these suggestions. Regarding hot nodes, are you referring to the
same as in this article?
https://www.elastic.co/blog/hot-warm-architecture-in-elasticsearch-5-x.
I am also curious where the 10MB heuristic came from, though I have heard a
similar heuristic with respect to the size of a partition. I suspect the
best way to see the size of a partition is simply to write to parquet and
observe the size of the written parquet partitions?

Thanks

On Fri, Feb 11, 2022 at 12:48 PM Lalwani, Jayesh 
wrote:

> You can probably tune writing to elastic search by
>
>1. Increasing number of partitions so you are writing smaller batches
>of rows to elastic search
>2. Using Elastic search’s bulk api
>3. Scaling up the number of hot nodes on elastic search cluster to
>support writing in parallel.
>
>
>
> You want to minimize long running tasks. Not just to avoid the “thread
> dump”. Large number of shorter running tasks are better than Small number
> of long running tasks, because you can scale up your processing by throwing
> hardware at it. This is subject to law of diminishing returns; ie; at some
> point making your tasks smaller will start slowing you down. You need to
> find the sweet spot.
>
>
>
> Generally for elastic search, the sweet spot is that each task writes
> around 10MB of data using the bulk API. Writing 10MB of data per task
> should be take order of few seconds. You won’t get the dreaded thread dump
> if your tasks are taking few seconds
>
>
>
> *From: *Maksim Grinman 
> *Date: *Thursday, February 10, 2022 at 7:21 PM
> *To: *"Lalwani, Jayesh" 
> *Cc: *Mich Talebzadeh , Holden Karau <
> hol...@pigscanfly.ca>, Sean Owen , "user @spark" <
> user@spark.apache.org>
> *Subject: *RE: [EXTERNAL] Spark 3.1.2 full thread dumps
>
>
>
>
>
>
> That's fair, but I do get the same thread dump at the last step of the
> spark job, where we write the final dataframe into an elasticsearch index.
> It's a df.rdd.map(lambda r: r.asDict(True)).foreachPartition operation
> which takes a while and we usually get a thread dump during that as well.
>
>
>
> On Mon, Feb 7, 2022 at 11:24 AM Lalwani, Jayesh 
> wrote:
>
> Probably not the answer you are looking for, but the best thing to do is
> to avoid making Spark code sleep. Is there a way you can predict how big
> your autoscaling group needs to be without looking at all the data? Are you
> using fixed number of Spark executors or are you have some way of scaling
> your executors? I am guessing that the size of your autoscaling group is
> proportional to the number of Spark executors. You can probably measure how
> many executors each can support. Then you can tie in the size of your
> autoscaling group to the number of executors.
>
>
>
> Alternatively, you can build your service so a) it autoscales as load
> increases b) throttle requests when the load is higher than it can manage
> now. This means that when Spark executors start hitting your nodes, your
> service will throttle many of the requests, and start autoscaling up. Note
> that this is an established pattern in the cloud. This is how most services
> on AWS work. The end result is that initially there will be higher latency
> due to cold start, but the system will catch up eventually
>
>
>
> *From: *Maksim Grinman 
> *Date: *Friday, February 4, 2022 at 9:35 PM
> *To: *Mich Talebzadeh 
> *Cc: *Holden Karau , Sean Owen ,
> "user @spark" 
> *Subject: *RE: [EXTERNAL] Spark 3.1.2 full thread dumps
>
>
>
>
>
>
> Not that this discussion is not interesting (it is), but this has strayed
> pretty far from my original question. Which was: How do I prevent spark
> from dumping huge Java Full Thread dumps when an executor appears to not be
> doing anything (in my case, there's a loop where it sleeps waiting for a
> service to come up). The service happens to be set up using an auto-scaling
> group, a coincidental and unimportant detail that seems to have derailed
> the conversation.
>
>
>
> On Fri, Feb 4, 2022 at 7:18 PM Mich Talebzadeh 
> wrote:
>
> OK basically, do we have a scenario where Spark or for that matter any
> cluster manager can deploy a new node (after the loss of  an existing node)
> with the view of running the failed tasks on the new executor(s) deployed
> on that newly spun node?
>
>
>
>
>
>  *Error! Filename not specified.*  view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.

Re: Unable to force small partitions in streaming job without repartitioning

2022-02-11 Thread Gourav Sengupta
Hi,

just trying to understand the problem before solving it.

1. you mentioned "The primary key of the static table is non-unique". This
appears to be a design flaw to me.

2. you once again mentioned "The Pandas UDF is then applied to the
resulting stream-static join and stored in a table. To avoid OOM errors on
the executors, we need to start with very small (~10MB) partitions to
account for the expansion. Currently this only seems possible by explicitly
repartitioning the data, incurring the perf cost associated with the
shuffle." Should the shuffle not be happening as it is because you are
joining the records?

Another question:
I am not sure of your notation of "keys", when you are joining the table
are you using single column or multiple columns, are you expecting a
cartesian product to happen during the join, or the number of records
exploding will be at max the number of duplicates in the static table?

Obviously I do not clearly understand the problem, therefore all the
suggestions can be wrong, but without over-engineering, have you simply
tried to store the data sorted on the PK (the one that is non-unique
and in the static table) while running VACUUM?

Of course, the above solution assumes that the volume of data for a particular
key in the static table fits into an executor's memory along with the
subsequent operations.

Another thing you might want to do is enable Adaptive Query Execution, and
check whether it is enabled properly by reading its settings.
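
For example (a quick check from PySpark):

print(spark.conf.get("spark.sql.adaptive.enabled"))   # "true" if AQE is on (the default from Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")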


Regards,
Gourav Sengupta

On Fri, Feb 11, 2022 at 6:00 PM Adam Binford  wrote:

> Writing to Delta might not support the write.option method. We set
> spark.hadoop.parquet.block.size in our spark config for writing to Delta.
>
> Adam
>
> On Fri, Feb 11, 2022, 10:15 AM Chris Coutinho 
> wrote:
>
>> I tried re-writing the table with the updated block size but it doesn't
>> appear to have an effect on the row group size.
>>
>> ```pyspark
>> df = spark.read.format("delta").load("/path/to/source1")
>>
>> df.write \
>> .format("delta") \
>> .mode("overwrite") \
>> .options(**{
>>   "parquet.block.size": "1m",
>> }) \
>> .partitionBy("date") \
>> .save("/path/to/source2")
>> ```
>>
>> The files created by this job are about 20m in size. Using
>> `parquet-tools` I can inspect a single file and see the following 12m file
>> contains a single row group - not the expected 12 based on the block size:
>>
>> $ parquet-tools inspect /path/to/source2/date=.../part-.parquet
>>  file meta data 
>> created_by: parquet-mr version 1.10.1-databricks9 (build
>> cf6c823f85c3b69d49e1573e48e236148c709e82)
>> num_columns: 19
>> num_rows: 369483
>> num_row_groups: 1
>> format_version: 1.0
>> serialized_size: 6364
>>
>>  Columns 
>> ...
>>
>> Chris
>>
>> On Fri, Feb 11, 2022 at 3:37 PM Sean Owen  wrote:
>>
>>> It should just be parquet.block.size indeed.
>>> spark.write.option("parquet.block.size", "16m").parquet(...)
>>> This is an issue in how you write, not read, the parquet.
>>>
>>> On Fri, Feb 11, 2022 at 8:26 AM Chris Coutinho 
>>> wrote:
>>>
 Hi Adam,

 Thanks for the explanation on the empty partitions.

 We have the freedom to adjust how the source table is written, so if
 there are any improvements we can implement on the source side we'd be
 happy to look into that.

 It's not yet clear to me how you can reduce the row group size of the
 parquet files, I see some mention of `parquet.block.size` online , as well
 as various map reduce settings regarding file splitting (SO:
 mapred-min-split-size-in-hdfs
 );
 however, I don't quite understand the link between the splitting settings,
 row group configuration, and resulting number of records when reading from
 a delta table.

 For more specifics: we're running Spark 3.1.2 using ADLS as cloud
 storage.

 Best,
 Chris

 On Fri, Feb 11, 2022 at 1:40 PM Adam Binford  wrote:

> The smallest unit of work you can do on a parquet file (under the
> delta hood) is based on the parquet row group size, which by default is
> 128mb. If you specify maxPartitionBytes of 10mb, what that will basically
> do is create a partition for each 10mb of a file, but whatever partition
> covers the part of the file where the row group starts will consume the
> entire row group. That's why you're seeing a lot of empty partitions and a
> small number with the rest of the actual data.
>
> Can't think of any solution other than repartitioning (or rewriting
> the input Delta table with a much smaller row group size which wouldn't be
> ideal performance wise).
>
> Adam
>
> On Fri, Feb 11, 2022 at 7:23 AM Chris Coutinho <
> chrisbcouti...@gmail.com> wrote:
>
>> Hello,
>>
>> We have a spark structured streaming job that incl

Deploying Spark on Google Kubernetes (GKE) autopilot, preliminary findings

2022-02-11 Thread Mich Talebzadeh
The equivalent of Google GKE Autopilot in AWS is AWS Fargate.

I have not used AWS Fargate, so I can only comment on Google's GKE
Autopilot.


This is developed from the concept of containerization and microservices.
In the standard mode of creating a GKE cluster, users can customize their
configuration based on their requirements; GKE manages the control plane and
users manually provision and manage their node infrastructure. So you
choose the hardware type and memory/CPU where your Spark containers will
be running, and the nodes will be shown as VM hosts in your account. In GKE
Autopilot mode, GKE manages the nodes and pre-configures the cluster with
add-ons for auto-scaling, auto-upgrades, maintenance, Day 2 operations and
security hardening. So there is a lot there. You don't choose your nodes
or their sizes. You are effectively paying for the pods you use.


Within spark-submit, you still need to specify the number of executors, the
driver and executor memory, and the cores for each driver and executor. The
theory is that the k8s cluster will deploy suitable nodes and will create
enough pods on those nodes. With the standard k8s cluster you choose your
nodes and you ensure that one core on each node is reserved for the OS
itself. Otherwise, if you allocate all cores to Spark with --conf
spark.executor.cores, you will receive this error:


kubectl describe pods -n spark

...

Events:

  Type Reason Age From
Message

   -- 
---

  Warning  FailedScheduling   9s (x17 over 15m)   default-scheduler   0/3
nodes are available: 3 Insufficient cpu.
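
One way to leave that headroom is to request fewer executor cores than the node provides, e.g. (indicative values only, assuming 4-vCPU nodes):

spark-submit ... \
  --conf spark.executor.instances=6 \
  --conf spark.executor.cores=3 \
  --conf spark.executor.memory=8g \
  --conf spark.kubernetes.namespace=spark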

So with the standard k8s you have a choice of selecting your core sizes.
With Autopilot this node selection is left to Autopilot, which deploys suitable
nodes, and this will be trial and error at the start (to get the
configuration right). You may be lucky if the history of executions is
kept current and the same job can be repeated. However, in my experience,
getting the driver pod into a "running" state is expensive time-wise, and
without an executor in a running state there is no chance of the Spark job
doing anything


NAME                                         READY   STATUS    RESTARTS   AGE
randomdatabigquery-cebab77eea6de971-exec-1   0/1     Pending   0          31s
randomdatabigquery-cebab77eea6de971-exec-2   0/1     Pending   0          31s
randomdatabigquery-cebab77eea6de971-exec-3   0/1     Pending   0          31s
randomdatabigquery-cebab77eea6de971-exec-4   0/1     Pending   0          31s
randomdatabigquery-cebab77eea6de971-exec-5   0/1     Pending   0          31s
randomdatabigquery-cebab77eea6de971-exec-6   0/1     Pending   0          31s
sparkbq-37405a7eea6b9468-driver              1/1     Running   0          3m4s

NAME                                         READY   STATUS              RESTARTS   AGE
randomdatabigquery-cebab77eea6de971-exec-6   0/1     ContainerCreating   0          112s
sparkbq-37405a7eea6b9468-driver              1/1     Running             0          4m25s

NAME                                         READY   STATUS    RESTARTS   AGE
randomdatabigquery-cebab77eea6de971-exec-6   1/1     Running   0          114s
sparkbq-37405a7eea6b9468-driver              1/1     Running   0          4m27s

Basically I told Spark to run 6 executors, but only one executor could be
brought into the running state after the driver pod had been spinning for 4
minutes.

22/02/11 20:16:18 INFO SparkKubernetesClientFactory: Auto-configuring K8S
client using current context from users K8S config file

22/02/11 20:16:19 INFO Utils: Using initial executors = 6, max of
spark.dynamicAllocation.initialExecutors,
spark.dynamicAllocation.minExecutors and spark.executor.instances

22/02/11 20:16:19 INFO ExecutorPodsAllocator: Going to request 3 executors
from Kubernetes for ResourceProfile Id: 0, target: 6 running: 0.

22/02/11 20:16:20 INFO BasicExecutorFeatureStep: Decommissioning not
enabled, skipping shutdown script

22/02/11 20:16:20 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 7079.

22/02/11 20:16:20 INFO NettyBlockTransferService: Server created on
sparkbq-37405a7eea6b9468-driver-svc.spark.svc:7079

22/02/11 20:16:20 INFO BlockManager: Using
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication
policy

22/02/11 20:16:20 INFO BlockManagerMaster: Registering BlockManager
BlockManagerId(driver, sparkbq-37405a7eea6b9468-driver-svc.spark.svc, 7079,
None)

22/02/11 20:16:20 INFO BlockManagerMasterEndpoint: Registering block
manager sparkbq-37405a7eea6b9468-driver-svc.spark.svc:7079 with 366.3 MiB
RAM, BlockManagerId(driver, sparkbq-37405a7eea6b9468-driver-svc.spark.svc,
7079, None)

22/02/11 20:16:20 INFO BlockManagerMaster: Registered BlockManager
BlockManagerId(driver, sparkbq-37405a7eea6b9468-driver-svc.spark.svc, 7079,
None)

22/02/11 20:16:20 INFO BlockManager: Initialized BlockMan

RE: determine week of month from date in spark3

2022-02-11 Thread Appel, Kevin
The output I sent originally with WEEKOFMONTHF was produced with LEGACY set;
when EXCEPTION is set, this is the result, which is also different:

+---+--++
|  a| b|WEEKOFMONTHF|
+---+--++
|  1|2014-03-07|   7|
|  2|2014-03-08|   1|
|  3|2014-03-30|   2|
|  4|2014-03-31|   3|
|  5|2015-03-07|   7|
|  6|2015-03-08|   1|
|  7|2015-03-30|   2|
|  8|2015-03-31|   3|
+---+--++
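
In case it helps, a PySpark sketch that reproduces the legacy "W" numbering for
this sample without any week-based pattern; it assumes Sunday-based weeks (which
matches the Spark 2 output above) and is my own workaround rather than anything
from the Spark documentation:

from pyspark.sql import functions as F

# Calendar-row week of month: day of month shifted by the weekday of the first
# of the month (dayofweek: 1 = Sunday), split into 7-day rows.
df1 = df1.withColumn(
    "WEEKOFMONTH_CALC",
    F.ceil((F.dayofmonth("b") + F.dayofweek(F.trunc("b", "month")) - 1) / 7),
)
df1.show()

For the eight sample dates this yields 2, 2, 6, 6, 1, 2, 5, 5, matching the
original WEEKOFMONTH values computed with "W" under LEGACY.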

From: Appel, Kevin
Sent: Friday, February 11, 2022 2:35 PM
To: user@spark.apache.org; 'Sean Owen' 
Subject: RE: determine week of month from date in spark3

Thanks for the reply; that looks to be along the lines of what is going on.
They mention in that item that W is banned, which is what I saw in the error;
however, F is not giving the same result as W.

+---+--+++
|  a| b|WEEKOFMONTHW|WEEKOFMONTHF|
+---+--+++
|  1|2014-03-07|   2|   1|
|  2|2014-03-08|   2|   2|
|  3|2014-03-30|   6|   5|
|  4|2014-03-31|   6|   5|
|  5|2015-03-07|   1|   1|
|  6|2015-03-08|   2|   2|
|  7|2015-03-30|   5|   5|
|  8|2015-03-31|   5|   5|
+---+--+++

The best way to explain what W gives: if you look at a printed calendar for
March 2014, March 30 and March 31 are on row 6, which is week 6; whereas on
the calendar for March 2015 they are on row 5, which is week 5.

From: Sean Owen [mailto:sro...@gmail.com]
Sent: Friday, February 11, 2022 2:11 PM
To: Appel, Kevin <kevin.ap...@bofa.com>
Cc: user@spark.apache.org
Subject: Re: determine week of month from date in spark3

Here is some back-story: 
https://issues.apache.org/jira/browse/SPARK-32683
I think the answer may be: use "F"?

On Fri, Feb 11, 2022 at 12:43 PM Appel, Kevin <kevin.ap...@bofa.com.invalid> wrote:
Previously, in Spark 2, we could use the Spark function date_format with the
"W" flag and it would return the week of month of that date. In Spark 3,
trying this returns an error:


• org.apache.spark.SparkUpgradeException: You may get a different 
result due to the upgrading of Spark 3.0: Fail to recognize 'W' pattern in the 
DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY 
to restore the behavior before Spark 3.0. 2) You can form a valid datetime 
pattern with the guide from 
https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html


• Caused by: java.lang.IllegalArgumentException: All week-based 
patterns are unsupported since Spark 3.0, detected: W, Please use the SQL 
function EXTRACT instead

If I use the first solution and set the policy to LEGACY (currently it is
EXCEPTION), then the code runs through:

from datetime import date
from pyspark.sql import functions as F

spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

df1 = spark.createDataFrame(
[
(1, date(2014, 3, 7)),
(2, date(2014, 3, 8)),
(3, date(2014, 3, 30)),
(4, date(2014, 3, 31)),
(5, date(2015, 3, 7)),
(6, date(2015, 3, 8)),
(7, date(2015, 3, 30)),
(8, date(2015, 3, 31)),
],
schema="a long, b date",
)
df1 = df1.withColumn("WEEKOFMONTH", F.date_format(F.col("b"), "W"))
df1.show()

+---+--+---+
|  a| b|WEEKOFMONTH|
+---+--+---+
|  1|2014-03-07|  2|
|  2|2014-03-08|  2|
|  3|2014-03-30|  6|
|  4|2014-03-31|  6|
|  5|2015-03-07|  1|
|  6|2015-03-08|  2|
|  7|2015-03-30|  5|
|  8|2015-03-31|  5|
+---+--+---+

Trying to explore the latter options, in both EXTRACT and the datetime
patterns that are listed, I don't see a "W" option or an equivalent way to
produce this.
The datetime link mentions: In Spark 3.0, we define our own pattern strings in
Datetime Patterns for Formatting and Parsing, which is implemented via
DateTimeFormatter under the hood.
If I follow the link into DateTimeFormatter then I see the W existing there:
   W       week-of-month               number

Apache spark 3.0.3 [Spark lower version enhancements]

2022-02-11 Thread Rajesh Krishnamurthy
Hi there,

  We are just wondering whether the Spark community has any plans to actively
continue development on the 3.0.x line. I know the latest version of Spark is
3.2.x, but we are wondering whether there are plans to fix, on the 3.0.x line,
the vulnerabilities identified in 3.0.3, so that we don't need to migrate to
the next version (3.1.x in this case) and can instead get all the
vulnerability fixes within a minor version upgrade (e.g. 3.0.x).


Rajesh Krishnamurthy | Enterprise Architect
T: +1 510-833-7189 | M: +1 925-917-9208
http://www.perforce.com
Visit us on: Twitter | LinkedIn | Facebook



This e-mail may contain information that is privileged or confidential. If you 
are not the intended recipient, please delete the e-mail and any attachments 
and notify us immediately.



Re: Apache spark 3.0.3 [Spark lower version enhancements]

2022-02-11 Thread Sean Owen
3.0.x is about EOL now, and I hadn't heard anyone come forward to push a
final maintenance release. Is there a specific issue you're concerned about?

On Fri, Feb 11, 2022 at 4:24 PM Rajesh Krishnamurthy <
rkrishnamur...@perforce.com> wrote:

> Hi there,
>
>   We are just wondering if there are any agenda by the Spark community to
> actively engage development activities on the 3.0.x path. I know we have
> the latest version of Spark with 3.2.x, but we are just wondering if any
> development plans to have the vulnerabilities fixed on the 3.0.x path that
> were identified on the 3.0.3 version, so that we don’t need to migrate to
> next major version(3.1.x in this case), but at the same time all the
> vulnerabilities fixed within the minor version upgrade(eg:3.0.x)
>
>
> Rajesh Krishnamurthy | Enterprise Architect
> T: +1 510-833-7189 | M: +1 925-917-9208
> http://www.perforce.com
> Visit us on: Twitter | LinkedIn | Facebook
>
>
> This e-mail may contain information that is privileged or confidential. If
> you are not the intended recipient, please delete the e-mail and any
> attachments and notify us immediately.
>
>


Re: how to classify column

2022-02-11 Thread Raghavendra Ganesh
You could use the expr() function to achieve the same thing.

.withColumn("newColumn",expr(s"case when score>3 then 'good' else 'bad'
end"))
--
Raghavendra
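
The same thing can also be written with the Column API instead of a SQL string;
a small PySpark sketch, assuming the DataFrame and column names from the question:

from pyspark.sql import functions as F

# Equivalent to the case-when expression above, using when/otherwise.
df = df.withColumn(
    "newColumn",
    F.when(F.col("score") > 3, "good").otherwise("bad"),
)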


On Fri, Feb 11, 2022 at 5:59 PM frakass  wrote:

> Hello
>
> I have a column whose value (Int type as score) is from 0 to 5.
> I want to query that, when the score > 3, classified as "good". else
> classified as "bad".
> How do I implement that? A UDF like something as this?
>
> scala> implicit class Foo(i:Int) {
>   |   def classAs(f:Int=>String) = f(i)
>   | }
> class Foo
>
> scala> 4.classAs { x => if (x > 3) "good" else "bad" }
> val res13: String = good
>
> scala> 2.classAs { x => if (x > 3) "good" else "bad" }
> val res14: String = bad
>
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Unable to access Google buckets using spark-submit

2022-02-11 Thread karan alang
Hello All,

I'm trying to access GCP buckets while running spark-submit locally, and I'm
running into issues.

I'm getting this error:
```

22/02/11 20:06:59 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
Exception in thread "main"
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for
scheme "gs"

```
I tried adding --conf
spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
to the spark-submit command, but I'm getting a ClassNotFoundException.

Details are in stackoverflow :
https://stackoverflow.com/questions/71088934/unable-to-access-google-buckets-using-spark-submit

Any ideas on how to fix this?
tia!
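
For reference, a hedged sketch of the pieces that are usually involved: the
ClassNotFoundException typically means the GCS connector jar itself is not on the
classpath, so it has to be supplied (via --jars, --packages or spark.jars.packages)
in addition to the fs.gs.impl setting. The connector version and the key-file path
below are assumptions, not details confirmed in this thread:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("gcs-access-sketch")
    # Assumed connector coordinates; pick the build matching your Hadoop version.
    .config("spark.jars.packages",
            "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5")
    .config("spark.hadoop.fs.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    .config("spark.hadoop.fs.AbstractFileSystem.gs.impl",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile",
            "/path/to/service-account.json")  # placeholder key file
    .getOrCreate()
)

df = spark.read.csv("gs://some-bucket/some-file.csv", header=True)  # hypothetical bucket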


unsubscribe

2022-02-11 Thread Basavaraj
unsubscribe



Re: how to classify column

2022-02-11 Thread frakass

that's good. thanks

On 2022/2/12 12:11, Raghavendra Ganesh wrote:
.withColumn("newColumn",expr(s"case when score>3 then 'good' else 'bad' 
end"))




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org