djouallah opened a new issue, #16833:
URL: https://github.com/apache/datafusion/issues/16833

   ### Describe the bug
   
   I was doing some testing and noticed that DataFusion does not seem to be using all the cores in my notebook runtime.
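
   For what it's worth, here is a small sketch of how I understand the planned parallelism can be checked and forced through `SessionConfig` (the `with_target_partitions` / `with_information_schema` calls are my reading of the datafusion-python API, so treat this as an assumption rather than a confirmed workaround):

   ```
   from datafusion import SessionConfig, SessionContext

   # Sketch only: force the number of partitions DataFusion plans for.
   # The default is supposed to follow the number of CPU cores; the method
   # names below are assumptions about the datafusion-python API.
   config = SessionConfig().with_information_schema(True).with_target_partitions(8)
   ctx = SessionContext(config)

   # Read the effective value back through SQL.
   ctx.sql("SHOW datafusion.execution.target_partitions").show()
   ```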
   
   ### To Reproduce
   
   Here is a simplified reproduction:
   
   ```
   import re
   import shutil
   from   urllib.request import urlopen
   import os
   import requests
   import pyarrow.dataset as ds
   from   pyarrow import csv
   from   concurrent.futures import ThreadPoolExecutor
   from   shutil import unpack_archive
   from   deltalake.writer import write_deltalake
   import glob
   from   psutil import cpu_count, virtual_memory
   from   datafusion import SessionContext as session
   
   # number of files to process: 60 * (nbr_copies + 1)
   nbr_copies = 1
   total_files = 60 * (nbr_copies + 1)
   Source = "/lakehouse/default/Files/0_Source/ARCHIVE/Daily_Reports/"
   Destination = "/lakehouse/default/Files/1_Transform/0/ARCHIVE/Daily_Reportsd/"
   
   # Display system information
   core = cpu_count()
   vCPU = str(core) + " vCPU"
   mem = round(virtual_memory().total / (1024 * 1024 * 1024), 0)
   print(vCPU + ' Memory:' + str(mem))
   
   def download(url, Path, total_files):
       if not os.path.exists(Path):
           os.makedirs(Path, exist_ok=True)
       result = urlopen(url).read().decode('utf-8')
       pattern = re.compile(r'[\w.]*\.zip')
       filelist1 = pattern.findall(result)
       filelist_unique = dict.fromkeys(filelist1)
       filelist = sorted(filelist_unique, reverse=True)
       current = [os.path.basename(x) for x in glob.glob(Path + '*.zip')]
       files_to_upload = list(set(filelist) - set(current))
       files_to_upload = list(dict.fromkeys(files_to_upload))[:total_files]
       print(str(len(files_to_upload)) + ' New File Loaded')
       if len(files_to_upload) != 0:
           for x in files_to_upload:
               with requests.get(url + x, stream=True) as resp:
                   if resp.ok:
                       with open(f"{Path}{x}", "wb") as f:
                           for chunk in resp.iter_content(chunk_size=4096):
                               f.write(chunk)
       return "done"
   
   def uncompress(source_zip_path):
       """Unzips a single file to the global Destination directory."""
       unpack_archive(str(source_zip_path), str(Destination), 'zip')
   
   def unzip(Source, Destination, Nbr_Files_to_Download):
       """Unzips new files from Source to Destination using multiple threads."""
       if not os.path.exists(Destination):
           os.makedirs(Destination, exist_ok=True)
       
       source_zips = glob.glob(Source + '*.zip')
       existing_unzipped = {os.path.basename(f).replace('.CSV', '.zip') for f in glob.glob(Destination + '*.CSV')}
       files_to_unzip = [zip_path for zip_path in source_zips if os.path.basename(zip_path) not in existing_unzipped]
       
       print(f'{len(files_to_unzip)} New File(s) to uncompress')
       
       if files_to_unzip:
           with ThreadPoolExecutor(max_workers=core) as executor:
               executor.map(uncompress, files_to_unzip)
           return "done"
       else:
           return "nothing to see here"
   
   
   def datafusion_clean_csv(files_to_upload_full_Path):
       ctx = session()
       column_names = [
           'I', 'UNIT', 'XX', 'VERSION', 'SETTLEMENTDATE', 'RUNNO', 'DUID', 'INTERVENTION',
           'DISPATCHMODE', 'AGCSTATUS', 'INITIALMW', 'TOTALCLEARED', 'RAMPDOWNRATE', 'RAMPUPRATE',
           'LOWER5MIN', 'LOWER60SEC', 'LOWER6SEC', 'RAISE5MIN', 'RAISE60SEC', 'RAISE6SEC',
           'MARGINAL5MINVALUE', 'MARGINAL60SECVALUE', 'MARGINAL6SECVALUE', 'MARGINALVALUE',
           'VIOLATION5MINDEGREE', 'VIOLATION60SECDEGREE', 'VIOLATION6SECDEGREE', 'VIOLATIONDEGREE',
           'LOWERREG', 'RAISEREG', 'AVAILABILITY', 'RAISE6SECFLAGS', 'RAISE60SECFLAGS',
           'RAISE5MINFLAGS', 'RAISEREGFLAGS', 'LOWER6SECFLAGS', 'LOWER60SECFLAGS', 'LOWER5MINFLAGS',
           'LOWERREGFLAGS', 'RAISEREGAVAILABILITY', 'RAISEREGENABLEMENTMAX', 'RAISEREGENABLEMENTMIN',
           'LOWERREGAVAILABILITY', 'LOWERREGENABLEMENTMAX', 'LOWERREGENABLEMENTMIN',
           'RAISE6SECACTUALAVAILABILITY', 'RAISE60SECACTUALAVAILABILITY', 'RAISE5MINACTUALAVAILABILITY',
           'RAISEREGACTUALAVAILABILITY', 'LOWER6SECACTUALAVAILABILITY', 'LOWER60SECACTUALAVAILABILITY',
           'LOWER5MINACTUALAVAILABILITY', 'LOWERREGACTUALAVAILABILITY'
       ]
   
       ReadOptions = csv.ReadOptions(column_names=column_names, skip_rows=1)
       ParseOptions = csv.ParseOptions(invalid_row_handler=lambda i: "skip")
       ConvertOptions = csv.ConvertOptions(strings_can_be_null=True)
       csv_format = ds.CsvFileFormat(parse_options=ParseOptions, convert_options=ConvertOptions, read_options=ReadOptions)
       raw = ds.dataset(files_to_upload_full_Path, format=csv_format)
       ctx.register_dataset("arrow_dataset", raw)
       ######################################
       df = ctx.sql("""
                       select * EXCLUDE("I","XX","SETTLEMENTDATE"),
                       to_timestamp_seconds("SETTLEMENTDATE",'%Y/%m/%d %H:%M:%S') as SETTLEMENTDATE,
                       cast(EXTRACT(YEAR FROM to_timestamp_seconds("SETTLEMENTDATE",'%Y/%m/%d %H:%M:%S')) as integer) as year
                       from arrow_dataset where "I" ='D' and "UNIT"='DUNIT' and "VERSION" ='3'
                   """).collect()
       write_deltalake(f"/lakehouse/default/Tables/T{total_files}/datafusion", df, mode="overwrite", partition_by=['year'])
       return "done"

   def duplicate_files(Destination, nbr_copies):
       """Duplicates files in the specified directory."""
       if not os.path.exists(Destination):
           os.makedirs(Destination, exist_ok=True)
       
       files = [f for f in os.listdir(Destination) if os.path.isfile(os.path.join(Destination, f))]
       if len(files) > nbr_copies * 60:
           print("all good, data exists already ")
           return
       
       for file in files:
           file_path = os.path.join(Destination, file)
           name, ext = os.path.splitext(file)
   
           for i in range(1, nbr_copies + 1):  # Create nbr_copies copies
               new_file = os.path.join(Destination, f"{name}_copy{i}{ext}")
               shutil.copy(file_path, new_file)
       print(f"Successfully duplicated {len(files)} files {nbr_copies} times each in '{Destination}'.")
   
   ### Main execution starts here ###
   download("https://nemweb.com.au/Reports/Current/Daily_Reports/", Source, 60)
   unzip(Source, Destination, 60)
   duplicate_files(Destination, nbr_copies)
   list_files = [os.path.basename(x) for x in glob.glob(Destination + '*.CSV')]
   files_to_upload_full_Path = [Destination + i for i in list_files][:total_files]
   datafusion_clean_csv(files_to_upload_full_Path)
   print("Data processing completed successfully.")
   ```
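
   In case it helps with diagnosing this, the sketch below is how I have been trying to see how many partitions the scan over the registered Arrow dataset ends up with (assuming `DataFrame.explain()` prints the logical and physical plans the way I think it does):

   ```
   # Sketch only (assumes the `raw` pyarrow dataset built inside
   # datafusion_clean_csv above): print the plans for a similar query to see
   # how many partitions the dataset scan is given. explain() is assumed to
   # print the logical and physical plans in this datafusion-python version.
   ctx = session()
   ctx.register_dataset("arrow_dataset", raw)
   plan = ctx.sql("""select count(*) from arrow_dataset where "I" = 'D'""")
   plan.explain()
   ```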
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_

