Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi Raghavendra,

Yes, we are trying to reduce the number of files in delta as well (the
small file problem [0][1]).

We already have a scheduled app to compact files, but the number of
files is still large, at 14K files per day.
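
For context, that figure is in the ballpark of what the current settings
would produce. A back-of-envelope sketch (the ~1-minute micro-batch cadence
below is an assumption; our query does not set an explicit trigger):

```
# rough estimate of daily file count under the current settings
files_per_batch = 10       # .repartition(10) -> up to 10 files per micro-batch (per date partition)
batches_per_day = 60 * 24  # assumes ~1 micro-batch per minute (not confirmed)
print(files_per_batch * batches_per_day)  # ~14,400, in the range of the 14K files/day we see
```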

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
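
For reference, a minimal sketch of the kind of compaction described in [0]
(assuming the delta-spark Python API, version 2.0 or later, and reusing the
DELTA_PATH environment variable from the streaming job):

```
import os
from delta.tables import DeltaTable

# spark: an existing SparkSession
# compact small files in the Delta table into larger ones
delta_table = DeltaTable.forPath(spark, os.environ["DELTA_PATH"])
delta_table.optimize().executeCompaction()
```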

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh wrote:
>
> Hi,
> What is the purpose for which you want to use repartition()? Is it to reduce
> the number of files in delta?
> Also note that there is an alternative option of using coalesce() instead of
> repartition().
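>
> For illustration, a rough, untested sketch of that alternative (only the
> write side changes; 10 is just the number from the original snippet):
>
> ```
> query = (
>     table
>     .coalesce(10)   # merges partitions without the full shuffle repartition() triggers
>     .writeStream
>     .outputMode("append")
>     .format("delta")
>     .partitionBy(CREATED_DATE)
>     .option("checkpointLocation", os.environ["CHECKPOINT"])
>     .start(os.environ["DELTA_PATH"])
> )
> ```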
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the N in
>> .repartition(N).
>>
>> We are using Spark Structured Streaming in our data pipeline to consume
>> messages and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, add a generated date column as a
>> daily partitioning column, and save these records to Delta Lake. The
>> Kafka topic has 60 partitions, and we run 15 Spark executor instances
>> (so 4 Kafka partitions per executor).
>>
>> How, then, should we use .repartition()? Should we omit it, set it to
>> 15, or set it to 4?
>>
>> Our code looks roughly like this:
>>
>> ```
>> # NOTE: spark (SparkSession), CREATED_DATE and APP_NAME are defined elsewhere
>> import os
>>
>> from pyspark.sql.functions import col, date_format, from_unixtime
>> from pyspark.sql.protobuf.functions import from_protobuf
>>
>> df = (
>>     spark.readStream.format("kafka")
>>     .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>>     .option("subscribe", os.environ["KAFKA_TOPIC"])
>>     .load()
>> )
>>
>> table = (
>>     df.select(
>>         from_protobuf(
>>             "value", "table", "/opt/protobuf-desc/table.desc"
>>         ).alias("msg")
>>     )
>>     .withColumn("uuid", col("msg.uuid"))
>>     # etc other columns...
>>
>>     # generated column for daily partitioning in Delta Lake
>>     .withColumn(CREATED_DATE,
>>                 date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>>     .drop("msg")
>> )
>>
>> query = (
>>     table
>>     .repartition(10)
>>     .writeStream
>>     .queryName(APP_NAME)
>>     .outputMode("append")
>>     .format("delta")
>>     .partitionBy(CREATED_DATE)
>>     .option("checkpointLocation", os.environ["CHECKPOINT"])
>>     .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>


-- 
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi all on user@spark:

We are looking for advice and suggestions on how to tune the N in
.repartition(N).

We are using Spark Structured Streaming in our data pipeline to consume
messages and persist them to a Delta Lake
(https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, add a generated date column as a
daily partitioning column, and save these records to Delta Lake. The
Kafka topic has 60 partitions, and we run 15 Spark executor instances
(so 4 Kafka partitions per executor).

How, then, should we use .repartition()? Should we omit it, set it to
15, or set it to 4?

Our code looks roughly like this:

```
# NOTE: spark (SparkSession), CREATED_DATE and APP_NAME are defined elsewhere
import os

from pyspark.sql.functions import col, date_format, from_unixtime
from pyspark.sql.protobuf.functions import from_protobuf

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
    .option("subscribe", os.environ["KAFKA_TOPIC"])
    .load()
)

table = (
    df.select(
        from_protobuf(
            "value", "table", "/opt/protobuf-desc/table.desc"
        ).alias("msg")
    )
    .withColumn("uuid", col("msg.uuid"))
    # etc other columns...

    # generated column for daily partitioning in Delta Lake
    .withColumn(CREATED_DATE,
                date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
    .drop("msg")
)

query = (
    table
    .repartition(10)
    .writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```
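
Concretely, the write-side variants we are weighing look roughly like this
(illustrative only; table is the DataFrame from the snippet above):

```
writer_a = table.writeStream                  # (a) omit repartition entirely
writer_b = table.repartition(15).writeStream  # (b) one output partition per executor
writer_c = table.repartition(4).writeStream   # (c) match the 4 Kafka partitions per executor
```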

Any advice would be appreciated.

-- 
Best Regards,
Shao Yang HONG

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


