mik-laj commented on a change in pull request #4633: AIRFLOW-3791: Dataflow - Support check status if pipeline spans on multiple jobs URL: https://github.com/apache/airflow/pull/4633#discussion_r321568473
########## File path: airflow/contrib/hooks/gcp_dataflow_hook.py ########## @@ -47,58 +47,131 @@ class DataflowJobStatus: JOB_STATE_FAILED = "JOB_STATE_FAILED" JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED" JOB_STATE_PENDING = "JOB_STATE_PENDING" + FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED} + SUCCEEDED_END_STATES = {JOB_STATE_DONE} + END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES class _DataflowJob(LoggingMixin): def __init__(self, dataflow, project_number, name, location, poll_sleep=10, - job_id=None, num_retries=None): + job_id=None, num_retries=None, multiple_jobs=None): self._dataflow = dataflow self._project_number = project_number self._job_name = name self._job_location = location + self._multiple_jobs = multiple_jobs self._job_id = job_id self._num_retries = num_retries - self._job = self._get_job() + if self._num_retries is None: + self._num_retries = 0 self._poll_sleep = poll_sleep + self._jobs = self._get_jobs() - def _get_job_id_from_name(self): - jobs = self._dataflow.projects().locations().jobs().list( - projectId=self._project_number, - location=self._job_location - ).execute(num_retries=self._num_retries) - for job in jobs['jobs']: - if job['name'].lower() == self._job_name.lower(): - self._job_id = job['id'] - return job - return None + def is_job_running(self): + """ + Helper method to check if jos is still running in dataflow + + :return: True if job is running. 
+ :rtype: bool + """ + for job in self._jobs: + if job['currentState'] not in DataflowJobStatus.END_STATES: + return True + return False - def _get_job(self): - if self._job_id: - job = self._dataflow.projects().locations().jobs().get( + # pylint: disable=too-many-nested-blocks + def _get_dataflow_jobs(self): + """ + Helper method to get list of jobs that start with job name or id + + :return: list of jobs including id's + :rtype: list + """ + if not self._multiple_jobs and self._job_id: + return self._dataflow.projects().locations().jobs().get(
Review comment: Here is the problem: in this branch the method returns a single job dictionary (the result of `jobs().get(...)`), but in the branch below it returns a list of job dictionaries. Because the return type is inconsistent, the caller cannot determine the job ID. When it iterates over the returned value, a dictionary yields its string keys instead of job dictionaries, so `job['name']` fails:
```
[2019-09-06 03:20:51,974] {taskinstance.py:1042} ERROR - string indices must be integers
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 917, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/airflow/gcp/operators/dataflow.py", line 216, in execute
    self.jar, self.job_class, True, self.multiple_jobs)
  File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 372, in start_java_dataflow
    self._start_dataflow(variables, name, command_prefix, label_formatter, multiple_jobs)
  File "/opt/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 307, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 327, in _start_dataflow
    variables['region'], self.poll_sleep, job_id, self.num_retries, multiple_jobs) \
  File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 76, in __init__
    self._jobs = self._get_jobs()
  File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 138, in _get_jobs
    self._job_id, job['name']
TypeError: string indices must be integers
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services