Re: [PySpark Structured Streaming] How to tune .repartition(N) ?
Hi Raghavendra,

Yes, we are trying to reduce the number of files in delta as well (the small file problem [0][1]).

We already have a scheduled app to compact files, but the number of files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh wrote:
>
> Hi,
> What is the purpose for which you want to use repartition() .. to reduce the
> number of files in delta?
> Also note that there is an alternative option of using coalesce() instead of
> repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the
>> .repartition() parameter.
>>
>> We are using Spark Streaming on our data pipeline to consume messages
>> and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> as a daily partitioning, and save these records to Delta Lake. We have
>> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
>> (so 4 Kafka partitions per executor).
>>
>> How then, should we use .repartition()? Should we omit this parameter?
>> Or set it to 15? or 4?
>>
>> Our code looks roughly like the below:
>>
>> ```
>> df = (
>>     spark.readStream.format("kafka")
>>     .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>>     .option("subscribe", os.environ["KAFKA_TOPIC"])
>>     .load()
>> )
>>
>> table = (
>>     df.select(
>>         from_protobuf(
>>             "value", "table", "/opt/protobuf-desc/table.desc"
>>         ).alias("msg")
>>     )
>>     .withColumn("uuid", col("msg.uuid"))
>>     # etc other columns...
>>
>>     # generated column for daily partitioning in Delta Lake
>>     .withColumn(CREATED_DATE,
>>                 date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>>     .drop("msg")
>> )
>>
>> query = (
>>     table
>>     .repartition(10).writeStream
>>     .queryName(APP_NAME)
>>     .outputMode("append")
>>     .format("delta")
>>     .partitionBy(CREATED_DATE)
>>     .option("checkpointLocation", os.environ["CHECKPOINT"])
>>     .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>

--
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
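For a sense of scale on the file counts discussed in this thread, here is a rough back-of-envelope sketch in plain Python. It assumes that each write task in a micro-batch emits one file per date partition it holds rows for, so `.repartition(N)` bounds a batch at roughly N files per date. The 60-second batch interval is a hypothetical value (the thread does not state one), and the function name is made up for illustration.

```python
SECONDS_PER_DAY = 86_400


def estimated_files_per_day(write_partitions: int,
                            batch_interval_seconds: int,
                            dates_per_batch: int = 1) -> int:
    """Rough upper bound on files written per day.

    Assumption (not from the thread): every micro-batch writes one file
    per write task per date partition that the task holds rows for.
    """
    batches_per_day = SECONDS_PER_DAY // batch_interval_seconds
    return write_partitions * dates_per_batch * batches_per_day


# Hypothetical inputs: .repartition(10) and one micro-batch per minute.
estimate = estimated_files_per_day(write_partitions=10,
                                   batch_interval_seconds=60)
print(estimate)  # 14400
```

Under these assumptions the estimate scales linearly with both N and the number of micro-batches per day, so lowering N or lengthening the trigger interval reduces the file count by the same factor. Tasks that hold no rows for a date write no file for it, which is why this is only an upper bound.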
[PySpark Structured Streaming] How to tune .repartition(N) ?
Hi all on user@spark:

We are looking for advice and suggestions on how to tune the .repartition() parameter.

We are using Spark Streaming on our data pipeline to consume messages and persist them to a Delta Lake (https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, then add a generated date column as a daily partitioning, and save these records to Delta Lake. We have 60 Kafka partitions on the Kafka topic, 15 Spark executor instances (so 4 Kafka partitions per executor).

How then, should we use .repartition()? Should we omit this parameter? Or set it to 15? or 4?

Our code looks roughly like the below:

```
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
    .option("subscribe", os.environ["KAFKA_TOPIC"])
    .load()
)

table = (
    df.select(
        from_protobuf(
            "value", "table", "/opt/protobuf-desc/table.desc"
        ).alias("msg")
    )
    .withColumn("uuid", col("msg.uuid"))
    # etc other columns...

    # generated column for daily partitioning in Delta Lake
    .withColumn(CREATED_DATE,
                date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
    .drop("msg")
)

query = (
    table
    .repartition(10).writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```

Any advice would be appreciated.

--
Best Regards,
Shao Yang HONG
Re: [PySpark Structured Streaming] How to tune .repartition(N) ?
Hi,
What is the purpose for which you want to use repartition() .. to reduce the number of files in delta?
Also note that there is an alternative option of using coalesce() instead of repartition().
--
Raghavendra

On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong wrote:
> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
>     spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>     .option("subscribe", os.environ["KAFKA_TOPIC"])
>     .load()
> )
>
> table = (
>     df.select(
>         from_protobuf(
>             "value", "table", "/opt/protobuf-desc/table.desc"
>         ).alias("msg")
>     )
>     .withColumn("uuid", col("msg.uuid"))
>     # etc other columns...
>
>     # generated column for daily partitioning in Delta Lake
>     .withColumn(CREATED_DATE,
>                 date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>     .drop("msg")
> )
>
> query = (
>     table
>     .repartition(10).writeStream
>     .queryName(APP_NAME)
>     .outputMode("append")
>     .format("delta")
>     .partitionBy(CREATED_DATE)
>     .option("checkpointLocation", os.environ["CHECKPOINT"])
>     .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
[Spark Core]: Recomputation cost of a job due to executor failures
Hello,

Due to the way Spark implements shuffle, the loss of an executor sometimes results in the recomputation of partitions that were lost.

The definition of a *partition* is the tuple ( RDD-ids, partition id ), where RDD-ids is a sequence of RDD ids.

In our system, we define the unit of work performed for a job X as

work = count of tasks executed to complete the job X

We want to be able to segregate the *goodput* from this metric. Goodput is defined as the number of tasks Spark would have had to compute to complete this job had there been 0 failures in the cluster.

Using the event listener, would the following work?

1. Build a hashmap of type [(RDD-ids, partition), int] with default value = 0
2. For each task T, hashmap[(T.RDD-ids, T.partition-id)] += 1

The assumption here is that Spark will never compute a *partition* more than once (when there are no failures). Is this assumption true?

So for any entry, a value greater than 1 means that the particular partition identified by the tuple ( RDD-ids, partition id ) was recomputed because Spark thought the partition was "lost".

Given the above data structure, the recomputation cost would be

1 - (hashmap.size() / sum(hashmap.values))

Thanks
Faiz
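The counting scheme proposed above can be sketched in plain Python as follows. This covers only the bookkeeping and the final formula, not the Spark event listener itself; the `(rdd_ids, partition_id)` task records and the function name are hypothetical stand-ins for whatever the listener would actually report, and the sketch inherits the message's assumption that a partition is computed once when there are no failures.

```python
from collections import Counter
from typing import Iterable, Tuple

# Hypothetical task key: (sequence of RDD ids, partition id).
TaskKey = Tuple[Tuple[int, ...], int]


def recomputation_cost(tasks: Iterable[TaskKey]) -> float:
    """Return 1 - (distinct partitions / total tasks executed)."""
    counts = Counter(tasks)            # the hashmap, default value 0
    total_tasks = sum(counts.values())
    if total_tasks == 0:
        return 0.0                     # no tasks ran, nothing was recomputed
    goodput = len(counts)              # one task per distinct partition
    return 1 - goodput / total_tasks


# Made-up example: four tasks ran, one partition was computed twice.
tasks = [
    ((0, 1), 0),
    ((0, 1), 1),
    ((0, 1), 1),  # same key again, counted as a recomputation
    ((2,), 0),
]
cost = recomputation_cost(tasks)
print(cost)  # 0.25
```

Here three distinct partitions were covered by four tasks, so a quarter of the executed tasks count as recomputation under this definition.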