Hi Raghavendra,

Yes, we are trying to reduce the number of files in Delta as well (the
small file problem [0][1]).
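For context, the daily file count scales roughly with the number of
micro-batches per day times the `.repartition()` factor in our writer
(quoted further down in this thread). A back-of-envelope sketch, assuming a
hypothetical 60-second trigger interval (our actual trigger may differ):

```python
# Rough estimate of daily file count for a streaming Delta write.
# Assumption: one micro-batch per minute (hypothetical; real trigger may differ).
trigger_seconds = 60
batches_per_day = 86_400 // trigger_seconds   # 1440 micro-batches per day
files_per_batch = 10                          # matches .repartition(10) in our job
files_per_day = batches_per_day * files_per_batch
print(files_per_day)  # 14400
```

At one batch per minute, 1440 batches x 10 files lands in the same
ballpark as the file counts we observe, even before compaction.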
We already have a scheduled app to compact files, but the number of
files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
<raghavendr...@gmail.com> wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to
> reduce the number of files in delta?
> Also note that there is an alternative option of using coalesce()
> instead of repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
> <shaoyang.h...@ninjavan.co.invalid> wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the
>> .repartition() parameter.
>>
>> We are using Spark Structured Streaming in our data pipeline to
>> consume messages and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> for daily partitioning, and save these records to Delta Lake. We have
>> 60 Kafka partitions on the Kafka topic and 15 Spark executor
>> instances (so 4 Kafka partitions per executor).
>>
>> How, then, should we use .repartition()? Should we omit this
>> parameter? Or set it to 15? Or 4?
>>
>> Our code looks roughly like this:
>>
>> ```
>> df = (
>>     spark.readStream.format("kafka")
>>     .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>>     .option("subscribe", os.environ["KAFKA_TOPIC"])
>>     .load()
>> )
>>
>> table = (
>>     df.select(
>>         from_protobuf(
>>             "value", "table", "/opt/protobuf-desc/table.desc"
>>         ).alias("msg")
>>     )
>>     .withColumn("uuid", col("msg.uuid"))
>>     # etc. other columns...
>>
>>     # generated column for daily partitioning in Delta Lake
>>     .withColumn(
>>         CREATED_DATE,
>>         date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"),
>>     )
>>     .drop("msg")
>> )
>>
>> query = (
>>     table.repartition(10)
>>     .writeStream
>>     .queryName(APP_NAME)
>>     .outputMode("append")
>>     .format("delta")
>>     .partitionBy(CREATED_DATE)
>>     .option("checkpointLocation", os.environ["CHECKPOINT"])
>>     .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org