iostreamdoth commented on a change in pull request #18197:
URL: https://github.com/apache/airflow/pull/18197#discussion_r707831115



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -48,6 +54,83 @@
 )
 
 
+class DataProcCreateWorkflowBaseOperator(BaseOperator):
+    """Helper class for preparing configs from workflow template"""
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def prepare_template(self) -> None:
+        try:
+            # if no file is specified, skip
+            if isinstance(self.template["placement"]["managed_cluster"]["config"], str):
+                if urlparse(self.template["placement"]["managed_cluster"]["config"]).scheme == "gs":
+                    config = self._build_cluster_config(
+                        self.template["placement"]["managed_cluster"]["config"]
+                    )
+
+                    # Dataproc workflows do not accept lifecycle_config
+                    if "lifecycle_config" in config:
+                        self.log.warning(
+                            "Workflow does not accept lifecycle in config, removing it from config"
+                        )
+                        config.pop("lifecycle_config", None)
+                    self.template["placement"]["managed_cluster"]["config"] = config
+                else:
+                    self.log.warning(
+                        "Workflow Template config accepts only gcs path string or dictionary type, "
+                        "skipping build cluster config from yml file"
+                    )
+        except KeyError:
+            pass

Review comment:
   A template dictionary can take one of two forms:
   1. `managed_cluster`, which needs a `config` key, for example:
   ```
   WORKFLOW_TEMPLATE_YML = {
       "id": WORKFLOW_NAME_YML,
       "placement": {
           "managed_cluster": {
               "cluster_name": CLUSTER_NAME,
               "config": YML_CONFIG_URI,
           }
       },
       "jobs": [{"step_id": "pig_job_1", "pig_job": PIG_JOB["pig_job"]}],
   }
   ```
   2. `cluster_selector`, which does not need a `config` key:
   ```
   WORKFLOW_TEMPLATE = {
       "id": WORKFLOW_NAME,
       "placement": {
           "cluster_selector": {
               "cluster_name": CLUSTER_NAME
           }
       },
       "jobs": [{"step_id": "pig_job_1", "pig_job": PIG_JOB["pig_job"]}],
   }
   ```
   So yes, the lookup
   `self.template["placement"]["managed_cluster"]["config"]` can raise a
   `KeyError` for a `cluster_selector` template, even though that template is
   still valid. After digging further, I think `prepare_template()` is not the
   right place to convert the YAML config file into a dictionary. I will try to
   resolve the config in the `execute` method instead.
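
A minimal sketch of what resolving the config at execute time could look like, handling both template forms without relying on `except KeyError`. The function name `resolve_managed_cluster_config` and the `load_config` callable are hypothetical; `load_config` stands in for whatever downloads and parses the YAML file (the `_build_cluster_config` helper in this PR). This is an illustration under those assumptions, not the final implementation.

```python
from typing import Any, Callable, Dict
from urllib.parse import urlparse


def resolve_managed_cluster_config(
    template: Dict[str, Any],
    load_config: Callable[[str], Dict[str, Any]],
) -> Dict[str, Any]:
    """Replace a gs:// config URI with the loaded dict (mutates and returns template).

    `load_config` is a hypothetical callable that turns a GCS URI into a dict.
    """
    managed_cluster = template.get("placement", {}).get("managed_cluster")
    if not isinstance(managed_cluster, dict):
        # cluster_selector templates have no managed_cluster: nothing to resolve.
        return template

    config = managed_cluster.get("config")
    if not isinstance(config, str) or urlparse(config).scheme != "gs":
        # Already a dict, missing, or not a GCS path: leave it untouched.
        return template

    loaded = dict(load_config(config))
    # Mirrors the diff above, which drops lifecycle_config for workflows.
    loaded.pop("lifecycle_config", None)
    managed_cluster["config"] = loaded
    return template
```

Calling this at the start of `execute` would keep `prepare_template()` free of any network access, and a `cluster_selector` template passes through unchanged.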




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

