This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow-site-archive.git
The following commit(s) were added to refs/heads/main by this push: new 65dfac67f0 Make staging/live choice instead of free-type and fix full sync (#13) 65dfac67f0 is described below commit 65dfac67f0def2df0d43376b67df45abe4db0026 Author: Jarek Potiuk <ja...@potiuk.com> AuthorDate: Sat May 10 21:10:40 2025 +0200 Make staging/live choice instead of free-type and fix full sync (#13) --- .github/workflows/github-to-s3.yml | 103 +++++++++++++++------ .github/workflows/s3-to-github.yml | 103 +++++++++++++++++---- .../apache-airflow-providers-amazon/stable.txt | 1 + .../stable.txt | 1 + .../apache-airflow-providers-fab/stable.txt | 1 + .../apache-airflow-providers-standard/stable.txt | 1 + scripts/github_to_s3.py | 83 +++++++++-------- scripts/s3_to_github.py | 51 +++++----- scripts/transfer_utils.py | 89 ++++++++++++------ 9 files changed, 297 insertions(+), 136 deletions(-) diff --git a/.github/workflows/github-to-s3.yml b/.github/workflows/github-to-s3.yml index e0249841d3..5c5fe31daf 100644 --- a/.github/workflows/github-to-s3.yml +++ b/.github/workflows/github-to-s3.yml @@ -22,13 +22,16 @@ on: # yamllint disable-line rule:truthy inputs: destination-location: description: "The destination location in S3" - required: false + required: true + type: choice + options: + - s3://live-docs-airflow-apache-org/docs/ + - s3://staging-docs-airflow-apache-org/docs/ default: "s3://live-docs-airflow-apache-org/docs/" - type: string - document-folder: - description: "Provide any specific package document folder to sync" + document-folders: + description: "Provide any specific package document folders to sync - space separated" required: false - default: "" + default: "all" type: string sync-type: description: "Perform a full sync or just sync the last commit" @@ -46,13 +49,21 @@ on: # yamllint disable-line rule:truthy processes: description: "Number of processes to use for syncing" required: false - default: "8" + default: "4" type: string jobs: github-to-s3: name: GitHub to S3 runs-on: ubuntu-latest steps: + - name: Summarize parameters + run: | + echo "Destination location: ${{ inputs.destination-location }}" + echo "Document folders: ${{ inputs.document-folders }}" + echo "Sync type: ${{ inputs.sync-type }}" + echo "Commit SHA: ${{ inputs.commit-sha }}" + echo "Processes: ${{ inputs.processes }}" + - name: Setup Python uses: actions/setup-python@v4 with: @@ -95,56 +106,88 @@ jobs: .github scripts - - name: Create /mnt/airflow-site-archive directory + - name: Create /mnt/cloned-airflow-site-archive directory run: | - sudo mkdir -pv /mnt/airflow-site-archive - sudo chown -R "${USER}" /mnt/airflow-site-archive - ln -v -s /mnt/airflow-site-archive ./airflow-site-archive + sudo mkdir -pv /mnt/cloned-airflow-site-archive + sudo chown -R "${USER}" /mnt/cloned-airflow-site-archive + ln -v -s /mnt/cloned-airflow-site-archive ./cloned-airflow-site-archive - - name: Pre-process docs folder + - name: Pre-process docs folders env: - DOCUMENTS_FOLDER: ${{ inputs.document-folder }} - id: docs-folder + DOCUMENTS_FOLDERS: ${{ inputs.document-folders }} + id: docs-folders-processed run: | - echo "docs-folder=${DOCUMENTS_FOLDER}" >> ${GITHUB_OUTPUT} - if [[ "${DOCUMENTS_FOLDER}" != "" ]]; then - echo "Preprocessing docs folder: ${DOCUMENTS_FOLDER}" - if [[ "${DOCUMENTS_FOLDER}" != apache-airflow-providers* ]]; then - echo "docs-folder=apache-airflow-providers-${DOCUMENTS_FOLDER/./-}" >> ${GITHUB_OUTPUT} - fi + echo "sparse-checkout<<EOF" >> ${GITHUB_OUTPUT} + if [[ "${DOCUMENTS_FOLDERS}" != "all" ]]; then + echo 
"Preprocessing docs folders: ${DOCUMENTS_FOLDERS}" + folders="" + sparse_checkout="" + separator="" + for folder in ${DOCUMENTS_FOLDERS}; do + if [[ "${folder}" != apache-airflow-providers* ]]; then + folders="${folders}${separator}apache-airflow-providers-${folder/./-}" + echo "docs-archive/apache-airflow-providers-${folder/./-}" >> ${GITHUB_OUTPUT} + else + folders="${folders}${separator}${folder}" + echo "docs-archive/${folder}" >> ${GITHUB_OUTPUT} + fi + separator=" " + done + else + folders="all" + echo "docs-archive" >> ${GITHUB_OUTPUT} fi - + echo "EOF" >> ${GITHUB_OUTPUT} + echo "docs-folders-processed=${folders}" + echo "docs-folders-processed=${folders}" >> ${GITHUB_OUTPUT} - name: > - Checkout (${{ inputs.commit-sha || github.sha }}) to /mnt/airflow-site-archive - with docs: ${{ steps.docs-folder.outputs.docs-folder }}" + Checkout (${{ inputs.commit-sha || github.sha }}) to /mnt/cloned-airflow-site-archive + with docs: ${{ steps.docs-folders-processed.outputs.docs-folders-processed }} uses: actions/checkout@v4 with: - path: ./airflow-site-archive + path: ./cloned-airflow-site-archive fetch-depth: 2 sparse-checkout: | - docs-archive/${{ steps.docs-folder.outputs.docs-folder }} + ${{ steps.docs-folders-processed.outputs.sparse-checkout }} + ref: ${{ inputs.commit-sha || github.sha }} + if: steps.docs-folders-processed.outputs.docs-folders-processed != 'all' + + - name: > + Checkout (${{ inputs.commit-sha || github.sha }}) to /mnt/cloned-airflow-site-archive (whole repo) + uses: actions/checkout@v4 + with: + path: ./cloned-airflow-site-archive + fetch-depth: 2 ref: ${{ inputs.commit-sha || github.sha }} + if: steps.docs-folders-processed.outputs.docs-folders-processed == 'all' - name: > - Syncing ${{ inputs.commit-sha || github.sha }}: - ${{ inputs.sync-type }} ${{ steps.docs-folder.outputs.docs-folder }}" + Syncing ${{ inputs.commit-sha || github.sha }}: ${{ inputs.destination-location }}: + ${{ inputs.sync-type }} ${{ steps.docs-folders-processed.outputs.docs-folders-processed }} + wih parallel aws cli methods = ${{ inputs.processes }} env: COMMIT_SHA: ${{ inputs.commit-sha || github.sha }} SYNC_TYPE: ${{ inputs.sync-type }} PROCESSES: ${{ inputs.processes }} - DOCUMENTS_FOLDER: ${{ steps.docs-folder.outputs.docs-folder }} + DOCUMENTS_FOLDERS: ${{ steps.docs-folders-processed.outputs.docs-folders-processed }} DESTINATION_LOCATION: ${{ inputs.destination-location }} run: | + # show what's being run + set -x if [[ "${SYNC_TYPE}" == "single_commit" ]]; then echo "Syncing ${COMMIT_SHA}" else echo "Syncing whole repo" fi - ls -la /mnt/airflow-site-archive/* + ls -la /mnt/cloned-airflow-site-archive/* python3 -m pip install uv + # we run inputs.processes aws cli commands - each command uploading files in parallel + # that seems to be the fastest way to upload files to S3 + aws configure set default.s3.max_concurrent_requests 10 uv run ./scripts/github_to_s3.py \ --bucket-path ${DESTINATION_LOCATION} \ - --local-path /mnt/airflow-site-archive/docs-archive \ - --document-folder "${DOCUMENTS_FOLDER}" \ + --local-path /mnt/cloned-airflow-site-archive/docs-archive \ + --document-folders "${DOCUMENTS_FOLDERS}" \ --commit-sha ${COMMIT_SHA} --sync-type ${SYNC_TYPE} \ --processes ${PROCESSES} + diff --git a/.github/workflows/s3-to-github.yml b/.github/workflows/s3-to-github.yml index 4070192fca..97f1f0ed1f 100644 --- a/.github/workflows/s3-to-github.yml +++ b/.github/workflows/s3-to-github.yml @@ -21,30 +21,40 @@ on: # yamllint disable-line rule:truthy workflow_dispatch: inputs: 
source-location: - description: "The destination location in S3" - required: false + description: "The source location in S3" + required: true + type: choice + options: + - s3://live-docs-airflow-apache-org/docs/ + - s3://staging-docs-airflow-apache-org/docs/ default: "s3://live-docs-airflow-apache-org/docs/" - type: string - local-destination: - description: "The local destination location" + document-folders: + description: "Document folders to sync or short package ids (separated with spaces)" required: false - default: "./docs-archive" + default: "all" type: string - document-folder: - description: "Provide any specific package document folder to sync" + commit-changes: + description: "Commit changes to GitHub (forced to false if not main)" required: false - default: "" - type: string + default: true + type: boolean processes: description: "Number of processes to use for syncing" required: false - default: "8" + default: "4" type: string jobs: s3-to-github: name: S3 to GitHub runs-on: ubuntu-latest steps: + - name: Summarize parameters + run: | + echo "Source location: ${{ inputs.source-location }}" + echo "Document folders: ${{ inputs.document-folders }}" + echo "Commit changes: ${{ inputs.commit-changes }}" + echo "Processes: ${{ inputs.processes }}" + - name: Setup Python uses: actions/setup-python@v4 with: @@ -65,32 +75,84 @@ jobs: aws-secret-access-key: ${{ secrets.DOCS_AWS_SECRET_ACCESS_KEY }} aws-region: us-east-2 + - name: Pre-process docs folders + env: + DOCUMENTS_FOLDERS: ${{ inputs.document-folders }} + id: docs-folders-processed + run: | + echo "sparse-checkout<<EOF" >> ${GITHUB_OUTPUT} + echo ".github" >> ${GITHUB_OUTPUT} + echo "scripts" >> ${GITHUB_OUTPUT} + if [[ "${DOCUMENTS_FOLDERS}" != "all" ]]; then + echo "Preprocessing docs folders: ${DOCUMENTS_FOLDERS}" + folders="" + sparse_checkout="" + separator="" + for folder in ${DOCUMENTS_FOLDERS}; do + if [[ "${folder}" != apache-airflow-providers* ]]; then + folders="${folders}${separator}apache-airflow-providers-${folder/./-}" + echo "docs-archive/apache-airflow-providers-${folder/./-}" >> ${GITHUB_OUTPUT} + else + folders="${folders}${separator}${folder}" + echo "docs-archive/${folder}" >> ${GITHUB_OUTPUT} + fi + separator=" " + done + else + folders="all" + echo "docs-archive" >> ${GITHUB_OUTPUT} + fi + echo "EOF" >> ${GITHUB_OUTPUT} + echo "docs-folders-processed=${folders}" + echo "docs-folders-processed=${folders}" >> ${GITHUB_OUTPUT} + - name: Create /mnt/cloned-airflow-site-archive directory run: | sudo mkdir -pv /mnt/cloned-airflow-site-archive && sudo chown -R "${USER}" /mnt/cloned-airflow-site-archive ln -sv /mnt/cloned-airflow-site-archive cloned-airflow-site-archive - - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" + - name: > + Checkout (${{ inputs.commit-sha || github.sha }}) to /mnt/cloned-airflow-site-archive + with docs: ${{ steps.docs-folders-processed.outputs.docs-folders-processed }} + uses: actions/checkout@v4 + with: + path: ./cloned-airflow-site-archive + fetch-depth: 1 + sparse-checkout: | + ${{ steps.docs-folders-processed.outputs.sparse-checkout }} + if: steps.docs-folders-processed.outputs.docs-folders-processed != 'all' + + - name: > + Checkout (${{ inputs.commit-sha || github.sha }}) to /mnt/cloned-airflow-site-archive (whole repo) uses: actions/checkout@v4 with: - # we want to persist credentials so that we can push back the changes - persist-credentials: true - path: cloned-airflow-site-archive + path: ./cloned-airflow-site-archive + fetch-depth: 1 + if: 
steps.docs-folders-processed.outputs.docs-folders-processed == 'all' - name: "Check space available" run: df -h - - name: Syncing + - name: Syncing ${{inputs.source-location}} (${{ inputs.document-folders }}) + env: + PROCESSES: ${{ inputs.processes }} run: | + set -x python3 -m pip install uv + aws configure set default.s3.max_concurrent_requests 10 uv run ./scripts/s3_to_github.py \ --bucket-path ${{inputs.source-location}} \ - --local-path ${{inputs.local-destination}} \ - --document-folder "${{inputs.document-folder}}" \ - --processes ${{inputs.processes}} + --local-path ./docs-archive \ + --document-folders "${{inputs.document-folders}}" \ + --processes "${{inputs.processes}}" + working-directory: /mnt/cloned-airflow-site-archive + + - name: Diff summary + run : | + git diff --stat working-directory: /mnt/cloned-airflow-site-archive - - name: Commiting changes + - name: Committing changes run: | echo "Running git config" git config user.name "GitHub Actions" @@ -101,3 +163,4 @@ jobs: git commit -m "Sync S3 to GitHub" || echo "No changes to commit" git push --force origin main working-directory: /mnt/cloned-airflow-site-archive + if: inputs.commit-changes == 'true' && github.ref == 'refs/heads/main' diff --git a/docs-archive/apache-airflow-providers-amazon/stable.txt b/docs-archive/apache-airflow-providers-amazon/stable.txt new file mode 100644 index 0000000000..4e9f7fdb58 --- /dev/null +++ b/docs-archive/apache-airflow-providers-amazon/stable.txt @@ -0,0 +1 @@ +9.6.1 \ No newline at end of file diff --git a/docs-archive/apache-airflow-providers-common-compat/stable.txt b/docs-archive/apache-airflow-providers-common-compat/stable.txt new file mode 100644 index 0000000000..ce6a70b9d8 --- /dev/null +++ b/docs-archive/apache-airflow-providers-common-compat/stable.txt @@ -0,0 +1 @@ +1.6.0 \ No newline at end of file diff --git a/docs-archive/apache-airflow-providers-fab/stable.txt b/docs-archive/apache-airflow-providers-fab/stable.txt new file mode 100644 index 0000000000..10bf840ed5 --- /dev/null +++ b/docs-archive/apache-airflow-providers-fab/stable.txt @@ -0,0 +1 @@ +2.0.1 \ No newline at end of file diff --git a/docs-archive/apache-airflow-providers-standard/stable.txt b/docs-archive/apache-airflow-providers-standard/stable.txt new file mode 100644 index 0000000000..60a2d3e96c --- /dev/null +++ b/docs-archive/apache-airflow-providers-standard/stable.txt @@ -0,0 +1 @@ +0.4.0 \ No newline at end of file diff --git a/scripts/github_to_s3.py b/scripts/github_to_s3.py index b35bafb042..4b78226d54 100644 --- a/scripts/github_to_s3.py +++ b/scripts/github_to_s3.py @@ -31,7 +31,7 @@ from pathlib import Path from rich.console import Console -from transfer_utils import CommonTransferUtils +from transfer_utils import CommonTransferUtils, convert_short_name_to_folder_name, sort_priority_folders console = Console(width=200, color_system="standard") @@ -40,7 +40,7 @@ class GithubToS3(CommonTransferUtils): super().__init__(bucket, local_path) @staticmethod - def fetch_commit_files(commit_sha, diff_filter="ACM"): + def fetch_commit_files(commit_sha: str, diff_filter: str="ACM"): console.print(f"[blue] Fetching files from last commit {commit_sha} [/]") cmd = [ "git", @@ -56,9 +56,9 @@ class GithubToS3(CommonTransferUtils): if result.returncode != 0: console.print( - f"[warning] Error when running diff-tree command [/]\n{result.stdout}\n{result.stderr}" + f"[error] Error when running diff-tree command [/]\n{result.stdout}\n{result.stderr}" ) - return [] + sys.exit(1) return result.stdout.splitlines() if 
result.stdout else [] def sync_single_commit_files(self, commit_sha: str, processes: int): @@ -91,28 +91,29 @@ class GithubToS3(CommonTransferUtils): self.run_with_pool(self.remove, delete_files_pool_args, processes=processes) self.run_with_pool(self.copy, copy_files_pool_args, processes=processes) - def full_sync(self, processes: int): - console.print(f"[blue] Syncing all files from {self.local_path} to {self.bucket_name} [/]") - list_of_folders = os.listdir(self.local_path) + def full_sync(self, processes: int, folders: list[str] | None = None): + if folders: + console.print(f"[blue] Syncing folders {folders} from {self.local_path} to {self.bucket_name} [/]") + else: + console.print(f"[blue] Syncing all files from {self.local_path} to {self.bucket_name} [/]") + list_of_folders = os.listdir(self.local_path) if not folders else folders pool_args = [] - for folder in list_of_folders: + for folder in sort_priority_folders(list_of_folders): source = os.path.join(self.local_path, folder) dest = f"s3://{self.bucket_name}/{self.prefix}".rstrip("/") + "/" + folder pool_args.append((source, dest)) self.run_with_pool(self.sync, pool_args, processes=processes) -def convert_short_name_to_folder_name(short_name: str): - if not short_name.startswith("apache-airflow-providers-"): - return f"apache-airflow-providers-{short_name.replace('.', '-')}" - return short_name if __name__ == "__main__": parser = argparse.ArgumentParser(description="Sync GitHub to S3") parser.add_argument("--bucket-path", required=True, help="S3 bucket name with path") parser.add_argument("--local-path", required=True, help="local path to sync") - parser.add_argument("--document-folder", help="Document folder to sync (or short provider-id)", default="") + parser.add_argument("--document-folders", help="Document folders to sync " + "(or short provider-ids) separated with spaces " + "('all' means all folders)", default="") parser.add_argument("--commit-sha", help="Commit SHA to sync", default="") parser.add_argument("--sync-type", help="Sync type", default="single_commit") parser.add_argument("--processes", help="Number of processes", type=int, default=8) @@ -122,30 +123,36 @@ if __name__ == "__main__": syncer = GithubToS3(bucket=args.bucket_path, local_path=args.local_path) syncer.check_bucket() - document_folder = args.document_folder - - if document_folder and document_folder != "": - full_local_path = Path(f"{args.local_path}/{document_folder}") - if not full_local_path.exists(): - full_local_path = Path(f"{args.local_path}/{convert_short_name_to_folder_name(document_folder)}") - if full_local_path.exists(): - console.print(f"[blue] Document folder {document_folder} exists in bucket {args.bucket_path}.[/]") - - destination = f"s3://{syncer.bucket_name}/{syncer.prefix}".rstrip("/") + "/" + document_folder - syncer.sync(source=full_local_path.as_posix(), destination=destination) - sys.exit(0) - else: - console.print(f"[red] Document folder {full_local_path} does not exist.[/]") - sys.exit(1) - - if args.sync_type == "single_commit" and args.commit_sha: + document_folders = args.document_folders + # Make sure you are in the right directory for git commands + os.chdir(Path(args.local_path).parent.as_posix()) + # Force color + os.environ["FORCE_COLOR"] = "1" + + if document_folders != "all" and args.sync_type == "single_commit": + console.print(f"[red] Invalid folder name {document_folders} for sync type {args.sync_type} - only " + f"all can be used with single_commit[/]") + sys.exit(1) + + if document_folders and document_folders != 
"all" and args.sync_type == "full_sync": + folders_to_sync = [] + for _folder in document_folders.split(" "): + full_local_path = Path(f"{args.local_path}/{_folder}") + if not full_local_path.exists(): + full_local_path = Path(f"{args.local_path}/{convert_short_name_to_folder_name(_folder)}") + if full_local_path.exists(): + console.print(f"[blue] Document folder {_folder} exists in bucket {args.bucket_path}.[/]") + folders_to_sync.append(_folder) + else: + console.print(f"[red] Document folder {full_local_path} does not exist.[/]") + sys.exit(1) + syncer.full_sync(processes=int(args.processes), folders=folders_to_sync) + elif args.sync_type == "full_sync": + syncer.full_sync(processes=int(args.processes)) + elif args.sync_type == "single_commit" and args.commit_sha and document_folders == "all": console.print(f"[blue] Syncing last commit {args.commit_sha} from {args.local_path} [/]") syncer.sync_single_commit_files(args.commit_sha, processes=int(args.processes)) - sys.exit(0) - - if args.sync_type == "full_sync": - syncer.full_sync(processes=int(args.processes)) - sys.exit(0) - - console.print(f"[red] Invalid sync type {args.sync_type} [/]") - + else: + console.print(f"[red] Invalid sync type {args.sync_type} with document folders {document_folders} " + f"and commit sha {args.commit_sha}[/]") + sys.exit(1) diff --git a/scripts/s3_to_github.py b/scripts/s3_to_github.py index f8240c4703..738586a3aa 100644 --- a/scripts/s3_to_github.py +++ b/scripts/s3_to_github.py @@ -22,10 +22,12 @@ # "rich>=14.0.0", # ] # /// +import os +from pathlib import Path from rich.console import Console -from transfer_utils import CommonTransferUtils +from transfer_utils import CommonTransferUtils, convert_short_name_to_folder_name, sort_priority_folders console = Console(width=200, color_system="standard") @@ -35,23 +37,22 @@ import sys class S3TOGithub(CommonTransferUtils): - def __init__(self, bucket, local_path): + def __init__(self, bucket:str , local_path: str): super().__init__(bucket, local_path) - def verify_document_folder(self, document_folder): + def verify_document_folder(self, document_folder: str): response= self.s3_client.list_objects_v2( Bucket=self.bucket_name, Prefix=self.prefix.rstrip("/") + "/" + document_folder, ) return response["KeyCount"] > 0 - def sync_to_s3(self, processes: int): + def sync_to_s3(self, processes: int, folders: list[str] | None = None): console.print("[blue] Syncing files from S3 to GitHub...[/]") - prefixes = self.get_list_of_folders() + prefixes = self.get_list_of_folders() if not folders else folders pool_args = [] for pref in prefixes: source_bucket_path = f"s3://{self.bucket_name}/{pref}" - # we want to store the files in the github under docs-archive/ destination = self.local_path + pref.replace("docs/", "") pool_args.append((source_bucket_path, destination)) @@ -64,28 +65,34 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Sync S3 to GitHub") parser.add_argument("--bucket-path", required=True, help="S3 bucket name with path") parser.add_argument("--local-path", required=True, help="local path to sync") - parser.add_argument("--document-folder", help="Document folder to sync", default="") + parser.add_argument("--document-folders", help="Document folders to sync", default="all") parser.add_argument("--processes", help="Number of processes", type=int, default=8) args = parser.parse_args() syncer = S3TOGithub(bucket=args.bucket_path, local_path=args.local_path) syncer.check_bucket() - document_folder = args.document_folder - - if 
document_folder and document_folder != "": - if syncer.verify_document_folder(document_folder): - console.print(f"[blue] Document folder {document_folder} exists in bucket {args.bucket_path}.[/]") - source_prefix = syncer.prefix.rstrip("/") + "/" + document_folder - source = f"s3://{syncer.bucket_name}/{syncer.prefix}{document_folder}" - local_path = args.local_path.rstrip("/") + "/" + document_folder - syncer.sync(source=source, destination=local_path) - sys.exit(0) - else: - console.print(f"[red] Document folder {document_folder} does not exist in bucket {args.bucket_path}.[/]") - sys.exit(1) - - syncer.sync_to_s3(processes=int(args.processes)) + _document_folders = args.document_folders + # Make sure you are in the right directory for git commands + os.chdir(Path(args.local_path).parent.as_posix()) + # Force color + os.environ["FORCE_COLOR"] = "1" + + if _document_folders and _document_folders != "all": + folders_to_sync = [] + for _folder in _document_folders.split(" "): + full_folder_name = convert_short_name_to_folder_name(_folder) + if syncer.verify_document_folder(full_folder_name): + console.print(f"[blue] Document folder {full_folder_name} exists in bucket {args.bucket_path}.[/]") + folders_to_sync.append(full_folder_name) + else: + console.print(f"[red] Document folder {full_folder_name} does not exist in bucket {args.bucket_path}.[/]") + sys.exit(1) + + folders_to_sync = sort_priority_folders(folders_to_sync) + syncer.sync_to_s3(processes=int(args.processes), folders=folders_to_sync) + else: + syncer.sync_to_s3(processes=int(args.processes)) diff --git a/scripts/transfer_utils.py b/scripts/transfer_utils.py index bf73d0f6dd..1ed7df53a8 100644 --- a/scripts/transfer_utils.py +++ b/scripts/transfer_utils.py @@ -1,8 +1,11 @@ import subprocess import sys +import tempfile from functools import cached_property from multiprocessing import Pool from pathlib import Path +from threading import Thread +from time import sleep from typing import Callable, Any import boto3 @@ -11,6 +14,13 @@ from rich.console import Console console = Console(width=200, color_system="standard") +def track_progress(folder: str, file_path: Path): + while file_path.exists(): + num_lines = file_path.read_text().count("\n") + console.print(f"{folder}: [blue] Processed {num_lines} files[/]") + sleep(10) + + class CommonTransferUtils: s3_client = boto3.client('s3') @@ -19,7 +29,7 @@ class CommonTransferUtils: self.local_path = local_path.rstrip("/") + "/" @cached_property - def bucket_name(self): + def bucket_name(self) -> str: try: bucket = urllib3.util.parse_url(self.bucket).netloc return bucket @@ -28,7 +38,7 @@ class CommonTransferUtils: sys.exit(1) @cached_property - def prefix(self): + def prefix(self) -> str: try: pref = urllib3.util.parse_url(self.bucket).path if pref.startswith('/'): @@ -47,7 +57,7 @@ class CommonTransferUtils: console.print(f"[red] Error: {e}[/]") sys.exit(1) - def get_list_of_folders(self): + def get_list_of_folders(self) -> list[str]: folders = [] try: response = self.s3_client.list_objects_v2( @@ -58,29 +68,33 @@ class CommonTransferUtils: if 'CommonPrefixes' in response: for cur_prefix in response['CommonPrefixes']: folders.append(cur_prefix['Prefix']) - return folders + return sorted(folders) except Exception as e: console.print(f"[yellow] Error: {e}[/]") return [] - def sync(self, source: str, destination: str): + def sync(self, source: str, destination: str): console.print(f"[blue] Syncing {source} to {destination} [/]") - - if source.startswith("s3://"): - subprocess.run( - ["aws", 
"s3", "sync", "--delete", source, destination], capture_output=True, text=True, check=True - ) - console.print(f"[blue] Sync completed for {source} to {destination} [/]") - return - - if Path(source).is_dir(): - subprocess.run( - ["aws", "s3", "sync", "--delete", source, destination], capture_output=True, text=True, check=True - ) - else: - self.copy(source, destination) - + with tempfile.NamedTemporaryFile(mode="w+", delete=True, suffix=".out") as output_file: + thread = Thread(target=track_progress, args=(source, Path(output_file.name),)) + thread.start() + if source.startswith("s3://"): + subprocess.run( + ["aws", "s3", "sync", "--delete", "--no-progress", source, destination], + stdout=output_file, + text=True, check=True + ) + console.print(f"[blue] Sync completed for {source} to {destination} [/]") + elif Path(source).is_dir(): + subprocess.run( + ["aws", "s3", "sync", "--delete", "--no-progress", source, destination], + stdout=output_file, + text=True, check=True + ) + else: + self.copy(source, destination) + Path(output_file.name).unlink(missing_ok=True) console.print(f"[blue] Sync completed for {source} to {destination} [/]") @staticmethod @@ -95,17 +109,40 @@ class CommonTransferUtils: @staticmethod def copy(source, destination): console.print(f"[blue] Copying {source} to {destination} [/]") - - subprocess.run( - ["aws", "s3", "cp", source, destination], capture_output=True, text=True, check=True - ) + with tempfile.NamedTemporaryFile(mode="w+", delete=True, suffix=".out") as output_file: + thread = Thread(target=track_progress, args=(source, Path(output_file.name),)) + thread.start() + subprocess.run( + ["aws", "s3", "cp", "--quiet", "--no-progress", source, destination], + stdout=output_file, + text=True, check=True + ) + Path(output_file.name).unlink(missing_ok=True) console.print(f"[blue] Copy completed for {source} to {destination} [/]") @staticmethod - def remove(file_to_delete): + def remove(file_to_delete: str): console.print(f"[blue] Deleting {file_to_delete} [/]") - subprocess.run( ["aws", "s3", "rm", file_to_delete], capture_output=True, text=True, check=True ) console.print(f"[blue] Delete completed for {file_to_delete} [/]") + +def convert_short_name_to_folder_name(short_name: str) -> str: + if not short_name.startswith("apache-airflow-providers-"): + return f"apache-airflow-providers-{short_name.replace('.', '-')}" + return short_name + +# start with those folders first +PRIORITY_FOLDERS = ["apache-airflow-providers-google", "apache-airflow-providers-amazon"] + +def sort_priority_folders(folders: list[str]) -> list[str]: + """ + Sort the folders in a way that the priority folders are at the top + """ + sorted_folders = [] + for folder in PRIORITY_FOLDERS: + if folder in folders: + sorted_folders.append(folder) + folders.remove(folder) + return sorted_folders + sorted(folders)