[GitHub] [airflow] oripwk commented on a change in pull request #8572: Improve template capabilities of EMR job and step operators
oripwk commented on a change in pull request #8572: URL: https://github.com/apache/airflow/pull/8572#discussion_r416360122 ## File path: airflow/providers/amazon/aws/operators/emr_add_steps.py ## @@ -38,13 +38,14 @@ class EmrAddStepsOperator(BaseOperator): :type cluster_states: list :param aws_conn_id: aws connection to uses :type aws_conn_id: str -:param steps: boto3 style steps to be added to the jobflow. (templated) -:type steps: list +:param steps: boto3 style steps or reference to a steps file (must be '.json' or '.jinja2') to +be added to the jobflow. (templated) +:type steps: list|str :param do_xcom_push: if True, job_flow_id is pushed to XCom with key job_flow_id. :type do_xcom_push: bool """ template_fields = ['job_flow_id', 'job_flow_name', 'cluster_states', 'steps'] -template_ext = () +template_ext = ('.json', '.jinja2') Review comment: I can remove it. Though it's a widespread convention, and also helps distinguish between real files and template files. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
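For context on what the new `template_ext` entries enable, here is a minimal, hypothetical usage sketch; the job flow ID, connection ID and the `steps.json` file name are illustrative and not taken from the PR:
```
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.utils.dates import days_ago

with DAG("emr_templated_steps", start_date=days_ago(1), schedule_interval=None) as dag:
    # Inline boto3-style steps: the behaviour that already exists today.
    add_inline_steps = EmrAddStepsOperator(
        task_id="add_inline_steps",
        job_flow_id="j-EXAMPLEID",
        aws_conn_id="aws_default",
        steps=[{
            "Name": "calculate_pi",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {"Jar": "command-runner.jar", "Args": ["spark-example", "SparkPi", "10"]},
        }],
    )

    # With '.json' in template_ext, a string ending in '.json' is treated as a
    # template file: Jinja renders it and the rendered content supplies the steps.
    add_steps_from_file = EmrAddStepsOperator(
        task_id="add_steps_from_file",
        job_flow_id="j-EXAMPLEID",
        aws_conn_id="aws_default",
        steps="steps.json",  # hypothetical file next to the DAG (or on template_searchpath)
    )
```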
[GitHub] [airflow] boring-cyborg[bot] commented on issue #8599: Airflow DAGs On/Off toggle button not working properly with RBAC=True mode
boring-cyborg[bot] commented on issue #8599: URL: https://github.com/apache/airflow/issues/8599#issuecomment-620407902 Thanks for opening your first issue here! Be sure to follow the issue template! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ytiam opened a new issue #8599: Airflow DAGs On/Off toggle button not working properly with RBAC=True mode
ytiam opened a new issue #8599: URL: https://github.com/apache/airflow/issues/8599 I am using an AWS EC2 instance and installed apache-airflow there. I am doing the following steps on that instance: 1) After initiating Airflow with 'airflow initdb', I created a user with the **Admin** role using the 'airflow create_user' option and changed rbac = True in airflow.cfg. 2) Ran 'airflow initdb' again and then 'airflow webserver', which started the webserver. 3) In another tab, ran 'airflow scheduler'. 4) Used port-forwarding to forward port 8080 from the EC2 instance to my local machine. 5) In my local browser, I am just opening localhost:8080/. 6) On the Airflow login page, I entered the credentials of the Admin user and logged in to Airflow. 7) After toggling a DAG ON from the UI, when I refresh the page or click some other option, the DAG toggle button automatically turns OFF, and hence I am not able to run any DAG. ![image](https://user-images.githubusercontent.com/13828878/80454358-8bd69280-8947-11ea-9fbf-e63b20e09d9b.png) The button I am talking about is on the left side of the DAG name. Please help, anyone. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] oripwk commented on a change in pull request #8572: Improve template capabilities of EMR job and step operators
oripwk commented on a change in pull request #8572: URL: https://github.com/apache/airflow/pull/8572#discussion_r416360122 ## File path: airflow/providers/amazon/aws/operators/emr_add_steps.py ## @@ -38,13 +38,14 @@ class EmrAddStepsOperator(BaseOperator): :type cluster_states: list :param aws_conn_id: aws connection to uses :type aws_conn_id: str -:param steps: boto3 style steps to be added to the jobflow. (templated) -:type steps: list +:param steps: boto3 style steps or reference to a steps file (must be '.json' or '.jinja2') to +be added to the jobflow. (templated) +:type steps: list|str :param do_xcom_push: if True, job_flow_id is pushed to XCom with key job_flow_id. :type do_xcom_push: bool """ template_fields = ['job_flow_id', 'job_flow_name', 'cluster_states', 'steps'] -template_ext = () +template_ext = ('.json', '.jinja2') Review comment: I can remove it. Though it's a widespread convention, and also helps distinguish between real file and template files. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] Acehaidrey opened a new pull request #8598: make hive macros py3 compatible with decoded string return type
Acehaidrey opened a new pull request #8598: URL: https://github.com/apache/airflow/pull/8598 With the current implementation of the hive macros encoding the result of the metastore calls, in py2 this still returns a string type, but in python3 encoding forces the representation to be a bytes type. See the example below
```
ahaidrey-078HTD6:incubator-airflow ahaidrey$ python3
Python 3.7.0 (default, Oct 2 2018, 09:20:07) [Clang 10.0.0 (clang-1000.11.45.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> a = 'apple'
>>> b = a.encode('utf-8')
>>> type(b)
<class 'bytes'>

ahaidrey-078HTD6:~ ahaidrey$ python
Python 2.7.16 (default, Dec 3 2019, 02:03:47) [GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.31)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> a = 'apple'
>>> b = a.encode('utf-8')
>>> type(b)
<type 'str'>
```
The issue with this is that the result being used by macros is a bytes value that isn't templatable as a string and breaks the queries it is used in. What this means is that all the templates need to be written as something like this:
```
val = "{{ macros.hive.max_partition(table='mytable', schema='myschema', field='myfield', filter_map={'key1': 'val1'}).decode('utf-8') }}"
```
Requiring users to always decode the value is not the intention of this method; it should return a value that can be used as is. This PR fixes that. We may be able to just remove the encoding altogether but it could make things backwards incompatible. --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists (not existing) - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
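To summarise the fix being proposed, here is a minimal Python 3 sketch of the behaviour difference; the helper names are illustrative and this is not the PR's actual diff:
```
# Current behaviour: encoding the metastore result yields bytes under Python 3.
def max_partition_encoded(value: str) -> bytes:
    return value.encode("utf-8")

# Proposed behaviour: return a str so Jinja templates can use the value directly,
# without a trailing .decode('utf-8') in every template.
def max_partition_decoded(value: str) -> str:
    return value.encode("utf-8").decode("utf-8")

assert isinstance(max_partition_encoded("2020-04-27"), bytes)   # breaks templating
assert isinstance(max_partition_decoded("2020-04-27"), str)     # usable as-is in "{{ ... }}"
```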
[GitHub] [airflow] VinodKumarLogan commented on issue #8565: SalesforceHook missing method to return dataframe
VinodKumarLogan commented on issue #8565: URL: https://github.com/apache/airflow/issues/8565#issuecomment-620396479 Hi @jeffolsi @potiuk , can I give this a shot and submit a PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-4568) The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed
[ https://issues.apache.org/jira/browse/AIRFLOW-4568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094158#comment-17094158 ] ASF GitHub Bot commented on AIRFLOW-4568: - lokeshlal commented on pull request #8509: URL: https://github.com/apache/airflow/pull/8509#issuecomment-620384248 Requesting review of this PR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The ExternalTaskSensor should be configurable to raise an Airflow Exception > in case the poked external task reaches a disallowed state, such as f.i. > failed > --- > > Key: AIRFLOW-4568 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4568 > Project: Apache Airflow > Issue Type: Improvement > Components: operators >Affects Versions: 1.10.3 >Reporter: ddluke >Priority: Minor > > _As an engineer, I would like to have the behavior of the ExternalTaskSensor > changed_ > _So that it fails in case the poked external_task_id fails_ > *Therefore* > * I suggest extending the behavior of the sensor to optionally also query > the TaskInstance for disallowed states and raise an AirflowException if > found. Currently, if the poked external task reaches a failed state, the > sensor continues to poke and does not terminate > *Acceptance Criteria (from my pov)* > * The class interface for ExternalTaskSensor is extended with an additional > parameter, disallowed_states, which is an Optional List of > airflow.utils.state.State > * The poke method is expanded to count the number of rows from TaskInstance > which met the filter criteria dag_id, task_id, disallowed_states and > dttm_filter if disallowed_states is not None > * If disallowed_states is not None and the above query returns a counter > > 0, an Airflow Exception is thrown -- This message was sent by Atlassian Jira (v8.3.4#803005)
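A rough sketch of what the acceptance criteria above could translate to in the sensor's poke logic; this is illustrative only and not the code in PR #8509:
```
from airflow.exceptions import AirflowException
from airflow.models import TaskInstance


def count_disallowed(session, external_dag_id, external_task_id, dttm_filter, disallowed_states):
    # Count external TaskInstances that reached a disallowed state (e.g. 'failed').
    TI = TaskInstance
    return (
        session.query(TI)
        .filter(
            TI.dag_id == external_dag_id,
            TI.task_id == external_task_id,
            TI.state.in_(disallowed_states),
            TI.execution_date.in_(dttm_filter),
        )
        .count()
    )


def poke_with_disallowed_states(session, *, external_dag_id, external_task_id,
                                dttm_filter, disallowed_states=None):
    # Fail fast instead of poking forever when the external task has already failed.
    if disallowed_states and count_disallowed(
        session, external_dag_id, external_task_id, dttm_filter, disallowed_states
    ) > 0:
        raise AirflowException(
            f"The external task {external_task_id} in DAG {external_dag_id} "
            f"reached a disallowed state"
        )
    # ...the sensor's existing success-state counting would follow here...
```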
[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disall
lokeshlal commented on pull request #8509: URL: https://github.com/apache/airflow/pull/8509#issuecomment-620384248 Requesting review of this PR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] dhuang commented on a change in pull request #8422: Add Snowflake system test
dhuang commented on a change in pull request #8422: URL: https://github.com/apache/airflow/pull/8422#discussion_r416327461 ## File path: airflow/providers/snowflake/operators/s3_to_snowflake.py ## @@ -51,7 +51,7 @@ def __init__(self, table, stage, file_format, - schema, + schema, # TODO: shouldn't be required Review comment: Similar to database/warehouse/role values, the schema may already be set from the user's settings (`DEFAULT_NAMESPACE`) or in the session [set via connection extras](https://github.com/apache/airflow/blob/7d4b81ddd7f0c11532b0abac2050336b4f243b88/airflow/providers/snowflake/hooks/snowflake.py#L62). It can also be explicitly referenced in the table name, i.e. `my_schema.my_table`. Although there are good reasons to override the session default, I think that should be optional and is consistent with [SnowflakeOperator](https://github.com/apache/airflow/blob/7d4b81ddd7f0c11532b0abac2050336b4f243b88/airflow/providers/snowflake/operators/snowflake.py#L44-L46). Will update comment to be a little more specific. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
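To illustrate the point about `schema` being optional, a hedged sketch of how the COPY target could fall back to the session default; the SQL is simplified, not the operator's exact statement, and the names are placeholders:
```
from typing import Optional


def build_copy_sql(table: str, stage: str, file_format: str, schema: Optional[str] = None) -> str:
    # If no schema is given, rely on the connection/session default (e.g. set via
    # connection extras or the user's DEFAULT_NAMESPACE), or on a schema-qualified
    # table name such as "my_schema.my_table".
    qualified_table = f"{schema}.{table}" if schema else table
    return (
        f"COPY INTO {qualified_table} "
        f"FROM @{stage} "
        f"FILE_FORMAT = (FORMAT_NAME = {file_format})"
    )


print(build_copy_sql("my_table", "my_stage", "my_csv_format"))            # uses session default
print(build_copy_sql("my_table", "my_stage", "my_csv_format", "public"))  # explicit schema
```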
[jira] [Commented] (AIRFLOW-6231) Show DAG Run conf in graph view
[ https://issues.apache.org/jira/browse/AIRFLOW-6231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094155#comment-17094155 ] ASF GitHub Bot commented on AIRFLOW-6231: - dhuang commented on pull request #6794: URL: https://github.com/apache/airflow/pull/6794#issuecomment-620380349 > @dhuang Can you add a test to "When a conf is passed in DAG it appears in ListView"? > > Example: ccbaf57#diff-948e87b4f8f644b3ad8c7950958df033 Good call, added. Thanks for providing the example. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Show DAG Run conf in graph view > --- > > Key: AIRFLOW-6231 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6231 > Project: Apache Airflow > Issue Type: Improvement > Components: webserver >Affects Versions: 1.10.6 >Reporter: Daniel Huang >Assignee: Daniel Huang >Priority: Trivial > > A DAG run's conf (from triggered DAGs) isn't surfaced anywhere other than in > the database itself. Would be handy to show it when one exists. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] dhuang commented on pull request #6794: [AIRFLOW-6231] Display DAG run conf in the list view
dhuang commented on pull request #6794: URL: https://github.com/apache/airflow/pull/6794#issuecomment-620380349 > @dhuang Can you add a test to "When a conf is passed in DAG it appears in ListView"? > > Example: ccbaf57#diff-948e87b4f8f644b3ad8c7950958df033 Good call, added. Thanks for providing the example. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] stale[bot] commented on pull request #7645: [AIRFLOW-7004][depends on AIRFLOW-7003][WIP] Lazy initialize settings
stale[bot] commented on pull request #7645: URL: https://github.com/apache/airflow/pull/7645#issuecomment-620366010 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] stale[bot] commented on pull request #5308: [AIRFLOW-4549] skipped tasks should be ok for wait_for_downstream
stale[bot] commented on pull request #5308: URL: https://github.com/apache/airflow/pull/5308#issuecomment-620366004 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-7004) Lazy initialize settings
[ https://issues.apache.org/jira/browse/AIRFLOW-7004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094133#comment-17094133 ] ASF GitHub Bot commented on AIRFLOW-7004: - stale[bot] commented on pull request #7645: URL: https://github.com/apache/airflow/pull/7645#issuecomment-620366010 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Lazy initialize settings > > > Key: AIRFLOW-7004 > URL: https://issues.apache.org/jira/browse/AIRFLOW-7004 > Project: Apache Airflow > Issue Type: Improvement > Components: core >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-5948) Replace SimpleDag with serialized version
[ https://issues.apache.org/jira/browse/AIRFLOW-5948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094131#comment-17094131 ] ASF GitHub Bot commented on AIRFLOW-5948: - stale[bot] commented on pull request #7694: URL: https://github.com/apache/airflow/pull/7694#issuecomment-620366014 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Replace SimpleDag with serialized version > - > > Key: AIRFLOW-5948 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5948 > Project: Apache Airflow > Issue Type: Improvement > Components: core, scheduler >Affects Versions: 2.0.0, 1.10.7 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Critical > Labels: dag-serialization > > Replace SimpleDag with serialized version (json over multiprocessing) in > SchedulerJob etc., no other change in scheduler behaviour. (This doesn't make > sense long term, but does tidy up the code) > Currently, we have 2 Serialized Representation: > # SimpleDags (were created because SimpleDags were not pickleable) > # Serialized DAG > We should remove SimpleDags -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094132#comment-17094132 ] ASF GitHub Bot commented on AIRFLOW-4549: - stale[bot] commented on pull request #5308: URL: https://github.com/apache/airflow/pull/5308#issuecomment-620366004 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > wait_for_downstream does not respect skipped tasks > -- > > Key: AIRFLOW-4549 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4549 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Reporter: Dima Kamalov >Assignee: Teddy Hartanto >Priority: Major > > See > [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] stale[bot] commented on pull request #7694: [AIRFLOW-5948][WIP] Replace SimpleDag with SerializedDag
stale[bot] commented on pull request #7694: URL: https://github.com/apache/airflow/pull/7694#issuecomment-620366014 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] randr97 edited a comment on pull request #8503: Facebook system test
randr97 edited a comment on pull request #8503: URL: https://github.com/apache/airflow/pull/8503#issuecomment-620362839 > I have run these tests and there are two minor problems. > > * The bucket is not created or deleted; > * The BigQuery is not deleted. Will add the create bucket and delete BigQ operator. Thanks @mik-laj :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] randr97 commented on pull request #8503: Facebook system test
randr97 commented on pull request #8503: URL: https://github.com/apache/airflow/pull/8503#issuecomment-620362839 > I have run these tests and there are two minor problems. > > * The bucket is not created or deleted; > * The BigQuery is not deleted. Will add the create bucket and delete BigQ operator. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094123#comment-17094123 ] ASF GitHub Bot commented on AIRFLOW-5156: - randr97 commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416302309 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: @mik-laj the reason why I have given a default value is cus of backward compatibility. For other auth methods I think we will have to build stuff out. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add other authentication mechanisms to HttpHook > --- > > Key: AIRFLOW-5156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5156 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Affects Versions: 1.10.4 >Reporter: Joshua Kornblum >Assignee: Rohit S S >Priority: Minor > > It looks like the only supported authentication for HttpHooks is basic auth. > The hook code shows > {quote}_if conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > requests library supports any auth that inherits AuthBase – in my scenario we > need ntlmauth for API on IIS server. > [https://2.python-requests.org/en/master/user/advanced/#custom-authentication] > I would suggest option to pass auth object in constructor then add to if/else > control flow like > {quote}_if self.auth is not None:_ > _session.auth = self.auth_ > _elif conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > One would have to fetch the connection themselves and then fill out auth and > then pass that to hook which is flexible although a little awkward. > {quote}api_conn = BaseHook().get_connection('my_api') > auth = HttpNtlmAuth(api_conn.login, api_conn.password) > HttpSensor(task_id='sensing', auth=auth, ) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] randr97 commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook
randr97 commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416302309 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: @mik-laj the reason why I have given a default value is cus of backward compatibility. For other auth methods I think we will have to build stuff out. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
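For readers following the thread, a short sketch of how the proposed parameter would be used, assuming the PR exposes it as an `auth_type` constructor argument as the attribute name in the diff suggests; the connection IDs are placeholders and `requests_ntlm` is an optional third-party package:
```
from requests.auth import HTTPBasicAuth

from airflow.providers.http.hooks.http import HttpHook

# Default: behaves exactly as before, (login, password) becomes basic auth.
basic_hook = HttpHook(method="GET", http_conn_id="my_api", auth_type=HTTPBasicAuth)

# Opt-in: a different requests-compatible authentication class built from the
# same connection credentials, e.g. NTLM for an API hosted on IIS.
try:
    from requests_ntlm import HttpNtlmAuth

    ntlm_hook = HttpHook(method="GET", http_conn_id="my_iis_api", auth_type=HttpNtlmAuth)
except ImportError:
    pass  # NTLM support is optional
```
Keeping `HTTPBasicAuth` as the default is what preserves backward compatibility for existing connections that only define a login and password.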
[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094122#comment-17094122 ] ASF GitHub Bot commented on AIRFLOW-5156: - randr97 commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416302309 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: @mik-laj the reason why I have given a default value is cus of backward compatibility. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add other authentication mechanisms to HttpHook > --- > > Key: AIRFLOW-5156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5156 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Affects Versions: 1.10.4 >Reporter: Joshua Kornblum >Assignee: Rohit S S >Priority: Minor > > It looks like the only supported authentication for HttpHooks is basic auth. > The hook code shows > {quote}_if conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > requests library supports any auth that inherits AuthBase – in my scenario we > need ntlmauth for API on IIS server. > [https://2.python-requests.org/en/master/user/advanced/#custom-authentication] > I would suggest option to pass auth object in constructor then add to if/else > control flow like > {quote}_if self.auth is not None:_ > _session.auth = self.auth_ > _elif conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > One would have to fetch the connection themselves and then fill out auth and > then pass that to hook which is flexible although a little awkward. > {quote}api_conn = BaseHook().get_connection('my_api') > auth = HttpNtlmAuth(api_conn.login, api_conn.password) > HttpSensor(task_id='sensing', auth=auth, ) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] randr97 commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook
randr97 commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416302309 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: @mik-laj the reason why I have given a default value is cus of backward compatibility. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library
[ https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094118#comment-17094118 ] ASF GitHub Bot commented on AIRFLOW-6981: - mik-laj edited a comment on pull request #8575: URL: https://github.com/apache/airflow/pull/8575#issuecomment-620357784 Please ignore tests in quarantine. They are flaky. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move Google Cloud Build from Discovery API to Python Library > > > Key: AIRFLOW-6981 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6981 > Project: Apache Airflow > Issue Type: Improvement > Components: gcp >Affects Versions: 2.0.0 >Reporter: Ryan Yuan >Assignee: Ryan Yuan >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library
[ https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094117#comment-17094117 ] ASF GitHub Bot commented on AIRFLOW-6981: - mik-laj commented on pull request #8575: URL: https://github.com/apache/airflow/pull/8575#issuecomment-620357784 Please ignore tests in quarantine. It's flaky. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move Google Cloud Build from Discovery API to Python Library > > > Key: AIRFLOW-6981 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6981 > Project: Apache Airflow > Issue Type: Improvement > Components: gcp >Affects Versions: 2.0.0 >Reporter: Ryan Yuan >Assignee: Ryan Yuan >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] mik-laj edited a comment on pull request #8575: [AIRFLOW-6981] Move Google Cloud Build from Discovery API to Python Library
mik-laj edited a comment on pull request #8575: URL: https://github.com/apache/airflow/pull/8575#issuecomment-620357784 Please ignore tests in quarantine. They are flaky. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on pull request #8575: [AIRFLOW-6981] Move Google Cloud Build from Discovery API to Python Library
mik-laj commented on pull request #8575: URL: https://github.com/apache/airflow/pull/8575#issuecomment-620357784 Please ignore tests in quarantine. It's flaky. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library
[ https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094113#comment-17094113 ] ASF GitHub Bot commented on AIRFLOW-6981: - ryanyuan commented on pull request #8575: URL: https://github.com/apache/airflow/pull/8575#issuecomment-620354199 Cheers @potiuk. This time it failed at Quarantined:Pg9.6,Py3.6. But the error log didn't say much. Any suggestions? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move Google Cloud Build from Discovery API to Python Library > > > Key: AIRFLOW-6981 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6981 > Project: Apache Airflow > Issue Type: Improvement > Components: gcp >Affects Versions: 2.0.0 >Reporter: Ryan Yuan >Assignee: Ryan Yuan >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] ryanyuan commented on pull request #8575: [AIRFLOW-6981] Move Google Cloud Build from Discovery API to Python Library
ryanyuan commented on pull request #8575: URL: https://github.com/apache/airflow/pull/8575#issuecomment-620354199 Cheers @potiuk. This time it failed at Quarantined:Pg9.6,Py3.6. But the error log didn't say much. Any suggestions? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] zhongjiajie commented on pull request #8534: fix: aws hook should work without conn id
zhongjiajie commented on pull request #8534: URL: https://github.com/apache/airflow/pull/8534#issuecomment-620349037 Thanks @houqp This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8529: Split and improve BigQuery example DAG
mik-laj commented on a change in pull request #8529: URL: https://github.com/apache/airflow/pull/8529#discussion_r415907894 ## File path: airflow/providers/google/cloud/example_dags/example_bigquery_transfer.py ## @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google BigQuery service. +""" +import os + +from airflow import models +from airflow.providers.google.cloud.operators.bigquery import ( +BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryDeleteDatasetOperator, +) +from airflow.providers.google.cloud.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator +from airflow.providers.google.cloud.operators.bigquery_to_gcs import BigQueryToGCSOperator +from airflow.utils.dates import days_ago + +PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") +DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer") +DATA_EXPORT_BUCKET_NAME = os.environ.get( +"GCP_BIGQUERY_EXPORT_BUCKET_NAME", "test-bigquery-sample-data" +) +ORIGIN = "origin" +TARGET = "target" + +default_args = {"start_date": days_ago(1)} + +with models.DAG( +"example_bigquery_transfer", +default_args=default_args, +schedule_interval=None, # Override to match your needs +tags=["example"], +) as dag: +copy_selected_data = BigQueryToBigQueryOperator( Review comment: I would prefer that each module have separate unit tests, a separate guide and separate system tests. Otherwise it will be difficult for us to determine coverage. A few days ago one person found one test that was in the wrong file. https://github.com/apache/airflow/pull/8556/files This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kennknowles commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
kennknowles commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416287771 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT: class DataflowJobStatus: """ Helper class with Dataflow job statuses. +Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState """ -JOB_STATE_DONE = "JOB_STATE_DONE" +JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN" +JOB_STATE_STOPPED = "JOB_STATE_STOPPED" JOB_STATE_RUNNING = "JOB_STATE_RUNNING" +JOB_STATE_DONE = "JOB_STATE_DONE" JOB_STATE_FAILED = "JOB_STATE_FAILED" JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED" +JOB_STATE_UPDATED = "JOB_STATE_UPDATED" +JOB_STATE_DRAINING = "JOB_STATE_DRAINING" +JOB_STATE_DRAINED = "JOB_STATE_DRAINED" JOB_STATE_PENDING = "JOB_STATE_PENDING" -FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED} -SUCCEEDED_END_STATES = {JOB_STATE_DONE} -END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES +JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING" +JOB_STATE_QUEUED = "JOB_STATE_QUEUED" +FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED} +SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED} +TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES Review comment: I do not know airflow that well. I just tried to see how these variables were used. I missed the place where they actually affect the Airflow result. It is a good idea to let the user say what they expect, and then a failure can be anything else. Example: we have had real use cases where we deliberately cancel jobs we do not need anymore, and that can be success for streaming jobs. I think the only certain failed state is JOB_STATE_FAILED. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
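An illustrative sketch of the reviewer's suggestion, in which the caller declares the expected terminal state and only other terminal states count as failures; this is not the PR's code, and the state names simply follow the diff above:
```
TERMINAL_STATES = {
    "JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED", "JOB_STATE_DRAINED", "JOB_STATE_STOPPED",
}


def check_job_state(job_state: str, expected_terminal_state: str = "JOB_STATE_DONE") -> bool:
    """Return True when the expected terminal state is reached, False while the job
    is still running, and raise when any other terminal state is reached."""
    if job_state == expected_terminal_state:
        return True
    if job_state in TERMINAL_STATES:
        raise RuntimeError(
            f"Job reached terminal state {job_state}, expected {expected_terminal_state}"
        )
    return False


# A streaming job that is deliberately cancelled can then be treated as a success:
assert check_job_state("JOB_STATE_CANCELLED", expected_terminal_state="JOB_STATE_CANCELLED")
```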
[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
aaltay commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416284747 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -783,6 +794,77 @@ def cancel_job( name=job_name, job_id=job_id, location=location, -poll_sleep=self.poll_sleep +poll_sleep=self.poll_sleep, +num_retries=self.num_retries, ) jobs_controller.cancel() + +@GoogleBaseHook.fallback_to_default_project_id +def start_sql_job( +self, +job_name: str, +query: str, +options: Dict[str, Any], +project_id: str, +location: str = DEFAULT_DATAFLOW_LOCATION, +on_new_job_id_callback: Optional[Callable[[str], None]] = None +): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. +For more information, look at: + `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param on_new_job_id_callback: Callback called when the job ID is known. +:type on_new_job_id_callback: callable +:return: the new job object +""" +cmd = [ +'gcloud', +'beta', +'dataflow', +'sql', +'query', +query, +f'--project={project_id}', +'--format=value(job.id)', +f'--job-name={job_name}', +f'--region={location}', +*(self._options_to_args(options)) +] +self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd])) +with self.provide_authorized_gcloud(): Review comment: Is this really required if it is only for logs? subprocess.run does not need to escape them anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
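A small standalone illustration of the point being discussed: `shlex.quote` only affects the logged, copy-pastable string, while `subprocess.run` receives the raw argument list and needs no escaping. The gcloud arguments here are placeholders:
```
import shlex
import subprocess

cmd = [
    "gcloud", "beta", "dataflow", "sql", "query",
    "SELECT sales_region, SUM(amount) FROM pubsub.topic.`my-project`.`sales` GROUP BY sales_region",
    "--project=my-project",
    "--job-name=example-sql-job",
    "--region=europe-west1",
]

# For humans reading the task log: one shell-safe string that can be pasted into a
# terminal while debugging (the embedded SQL gets quoted here, nowhere else).
print("Executing command:", " ".join(shlex.quote(c) for c in cmd))

# For execution: the list form is passed as-is and never sees the escaped string.
# subprocess.run(cmd, capture_output=True, check=True)  # commented out: requires gcloud
```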
[GitHub] [airflow] mik-laj opened a new pull request #8597: Fix example for Local Filesystem Secrets Backend
mik-laj opened a new pull request #8597: URL: https://github.com/apache/airflow/pull/8597 --- Make sure to mark the boxes below before creating PR: [x] - [X] Description above provides context of the change - [X] Unit tests coverage for changes (not needed for documentation changes) - [X] Target Github ISSUE in description if exists - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [X] Relevant documentation is updated including usage instructions. - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj opened a new pull request #8596: Add Local Filesystem Secret Backend (v1-10)
mik-laj opened a new pull request #8596: URL: https://github.com/apache/airflow/pull/8596 Backport: https://github.com/apache/airflow/pull/8436 --- Make sure to mark the boxes below before creating PR: [x] - [X] Description above provides context of the change - [X] Unit tests coverage for changes (not needed for documentation changes) - [X] Target Github ISSUE in description if exists - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [X] Relevant documentation is updated including usage instructions. - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] zhongjiajie commented on pull request #8591: Add http system test
zhongjiajie commented on pull request #8591: URL: https://github.com/apache/airflow/pull/8591#issuecomment-620322452 I think it's failed due to ```log 2020-04-27T18:10:16.7233125Z [01mlooking for now-outdated files... [39;49;00mnone found 2020-04-27T18:10:16.9090056Z [01mpickling environment... [39;49;00mdone 2020-04-27T18:10:16.9090564Z [01mchecking consistency... [39;49;00m[91m/opt/airflow/docs/howto/operator/http/http.rst: WARNING: document isn't included in any toctree[39;49;00m ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] zhongjiajie commented on pull request #8591: Add http system test
zhongjiajie commented on pull request #8591: URL: https://github.com/apache/airflow/pull/8591#issuecomment-620321096 > @kaxil fixed the doc issues but wait for ci before you merge please 😁 ..did not test the docs. I also changed the task var names and I hope I refactored it correctly. (if there are issues I am gonna fix it tomorrow :)) FYI, still failed 😢 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] dimberman opened a new pull request #8595: test pulling docker image from github
dimberman opened a new pull request #8595: URL: https://github.com/apache/airflow/pull/8595 --- Make sure to mark the boxes below before creating PR: [x] - [ ] Description above provides context of the change - [ ] Unit tests coverage for changes (not needed for documentation changes) - [ ] Target Github ISSUE in description if exists - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [ ] Relevant documentation is updated including usage instructions. - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
mik-laj commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416195651 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -548,23 +554,17 @@ def start_template_dataflow( :type location: str """ name = self._build_dataflow_job_name(job_name, append_job_name) -# Builds RuntimeEnvironment from variables dictionary -# https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment -environment = {} -for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail', -'tempLocation', 'bypassTempDirValidation', 'machineType', -'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']: -if key in variables: -environment.update({key: variables[key]}) -body = {"jobName": name, -"parameters": parameters, -"environment": environment} + service = self.get_conn() request = service.projects().locations().templates().launch( # pylint: disable=no-member projectId=project_id, location=location, gcsPath=dataflow_template, -body=body +body={ +"jobName": name, +"parameters": parameters, +"environment": variables Review comment: I want to change names for more parameters in many places. I will probably do it with a special decorator that will allow me to do it cleverly.
```
@rename_parameter({'variables': 'environment'})
def start_template_dataflow(
    self,
    job_name: str,
    variables: Dict,
    parameters: Dict,
    dataflow_template: str,
    project_id: str,
    append_job_name: bool = True,
    on_new_job_id_callback: Optional[Callable[[str], None]] = None,
    location: str = DEFAULT_DATAFLOW_LOCATION
) -> Dict:
```
That way, we won't have so much outdated code in the repository. This change, however, will be much more problematic and I would like to test it in various IDEs, etc. ![Screenshot 2020-04-28 at 00 31 57](https://user-images.githubusercontent.com/12058428/80427302-bee93980-88e7-11ea-97bb-a8b0bdbec6f2.png) I would like to get similar messages when the user updates the operator. It looks simple but still needs testing and I will do it in a separate PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
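For illustration, one possible shape for such a `rename_parameter` decorator; this is an assumption about the approach, not the implementation that will land:
```
import functools
import warnings


def rename_parameter(renames):
    """Accept deprecated keyword names and forward them under their new names."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for old_name, new_name in renames.items():
                if old_name in kwargs:
                    if new_name in kwargs:
                        raise TypeError(f"Pass either {old_name!r} or {new_name!r}, not both")
                    warnings.warn(
                        f"Parameter {old_name!r} is deprecated, use {new_name!r} instead",
                        DeprecationWarning,
                        stacklevel=2,
                    )
                    kwargs[new_name] = kwargs.pop(old_name)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@rename_parameter({"variables": "environment"})
def start_template_dataflow(job_name, environment=None, **kwargs):
    return environment


# Old callers keep working, but get a DeprecationWarning pointing at the new name.
assert start_template_dataflow("job", variables={"maxWorkers": 2}) == {"maxWorkers": 2}
```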
[GitHub] [airflow] dinigo edited a comment on issue #8585: Integrate Google Analytics Reporting API from plugin
dinigo edited a comment on issue #8585: URL: https://github.com/apache/airflow/issues/8585#issuecomment-620267476 I just checked the Orchestra project, which seems to be a "plug'n play" version of the Google operators we have in the master (I suppose it will be obsolete once the operators backport is officially released from apache-airflow). As it is basically a copy of the code in airflow/providers/google it also lacks this API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] dinigo commented on issue #8585: Integrate Google Analytics Reporting API from plugin
dinigo commented on issue #8585: URL: https://github.com/apache/airflow/issues/8585#issuecomment-620267476 I just checked the Orchestra project, which seems to be a "plug'n play" version of the Google operators we have in the master (I suppose it will be obsolete once the operators backport is officially released from apache-airflow) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416189250 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -783,6 +794,77 @@ def cancel_job( name=job_name, job_id=job_id, location=location, -poll_sleep=self.poll_sleep +poll_sleep=self.poll_sleep, +num_retries=self.num_retries, ) jobs_controller.cancel() + +@GoogleBaseHook.fallback_to_default_project_id +def start_sql_job( +self, +job_name: str, +query: str, +options: Dict[str, Any], +project_id: str, +location: str = DEFAULT_DATAFLOW_LOCATION, +on_new_job_id_callback: Optional[Callable[[str], None]] = None +): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. +For more information, look at: + `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param on_new_job_id_callback: Callback called when the job ID is known. +:type on_new_job_id_callback: callable +:return: the new job object +""" +cmd = [ +'gcloud', +'beta', +'dataflow', +'sql', +'query', +query, +f'--project={project_id}', +'--format=value(job.id)', +f'--job-name={job_name}', +f'--region={location}', +*(self._options_to_args(options)) +] +self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd])) +with self.provide_authorized_gcloud(): Review comment: These logs are available in Airflow Web UI, so a normal user can easily access them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] dinigo commented on issue #8585: Integrate Google Analytics Reporting API from plugin
dinigo commented on issue #8585: URL: https://github.com/apache/airflow/issues/8585#issuecomment-620265270 No, we lack the reporting API that lets you compose a report and download a dataset for this report. It's the easiest way to take data out of Google Analytics. It was there long before the BigQuery export, so a lot of business logic for a lot of companies relies on this export. I've seen it "Cron"ified very frequently. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
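For reference, this is the kind of call the requested integration would wrap, using the Analytics Reporting API v4 via google-api-python-client; the view ID, dates and metrics are placeholders, credential handling is omitted, and this is not an existing Airflow operator:
```
from googleapiclient.discovery import build


def download_report(credentials, view_id: str):
    # Compose a report request and download the resulting dataset in one call.
    analytics = build("analyticsreporting", "v4", credentials=credentials)
    body = {
        "reportRequests": [
            {
                "viewId": view_id,
                "dateRanges": [{"startDate": "7daysAgo", "endDate": "today"}],
                "metrics": [{"expression": "ga:sessions"}],
                "dimensions": [{"name": "ga:country"}],
            }
        ]
    }
    return analytics.reports().batchGet(body=body).execute()
```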
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416187351 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -783,6 +794,77 @@ def cancel_job( name=job_name, job_id=job_id, location=location, -poll_sleep=self.poll_sleep +poll_sleep=self.poll_sleep, +num_retries=self.num_retries, ) jobs_controller.cancel() + +@GoogleBaseHook.fallback_to_default_project_id +def start_sql_job( +self, +job_name: str, +query: str, +options: Dict[str, Any], +project_id: str, +location: str = DEFAULT_DATAFLOW_LOCATION, +on_new_job_id_callback: Optional[Callable[[str], None]] = None +): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. +For more information, look at: + `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param on_new_job_id_callback: Callback called when the job ID is known. +:type on_new_job_id_callback: callable +:return: the new job object +""" +cmd = [ +'gcloud', +'beta', +'dataflow', +'sql', +'query', +query, +f'--project={project_id}', +'--format=value(job.id)', +f'--job-name={job_name}', +f'--region={location}', +*(self._options_to_args(options)) +] +self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd])) +with self.provide_authorized_gcloud(): Review comment: This is only for logs. I used it to test this operator. A normal user will not copy it, but it may be helpful to him for debugging only. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
mik-laj commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416184937 ## File path: tests/providers/google/cloud/operators/test_dataflow_system.py ## @@ -26,4 +37,198 @@ class CloudDataflowExampleDagsSystemTest(GoogleSystemTest): @provide_gcp_context(GCP_DATAFLOW_KEY) def test_run_example_dag_function(self): -self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER) +self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER) + + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") +GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest" + +# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql +GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH) +GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc + + +EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples" +EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe" +EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql" + + +@pytest.mark.backend("mysql", "postgres") +@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) +class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest): +@provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) +def setUp(self) -> None: +# Create a Cloud Storage bucket +self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"]) + +# Build image with pipeline +with NamedTemporaryFile() as f: +f.write( +textwrap.dedent( +"""\ +steps: Review comment: The Cloud Build service will provide a new virtual machine for each build. Each build contains many steps. Each step is described by a container image. I personally would not worry about a large number of images, because after the build is completed, the virtual machine is deleted. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
mik-laj commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416185616 ## File path: tests/providers/google/cloud/operators/test_dataflow_system.py ## @@ -26,4 +37,198 @@ class CloudDataflowExampleDagsSystemTest(GoogleSystemTest): @provide_gcp_context(GCP_DATAFLOW_KEY) def test_run_example_dag_function(self): -self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER) +self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER) + + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") +GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest" + +# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql +GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH) +GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc + + +EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples" +EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe" +EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql" + + +@pytest.mark.backend("mysql", "postgres") +@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) +class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest): +@provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) +def setUp(self) -> None: +# Create a Cloud Storage bucket +self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"]) + +# Build image with pipeline +with NamedTemporaryFile() as f: +f.write( +textwrap.dedent( +"""\ +steps: Review comment: I removed some build steps. https://github.com/apache/airflow/pull/8550/commits/aa29389cac91f29f5f37b268fa61c0a3eaabd663 In order to build this Docker image I used Google Cloud Build. The Cloud Build service will provide a new virtual machine for each build. Each build contains many steps. Each step is described by a Docker image. As a result of the build, a new Docker image is built. I personally would not worry about a large number of images, because after the build is completed, the virtual machine is deleted. It does not run on the local machine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
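To make the build flow easier to picture, here is a rough, self-contained sketch of submitting such a build from Python; the build steps, image name, and gcloud flags are illustrative assumptions, not the PR's exact configuration:

```python
import subprocess
import textwrap
from tempfile import NamedTemporaryFile

# Each entry under `steps` runs in its own container on the temporary build VM;
# the image listed under `images` is pushed to Container Registry when the build succeeds.
BUILD_CONFIG = textwrap.dedent(
    """\
    steps:
    - name: gcr.io/cloud-builders/git
      args: ['clone', 'https://github.com/GoogleCloudPlatform/java-docs-samples.git', 'repo_dir']
    - name: maven
      args: ['mvn', 'clean', 'package']
      dir: 'repo_dir/dataflow/flex-templates/streaming_beam_sql'
    - name: gcr.io/cloud-builders/docker
      args: ['build', '-t', 'gcr.io/example-project/samples-dataflow-streaming-beam-sql:latest', '.']
      dir: 'repo_dir/dataflow/flex-templates/streaming_beam_sql'
    images: ['gcr.io/example-project/samples-dataflow-streaming-beam-sql:latest']
    """
)

with NamedTemporaryFile(mode="w", suffix=".yaml") as config_file:
    config_file.write(BUILD_CONFIG)
    config_file.flush()
    # No local source is uploaded; the git step above fetches everything the build needs.
    subprocess.run(
        ["gcloud", "builds", "submit", "--config", config_file.name, "--no-source"],
        check=True,
    )
```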
[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
mik-laj commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416185412 ## File path: airflow/providers/google/cloud/operators/dataflow.py ## @@ -406,6 +406,71 @@ def on_kill(self) -> None: self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id) +class DataflowStartFlexTemplateOperator(BaseOperator): +""" +Starts flex templates with the Dataflow pipeline. + +:param body: The request body +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param gcp_conn_id: The connection ID to use connecting to Google Cloud +Platform. +:type gcp_conn_id: str +:param delegate_to: The account to impersonate, if any. +For this to work, the service account making the request must have +domain-wide delegation enabled. +:type delegate_to: str +""" + +template_fields = ["body", 'location', 'project_id', 'gcp_conn_id'] + +@apply_defaults +def __init__( +self, +body: Dict, +location: str, +project_id: Optional[str] = None, +gcp_conn_id: str = 'google_cloud_default', +delegate_to: Optional[str] = None, +*args, +**kwargs +) -> None: +super().__init__(*args, **kwargs) +self.body = body +self.location = location +self.project_id = project_id +self.gcp_conn_id = gcp_conn_id +self.delegate_to = delegate_to +self.job_id = None +self.hook: Optional[DataflowHook] = None + +def execute(self, context): +self.hook = DataflowHook( +gcp_conn_id=self.gcp_conn_id, +delegate_to=self.delegate_to, +) + +def set_current_job_id(job_id): +self.job_id = job_id + +job = self.hook.start_flex_template( +body=self.body, +location=self.location, +project_id=self.project_id, +on_new_job_id_callback=set_current_job_id, +) + +return job + +def on_kill(self) -> None: +self.log.info("On kill.") +if self.job_id: +self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id) Review comment: Good point. I will change it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
mik-laj commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416181912 ## File path: tests/providers/google/cloud/operators/test_dataflow_system.py ## @@ -15,9 +15,20 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import os Review comment: We always try to write system tests and integrations simultaneously to ensure the best integration reliability. Thanks to this, every person on my team can easily test every integration. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
mik-laj commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416180769 ## File path: tests/providers/google/cloud/operators/test_dataflow_system.py ## @@ -26,4 +37,198 @@ class CloudDataflowExampleDagsSystemTest(GoogleSystemTest): @provide_gcp_context(GCP_DATAFLOW_KEY) def test_run_example_dag_function(self): -self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER) +self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER) + + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") +GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest" + +# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql +GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH) +GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc + + +EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples" +EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe" +EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql" + + +@pytest.mark.backend("mysql", "postgres") +@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) +class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest): +@provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) +def setUp(self) -> None: +# Create a Cloud Storage bucket +self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"]) + +# Build image with pipeline +with NamedTemporaryFile() as f: +f.write( +textwrap.dedent( +"""\ +steps: +- name: gcr.io/cloud-builders/git + args: ['clone', '$_EXAMPLE_REPO', 'repo_dir'] +- name: gcr.io/cloud-builders/git + args: ['checkout', '$_EXAMPLE_COMMIT'] + dir: 'repo_dir' +- name: alpine + entrypoint: /bin/sh + dir: 'repo_dir/$_EXAMPLE_SUBDIR' Review comment: ```suggestion ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
mik-laj commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416180636 ## File path: tests/providers/google/cloud/operators/test_dataflow_system.py ## @@ -26,4 +37,198 @@ class CloudDataflowExampleDagsSystemTest(GoogleSystemTest): @provide_gcp_context(GCP_DATAFLOW_KEY) def test_run_example_dag_function(self): -self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER) +self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER) + + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") +GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest" + +# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql +GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH) +GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc + + +EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples" +EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe" +EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql" + + +@pytest.mark.backend("mysql", "postgres") +@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) +class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest): +@provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) +def setUp(self) -> None: +# Create a Cloud Storage bucket +self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"]) + +# Build image with pipeline +with NamedTemporaryFile() as f: +f.write( +textwrap.dedent( +"""\ +steps: +- name: gcr.io/cloud-builders/git + args: ['clone', '$_EXAMPLE_REPO', 'repo_dir'] +- name: gcr.io/cloud-builders/git + args: ['checkout', '$_EXAMPLE_COMMIT'] + dir: 'repo_dir' +- name: alpine + entrypoint: /bin/sh + dir: 'repo_dir/$_EXAMPLE_SUBDIR' +- name: maven + args: ["mvn", "clean", "package"] + dir: 'repo_dir/$_EXAMPLE_SUBDIR' +- name: alpine + entrypoint: "/bin/sh" + args: ["-c", "ls -lh target/*.jar"] + dir: 'repo_dir/$_EXAMPLE_SUBDIR' Review comment: ```suggestion ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] aaltay commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
aaltay commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416178253 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -548,23 +554,17 @@ def start_template_dataflow( :type location: str """ name = self._build_dataflow_job_name(job_name, append_job_name) -# Builds RuntimeEnvironment from variables dictionary -# https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment -environment = {} -for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail', -'tempLocation', 'bypassTempDirValidation', 'machineType', -'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']: -if key in variables: -environment.update({key: variables[key]}) -body = {"jobName": name, -"parameters": parameters, -"environment": environment} + service = self.get_conn() request = service.projects().locations().templates().launch( # pylint: disable=no-member projectId=project_id, location=location, gcsPath=dataflow_template, -body=body +body={ +"jobName": name, +"parameters": parameters, +"environment": variables Review comment: Got it. Thank you for cleaning things up and keeping them backward compatible. How about adding a new clean flag and marking the old one as deprecated? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
mik-laj commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416172704 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -548,23 +554,17 @@ def start_template_dataflow( :type location: str """ name = self._build_dataflow_job_name(job_name, append_job_name) -# Builds RuntimeEnvironment from variables dictionary -# https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment -environment = {} -for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail', -'tempLocation', 'bypassTempDirValidation', 'machineType', -'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']: -if key in variables: -environment.update({key: variables[key]}) -body = {"jobName": name, -"parameters": parameters, -"environment": environment} + service = self.get_conn() request = service.projects().locations().templates().launch( # pylint: disable=no-member projectId=project_id, location=location, gcsPath=dataflow_template, -body=body +body={ +"jobName": name, +"parameters": parameters, +"environment": variables Review comment: This is problematic, but someone once created a parameter and called `variables`. He used the same name elsewhere. Now I am trying to maintain backward compatibility. In the next PR I will try to change the names of the arguments, but this will require more work to maintain backward compatibility. Formerly `variables` parameters contain environments + project id + region. It was similar to the native pipeline, where the region was also passed as an argument. `python pipeline.py --region=europe-west-1`. However, this was changed when I introduced the changes required by the [GCP guidelines](https://docs.google.com/document/d/1_rTdJSLCt0eyrAylmmgYc3yZr-_h51fVlnvMmWqhCkY/edit). Now it contains only environment parameters This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
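For readers following along, the kind of backward-compatible transition being described could look roughly like the sketch below; the helper name, key list, and warning text are illustrative rather than taken from the PR:

```python
import warnings
from typing import Any, Dict, Tuple

# Keys that belong in the RuntimeEnvironment; anything else in the legacy `variables`
# dict (e.g. project or region) should move to dedicated arguments.
RUNTIME_ENVIRONMENT_KEYS = {
    "numWorkers", "maxWorkers", "zone", "serviceAccountEmail", "tempLocation",
    "bypassTempDirValidation", "machineType", "additionalExperiments",
    "network", "subnetwork", "additionalUserLabels",
}


def split_legacy_variables(variables: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split a legacy ``variables`` dict into (environment, leftovers) and warn about old usage."""
    environment = {k: v for k, v in variables.items() if k in RUNTIME_ENVIRONMENT_KEYS}
    leftovers = {k: v for k, v in variables.items() if k not in RUNTIME_ENVIRONMENT_KEYS}
    if leftovers:
        warnings.warn(
            f"Passing {sorted(leftovers)} in 'variables' is deprecated; "
            "use the dedicated operator arguments instead.",
            DeprecationWarning,
            stacklevel=2,
        )
    return environment, leftovers
```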
[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
aaltay commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416177435 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -783,6 +794,77 @@ def cancel_job( name=job_name, job_id=job_id, location=location, -poll_sleep=self.poll_sleep +poll_sleep=self.poll_sleep, +num_retries=self.num_retries, ) jobs_controller.cancel() + +@GoogleBaseHook.fallback_to_default_project_id +def start_sql_job( +self, +job_name: str, +query: str, +options: Dict[str, Any], +project_id: str, +location: str = DEFAULT_DATAFLOW_LOCATION, +on_new_job_id_callback: Optional[Callable[[str], None]] = None +): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. +For more information, look at: + `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param on_new_job_id_callback: Callback called when the job ID is known. +:type on_new_job_id_callback: callable +:return: the new job object +""" +cmd = [ +'gcloud', +'beta', +'dataflow', +'sql', +'query', +query, +f'--project={project_id}', +'--format=value(job.id)', +f'--job-name={job_name}', +f'--region={location}', +*(self._options_to_args(options)) +] +self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd])) +with self.provide_authorized_gcloud(): Review comment: but this is only for logging? Do users normally copy paste these commands out of the logs? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
aaltay commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416176973 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -640,9 +659,8 @@ def start_python_dataflow( # pylint: disable=too-many-arguments variables['job_name'] = name variables['region'] = location -def label_formatter(labels_dict): -return ['--labels={}={}'.format(key, value) -for key, value in labels_dict.items()] +if 'labels' in variables: +variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()] Review comment: OK. Thank you for the explanation. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
aaltay commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416176659 ## File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py ## @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google Cloud Dataflow service +""" +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator +from airflow.utils.dates import days_ago + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") + +BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples') +DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql") +DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1') + +with models.DAG( +dag_id="example_gcp_dataflow_sql", +default_args={ +"start_date": days_ago(1), +}, +schedule_interval=None, # Override to match your needs +) as dag_sql: +start_sql = DataflowStartSqlJobOperator( +task_id="start_sql_query", +job_name=DATAFLOW_SQL_JOB_NAME, +query=f""" +SELECT +sales_region as sales_region, +count(state_id) as count_state +FROM +bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table Review comment: @ibzib - is there a public dataset that could be used? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] aaltay commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
aaltay commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416176147 ## File path: tests/providers/google/cloud/operators/test_dataflow_system.py ## @@ -26,4 +37,198 @@ class CloudDataflowExampleDagsSystemTest(GoogleSystemTest): @provide_gcp_context(GCP_DATAFLOW_KEY) def test_run_example_dag_function(self): -self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER) +self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER) + + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") +GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest" + +# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql +GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH) +GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc + + +EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples" +EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe" +EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql" + + +@pytest.mark.backend("mysql", "postgres") +@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) +class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest): +@provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) +def setUp(self) -> None: +# Create a Cloud Storage bucket +self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"]) + +# Build image with pipeline +with NamedTemporaryFile() as f: +f.write( +textwrap.dedent( +"""\ +steps: Review comment: Ack. Does it require this many containers in the image? I am not very familiar with it. Is this a VM image with containers? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
mik-laj commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416176087 ## File path: airflow/providers/google/cloud/operators/dataflow.py ## @@ -377,6 +394,7 @@ def __init__( self.poll_sleep = poll_sleep self.job_id = None self.hook: Optional[DataflowHook] = None +self.options.update(dataflow_default_options) Review comment: Good point. I will fix it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
mik-laj commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416175888 ## File path: tests/providers/google/cloud/hooks/test_dataflow.py ## @@ -52,32 +54,32 @@ 'stagingLocation': 'gs://test/staging', 'labels': {'foo': 'bar'} } -DATAFLOW_VARIABLES_TEMPLATE = { -'project': 'test', -'tempLocation': 'gs://test/temp', -'zone': 'us-central1-f' -} RUNTIME_ENV = { -'tempLocation': 'gs://test/temp', -'zone': 'us-central1-f', -'numWorkers': 2, -'maxWorkers': 10, -'serviceAccountEmail': 'test@apache.airflow', -'machineType': 'n1-standard-1', 'additionalExperiments': ['exp_flag1', 'exp_flag2'], -'network': 'default', -'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK', 'additionalUserLabels': { 'name': 'wrench', 'mass': '1.3kg', 'count': '3' -} +}, +'bypassTempDirValidation': {}, +'ipConfiguration': 'WORKER_IP_PRIVATE', +'kmsKeyName': ( + 'projects/TEST_PROJECT_ID/locations/TEST_LOCATIONS/keyRings/TEST_KEYRING/cryptoKeys/TEST_CRYPTOKEYS' +), +'maxWorkers': 10, +'network': 'default', +'numWorkers': 2, +'serviceAccountEmail': 'test@apache.airflow', +'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK', +'tempLocation': 'gs://test/temp', +'workerRegion': "test-region", +'workerZone': 'test-zone', +'zone': 'us-central1-f', +'machineType': 'n1-standard-1', Review comment: IP Configuration is also tested on line 65. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
mik-laj commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416172704 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -548,23 +554,17 @@ def start_template_dataflow( :type location: str """ name = self._build_dataflow_job_name(job_name, append_job_name) -# Builds RuntimeEnvironment from variables dictionary -# https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment -environment = {} -for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail', -'tempLocation', 'bypassTempDirValidation', 'machineType', -'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']: -if key in variables: -environment.update({key: variables[key]}) -body = {"jobName": name, -"parameters": parameters, -"environment": environment} + service = self.get_conn() request = service.projects().locations().templates().launch( # pylint: disable=no-member projectId=project_id, location=location, gcsPath=dataflow_template, -body=body +body={ +"jobName": name, +"parameters": parameters, +"environment": variables Review comment: This is problematic, but someone once created a parameter and called `variables`. He used the same name elsewhere. Now I am trying to maintain backward compatibility. In the next PR I will try to change the names of the arguments, but this will require more work to maintain backward compatibility. Formerly `variables` parameters contain environments + project id + region. It was similar to the native pipeline, where the region was also passed as an argument. `python pipeline.py --region=europe-west-1`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb opened a new pull request #8594: Reduce duplication of functions called in ci scripts
ashb opened a new pull request #8594: URL: https://github.com/apache/airflow/pull/8594 Every place we called rebuild_ci_image_if_needed we were also calling preprepare_ci_build -- there's no need. --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
mik-laj commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416172704 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -548,23 +554,17 @@ def start_template_dataflow( :type location: str """ name = self._build_dataflow_job_name(job_name, append_job_name) -# Builds RuntimeEnvironment from variables dictionary -# https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment -environment = {} -for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail', -'tempLocation', 'bypassTempDirValidation', 'machineType', -'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']: -if key in variables: -environment.update({key: variables[key]}) -body = {"jobName": name, -"parameters": parameters, -"environment": environment} + service = self.get_conn() request = service.projects().locations().templates().launch( # pylint: disable=no-member projectId=project_id, location=location, gcsPath=dataflow_template, -body=body +body={ +"jobName": name, +"parameters": parameters, +"environment": variables Review comment: This is problematic, but someone once created a parameter and called it variables. He used the same name elsewhere. Now I am trying to maintain backward compatibility. In the next PR I will try to correct the names of the arguments, but this will require more work to maintain backward compatibility. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
mik-laj commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416171580 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -532,7 +532,13 @@ def start_template_dataflow( :param job_name: The name of the job. Review comment: Yes. It only affects templates. Native pipelines were not affected by this issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416169086 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT: class DataflowJobStatus: """ Helper class with Dataflow job statuses. +Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState """ -JOB_STATE_DONE = "JOB_STATE_DONE" +JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN" +JOB_STATE_STOPPED = "JOB_STATE_STOPPED" JOB_STATE_RUNNING = "JOB_STATE_RUNNING" +JOB_STATE_DONE = "JOB_STATE_DONE" JOB_STATE_FAILED = "JOB_STATE_FAILED" JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED" +JOB_STATE_UPDATED = "JOB_STATE_UPDATED" +JOB_STATE_DRAINING = "JOB_STATE_DRAINING" +JOB_STATE_DRAINED = "JOB_STATE_DRAINED" JOB_STATE_PENDING = "JOB_STATE_PENDING" -FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED} -SUCCEEDED_END_STATES = {JOB_STATE_DONE} -END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES +JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING" +JOB_STATE_QUEUED = "JOB_STATE_QUEUED" +FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED} +SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED} +TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES Review comment: Airflow does not have the ability to display full information about the status of the job in an external system. We only have two states - SUCCESS/FAILED. What are you proposing then? Can the user specify expected end-states? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
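One possible answer to the question above is to let the caller declare which terminal states they consider a success; a rough sketch follows (names and defaults are illustrative, not the PR's API):

```python
from typing import Optional, Set

SUCCEEDED_END_STATES = {"JOB_STATE_DONE", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"}
FAILED_END_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_STOPPED"}


def check_dataflow_job_state(current_state: str, expected_end_states: Optional[Set[str]] = None) -> bool:
    """Return True when the job reached an acceptable end state, raise when it reached a bad one."""
    expected = expected_end_states or SUCCEEDED_END_STATES
    if current_state in expected:
        return True
    if current_state in (SUCCEEDED_END_STATES | FAILED_END_STATES):
        raise RuntimeError(f"Job reached unexpected terminal state: {current_state}")
    return False  # still running, keep polling
```

This keeps the Airflow-side outcome binary (SUCCESS/FAILED) while letting users who intentionally drain or cancel a streaming job treat those states as expected.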
[GitHub] [airflow] kaxil opened a new pull request #8593: Test deprecation warning for back-compat secrets
kaxil opened a new pull request #8593: URL: https://github.com/apache/airflow/pull/8593 Based on https://github.com/apache/airflow/pull/8413#issuecomment-616535528 Add contrib secrets to TestMovingCoreToContrib to test deprecation warning is raised --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
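A minimal sketch of the pattern such a test relies on, assuming the old contrib path emits a module-level DeprecationWarning; the module path below is a placeholder rather than one of the exact entries added here:

```python
import importlib

import pytest

# Placeholder path: any contrib shim that redirects to the new secrets backend location.
OLD_CONTRIB_MODULE = "airflow.contrib.secrets.aws_secrets_manager"


def test_contrib_secrets_import_warns():
    with pytest.warns(DeprecationWarning):
        module = importlib.import_module(OLD_CONTRIB_MODULE)
        importlib.reload(module)  # re-trigger the warning if the module was already imported
```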
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416157606 ## File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py ## @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google Cloud Dataflow service +""" +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator +from airflow.utils.dates import days_ago + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") + +BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples') +DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql") +DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1') + +with models.DAG( +dag_id="example_gcp_dataflow_sql", +default_args={ +"start_date": days_ago(1), +}, +schedule_interval=None, # Override to match your needs +) as dag_sql: +start_sql = DataflowStartSqlJobOperator( +task_id="start_sql_query", +job_name=DATAFLOW_SQL_JOB_NAME, +query=f""" +SELECT +sales_region as sales_region, +count(state_id) as count_state +FROM +bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table Review comment: We strive to maintain a high level of reliability for GCP integration. All integrations that are developed by my team must have system tests. System tests are required criteria for internal review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb edited a comment on pull request #8499: Optimize GitHub CI configuration
ashb edited a comment on pull request #8499: URL: https://github.com/apache/airflow/pull/8499#issuecomment-620246643 Speaking of Semantic commits - PR title != commit title here :grin: The only problem with that is some people less familiar with git aren't comfortable editing commits and force pushing, so the approach of commitzen to have a pending check (but not a failing one) looks quite nice. Certainly for single commit PRs anyway, the title should match the PR title, or at least prompt us committers to notice that it doesn't cc @turbaszek This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb commented on pull request #8499: Optimize GitHub CI configuration
ashb commented on pull request #8499: URL: https://github.com/apache/airflow/pull/8499#issuecomment-620246643 Speaking of Semantic commits - PR title != commit title here :grin: The only problem with that is some people less familiar with git aren't comfortable editing commits and force pushing, so the approach of commitzen to have a pending check (but not a failing one) looks quite nice. Certainly for single commit PRs anyway, the title should match the PR title, or at least prompt us committers to notice that it doesn't This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] aaltay commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator
aaltay commented on a change in pull request #8531: URL: https://github.com/apache/airflow/pull/8531#discussion_r416140436 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -548,23 +554,17 @@ def start_template_dataflow( :type location: str """ name = self._build_dataflow_job_name(job_name, append_job_name) -# Builds RuntimeEnvironment from variables dictionary -# https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment -environment = {} -for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail', -'tempLocation', 'bypassTempDirValidation', 'machineType', -'additionalExperiments', 'network', 'subnetwork', 'additionalUserLabels']: -if key in variables: -environment.update({key: variables[key]}) -body = {"jobName": name, -"parameters": parameters, -"environment": environment} + service = self.get_conn() request = service.projects().locations().templates().launch( # pylint: disable=no-member projectId=project_id, location=location, gcsPath=dataflow_template, -body=body +body={ +"jobName": name, +"parameters": parameters, +"environment": variables Review comment: What is the difference between parameters and variables? ## File path: airflow/providers/google/cloud/operators/dataflow.py ## @@ -377,6 +394,7 @@ def __init__( self.poll_sleep = poll_sleep self.job_id = None self.hook: Optional[DataflowHook] = None +self.options.update(dataflow_default_options) Review comment: Would not this override user provided options with default options? ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -532,7 +532,13 @@ def start_template_dataflow( :param job_name: The name of the job. Review comment: Is this bug ( #8300) only affecting templates? ## File path: tests/providers/google/cloud/hooks/test_dataflow.py ## @@ -52,32 +54,32 @@ 'stagingLocation': 'gs://test/staging', 'labels': {'foo': 'bar'} } -DATAFLOW_VARIABLES_TEMPLATE = { -'project': 'test', -'tempLocation': 'gs://test/temp', -'zone': 'us-central1-f' -} RUNTIME_ENV = { -'tempLocation': 'gs://test/temp', -'zone': 'us-central1-f', -'numWorkers': 2, -'maxWorkers': 10, -'serviceAccountEmail': 'test@apache.airflow', -'machineType': 'n1-standard-1', 'additionalExperiments': ['exp_flag1', 'exp_flag2'], -'network': 'default', -'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK', 'additionalUserLabels': { 'name': 'wrench', 'mass': '1.3kg', 'count': '3' -} +}, +'bypassTempDirValidation': {}, +'ipConfiguration': 'WORKER_IP_PRIVATE', +'kmsKeyName': ( + 'projects/TEST_PROJECT_ID/locations/TEST_LOCATIONS/keyRings/TEST_KEYRING/cryptoKeys/TEST_CRYPTOKEYS' +), +'maxWorkers': 10, +'network': 'default', +'numWorkers': 2, +'serviceAccountEmail': 'test@apache.airflow', +'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK', +'tempLocation': 'gs://test/temp', +'workerRegion': "test-region", +'workerZone': 'test-zone', +'zone': 'us-central1-f', +'machineType': 'n1-standard-1', Review comment: maybe test private ip flag, since it was specifically mentioned in the issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
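On the ordering question about `self.options.update(dataflow_default_options)`: merging the defaults in first and then applying the explicit options keeps user-provided values intact. A tiny illustration (variable names are only for the example):

```python
dataflow_default_options = {"project": "example-project", "zone": "us-central1-f"}
user_options = {"zone": "europe-west1-b", "tempLocation": "gs://example-bucket/tmp"}

# Defaults first, explicit options second, so explicit options win on conflicts.
merged_options = {**dataflow_default_options, **user_options}

assert merged_options["zone"] == "europe-west1-b"      # user value preserved
assert merged_options["project"] == "example-project"  # default filled in
```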
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416146984 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -640,9 +659,8 @@ def start_python_dataflow( # pylint: disable=too-many-arguments variables['job_name'] = name variables['region'] = location -def label_formatter(labels_dict): -return ['--labels={}={}'.format(key, value) -for key, value in labels_dict.items()] +if 'labels' in variables: +variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()] Review comment: It depends on the SDK that is used. These two SDK require different argument formats. We have three related methods. * start_python_dataflow * start_java_dataflow * _start_dataflow The first two methods are public and dependent on the SDK. This is responsible for actions regarding a specific SDK e.g. environment preparation. _start_dataflow is an internal method. This starts the system process and supervises execution. I have the impression that this separation is helpful. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416146984 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -640,9 +659,8 @@ def start_python_dataflow( # pylint: disable=too-many-arguments variables['job_name'] = name variables['region'] = location -def label_formatter(labels_dict): -return ['--labels={}={}'.format(key, value) -for key, value in labels_dict.items()] +if 'labels' in variables: +variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()] Review comment: It depends on the SDK that is used. These two SDKs require different argument formats. We have three related methods. * start_python_dataflow * start_java_dataflow * _start_dataflow The first two methods are public and SDK-dependent. They are responsible for the changes that depend on the SDK, e.g. environment preparation. _start_dataflow is an internal method. It starts the system process and supervises its execution. I have the impression that this separation is helpful. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
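To make the formatting difference concrete, a small illustration of the two dict-to-argument conversions involved; which entry point expects which form is defined by the hook, so treat the pairing here as illustrative only:

```python
labels = {"foo": "bar", "team": "data"}

# Values of a single repeated option, e.g. what the snippet above stores back into variables['labels'].
as_key_value_pairs = ["{}={}".format(key, value) for key, value in labels.items()]

# Standalone command-line flags, e.g. what the removed label_formatter used to build.
as_cli_flags = ["--labels={}={}".format(key, value) for key, value in labels.items()]

print(as_key_value_pairs)  # ['foo=bar', 'team=data']
print(as_cli_flags)        # ['--labels=foo=bar', '--labels=team=data']
```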
[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
mik-laj commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416159089 ## File path: tests/providers/google/cloud/operators/test_dataflow_system.py ## @@ -26,4 +37,198 @@ class CloudDataflowExampleDagsSystemTest(GoogleSystemTest): @provide_gcp_context(GCP_DATAFLOW_KEY) def test_run_example_dag_function(self): -self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER) +self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER) + + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") +GCR_FLEX_TEMPLATE_IMAGE = f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest" + +# https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql +GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH) +GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc + + +EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples" +EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe" +EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql" + + +@pytest.mark.backend("mysql", "postgres") +@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY) +class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest): +@provide_gcp_context(GCP_GCS_TRANSFER_KEY, project_id=GoogleSystemTest._project_id()) +def setUp(self) -> None: +# Create a Cloud Storage bucket +self.execute_cmd(["gsutil", "mb", f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"]) + +# Build image with pipeline +with NamedTemporaryFile() as f: +f.write( +textwrap.dedent( +"""\ +steps: Review comment: I am building an image from your repository. https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416154847 ## File path: airflow/providers/google/cloud/operators/dataflow.py ## @@ -406,6 +406,88 @@ def on_kill(self) -> None: self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id) +class DataflowStartSqlJobOperator(BaseOperator): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. It can be a dictionary with the following keys. + +For more information, look at: +`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference + +:param options: dict +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param gcp_conn_id: The connection ID to use connecting to Google Cloud +Platform. +:type gcp_conn_id: str +:param delegate_to: The account to impersonate, if any. +For this to work, the service account making the request must have +domain-wide delegation enabled. +:type delegate_to: str +""" + +template_fields = ["job_name", 'query', 'options', 'location', 'project_id', 'gcp_conn_id'] + +@apply_defaults +def __init__( +self, +job_name: str, +query: str, +options: Dict[str, Any], +location: str = DEFAULT_DATAFLOW_LOCATION, +project_id: Optional[str] = None, +gcp_conn_id: str = 'google_cloud_default', +delegate_to: Optional[str] = None, +*args, +**kwargs +) -> None: +super().__init__(*args, **kwargs) +self.job_name = job_name +self.query = query +self.options = options +self.location = location +self.project_id = project_id +self.gcp_conn_id = gcp_conn_id +self.delegate_to = delegate_to +self.job_id = None +self.hook: Optional[DataflowHook] = None + +def execute(self, context): +self.hook = DataflowHook( +gcp_conn_id=self.gcp_conn_id, +delegate_to=self.delegate_to, +) + +def set_current_job_id(job_id): +self.job_id = job_id + +job = self.hook.start_sql_job( +job_name=self.job_name, +query=self.query, +options=self.options, +location=self.location, +project_id=self.project_id, +on_new_job_id_callback=set_current_job_id, +) + +return job + +def on_kill(self) -> None: +self.log.info("On kill.") +if self.job_id: +self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id) Review comment: Good point. I will skip jobs in the terminal state. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
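The promised change could look roughly like the following sketch; the function shape and state set are illustrative, assuming the hook exposes the job's current state to the caller:

```python
TERMINAL_STATES = {
    "JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED", "JOB_STATE_DRAINED",
}


def cancel_if_still_running(hook, job_id: str, project_id: str, current_state: str, log) -> None:
    """Cancel the Dataflow job only if it has not already reached a terminal state."""
    if current_state in TERMINAL_STATES:
        log.info("Job %s already finished in state %s; nothing to cancel.", job_id, current_state)
        return
    hook.cancel_job(job_id=job_id, project_id=project_id)
```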
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416153736 ## File path: airflow/providers/google/cloud/operators/dataflow.py ## @@ -406,6 +406,88 @@ def on_kill(self) -> None: self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id) +class DataflowStartSqlJobOperator(BaseOperator): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. It can be a dictionary with the following keys. + +For more information, look at: +`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference + +:param options: dict +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param gcp_conn_id: The connection ID to use connecting to Google Cloud +Platform. +:type gcp_conn_id: str Review comment: Airflow saves all credentials(MySQL, GCP, AWS, and other) in one table in the database. It's called `connection`. This is the entry ID in this table. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
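For readers unfamiliar with the `connection` table, a small sketch of how such an entry ID is resolved at runtime; the ID `google_cloud_default` is simply the example default used by the operator:

```
# Sketch: a conn_id (e.g. gcp_conn_id='google_cloud_default') is the ID of a row
# in Airflow's `connection` table; hooks look it up when they build a client.
from airflow.hooks.base_hook import BaseHook  # Airflow 1.10.x import path

conn = BaseHook.get_connection("google_cloud_default")
print(conn.conn_type, conn.host, conn.login, conn.extra)
```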
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416152856 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -783,6 +794,77 @@ def cancel_job( name=job_name, job_id=job_id, location=location, -poll_sleep=self.poll_sleep +poll_sleep=self.poll_sleep, +num_retries=self.num_retries, ) jobs_controller.cancel() + +@GoogleBaseHook.fallback_to_default_project_id +def start_sql_job( +self, +job_name: str, +query: str, +options: Dict[str, Any], +project_id: str, +location: str = DEFAULT_DATAFLOW_LOCATION, +on_new_job_id_callback: Optional[Callable[[str], None]] = None +): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. +For more information, look at: + `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param on_new_job_id_callback: Callback called when the job ID is known. +:type on_new_job_id_callback: callable +:return: the new job object +""" +cmd = [ +'gcloud', +'beta', +'dataflow', +'sql', +'query', +query, +f'--project={project_id}', +'--format=value(job.id)', +f'--job-name={job_name}', +f'--region={location}', +*(self._options_to_args(options)) +] +self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd])) +with self.provide_authorized_gcloud(): +proc = subprocess.run( # pylint: disable=subprocess-run-check +cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) +self.log.info("Output: %s", proc.stdout.decode()) +self.log.info("Stderr: %s", proc.stderr.decode()) Review comment: `stderr` often contains developer information. There are not only errors. I will change it to `log.warning` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
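A self-contained sketch of the logging split being discussed, streaming `stdout` at INFO and `stderr` at WARNING because `gcloud` writes progress and diagnostic text, not only errors, to `stderr`; the command here is just a placeholder:

```
import logging
import subprocess

log = logging.getLogger(__name__)

# Placeholder command; in the hook this would be the assembled `gcloud ... sql query` call.
proc = subprocess.run(
    ["gcloud", "--version"],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    check=False,
)
log.info("Output: %s", proc.stdout.decode())
# stderr often carries informational output, so WARNING (rather than ERROR) fits better.
log.warning("Stderr: %s", proc.stderr.decode())
```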
[GitHub] [airflow] ashb commented on a change in pull request #8572: Improve template capabilities of EMR job and step operators
ashb commented on a change in pull request #8572: URL: https://github.com/apache/airflow/pull/8572#discussion_r416151252 ## File path: airflow/providers/amazon/aws/operators/emr_add_steps.py ## @@ -38,13 +38,14 @@ class EmrAddStepsOperator(BaseOperator): :type cluster_states: list :param aws_conn_id: aws connection to uses :type aws_conn_id: str -:param steps: boto3 style steps to be added to the jobflow. (templated) -:type steps: list +:param steps: boto3 style steps or reference to a steps file (must be '.json' or '.jinja2') to +be added to the jobflow. (templated) +:type steps: list|str :param do_xcom_push: if True, job_flow_id is pushed to XCom with key job_flow_id. :type do_xcom_push: bool """ template_fields = ['job_flow_id', 'job_flow_name', 'cluster_states', 'steps'] -template_ext = () +template_ext = ('.json', '.jinja2') Review comment: Nothing else uses `.jinja2` in Airflow, so I think we shouldn't create new pattern for just this operator. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
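For readers following the `template_ext` discussion, a quick reminder of what the attribute does: when a templated field's value is a string ending in one of the listed extensions, Airflow reads that file and renders its contents with Jinja instead of using the string literally. A hedged sketch with a hypothetical operator (not the EMR operator itself):

```
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class FileTemplatedOperator(BaseOperator):
    template_fields = ["steps"]
    template_ext = (".json",)  # a value ending in .json is loaded and rendered from file

    @apply_defaults
    def __init__(self, steps, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.steps = steps  # e.g. "steps.json", resolved via the DAG's template search path

    def execute(self, context):
        # By the time execute() runs, self.steps holds the rendered file contents.
        self.log.info("Rendered steps: %s", self.steps)
```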
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416150987 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -783,6 +794,77 @@ def cancel_job( name=job_name, job_id=job_id, location=location, -poll_sleep=self.poll_sleep +poll_sleep=self.poll_sleep, +num_retries=self.num_retries, ) jobs_controller.cancel() + +@GoogleBaseHook.fallback_to_default_project_id +def start_sql_job( +self, +job_name: str, +query: str, +options: Dict[str, Any], +project_id: str, +location: str = DEFAULT_DATAFLOW_LOCATION, +on_new_job_id_callback: Optional[Callable[[str], None]] = None +): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. +For more information, look at: + `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param on_new_job_id_callback: Callback called when the job ID is known. +:type on_new_job_id_callback: callable +:return: the new job object +""" +cmd = [ +'gcloud', +'beta', +'dataflow', +'sql', +'query', +query, +f'--project={project_id}', +'--format=value(job.id)', +f'--job-name={job_name}', +f'--region={location}', +*(self._options_to_args(options)) +] +self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c in cmd])) +with self.provide_authorized_gcloud(): Review comment: Adds escape characters if needed. Example: If you want to display the contents of the `/tmp/` directory then you can use the command `ls /tmp/` If you want to display the contents of the `/tmp/i love pizza` directory then you can use the command `ls '/tmp/ i love pizza'`. `ls /tmp/i love pizza` is incorrect command. The decision about quotation characeters was made by shlex.quote. This also supports other cases required by sh e.g. quote character in an argument This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
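A runnable illustration of the quoting behaviour described above; the directory names are only examples:

```
import shlex

# shlex.quote adds quoting only when an argument actually needs it.
print(" ".join(shlex.quote(c) for c in ["ls", "/tmp/"]))
# -> ls /tmp/

print(" ".join(shlex.quote(c) for c in ["ls", "/tmp/i love pizza"]))
# -> ls '/tmp/i love pizza'
```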
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416146984 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -640,9 +659,8 @@ def start_python_dataflow( # pylint: disable=too-many-arguments variables['job_name'] = name variables['region'] = location -def label_formatter(labels_dict): -return ['--labels={}={}'.format(key, value) -for key, value in labels_dict.items()] +if 'labels' in variables: +variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()] Review comment: It depends on the SDK that is used. These two SDK require different argument formats. We have three related methods. * start_python_dataflow * start_java_dataflow * _start_dataflow The first two methods are public and dependent on the SDK. They are responsible for introducing changes dependent on the SDK, e.g. environment preparation. _start_dataflow is an internal method. This starts the system process and supervises execution. I have the impression that this separation is helpful. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
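To make the SDK-dependent difference concrete, a small sketch of the two label argument shapes built from the same dictionary (the label values are examples only): the Java SDK takes a single JSON-encoded `--labels` argument, while the Python SDK takes one `key=value` pair per `--labels` argument:

```
import json

labels = {"airflow-version": "v1-10-10", "team": "data"}

# Java SDK style: one JSON object.
java_arg = "--labels={}".format(json.dumps(labels).replace(" ", ""))
# -> --labels={"airflow-version":"v1-10-10","team":"data"}

# Python SDK style: repeated key=value pairs.
python_args = ["--labels={}={}".format(k, v) for k, v in labels.items()]
# -> ['--labels=airflow-version=v1-10-10', '--labels=team=data']
```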
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416142108 ## File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py ## @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google Cloud Dataflow service +""" +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator +from airflow.utils.dates import days_ago + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") + +BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples') +DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql") +DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1') + +with models.DAG( +dag_id="example_gcp_dataflow_sql", +default_args={ +"start_date": days_ago(1), +}, +schedule_interval=None, # Override to match your needs +) as dag_sql: +start_sql = DataflowStartSqlJobOperator( +task_id="start_sql_query", +job_name=DATAFLOW_SQL_JOB_NAME, +query=f""" +SELECT +sales_region as sales_region, +count(state_id) as count_state +FROM +bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table +GROUP BY sales_region; +""", +options={ +"bigquery-project": GCP_PROJECT_ID, +"bigquery-dataset": BQ_SQL_DATASET, +"bigquery-table": "beam_output", +'bigquery-write-disposition': "write-truncate", +}, +location=DATAFLOW_SQL_LOCATION, +do_xcom_push=True, Review comment: I still asked @jaketfI to review. Before this change is merged, it will also be reviewed by at least one Apache Airflow commiter. . This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
mik-laj commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416140501 ## File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py ## @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google Cloud Dataflow service +""" +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator +from airflow.utils.dates import days_ago + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") + +BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 'airflow_dataflow_samples') +DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql") +DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1') + +with models.DAG( +dag_id="example_gcp_dataflow_sql", +default_args={ +"start_date": days_ago(1), +}, +schedule_interval=None, # Override to match your needs +) as dag_sql: +start_sql = DataflowStartSqlJobOperator( +task_id="start_sql_query", +job_name=DATAFLOW_SQL_JOB_NAME, +query=f""" +SELECT +sales_region as sales_region, +count(state_id) as count_state +FROM +bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table Review comment: This bucket is created in system tests. https://github.com/apache/airflow/pull/8553/files# Unfortunately, Dataflow SQL is not compatible with the public datasets I know. I got the following error when I referred to the public dataset. > Caused by: java.lang.UnsupportedOperationException: Field type 'NUMERIC' is not supported (field 'value') Its error message from BigQuery console ![Screenshot 2020-04-27 at 22 52 46](https://user-images.githubusercontent.com/12058428/80419684-de796580-88d9-11ea-85bd-1d5473c13901.png) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kennknowles commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
kennknowles commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416136954 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT: class DataflowJobStatus: """ Helper class with Dataflow job statuses. +Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState """ -JOB_STATE_DONE = "JOB_STATE_DONE" +JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN" +JOB_STATE_STOPPED = "JOB_STATE_STOPPED" JOB_STATE_RUNNING = "JOB_STATE_RUNNING" +JOB_STATE_DONE = "JOB_STATE_DONE" JOB_STATE_FAILED = "JOB_STATE_FAILED" JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED" +JOB_STATE_UPDATED = "JOB_STATE_UPDATED" +JOB_STATE_DRAINING = "JOB_STATE_DRAINING" +JOB_STATE_DRAINED = "JOB_STATE_DRAINED" JOB_STATE_PENDING = "JOB_STATE_PENDING" -FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED} -SUCCEEDED_END_STATES = {JOB_STATE_DONE} -END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES +JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING" +JOB_STATE_QUEUED = "JOB_STATE_QUEUED" +FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED} +SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, JOB_STATE_DRAINED} +TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES Review comment: Are these lists used anywhere else? I think that the success/fail distinction is artificial. You cannot really say if CANCELED is a failure or not. Probably the same with DRAINED and UPDATED. Whatever is looking at the job status probably wants the full details. ## File path: airflow/providers/google/cloud/operators/dataflow.py ## @@ -406,6 +406,88 @@ def on_kill(self) -> None: self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id) +class DataflowStartSqlJobOperator(BaseOperator): +""" +Starts Dataflow SQL query. Review comment: @ibzib would be a good reviewer here ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -783,6 +794,77 @@ def cancel_job( name=job_name, job_id=job_id, location=location, -poll_sleep=self.poll_sleep +poll_sleep=self.poll_sleep, +num_retries=self.num_retries, ) jobs_controller.cancel() + +@GoogleBaseHook.fallback_to_default_project_id +def start_sql_job( +self, +job_name: str, +query: str, +options: Dict[str, Any], +project_id: str, +location: str = DEFAULT_DATAFLOW_LOCATION, +on_new_job_id_callback: Optional[Callable[[str], None]] = None +): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. +For more information, look at: + `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param on_new_job_id_callback: Callback called when the job ID is known. +:type on_new_job_id_callback: callable +:return: the new job object +""" +cmd = [ +'gcloud', +'beta', Review comment: It is not required. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
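One way to read the suggestion that callers want the full job details rather than a baked-in success/failure split is a wait helper that simply returns the job once it is terminal and lets the caller interpret the state; a rough sketch under that assumption, not the hook's actual code:

```
import time


def wait_for_terminal_state(get_job, terminal_states, poll_sleep=10):
    """Poll `get_job()` until the job reaches a terminal state, then return it.

    `get_job` is an assumed callable returning the Dataflow job resource as a
    dict; the caller decides how to treat CANCELLED, DRAINED, UPDATED, etc.
    """
    while True:
        job = get_job()
        if job.get("currentState") in terminal_states:
            return job
        time.sleep(poll_sleep)
```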
[GitHub] [airflow] aaltay commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator
aaltay commented on a change in pull request #8550: URL: https://github.com/apache/airflow/pull/8550#discussion_r416133310 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -93,15 +93,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT: class DataflowJobStatus: """ Helper class with Dataflow job statuses. +Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState Review comment: This file is also modified in https://github.com/apache/airflow/pull/8553. I am assuming it is the same changes, skipping this file for the review. ## File path: airflow/providers/google/cloud/operators/dataflow.py ## @@ -406,6 +406,71 @@ def on_kill(self) -> None: self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id) +class DataflowStartFlexTemplateOperator(BaseOperator): +""" +Starts flex templates with the Dataflow pipeline. + +:param body: The request body +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. +If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param gcp_conn_id: The connection ID to use connecting to Google Cloud +Platform. +:type gcp_conn_id: str +:param delegate_to: The account to impersonate, if any. +For this to work, the service account making the request must have +domain-wide delegation enabled. +:type delegate_to: str +""" + +template_fields = ["body", 'location', 'project_id', 'gcp_conn_id'] + +@apply_defaults +def __init__( +self, +body: Dict, +location: str, +project_id: Optional[str] = None, +gcp_conn_id: str = 'google_cloud_default', +delegate_to: Optional[str] = None, +*args, +**kwargs +) -> None: +super().__init__(*args, **kwargs) +self.body = body +self.location = location +self.project_id = project_id +self.gcp_conn_id = gcp_conn_id +self.delegate_to = delegate_to +self.job_id = None +self.hook: Optional[DataflowHook] = None + +def execute(self, context): +self.hook = DataflowHook( +gcp_conn_id=self.gcp_conn_id, +delegate_to=self.delegate_to, +) + +def set_current_job_id(job_id): +self.job_id = job_id + +job = self.hook.start_flex_template( +body=self.body, +location=self.location, +project_id=self.project_id, +on_new_job_id_callback=set_current_job_id, +) + +return job + +def on_kill(self) -> None: +self.log.info("On kill.") +if self.job_id: +self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id) Review comment: Do you need to call this if job is no longer running? ## File path: tests/providers/google/cloud/operators/test_dataflow_system.py ## @@ -15,9 +15,20 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import os Review comment: This file reads more like an end2end test of various dataflow system things. Should it be a different PR? ## File path: airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py ## @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google Cloud Dataflow service +""" +import os + +from airflow import models +from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator +from airflow.utils.dates import days_ago + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") + +DATAFLOW_FLEX_TEMPLATE_JOB_NAME = os.environ.get('DATAFLOW_FLEX_TEMPLATE_JOB_NAME', f"dataflow-flex-template") + +# For simplicity we use the same topic name as the subscription name. +PUBSUB_FLEX_TEMPLATE_TOPIC = os.environ.get('DATAFLOW_PUBSUB_FLE
[GitHub] [airflow] aaltay commented on pull request #8553: Add DataflowStartSQLQuery operator
aaltay commented on pull request #8553: URL: https://github.com/apache/airflow/pull/8553#issuecomment-620222175 /cc @kennknowles This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093913#comment-17093913 ] ASF GitHub Bot commented on AIRFLOW-5156: - mik-laj commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416123217 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: https://requests.readthedocs.io/en/master/user/authentication/ Is it possible to use this hook with other methods? I think it would be better if the user passed the auth object, not just the auth_type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add other authentication mechanisms to HttpHook > --- > > Key: AIRFLOW-5156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5156 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Affects Versions: 1.10.4 >Reporter: Joshua Kornblum >Assignee: Rohit S S >Priority: Minor > > It looks like the only supported authentication for HttpHooks is basic auth. > The hook code shows > {quote}_if conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > requests library supports any auth that inherits AuthBase – in my scenario we > need ntlmauth for API on IIS server. > [https://2.python-requests.org/en/master/user/advanced/#custom-authentication] > I would suggest option to pass auth object in constructor then add to if/else > control flow like > {quote}_if self.auth is not None:_ > _session.auth = self.auth_ > _elif conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > One would have to fetch the connection themselves and then fill out auth and > then pass that to hook which is flexible although a little awkard. > {quote}api_conn = BaseHook().get_connection('my_api') > auth = HttpNtlmAuth(api_conn.login, api_conn.password) > HttpSensor(task_id='sensing', auth=auth, ) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
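A sketch of the usage pattern the issue proposes, assuming the hook/sensor gains an `auth` argument; `HttpNtlmAuth` comes from the third-party `requests_ntlm` package, the connection ID is an example, and the import paths follow the Airflow 1.10 layout:

```
from airflow.hooks.base_hook import BaseHook
from airflow.sensors.http_sensor import HttpSensor
from requests_ntlm import HttpNtlmAuth  # third-party package

api_conn = BaseHook.get_connection("my_api")
auth = HttpNtlmAuth(api_conn.login, api_conn.password)

sensing = HttpSensor(
    task_id="sensing",
    http_conn_id="my_api",
    endpoint="health",
    auth=auth,  # hypothetical argument proposed in the issue, not an existing parameter
)
```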
[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator
aaltay commented on a change in pull request #8553: URL: https://github.com/apache/airflow/pull/8553#discussion_r416125310 ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT: class DataflowJobStatus: """ Helper class with Dataflow job statuses. +Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState """ -JOB_STATE_DONE = "JOB_STATE_DONE" +JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN" +JOB_STATE_STOPPED = "JOB_STATE_STOPPED" JOB_STATE_RUNNING = "JOB_STATE_RUNNING" +JOB_STATE_DONE = "JOB_STATE_DONE" JOB_STATE_FAILED = "JOB_STATE_FAILED" JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED" +JOB_STATE_UPDATED = "JOB_STATE_UPDATED" +JOB_STATE_DRAINING = "JOB_STATE_DRAINING" +JOB_STATE_DRAINED = "JOB_STATE_DRAINED" JOB_STATE_PENDING = "JOB_STATE_PENDING" -FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED} -SUCCEEDED_END_STATES = {JOB_STATE_DONE} -END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES +JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING" +JOB_STATE_QUEUED = "JOB_STATE_QUEUED" +FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, JOB_STATE_STOPPED} Review comment: JOB_STATE_STOPPED is not a failed state. (See: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate) ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -496,17 +517,15 @@ def start_java_dataflow( variables['jobName'] = name variables['region'] = location -def label_formatter(labels_dict): -return ['--labels={}'.format( -json.dumps(labels_dict).replace(' ', ''))] +if 'labels' in variables: +variables['labels'] = json.dumps(variables['labels']).replace(' ', '') Review comment: This is modifying user provided input. Is this your intention? ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -640,9 +659,8 @@ def start_python_dataflow( # pylint: disable=too-many-arguments variables['job_name'] = name variables['region'] = location -def label_formatter(labels_dict): -return ['--labels={}={}'.format(key, value) -for key, value in labels_dict.items()] +if 'labels' in variables: +variables['labels'] = ['{}={}'.format(key, value) for key, value in variables['labels'].items()] Review comment: Why is this formatter different than the one from L521? Could we move all label fomating to the place where dataflow job is triggered? ## File path: airflow/providers/google/cloud/hooks/dataflow.py ## @@ -783,6 +794,77 @@ def cancel_job( name=job_name, job_id=job_id, location=location, -poll_sleep=self.poll_sleep +poll_sleep=self.poll_sleep, +num_retries=self.num_retries, ) jobs_controller.cancel() + +@GoogleBaseHook.fallback_to_default_project_id +def start_sql_job( +self, +job_name: str, +query: str, +options: Dict[str, Any], +project_id: str, +location: str = DEFAULT_DATAFLOW_LOCATION, +on_new_job_id_callback: Optional[Callable[[str], None]] = None +): +""" +Starts Dataflow SQL query. + +:param job_name: The unique name to assign to the Cloud Dataflow job. +:type job_name: str +:param query: The SQL query to execute. +:type query: str +:param options: Job parameters to be executed. +For more information, look at: + `https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query +`__ +command reference +:param location: The location of the Dataflow job (for example europe-west1) +:type location: str +:param project_id: The ID of the GCP project that owns the job. 
+If set to ``None`` or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param on_new_job_id_callback: Callback called when the job ID is known. +:type on_new_job_id_callback: callable +:return: the new job object +""" +cmd = [ +'gcloud', +'beta', Review comment: Is the beta still required, do you know? ## File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py ## @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "
[GitHub] [airflow] mik-laj commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook
mik-laj commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416123217 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: https://requests.readthedocs.io/en/master/user/authentication/ Is it possible to use this hook with other methods? I think it would be better if the user passed the auth object, not just the auth_type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on issue #8541: "TypeError: can't pickle _thread.RLock objects" on usage of BigQueryOperator
mik-laj commented on issue #8541: URL: https://github.com/apache/airflow/issues/8541#issuecomment-620217086 Does this problem still occur in Airflow in the master branch? We have introduced a lot of changes and improvements in GCP operators and I think that this problem may have already been solved. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093909#comment-17093909 ] ASF GitHub Bot commented on AIRFLOW-5156: - mik-laj commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416123217 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: https://requests.readthedocs.io/en/master/user/authentication/ Is it used with other methods? I think it would be better if the user passed the auth object, not just the auth_type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add other authentication mechanisms to HttpHook > --- > > Key: AIRFLOW-5156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5156 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Affects Versions: 1.10.4 >Reporter: Joshua Kornblum >Assignee: Rohit S S >Priority: Minor > > It looks like the only supported authentication for HttpHooks is basic auth. > The hook code shows > {quote}_if conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > requests library supports any auth that inherits AuthBase – in my scenario we > need ntlmauth for API on IIS server. > [https://2.python-requests.org/en/master/user/advanced/#custom-authentication] > I would suggest option to pass auth object in constructor then add to if/else > control flow like > {quote}_if self.auth is not None:_ > _session.auth = self.auth_ > _elif conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > One would have to fetch the connection themselves and then fill out auth and > then pass that to hook which is flexible although a little awkard. > {quote}api_conn = BaseHook().get_connection('my_api') > auth = HttpNtlmAuth(api_conn.login, api_conn.password) > HttpSensor(task_id='sensing', auth=auth, ) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] mik-laj commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook
mik-laj commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416123217 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: https://requests.readthedocs.io/en/master/user/authentication/ Is it used with other methods? I think it would be better if the user passed the auth object, not just the auth_type. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on issue #8567: Support YAML input for CloudBuildCreateOperator
mik-laj commented on issue #8567: URL: https://github.com/apache/airflow/issues/8567#issuecomment-620212297 @joppevos Reuse the current argument if possible. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093903#comment-17093903 ] ASF GitHub Bot commented on AIRFLOW-5156: - jeffolsi commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416119369 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: The change makes sense to me: since it uses the requests package, it can work with custom auth (https://2.python-requests.org/en/master/user/advanced/#custom-authentication), provided the auth_type in this context is the auth object passed in the operator constructor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add other authentication mechanisms to HttpHook > --- > > Key: AIRFLOW-5156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5156 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Affects Versions: 1.10.4 >Reporter: Joshua Kornblum >Assignee: Rohit S S >Priority: Minor > > It looks like the only supported authentication for HttpHooks is basic auth. > The hook code shows > {quote}_if conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > requests library supports any auth that inherits AuthBase – in my scenario we > need ntlmauth for API on IIS server. > [https://2.python-requests.org/en/master/user/advanced/#custom-authentication] > I would suggest option to pass auth object in constructor then add to if/else > control flow like > {quote}_if self.auth is not None:_ > _session.auth = self.auth_ > _elif conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > One would have to fetch the connection themselves and then fill out auth and > then pass that to hook which is flexible although a little awkard. > {quote}api_conn = BaseHook().get_connection('my_api') > auth = HttpNtlmAuth(api_conn.login, api_conn.password) > HttpSensor(task_id='sensing', auth=auth, ) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] jeffolsi commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook
jeffolsi commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416119369 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: The change makes sense to me: since it uses the requests package, it can work with custom auth (https://2.python-requests.org/en/master/user/advanced/#custom-authentication), provided the auth_type in this context is the auth object passed in the operator constructor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
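A sketch of how the `auth_type` argument added in this PR could be used with a custom `requests` auth class; the class, header names, and connection ID below are illustrative only:

```
from requests.auth import AuthBase

from airflow.providers.http.hooks.http import HttpHook


class ApiKeySecretAuth(AuthBase):
    """Custom auth taking (login, password), matching auth_type(conn.login, conn.password)."""

    def __init__(self, login, password):
        self.login = login
        self.password = password

    def __call__(self, request):
        request.headers["X-Api-Key"] = self.login
        request.headers["X-Api-Secret"] = self.password
        return request


# The hook instantiates auth_type with the connection's login and password.
hook = HttpHook(method="GET", http_conn_id="my_api", auth_type=ApiKeySecretAuth)
```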
[GitHub] [airflow] joppevos commented on issue #8567: Support YAML input for CloudBuildCreateOperator
joppevos commented on issue #8567: URL: https://github.com/apache/airflow/issues/8567#issuecomment-620210294 @mik-laj Happy to pick up this task. Do you mean adding an extra argument to be able to pass the filepath or adjusting the current `body` variable to also accept a filepath instead of only dict builds? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
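One possible shape for the file-based variant being discussed: load a YAML build definition into the same dict structure the operator's `body` argument already accepts. The file name is an example and the feature itself is still only a proposal:

```
import yaml

# Example path; in the proposed feature the operator (or the DAG author) would
# read the YAML and pass the resulting dict as body=build_body.
with open("cloudbuild.yaml") as f:
    build_body = yaml.safe_load(f)

print(build_body)
```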
[GitHub] [airflow] boring-cyborg[bot] commented on pull request #8533: Stop DockerSwarmOperator from pulling Docker images
boring-cyborg[bot] commented on pull request #8533: URL: https://github.com/apache/airflow/pull/8533#issuecomment-620207440 Awesome work, congrats on your first merged pull request! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook
[ https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093890#comment-17093890 ] ASF GitHub Bot commented on AIRFLOW-5156: - mik-laj commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416110652 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: Other auth methods require a different number of arguments, e.g. OAuth2. What is the purpose of this change? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add other authentication mechanisms to HttpHook > --- > > Key: AIRFLOW-5156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5156 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Affects Versions: 1.10.4 >Reporter: Joshua Kornblum >Assignee: Rohit S S >Priority: Minor > > It looks like the only supported authentication for HttpHooks is basic auth. > The hook code shows > {quote}_if conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > requests library supports any auth that inherits AuthBase – in my scenario we > need ntlmauth for API on IIS server. > [https://2.python-requests.org/en/master/user/advanced/#custom-authentication] > I would suggest option to pass auth object in constructor then add to if/else > control flow like > {quote}_if self.auth is not None:_ > _session.auth = self.auth_ > _elif conn.login:_ > _session.auth = (conn.login, conn.password)_ > {quote} > One would have to fetch the connection themselves and then fill out auth and > then pass that to hook which is flexible although a little awkard. > {quote}api_conn = BaseHook().get_connection('my_api') > auth = HttpNtlmAuth(api_conn.login, api_conn.password) > HttpSensor(task_id='sensing', auth=auth, ) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] mik-laj commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook
mik-laj commented on a change in pull request #8429: URL: https://github.com/apache/airflow/pull/8429#discussion_r416110652 ## File path: airflow/providers/http/hooks/http.py ## @@ -66,7 +75,7 @@ def get_conn(self, headers=None): if conn.port: self.base_url = self.base_url + ":" + str(conn.port) if conn.login: -session.auth = (conn.login, conn.password) +session.auth = self.auth_type(conn.login, conn.password) Review comment: Other auth methods require a different number of arguments, e.g. OAuth2. What is the purpose of this change? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8591: Add http system test
mik-laj commented on a change in pull request #8591: URL: https://github.com/apache/airflow/pull/8591#discussion_r416109551 ## File path: tests/providers/http/operators/test_http_system.py ## @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os + +import pytest + +from tests.test_utils import AIRFLOW_MAIN_FOLDER +from tests.test_utils.system_tests_class import SystemTest + +HTTP_DAG_FOLDER = os.path.join( +AIRFLOW_MAIN_FOLDER, "airflow", "providers", "http", "example_dags" +) + + +@pytest.mark.backend("mysql", "postgres") +@pytest.mark.system("http") Review comment: It is not a core operator. It is in the provider directory, which allows it to be released independently from Airflow. It is common and mature, but its usage is still evolving. It is flexible enough to be extended with new authentication mechanisms and more. Last week we had a valuable non-documentation contribution to this operator. https://github.com/apache/airflow/pull/8429 Here is a list of all core operators. ``` airflow.operators.bash.BashOperator airflow.operators.branch_operator.BaseBranchOperator airflow.operators.check_operator.CheckOperator airflow.operators.check_operator.IntervalCheckOperator airflow.operators.check_operator.ThresholdCheckOperator airflow.operators.check_operator.ValueCheckOperator airflow.operators.dagrun_operator.TriggerDagRunOperator airflow.operators.dummy_operator.DummyOperator airflow.operators.generic_transfer.GenericTransfer airflow.operators.latest_only_operator.LatestOnlyOperator airflow.operators.presto_check_operator.PrestoCheckOperator airflow.operators.presto_check_operator.PrestoIntervalCheckOperator airflow.operators.presto_check_operator.PrestoValueCheckOperator airflow.operators.python.BranchPythonOperator airflow.operators.python.PythonOperator airflow.operators.python.PythonVirtualenvOperator airflow.operators.python.ShortCircuitOperator airflow.operators.subdag_operator.SubDagOperator ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] potiuk commented on pull request #8473: [AIRFLOW-8472]: `PATCH` for Databricks hook `_do_api_call`
potiuk commented on pull request #8473: URL: https://github.com/apache/airflow/pull/8473#issuecomment-620189429 Great ! thanks @dimonchik-suvorov ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library
[ https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093875#comment-17093875 ] ASF GitHub Bot commented on AIRFLOW-6981: - potiuk commented on pull request #8575: URL: https://github.com/apache/airflow/pull/8575#issuecomment-620186003 You can read more about it in contributing docs: https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#generating-requirement-files This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move Google Cloud Build from Discovery API to Python Library > > > Key: AIRFLOW-6981 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6981 > Project: Apache Airflow > Issue Type: Improvement > Components: gcp >Affects Versions: 2.0.0 >Reporter: Ryan Yuan >Assignee: Ryan Yuan >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)