This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 48fa7b5553 Fix system test for DataprocSubmitJobOperator (SparkR) (#32746)
48fa7b5553 is described below
commit 48fa7b5553e461c96e2364aefae8d89b2dbea4b9
Author: max <[email protected]>
AuthorDate: Wed Jul 26 09:49:47 2023 +0200
Fix system test for DataprocSubmitJobOperator (SparkR) (#32746)
---
.../cloud/dataproc/example_dataproc_sparkr.py | 50 ++++++++++++++++------
1 file changed, 36 insertions(+), 14 deletions(-)
diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
index 80d4bca96f..fb81d8acb2 100644
--- a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
+++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py
@@ -22,15 +22,18 @@ from __future__ import annotations
import os
from datetime import datetime
-from pathlib import Path
from airflow import models
+from airflow.decorators import task
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocDeleteClusterOperator,
DataprocSubmitJobOperator,
)
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import (
+ GCSCreateBucketOperator,
+ GCSDeleteBucketOperator,
+)
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule
@@ -39,12 +42,8 @@ DAG_ID = "dataproc_sparkr"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-CLUSTER_NAME = f"dataproc-sparkr-{ENV_ID}"
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
REGION = "europe-west1"
-ZONE = "europe-west1-b"
-
-SPARKR_SRC = str(Path(__file__).parent / "resources" / "hello_world.R")
-SPARKR_FILE = "hello_world.R"
# Cluster definition
CLUSTER_CONFIG = {
@@ -60,14 +59,20 @@ CLUSTER_CONFIG = {
},
}
-TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
+JOB_FILE_NAME = "dataproc-sparkr-job.r"
+JOB_FILE_LOCAL_PATH = f"/tmp/{JOB_FILE_NAME}"
+JOB_FILE_CONTENT = """library(SparkR)
+sparkR.session()
+df <- as.DataFrame(faithful)
+head(df)
+"""
# Jobs definitions
# [START how_to_cloud_dataproc_sparkr_config]
SPARKR_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
- "spark_r_job": {"main_r_file_uri": f"gs://{BUCKET_NAME}/{SPARKR_FILE}"},
+ "spark_r_job": {"main_r_file_uri": f"gs://{BUCKET_NAME}/{JOB_FILE_NAME}"},
}
# [END how_to_cloud_dataproc_sparkr_config]
@@ -77,15 +82,23 @@ with models.DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example", "dataproc"],
+ tags=["example", "dataproc", "sparkr"],
) as dag:
create_bucket = GCSCreateBucketOperator(
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
)
+
+ @task
+ def create_job_file():
+ with open(JOB_FILE_LOCAL_PATH, "w") as job_file:
+ job_file.write(JOB_FILE_CONTENT)
+
+ create_job_file_task = create_job_file()
+
upload_file = LocalFilesystemToGCSOperator(
task_id="upload_file",
- src=SPARKR_SRC,
- dst=SPARKR_FILE,
+ src=JOB_FILE_LOCAL_PATH,
+ dst=JOB_FILE_NAME,
bucket=BUCKET_NAME,
)
@@ -113,12 +126,21 @@ with models.DAG(
task_id="delete_bucket", bucket_name=BUCKET_NAME,
trigger_rule=TriggerRule.ALL_DONE
)
+ @task(trigger_rule=TriggerRule.ALL_DONE)
+ def delete_job_file():
+ try:
+ os.remove(JOB_FILE_LOCAL_PATH)
+ except FileNotFoundError:
+ pass
+
+ delete_job_file_task = delete_job_file()
+
# TEST SETUP
- create_bucket >> [upload_file, create_cluster]
+ [create_bucket, create_job_file_task] >> upload_file
# TEST BODY
[upload_file, create_cluster] >> sparkr_task
# TEST TEARDOWN
- sparkr_task >> [delete_cluster, delete_bucket]
+ sparkr_task >> [delete_cluster, delete_bucket, delete_job_file_task]
from tests.system.utils.watcher import watcher