You can try Delta Lake's OPTIMIZE command; it compacts small files into
larger ones, which should help here. It also depends on the file format:
if you are working with Parquet, small files should still not cause any
major issues.
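
A minimal sketch of how that could look from PySpark (assuming delta-spark
2.x or later and an existing SparkSession `spark`; DELTA_PATH and the
created_date column name are placeholders taken from your snippet below,
so adjust them to your actual table):

```
import os

from delta.tables import DeltaTable

# Bin-pack small files in the Delta table written by the streaming job.
delta_table = DeltaTable.forPath(spark, os.environ["DELTA_PATH"])

# Optionally limit compaction to recent date partitions so the job does
# not rewrite the whole table on every run ('2023-10-01' is a placeholder).
delta_table.optimize().where("created_date >= '2023-10-01'").executeCompaction()
```

The same compaction can also be run as SQL (OPTIMIZE delta.`<table path>`)
if that fits your existing scheduled job better.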

P.

On Thu, Oct 5, 2023 at 10:55 AM Shao Yang Hong
<shaoyang.h...@ninjavan.co.invalid> wrote:

> Hi Raghavendra,
>
> Yes, we are trying to reduce the number of files in Delta as well (the
> small file problem [0][1]).
>
> We already have a scheduled app to compact files, but the number of
> files is still large, at 14K files per day.
>
> [0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
> [1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
>
> On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
> <raghavendr...@gmail.com> wrote:
> >
> > Hi,
> > What is the purpose for which you want to use repartition()? Is it to
> > reduce the number of files in Delta?
> > Also note that coalesce() is an alternative option to repartition().
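> >
> > For illustration only (a rough sketch against the DataFrame in your
> > snippet below, not tested against your job):
> >
> > ```
> > # repartition(10): full shuffle that redistributes rows into 10 roughly
> > # equal partitions (so up to 10 files per date partition per micro-batch)
> > shuffled = table.repartition(10)
> >
> > # coalesce(10): merges existing partitions down to at most 10 without a
> > # shuffle, which is cheaper when the goal is simply fewer output files
> > merged = table.coalesce(10)
> > ```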
> > --
> > Raghavendra
> >
> >
> > On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
> > <shaoyang.h...@ninjavan.co.invalid> wrote:
> >>
> >> Hi all on user@spark:
> >>
> >> We are looking for advice and suggestions on how to tune the
> >> .repartition() parameter.
> >>
> >> We are using Spark Structured Streaming in our data pipeline to consume
> >> messages and persist them to a Delta Lake table
> >> (https://delta.io/learn/getting-started/).
> >>
> >> We read messages from a Kafka topic, add a generated date column for
> >> daily partitioning, and save these records to Delta Lake. The Kafka
> >> topic has 60 partitions and we run 15 Spark executor instances (so 4
> >> Kafka partitions per executor).
> >>
> >> How, then, should we use .repartition()? Should we omit it, set it to
> >> 15, or set it to 4?
> >>
> >> Our code looks roughly like the below:
> >>
> >> ```
> >>     import os
> >>
> >>     from pyspark.sql.functions import col, date_format, from_unixtime
> >>     from pyspark.sql.protobuf.functions import from_protobuf
> >>
> >>     # APP_NAME, CREATED_DATE and the SparkSession `spark` are defined
> >>     # elsewhere in the app.
> >>     df = (
> >>         spark.readStream.format("kafka")
> >>         .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> >>         .option("subscribe", os.environ["KAFKA_TOPIC"])
> >>         .load()
> >>     )
> >>
> >>     table = (
> >>         df.select(
> >>             from_protobuf(
> >>                 "value", "table", "/opt/protobuf-desc/table.desc"
> >>             ).alias("msg")
> >>         )
> >>         .withColumn("uuid", col("msg.uuid"))
> >>         # etc other columns...
> >>
> >>         # generated column for daily partitioning in Delta Lake
> >>         .withColumn(CREATED_DATE,
> >>                     date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
> >>         .drop("msg")
> >>     )
> >>
> >>     query = (
> >>         table
> >>         .repartition(10).writeStream
> >>         .queryName(APP_NAME)
> >>         .outputMode("append")
> >>         .format("delta")
> >>         .partitionBy(CREATED_DATE)
> >>         .option("checkpointLocation", os.environ["CHECKPOINT"])
> >>         .start(os.environ["DELTA_PATH"])
> >>     )
> >>
> >>     query.awaitTermination()
> >>     spark.stop()
> >> ```
> >>
> >> Any advice would be appreciated.
> >>
> >> --
> >> Best Regards,
> >> Shao Yang HONG
> >>
>
>
> --
> Best Regards,
> Shao Yang HONG
> Software Engineer, Pricing, Tech
>
>
