dataframe null safe joins given a list of columns

2020-02-06 Thread Marcelo Valle
I was surprised I couldn't find a way of solving this in Spark, as it must
be a very common problem for users, so I decided to ask here.

Consider the code below:

```
val joinColumns = Seq("a", "b")
val df1 = Seq(("a1", "b1", "c1"), ("a2", "b2", "c2"), ("a4", null, "c4")).toDF("a", "b", "c")
val df2 = Seq(("a1", "b1", "d1"), ("a3", "b3", "d3"), ("a4", null, "d4")).toDF("a", "b", "d")
df1.join(df2, joinColumns).show()
```

The output is:

```
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
| a1| b1| c1| d1|
+---+---+---+---+
```

But I want it to be:

```
+---+----+---+---+
|  a|   b|  c|  d|
+---+----+---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---+----+---+---+
```

The join syntax of `df1.join(df2, joinColumns)` has some advantages, as it
doesn't create duplicate columns by default. However, it uses the operator
`===` to join, not the null safe one `<=>`.

Using the following syntax:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b")).show()
```

Would produce:

```
+---+----+---+---+----+---+
|  a|   b|  c|  a|   b|  d|
+---+----+---+---+----+---+
| a1|  b1| c1| a1|  b1| d1|
| a4|null| c4| a4|null| d4|
+---+----+---+---+----+---+
```

So to get the result I really want, I must do:

```
df1.join(df2, df1("a") <=> df2("a") && df1("b") <=> df2("b"))
  .drop(df2("a"))
  .drop(df2("b"))
  .show()
+---+----+---+---+
|  a|   b|  c|  d|
+---+----+---+---+
| a1|  b1| c1| d1|
| a4|null| c4| d4|
+---+----+---+---+
```

This works, but it is really verbose, especially when you have many join
columns.

Is there a better way of solving this without needing a utility method?
I run into this same problem in every Spark project.
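In the meantime, the verbose pattern above can be wrapped once in a small
utility. This is only a sketch of such a helper (not a built-in API), assuming
the Spark 2.x Scala DataFrame API:

```
import org.apache.spark.sql.DataFrame

// Sketch of a null-safe join helper: builds the <=> condition from the column
// list and then drops the duplicated right-hand columns.
def nullSafeJoin(left: DataFrame, right: DataFrame, cols: Seq[String]): DataFrame = {
  val condition = cols.map(c => left(c) <=> right(c)).reduce(_ && _)
  cols.foldLeft(left.join(right, condition))((df, c) => df.drop(right(c)))
}

// nullSafeJoin(df1, df2, joinColumns).show() then gives the desired output.
```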



join with just 1 record causes all data to go to a single node

2019-11-21 Thread Marcelo Valle
Hi,

I am using Spark on EMR 5.28.0.

We were having a problem in production where, after a join between 2
dataframes, in some situations all data was being moved to a single node,
and then the cluster was failing after retrying many times.

Our join is something like this:

```

df1.join(df2,
  df1("field1") <=> df2("field1") &&
  df1("field2") <=> df2("field2"))

```

After some investigation, we were able to isolate the corner case: this was
only happening when all the join fields were NULL. Notice the `<=>` operator
instead of `===`.
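For reference, a minimal sketch that reproduces the corner case, assuming a
spark-shell session (so `spark` and its implicits are available). With `<=>`,
NULL keys compare as equal, so every all-NULL row shares the same join key and
hashes to the same shuffle partition, which would be consistent with the skew
we saw:

```
import spark.implicits._

// Force a shuffle-based join so the key distribution is visible even on tiny data.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val df1 = Seq((null: String, null: String, "c1"), (null, null, "c2")).toDF("field1", "field2", "c")
val df2 = Seq((null: String, null: String, "d1")).toDF("field1", "field2", "d")

val joined = df1.join(df2,
  df1("field1") <=> df2("field1") &&
  df1("field2") <=> df2("field2"))

// All output rows come from a single shuffle partition, because <=> treats the
// (NULL, NULL) keys as equal and they all hash to the same partition.
joined.rdd.mapPartitions(it => Iterator(it.size)).collect().foreach(println)
```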

Would someone be able to explain this behavior? It looks like a bug to me,
but I could be missing something.

Thanks,
Marcelo.



Re: custom rdd - do I need a hadoop input format?

2019-09-18 Thread Marcelo Valle
To implement a custom RDD with getPartitions, do I have to extend
`NewHadoopRDD` and provide the Hadoop input format class?
What input format could I provide so the file won't be read all at once and
my getPartitions method can split by block?
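For context, a minimal sketch of the plain-RDD route Arun suggests in the
reply quoted below: extending `RDD` directly (rather than `NewHadoopRDD`) and
implementing `getPartitions`/`compute` around the existing
InputStream-to-Iterator[Block] library. The `blockOffsets` and `readBlocks`
parameters are assumptions standing in for that library:

```
import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition: one per block of the custom file format.
case class BlockPartition(index: Int, start: Long, end: Long) extends Partition

class BlockFileRDD[B: ClassTag](
    sc: SparkContext,
    path: String,
    blockOffsets: Seq[(Long, Long)],                  // assumed: one (start, end) per block
    readBlocks: (String, Long, Long) => Iterator[B])  // assumed: wraps the existing block reader
  extends RDD[B](sc, Nil) {

  // One Spark partition (and hence one task) per block.
  override protected def getPartitions: Array[Partition] =
    blockOffsets.zipWithIndex.map { case ((start, end), i) =>
      BlockPartition(i, start, end): Partition
    }.toArray

  // Each task reads only its own block range.
  override def compute(split: Partition, context: TaskContext): Iterator[B] = {
    val p = split.asInstanceOf[BlockPartition]
    readBlocks(path, p.start, p.end)
  }
}
```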

On Tue, 17 Sep 2019 at 18:53, Arun Mahadevan  wrote:

> You can do it with custom RDD implementation.
> You will mainly implement "getPartitions" - the logic to split your input
> into partitions and "compute" to compute and return the values from the
> executors.
>
> On Tue, 17 Sep 2019 at 08:47, Marcelo Valle 
> wrote:
>
>> Just to be more clear about my requirements, what I have is actually a
>> custom format, with a header, a summary and multi-line blocks. I want to create
>> tasks per block and not per line. I already have a library that reads an
>> InputStream and outputs an Iterator of Block, but now I need to integrate
>> this with Spark.
>>
>> On Tue, 17 Sep 2019 at 16:28, Marcelo Valle 
>> wrote:
>>
>>> Hi,
>>>
>>> I want to create a custom RDD which will read n lines in sequence from a
>>> file, which I call a block, and each block should be converted to a spark
>>> dataframe to be processed in parallel.
>>>
>>> Question - do I have to implement a custom hadoop input format to
>>> achieve this? Or is it possible to do it only with RDD APIs?
>>>
>>> Thanks,
>>> Marcelo.
>>>
>>
>



Re: custom rdd - do I need a hadoop input format?

2019-09-17 Thread Marcelo Valle
Just to be more clear about my requirements, what I have is actually a
custom format, with a header, a summary and multi-line blocks. I want to create
tasks per block and not per line. I already have a library that reads an
InputStream and outputs an Iterator of Block, but now I need to integrate
this with Spark.

On Tue, 17 Sep 2019 at 16:28, Marcelo Valle  wrote:

> Hi,
>
> I want to create a custom RDD which will read n lines in sequence from a
> file, which I call a block, and each block should be converted to a spark
> dataframe to be processed in parallel.
>
> Question - do I have to implement a custom hadoop input format to achieve
> this? Or is it possible to do it only with RDD APIs?
>
> Thanks,
> Marcelo.
>



custom rdd - do I need a hadoop input format?

2019-09-17 Thread Marcelo Valle
Hi,

I want to create a custom RDD which will read n lines in sequence from a
file, which I call a block, and each block should be converted to a spark
dataframe to be processed in parallel.

Question - do I have to implement a custom hadoop input format to achieve
this? Or is it possible to do it only with RDD APIs?

Thanks,
Marcelo.



Re: help understanding physical plan

2019-08-16 Thread Marcelo Valle
Thanks Tianlang. I saw the DAG on YARN, but what really solved my problem
was adding intermediate steps and evaluating them eagerly to find out where
the bottleneck was.
My process now runs in 6 min. :D
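For reference, a minimal sketch of that eager-evaluation trick (the dataframes
and column names below are placeholders, not the real job): persisting and
counting each intermediate result forces it to materialise, so each step shows
up as its own job in the Spark UI and the slow one stands out.

```
import org.apache.spark.sql.functions.col

// Illustrative only: largeDf, smallDf and the column names are placeholders.
val joined = largeDf.join(smallDf, Seq("id"), "left_anti").persist()
println(s"joined: ${joined.count()} rows")        // forces evaluation of this step

val filtered = joined.filter(col("status") === "ACTIVE").persist()
println(s"filtered: ${filtered.count()} rows")    // forces evaluation of the next step
```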

Thanks for the help.

[]s

On Thu, 15 Aug 2019 at 07:25, Tianlang 
wrote:

> Hi,
>
> Maybe you can look at the Spark UI. The physical plan has no timing
> information.
> On 2019/8/13 10:45 PM, Marcelo Valle wrote:
>
> Hi,
>
> I have a job running on AWS EMR. It's basically a join between 2 tables
> (parquet files on S3), one somewhat large (around 50 GB) and the other small
> (less than 1 GB).
> The small table is the result of other operations, but it was a dataframe
> with `.persist(StorageLevel.MEMORY_AND_DISK_SER)` and the count on this
> dataframe finishes quickly.
> When I run my "LEFT_ANTI" join, I get the execution plan below.
> While most of my jobs on larger amounts of data take at most 1 hour on this
> cluster, this one takes almost 1 day to complete.
>
> What could I be doing wrong? I am trying to analyze the plan, but I can't
> find anything that justifies the slowness. It has 2 shuffles followed by a
> zip, but other jobs have similar things and they are not that slow.
>
> Could anyone point me to possible actions I could take to investigate this?
>
> Thanks,
> Marcelo.
>
> == Physical Plan ==
> *(2) Project [USAGE_AGGREGATED_METADATA_ID#1493,
> SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702,
> USAGE_AGGREGATED_METADATA_HASH#1513]
> +- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, ),
> coalesce(SENDER_RECORDING_IDENTIFIER#1499, )],
> [coalesce(USAGE_AGGREGATED_METADATA_ID#356, ),
> coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight,
> ((USAGE_AGGREGATED_METADATA_ID#356 <=> USAGE_AGGREGATED_METADATA_ID#1493)
> && (SENDER_RECORDING_IDENTIFIER#357 <=> SENDER_RECORDING_IDENTIFIER#1499))
>:- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493,
> SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
>: +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493,
> ISRC#1494, ISWC#1495, RECORDING_TITLE#1496, RECORDING_DISPLAY_ARTIST#1497,
> WORK_WRITERS#1498, SENDER_RECORDING_IDENTIFIER#1499,
> RECORDING_VERSION_TITLE#1500, WORK_TITLE#1501, CONTENT_TYPE#1502,
> USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 replicas)
>:   +- *(2) Project [ID#328 AS
> USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS
> ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS
> RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS WORK_WRITERS#1498,
> uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null AS
> RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS
> CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null,
> artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS
> USAGE_AGGREGATED_METADATA_HASH#1513]
>:  +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ),
> coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ),
> coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )],
> [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
> coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], Inner,
> BuildLeft, (isrc#289 <=> isrc_1#1419) && (iswc#290 <=> iswc_1#1420)) &&
> (track_name#291 <=> track_name_1#1421)) && (artist_name#292 <=>
> artist_name_1#1422)) && (work_writer_names#293 <=>
> work_writer_names_1#1423))
>: :- BroadcastExchange
> HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
> coalesce(input[2, string, true], ), coalesce(input[3, string, true], ),
> coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
>: :  +- *(1) Project [ID#328, isrc#289 AS isrc_1#1419,
> iswc#290 AS iswc_1#1420, track_name#291 AS track_name_1#1421,
> artist_name#292 AS artist_name_1#1422, work_writer_names#293 AS
> work_writer_names_1#1423]
>: : +- *(1) Filter isnotnull(ID#328)
>: :+- InMemoryTableScan [ID#328,
> artist_name#292, isrc#289, iswc#290, track_name#291,
> work_writer_names#293], [isnotnull(ID#328)]
>: :  +- InMemoryRelation [ID#328, isrc#289,
> iswc#290, track_name#291, artist_name#292, work_writer_names#293],
> StorageLevel(disk, memory, 1 replicas)
>: :+- *(2) Project [ID#328,
> isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293]
>: :   +- *(2) BroadcastHashJoin
> [coalesce(ISRC#329, ), coalesce(I

help understanding physical plan

2019-08-13 Thread Marcelo Valle
Hi,

I have a job running on AWS EMR. It's basically a join between 2 tables
(parquet files on S3), one somewhat large (around 50 GB) and the other small
(less than 1 GB).
The small table is the result of other operations, but it was a dataframe
with `.persist(StorageLevel.MEMORY_AND_DISK_SER)` and the count on this
dataframe finishes quickly.
When I run my "LEFT_ANTI" join, I get the execution plan below. While
most of my jobs on larger amounts of data take at most 1 hour on this cluster,
this one takes almost 1 day to complete.

What could I be doing wrong? I am trying to analyze the plan, but I can't
find anything that justifies the slowness. It has 2 shuffles followed by a
zip, but other jobs have similar things and they are not that slow.

Could anyone point me to possible actions I could take to investigate this?

Thanks,
Marcelo.

== Physical Plan ==
*(2) Project [USAGE_AGGREGATED_METADATA_ID#1493,
SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702,
USAGE_AGGREGATED_METADATA_HASH#1513]
+- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, ),
coalesce(SENDER_RECORDING_IDENTIFIER#1499, )],
[coalesce(USAGE_AGGREGATED_METADATA_ID#356, ),
coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight,
((USAGE_AGGREGATED_METADATA_ID#356 <=> USAGE_AGGREGATED_METADATA_ID#1493)
&& (SENDER_RECORDING_IDENTIFIER#357 <=> SENDER_RECORDING_IDENTIFIER#1499))
   :- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493,
SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
   : +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493, ISRC#1494,
ISWC#1495, RECORDING_TITLE#1496, RECORDING_DISPLAY_ARTIST#1497,
WORK_WRITERS#1498, SENDER_RECORDING_IDENTIFIER#1499,
RECORDING_VERSION_TITLE#1500, WORK_TITLE#1501, CONTENT_TYPE#1502,
USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 replicas)
   :   +- *(2) Project [ID#328 AS
USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS
ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS
RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS WORK_WRITERS#1498,
uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null AS
RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS
CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null,
artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS
USAGE_AGGREGATED_METADATA_HASH#1513]
   :  +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ),
coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ),
coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )],
[coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], Inner,
BuildLeft, (isrc#289 <=> isrc_1#1419) && (iswc#290 <=> iswc_1#1420)) &&
(track_name#291 <=> track_name_1#1421)) && (artist_name#292 <=>
artist_name_1#1422)) && (work_writer_names#293 <=>
work_writer_names_1#1423))
   : :- BroadcastExchange
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
coalesce(input[2, string, true], ), coalesce(input[3, string, true], ),
coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
   : :  +- *(1) Project [ID#328, isrc#289 AS isrc_1#1419,
iswc#290 AS iswc_1#1420, track_name#291 AS track_name_1#1421,
artist_name#292 AS artist_name_1#1422, work_writer_names#293 AS
work_writer_names_1#1423]
   : : +- *(1) Filter isnotnull(ID#328)
   : :+- InMemoryTableScan [ID#328,
artist_name#292, isrc#289, iswc#290, track_name#291,
work_writer_names#293], [isnotnull(ID#328)]
   : :  +- InMemoryRelation [ID#328, isrc#289,
iswc#290, track_name#291, artist_name#292, work_writer_names#293],
StorageLevel(disk, memory, 1 replicas)
   : :+- *(2) Project [ID#328,
isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293]
   : :   +- *(2) BroadcastHashJoin
[coalesce(ISRC#329, ), coalesce(ISWC#330, ), coalesce(RECORDING_TITLE#331,
), coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333, )],
[coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
coalesce(substring(artist_name#292, 0, 1000), ),
coalesce(work_writer_names#293, )], RightOuter, BuildLeft, (isrc#289
<=> ISRC#329) && (iswc#290 <=> ISWC#330)) && (track_name#291 <=>
RECORDING_TITLE#331)) && (substring(artist_name#292, 0, 1000) <=>
RECORDING_DISPLAY_ARTIST#332)) && (work_writer_names#293 <=>
WORK_WRITERS#333))
   : :  :- BroadcastExchange
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
coalesce(input[2, string, true], ), coalesce(input[3, string, true], ),
coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
   : :  :  +- *(1) Project [ID#328,
ISRC#329, ISWC#330, 

Re: best docker image to use

2019-06-13 Thread Marcelo Valle
Thanks Riccardo. This is useful, and it seems it's maintained by the Jupyter
team.
I was hoping I would find some maintained by the Spark team.

Right now, I am using the base images from this repo:
https://github.com/big-data-europe/docker-spark/

-Marcelo

On Tue, 11 Jun 2019 at 12:19, Riccardo Ferrari  wrote:

> Hi Marcelo,
>
> I'm used to working with https://github.com/jupyter/docker-stacks. There's
> the Scala+Jupyter option too, though there might be better options with
> Zeppelin as well.
> Hth
>
>
> On Tue, 11 Jun 2019, 11:52 Marcelo Valle,  wrote:
>
>> Hi,
>>
>> I would like to run the Spark shell + Scala in a Docker environment, just to
>> play with Docker on a development machine without having to install a JVM plus
>> a lot of other things.
>>
>> Is there something as an "official docker image" I am recommended to use?
>> I saw some on docker hub, but it seems they are all contributions from
>> pro-active individuals. I wonder whether the group maintaining Apache Spark
>> also maintains some docker images for use cases like this?
>>
>> Thanks,
>> Marcelo.
>>
>



best docker image to use

2019-06-11 Thread Marcelo Valle
Hi,

I would like to run the Spark shell + Scala in a Docker environment, just to
play with Docker on a development machine without having to install a JVM plus
a lot of other things.

Is there something like an "official Docker image" that I am recommended to use? I
saw some on Docker Hub, but it seems they are all contributions from
proactive individuals. I wonder whether the group maintaining Apache Spark
also maintains some Docker images for use cases like this?

Thanks,
Marcelo.



Re: adding a column to a groupBy (dataframe)

2019-06-07 Thread Marcelo Valle
Hi Bruno, that's really interesting...

So, to use explode, I would have to do a group by on countries and a
collect_all on cities, then explode the cities, right? Am I understanding
the idea correctly?

I think this could produce the results I want. But what would be the
behaviour under the hood? Does collect_all return an iterator or does it
return a list? If I have a country with too many cities, would my server
have to store all cities of a country in memory?
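For what it's worth, a minimal sketch of that idea using the
`denormalized_cities` example from the original message (the built-in aggregate
is `collect_list`; as far as I know there is no `collect_all`). Note that
`collect_list` materialises each country's cities as an in-memory array in a
single row, so a country with very many cities does concentrate that group on
one task:

```
import org.apache.spark.sql.functions.{col, collect_list, explode, monotonically_increasing_id, struct}

// Parent table: one row per country, plus a generated id.
val countries = denormalized_cities
  .groupBy("COUNTRY")
  .agg(collect_list(struct("CITY", "CITY_NICKNAME")).as("CITIES"))
  .withColumn("COUNTRY_ID", monotonically_increasing_id())

// Child table: explode the collected array back into one row per city.
val cities = countries
  .select(col("COUNTRY_ID"), explode(col("CITIES")).as("CITY_STRUCT"))
  .select(col("COUNTRY_ID"), col("CITY_STRUCT.CITY"), col("CITY_STRUCT.CITY_NICKNAME"))
```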





On Thu, 6 Jun 2019 at 20:57, Bruno Nassivet 
wrote:

> Hi Marcelo,
>
> Maybe the spark.sql.functions.explode gives what you need?
>
> // Bruno
>
>
> On 6 June 2019 at 16:02, Marcelo Valle wrote:
>
> Generating the city id (child) is easy, monotonically increasing id worked
> for me.
>
> The problem is the country (parent) which has to be in both countries and
> cities data frames.
>
>
>
> On Thu, 6 Jun 2019 at 14:57, Magnus Nilsson  wrote:
>
>> Well, you could do a repartition on cityname/nrOfCities and use the
>> spark_partition_id function or the mappartitionswithindex dataframe method
>> to add a city Id column. Then just split the dataframe into two subsets. Be
>> careful of hash collisions on the repartition key though, or more than one
>> city might end up in the same partition (you can use a custom partitioner).
>>
>> It all depends on what kind of Id you want/need for the city value. I.e.
>> will you later need to append new city Id:s or not. Do you always handle
>> the entire dataset when you make this change or not.
>>
>> On the other hand, getting a distinct list of citynames is a non
>> shuffling fast operation, add a row_number column and do a broadcast join
>> with the original dataset and then split into two subsets. Probably a bit
>> faster than reshuffling the entire dataframe. As always the proof is in the
>> pudding.
>>
>> //Magnus
>>
>> On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle 
>> wrote:
>>
>>> Akshay,
>>>
>>> First of all, thanks for the answer. I *am* using monotonically
>>> increasing id, but that's not my problem.
>>> My problem is I want to output 2 tables from 1 data frame, 1 parent
>>> table with ID for the group by and 1 child table with the parent id without
>>> the group by.
>>>
>>> I was able to solve this problem by grouping by, generating a parent
>>> data frame with an id, then joining the parent dataframe with the original
>>> one to get a child dataframe with a parent id.
>>>
>>> I would like to find a solution without this second join, though.
>>>
>>> Thanks,
>>> Marcelo.
>>>
>>>
>>> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj <
>>> akshay.bhardwaj1...@gmail.com> wrote:
>>>
>>>> Hi Marcelo,
>>>>
>>>> If you are using Spark 2.3+ and the Dataset API/SparkSQL, you can use this
>>>> inbuilt function "monotonically_increasing_id" in Spark.
>>>> A little tweaking using Spark sql inbuilt functions can enable you to
>>>> achieve this without having to write code or define RDDs with map/reduce
>>>> functions.
>>>>
>>>> Akshay Bhardwaj
>>>> +91-97111-33849
>>>>
>>>>
>>>> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am new to spark and I am trying to write an application using
>>>>> dataframes that normalize data.
>>>>>
>>>>> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
>>>>> CITY, CITY_NICKNAME
>>>>>
>>>>> Here is what I want to do:
>>>>>
>>>>>
>>>>>1. Map by country, then for each country generate a new ID and
>>>>>write to a new dataframe `countries`, which would have COUNTRY_ID, 
>>>>> COUNTRY
>>>>>- country ID would be generated, probably using
>>>>>`monotonically_increasing_id`.
>>>>>2. For each country, write several lines on a new dataframe
>>>>>`cities`, which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. 
>>>>> COUNTRY_ID
>>>>>would be the same generated on country table and ID would be another 
>>>>> ID I
>>>>>generate.
>>>>>
>>>>> What's the best way to do this, hopefully using only dataframes (no
>>>>> low level RDDs) unless it's not possible?
>>>>>
>>>&g

Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Marcelo Valle
Hi Magnus, Thanks for replying.

I didn't get the partition solution, tbh, but indeed I was trying to
figure out a way of solving this with data frames only, without re-joining.

I can't have a global list of countries in my real scenario, as the real
scenario is not reference data; countries were just an example.

It would be nice if the DataFrame API could evolve in the future to handle this
use case. This doesn't seem to be an isolated case, at least in the company
I work for.

Anyway, the question is answered. I wanted to hear from someone more
experienced in Spark (I am still at the beginning of my journey with it) that
there was no better way of doing it using data frames only. Thanks for the
help!
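For reference, a minimal sketch of the distinct + row_number + broadcast-join
route Magnus describes in the quoted reply below, using the COUNTRY/CITY
example (names are illustrative):

```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{broadcast, row_number}

// The distinct country list is small, so the single-partition window implied
// by Window.orderBy and the broadcast join are both cheap.
val countryIds = denormalized_cities
  .select("COUNTRY")
  .distinct()
  .withColumn("COUNTRY_ID", row_number().over(Window.orderBy("COUNTRY")))

val citiesWithCountryId = denormalized_cities.join(broadcast(countryIds), Seq("COUNTRY"))
```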

Thanks,
Marcelo.


On Thu, 6 Jun 2019 at 15:18, Magnus Nilsson  wrote:

> Sorry, misread. Just change city to country in my previous answer. I still
> believe the most efficient way to do this is to create a small dataset with
> country name and id columns and broadcast to join in the city dataset. Other
> than that you have to bring in all the cities from the same country into
> the same partition and do a partition based operation.
>
> If you want to do neither you need to be able to determine the Country ID
> from the string representation of the name, make it deterministic, and make
> sure your algorithm doesn't collide.
>
> I don't see any other way to do it. I still vote for doing a distinct
> collect, add an Id, do a  broadcast join unless your dataset is
> pre-bucketed along non-colliding Country name lines, then the
> partition-based solution is probably faster. Or better yet, pre-create a
> list of all the worlds countries with an Id and do a broadcast join
> straight away.
>
>
> Regards,
>
> Magnus
> --
> *From:* Marcelo Valle 
> *Sent:* Thursday, June 6, 2019 16:02
> *To:* Magnus Nilsson
> *Cc:* user @spark
> *Subject:* Re: adding a column to a groupBy (dataframe)
>
> Generating the city id (child) is easy, monotonically increasing id worked
> for me.
>
> The problem is the country (parent) which has to be in both countries and
> cities data frames.
>
>
>
> On Thu, 6 Jun 2019 at 14:57, Magnus Nilsson  wrote:
>
> Well, you could do a repartition on cityname/nrOfCities and use the
> spark_partition_id function or the mappartitionswithindex dataframe method
> to add a city Id column. Then just split the dataframe into two subsets. Be
> careful of hash collisions on the repartition key though, or more than one
> city might end up in the same partition (you can use a custom partitioner).
>
> It all depends on what kind of Id you want/need for the city value. I.e.
> will you later need to append new city Id:s or not. Do you always handle
> the entire dataset when you make this change or not.
>
> On the other hand, getting a distinct list of citynames is a non shuffling
> fast operation, add a row_number column and do a broadcast join with the
> original dataset and then split into two subsets. Probably a bit faster
> than reshuffling the entire dataframe. As always the proof is in the
> pudding.
>
> //Magnus
>
> On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle 
> wrote:
>
> Akshay,
>
> First of all, thanks for the answer. I *am* using monotonically increasing
> id, but that's not my problem.
> My problem is I want to output 2 tables from 1 data frame, 1 parent table
> with ID for the group by and 1 child table with the parent id without the
> group by.
>
> I was able to solve this problem by grouping by, generating a parent data
> frame with an id, then joining the parent dataframe with the original one
> to get a child dataframe with a parent id.
>
> I would like to find a solution without this second join, though.
>
> Thanks,
> Marcelo.
>
>
> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
> Hi Marcelo,
>
> If you are using Spark 2.3+ and the Dataset API/SparkSQL, you can use this
> inbuilt function "monotonically_increasing_id" in Spark.
> A little tweaking using Spark sql inbuilt functions can enable you to
> achieve this without having to write code or define RDDs with map/reduce
> functions.
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
> wrote:
>
> Hi all,
>
> I am new to spark and I am trying to write an application using dataframes
> that normalize data.
>
> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
> CITY, CITY_NICKNAME
>
> Here is what I want to do:
>
>
>1. Map by country, then for each country generate a new ID and write
>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>country ID would be generat

Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Marcelo Valle
Generating the city id (child) is easy; a monotonically increasing id worked
for me.

The problem is the country (parent), which has to be in both the countries and
cities data frames.



On Thu, 6 Jun 2019 at 14:57, Magnus Nilsson  wrote:

> Well, you could do a repartition on cityname/nrOfCities and use the
> spark_partition_id function or the mappartitionswithindex dataframe method
> to add a city Id column. Then just split the dataframe into two subsets. Be
> careful of hash collisions on the repartition key though, or more than one
> city might end up in the same partition (you can use a custom partitioner).
>
> It all depends on what kind of Id you want/need for the city value. I.e.
> will you later need to append new city Id:s or not. Do you always handle
> the entire dataset when you make this change or not.
>
> On the other hand, getting a distinct list of citynames is a non shuffling
> fast operation, add a row_number column and do a broadcast join with the
> original dataset and then split into two subsets. Probably a bit faster
> than reshuffling the entire dataframe. As always the proof is in the
> pudding.
>
> //Magnus
>
> On Thu, Jun 6, 2019 at 2:53 PM Marcelo Valle 
> wrote:
>
>> Akshay,
>>
>> First of all, thanks for the answer. I *am* using monotonically
>> increasing id, but that's not my problem.
>> My problem is I want to output 2 tables from 1 data frame, 1 parent table
>> with ID for the group by and 1 child table with the parent id without the
>> group by.
>>
>> I was able to solve this problem by grouping by, generating a parent data
>> frame with an id, then joining the parent dataframe with the original one
>> to get a child dataframe with a parent id.
>>
>> I would like to find a solution without this second join, though.
>>
>> Thanks,
>> Marcelo.
>>
>>
>> On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj <
>> akshay.bhardwaj1...@gmail.com> wrote:
>>
>>> Hi Marcelo,
>>>
>>> If you are using Spark 2.3+ and the Dataset API/SparkSQL, you can use this
>>> inbuilt function "monotonically_increasing_id" in Spark.
>>> A little tweaking using Spark sql inbuilt functions can enable you to
>>> achieve this without having to write code or define RDDs with map/reduce
>>> functions.
>>>
>>> Akshay Bhardwaj
>>> +91-97111-33849
>>>
>>>
>>> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am new to spark and I am trying to write an application using
>>>> dataframes that normalize data.
>>>>
>>>> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
>>>> CITY, CITY_NICKNAME
>>>>
>>>> Here is what I want to do:
>>>>
>>>>
>>>>1. Map by country, then for each country generate a new ID and
>>>>write to a new dataframe `countries`, which would have COUNTRY_ID, 
>>>> COUNTRY
>>>>- country ID would be generated, probably using
>>>>`monotonically_increasing_id`.
>>>>2. For each country, write several lines on a new dataframe
>>>>`cities`, which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. 
>>>> COUNTRY_ID
>>>>would be the same generated on country table and ID would be another ID 
>>>> I
>>>>generate.
>>>>
>>>> What's the best way to do this, hopefully using only dataframes (no low
>>>> level RDDs) unless it's not possible?
>>>>
>>>> I clearly see a MAP/Reduce process where for each KEY mapped I generate
>>>> a row in countries table with COUNTRY_ID and for every value I write a row
>>>> in cities table. But how to implement this in an easy and efficient way?
>>>>
>>>> I thought about using a `GroupBy Country` and then using `collect` to
>>>> collect all values for that country, but then I don't know how to generate
>>>> the country id and I am not sure about memory efficiency of `collect` for a
>>>> country with too many cities (bear in mind country/city is just an example,
>>>> my real entities are different).
>>>>
>>>> Could anyone point me to the direction of a good solution?
>>>>
>>>> Thanks,
>>>> Marcelo.
>>>>

Re: adding a column to a groupBy (dataframe)

2019-06-06 Thread Marcelo Valle
Akshay,

First of all, thanks for the answer. I *am* using monotonically increasing
id, but that's not my problem.
My problem is that I want to output 2 tables from 1 data frame: 1 parent table
with an ID for the group by, and 1 child table carrying the parent id but
without the group by.

I was able to solve this problem by grouping by, generating a parent data
frame with an id, then joining the parent dataframe with the original one
to get a child dataframe with a parent id.

I would like to find a solution without this second join, though.
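For reference, a minimal sketch of the group-by-then-join approach described
above, using the COUNTRY/CITY example from the original message (`distinct()`
stands in for the group by, since only the key is needed for the parent table):

```
import org.apache.spark.sql.functions.monotonically_increasing_id

// Parent table: unique countries plus a generated id.
val countries = denormalized_cities
  .select("COUNTRY")
  .distinct()
  .withColumn("COUNTRY_ID", monotonically_increasing_id())

// Child table: the second join carries the generated parent id back onto the
// original rows.
val cities = denormalized_cities
  .join(countries, Seq("COUNTRY"))
  .withColumn("ID", monotonically_increasing_id())
  .select("COUNTRY_ID", "ID", "CITY", "CITY_NICKNAME")
```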

Thanks,
Marcelo.


On Thu, 6 Jun 2019 at 10:49, Akshay Bhardwaj 
wrote:

> Hi Marcelo,
>
> If you are using Spark 2.3+ and the Dataset API/SparkSQL, you can use this
> inbuilt function "monotonically_increasing_id" in Spark.
> A little tweaking using Spark sql inbuilt functions can enable you to
> achieve this without having to write code or define RDDs with map/reduce
> functions.
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, May 30, 2019 at 4:05 AM Marcelo Valle 
> wrote:
>
>> Hi all,
>>
>> I am new to spark and I am trying to write an application using
>> dataframes that normalize data.
>>
>> So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY,
>> CITY, CITY_NICKNAME
>>
>> Here is what I want to do:
>>
>>
>>1. Map by country, then for each country generate a new ID and write
>>to a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY -
>>country ID would be generated, probably using 
>> `monotonically_increasing_id`.
>>2. For each country, write several lines on a new dataframe `cities`,
>>which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
>>the same generated on country table and ID would be another ID I generate.
>>
>> What's the best way to do this, hopefully using only dataframes (no low
>> level RDDs) unless it's not possible?
>>
>> I clearly see a MAP/Reduce process where for each KEY mapped I generate a
>> row in countries table with COUNTRY_ID and for every value I write a row in
>> cities table. But how to implement this in an easy and efficient way?
>>
>> I thought about using a `GroupBy Country` and then using `collect` to
>> collect all values for that country, but then I don't know how to generate
>> the country id and I am not sure about memory efficiency of `collect` for a
>> country with too many cities (bear in mind country/city is just an example,
>> my real entities are different).
>>
>> Could anyone point me to the direction of a good solution?
>>
>> Thanks,
>> Marcelo.
>>
>



adding a column to a groupBy (dataframe)

2019-05-29 Thread Marcelo Valle
Hi all,

I am new to Spark and I am trying to write an application that normalizes data
using dataframes.

So I have a dataframe `denormalized_cities` with 3 columns:  COUNTRY, CITY,
CITY_NICKNAME

Here is what I want to do:


   1. Map by country, then for each country generate a new ID and write to
   a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY - country
   ID would be generated, probably using `monotonically_increasing_id`.
   2. For each country, write several lines on a new dataframe `cities`,
   which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
   the same generated on country table and ID would be another ID I generate.

What's the best way to do this, hopefully using only dataframes (no low
level RDDs) unless it's not possible?

I clearly see a map/reduce process where, for each mapped KEY, I generate a
row in the countries table with COUNTRY_ID, and for every value I write a row in
the cities table. But how do I implement this in an easy and efficient way?

I thought about using a `GroupBy Country` and then using `collect` to
collect all values for that country, but then I don't know how to generate
the country id and I am not sure about the memory efficiency of `collect` for a
country with too many cities (bear in mind country/city is just an example,
my real entities are different).

Could anyone point me to the direction of a good solution?

Thanks,
Marcelo.
