Re: [PySpark Structured Streaming] How to tune .repartition(N) ?
Hi Raghavendra,

Yes, we are trying to reduce the number of files in Delta as well (the small-file problem [0][1]). We already have a scheduled app to compact files, but the number of files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to reduce the
> number of files in delta?
> Also note that there is an alternative option of using coalesce() instead of
> repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the
>> .repartition() parameter.
>>
>> We are using Spark Structured Streaming on our data pipeline to consume
>> messages and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> as a daily partitioning column, and save these records to Delta Lake.
>> We have 60 Kafka partitions on the Kafka topic and 15 Spark executor
>> instances (so 4 Kafka partitions per executor).
>>
>> How, then, should we use .repartition()? Should we omit this parameter?
>> Or set it to 15? Or 4?
>>
>> Our code looks roughly like the below:
>>
>> ```
>> df = (
>>     spark.readStream.format("kafka")
>>     .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>>     .option("subscribe", os.environ["KAFKA_TOPIC"])
>>     .load()
>> )
>>
>> table = (
>>     df.select(
>>         from_protobuf(
>>             "value", "table", "/opt/protobuf-desc/table.desc"
>>         ).alias("msg")
>>     )
>>     .withColumn("uuid", col("msg.uuid"))
>>     # etc other columns...
>>
>>     # generated column for daily partitioning in Delta Lake
>>     .withColumn(CREATED_DATE,
>>                 date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>>     .drop("msg")
>> )
>>
>> query = (
>>     table
>>     .repartition(10).writeStream
>>     .queryName(APP_NAME)
>>     .outputMode("append")
>>     .format("delta")
>>     .partitionBy(CREATED_DATE)
>>     .option("checkpointLocation", os.environ["CHECKPOINT"])
>>     .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

--
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
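For readers hitting the same small-file issue: a scheduled compaction pass along the lines of the Python OPTIMIZE API in [0] might look roughly like the sketch below. The table path, the partition-column name (created_date), and the three-day window are illustrative placeholders, not values taken from the thread.

```
# Sketch of a scheduled Delta compaction job (Delta Lake OSS >= 2.0, delta-spark),
# following the Python OPTIMIZE API referenced in [0]. Paths, the partition
# column name, and the cutoff window are assumptions for illustration.
import os
from datetime import date, timedelta

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Assumes the session is already configured with the Delta Lake extensions
# (spark.sql.extensions / spark.sql.catalog.spark_catalog).
spark = SparkSession.builder.appName("delta-compaction").getOrCreate()

delta_table = DeltaTable.forPath(spark, os.environ["DELTA_PATH"])

# Only rewrite recent date partitions so the whole table is not compacted on
# every run; older partitions are assumed to have been compacted already.
cutoff = (date.today() - timedelta(days=3)).isoformat()

(
    delta_table.optimize()
    .where(f"created_date >= '{cutoff}'")  # partition column name is assumed
    .executeCompaction()
)

spark.stop()
```

Even with such a job in place, the streaming write itself decides how many small files are created per micro-batch, which is where repartition()/coalesce() come in.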
[PySpark Structured Streaming] How to tune .repartition(N) ?
Hi all on user@spark:

We are looking for advice and suggestions on how to tune the .repartition() parameter.

We are using Spark Structured Streaming on our data pipeline to consume messages and persist them to a Delta Lake (https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, then add a generated date column as a daily partitioning column, and save these records to Delta Lake. We have 60 Kafka partitions on the Kafka topic and 15 Spark executor instances (so 4 Kafka partitions per executor).

How, then, should we use .repartition()? Should we omit this parameter? Or set it to 15? Or 4?

Our code looks roughly like the below:

```
import os

from pyspark.sql.functions import col, date_format, from_unixtime
from pyspark.sql.protobuf.functions import from_protobuf

# APP_NAME, CREATED_DATE and the SparkSession `spark` are defined elsewhere in our app.

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
    .option("subscribe", os.environ["KAFKA_TOPIC"])
    .load()
)

table = (
    df.select(
        from_protobuf(
            "value", "table", "/opt/protobuf-desc/table.desc"
        ).alias("msg")
    )
    .withColumn("uuid", col("msg.uuid"))
    # etc other columns...

    # generated column for daily partitioning in Delta Lake
    .withColumn(CREATED_DATE,
                date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
    .drop("msg")
)

query = (
    table
    .repartition(10).writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```

Any advice would be appreciated.

--
Best Regards,
Shao Yang HONG

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
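Following up on the coalesce() alternative raised in the reply above: a minimal sketch of how it would slot into the same write path is below. It reuses `table`, `APP_NAME` and `CREATED_DATE` from the snippet in the question; the value 15 (one partition per executor) is only an illustrative assumption, not a recommendation from the thread.

```
# Sketch: capping output partitions with coalesce() instead of repartition().
# coalesce() merges existing partitions without a full shuffle, so the 60
# Kafka partitions are written by at most 15 tasks per micro-batch here.
# The value 15 is an illustrative assumption, not a tuned setting.
query = (
    table
    .coalesce(15)
    .writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)
```

The trade-off is that coalesce() also lowers the parallelism of everything upstream of the write in the same stage, whereas repartition() keeps the upstream parallelism at the cost of a shuffle.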