Hi all on user@spark,

We are looking for advice and suggestions on how to tune the number of partitions we pass to .repartition() in our streaming write.
We are using Spark Structured Streaming in our data pipeline to consume messages and persist them to a Delta Lake table (https://delta.io/learn/getting-started/). We read messages from a Kafka topic, add a generated date column for daily partitioning, and save the records to Delta Lake. The topic has 60 Kafka partitions and we run 15 Spark executor instances (so 4 Kafka partitions per executor). How, then, should we use .repartition()? Should we omit it altogether? Set it to 15? Or to 4?

Our code looks roughly like the below:

```
import os

from pyspark.sql.functions import col, date_format, from_unixtime
from pyspark.sql.protobuf.functions import from_protobuf  # PySpark >= 3.4

# APP_NAME, CREATED_DATE and the SparkSession (`spark`) are defined elsewhere.

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
    .option("subscribe", os.environ["KAFKA_TOPIC"])
    .load()
)

table = (
    # Deserialize the protobuf payload from the Kafka `value` column.
    df.select(
        from_protobuf(
            "value", "table", "/opt/protobuf-desc/table.desc"
        ).alias("msg")
    )
    .withColumn("uuid", col("msg.uuid"))
    # etc other columns...
    # Generated column for daily partitioning in Delta Lake.
    .withColumn(
        CREATED_DATE,
        date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"),
    )
    .drop("msg")
)

query = (
    table.repartition(10)
    .writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```

Any advice would be appreciated.

--
Best Regards,
Shao Yang HONG
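
P.S. For concreteness, here is a sketch of the three variants we are weighing. Everything else in the query is exactly as in the snippet above; `stream_input` is just a local name introduced for this sketch.

```
# Option A: omit repartition -- as we understand it, each micro-batch then keeps
# the partitioning derived from the Kafka source (60 partitions for our topic).
stream_input = table

# Option B: one partition per executor instance.
# stream_input = table.repartition(15)

# Option C: match our Kafka-partitions-per-executor count.
# stream_input = table.repartition(4)

query = (
    stream_input.writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)
```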