jbbqqf opened a new pull request, #66625: URL: https://github.com/apache/airflow/pull/66625
<!--
Was generative AI tooling used to co-author this PR?
- [X] Yes: Claude Code (https://claude.com/claude-code), per contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions.

Generated-by: Claude Code following the guidelines. All code was reviewed against google-cloud-aiplatform's PipelineJob.submit signature and against the existing pattern used by RayClusterOperator (which already plumbs `reserved_ip_ranges` end-to-end). Tests pass locally.
-->

closes: #62733

## Summary

`PipelineJobHook.run_pipeline_job` and `PipelineJobHook.submit_pipeline_job` accept `service_account`, `network`, `create_request_timeout`, and `experiment`, but they never forwarded `reserved_ip_ranges` to the underlying `PipelineJob.submit()` call, even though `google-cloud-aiplatform` supports the kwarg and the Vertex AI Ray cluster operator already plumbs it through. Users running Vertex AI pipelines on a VPC with reserved peering ranges had no way to constrain a pipeline to those ranges through `RunPipelineJobOperator`.

This PR adds the optional `reserved_ip_ranges: list[str] | None` parameter to both hook entry points and to `RunPipelineJobOperator`, and forwards it to `PipelineJob.submit()` **only when explicitly set**, so that users still on an older aiplatform release that doesn't accept the kwarg keep working.

## Context

- aiplatform reference: [`PipelineJob.submit()`](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob#google_cloud_aiplatform_PipelineJob_submit) documents `reserved_ip_ranges: Optional[List[str]] = None`.
- Existing precedent in this provider: `providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/ray.py:189` already plumbs `reserved_ip_ranges` through to the Ray cluster create call.

## Changes

- `providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/pipeline_job.py`: add `reserved_ip_ranges` to `run_pipeline_job` and `submit_pipeline_job`.
  The `submit()` call uses a kwargs dict that only includes `reserved_ip_ranges` when it's set; a one-line code comment documents *why* (older aiplatform versions reject unknown kwargs).
- `providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/pipeline_job.py`: add `reserved_ip_ranges` to `RunPipelineJobOperator.__init__` and forward it to `submit_pipeline_job(...)`.
- `providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py`: three regression tests:
  - `test_submit_pipeline_job_forwards_reserved_ip_ranges`: value lands on the SDK boundary;
  - `test_submit_pipeline_job_omits_reserved_ip_ranges_when_unset`: backwards-compat, kwarg absent when `None`;
  - `test_run_pipeline_job_forwards_reserved_ip_ranges`: same for the synchronous variant, and asserts `wait()` is called.

## Reproduce BEFORE/AFTER yourself (copy-paste)

```bash
# --- one-time setup (skip if you already have a working Airflow dev env) ---
git clone https://github.com/apache/airflow.git /tmp/repro-62733 && cd /tmp/repro-62733
python -m venv .venv && source .venv/bin/activate
pip install -e ./task-sdk -e ./airflow-core -e ./providers/google
pip install pytest google-cloud-aiplatform google-auth-httplib2 \
    google-api-python-client google-cloud-secret-manager \
    gcloud-aio-bigquery gcloud-aio-storage gcloud-aio-auth time-machine

# --- BEFORE (origin/main): should FAIL ---
git checkout origin/main
git checkout feat/62733-reserved-ip-ranges-pipeline-job -- \
    providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py
cd providers/google
PYTHONPATH=../../devel-common/src:. python -m pytest \
    tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py \
    -k reserved_ip_ranges -xvs --rootdir=.
# Expected: TypeError: PipelineJobHook.submit_pipeline_job() got an unexpected
# keyword argument 'reserved_ip_ranges' → FAILED.

# --- AFTER (this PR): should PASS ---
cd /tmp/repro-62733
git checkout feat/62733-reserved-ip-ranges-pipeline-job
cd providers/google
PYTHONPATH=../../devel-common/src:. python -m pytest \
    tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py \
    -k reserved_ip_ranges -xvs --rootdir=.
# Expected: 3 passed.
```

The reviewer can also run the full hook test file to confirm no regression elsewhere: same command without `-k reserved_ip_ranges` (22 tests).

## What I ran locally

- `pytest tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py -k reserved_ip_ranges -xvs` → **3/3 passed**
- `pytest tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py -v` → **22/22 passed** (full file, no regression)
- BEFORE on a stash of the implementation (tests only): the three new tests fail with `TypeError: ... unexpected keyword argument 'reserved_ip_ranges'`, which is the bug behavior reported in the issue.

## Edge cases tested

| # | Scenario | Input | Expected | Verified by |
|---|----------|-------|----------|-------------|
| 1 | Default (unset) | `reserved_ip_ranges=None` | kwarg is **not** forwarded to `submit()` so older aiplatform releases keep working | `test_submit_pipeline_job_omits_reserved_ip_ranges_when_unset` |
| 2 | Single range | `["range-1"]` | value lands on `submit()` unchanged; `wait()` is also called for the sync variant | `test_run_pipeline_job_forwards_reserved_ip_ranges` |
| 3 | Multiple ranges | `["range-1", "range-2"]` | full list lands on `submit()` unchanged | `test_submit_pipeline_job_forwards_reserved_ip_ranges` |

## Risk / blast radius

Additive only. No existing call site changes signature semantics: `reserved_ip_ranges` defaults to `None`, and the `submit()` call only forwards it when non-`None`. Users on the deferrable path (`deferrable=True`) are unaffected: `submit_pipeline_job` is what feeds the deferrable trigger and it already returns the same `PipelineJob`.
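
For reviewers who want the forwarding rule in isolation, here is a minimal sketch of the "only forward when set" pattern described above. It is not the code in this PR: `build_submit_kwargs` is a hypothetical helper invented for illustration, and a `MagicMock` stands in for the `PipelineJob` instance so nothing touches Google Cloud. The keyword names mirror the ones listed in the Summary.

```python
from __future__ import annotations

from typing import Any
from unittest import mock


def build_submit_kwargs(
    service_account: str | None = None,
    network: str | None = None,
    create_request_timeout: float | None = None,
    experiment: str | None = None,
    reserved_ip_ranges: list[str] | None = None,
) -> dict[str, Any]:
    """Hypothetical helper: assemble the kwargs handed to ``submit()``."""
    kwargs: dict[str, Any] = {
        "service_account": service_account,
        "network": network,
        "create_request_timeout": create_request_timeout,
        "experiment": experiment,
    }
    # Add the new kwarg only when the caller set it, so an SDK release whose
    # submit() does not accept it is never handed an unknown keyword.
    if reserved_ip_ranges is not None:
        kwargs["reserved_ip_ranges"] = reserved_ip_ranges
    return kwargs


# Stand-in for a PipelineJob instance (assumption: illustration only).
fake_job = mock.MagicMock()

# Unset: the keyword is absent from the call entirely, not passed as None.
fake_job.submit(**build_submit_kwargs(network="my-network"))
assert "reserved_ip_ranges" not in fake_job.submit.call_args.kwargs

# Set: the list reaches submit() unchanged.
fake_job.submit(**build_submit_kwargs(reserved_ip_ranges=["range-1", "range-2"]))
assert fake_job.submit.call_args.kwargs["reserved_ip_ranges"] == ["range-1", "range-2"]
```

Checking `is not None` rather than truthiness means an explicitly passed empty list would still be forwarded in this sketch; whether the hook should treat `[]` the same way is a design choice the sketch does not settle.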
## Release note

```release-note
Add ``reserved_ip_ranges`` parameter to ``RunPipelineJobOperator`` and the underlying Vertex AI ``PipelineJobHook.run_pipeline_job`` / ``submit_pipeline_job`` methods, allowing users to pin a Vertex AI Pipeline Job to specific reserved IP ranges of a peered VPC.
```

---

*PR drafted with assistance from Claude Code (per `contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions`). The change was reviewed manually against `google-cloud-aiplatform`'s `PipelineJob.submit` signature and the precedent set by `providers/google/.../operators/vertex_ai/ray.py:189`. The reproducer block above was used during development; it is the same one a reviewer can paste verbatim. The contributor takes full responsibility for the patch.*
