Hi Raghavendra,

Yes, we are trying to reduce the number of files in delta as well (the
small file problem [0][1]).

We already have a scheduled app to compact files, but the number of
files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
<raghavendr...@gmail.com> wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to reduce the 
> number of files in delta?
> Also note that there is an alternative option of using coalesce() instead of 
> repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong 
> <shaoyang.h...@ninjavan.co.invalid> wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the
>> .repartition() parameter.
>>
>> We are using Spark Streaming on our data pipeline to consume messages
>> and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> as a daily partitioning, and save these records to Delta Lake. We have
>> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
>> (so 4 Kafka partitions per executor).
>>
>> How then, should we use .repartition()? Should we omit this parameter?
>> Or set it to 15? or 4?
>>
>> Our code looks roughly like the below:
>>
>> ```
>>     df = (
>>         spark.readStream.format("kafka")
>>         .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>>         .option("subscribe", os.environ["KAFKA_TOPIC"])
>>         .load()
>>     )
>>
>>     table = (
>>         df.select(
>>             from_protobuf(
>>                 "value", "table", "/opt/protobuf-desc/table.desc"
>>             ).alias("msg")
>>         )
>>         .withColumn("uuid", col("msg.uuid"))
>>         # etc other columns...
>>
>>         # generated column for daily partitioning in Delta Lake
>>         .withColumn(CREATED_DATE,
>> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>>         .drop("msg")
>>     )
>>
>>     query = (
>>         table
>>         .repartition(10).writeStream
>>         .queryName(APP_NAME)
>>         .outputMode("append")
>>         .format("delta")
>>         .partitionBy(CREATED_DATE)
>>         .option("checkpointLocation", os.environ["CHECKPOINT"])
>>         .start(os.environ["DELTA_PATH"])
>>     )
>>
>>     query.awaitTermination()
>>     spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>


-- 
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to