Hi,

What is your goal in using repartition() here? Is it to reduce the number of files written to Delta? Also note that coalesce() is an alternative to repartition(); see the sketch below.

-- Raghavendra
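For illustration, here is a minimal batch-mode sketch of the difference (not from the original thread; the SparkSession setup and row count are assumed): repartition() performs a full shuffle and can set the partition count to any value, while coalesce() only merges existing partitions and never triggers a shuffle.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("repartition-vs-coalesce").getOrCreate()

# A stand-in DataFrame; in the quoted job below this would be the
# streaming `table` just before .writeStream.
df = spark.range(1_000_000)

# repartition(10): wide transformation, full shuffle. The partition
# count can go up or down, and rows are evenly redistributed, so the
# writer produces at most 10 files per (micro-)batch per date partition.
print(df.repartition(10).rdd.getNumPartitions())  # 10

# coalesce(10): narrow transformation, no shuffle. It only merges
# existing partitions, so it can shrink the count but never grow it,
# and the merged partitions may end up unevenly sized.
print(df.coalesce(10).rdd.getNumPartitions())  # min(original count, 10)
```

In the streaming query quoted below, replacing .repartition(10) with .coalesce(10) would skip the shuffle, at the cost of potentially uneven file sizes.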
On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong <shaoyang.h...@ninjavan.co.invalid> wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning column, and save these records to Delta Lake.
> We have 60 Kafka partitions on the Kafka topic and 15 Spark executor
> instances (so 4 Kafka partitions per executor).
>
> How, then, should we use .repartition()? Should we omit this
> parameter? Or set it to 15? Or 4?
>
> Our code looks roughly like the below:
>
> ```
> import os
>
> from pyspark.sql.functions import col, date_format, from_unixtime
> from pyspark.sql.protobuf.functions import from_protobuf
>
> df = (
>     spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>     .option("subscribe", os.environ["KAFKA_TOPIC"])
>     .load()
> )
>
> table = (
>     df.select(
>         from_protobuf(
>             "value", "table", "/opt/protobuf-desc/table.desc"
>         ).alias("msg")
>     )
>     .withColumn("uuid", col("msg.uuid"))
>     # etc. other columns...
>
>     # generated column for daily partitioning in Delta Lake
>     .withColumn(CREATED_DATE,
>                 date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>     .drop("msg")
> )
>
> query = (
>     table
>     .repartition(10).writeStream
>     .queryName(APP_NAME)
>     .outputMode("append")
>     .format("delta")
>     .partitionBy(CREATED_DATE)
>     .option("checkpointLocation", os.environ["CHECKPOINT"])
>     .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org