djouallah opened a new issue, #16833:
URL: https://github.com/apache/datafusion/issues/16833
### Describe the bug
I was doing some testing and noticed that DataFusion does not seem to be using all of the cores in my notebook runtime.
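Below is a minimal sketch, not part of the reproduction, of how I would try pinning the parallelism explicitly; it assumes `SessionConfig.with_target_partitions` in the `datafusion` Python package is the relevant knob:

```
import os

from datafusion import SessionConfig, SessionContext

# Assumption: target_partitions bounds how many partitions (and therefore how
# many cores) DataFusion uses when executing a query; by default it should
# already match the machine's CPU count.
config = SessionConfig().with_target_partitions(os.cpu_count())
ctx = SessionContext(config)
```

The reproduction below just uses the default `SessionContext()` with no explicit configuration.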
### To Reproduce
Here is a simplified reproduction:
```
import re
import shutil
from urllib.request import urlopen
import os
import requests
import pyarrow.dataset as ds
from pyarrow import csv
from concurrent.futures import ThreadPoolExecutor
from shutil import unpack_archive
from deltalake.writer import write_deltalake
import glob
from psutil import *
from datafusion import SessionContext as session

# number of files to process: 60 * (nbr_copies + 1)
nbr_copies = 1
total_files = 60 * (nbr_copies + 1)
Source = "/lakehouse/default/Files/0_Source/ARCHIVE/Daily_Reports/"
Destination = "/lakehouse/default/Files/1_Transform/0/ARCHIVE/Daily_Reportsd/"

# Display system information
core = cpu_count()
vCPU = str(core) + " vCPU"
mem = round(virtual_memory().total / (1024 * 1024 * 1024), 0)
print(vCPU + ' Memory:' + str(mem))


def download(url, Path, total_files):
    """Downloads any new report zip files listed at url into Path."""
    if not os.path.exists(Path):
        os.makedirs(Path, exist_ok=True)
    result = urlopen(url).read().decode('utf-8')
    pattern = re.compile(r'[\w.]*.zip')
    filelist1 = pattern.findall(result)
    filelist_unique = dict.fromkeys(filelist1)
    filelist = sorted(filelist_unique, reverse=True)
    current = [os.path.basename(x) for x in glob.glob(Path + '*.zip')]
    files_to_upload = list(set(filelist) - set(current))
    files_to_upload = list(dict.fromkeys(files_to_upload))[:total_files]
    print(str(len(files_to_upload)) + ' New File Loaded')
    if len(files_to_upload) != 0:
        for x in files_to_upload:
            with requests.get(url + x, stream=True) as resp:
                if resp.ok:
                    with open(f"{Path}{x}", "wb") as f:
                        for chunk in resp.iter_content(chunk_size=4096):
                            f.write(chunk)
    return "done"


def uncompress(source_zip_path):
    """Unzips a single file to the global Destination directory."""
    unpack_archive(str(source_zip_path), str(Destination), 'zip')


def unzip(Source, Destination, Nbr_Files_to_Download):
    """Unzips new files from Source to Destination using multiple threads."""
    if not os.path.exists(Destination):
        os.makedirs(Destination, exist_ok=True)
    source_zips = glob.glob(Source + '*.zip')
    existing_unzipped = {os.path.basename(f).replace('.CSV', '.zip')
                         for f in glob.glob(Destination + '*.CSV')}
    files_to_unzip = [zip_path for zip_path in source_zips
                      if os.path.basename(zip_path) not in existing_unzipped]
    print(f'{len(files_to_unzip)} New File(s) to uncompress')
    if files_to_unzip:
        with ThreadPoolExecutor(max_workers=core) as executor:
            executor.map(uncompress, files_to_unzip)
        return "done"
    else:
        return "nothing to see here"


def datafusion_clean_csv(files_to_upload_full_Path):
    """Registers the CSV files as a PyArrow dataset, cleans them with a
    DataFusion SQL query, and writes the result as a Delta table."""
    ctx = session()
    colum_names = [
        'I', 'UNIT', 'XX', 'VERSION', 'SETTLEMENTDATE', 'RUNNO', 'DUID', 'INTERVENTION',
        'DISPATCHMODE', 'AGCSTATUS', 'INITIALMW', 'TOTALCLEARED', 'RAMPDOWNRATE', 'RAMPUPRATE',
        'LOWER5MIN', 'LOWER60SEC', 'LOWER6SEC', 'RAISE5MIN', 'RAISE60SEC', 'RAISE6SEC',
        'MARGINAL5MINVALUE', 'MARGINAL60SECVALUE', 'MARGINAL6SECVALUE', 'MARGINALVALUE',
        'VIOLATION5MINDEGREE', 'VIOLATION60SECDEGREE', 'VIOLATION6SECDEGREE', 'VIOLATIONDEGREE',
        'LOWERREG', 'RAISEREG', 'AVAILABILITY', 'RAISE6SECFLAGS', 'RAISE60SECFLAGS',
        'RAISE5MINFLAGS', 'RAISEREGFLAGS', 'LOWER6SECFLAGS', 'LOWER60SECFLAGS', 'LOWER5MINFLAGS',
        'LOWERREGFLAGS', 'RAISEREGAVAILABILITY', 'RAISEREGENABLEMENTMAX', 'RAISEREGENABLEMENTMIN',
        'LOWERREGAVAILABILITY', 'LOWERREGENABLEMENTMAX', 'LOWERREGENABLEMENTMIN',
        'RAISE6SECACTUALAVAILABILITY', 'RAISE60SECACTUALAVAILABILITY', 'RAISE5MINACTUALAVAILABILITY',
        'RAISEREGACTUALAVAILABILITY', 'LOWER6SECACTUALAVAILABILITY', 'LOWER60SECACTUALAVAILABILITY',
        'LOWER5MINACTUALAVAILABILITY', 'LOWERREGACTUALAVAILABILITY'
    ]
    ReadOptions = csv.ReadOptions(column_names=colum_names, skip_rows=1)
    ParseOptions = csv.ParseOptions(invalid_row_handler=lambda i: "skip")
    ConvertOptions = csv.ConvertOptions(strings_can_be_null=True)
    format = ds.CsvFileFormat(parse_options=ParseOptions,
                              convert_options=ConvertOptions,
                              read_options=ReadOptions)
    raw = ds.dataset(files_to_upload_full_Path, format=format)
    ctx.register_dataset("arrow_dataset", raw)
    ######################################
    df = ctx.sql("""
        select * EXCLUDE("I", "XX", "SETTLEMENTDATE"),
               to_timestamp_seconds("SETTLEMENTDATE", '%Y/%m/%d %H:%M:%S') as SETTLEMENTDATE,
               cast(EXTRACT(YEAR FROM to_timestamp_seconds("SETTLEMENTDATE", '%Y/%m/%d %H:%M:%S')) as integer) as year
        from arrow_dataset
        where "I" = 'D' and "UNIT" = 'DUNIT' and "VERSION" = '3'
    """).collect()
    write_deltalake(f"/lakehouse/default/Tables/T{total_files}/datafusion",
                    df, mode="overwrite", partition_by=['year'])
    return "done"


def duplicate_files(Destination, nbr_copies):
    """Duplicates files in the specified directory."""
    if not os.path.exists(Destination):
        os.makedirs(Destination, exist_ok=True)
    files = [f for f in os.listdir(Destination)
             if os.path.isfile(os.path.join(Destination, f))]
    if len(files) > nbr_copies * 60:
        print("all good, data exists already ")
        return
    for file in files:
        file_path = os.path.join(Destination, file)
        name, ext = os.path.splitext(file)
        for i in range(1, nbr_copies + 1):  # Create nbr_copies copies
            new_file = os.path.join(Destination, f"{name}_copy{i}{ext}")
            shutil.copy(file_path, new_file)
    print(f"Successfully duplicated {len(files)} files {nbr_copies} times each in '{Destination}'.")


### Main execution starts here ###
download("https://nemweb.com.au/Reports/Current/Daily_Reports/", Source, 60)
unzip(Source, Destination, 60)
duplicate_files(Destination, nbr_copies)
list_files = [os.path.basename(x) for x in glob.glob(Destination + '*.CSV')]
files_to_upload_full_Path = [Destination + i for i in list_files][:total_files]
datafusion_clean_csv(files_to_upload_full_Path)
print("Data processing completed successfully.")
```
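Two diagnostics that might help narrow this down (sketches only, not part of the script above): inspecting how many partitions the physical plan actually gets, and letting DataFusion list and scan the CSV files itself instead of going through the registered PyArrow dataset, in case the dataset scan is the single-threaded part. This assumes `DataFrame.explain` and `SessionContext.register_csv` work as shown; the `register_csv` sketch also ignores the header-skipping and column-naming that the PyArrow read options handle above.

```
# Both sketches assume the `ctx` and `Destination` from the script above, with
# "arrow_dataset" already registered inside datafusion_clean_csv().

# Sketch 1: print the physical plan (and per-operator metrics with analyze=True)
# instead of collecting, to see how many partitions the scan actually gets.
plan_df = ctx.sql("""select count(*) from arrow_dataset where "I" = 'D'""")
plan_df.explain(verbose=False, analyze=True)

# Sketch 2 (assumption: register_csv accepts a directory of .CSV files):
# let DataFusion scan the files natively, bypassing the PyArrow dataset.
ctx.register_csv(
    "raw_csv",
    Destination,           # directory containing the unzipped .CSV files
    has_header=False,      # column names are handled explicitly in the real script
    file_extension=".CSV",
)
ctx.sql("select count(*) from raw_csv").show()
```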
### Expected behavior
_No response_
### Additional context
_No response_