Compact them into larger files and then remove the small ones. One messy way of doing this (including some cleanup) looks like the following, based on rdd.mapPartitions() over an RDD of the file URLs:

import gzip
import json
import logging
import math
import re
from functools import partial
from typing import List

import boto3
from mypy_boto3_s3.client import S3Client
from pyspark.sql import SparkSession

import configuration
import data_types.time_series
from data_types.shared import series
from s3_batcher import get_s3_objects_batches

logger = logging.getLogger(__name__)

# Created at module import time, so each Spark executor process gets its own boto3 session.
session = boto3.session.Session()
s3 = session.resource('s3')

def merge_json_files(file_names, config):
    """Download, decompress and fix one partition's worth of small JSON files; returns an iterator of merged records."""
    total = []
    exceptions = []
    for name in file_names:
        try:
            logger.debug(f'Loading {name} ...')
            obj = s3.Object(config.get_non_concatenated_bucket_name(), name)
            body = obj.get()['Body'].read()
            if name.endswith('.gz'):
                body = gzip.decompress(body)

            company_id = re.search(r'company_id=(\S+)/', name).group(1)
            clazz = config.get_schema_class()
            loaded = json.loads(body)
            fixer: series.SchemaFixer = clazz(company_id=company_id, **loaded)
            total.append(fixer.fix_value())
        except Exception as ex:
            logger.error(f'{name}: {ex}')
            exceptions.append(name)

    if exceptions:
        logger.warning(f'Exceptions: {exceptions}')
    return iter(total)

def get_json_files_list(s3client: S3Client, config: configuration.Config) -> List[str]:
    """Returns a flat list of S3 object keys under the configured prefix."""
    logger.info('Loading file list')
    files = []
    # Equivalent CLI check:
    # aws s3 ls --summarize --human-readable --recursive \
    #   s3://ingest-measurements-20200610130929653000000001/TIME_SERIES/ --profile hierarchy_playground > list.txt
    for batch in get_s3_objects_batches(s3client,
                                        Bucket=config.get_non_concatenated_bucket_name(),
                                        Prefix=config.get_non_concatenated_prefix()):
        # Keys look like:
        # TIME_SERIES/company_id=00224d27-b66f-4b62-bae2-f1399f530d94/60514332-0bc0-4bff-8263-eb6b090b9210.json.gz
        files_batch = [b['Key'] for b in batch if '=' in b['Key']]
        files.extend(files_batch)

    logger.info('Finished listing files')
    return files

def run(spark: SparkSession, config: configuration.Config):
    files = get_json_files_list(boto3.client('s3'), config)
    files_num = len(files)
    logger.info(f'Loaded file list with {files_num} files')

    spark.sparkContext.setJobDescription('Parallelize filenames and read/merge files')

    # One partition per config.small_files_in_partition source files.
    rdd = spark.sparkContext.parallelize(files, math.ceil(files_num / config.small_files_in_partition))
    logger.info(f'Got an rdd with {rdd.getNumPartitions()} partitions')
    func = partial(merge_json_files, config=config)
    loaded_rdd = rdd.mapPartitions(func)

    loaded_rdd.saveAsTextFile(config.get_concatenated_path())
    # note: the records are not sorted by time, which makes them harder to ETL/read later
    # sanity checks, if needed:
    # spark.read.json(config.get_concatenated_path()).show(truncate=50)
    # logger.info(f'Read {spark.read.format("parquet").load(config.get_parquet_dir()).count()} rows from parquet')
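
If the end goal is a handful of large, columnar files rather than concatenated text, the output of run() can be read back once and rewritten. A minimal sketch (compact_to_parquet and num_output_files are names I made up here; it assumes the concatenated records are valid JSON lines and that config.get_parquet_dir() is where the compacted output should land):

def compact_to_parquet(spark: SparkSession, config: configuration.Config, num_output_files: int = 200):
    # compact_to_parquet / num_output_files are illustrative, not part of the original job.
    # Read back the concatenated JSON-lines output written by run() above.
    df = spark.read.json(config.get_concatenated_path())
    # coalesce() merges partitions without a full shuffle, so the result is written as
    # roughly num_output_files large files instead of millions of tiny ones.
    df.coalesce(num_output_files).write.mode('overwrite').parquet(config.get_parquet_dir())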


Boris

From: Sachit Murarka <connectsac...@gmail.com>
Sent: Wednesday, 16 June 2021 21:25
To: spark users <user@spark.apache.org>
Subject: Small file problem

Hello Spark Users,

We are receiving too many small files, about 3 million. Reading them with spark.read is itself taking a long time and the job is not proceeding further.

Is there any way to speed this up and proceed?

Regards
Sachit Murarka
