This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 48fa7b5553 Fix system test for DataprocSubmitJobOperator (SparkR) (#32746)
48fa7b5553 is described below

commit 48fa7b5553e461c96e2364aefae8d89b2dbea4b9
Author: max <[email protected]>
AuthorDate: Wed Jul 26 09:49:47 2023 +0200

    Fix system test for DataprocSubmitJobOperator (SparkR) (#32746)
---
 .../cloud/dataproc/example_dataproc_sparkr.py      | 50 ++++++++++++++++------
 1 file changed, 36 insertions(+), 14 deletions(-)

diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
index 80d4bca96f..fb81d8acb2 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
@@ -22,15 +22,18 @@ from __future__ import annotations
 
 import os
 from datetime import datetime
-from pathlib import Path
 
 from airflow import models
+from airflow.decorators import task
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
     DataprocDeleteClusterOperator,
     DataprocSubmitJobOperator,
 )
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+)
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
@@ -39,12 +42,8 @@ DAG_ID = "dataproc_sparkr"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
 BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-sparkr-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
 REGION = "europe-west1"
-ZONE = "europe-west1-b"
-
-SPARKR_SRC = str(Path(__file__).parent / "resources" / "hello_world.R")
-SPARKR_FILE = "hello_world.R"
 
 # Cluster definition
 CLUSTER_CONFIG = {
@@ -60,14 +59,20 @@ CLUSTER_CONFIG = {
     },
 }
 
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
+JOB_FILE_NAME = "dataproc-sparkr-job.r"
+JOB_FILE_LOCAL_PATH = f"/tmp/{JOB_FILE_NAME}"
+JOB_FILE_CONTENT = """library(SparkR)
+sparkR.session()
+df <- as.DataFrame(faithful)
+head(df)
+"""
 
 # Jobs definitions
 # [START how_to_cloud_dataproc_sparkr_config]
 SPARKR_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
-    "spark_r_job": {"main_r_file_uri": f"gs://{BUCKET_NAME}/{SPARKR_FILE}"},
+    "spark_r_job": {"main_r_file_uri": f"gs://{BUCKET_NAME}/{JOB_FILE_NAME}"},
 }
 # [END how_to_cloud_dataproc_sparkr_config]
 
@@ -77,15 +82,23 @@ with models.DAG(
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "sparkr"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
     )
+
+    @task
+    def create_job_file():
+        with open(JOB_FILE_LOCAL_PATH, "w") as job_file:
+            job_file.write(JOB_FILE_CONTENT)
+
+    create_job_file_task = create_job_file()
+
     upload_file = LocalFilesystemToGCSOperator(
         task_id="upload_file",
-        src=SPARKR_SRC,
-        dst=SPARKR_FILE,
+        src=JOB_FILE_LOCAL_PATH,
+        dst=JOB_FILE_NAME,
         bucket=BUCKET_NAME,
     )
 
@@ -113,12 +126,21 @@ with models.DAG(
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
 
+    @task(trigger_rule=TriggerRule.ALL_DONE)
+    def delete_job_file():
+        try:
+            os.remove(JOB_FILE_LOCAL_PATH)
+        except FileNotFoundError:
+            pass
+
+    delete_job_file_task = delete_job_file()
+
     # TEST SETUP
-    create_bucket >> [upload_file, create_cluster]
+    [create_bucket, create_job_file_task] >> upload_file
     # TEST BODY
     [upload_file, create_cluster] >> sparkr_task
     # TEST TEARDOWN
-    sparkr_task >> [delete_cluster, delete_bucket]
+    sparkr_task >> [delete_cluster, delete_bucket, delete_job_file_task]
 
     from tests.system.utils.watcher import watcher
 

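Note for readers following the diff: the context above cuts off at the watcher import. In Airflow system tests under tests/system/, the file typically continues with the standard watcher hookup and pytest entry point, sketched below for orientation. This tail is not touched by this commit, and the exact comments in the real file may differ:

    # Inside the DAG context: attach the watcher so teardown tasks running
    # with trigger_rule=ALL_DONE still propagate failure to the test result.
    list(dag.tasks) >> watcher()

# At module level: expose the DAG as a pytest-runnable test
# (see tests/system/README.md for how these tests are invoked).
from tests.system.utils import get_test_run  # noqa: E402

test_run = get_test_run(dag)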