Hi all on user@spark:

We are looking for advice and suggestions on how to choose the partition
count we pass to .repartition() (or whether to call it at all).

We are using Spark Structured Streaming in our data pipeline to consume
messages and persist them to a Delta Lake table
(https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, add a generated date column that we
use for daily partitioning, and save the records to Delta Lake. The Kafka
topic has 60 partitions and we run 15 Spark executor instances (so 4 Kafka
partitions per executor).

How, then, should we use .repartition()? Should we omit the call entirely?
Or set the partition count to 15? Or 4?
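
For concreteness, these are the variants we are weighing (a rough sketch
only; `table` is the DataFrame from the full listing further down, and the
numbers are just the candidates mentioned above):

```
    # Option A: omit .repartition() and keep the Kafka-derived
    # partitioning (60 partitions, one per Kafka topic partition)
    writer_a = table.writeStream

    # Option B: one output partition per executor instance
    writer_b = table.repartition(15).writeStream

    # Option C: match the Kafka-partitions-per-executor ratio
    writer_c = table.repartition(4).writeStream
```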

Our code looks roughly like the following:

```
    # Constants below are defined elsewhere in our code; shown here as
    # placeholders so the snippet is self-contained.
    import os

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, date_format, from_unixtime
    from pyspark.sql.protobuf.functions import from_protobuf

    APP_NAME = "my-streaming-app"  # placeholder
    CREATED_DATE = "created_date"  # Delta partition column, placeholder name

    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

    # Kafka source: the topic has 60 partitions
    df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
        .option("subscribe", os.environ["KAFKA_TOPIC"])
        .load()
    )

    # Decode the protobuf payload and derive the columns we persist
    table = (
        df.select(
            from_protobuf(
                "value", "table", "/opt/protobuf-desc/table.desc"
            ).alias("msg")
        )
        .withColumn("uuid", col("msg.uuid"))
        # etc other columns...

        # generated column for daily partitioning in Delta Lake
        .withColumn(
            CREATED_DATE,
            date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"),
        )
        .drop("msg")
    )

    # Delta sink; .repartition(10) is the value we are unsure about
    query = (
        table
        .repartition(10)
        .writeStream
        .queryName(APP_NAME)
        .outputMode("append")
        .format("delta")
        .partitionBy(CREATED_DATE)
        .option("checkpointLocation", os.environ["CHECKPOINT"])
        .start(os.environ["DELTA_PATH"])
    )

    query.awaitTermination()
    spark.stop()
```

Any advice would be appreciated.

-- 
Best Regards,
Shao Yang HONG
