[GitHub] [airflow] github-actions[bot] commented on pull request #12304: Docs installation improvements
github-actions[bot] commented on pull request #12304: URL: https://github.com/apache/airflow/pull/12304#issuecomment-725893098 [The Workflow run](https://github.com/apache/airflow/actions/runs/359202172) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] Zedmor opened a new pull request #12305: JSON as response for 404 not found
Zedmor opened a new pull request #12305: URL: https://github.com/apache/airflow/pull/12305

closes: https://github.com/apache/airflow/issues/12131

### JSON is not returned for API requests with incorrect endpoint

The reason #12131 happens is that blueprint-level error handlers are not possible for 404 and 405. See references:
https://stackoverflow.com/questions/58728366/python-flask-error-handling-with-blueprints
https://github.com/pallets/flask/issues/1935

As a result, 404 exception handling spills over to the application-level handler (the "circles" page). I am looking forward to a better implementation, but this works.

```
$ curl http://localhost:5000/api/v1/das
{
  "detail": "The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.",
  "status": 404,
  "title": "Not Found",
  "type": "about:blank"
}
```

```
$ curl http://localhost:5000/fafs
Airflow 404 = lots of circles
```
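Because Flask resolves 404/405 before any blueprint is selected, only an application-level handler can catch them. A minimal sketch of that approach (plain Flask, illustrative only — not the actual code in this PR):

```python
# Sketch only: an app-level 404 handler that returns a JSON problem
# document instead of Flask's default HTML error page.
from flask import Flask, jsonify

app = Flask(__name__)


@app.errorhandler(404)
def handle_not_found(error):
    # error.description carries werkzeug's standard "URL not found" message.
    payload = {
        "detail": error.description,
        "status": 404,
        "title": "Not Found",
        "type": "about:blank",
    }
    return jsonify(payload), 404
```

A blueprint-level `@bp.errorhandler(404)` would never fire for unknown URLs, because Flask cannot attribute the unmatched request to any blueprint.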
[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time
Acehaidrey commented on pull request #9544: URL: https://github.com/apache/airflow/pull/9544#issuecomment-725882481 Hey all, sorry for all the delays. Working on it currently and will have the fixes in by tomorrow. Thanks for bearing with me! I see that we can use the `TISchedulingDecision` object.
[GitHub] [airflow] zjffdu commented on issue #11305: Authorization with Identity Aware Proxy
zjffdu commented on issue #11305: URL: https://github.com/apache/airflow/issues/11305#issuecomment-725867226 > @zjffdu How is the identity from Apache Knox passed to other applications? Have you ever tried integrating other applications with Apache Knox? It looks like there's already a PR in the Knox project: https://github.com/apache/knox/pull/182
[GitHub] [airflow] mik-laj commented on issue #12282: Rewrite Breeze in Python
mik-laj commented on issue #12282: URL: https://github.com/apache/airflow/issues/12282#issuecomment-725844781 I'm interested too.
[GitHub] [airflow] rootcss commented on pull request #11850: Add Telegram hook and operator
rootcss commented on pull request #11850: URL: https://github.com/apache/airflow/pull/11850#issuecomment-725839722 @kaxil Please review. I'll fix the two conflicting files; it looks like they get changed in master frequently.
[GitHub] [airflow] mik-laj commented on a change in pull request #12249: Add DataflowJobMessagesSensor and DataflowAutoscalingEventsSensor (Depends on: #11726)
mik-laj commented on a change in pull request #12249: URL: https://github.com/apache/airflow/pull/12249#discussion_r521835730

## File path: airflow/providers/google/cloud/hooks/dataflow.py

## @@ -225,6 +243,67 @@ def _fetch_job_by_id(self, job_id: str) -> dict:
             .execute(num_retries=self._num_retries)
         )
 
+    def _fetch_list_job_messages_responses(self, job_id: str) -> Generator[dict, None, None]:
+        """
+        Helper method to fetch ListJobMessagesResponse with the specified Job ID.
+
+        :param job_id: Job ID to get.
+        :type job_id: str
+        :return: yields the ListJobMessagesResponse. See:
+            https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse
+        :rtype: Generator[dict, None, None]
+        """
+        # raise Exception("error")

Review comment:
```suggestion
```
(i.e. remove the leftover `# raise Exception("error")` debug line.)
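The helper under review yields response pages from the Dataflow API. The underlying pattern — follow `nextPageToken` until it is absent — can be sketched generically as (names here are illustrative, not the hook's real API):

```python
from typing import Any, Callable, Dict, Generator, Optional

def fetch_pages(
    request_page: Callable[..., Dict[str, Any]],
    page_size: int = 100,
) -> Generator[Dict[str, Any], None, None]:
    """Yield raw response pages, following `nextPageToken` until exhausted.

    `request_page` stands in for whatever callable performs one API request
    (hypothetical — the real hook builds a Google API client request instead).
    """
    token: Optional[str] = None
    while True:
        page = request_page(page_token=token, page_size=page_size)
        yield page
        # The absence of nextPageToken marks the final page.
        token = page.get("nextPageToken")
        if not token:
            break
```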
[GitHub] [airflow] mik-laj commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs
mik-laj commented on a change in pull request #11726: URL: https://github.com/apache/airflow/pull/11726#discussion_r521835489

## File path: airflow/providers/google/cloud/sensors/dataflow.py

## @@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Cloud Dataflow sensor."""
+from typing import Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.dataflow import (
+    DEFAULT_DATAFLOW_LOCATION,
+    DataflowHook,
+    DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+    """
+    Checks for the status of a job in Google Cloud Dataflow.
+
+    :param job_id: ID of the job to be checked.
+    :type job_id: str
+    :param expected_statuses: The expected state of the operation.
+        See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+    :type expected_statuses: Union[Set[str], str]
+    :param project_id: Optional, the Google Cloud project ID in which to start a job.
+        If set to None or missing, the default project_id from the Google Cloud connection is used.
+    :type project_id: str
+    :param location: The location of the Dataflow job (for example europe-west1). See:
+        https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled. See:
+        https://developers.google.com/identity/protocols/oauth2/service-account#delegatingauthority
+    :type delegate_to: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = ['job_id']
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        job_id: str,
+        expected_statuses: Union[Set[str], str],
+        project_id: Optional[str] = None,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.job_id = job_id
+        self.expected_statuses = (
+            {expected_statuses} if isinstance(expected_statuses, str) else expected_statuses
+        )
+        self.project_id = project_id
+        self.location = location
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.hook: Optional[DataflowHook] = None
+
+    def poke(self, context: dict) -> bool:

Review comment: Sensors are a native Airflow feature and the user has the ability to influence its behavior. One way is the `poke_interval` argument, which is set to 60 seconds by default. https://airflow.readthedocs.io/en/latest/_api/airflow/sensors/base_sensor_operator/index.html?highlight=base_sensor_operator#airflow.sensors.base_sensor_operator.BaseSensorOperator
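As a rough illustration of what the review comment describes: a sensor's `execute` is essentially a poll loop driven by `poke_interval` and `timeout`. This is a simplified, stdlib-only sketch — not Airflow's actual `BaseSensorOperator` implementation, which also supports reschedule mode, soft fails, and exponential backoff:

```python
import time
from typing import Callable

def run_sensor(
    poke: Callable[[], bool],
    poke_interval: float = 60.0,        # Airflow's default poke_interval is 60 s
    timeout: float = 7 * 24 * 60 * 60,  # Airflow's default timeout is one week
) -> None:
    """Call `poke` until it returns True, sleeping `poke_interval` between tries."""
    started = time.monotonic()
    while not poke():
        if time.monotonic() - started > timeout:
            raise TimeoutError("sensor timed out")
        time.sleep(poke_interval)
```

So even with a sensor that checks job status in a tight `poke` method, the polling frequency stays in the user's hands via `poke_interval`.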
[GitHub] [airflow] mik-laj opened a new pull request #12304: Docs installation improvements
mik-laj opened a new pull request #12304: URL: https://github.com/apache/airflow/pull/12304 I've improved the documentation a bit by separating the reference documentation from the concept documentation. I also added reference documentation for providers along with a Python API reference for them, i.e. you can see what is available in each pip package. All data is in a separate YAML file (along with JSON Schema validation), which allows us to reuse this data, for example to build README.md files. Closes: https://github.com/apache/airflow/issues/12281 --- **^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information. In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
[GitHub] [airflow] yangrong688 commented on issue #12282: Rewrite Breeze in Python
yangrong688 commented on issue #12282: URL: https://github.com/apache/airflow/issues/12282#issuecomment-725776357 I would like to contribute.
[GitHub] [airflow] caddac commented on issue #12282: Rewrite Breeze in Python
caddac commented on issue #12282: URL: https://github.com/apache/airflow/issues/12282#issuecomment-725774821 I'm interested.
[GitHub] [airflow] kukigai commented on pull request #12126: Wait option for dagrun operator
kukigai commented on pull request #12126: URL: https://github.com/apache/airflow/pull/12126#issuecomment-725772492 @turbaszek please see the updated code.
[GitHub] [airflow] github-actions[bot] commented on pull request #12303: K8s yaml templates not rendered by k8sexecutor
github-actions[bot] commented on pull request #12303: URL: https://github.com/apache/airflow/pull/12303#issuecomment-725768741 [The Workflow run](https://github.com/apache/airflow/actions/runs/358816685) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.
[GitHub] [airflow] AndersonReyes commented on a change in pull request #10349: infer multiple_output from return type annotation
AndersonReyes commented on a change in pull request #10349: URL: https://github.com/apache/airflow/pull/10349#discussion_r521711097

## File path: docs/tutorial_taskflow_api.rst

## @@ -144,6 +144,22 @@ the dependencies as shown below.
     :end-before: [END main_flow]
 
+Multiple outputs inference
+--------------------------
+Tasks can also infer multiple outputs by using dict python typing.
+
+.. code-block:: python
+
+    @task
+    def identity_dict(x: int, y: int) -> Dict[str, int]:
+        return {"x": x, "y": y}
+
+By using the typing ``Dict`` for the function return type, the ``multiple_outputs`` parameter
+is automatically set to true.
+
+Not, If you manually set the ``multiple_outputs`` parameter the inference is disabled and

Review comment: nah definitely a typo
[GitHub] [airflow] dimberman opened a new pull request #12303: K8s yaml templates not rendered by k8sexecutor
dimberman opened a new pull request #12303: URL: https://github.com/apache/airflow/pull/12303 There is a bug in the yaml template rendering caused by the logic that yaml templates are only generated when the current executor is the k8sexecutor. This is a problem as the templates are generated by the task pod, which is itself running a LocalExecutor. Also generates a "base" template if this taskInstance has not run yet.
[GitHub] [airflow] jhtimmins opened a new issue #12302: Web views don't consider DAG-level permissions
jhtimmins opened a new issue #12302: URL: https://github.com/apache/airflow/issues/12302 DAG-level permissions aren't considered on web views. The built-in permissions use the default `has_access()` method rather than the custom method.
[GitHub] [airflow] jhtimmins opened a new issue #12301: DAG action views should only show DAGs a user can edit
jhtimmins opened a new issue #12301: URL: https://github.com/apache/airflow/issues/12301 The DAG view only shows DAGs that a user has view access to. The action views should only show DAGs the user can edit, but currently they show all viewable DAGs.
[GitHub] [airflow] jhtimmins opened a new issue #12300: Permissions appear to reload repeatedly
jhtimmins opened a new issue #12300: URL: https://github.com/apache/airflow/issues/12300 This results in slow UI load times and too many permission records. Please assign to @jhtimmins
[GitHub] [airflow] Zedmor commented on issue #12282: Rewrite Breeze in Python
Zedmor commented on issue #12282: URL: https://github.com/apache/airflow/issues/12282#issuecomment-725751853 I would like that! Sign me up.
[GitHub] [airflow] kaxil edited a comment on issue #12272: Package vulnerability scan failing on lodash CVE
kaxil edited a comment on issue #12272: URL: https://github.com/apache/airflow/issues/12272#issuecomment-725749306 Yes, we had fixed it in https://github.com/apache/airflow/pull/9921 and even bumped it to 4.17.20 in https://github.com/apache/airflow/pull/11095
[GitHub] [airflow] kaxil edited a comment on issue #12272: Package vulnerability scan failing on lodash CVE
kaxil edited a comment on issue #12272: URL: https://github.com/apache/airflow/issues/12272#issuecomment-725749306 Yes, we had even bumped it to 4.17.20 in https://github.com/apache/airflow/pull/11095
[GitHub] [airflow] kaxil commented on issue #12272: Package vulnerability scan failing on lodash CVE
kaxil commented on issue #12272: URL: https://github.com/apache/airflow/issues/12272#issuecomment-725749306 Yes, we had fixed it in https://github.com/apache/airflow/pull/11095
[GitHub] [airflow] kaxil closed issue #12272: Package vulnerability scan failing on lodash CVE
kaxil closed issue #12272: URL: https://github.com/apache/airflow/issues/12272
[airflow] branch master updated: Replace remaining decorated DAGs reference (#12299)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/master by this push:
     new ee544b4  Replace remaining decorated DAGs reference (#12299)
ee544b4 is described below

commit ee544b4a92ce28607c792519ba6b660e93f50fca
Author: Kaxil Naik
AuthorDate: Thu Nov 12 00:44:30 2020 +

    Replace remaining decorated DAGs reference (#12299)

    `decorated DAGs` -> `DAGs with Task Flow API`
---
 docs/concepts.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/concepts.rst b/docs/concepts.rst
index 2cf6cb1..b05f73f 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -290,7 +290,7 @@ The decorated function can be called once to set the arguments and key arguments
 Task decorator captures returned values and sends them to the :ref:`XCom backend `. By default, the returned value is saved
 as a single XCom value. You can set ``multiple_outputs`` key argument to ``True`` to unroll dictionaries, lists or tuples into
 separate XCom values. This can be used with regular operators to
-create :ref:`decorated DAGs `.
+create :ref:`DAGs with Task Flow API `.
 
 Calling a decorated function returns an ``XComArg`` instance. You can use it to set templated fields on downstream operators.
[GitHub] [airflow] kaxil merged pull request #12299: Replace remaining decorated DAGs reference
kaxil merged pull request #12299: URL: https://github.com/apache/airflow/pull/12299
[airflow] branch master updated (9276607 -> fa2b033)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git. from 9276607 Add session_parameters option to snowflake_hook (#12071) add fa2b033 Add reference for SubDagOperator (#12297) No new revisions were added by this update. Summary of changes: docs/concepts.rst | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-)
[GitHub] [airflow] kaxil merged pull request #12297: Add reference for SubDagOperator in docs
kaxil merged pull request #12297: URL: https://github.com/apache/airflow/pull/12297
[GitHub] [airflow] github-actions[bot] commented on pull request #12299: Replace remaining decorated DAGs reference
github-actions[bot] commented on pull request #12299: URL: https://github.com/apache/airflow/pull/12299#issuecomment-725744770 The PR is ready to be merged. No tests are needed!
[airflow] branch master updated (4f5e0ed -> 9276607)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git. from 4f5e0ed Update INTHEWILD.md (#12293) add 9276607 Add session_parameters option to snowflake_hook (#12071) No new revisions were added by this update. Summary of changes: airflow/providers/snowflake/hooks/snowflake.py | 3 +++ airflow/providers/snowflake/operators/snowflake.py | 6 ++ tests/providers/snowflake/hooks/test_snowflake.py | 3 ++- 3 files changed, 11 insertions(+), 1 deletion(-)
[GitHub] [airflow] kaxil commented on issue #11731: Using affinity and labels in pod_mutation_hook is not backwards-compatible in 1.10.12
kaxil commented on issue #11731: URL: https://github.com/apache/airflow/issues/11731#issuecomment-725744220 @dimberman Thoughts?
[GitHub] [airflow] kaxil commented on issue #12290: Got a sqlite3.OperationalError while runing airflow scheduler
kaxil commented on issue #12290: URL: https://github.com/apache/airflow/issues/12290#issuecomment-725743794 Please add more information like Airflow Version used and how to reproduce the issue.
[GitHub] [airflow] kaxil merged pull request #12071: Add session_parameters option to snowflake_hook
kaxil merged pull request #12071: URL: https://github.com/apache/airflow/pull/12071
[GitHub] [airflow] kaxil opened a new pull request #12299: Replace remaining decorated DAGs reference
kaxil opened a new pull request #12299: URL: https://github.com/apache/airflow/pull/12299 `decorated DAGs` -> `DAGs with Task Flow API`
[GitHub] [airflow] kaxil opened a new issue #12298: Add docs around when to use TaskGroup vs SubDag and potentially listing PROs and CONS.
kaxil opened a new issue #12298: URL: https://github.com/apache/airflow/issues/12298 It would be great for users to know when they should use TaskGroup vs SubDag, and what the pros and cons of each are. **TaskGroup**: https://airflow.readthedocs.io/en/latest/concepts.html#taskgroup **SubDags**: https://airflow.readthedocs.io/en/latest/concepts.html#subdags
[GitHub] [airflow] kaxil commented on issue #8112: Authorization and Permissions
kaxil commented on issue #8112: URL: https://github.com/apache/airflow/issues/8112#issuecomment-725729492 Can this be closed @jhtimmins / @mik-laj ?
[GitHub] [airflow] AndersonReyes commented on a change in pull request #10349: infer multiple_output from return type annotation
AndersonReyes commented on a change in pull request #10349: URL: https://github.com/apache/airflow/pull/10349#discussion_r521711017

## File path: airflow/operators/python.py

## @@ -255,6 +255,16 @@ def task(
     :type multiple_outputs: bool
     """
 
+    # try to infer from type annotation
+    if python_callable and multiple_outputs is None:
+        sig = signature(python_callable).return_annotation
+        ttype = getattr(sig, "__origin__", None)
+
+        if sig != inspect.Signature.empty and ttype in (dict, Dict):
+            multiple_outputs = True
+
+    # in case its still none, then set False here
+    multiple_outputs = multiple_outputs or False

Review comment: I'll do the patch suggestion you have above; removing the if statement would set it to False anyway, so that line is not needed.

## File path: docs/tutorial_taskflow_api.rst

## @@ -144,6 +144,22 @@ the dependencies as shown below.
     :end-before: [END main_flow]
 
+Multiple outputs inference
+--------------------------
+Tasks can also infer multiple outputs by using dict python typing.
+
+.. code-block:: python
+
+    @task
+    def identity_dict(x: int, y: int) -> Dict[str, int]:
+        return {"x": x, "y": y}
+
+By using the typing ``Dict`` for the function return type, the ``multiple_outputs`` parameter
+is automatically set to true.
+
+Not, If you manually set the ``multiple_outputs`` parameter the inference is disabled and

Review comment: nah, definitely a typo

## File path: airflow/operators/python.py

## @@ -255,6 +255,16 @@ def task(
     :type multiple_outputs: bool
     """
 
+    # try to infer from type annotation
+    if python_callable and multiple_outputs is None:
+        sig = signature(python_callable).return_annotation
+        ttype = getattr(sig, "__origin__", None)
+
+        if sig != inspect.Signature.empty and ttype in (dict, Dict):
+            multiple_outputs = True

Review comment: 👍
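The inference logic under discussion can be exercised standalone. Here is a runnable distillation of the hunk above (the wrapper function name is mine, not the PR's):

```python
import inspect
from inspect import signature
from typing import Dict, Optional

def infer_multiple_outputs(python_callable, multiple_outputs: Optional[bool] = None) -> bool:
    """Mirror of the PR's inference: a Dict return annotation implies multiple outputs."""
    # Only infer when the user did not set multiple_outputs explicitly.
    if python_callable and multiple_outputs is None:
        ret = signature(python_callable).return_annotation
        # Dict[str, int].__origin__ is `dict` on Python 3.7+.
        ttype = getattr(ret, "__origin__", None)
        if ret != inspect.Signature.empty and ttype in (dict, Dict):
            multiple_outputs = True
    # In case it's still None, fall back to False.
    return multiple_outputs or False
```

An explicit `multiple_outputs=False` disables the inference, matching the behavior the docs hunk describes.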
[GitHub] [airflow] kaxil opened a new pull request #12297: Add reference for SubDagOperator
kaxil opened a new pull request #12297: URL: https://github.com/apache/airflow/pull/12297 It would be better for users to have a link when they see SubDagOperator where they can read about it instead of just Class names --- **^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information. In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kaxil opened a new pull request #12296: Remove deprecated Elasticsearch Configs
kaxil opened a new pull request #12296: URL: https://github.com/apache/airflow/pull/12296 Since Airflow 1.10.4 we have removed `elasticsearch_` prefix from all config items under `[elasticsearch]` section. It is time we remove them from 2.0. --- **^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information. In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[airflow] branch master updated (289c9b5 -> 4f5e0ed)
This is an automated email from the ASF dual-hosted git repository. potiuk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git. from 289c9b5 Use default view in TriggerDagRunLink (#11778) add 4f5e0ed Update INTHEWILD.md (#12293) No new revisions were added by this update. Summary of changes: INTHEWILD.md | 1 + 1 file changed, 1 insertion(+)
[GitHub] [airflow] kaxil opened a new pull request #12295: Remove deprecated BashTaskRunner
kaxil opened a new pull request #12295: URL: https://github.com/apache/airflow/pull/12295 This commit: - Removes support for BashTaskRunner; this task_runner was deprecated in Airflow 1.10.3 (https://github.com/apache/airflow/blob/1.10.3/UPDATING.md#rename-of-bashtaskrunner-to-standardtaskrunner) - Supports deprecated `hostname_callable` & `email_backend` until 2.1, since they have not been deprecated in any released Airflow versions --- **^ Add meaningful description above** Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information. In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] github-actions[bot] commented on pull request #12293: Update INTHEWILD.md
github-actions[bot] commented on pull request #12293: URL: https://github.com/apache/airflow/pull/12293#issuecomment-725721147 The PR is ready to be merged. No tests are needed! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] potiuk merged pull request #12293: Update INTHEWILD.md
potiuk merged pull request #12293: URL: https://github.com/apache/airflow/pull/12293 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on pull request #12293: Update INTHEWILD.md
boring-cyborg[bot] commented on pull request #12293: URL: https://github.com/apache/airflow/pull/12293#issuecomment-725721210 Awesome work, congrats on your first merged pull request! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kukigai commented on a change in pull request #12126: Wait option for dagrun operator
kukigai commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521699808 ## File path: airflow/operators/dagrun_operator.py ## @@ -55,6 +57,14 @@ class TriggerDagRunOperator(BaseOperator): When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised. When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun. :type reset_dag_run: bool +:param wait_for_completion: Whether or not wait for dag run completion. +:type wait_for_completion: bool +:param poke_interval: Poke internal to check dag run status when wait_for_completion=True. Review comment: will add it and fix typo :( This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kukigai commented on a change in pull request #12126: Wait option for dagrun operator
kukigai commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521699461 ## File path: airflow/operators/dagrun_operator.py ## @@ -55,6 +57,14 @@ class TriggerDagRunOperator(BaseOperator): When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised. When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun. :type reset_dag_run: bool +:param wait_for_completion: Whether or not wait for dag run completion. Review comment: will add it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kukigai commented on a change in pull request #12126: Wait option for dagrun operator
kukigai commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521698535 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: @turbaszek @XD-DENG regarding https://github.com/apache/airflow/pull/12126#discussion_r521673165, user can set execution_timeout. Do you think we should have separate internal timeout for this operator? I prefer to have user choose execution_timeout so keep infinite loop as is. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kaxil opened a new issue #12294: Option to auto-replace deprecated configs with new options for 2.0
kaxil opened a new issue #12294: URL: https://github.com/apache/airflow/issues/12294 This issue is part of #8765 It would be good to have a rule or command in the upgrade-check utility tool to replace the deprecated configs with the new options. The following are the deprecated options: https://github.com/apache/airflow/blob/289c9b5a994a3e26951ca23b6edd30b2329b3089/airflow/configuration.py#L136-L174 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
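A rule of the kind requested here could, in sketch form, rewrite a config file from a deprecation table. This is a hypothetical illustration only: the real table lives in airflow/configuration.py, and the single mapping entry below is just an example move:

```python
import configparser
import io

# Hypothetical subset of the deprecated-option table:
# (old_section, old_key) -> (new_section, new_key)
DEPRECATED = {
    ("celery", "default_queue"): ("operators", "default_queue"),
}

def upgrade_config(text: str) -> str:
    """Move deprecated options to their new section/key, keeping values."""
    cfg = configparser.ConfigParser()
    cfg.read_string(text)
    for (old_sec, old_key), (new_sec, new_key) in DEPRECATED.items():
        if cfg.has_option(old_sec, old_key):
            value = cfg.get(old_sec, old_key)
            cfg.remove_option(old_sec, old_key)
            if not cfg.has_section(new_sec):
                cfg.add_section(new_sec)
            cfg.set(new_sec, new_key, value)
    out = io.StringIO()
    cfg.write(out)
    return out.getvalue()
```

A real rule would also need to preserve comments and formatting, which `configparser` discards; this sketch only shows the key/section rewrite itself.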
[GitHub] [airflow] boring-cyborg[bot] commented on pull request #12293: Update INTHEWILD.md
boring-cyborg[bot] commented on pull request #12293: URL: https://github.com/apache/airflow/pull/12293#issuecomment-725714541 Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst) Here are some useful points: - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that. - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it. - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations. - Be patient and persistent. It might take some time to get a review or get the final approval from Committers. - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack. - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices). Apache Airflow is a community-driven project and together we are making it better π. In case of doubts contact the developers at: Mailing List: d...@airflow.apache.org Slack: https://s.apache.org/airflow-slack This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] Zedmor opened a new pull request #12293: Update INTHEWILD.md
Zedmor opened a new pull request #12293: URL: https://github.com/apache/airflow/pull/12293 Update in the wild with our org. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kaxil opened a new issue #12292: Deprecate SubDags in Favor of TaskGroups
kaxil opened a new issue #12292: URL: https://github.com/apache/airflow/issues/12292 Once TaskGroups (https://airflow.readthedocs.io/en/latest/concepts.html#taskgroup) that would be released in Airflow 2.0 reach feature parity with SubDags and we have wide adoption and feedback from users about Taskgroups we should deprecate Subdags and remove them eventually in Airflow 3.0. Discussion Thread: https://lists.apache.org/thread.html/ra52746f9c8274469d343b5f0251199de776e75ab75ded6830886fb6a%40%3Cdev.airflow.apache.org%3E This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kukigai commented on a change in pull request #12126: Wait option for dagrun operator
kukigai commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521694146

## File path: airflow/operators/dagrun_operator.py

@@ -126,3 +144,22 @@ def execute(self, context: Dict):

                dag.clear(start_date=self.execution_date, end_date=self.execution_date)
            else:
                raise e
    +
    +        if self.wait_for_completion:
    +            # wait for dag to complete
    +            while True:
    +                self.log.info(
    +                    'Waiting for %s on %s to become allowed state %s ...',
    +                    self.trigger_dag_id,
    +                    dag_run.execution_date,
    +                    self.allowed_states,
    +                )
    +                dag_run.refresh_from_db()
    +                state = dag_run.state
    +                if state in self.failed_states:
    +                    raise AirflowException(f"{self.trigger_dag_id} failed with failed states {state}")

Review comment: @turbaszek Yes, I was thinking about that too. I will send a separate PR once this is done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kukigai commented on a change in pull request #12126: Wait option for dagrun operator
kukigai commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521693421

## File path: airflow/operators/dagrun_operator.py

@@ -126,3 +144,22 @@ def execute(self, context: Dict):

                dag.clear(start_date=self.execution_date, end_date=self.execution_date)
            else:
                raise e
    +
    +        if self.wait_for_completion:
    +            # wait for dag to complete
    +            while True:
    +                self.log.info(
    +                    'Waiting for %s on %s to become allowed state %s ...',
    +                    self.trigger_dag_id,
    +                    dag_run.execution_date,
    +                    self.allowed_states,
    +                )
    +                dag_run.refresh_from_db()
    +                state = dag_run.state
    +                if state in self.failed_states:
    +                    raise AirflowException(f"{self.trigger_dag_id} failed with failed states {state}")
    +                elif state in self.allowed_states:
    +                    self.log.info("%s finished with allowed state %s", self.trigger_dag_id, state)
    +                    return
    +
    +                time.sleep(self.poke_interval)

Review comment: ok i can This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
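The polling pattern quoted in this hunk can be sketched in isolation, with the overall timeout the reviewers go on to debate added. `wait_for_dag_run` and `get_state` below are hypothetical stand-ins for the operator internals (`dag_run.refresh_from_db()` plus reading `dag_run.state`), not Airflow code:

```python
import time

def wait_for_dag_run(get_state, allowed_states, failed_states,
                     poke_interval=60, timeout=60 * 60 * 24):
    """Poll get_state() until a terminal state or the timeout elapses.

    The timeout is an assumption discussed in review, not part of the patch;
    the patched operator loops indefinitely.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_state()
        if state in failed_states:
            raise RuntimeError(f"dag run ended in failed state {state!r}")
        if state in allowed_states:
            return state
        # Sleep between polls to avoid hammering the metadata database.
        time.sleep(poke_interval)
    raise TimeoutError("dag run did not reach an allowed state in time")
```

With `poke_interval=60` and a day-long timeout this matches the `BaseSensor`-like defaults suggested later in the thread; callers hitting the failed branch get an exception just as the operator raises `AirflowException`.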
[GitHub] [airflow] kaxil commented on issue #8605: Add Production-ready docker compose for the production image
kaxil commented on issue #8605: URL: https://github.com/apache/airflow/issues/8605#issuecomment-725712013 Also, I don't think docker-compose files need to be production-ready. It should just be meant for local-development or to quickly start / work on Airflow locally with different executors This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kaxil commented on issue #8605: Add Production-ready docker compose for the production image
kaxil commented on issue #8605: URL: https://github.com/apache/airflow/issues/8605#issuecomment-725711363 I think we should get this one in sooner before 2.0.0rc1, is someone willing to work on this one?? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521684303 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: Yep. For no reason I mixed these two just nowπ Agree with you This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #12126: Wait option for dagrun operator
turbaszek commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521682843 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: > My concern was users don't get aware of new additions. This is a valid concern, however the UPDATING.md was purposed to be place for breaking changes. New features are "announced" in changelog π€·ββοΈ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521678717 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: My concern was users don't get aware of new additions. I don't have strong opinion on this though, so it's also ok for me if we don't add this entry in `UPDATING.md`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #12126: Wait option for dagrun operator
turbaszek commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521677189 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: > Please also add an entry in `UPDATING.md` (section `master`) to describe this change @kukigai @XD-DENG do we need it? The wait feature is a new addition and the default value is to avoid waiting, thus the default behaviour is the same as before this change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] vitaly-krugl opened a new issue #12291: Trigger Dag from Airflow Web GUI sets origin with wrong HTTP scheme
vitaly-krugl opened a new issue #12291: URL: https://github.com/apache/airflow/issues/12291 **Apache Airflow version**: 1.10.11 **Environment**: - **OS** (e.g. from /etc/os-release): - **Kernel** (e.g. `uname -a`): Linux **What happened**: After manually triggering a DAG from the Airflow Web GUI, Airflow attempted to open the /admin/ page using the http:// scheme instead of the https:// scheme, and so failed to connect because my webserver only supports SSL connections. **What you expected to happen**: I expected Airflow to open the /admin/ page using the https:// scheme because my webserver only supports SSL connections. Furthermore, I have the `[webserver]` `base_url` option set up using the https:// scheme. **How to reproduce it**: When I examine the HTML source of the `Airflow - DAGs` page, I see that the `<form>` blocks have "origin" url args configured with "http", as in this example: ``` http://emp-wf-vkruglik.mydomain.com/admin/ ``` I have the `[webserver]` `base_url` option set up as ``` base_url = https://emp-wf-vkruglik.mydomain.com ``` Note that in my network setup, there is an Apache server that acts as SSL terminator. The Apache server in turn forwards the requests to the Airflow webserver as HTTP over port 80. So, in the web browser (I am using Chrome), I access Airflow using this URL: https://emp-wf-vkruglik.mydomain.com/admin/ (note the https). However, when I go through the "Trigger Dag" flow in the Airflow UI and click on the Trigger button (or the "abort" button), the Airflow GUI attempts to load `http://emp-wf-vkruglik.mydomain.com/admin/` instead of `https://emp-wf-vkruglik.mydomain.com/admin/`. From the source code of airflow/www/templates/airflow/trigger.html, I see that this incorrect HTTP scheme is picked up from the `{{ origin }}` arg: ``` {% extends "airflow/master.html" %} {% block body %} {{ super() }} Trigger DAG: {{ dag_id }} Configuration JSON (Optional) {{ conf }} To access configuration in your DAG use {{ '{{' }} dag_run.conf {{ '}}' }}. bail. 
{% endblock %} ``` **Anything else we need to know**: Occurs 100% This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
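For deployments like the one described, where an Apache proxy terminates SSL and forwards plain HTTP, a commonly used workaround (an assumption about this setup, not a confirmed fix for the issue above) is to have the proxy pass the original scheme in `X-Forwarded-Proto` and let the Airflow webserver trust forwarded headers; check that your Airflow version supports the `enable_proxy_fix` option before relying on it:

```
# Apache virtual host (SSL terminator) -- requires mod_headers:
RequestHeader set X-Forwarded-Proto "https"

# airflow.cfg -- have the webserver honor X-Forwarded-* headers:
[webserver]
enable_proxy_fix = True
```

With both in place, the webserver sees `https` as the request scheme, so template-generated origin URLs pick up the correct scheme instead of the plain HTTP the proxy forwards.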
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521675792 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: Please also add an entry in `UPDATING.md` (section `master`) to describe this change @kukigai This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #10349: infer multiple_output from return type annotation
turbaszek commented on a change in pull request #10349: URL: https://github.com/apache/airflow/pull/10349#discussion_r521675375

## File path: airflow/operators/python.py

@@ -255,6 +255,16 @@ def task( :type multiple_outputs: bool """

    +# try to infer from type annotation
    +if python_callable and multiple_outputs is None:
    +    sig = signature(python_callable).return_annotation
    +    ttype = getattr(sig, "__origin__", None)
    +
    +    if sig != inspect.Signature.empty and ttype in (dict, Dict):
    +        multiple_outputs = True
    +
    +# in case its still none, then set False here
    +multiple_outputs = multiple_outputs or False

Review comment: Should we be able to skip this line? `None` has boolean value of `False` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #10349: infer multiple_output from return type annotation
turbaszek commented on a change in pull request #10349: URL: https://github.com/apache/airflow/pull/10349#discussion_r521674847

## File path: airflow/operators/python.py

@@ -255,6 +255,16 @@ def task( :type multiple_outputs: bool """

    +# try to infer from type annotation
    +if python_callable and multiple_outputs is None:
    +    sig = signature(python_callable).return_annotation
    +    ttype = getattr(sig, "__origin__", None)
    +
    +    if sig != inspect.Signature.empty and ttype in (dict, Dict):
    +        multiple_outputs = True

Review comment:

```suggestion
        sig = signature(python_callable).return_annotation
        ttype = getattr(sig, "__origin__", None)
        multiple_outputs = sig != inspect.Signature.empty and ttype in (dict, Dict)
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521674296 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: Sounds a good plan to me. Thanks @turbaszek This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #10349: infer multiple_output from return type annotation
turbaszek commented on a change in pull request #10349: URL: https://github.com/apache/airflow/pull/10349#discussion_r521674105

## File path: docs/tutorial_taskflow_api.rst

@@ -144,6 +144,22 @@ the dependencies as shown below. :end-before: [END main_flow]

    +Multiple outputs inference
    +--------------------------
    +Tasks can also infer multiple outputs by using dict python typing.
    +
    +.. code-block:: python
    +
    +    @task
    +    def identity_dict(x: int, y: int) -> Dict[str, int]:
    +        return {"x": x, "y": y}
    +
    +By using the typing ``Dict`` for the function return type, the ``multiple_outputs`` parameter
    +is automatically set to true.
    +
    +Not, If you manually set the ``multiple_outputs`` parameter the inference is disabled and

Review comment:

```suggestion
Note, if you manually set the ``multiple_outputs`` parameter the inference is disabled and
```

Or did you have something else in your mind? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #12126: Wait option for dagrun operator
turbaszek commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521673165 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: @XD-DENG I agree, that's something we can do either here or in follow up PR fixing also the `ExternalTaskSensor` which also uses similar infinite loop (https://github.com/apache/airflow/pull/12126#discussion_r521662517). Personally I would be in favour of timeout. And probably something like `60 * 60 * 24` similar to what we have in `BaseSensor` to handle most of DAGs This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521670483 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: I'm thinking if it's making sense to have something like `max_retries` or `timeout` here. Otherwise there is a chance that this becomes a dead loop in extreme circumstances. @turbaszek what do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521672872 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: Review comment: Other than the idea/question above + my comments on the docstring, overall looks good to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521671664 ## File path: airflow/operators/dagrun_operator.py ## @@ -55,6 +57,14 @@ class TriggerDagRunOperator(BaseOperator): When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised. When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun. :type reset_dag_run: bool +:param wait_for_completion: Whether or not wait for dag run completion. +:type wait_for_completion: bool +:param poke_interval: Poke internal to check dag run status when wait_for_completion=True. Review comment: And add the default value (60) in the docstring as well please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on issue #12090: task_command.task_run log handling is either broken or redundant
turbaszek commented on issue #12090: URL: https://github.com/apache/airflow/issues/12090#issuecomment-725691765 > Or were you thinking of something else? I was confused by custom code I was fixing on a fork. You are right: if we have separate handlers (different streams/files), everything works as expected. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[airflow] branch master updated: Use default view in TriggerDagRunLink (#11778)
This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git The following commit(s) were added to refs/heads/master by this push: new 289c9b5 Use default view in TriggerDagRunLink (#11778) 289c9b5 is described below commit 289c9b5a994a3e26951ca23b6edd30b2329b3089 Author: Tomek Urbaszek AuthorDate: Wed Nov 11 23:11:53 2020 +0100 Use default view in TriggerDagRunLink (#11778) --- airflow/operators/dagrun_operator.py| 5 +++-- airflow/sensors/external_task_sensor.py | 5 +++-- airflow/utils/helpers.py| 12 tests/utils/test_helpers.py | 13 - 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py index 0ca0cf5..7547a0f 100644 --- a/airflow/operators/dagrun_operator.py +++ b/airflow/operators/dagrun_operator.py @@ -18,13 +18,13 @@ import datetime from typing import Dict, Optional, Union -from urllib.parse import quote from airflow.api.common.experimental.trigger_dag import trigger_dag from airflow.exceptions import DagNotFound, DagRunAlreadyExists from airflow.models import BaseOperator, BaseOperatorLink, DagBag, DagModel, DagRun from airflow.utils import timezone from airflow.utils.decorators import apply_defaults +from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.types import DagRunType @@ -37,7 +37,8 @@ class TriggerDagRunLink(BaseOperatorLink): name = 'Triggered DAG' def get_link(self, operator, dttm): -return f"/graph?dag_id={operator.trigger_dag_id}&root=&execution_date={quote(dttm.isoformat())}" +query = {"dag_id": operator.trigger_dag_id, "execution_date": dttm.isoformat()} +return build_airflow_url_with_query(query) class TriggerDagRunOperator(BaseOperator): diff --git a/airflow/sensors/external_task_sensor.py b/airflow/sensors/external_task_sensor.py index 06137a4..c72c0b7e 100644 --- a/airflow/sensors/external_task_sensor.py +++ 
b/airflow/sensors/external_task_sensor.py @@ -19,7 +19,6 @@ import datetime import os from typing import FrozenSet, Optional, Union -from urllib.parse import quote from sqlalchemy import func @@ -28,6 +27,7 @@ from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInsta from airflow.operators.dummy_operator import DummyOperator from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.decorators import apply_defaults +from airflow.utils.helpers import build_airflow_url_with_query from airflow.utils.session import provide_session from airflow.utils.state import State @@ -41,7 +41,8 @@ class ExternalTaskSensorLink(BaseOperatorLink): name = 'External DAG' def get_link(self, operator, dttm): -return f"/graph?dag_id={operator.external_dag_id}&root=&execution_date={quote(dttm.isoformat())}" +query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()} +return build_airflow_url_with_query(query) class ExternalTaskSensor(BaseSensorOperator): diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 5ccb618..69ac5a0 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -22,9 +22,11 @@ from datetime import datetime from functools import reduce from itertools import filterfalse, tee from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, TypeVar +from urllib import parse from jinja2 import Template +from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.utils.module_loading import import_string @@ -202,3 +204,13 @@ def cross_downstream(*args, **kwargs): stacklevel=2, ) return import_string('airflow.models.baseoperator.cross_downstream')(*args, **kwargs) + + +def build_airflow_url_with_query(query: Dict[str, Any]) -> str: +""" +Build airflow url using base_url and default_view and provided query +For example: + 'http://0.0.0.0:8000/base/graph?dag_id=my-task&root=&execution_date=2020-10-27T10%3A59%3A25.615587 +""" 
+view = conf.get('webserver', 'dag_default_view').lower() +return f"/{view}?{parse.urlencode(query)}" diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py index 85c53c5..c53aa4f 100644 --- a/tests/utils/test_helpers.py +++ b/tests/utils/test_helpers.py @@ -23,7 +23,8 @@ from airflow.models import TaskInstance from airflow.models.dag import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.utils import helpers -from airflow.utils.helpers import merge_dicts +from airflow.utils.helpers import build_airflow_url_with_query, merge_dicts +from tests.test_utils.config import conf_vars class TestHelpers(unittest.TestCase): @@ -136,3 +137,13 @@ class TestHelpers(unittest.TestCase): dict2
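The `build_airflow_url_with_query` helper this commit introduces boils down to `urllib.parse.urlencode` plus the configured default view. Below is a simplified, self-contained sketch: `build_url_with_query` is a hypothetical stand-in that takes the view as a parameter, whereas the real helper reads it from `conf.get('webserver', 'dag_default_view')`.

```python
from urllib.parse import urlencode


def build_url_with_query(view: str, query: dict) -> str:
    # urlencode handles percent-escaping, so callers no longer need to
    # quote() the execution_date by hand as the old f-string links did.
    return f"/{view.lower()}?{urlencode(query)}"


url = build_url_with_query(
    "graph", {"dag_id": "my-task", "execution_date": "2020-10-27T10:59:25"}
)
print(url)  # prints "/graph?dag_id=my-task&execution_date=2020-10-27T10%3A59%3A25"
```

Centralising this in one helper is what lets `TriggerDagRunLink` and `ExternalTaskSensorLink` respect the user's configured default view instead of hard-coding `/graph`.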
[GitHub] [airflow] turbaszek merged pull request #11778: Use default view in dr op link
turbaszek merged pull request #11778: URL: https://github.com/apache/airflow/pull/11778 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521668345 ## File path: airflow/operators/dagrun_operator.py ## @@ -55,6 +57,14 @@ class TriggerDagRunOperator(BaseOperator): When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised. When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun. :type reset_dag_run: bool +:param wait_for_completion: Whether or not wait for dag run completion. +:type wait_for_completion: bool +:param poke_interval: Poke internal to check dag run status when wait_for_completion=True. Review comment: In addition, would be good to tell users what's the unit here (seconds). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #12198: [WIP] Show DeprecationWarnings when listing DAGs
turbaszek commented on a change in pull request #12198: URL: https://github.com/apache/airflow/pull/12198#discussion_r521668051 ## File path: airflow/contrib/hooks/aws_athena_hook.py ## @@ -26,5 +26,4 @@ warnings.warn( "This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena`.", DeprecationWarning, Review comment: > Does this change show correct stacklevel in all cases? The "correct stacklevel" is the main problem here. I'm not sure this change will give users anything helpful. Why? Because the stacklevel never points to a line in the DAG file. I tried changing it from 0 to ~21, and the warning is attributed to module level, the DagBag, or the airflow CLI, never to the problematic DAG. That's why I think what is proposed in #11960 is more helpful for users. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
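The `stacklevel` behaviour under discussion can be seen in isolation with the standard library alone. In this hypothetical example (`deprecated_helper` and `user_code` are illustrative names, not Airflow code), `stacklevel=2` attributes the warning to the caller of the deprecated function rather than to the `warnings.warn` line itself; because Airflow DAG files are imported through several intermediate frames, no fixed value reliably lands on the DAG file, which is the problem described above.

```python
import warnings


def deprecated_helper():
    # stacklevel=2 reports the warning against the caller's line;
    # larger values walk further up the call stack.
    warnings.warn("this helper is deprecated", DeprecationWarning, stacklevel=2)


def user_code():
    deprecated_helper()  # the warning is attributed to this call site


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    user_code()

print(caught[0].message)  # prints "this helper is deprecated"
```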
[GitHub] [airflow] XD-DENG commented on a change in pull request #12126: Wait option for dagrun operator
XD-DENG commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521666703 ## File path: airflow/operators/dagrun_operator.py ## @@ -55,6 +57,14 @@ class TriggerDagRunOperator(BaseOperator): When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised. When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun. :type reset_dag_run: bool +:param wait_for_completion: Whether or not wait for dag run completion. Review comment: Would be good to tell users what's the default value here ## File path: airflow/operators/dagrun_operator.py ## @@ -55,6 +57,14 @@ class TriggerDagRunOperator(BaseOperator): When reset_dag_run=False and dag run exists, DagRunAlreadyExists will be raised. When reset_dag_run=True and dag run exists, existing dag run will be cleared to rerun. :type reset_dag_run: bool +:param wait_for_completion: Whether or not wait for dag run completion. +:type wait_for_completion: bool +:param poke_interval: Poke internal to check dag run status when wait_for_completion=True. Review comment: typo "Poke internal"? Should be "Poke interval" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on issue #12284: BigQueryGetDataOperator doesn't return data correctly
turbaszek commented on issue #12284: URL: https://github.com/apache/airflow/issues/12284#issuecomment-725685572 Awesome! Thank you @nathadfield ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] nathadfield commented on issue #12284: BigQueryGetDataOperator doesn't return data correctly
nathadfield commented on issue #12284: URL: https://github.com/apache/airflow/issues/12284#issuecomment-725685081 @turbaszek Yep. I'll be submitting tomorrow. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on issue #12284: BigQueryGetDataOperator doesn't return data correctly
turbaszek commented on issue #12284: URL: https://github.com/apache/airflow/issues/12284#issuecomment-725684703 @nathadfield would you like to submit a fix? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on pull request #12126: Wait option for dagrun operator
turbaszek commented on pull request #12126: URL: https://github.com/apache/airflow/pull/12126#issuecomment-725683832 @kaxil @XD-DENG would you like to take a look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] github-actions[bot] commented on pull request #12126: Wait option for dagrun operator
github-actions[bot] commented on pull request #12126: URL: https://github.com/apache/airflow/pull/12126#issuecomment-725683769 The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #12126: Wait option for dagrun operator
turbaszek commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521662517 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: +self.log.info( +'Waiting for %s on %s to become allowed state %s ...', +self.trigger_dag_id, +dag_run.execution_date, +self.allowed_states, +) +dag_run.refresh_from_db() +state = dag_run.state +if state in self.failed_states: +raise AirflowException(f"{self.trigger_dag_id} failed with failed states {state}") Review comment: @kukigai the same logic can be used in ExternalTaskSensor - would you like to refactor this sensor in separate PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #12126: Wait option for dagrun operator
turbaszek commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521662136 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: +self.log.info( +'Waiting for %s on %s to become allowed state %s ...', +self.trigger_dag_id, +dag_run.execution_date, +self.allowed_states, +) +dag_run.refresh_from_db() +state = dag_run.state +if state in self.failed_states: +raise AirflowException(f"{self.trigger_dag_id} failed with failed states {state}") +elif state in self.allowed_states: +self.log.info("%s finished with allowed state %s", self.trigger_dag_id, state) +return + +time.sleep(self.poke_interval) Review comment: Let's move the sleep to the beginning of the loop. It's a slight improvement but may structure the code better This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on a change in pull request #12126: Wait option for dagrun operator
turbaszek commented on a change in pull request #12126: URL: https://github.com/apache/airflow/pull/12126#discussion_r521661723 ## File path: airflow/operators/dagrun_operator.py ## @@ -126,3 +144,22 @@ def execute(self, context: Dict): dag.clear(start_date=self.execution_date, end_date=self.execution_date) else: raise e + +if self.wait_for_completion: +# wait for dag to complete +while True: +self.log.info( +'Waiting for %s on %s to become allowed state %s ...', +self.trigger_dag_id, +dag_run.execution_date, +self.allowed_states, +) +dag_run.refresh_from_db() +state = dag_run.state +if state in self.failed_states: +raise AirflowException(f"{self.trigger_dag_id} failed with failed states {state}") +elif state in self.allowed_states: Review comment: ```suggestion if state in self.allowed_states: ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
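Taken together, the two review suggestions above (sleep at the top of the loop, and `if` instead of `elif` after a branch that raises) would restructure the body roughly as sketched below. This is a hedged illustration with stand-in names (`get_state` in place of `dag_run.refresh_from_db()` plus `dag_run.state`), not the PR's final code.

```python
import time


def poll_until_done(get_state, allowed_states, failed_states, poke_interval=60):
    # Sleep first, then refresh and check: the success/failure checks then
    # read linearly, and a plain ``if`` suffices because the failure branch
    # raises rather than falling through.
    while True:
        time.sleep(poke_interval)
        state = get_state()
        if state in failed_states:
            raise RuntimeError(f"dag run ended in failed state {state!r}")
        if state in allowed_states:
            return state
```

Sleeping before the first check also avoids querying the database for a run that was triggered only milliseconds earlier.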
[GitHub] [airflow] wentaowanguc opened a new issue #12290: Got a sqlite3.OperationalError while running airflow scheduler
wentaowanguc opened a new issue #12290: URL: https://github.com/apache/airflow/issues/12290 Hi Team: We got an error when we running airflow scheduler `2020-11-11_20:09:26.59088 Process DagFileProcessor34125-Process: 2020-11-11_20:09:26.59091 Traceback (most recent call last): 2020-11-11_20:09:26.59092 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context 2020-11-11_20:09:26.59092 cursor, statement, parameters, context 2020-11-11_20:09:26.59093 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute 2020-11-11_20:09:26.59093 cursor.execute(statement, parameters) 2020-11-11_20:09:26.59094 sqlite3.OperationalError: database is locked 2020-11-11_20:09:26.59094 2020-11-11_20:09:26.59094 The above exception was the direct cause of the following exception: 2020-11-11_20:09:26.59095 2020-11-11_20:09:26.59095 Traceback (most recent call last): 2020-11-11_20:09:26.59096 File "/usr/lib64/python3.6/multiprocessing/process.py", line 258, in _bootstrap 2020-11-11_20:09:26.59096 self.run() 2020-11-11_20:09:26.59097 File "/usr/lib64/python3.6/multiprocessing/process.py", line 93, in run 2020-11-11_20:09:26.59097 self._target(*self._args, **self._kwargs) 2020-11-11_20:09:26.59097 File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor 2020-11-11_20:09:26.59098 pickle_dags) 2020-11-11_20:09:26.59098 File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper 2020-11-11_20:09:26.59099 return func(*args, **kwargs) 2020-11-11_20:09:26.59099 File "/usr/local/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file 2020-11-11_20:09:26.59102 dag.sync_to_db() 2020-11-11_20:09:26.59103 File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper 2020-11-11_20:09:26.59105 return func(*args, **kwargs) 2020-11-11_20:09:26.59105 File 
"/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1515, in sync_to_db 2020-11-11_20:09:26.59106 DagModel).filter(DagModel.dag_id == self.dag_id).first() 2020-11-11_20:09:26.59106 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3429, in first 2020-11-11_20:09:26.59107 ret = list(self[0:1]) 2020-11-11_20:09:26.59107 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3203, in __getitem__ 2020-11-11_20:09:26.59107 return list(res) 2020-11-11_20:09:26.59108 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__ 2020-11-11_20:09:26.59108 return self._execute_and_instances(context) 2020-11-11_20:09:26.59109 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances 2020-11-11_20:09:26.59109 result = conn.execute(querycontext.statement, self._params) 2020-11-11_20:09:26.59109 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute 2020-11-11_20:09:26.59112 return meth(self, multiparams, params) 2020-11-11_20:09:26.59113 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection 2020-11-11_20:09:26.59113 return connection._execute_clauseelement(self, multiparams, params) 2020-11-11_20:09:26.59114 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement 2020-11-11_20:09:26.59114 distilled_params, 2020-11-11_20:09:26.59114 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context 2020-11-11_20:09:26.59115 e, statement, parameters, cursor, context 2020-11-11_20:09:26.59115 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception 2020-11-11_20:09:26.59116 sqlalchemy_exception, with_traceback=exc_info[2], from_=e 2020-11-11_20:09:26.59116 File 
"/usr/local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_ 2020-11-11_20:09:26.59117 raise exception 2020-11-11_20:09:26.59117 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context 2020-11-11_20:09:26.59117 cursor, statement, parameters, context 2020-11-11_20:09:26.59118 File "/usr/local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute 2020-11-11_20:09:26.59121 cursor.execute(statement, parameters) 2020-11-11_20:09:26.59121 sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked 2020-11-11_20:09:26.59122 [SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS da
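The `sqlite3.OperationalError: database is locked` in the traceback above is characteristic of several scheduler processes contending for SQLite's single writer. A common remedy, offered here as an assumption rather than something stated in the thread, is to point the metadata database at a client-server backend such as PostgreSQL. A minimal sketch (connection details are placeholders; on Airflow 1.10.x the last command is `airflow initdb` instead):

```shell
# Hypothetical example: switch the metadata DB from SQLite to PostgreSQL.
# Host, port, user, password, and database name below are placeholders.
export AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@localhost:5432/airflow"
airflow db init   # (re)initialise the metadata database tables
```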
[GitHub] [airflow] boring-cyborg[bot] commented on issue #12290: Got a sqlite3.OperationalError while running airflow scheduler
boring-cyborg[bot] commented on issue #12290: URL: https://github.com/apache/airflow/issues/12290#issuecomment-725681803 Thanks for opening your first issue here! Be sure to follow the issue template! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] aaltay commented on a change in pull request #11726: Add DataflowJobStatusSensor and support non-blocking execution of jobs
aaltay commented on a change in pull request #11726: URL: https://github.com/apache/airflow/pull/11726#discussion_r521613015 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -278,18 +296,20 @@ def _check_dataflow_job_state(self, job) -> bool: :rtype: bool :raise: Exception """ -if DataflowJobStatus.JOB_STATE_DONE == job["currentState"]: +if self._wait_until_finished is None: +wait_until_finished = job['type'] != DataflowJobType.JOB_TYPE_STREAMING +else: +wait_until_finished = self._wait_until_finished + +if job['currentState'] == DataflowJobStatus.JOB_STATE_DONE: Review comment: DRAINED, UPDATED also need to be in this list. They are also terminal states. (https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate) ## File path: airflow/providers/google/cloud/sensors/dataflow.py ## @@ -0,0 +1,115 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""This module contains a Google Cloud Dataflow sensor.""" +from typing import Optional, Sequence, Set, Union + +from airflow.exceptions import AirflowException +from airflow.providers.google.cloud.hooks.dataflow import ( +DEFAULT_DATAFLOW_LOCATION, +DataflowHook, +DataflowJobStatus, +) +from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.utils.decorators import apply_defaults + + +class DataflowJobStatusSensor(BaseSensorOperator): +""" +Checks for the status of a job in Google Cloud Dataflow. + +:param job_id: ID of the job to be checked. +:type job_id: str +:param expected_statuses: The expected state of the operation. +See: + https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState +:type expected_statuses: Union[Set[str], str] +:param project_id: Optional, the Google Cloud project ID in which to start a job. +If set to None or missing, the default project_id from the Google Cloud connection is used. +:type project_id: str +:param location: The location of the Dataflow job (for example europe-west1). See: +https://cloud.google.com/dataflow/docs/concepts/regional-endpoints +:type location: str +:param gcp_conn_id: The connection ID to use connecting to Google Cloud. +:type gcp_conn_id: str +:param delegate_to: The account to impersonate using domain-wide delegation of authority, +if any. For this to work, the service account making the request must have +domain-wide delegation enabled. See: + https://developers.google.com/identity/protocols/oauth2/service-account#delegatingauthority +:type delegate_to: str +:param impersonation_chain: Optional service account to impersonate using short-term +credentials, or chained list of accounts required to get the access_token +of the last account in the list, which will be impersonated in the request. +If set as a string, the account must grant the originating account +the Service Account Token Creator IAM role. 
+If set as a sequence, the identities from the list must grant +Service Account Token Creator IAM role to the directly preceding identity, with first +account from the list granting this role to the originating account (templated). +:type impersonation_chain: Union[str, Sequence[str]] +""" + +template_fields = ['job_id'] + +@apply_defaults +def __init__( +self, +*, +job_id: str, +expected_statuses: Union[Set[str], str], +project_id: Optional[str] = None, +location: str = DEFAULT_DATAFLOW_LOCATION, +gcp_conn_id: str = 'google_cloud_default', +delegate_to: Optional[str] = None, +impersonation_chain: Optional[Union[str, Sequence[str]]] = None, +**kwargs, +) -> None: +super().__init__(**kwargs) +self.job_id = job_id +self.expected_statuses = ( +{expected_statuses} if isinstance(expected_statuses, str) else expected_statuses +) +self.project_id = project_id +self.location = location +
[GitHub] [airflow] kukigai commented on pull request #12126: Wait option for dagrun operator
kukigai commented on pull request #12126: URL: https://github.com/apache/airflow/pull/12126#issuecomment-725639995 @mik-laj if @turbaszek is busy, please review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] potiuk commented on issue #12281: Add list of providers available in the docs
potiuk commented on issue #12281: URL: https://github.com/apache/airflow/issues/12281#issuecomment-725623882 Feel free!
[GitHub] [airflow] mik-laj commented on issue #12281: Add list of providers available in the docs
mik-laj commented on issue #12281: URL: https://github.com/apache/airflow/issues/12281#issuecomment-725623346 Are you working on it? If not, I'd be happy to do it, since I will also need it to build the final documentation.
[GitHub] [airflow] tomasfarias commented on a change in pull request #11964: Add new datetime branch operator
tomasfarias commented on a change in pull request #11964: URL: https://github.com/apache/airflow/pull/11964#discussion_r521589482

## File path: airflow/operators/datetime_branch_operator.py

```diff
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+from typing import Dict, Iterable, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.branch_operator import BaseBranchOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DateTimeBranchOperator(BaseBranchOperator):
+    """
+    Branches into one of two lists of tasks depending on the current datetime.
+
+    True branch will be returned when `datetime.datetime.now()` falls below
+    `target_upper` and above `target_lower`.
+
+    :param follow_task_ids_if_true: task id or task ids to follow if
+        `datetime.datetime.now()` falls above `target_lower` and below `target_upper`.
+    :type follow_task_ids_if_true: str or list[str]
+    :param follow_task_ids_if_false: task id or task ids to follow if
+        `datetime.datetime.now()` falls below `target_lower` or above `target_upper`.
+    :type follow_task_ids_if_false: str or list[str]
+    :param target_lower: target lower bound.
+    :type target_lower: Optional[datetime.datetime]
+    :param target_upper: target upper bound.
+    :type target_upper: Optional[datetime.datetime]
+    :param timezone: timezone to override local time.
+    :type timezone: Optional[datetime.timezone]
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        follow_task_ids_if_true: Union[str, Iterable[str]],
+        follow_task_ids_if_false: Union[str, Iterable[str]],
+        target_lower: Optional[datetime.datetime],
+        target_upper: Optional[datetime.datetime],
+        timezone: Optional[datetime.timezone] = None,
```

Review comment: Unit tests added.
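The operator's docstring describes the branch condition in words. A minimal standalone sketch of that condition (a hypothetical free function, not the operator's actual `choose_branch` implementation) could look like:

```python
import datetime
from typing import List, Union

TaskIds = Union[str, List[str]]


def choose_branch(
    now: datetime.datetime,
    target_lower: datetime.datetime,
    target_upper: datetime.datetime,
    follow_if_true: TaskIds,
    follow_if_false: TaskIds,
) -> TaskIds:
    """Return the 'true' task ids when now falls between the two bounds,
    and the 'false' task ids otherwise."""
    if target_lower <= now <= target_upper:
        return follow_if_true
    return follow_if_false
```

Note that in the real operator both bounds are optional and timezone handling matters (comparing aware and naive datetimes raises a `TypeError`), which is presumably what the requested unit tests cover.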
[GitHub] [airflow] mik-laj commented on issue #12281: Add list of providers available in the docs
mik-laj commented on issue #12281: URL: https://github.com/apache/airflow/issues/12281#issuecomment-725614206 SGTM
[GitHub] [airflow] potiuk edited a comment on issue #11965: Airflow fails to initdb with cattrs 1.1.0
potiuk edited a comment on issue #11965: URL: https://github.com/apache/airflow/issues/11965#issuecomment-725613679

> Didn't work for me :(

What do you mean by "didn't work"? Can you please provide some specifics on what you tried and how it did not work, @gaj995?
[GitHub] [airflow] potiuk commented on issue #11965: Airflow fails to initdb with cattrs 1.1.0
potiuk commented on issue #11965: URL: https://github.com/apache/airflow/issues/11965#issuecomment-725613679

> Didn't work for me :(

What do you mean by "didn't work"? Can you please provide some specifics on what you tried and how it did not work?
[GitHub] [airflow] potiuk commented on issue #12281: Add list of providers available in the docs
potiuk commented on issue #12281: URL: https://github.com/apache/airflow/issues/12281#issuecomment-725612544

> I think it shouldn't be in the documentation, but on our website. This way we will be able to update it independently of the rest of the documentation. For example: we will be able to release providers every month, and Airflow with its documentation every 3 months. This means that we would have no documentation for a new provider for 2 months.

Absolutely - this is our strategic direction. But I believe we also need a "tactical" solution for now: while people are trying out the betas, I think it would be great to have a single page where they can simply see which providers are available.