Re: Write DataFrame with Partition and choose Filename in PySpark

2023-05-04 Thread Mich Talebzadeh
you can try

df2.coalesce(1).write.mode("overwrite").json("/tmp/pairs.json")

hdfs dfs -ls /tmp/pairs.json
Found 2 items
-rw-r--r--   3 hduser supergroup          0 2023-05-04 22:21
/tmp/pairs.json/_SUCCESS
-rw-r--r--   3 hduser supergroup         96 2023-05-04 22:21
/tmp/pairs.json/part-00000-21f12540-c1c6-441d-a9b2-a82ce2113853-c000.json
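
If the single output file then needs a specific name, a possible follow-up
(just a sketch, untested; the target name /tmp/pairs_final.json and the use of
the Hadoop FileSystem API through the JVM gateway are assumptions, not
something Spark does for you) is to rename the part file after the write:

# rename the single part file left by coalesce(1) to a chosen name
hadoop = spark._jvm.org.apache.hadoop
conf = spark._jsc.hadoopConfiguration()
fs = hadoop.fs.FileSystem.get(conf)

# coalesce(1) leaves exactly one part-* file in the output directory
part_file = fs.globStatus(hadoop.fs.Path("/tmp/pairs.json/part-*"))[0].getPath()
target = hadoop.fs.Path("/tmp/pairs_final.json")  # hypothetical target name

fs.rename(part_file, target)  # move/rename within the same file system

After this, /tmp/pairs.json still holds the _SUCCESS marker; remove the
directory if it is no longer needed.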

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 4 May 2023 at 22:14, Marco Costantini <
marco.costant...@rocketfncl.com> wrote:

> Hi Mich,
> Thank you.
> Are you saying this satisfies my requirement?
>
> On the other hand, I am smelling something going on. Perhaps the Spark
> 'part' files should not be thought of as files, but rather pieces of a
> conceptual file. If that is true, then your approach (of which I'm well
> aware) makes sense. Question: what are some good methods, tools, for
> combining the parts into a single, well-named file? I imagine that is
> outside of the scope of PySpark, but any advice is welcome.
>
> Thank you,
> Marco.
>
> On Thu, May 4, 2023 at 5:05 PM Mich Talebzadeh 
> wrote:
>
>> AWS S3, or Google gs are hadoop compatible file systems (HCFS) , so they
>> do sharding to improve read performance when writing to HCFS file systems.
>>
>> Let us take your code for a drive
>>
>> import findspark
>> findspark.init()
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import struct
>> from pyspark.sql.types import *
>> spark = SparkSession.builder \
>> .getOrCreate()
>> pairs = [(1, "a1"), (2, "a2"), (3, "a3")]
>> Schema = StructType([ StructField("ID", IntegerType(), False),
>>   StructField("datA" , StringType(), True)])
>> df = spark.createDataFrame(data=pairs,schema=Schema)
>> df.printSchema()
>> df.show()
>> df2 = df.select(df.ID.alias("ID"), struct(df.datA).alias("Struct"))
>> df2.printSchema()
>> df2.show()
>> df2.write.mode("overwrite").json("/tmp/pairs.json")
>>
>> root
>>  |-- ID: integer (nullable = false)
>>  |-- datA: string (nullable = true)
>>
>> +---+----+
>> | ID|datA|
>> +---+----+
>> |  1|  a1|
>> |  2|  a2|
>> |  3|  a3|
>> +---+----+
>>
>> root
>>  |-- ID: integer (nullable = false)
>>  |-- Struct: struct (nullable = false)
>>  |    |-- datA: string (nullable = true)
>>
>> +---+------+
>> | ID|Struct|
>> +---+------+
>> |  1|  {a1}|
>> |  2|  {a2}|
>> |  3|  {a3}|
>> +---+------+
>>
>> Look at the last line where json format is written
>> df2.write.mode("overwrite").json("/tmp/pairs.json")
>> Under the bonnet this happens
>>
>> hdfs dfs -ls /tmp/pairs.json
>> Found 5 items
>> -rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53
>> /tmp/pairs.json/_SUCCESS
>> -rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53
>> /tmp/pairs.json/part-00000-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
>> /tmp/pairs.json/part-00001-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
>> /tmp/pairs.json/part-00002-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
>> /tmp/pairs.json/part-00003-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 4 May 2023 at 21:38, Marco Costantini <
>> marco.costant...@rocketfncl.com> wrote:
>>
>>> Hello,
>>>
>>> I am testing writing my DataFrame to S3 using the DataFrame `write`
>>> method. It mostly does a great job. However, it fails one of my
>>> requirements. Here are my requirements.
>>>
>>> - Write to S3
>>> - use `partitionBy` to automatically make folders based on my chosen
>>> partition columns
>>> - control the resultant filename (whole or in part)
>>>
>>> I can get the first two requirements met but not the third.
>>>
>>> Here's an example. When I use the commands...
>>>

Re: Write DataFrame with Partition and choose Filename in PySpark

2023-05-04 Thread Marco Costantini
Hi Mich,
Thank you.
Are you saying this satisfies my requirement?

On the other hand, I am smelling something going on. Perhaps the Spark
'part' files should not be thought of as files, but rather pieces of a
conceptual file. If that is true, then your approach (of which I'm well
aware) makes sense. Question: what are some good methods, tools, for
combining the parts into a single, well-named file? I imagine that is
outside of the scope of PySpark, but any advice is welcome.
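
For small outputs I can imagine something like the rough, untested sketch
below, pulling the parts together on the driver through the Hadoop FileSystem
API (the target name pairs_merged.json is only for illustration), but it feels
like it belongs outside Spark proper:

jvm = spark._jvm
conf = spark._jsc.hadoopConfiguration()
Path = jvm.org.apache.hadoop.fs.Path
fs = jvm.org.apache.hadoop.fs.FileSystem.get(conf)

# stream every part file into one target file (driver-side, small data only)
out = fs.create(Path("/tmp/pairs_merged.json"), True)  # True = overwrite
for status in fs.globStatus(Path("/tmp/pairs.json/part-*")):
    in_stream = fs.open(status.getPath())
    jvm.org.apache.hadoop.io.IOUtils.copyBytes(in_stream, out, conf, False)
    in_stream.close()
out.close()

Is that the sane way to do it, or is there a better tool for this?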

Thank you,
Marco.

On Thu, May 4, 2023 at 5:05 PM Mich Talebzadeh 
wrote:

> AWS S3, or Google gs are hadoop compatible file systems (HCFS) , so they
> do sharding to improve read performance when writing to HCFS file systems.
>
> Let us take your code for a drive
>
> import findspark
> findspark.init()
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import struct
> from pyspark.sql.types import *
> spark = SparkSession.builder \
> .getOrCreate()
> pairs = [(1, "a1"), (2, "a2"), (3, "a3")]
> Schema = StructType([ StructField("ID", IntegerType(), False),
>   StructField("datA" , StringType(), True)])
> df = spark.createDataFrame(data=pairs,schema=Schema)
> df.printSchema()
> df.show()
> df2 = df.select(df.ID.alias("ID"), struct(df.datA).alias("Struct"))
> df2.printSchema()
> df2.show()
> df2.write.mode("overwrite").json("/tmp/pairs.json")
>
> root
>  |-- ID: integer (nullable = false)
>  |-- datA: string (nullable = true)
>
> +---+----+
> | ID|datA|
> +---+----+
> |  1|  a1|
> |  2|  a2|
> |  3|  a3|
> +---+----+
>
> root
>  |-- ID: integer (nullable = false)
>  |-- Struct: struct (nullable = false)
>  |    |-- datA: string (nullable = true)
>
> +---+------+
> | ID|Struct|
> +---+------+
> |  1|  {a1}|
> |  2|  {a2}|
> |  3|  {a3}|
> +---+------+
>
> Look at the last line where json format is written
> df2.write.mode("overwrite").json("/tmp/pairs.json")
> Under the bonnet this happens
>
> hdfs dfs -ls /tmp/pairs.json
> Found 5 items
> -rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53
> /tmp/pairs.json/_SUCCESS
> -rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53
> /tmp/pairs.json/part-00000-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
> /tmp/pairs.json/part-00001-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
> /tmp/pairs.json/part-00002-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
> -rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
> /tmp/pairs.json/part-00003-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 4 May 2023 at 21:38, Marco Costantini <
> marco.costant...@rocketfncl.com> wrote:
>
>> Hello,
>>
>> I am testing writing my DataFrame to S3 using the DataFrame `write`
>> method. It mostly does a great job. However, it fails one of my
>> requirements. Here are my requirements.
>>
>> - Write to S3
>> - use `partitionBy` to automatically make folders based on my chosen
>> partition columns
>> - control the resultant filename (whole or in part)
>>
>> I can get the first two requirements met but not the third.
>>
>> Here's an example. When I use the commands...
>>
>> df.write.partitionBy("year","month").mode("append")\
>> .json('s3a://bucket_name/test_folder/')
>>
>> ... I get the partitions I need. However, the filenames are something
>> like: part-00000-0e2e2096-6d32-458d-bcdf-dbf7d74d80fd.c000.json
>>
>>
>> Now, I understand Spark's need to include the partition number in the
>> filename. However, it sure would be nice to control the rest of the file
>> name.
>>
>>
>> Any advice? Please and thank you.
>>
>> Marco.
>>
>


Re: Write DataFrame with Partition and choose Filename in PySpark

2023-05-04 Thread Mich Talebzadeh
AWS S3 and Google GCS are Hadoop-compatible file systems (HCFS), so when Spark
writes to them it shards the output into multiple part files, which improves
write parallelism and later read performance.
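
If the concern is simply too many part files per write, the sharding can be
reduced before the write. A sketch against your partitionBy("year","month")
example (the option value is illustrative only, and it assumes df carries the
year and month columns):

# one part file per partition directory: repartition on the partition columns first
df.repartition("year", "month") \
  .write.partitionBy("year", "month").mode("append") \
  .json("s3a://bucket_name/test_folder/")

# or cap the size of each part file instead of the count
df.write.option("maxRecordsPerFile", 1000000) \
  .partitionBy("year", "month").mode("append") \
  .json("s3a://bucket_name/test_folder/")

Neither lets you choose the file names, but they do control how many files
appear.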

Let us take your code for a drive

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct
from pyspark.sql.types import *
spark = SparkSession.builder \
.getOrCreate()
pairs = [(1, "a1"), (2, "a2"), (3, "a3")]
Schema = StructType([ StructField("ID", IntegerType(), False),
  StructField("datA" , StringType(), True)])
df = spark.createDataFrame(data=pairs,schema=Schema)
df.printSchema()
df.show()
df2 = df.select(df.ID.alias("ID"), struct(df.datA).alias("Struct"))
df2.printSchema()
df2.show()
df2.write.mode("overwrite").json("/tmp/pairs.json")

root
 |-- ID: integer (nullable = false)
 |-- datA: string (nullable = true)

+---+----+
| ID|datA|
+---+----+
|  1|  a1|
|  2|  a2|
|  3|  a3|
+---+----+

root
 |-- ID: integer (nullable = false)
 |-- Struct: struct (nullable = false)
 |    |-- datA: string (nullable = true)

+---+------+
| ID|Struct|
+---+------+
|  1|  {a1}|
|  2|  {a2}|
|  3|  {a3}|
+---+------+

Look at the last line, where the JSON output is written:
df2.write.mode("overwrite").json("/tmp/pairs.json")
Under the bonnet this happens:

hdfs dfs -ls /tmp/pairs.json
Found 5 items
-rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53
/tmp/pairs.json/_SUCCESS
-rw-r--r--   3 hduser supergroup          0 2023-05-04 21:53
/tmp/pairs.json/part-00000-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
-rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
/tmp/pairs.json/part-00001-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
-rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
/tmp/pairs.json/part-00002-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
-rw-r--r--   3 hduser supergroup         32 2023-05-04 21:53
/tmp/pairs.json/part-00003-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
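
Spark itself does not expose a way to choose these names. If the names must be
controlled, one workaround is to rename the objects after the write. A sketch
only, untested, assuming the write went to S3, boto3 is available, and using an
illustrative bucket, prefix and naming scheme:

import boto3

s3 = boto3.client("s3")
bucket = "bucket_name"                     # from your example write
prefix = "test_folder/year=2023/month=5/"  # one partition directory

# copy each part object to a chosen name, then remove the original
listing = s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", [])
for i, obj in enumerate(listing):
    key = obj["Key"]
    if "/part-" not in key:
        continue  # skip _SUCCESS and other marker objects
    new_key = f"{prefix}my_report_{i:05d}.json"  # hypothetical naming scheme
    s3.copy_object(Bucket=bucket, CopySource={"Bucket": bucket, "Key": key}, Key=new_key)
    s3.delete_object(Bucket=bucket, Key=key)

S3 has no real rename, so this is a copy plus delete per object, and
list_objects_v2 returns at most 1000 keys per call; larger outputs would need a
paginator.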

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 4 May 2023 at 21:38, Marco Costantini <
marco.costant...@rocketfncl.com> wrote:

> Hello,
>
> I am testing writing my DataFrame to S3 using the DataFrame `write`
> method. It mostly does a great job. However, it fails one of my
> requirements. Here are my requirements.
>
> - Write to S3
> - use `partitionBy` to automatically make folders based on my chosen
> partition columns
> - control the resultant filename (whole or in part)
>
> I can get the first two requirements met but not the third.
>
> Here's an example. When I use the commands...
>
> df.write.partitionBy("year","month").mode("append")\
> .json('s3a://bucket_name/test_folder/')
>
> ... I get the partitions I need. However, the filenames are something
> like: part-00000-0e2e2096-6d32-458d-bcdf-dbf7d74d80fd.c000.json
>
>
> Now, I understand Spark's need to include the partition number in the
> filename. However, it sure would be nice to control the rest of the file
> name.
>
>
> Any advice? Please and thank you.
>
> Marco.
>


Write DataFrame with Partition and choose Filename in PySpark

2023-05-04 Thread Marco Costantini
Hello,

I am testing writing my DataFrame to S3 using the DataFrame `write` method.
It mostly does a great job. However, it fails one of my requirements. Here
are my requirements.

- Write to S3
- use `partitionBy` to automatically make folders based on my chosen
partition columns
- control the resultant filename (whole or in part)

I can get the first two requirements met but not the third.

Here's an example. When I use the commands...

df.write.partitionBy("year","month").mode("append")\
.json('s3a://bucket_name/test_folder/')

... I get the partitions I need. However, the filenames are something
like: part-00000-0e2e2096-6d32-458d-bcdf-dbf7d74d80fd.c000.json


Now, I understand Spark's need to include the partition number in the
filename. However, it sure would be nice to control the rest of the file
name.


Any advice? Please and thank you.

Marco.


Re: Write custom JSON from DataFrame in PySpark

2023-05-04 Thread Marco Costantini
Hi Enrico,
What a great answer. Thank you. Seems like I need to get comfortable with
the 'struct' and then I will be golden. Thank you again, friend.

Marco.

On Thu, May 4, 2023 at 3:00 AM Enrico Minack  wrote:

> Hi,
>
> You could rearrange the DataFrame so that writing the DataFrame as-is
> produces your structure:
>
> df = spark.createDataFrame([(1, "a1"), (2, "a2"), (3, "a3")], "id int,
> datA string")
> +---+----+
> | id|datA|
> +---+----+
> |  1|  a1|
> |  2|  a2|
> |  3|  a3|
> +---+----+
>
> df2 = df.select(df.id, struct(df.datA).alias("stuff"))
> root
>   |-- id: integer (nullable = true)
>   |-- stuff: struct (nullable = false)
>   |    |-- datA: string (nullable = true)
> +---+-----+
> | id|stuff|
> +---+-----+
> |  1| {a1}|
> |  2| {a2}|
> |  3| {a3}|
> +---+-----+
>
> df2.write.json("data.json")
> {"id":1,"stuff":{"datA":"a1"}}
> {"id":2,"stuff":{"datA":"a2"}}
> {"id":3,"stuff":{"datA":"a3"}}
>
> Looks pretty much like what you described.
>
> Enrico
>
>
> Am 04.05.23 um 06:37 schrieb Marco Costantini:
> > Hello,
> >
> > Let's say I have a very simple DataFrame, as below.
> >
> > +---+----+
> > | id|datA|
> > +---+----+
> > |  1|  a1|
> > |  2|  a2|
> > |  3|  a3|
> > +---+----+
> >
> > Let's say I have a requirement to write this to a bizarre JSON
> > structure. For example:
> >
> > {
> >   "id": 1,
> >   "stuff": {
> > "datA": "a1"
> >   }
> > }
> >
> > How can I achieve this with PySpark? I have only seen the following:
> > - writing the DataFrame as-is (doesn't meet requirement)
> > - using a UDF (seems frowned upon)
> >
> > What I have tried is to do this within a `foreach`. I have had some
> > success, but also some problems with other requirements (serializing
> > other things).
> >
> > Any advice? Please and thank you,
> > Marco.
>
>
>


Re: Write custom JSON from DataFrame in PySpark

2023-05-04 Thread Enrico Minack

Hi,

You could rearrange the DataFrame so that writing the DataFrame as-is
produces your structure:

from pyspark.sql.functions import struct  # needed for struct() below

df = spark.createDataFrame([(1, "a1"), (2, "a2"), (3, "a3")], "id int, datA string")
+---+----+
| id|datA|
+---+----+
|  1|  a1|
|  2|  a2|
|  3|  a3|
+---+----+

df2 = df.select(df.id, struct(df.datA).alias("stuff"))
root
 |-- id: integer (nullable = true)
 |-- stuff: struct (nullable = false)
 |    |-- datA: string (nullable = true)
+---+-----+
| id|stuff|
+---+-----+
|  1| {a1}|
|  2| {a2}|
|  3| {a3}|
+---+-----+

df2.write.json("data.json")
{"id":1,"stuff":{"datA":"a1"}}
{"id":2,"stuff":{"datA":"a2"}}
{"id":3,"stuff":{"datA":"a3"}}

Looks pretty much like what you described.
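
The same composition nests as deep as you like and can rename the leaf columns
on the way in, e.g. (a sketch along the same lines):

from pyspark.sql.functions import struct

df3 = df.select(df.id, struct(struct(df.datA.alias("value")).alias("inner")).alias("stuff"))
df3.write.json("data_nested.json")
# expected shape: {"id":1,"stuff":{"inner":{"value":"a1"}}}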

Enrico


Am 04.05.23 um 06:37 schrieb Marco Costantini:

Hello,

Let's say I have a very simple DataFrame, as below.

+---+----+
| id|datA|
+---+----+
|  1|  a1|
|  2|  a2|
|  3|  a3|
+---+----+

Let's say I have a requirement to write this to a bizarre JSON
structure. For example:

{
  "id": 1,
  "stuff": {
    "datA": "a1"
  }
}

How can I achieve this with PySpark? I have only seen the following:
- writing the DataFrame as-is (doesn't meet requirement)
- using a UDF (seems frowned upon)

What I have tried is to do this within a `foreach`. I have had some
success, but also some problems with other requirements (serializing
other things).

Any advice? Please and thank you,
Marco.



