djouallah opened a new issue, #16833: URL: https://github.com/apache/datafusion/issues/16833
### Describe the bug

I was doing some testing and noticed that DataFusion does not seem to be using all the cores in my notebook runtime.

### To Reproduce

Here is a simplified version of the code:

```python
import glob
import os
import re
import shutil
from concurrent.futures import ThreadPoolExecutor
from shutil import unpack_archive
from urllib.request import urlopen

import pyarrow.dataset as ds
import requests
from datafusion import SessionContext
from deltalake.writer import write_deltalake
from psutil import cpu_count, virtual_memory
from pyarrow import csv

# Number of files to process: 60 * (nbr_copies + 1)
nbr_copies = 1
total_files = 60 * (nbr_copies + 1)

Source = "/lakehouse/default/Files/0_Source/ARCHIVE/Daily_Reports/"
Destination = "/lakehouse/default/Files/1_Transform/0/ARCHIVE/Daily_Reportsd/"

# Display system information
core = cpu_count()
mem = round(virtual_memory().total / (1024 * 1024 * 1024), 0)
print(f"{core} vCPU, Memory: {mem} GB")


def download(url, path, total_files):
    """Download up to total_files new zip archives listed on the report index page."""
    os.makedirs(path, exist_ok=True)
    result = urlopen(url).read().decode("utf-8")
    pattern = re.compile(r"[\w.]*\.zip")  # dot escaped so only .zip names match
    filelist = sorted(dict.fromkeys(pattern.findall(result)), reverse=True)
    current = [os.path.basename(x) for x in glob.glob(path + "*.zip")]
    files_to_upload = list(set(filelist) - set(current))[:total_files]
    print(f"{len(files_to_upload)} New File Loaded")
    for x in files_to_upload:
        with requests.get(url + x, stream=True) as resp:
            if resp.ok:
                with open(f"{path}{x}", "wb") as f:
                    for chunk in resp.iter_content(chunk_size=4096):
                        f.write(chunk)
    return "done"


def uncompress(source_zip_path):
    """Unzip a single file to the global Destination directory."""
    unpack_archive(str(source_zip_path), str(Destination), "zip")


def unzip(source, destination, nbr_files_to_download):
    """Unzip new files from source to destination using multiple threads."""
    os.makedirs(destination, exist_ok=True)
    source_zips = glob.glob(source + "*.zip")
    existing_unzipped = {
        os.path.basename(f).replace(".CSV", ".zip")
        for f in glob.glob(destination + "*.CSV")
    }
    files_to_unzip = [
        zip_path
        for zip_path in source_zips
        if os.path.basename(zip_path) not in existing_unzipped
    ]
    print(f"{len(files_to_unzip)} New File(s) to uncompress")
    if files_to_unzip:
        with ThreadPoolExecutor(max_workers=core) as executor:
            executor.map(uncompress, files_to_unzip)
        return "done"
    return "nothing to see here"


def datafusion_clean_csv(files_to_upload_full_Path):
    """Read the CSVs via a PyArrow dataset, clean them with a DataFusion SQL
    query, and write the result as a partitioned Delta table."""
    ctx = SessionContext()
    column_names = [
        "I", "UNIT", "XX", "VERSION", "SETTLEMENTDATE", "RUNNO", "DUID",
        "INTERVENTION", "DISPATCHMODE", "AGCSTATUS", "INITIALMW", "TOTALCLEARED",
        "RAMPDOWNRATE", "RAMPUPRATE", "LOWER5MIN", "LOWER60SEC", "LOWER6SEC",
        "RAISE5MIN", "RAISE60SEC", "RAISE6SEC", "MARGINAL5MINVALUE",
        "MARGINAL60SECVALUE", "MARGINAL6SECVALUE", "MARGINALVALUE",
        "VIOLATION5MINDEGREE", "VIOLATION60SECDEGREE", "VIOLATION6SECDEGREE",
        "VIOLATIONDEGREE", "LOWERREG", "RAISEREG", "AVAILABILITY",
        "RAISE6SECFLAGS", "RAISE60SECFLAGS", "RAISE5MINFLAGS", "RAISEREGFLAGS",
        "LOWER6SECFLAGS", "LOWER60SECFLAGS", "LOWER5MINFLAGS", "LOWERREGFLAGS",
        "RAISEREGAVAILABILITY", "RAISEREGENABLEMENTMAX", "RAISEREGENABLEMENTMIN",
        "LOWERREGAVAILABILITY", "LOWERREGENABLEMENTMAX", "LOWERREGENABLEMENTMIN",
        "RAISE6SECACTUALAVAILABILITY", "RAISE60SECACTUALAVAILABILITY",
        "RAISE5MINACTUALAVAILABILITY", "RAISEREGACTUALAVAILABILITY",
        "LOWER6SECACTUALAVAILABILITY", "LOWER60SECACTUALAVAILABILITY",
        "LOWER5MINACTUALAVAILABILITY", "LOWERREGACTUALAVAILABILITY",
    ]
    read_options = csv.ReadOptions(column_names=column_names, skip_rows=1)
    parse_options = csv.ParseOptions(invalid_row_handler=lambda row: "skip")
    convert_options = csv.ConvertOptions(strings_can_be_null=True)
    csv_format = ds.CsvFileFormat(
        parse_options=parse_options,
        convert_options=convert_options,
        read_options=read_options,
    )
    raw = ds.dataset(files_to_upload_full_Path, format=csv_format)
    ctx.register_dataset("arrow_dataset", raw)

    df = ctx.sql("""
        SELECT * EXCLUDE ("I", "XX", "SETTLEMENTDATE"),
               to_timestamp_seconds("SETTLEMENTDATE", '%Y/%m/%d %H:%M:%S') AS SETTLEMENTDATE,
               CAST(EXTRACT(YEAR FROM to_timestamp_seconds("SETTLEMENTDATE", '%Y/%m/%d %H:%M:%S')) AS integer) AS year
        FROM arrow_dataset
        WHERE "I" = 'D' AND "UNIT" = 'DUNIT' AND "VERSION" = '3'
    """).collect()
    write_deltalake(
        f"/lakehouse/default/Tables/T{total_files}/datafusion",
        df,
        mode="overwrite",
        partition_by=["year"],
    )
    return "done"


def duplicate_files(destination, nbr_copies):
    """Duplicate each file in destination nbr_copies times to inflate the dataset."""
    os.makedirs(destination, exist_ok=True)
    files = [
        f for f in os.listdir(destination)
        if os.path.isfile(os.path.join(destination, f))
    ]
    if len(files) > nbr_copies * 60:
        print("all good, data exists already")
        return
    for file in files:
        file_path = os.path.join(destination, file)
        name, ext = os.path.splitext(file)
        for i in range(1, nbr_copies + 1):  # create nbr_copies copies
            shutil.copy(file_path, os.path.join(destination, f"{name}_copy{i}{ext}"))
    print(f"Successfully duplicated {len(files)} files {nbr_copies} times each in '{destination}'.")


### Main execution starts here ###
download("https://nemweb.com.au/Reports/Current/Daily_Reports/", Source, 60)
unzip(Source, Destination, 60)
duplicate_files(Destination, nbr_copies)

list_files = [os.path.basename(x) for x in glob.glob(Destination + "*.CSV")]
files_to_upload_full_Path = [Destination + i for i in list_files][:total_files]
datafusion_clean_csv(files_to_upload_full_Path)
print("Data processing completed successfully.")
```

### Expected behavior

_No response_

### Additional context

_No response_
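One way to narrow this down (not part of the original report) is to pin DataFusion's partition count explicitly and see whether CPU usage changes. A minimal sketch, assuming the `SessionConfig` API from `datafusion-python`; the partition count shown is illustrative:

```python
# Hypothetical triage sketch: plan the query with one partition per core.
# If CPU usage stays low even with a high target_partitions, the scan feeding
# the query (here the registered PyArrow dataset) is the more likely
# bottleneck than DataFusion's planner.
import os

from datafusion import SessionConfig, SessionContext

config = SessionConfig().with_target_partitions(os.cpu_count())
ctx = SessionContext(config)
```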
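A related comparison: a table registered via `register_dataset` is scanned by the PyArrow dataset reader rather than DataFusion's native CSV reader, so it may be worth timing the same query against `register_csv`. A hedged sketch only; the path, `file_extension`, and header handling are assumptions and would need adapting to these files, which skip a leading row and supply their own column names:

```python
# Hypothetical comparison: register the CSV directory natively with DataFusion
# instead of wrapping it in a PyArrow dataset. Options here are illustrative
# and do not reproduce the skip_rows/column_names handling from the report.
ctx.register_csv(
    "arrow_dataset_native",
    "/lakehouse/default/Files/1_Transform/0/ARCHIVE/Daily_Reportsd/",
    has_header=False,        # these files carry a metadata row, not a real header
    file_extension=".CSV",
)
```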