I have a Spark job that processes incremental data and partitions it by customer id. Some customers have very little data, so I have a second job that takes the previous period's data and compacts the small files for each customer into fewer, larger ones. However, that job loops over the customer folders serially, and I'd like to run the per-folder work for every partition simultaneously. Is that possible? Here's the relevant code:
```python
import boto3

# spark, source_bucket, source_prefix, and target_bucket are defined earlier.
client = boto3.client('s3')

# Note: list_objects returns at most 1000 keys per call; larger listings
# would need a paginator.
result = client.list_objects(Bucket=source_bucket, Prefix=source_prefix, Delimiter='/')

size_limit = 1024 * 1024 * 1024  # 1 GB

for o in result.get('CommonPrefixes'):
    folder = o.get('Prefix')  # includes the trailing '/'
    folder_objects = client.list_objects(Bucket=source_bucket, Prefix=folder)

    # Collect the path and size of every object under the size threshold.
    small_files = []
    for s3_object in folder_objects.get('Contents'):
        s3_size = s3_object.get('Size')
        s3_key = s3_object.get('Key')
        if s3_size < size_limit:
            small_files.append(['s3a://{}/{}'.format(source_bucket, s3_key), s3_size])

    if small_files:
        files_list = [x[0] for x in small_files]
        files_size = sum([x[1] for x in small_files])

        # Aim for roughly 1 GB output partitions, with at least one.
        num_partitions = int(files_size / size_limit)
        if num_partitions < 1:
            num_partitions = 1

        agg_df = spark.read.parquet(*files_list)
        agg_df = agg_df.repartition(num_partitions)
        # folder already ends in '/', so no extra slash is needed here.
        agg_df.write.mode('overwrite').parquet('s3a://{}/{}'.format(target_bucket, folder))
```
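What I'm imagining, though I haven't tried it, is wrapping the loop body in a function and submitting each folder from a driver-side thread pool, since as far as I understand a SparkSession can be shared across threads and the scheduler will run the resulting jobs concurrently. A rough sketch of what I mean (the `compact_folder` helper and the worker count are placeholders I made up, not existing code):

```python
from concurrent.futures import ThreadPoolExecutor

import boto3

client = boto3.client('s3')  # boto3 low-level clients are documented as thread-safe
size_limit = 1024 * 1024 * 1024  # 1 GB

def compact_folder(folder):
    # Same work as the serial loop above, for a single customer folder.
    folder_objects = client.list_objects(Bucket=source_bucket, Prefix=folder)
    small_files = [
        ('s3a://{}/{}'.format(source_bucket, obj['Key']), obj['Size'])
        for obj in folder_objects.get('Contents', [])
        if obj['Size'] < size_limit
    ]
    if not small_files:
        return
    files_list = [path for path, _ in small_files]
    files_size = sum(size for _, size in small_files)
    num_partitions = max(int(files_size / size_limit), 1)
    agg_df = spark.read.parquet(*files_list).repartition(num_partitions)
    agg_df.write.mode('overwrite').parquet('s3a://{}/{}'.format(target_bucket, folder))

result = client.list_objects(Bucket=source_bucket, Prefix=source_prefix, Delimiter='/')
folders = [o.get('Prefix') for o in result.get('CommonPrefixes')]

# Each thread submits its own Spark jobs; max_workers caps how many
# customer folders are in flight at once.
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(compact_folder, folders))  # list() forces completion and surfaces exceptions
```

Would something like this be safe, or is there a more idiomatic Spark way to do it (FAIR scheduler pools, parallelizing on the executors, etc.)?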