I have a Spark job that processes incremental data and partitions it by
customer id. Some customers have very little data, so I have a second job
that takes a previous period's data and combines the small files for each
customer. However, that job runs serially, one customer folder at a time,
and I'd basically like to run the combine step on every partition
simultaneously. Is that possible? Here's the relevant code:

import boto3
from pyspark.sql import SparkSession

client = boto3.client('s3')
spark = SparkSession.builder.getOrCreate()  # the session already created by the job

size_limit = 1024 * 1024 * 1024  # 1 GB

# Each top-level "folder" under the source prefix is one customer partition.
result = client.list_objects(Bucket=source_bucket, Prefix=source_prefix,
                             Delimiter='/')
for o in result.get('CommonPrefixes'):
    folder = o.get('Prefix')
    folder_objects = client.list_objects(Bucket=source_bucket, Prefix=folder)

    # Collect the files in this partition that are below the size limit.
    small_files = []
    for s3_object in folder_objects.get('Contents'):
        s3_size = s3_object.get('Size')
        s3_key = s3_object.get('Key')
        if s3_size < size_limit:
            small_files.append(
                ['s3a://{}/{}'.format(source_bucket, s3_key), s3_size])

    if small_files:
        files_list = [x[0] for x in small_files]
        files_size = sum(x[1] for x in small_files)
        # Aim for roughly one output file per GB of input, minimum one.
        num_partitions = max(1, int(files_size / size_limit))

        # Read this partition's small files, coalesce, and rewrite them.
        agg_df = spark.read.parquet(*files_list)
        agg_df = agg_df.repartition(num_partitions)
        agg_df.write.mode('overwrite').parquet(
            's3a://{}/{}/'.format(target_bucket, folder))
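To be concrete, what I had in mind is wrapping the per-folder work in a
function and submitting it from a driver-side thread pool against the one
SparkSession, since my understanding is that Spark accepts concurrent jobs
from separate threads. This is just an untested sketch of the idea:
compact_folder is a placeholder for the loop body above, and max_workers is
an arbitrary guess.

from concurrent.futures import ThreadPoolExecutor

def compact_folder(folder):
    # Placeholder: the body would be the per-folder logic above (list the
    # small files under `folder`, read, repartition, write to target_bucket).
    ...

prefixes = [o.get('Prefix') for o in result.get('CommonPrefixes')]

# Submit one compaction job per customer folder from separate driver threads,
# all sharing the single SparkSession.
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(compact_folder, p) for p in prefixes]
    for f in futures:
        f.result()  # re-raise any exception from a worker thread

Is that a reasonable way to do it, or is there a more idiomatic Spark
approach?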
