Re: How to guarantee dataset is split over unique partitions (partitioned by a column value)

2022-06-20 Thread Sean Owen
repartition() puts all values with the same key in one partition, but
multiple other keys can land in the same partition. It sounds like you want
groupBy, not repartition, if you want to handle the keys separately.
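For example, a rough sketch with the Dataset API (groupByKey/flatMapGroups;
`process` is only a placeholder for whatever per-key logic you need):

ds.groupByKey(_.col)
  .flatMapGroups { (key, rows) =>
    // all rows for `key` arrive together here, regardless of the physical partitioning
    rows.map(r => process(key, r))
  }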



How to guarantee dataset is split over unique partitions (partitioned by a column value)

2022-06-20 Thread DESCOTTE Loic - externe
Hi,

I have a data type like this:

   case class Data(col: String, ...)

and a Dataset[Data] ds. Some rows have this column filled with value 'a', others
with value 'b', etc.

I want to process all the 'a' data separately from all the 'b' data. But I also
need to have all the 'a' rows in the same partition.

If I do: ds.repartition(col("col")).mapPartitions(data => ???)

Is it guaranteed that all the 'a' rows will end up in a single partition,
with no 'b' rows mixed into that partition?

My understanding is that all the 'a' rows will be in the same partition, but 'a'
and 'b' rows may be mixed together. So I should do:

ds.repartition(col("col")).mapPartitions { data =>
   // split the mixed values within a single partition with a group by
   // (Iterator has no groupBy, so the partition must be materialized first)
   data.toSeq.groupBy(_.col).iterator.flatMap { case (col, rows) => ??? }
}

Is that correct?


I can also force the number of partitions to match the number of distinct
values:
val nbDistinct = ds.select("col").distinct.count
ds.repartition(nbDistinct.toInt, col("col")).mapPartitions(data => ???)

But is it useful? It adds an action (the distinct count) that may be expensive
in some cases, and sometimes it seems to use fewer partitions than it should.

Example (spark-shell):
scala> (Range(0,10).map(_ => Data("a")) union Range(0,10).map(_ => Data("b")) union Range(0,10).map(_ => Data("c")))
         .toList.toDS
         .repartition(col("col"))
         .rdd.foreachPartition(part => println(part.mkString))

output :
Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)
Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)
Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)

I get my 3 partitions, plus a lot of empty partitions (the blank lines).
If I do:

scala> (Range(0,10).map(_ => Data("a")) union Range(0,10).map(_ => Data("b")) union Range(0,10).map(_ => Data("c")))
         .toList.toDS
         .repartition(3, col("col"))
         .rdd.foreachPartition(part => println(part.mkString))
then the output is:
Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)

I would like to have 3 partitions, one with all the 'a' (and only the 'a'), one
with all the 'b', etc., but instead I get 2 empty partitions and one partition
containing all the 'a', 'b' and 'c' rows of my dataset.
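
Presumably this is hash partitioning at work: repartition(n, col) should place
each row in bucket pmod(hash(col), n), so distinct values can collide in the
same bucket and leave the others empty. A small sketch to check where each key
hashes, assuming functions.hash matches the hash used by the shuffle:

import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Show which of the 3 shuffle buckets each distinct key lands in;
// with repartition(3, col("col")), a row goes to pmod(hash(col), 3).
ds.select(col("col"), pmod(hash(col("col")), lit(3)).as("partition_id"))
  .distinct
  .show()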

So, is there a good way to guarantee a dataset is split over unique partitions 
for a column value?
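
The only hard guarantee I can think of is dropping to the RDD API with a custom
Partitioner, sketched below (collecting the distinct keys up front is the
expensive part):

import org.apache.spark.Partitioner

// Map each distinct key to its own partition index, so every key is
// guaranteed a partition of its own. `keys` must be collected first.
val keys  = ds.select("col").distinct.as[String].collect().sorted
val index = keys.zipWithIndex.toMap

class KeyPartitioner(idx: Map[String, Int]) extends Partitioner {
  def numPartitions: Int = idx.size
  def getPartition(key: Any): Int = idx(key.asInstanceOf[String])
}

val byKey = ds.rdd.keyBy(_.col).partitionBy(new KeyPartitioner(index)).values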

Thanks,
Loïc





Re: How reading works?

2022-06-20 Thread Sid
Hi Team,

Can somebody help?

Thanks,
Sid

On Sun, Jun 19, 2022 at 3:51 PM Sid wrote:

> Hi,
>
> I already have a JSON dataset in S3, partitioned like the below:
>
> edl_timestamp=2022090800
>
> Now, the problem is that in the earlier 10 days of data collection there was
> a duplicate-columns issue, due to which we couldn't read that data.
>
> The latest 10 days of data are fine. So, I am trying to do something like
> the below:
>
>
> spark.read.option("multiline","true").json("path").filter(col("edl_timestamp")>last_saved_timestamp)
>
> but I am still getting the duplicate-column issue that was present in
> the old dataset. So, I am trying to understand how Spark reads the
> data: does it read the full dataset and then filter on the basis of the last
> saved timestamp, or does it read only what is required? If the second case
> were true, it should have read the data fine, since the latest data is
> correct.
>
> So just trying to understand. Could anyone help here?
>
> Thanks,
> Sid
>
>
>
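
One way to sidestep this, sketched under assumptions (the bucket path,
partition values, and layout below are made up): point the reader only at the
known-good partition directories, so the corrupt older files are never listed
or parsed. Partition pruning does skip scanning pruned directories, but
spark.read.json() first lists files and infers a schema over everything under
the input path, before the filter applies; that is presumably where the
duplicate-column error comes from.

// Hypothetical layout: s3://bucket/events/edl_timestamp=<ts>/...
val base = "s3://bucket/events"
val goodPaths = Seq("2022090800", "2022090900")   // known-good partition values
  .map(ts => s"$base/edl_timestamp=$ts")

val df = spark.read
  .option("basePath", base)       // keep edl_timestamp as a column
  .option("multiline", "true")
  .json(goodPaths: _*)

Alternatively, supplying an explicit schema with spark.read.schema(...) avoids
the inference pass entirely.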