Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi Raghavendra,

Yes, we are trying to reduce the number of files in Delta as well (the
small file problem [0][1]).

We already have a scheduled app to compact files, but the number of
files is still large, at 14K files per day.

[0]: https://docs.delta.io/latest/optimizations-oss.html#language-python
[1]: https://delta.io/blog/2023-01-25-delta-lake-small-file-compaction-optimize/
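
For reference, a minimal sketch of the OPTIMIZE-based compaction described
in [0], assuming the delta-spark Python package is installed (our actual
scheduled job differs in its details):

```
# Minimal sketch of Delta Lake file compaction (OPTIMIZE), assuming the
# delta-spark package; our actual scheduled job differs in its details.
import os

from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-compaction").getOrCreate()

delta_table = DeltaTable.forPath(spark, os.environ["DELTA_PATH"])

# Bin-pack many small files into fewer, larger ones.
delta_table.optimize().executeCompaction()

# Optionally restrict compaction to recent partitions (column name illustrative):
# delta_table.optimize().where("created_date >= '2023-10-01'").executeCompaction()

spark.stop()
```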

On Thu, Oct 5, 2023 at 12:53 PM Raghavendra Ganesh
 wrote:
>
> Hi,
> What is the purpose for which you want to use repartition()? Is it to reduce
> the number of files in Delta?
> Also note that there is an alternative option of using coalesce() instead of 
> repartition().
> --
> Raghavendra
>
>
> On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong 
>  wrote:
>>
>> Hi all on user@spark:
>>
>> We are looking for advice and suggestions on how to tune the N parameter
>> in .repartition(N).
>>
>> We are using Spark Streaming on our data pipeline to consume messages
>> and persist them to a Delta Lake
>> (https://delta.io/learn/getting-started/).
>>
>> We read messages from a Kafka topic, then add a generated date column
>> for daily partitioning, and save these records to Delta Lake. The Kafka
>> topic has 60 partitions, and we run 15 Spark executor instances
>> (so 4 Kafka partitions per executor).
>>
>> How, then, should we use .repartition()? Should we omit it entirely,
>> or set N to 15, or to 4?
>>
>> Our code looks roughly like the below:
>>
>> ```
>> df = (
>> spark.readStream.format("kafka")
>> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
>> .option("subscribe", os.environ["KAFKA_TOPIC"])
>> .load()
>> )
>>
>> table = (
>> df.select(
>> from_protobuf(
>> "value", "table", "/opt/protobuf-desc/table.desc"
>> ).alias("msg")
>> )
>> .withColumn("uuid", col("msg.uuid"))
>> # etc other columns...
>>
>> # generated column for daily partitioning in Delta Lake
>> .withColumn(CREATED_DATE,
>> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
>> .drop("msg")
>> )
>>
>> query = (
>> table
>> .repartition(10).writeStream
>> .queryName(APP_NAME)
>> .outputMode("append")
>> .format("delta")
>> .partitionBy(CREATED_DATE)
>> .option("checkpointLocation", os.environ["CHECKPOINT"])
>> .start(os.environ["DELTA_PATH"])
>> )
>>
>> query.awaitTermination()
>> spark.stop()
>> ```
>>
>> Any advice would be appreciated.
>>
>> --
>> Best Regards,
>> Shao Yang HONG
>>


-- 
Best Regards,
Shao Yang HONG
Software Engineer, Pricing, Tech




[PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Shao Yang Hong
Hi all on user@spark:

We are looking for advice and suggestions on how to tune the N parameter
in .repartition(N).

We are using Spark Streaming on our data pipeline to consume messages
and persist them to a Delta Lake
(https://delta.io/learn/getting-started/).

We read messages from a Kafka topic, then add a generated date column
for daily partitioning, and save these records to Delta Lake. The Kafka
topic has 60 partitions, and we run 15 Spark executor instances
(so 4 Kafka partitions per executor).

How, then, should we use .repartition()? Should we omit it entirely,
or set N to 15, or to 4?

Our code looks roughly like the below:

```
import os

from pyspark.sql.functions import col, date_format, from_unixtime
# from_protobuf lives in pyspark.sql.protobuf.functions on Spark 3.4+
from pyspark.sql.protobuf.functions import from_protobuf

# APP_NAME, CREATED_DATE and the SparkSession `spark` are defined elsewhere.

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
    .option("subscribe", os.environ["KAFKA_TOPIC"])
    .load()
)

table = (
    df.select(
        from_protobuf(
            "value", "table", "/opt/protobuf-desc/table.desc"
        ).alias("msg")
    )
    .withColumn("uuid", col("msg.uuid"))
    # etc other columns...

    # generated column for daily partitioning in Delta Lake
    .withColumn(CREATED_DATE,
                date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
    .drop("msg")
)

query = (
    table
    .repartition(10)
    .writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)

query.awaitTermination()
spark.stop()
```
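
Concretely, the variants we are weighing look roughly like the below
(building on `table` from the snippet above; the numbers are placeholders,
not settled choices):

```
# Building on `table` from the snippet above; the numbers are placeholders.

# (a) No explicit shuffle: each of the 60 Kafka input partitions writes its
#     own file(s), so up to ~60 files per date per microbatch.
writer_a = table.writeStream

# (b) Fixed shuffle before the write: up to N files per date per microbatch
#     (here N = 15, one partition per executor).
writer_b = table.repartition(15).writeStream

# (c) Shuffle by the Delta partition column: all rows for a given
#     CREATED_DATE hash to the same task, so each microbatch writes roughly
#     one file per date, at the risk of a skewed task when one date dominates.
writer_c = table.repartition(4, CREATED_DATE).writeStream
```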

Any advice would be appreciated.

-- 
Best Regards,
Shao Yang HONG




Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Raghavendra Ganesh
Hi,
What is the purpose for which you want to use repartition()? Is it to reduce
the number of files in Delta?
Also note that there is an alternative option of using coalesce() instead
of repartition().
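
If the goal is just fewer output files, something like the sketch below
might be enough (illustrative only, reusing the names from the snippet you
shared; the target of 10 is a placeholder):

```
# Illustrative only, reusing table/APP_NAME/CREATED_DATE from the snippet
# below. coalesce() merges partitions without a full shuffle, so it is
# usually cheaper than repartition() when you only want fewer output files;
# note that it can also reduce the parallelism of upstream stages, since it
# does not introduce a shuffle boundary.
query = (
    table
    .coalesce(10)  # placeholder target for partitions/files per microbatch
    .writeStream
    .queryName(APP_NAME)
    .outputMode("append")
    .format("delta")
    .partitionBy(CREATED_DATE)
    .option("checkpointLocation", os.environ["CHECKPOINT"])
    .start(os.environ["DELTA_PATH"])
)
```
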
--
Raghavendra


On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the N parameter
> in .repartition(N).
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> for daily partitioning, and save these records to Delta Lake. The Kafka
> topic has 60 partitions, and we run 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How, then, should we use .repartition()? Should we omit it entirely,
> or set N to 15, or to 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
> ).alias("msg")
> )
> .withColumn("uuid", col("msg.uuid"))
> # etc other columns...
>
> # generated column for daily partitioning in Delta Lake
> .withColumn(CREATED_DATE,
> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
> .drop("msg")
> )
>
> query = (
> table
> .repartition(10).writeStream
> .queryName(APP_NAME)
> .outputMode("append")
> .format("delta")
> .partitionBy(CREATED_DATE)
> .option("checkpointLocation", os.environ["CHECKPOINT"])
> .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
>
>



[Spark Core]: Recomputation cost of a job due to executor failures

2023-10-04 Thread Faiz Halde
Hello,

Due to the way Spark implements shuffle, the loss of an executor sometimes
results in the recomputation of partitions that were lost.

The definition of a *partition* here is the tuple (RDD-ids, partition id),
where RDD-ids is a sequence of RDD ids.

In our system, we define the unit of work performed for a job X as

work = count of tasks executed to complete the job X

We want to be able to segregate the *goodput* from this metric.

Goodput is defined as: had there been 0 failures in the cluster, how many
tasks Spark would have had to compute to complete this job.

Using the event listener, would the following work?

1. Build a hashmap of type [(RDD-ids, partition id), int] with default
value = 0
2. For each task T, do hashmap[(T.RDD-ids, T.partition-id)] += 1

The assumption here is that Spark will never compute a *partition* more
than once (when there are no failures). Is this assumption true?

So for any entry, a value greater than 1 means that the particular
partition identified by the tuple (RDD-ids, partition id) was recomputed
because Spark thought the partition was "lost".

Given the above data structure, the recomputation cost would be
1 - (hashmap.size() / sum(hashmap.values)),
i.e. the fraction of executed tasks that were recomputations.
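
A minimal sketch of this bookkeeping in plain Python, assuming the
(RDD-ids, partition id) keys have already been extracted from the
listener's task-end events (the listener plumbing itself is not shown):

```
# Minimal sketch of the proposed bookkeeping, assuming each executed task
# has already been reduced to a (RDD-ids, partition id) key by the listener.
from collections import Counter
from typing import Iterable, Tuple

TaskKey = Tuple[Tuple[int, ...], int]  # (sequence of RDD ids, partition id)

def recomputation_cost(task_keys: Iterable[TaskKey]) -> float:
    counts = Counter(task_keys)         # hashmap[(RDD-ids, partition id)] -> times computed
    total_tasks = sum(counts.values())  # every task that actually ran
    goodput = len(counts)               # tasks needed had there been zero failures
    return 1 - goodput / total_tasks

# Toy example: partition ((1, 2), 1) was computed twice, so 1 of 4 task runs
# was recomputation.
keys = [((1, 2), 0), ((1, 2), 1), ((1, 2), 2), ((1, 2), 1)]
print(recomputation_cost(keys))  # 0.25
```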

Thanks
Faiz