[GitHub] [airflow] oripwk commented on a change in pull request #8572: Improve template capabilities of EMR job and step operators

2020-04-27 Thread GitBox


oripwk commented on a change in pull request #8572:
URL: https://github.com/apache/airflow/pull/8572#discussion_r416360122



##
File path: airflow/providers/amazon/aws/operators/emr_add_steps.py
##
@@ -38,13 +38,14 @@ class EmrAddStepsOperator(BaseOperator):
 :type cluster_states: list
 :param aws_conn_id: aws connection to uses
 :type aws_conn_id: str
-:param steps: boto3 style steps to be added to the jobflow. (templated)
-:type steps: list
+:param steps: boto3 style steps or reference to a steps file (must be 
'.json' or '.jinja2') to
+be added to the jobflow. (templated)
+:type steps: list|str
 :param do_xcom_push: if True, job_flow_id is pushed to XCom with key 
job_flow_id.
 :type do_xcom_push: bool
 """
 template_fields = ['job_flow_id', 'job_flow_name', 'cluster_states', 
'steps']
-template_ext = ()
+template_ext = ('.json', '.jinja2')

Review comment:
   I can remove it. Though it's a widespread convention, and also helps 
distinguish between real files and template files.
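
   For illustration, a minimal usage sketch of the templated steps file this diff enables (the DAG name, task IDs and `steps.json` file below are hypothetical, not part of this PR):
   ```
   # Hypothetical usage: with template_ext = ('.json', '.jinja2'), `steps` can
   # point to a templated file instead of an inline list of dicts.
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
   from airflow.utils.dates import days_ago

   with DAG("emr_steps_example", start_date=days_ago(1), schedule_interval=None) as dag:
       add_steps = EmrAddStepsOperator(
           task_id="add_steps",
           job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
           steps="steps.json",  # rendered through Jinja because of its extension
       )
   ```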





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on issue #8599: Airflow DAGs On/Off toggle button not working properly with RBAC=True mode

2020-04-27 Thread GitBox


boring-cyborg[bot] commented on issue #8599:
URL: https://github.com/apache/airflow/issues/8599#issuecomment-620407902


   Thanks for opening your first issue here! Be sure to follow the issue 
template!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ytiam opened a new issue #8599: Airflow DAGs On/Off toggle button not working properly with RBAC=True mode

2020-04-27 Thread GitBox


ytiam opened a new issue #8599:
URL: https://github.com/apache/airflow/issues/8599


   I am using an AWS EC2 instance and installed apache-airflow there. These are the 
steps I am following on that instance:
   1) After initializing Airflow with 'airflow initdb', I created a user with the 
**Admin** role using 'airflow create_user' and changed rbac = True in airflow.cfg.
   2) Ran 'airflow initdb' again and then 'airflow webserver', which started the 
webserver.
   3) In another tab, ran 'airflow scheduler'.
   4) Used port forwarding to forward port 8080 from the EC2 instance to my local 
machine.
   5) In my local browser, I open localhost:8080/.
   6) On the Airflow login page, I entered the Admin user's credentials and logged in.
   7) After toggling a DAG ON from the UI, refreshing the page or clicking on any 
other option automatically turns the DAG toggle back OFF, so I am not able to run 
any DAG.
   
   
   
![image](https://user-images.githubusercontent.com/13828878/80454358-8bd69280-8947-11ea-9fbf-e63b20e09d9b.png)
   
   The button I am talking about is on the left side of the DAG name.
   
   Can anyone please help?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [airflow] Acehaidrey opened a new pull request #8598: make hive macros py3 compatible with decoded string return type

2020-04-27 Thread GitBox


Acehaidrey opened a new pull request #8598:
URL: https://github.com/apache/airflow/pull/8598


   With the current implementation, the hive macros encode the result of the 
metastore calls; in Python 2 this still returns a string, but in Python 3 the 
encoding produces a bytes object. See the example 
below:
   ```
   ahaidrey-078HTD6:incubator-airflow ahaidrey$ python3
   Python 3.7.0 (default, Oct  2 2018, 09:20:07)
   [Clang 10.0.0 (clang-1000.11.45.2)] on darwin
   Type "help", "copyright", "credits" or "license" for more information.
   >>> a = 'apple'
   >>> b = a.encode('utf-8')
   >>> type(b)
   <class 'bytes'>
   
   ahaidrey-078HTD6:~ ahaidrey$ python
   Python 2.7.16 (default, Dec  3 2019, 02:03:47)
   [GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.31)] on darwin
   Type "help", "copyright", "credits" or "license" for more information.
   >>> a = 'apple'
   >>> b = a.encode('utf-8')
   >>> type(b)
   <type 'str'>
   ```
   The issue is that the value returned by the macros is then a bytes object, 
which isn't templatable as a string and breaks the queries it is used in. This 
means that all templates currently have to be written like this:
   ```
   val = "{{ macros.hive.max_partition(table='mytable', schema='myschema', 
field='myfield', filter_map={'key1': 'val1'}).decode('utf-8') }}"
   ```
   Requiring users to always decode the value is not the intention of this method; 
it should return a value that can be used as is.
   
   This PR fixes that. We may be able to just remove the encoding altogether, but 
that could be backwards incompatible.
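   As a rough illustration of the direction (a hypothetical helper, not the exact code in this PR), the macro's result could be decoded before being returned so existing templates keep working:
   ```
   def _ensure_str(value):
       # Hypothetical helper: return str on both Python 2 and 3 so the macro's
       # result can be used directly in templated SQL without an explicit .decode().
       if isinstance(value, bytes):
           return value.decode('utf-8')
       return value
   ```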
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists (not existing)
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] VinodKumarLogan commented on issue #8565: SalesforceHook missing method to return dataframe

2020-04-27 Thread GitBox


VinodKumarLogan commented on issue #8565:
URL: https://github.com/apache/airflow/issues/8565#issuecomment-620396479


   Hi @jeffolsi @potiuk , can I give this a shot and submit a PR? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4568) The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094158#comment-17094158
 ] 

ASF GitHub Bot commented on AIRFLOW-4568:
-

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-620384248


   Requesting review of this PR



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The ExternalTaskSensor should be configurable to raise an Airflow Exception 
> in case the poked external task reaches a disallowed state, such as f.i. 
> failed
> ---
>
> Key: AIRFLOW-4568
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4568
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Affects Versions: 1.10.3
>Reporter: ddluke
>Priority: Minor
>
> _As an engineer, I would like to have the behavior of the ExternalTaskSensor 
> changed_
> _So that it fails in case the poked external_task_id fails_
> *Therefore*
>  * I suggest extending the behavior of the sensor to optionally also query 
> the TaskInstance for disallowed states and raise an AirflowException if 
> found. Currently, if the poked external task reaches a failed state, the 
> sensor continues to poke and does not terminate
> *Acceptance Criteria (from my pov)*
>  * The class interface for ExternalTaskSensor is extended with an additional 
> parameter, disallowed_states, which is an Optional List of 
> airflow.utils.state.State
>  * The poke method is expanded to count the number of rows from TaskInstance 
> which met the filter criteria dag_id, task_id, disallowed_states and 
> dttm_filter if disallowed_states is not None
>  * If disallowed_states is not None and the above query returns a counter > 
> 0, an Airflow Exception is thrown
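
A rough sketch of the acceptance criteria above (the helper name and query details are illustrative only, not code from PR #8509):
```
# Illustrative only: count poked task instances that reached a disallowed
# state and fail fast instead of poking forever.
from airflow.exceptions import AirflowException
from airflow.models import TaskInstance as TI

def count_disallowed(session, external_dag_id, external_task_id, dttm_filter,
                     disallowed_states=None):
    if not disallowed_states:
        return
    bad = (
        session.query(TI)
        .filter(
            TI.dag_id == external_dag_id,
            TI.task_id == external_task_id,
            TI.state.in_(disallowed_states),
            TI.execution_date.in_(dttm_filter),
        )
        .count()
    )
    if bad > 0:
        raise AirflowException(
            "%d external task instance(s) reached a disallowed state" % bad
        )
```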



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disall

2020-04-27 Thread GitBox


lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-620384248


   Requesting review of this PR



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] dhuang commented on a change in pull request #8422: Add Snowflake system test

2020-04-27 Thread GitBox


dhuang commented on a change in pull request #8422:
URL: https://github.com/apache/airflow/pull/8422#discussion_r416327461



##
File path: airflow/providers/snowflake/operators/s3_to_snowflake.py
##
@@ -51,7 +51,7 @@ def __init__(self,
  table,
  stage,
  file_format,
- schema,
+ schema,  # TODO: shouldn't be required

Review comment:
   Similar to the database/warehouse/role values, the schema may already be set 
from the user's settings (`DEFAULT_NAMESPACE`) or in the session [set via 
connection 
extras](https://github.com/apache/airflow/blob/7d4b81ddd7f0c11532b0abac2050336b4f243b88/airflow/providers/snowflake/hooks/snowflake.py#L62).
 It can also be explicitly referenced in the table name, e.g. 
`my_schema.my_table`. Although there are good reasons to override the session 
default, I think that should be optional, which is consistent with 
[SnowflakeOperator](https://github.com/apache/airflow/blob/7d4b81ddd7f0c11532b0abac2050336b4f243b88/airflow/providers/snowflake/operators/snowflake.py#L44-L46).
   
   Will update the comment to be a little more specific.
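
   As a sketch of that suggestion (hypothetical helper, not code from this PR), the schema could be optional and only used to qualify the table name when it is explicitly given:
   ```
   from typing import Optional

   def qualified_table(table: str, schema: Optional[str] = None) -> str:
       # "my_schema.my_table" already carries the schema; otherwise prefix it only
       # when one was explicitly given, so Snowflake's session default still applies.
       if schema and "." not in table:
           return "{}.{}".format(schema, table)
       return table
   ```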





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6231) Show DAG Run conf in graph view

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094155#comment-17094155
 ] 

ASF GitHub Bot commented on AIRFLOW-6231:
-

dhuang commented on pull request #6794:
URL: https://github.com/apache/airflow/pull/6794#issuecomment-620380349


   > @dhuang Can you add a test to "When a conf is passed in DAG it appears in 
ListView"?
   > 
   > Example: ccbaf57#diff-948e87b4f8f644b3ad8c7950958df033
   
   Good call, added. Thanks for providing the example.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Show DAG Run conf in graph view
> ---
>
> Key: AIRFLOW-6231
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6231
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webserver
>Affects Versions: 1.10.6
>Reporter: Daniel Huang
>Assignee: Daniel Huang
>Priority: Trivial
>
> A DAG run's conf (from triggered DAGs) isn't surfaced anywhere other than in 
> the database itself. Would be handy to show it when one exists.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] dhuang commented on pull request #6794: [AIRFLOW-6231] Display DAG run conf in the list view

2020-04-27 Thread GitBox


dhuang commented on pull request #6794:
URL: https://github.com/apache/airflow/pull/6794#issuecomment-620380349


   > @dhuang Can you add a test to "When a conf is passed in DAG it appears in 
ListView"?
   > 
   > Example: ccbaf57#diff-948e87b4f8f644b3ad8c7950958df033
   
   Good call, added. Thanks for providing the example.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] stale[bot] commented on pull request #7645: [AIRFLOW-7004][depends on AIRFLOW-7003][WIP] Lazy initialize settings

2020-04-27 Thread GitBox


stale[bot] commented on pull request #7645:
URL: https://github.com/apache/airflow/pull/7645#issuecomment-620366010


   This issue has been automatically marked as stale because it has not had 
recent activity. It will be closed if no further activity occurs. Thank you for 
your contributions.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] stale[bot] commented on pull request #5308: [AIRFLOW-4549] skipped tasks should be ok for wait_for_downstream

2020-04-27 Thread GitBox


stale[bot] commented on pull request #5308:
URL: https://github.com/apache/airflow/pull/5308#issuecomment-620366004


   This issue has been automatically marked as stale because it has not had 
recent activity. It will be closed if no further activity occurs. Thank you for 
your contributions.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-7004) Lazy initialize settings

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094133#comment-17094133
 ] 

ASF GitHub Bot commented on AIRFLOW-7004:
-

stale[bot] commented on pull request #7645:
URL: https://github.com/apache/airflow/pull/7645#issuecomment-620366010


   This issue has been automatically marked as stale because it has not had 
recent activity. It will be closed if no further activity occurs. Thank you for 
your contributions.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Lazy initialize settings
> 
>
> Key: AIRFLOW-7004
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7004
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-5948) Replace SimpleDag with serialized version

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094131#comment-17094131
 ] 

ASF GitHub Bot commented on AIRFLOW-5948:
-

stale[bot] commented on pull request #7694:
URL: https://github.com/apache/airflow/pull/7694#issuecomment-620366014


   This issue has been automatically marked as stale because it has not had 
recent activity. It will be closed if no further activity occurs. Thank you for 
your contributions.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace SimpleDag with serialized version
> -
>
> Key: AIRFLOW-5948
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5948
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core, scheduler
>Affects Versions: 2.0.0, 1.10.7
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Critical
>  Labels: dag-serialization
>
> Replace SimpleDag with serialized version (json over multiprocessing) in 
> SchedulerJob etc., no other change in scheduler behaviour. (This doesn't make 
> sense long term, but does tidy up the code)
> Currently, we have two serialized representations:
> # SimpleDags (created because DAGs were not pickleable)
> # Serialized DAG
> We should remove SimpleDags



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094132#comment-17094132
 ] 

ASF GitHub Bot commented on AIRFLOW-4549:
-

stale[bot] commented on pull request #5308:
URL: https://github.com/apache/airflow/pull/5308#issuecomment-620366004


   This issue has been automatically marked as stale because it has not had 
recent activity. It will be closed if no further activity occurs. Thank you for 
your contributions.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> wait_for_downstream does not respect skipped tasks
> --
>
> Key: AIRFLOW-4549
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Dima Kamalov
>Assignee: Teddy Hartanto
>Priority: Major
>
> See 
> [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] stale[bot] commented on pull request #7694: [AIRFLOW-5948][WIP] Replace SimpleDag with SerializedDag

2020-04-27 Thread GitBox


stale[bot] commented on pull request #7694:
URL: https://github.com/apache/airflow/pull/7694#issuecomment-620366014


   This issue has been automatically marked as stale because it has not had 
recent activity. It will be closed if no further activity occurs. Thank you for 
your contributions.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] randr97 edited a comment on pull request #8503: Facebook system test

2020-04-27 Thread GitBox


randr97 edited a comment on pull request #8503:
URL: https://github.com/apache/airflow/pull/8503#issuecomment-620362839


   > I have run these tests and there are two minor problems.
   > 
   > * The bucket is not created or deleted;
   > * The BigQuery is not deleted.
   
   Will add the create bucket and delete BigQ operator.
   Thanks @mik-laj :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] randr97 commented on pull request #8503: Facebook system test

2020-04-27 Thread GitBox


randr97 commented on pull request #8503:
URL: https://github.com/apache/airflow/pull/8503#issuecomment-620362839


   > I have run these tests and there are two minor problems.
   > 
   > * The bucket is not created or deleted;
   > * The BigQuery is not deleted.
   
   Will add the create bucket and delete BigQ operator.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094123#comment-17094123
 ] 

ASF GitHub Bot commented on AIRFLOW-5156:
-

randr97 commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416302309



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   @mik-laj I have given a default value because of backward 
compatibility.
   For other auth methods, I think we will have to build things out.
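
   For illustration, this is roughly how the new `auth_type` parameter could be used (a sketch assuming the PR exposes it as a constructor argument; the connection id, endpoint and `requests_ntlm` dependency here are hypothetical):
   ```
   # Assumed usage based on this diff: get_conn() will call
   # session.auth = auth_type(conn.login, conn.password) instead of basic auth.
   from requests_ntlm import HttpNtlmAuth  # any requests AuthBase subclass taking (login, password)

   from airflow.providers.http.hooks.http import HttpHook

   hook = HttpHook(method="GET", http_conn_id="my_api", auth_type=HttpNtlmAuth)
   response = hook.run(endpoint="reports/latest")
   ```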





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add other authentication mechanisms to HttpHook
> ---
>
> Key: AIRFLOW-5156
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5156
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.10.4
>Reporter: Joshua Kornblum
>Assignee: Rohit S S
>Priority: Minor
>
> It looks like the only supported authentication for HttpHooks is basic auth.
> The hook code shows 
> {quote}_if conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> requests library supports any auth that inherits AuthBase – in my scenario we 
> need ntlmauth for API on IIS server. 
> [https://2.python-requests.org/en/master/user/advanced/#custom-authentication]
> I would suggest option to pass auth object in constructor then add to if/else 
> control flow like
> {quote}_if self.auth is not None:_
>   _session.auth = self.auth_
> _elif conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> One would have to fetch the connection themselves and then fill out auth and 
> then pass that to hook which is flexible although a little awkard.
> {quote}api_conn = BaseHook().get_connection('my_api')
> auth = HttpNtlmAuth(api_conn.login, api_conn.password)
> HttpSensor(task_id='sensing', auth=auth, )
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] randr97 commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook

2020-04-27 Thread GitBox


randr97 commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416302309



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   @mik-laj I have given a default value because of backward 
compatibility.
   For other auth methods, I think we will have to build things out.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094122#comment-17094122
 ] 

ASF GitHub Bot commented on AIRFLOW-5156:
-

randr97 commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416302309



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   @mik-laj I have given a default value because of backward 
compatibility.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add other authentication mechanisms to HttpHook
> ---
>
> Key: AIRFLOW-5156
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5156
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.10.4
>Reporter: Joshua Kornblum
>Assignee: Rohit S S
>Priority: Minor
>
> It looks like the only supported authentication for HttpHooks is basic auth.
> The hook code shows 
> {quote}_if conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> requests library supports any auth that inherits AuthBase – in my scenario we 
> need ntlmauth for API on IIS server. 
> [https://2.python-requests.org/en/master/user/advanced/#custom-authentication]
> I would suggest option to pass auth object in constructor then add to if/else 
> control flow like
> {quote}_if self.auth is not None:_
>   _session.auth = self.auth_
> _elif conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> One would have to fetch the connection themselves and then fill out auth and 
> then pass that to hook which is flexible although a little awkard.
> {quote}api_conn = BaseHook().get_connection('my_api')
> auth = HttpNtlmAuth(api_conn.login, api_conn.password)
> HttpSensor(task_id='sensing', auth=auth, )
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] randr97 commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook

2020-04-27 Thread GitBox


randr97 commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416302309



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   @mik-laj I have given a default value because of backward 
compatibility.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094118#comment-17094118
 ] 

ASF GitHub Bot commented on AIRFLOW-6981:
-

mik-laj edited a comment on pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#issuecomment-620357784


   Please ignore tests in quarantine. They are flaky. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move Google Cloud Build from Discovery API to Python Library
> 
>
> Key: AIRFLOW-6981
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6981
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: gcp
>Affects Versions: 2.0.0
>Reporter: Ryan Yuan
>Assignee: Ryan Yuan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094117#comment-17094117
 ] 

ASF GitHub Bot commented on AIRFLOW-6981:
-

mik-laj commented on pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#issuecomment-620357784


   Please ignore tests in quarantine. They are flaky. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move Google Cloud Build from Discovery API to Python Library
> 
>
> Key: AIRFLOW-6981
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6981
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: gcp
>Affects Versions: 2.0.0
>Reporter: Ryan Yuan
>Assignee: Ryan Yuan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj edited a comment on pull request #8575: [AIRFLOW-6981] Move Google Cloud Build from Discovery API to Python Library

2020-04-27 Thread GitBox


mik-laj edited a comment on pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#issuecomment-620357784


   Please ignore tests in quarantine. They are flaky. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #8575: [AIRFLOW-6981] Move Google Cloud Build from Discovery API to Python Library

2020-04-27 Thread GitBox


mik-laj commented on pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#issuecomment-620357784


   Please ignore tests in quarantine. They are flaky. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094113#comment-17094113
 ] 

ASF GitHub Bot commented on AIRFLOW-6981:
-

ryanyuan commented on pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#issuecomment-620354199


   Cheers @potiuk. This time it failed at Quarantined:Pg9.6,Py3.6. But the 
error log didn't say much. Any suggestions?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move Google Cloud Build from Discovery API to Python Library
> 
>
> Key: AIRFLOW-6981
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6981
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: gcp
>Affects Versions: 2.0.0
>Reporter: Ryan Yuan
>Assignee: Ryan Yuan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] ryanyuan commented on pull request #8575: [AIRFLOW-6981] Move Google Cloud Build from Discovery API to Python Library

2020-04-27 Thread GitBox


ryanyuan commented on pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#issuecomment-620354199


   Cheers @potiuk. This time it failed at Quarantined:Pg9.6,Py3.6. But the 
error log didn't say much. Any suggestions?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] zhongjiajie commented on pull request #8534: fix: aws hook should work without conn id

2020-04-27 Thread GitBox


zhongjiajie commented on pull request #8534:
URL: https://github.com/apache/airflow/pull/8534#issuecomment-620349037


   Thanks @houqp 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8529: Split and improve BigQuery example DAG

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8529:
URL: https://github.com/apache/airflow/pull/8529#discussion_r415907894



##
File path: 
airflow/providers/google/cloud/example_dags/example_bigquery_transfer.py
##
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google BigQuery service.
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.bigquery import (
+BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, 
BigQueryDeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.operators.bigquery_to_bigquery import 
BigQueryToBigQueryOperator
+from airflow.providers.google.cloud.operators.bigquery_to_gcs import 
BigQueryToGCSOperator
+from airflow.utils.dates import days_ago
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", 
"test_dataset_transfer")
+DATA_EXPORT_BUCKET_NAME = os.environ.get(
+"GCP_BIGQUERY_EXPORT_BUCKET_NAME", "test-bigquery-sample-data"
+)
+ORIGIN = "origin"
+TARGET = "target"
+
+default_args = {"start_date": days_ago(1)}
+
+with models.DAG(
+"example_bigquery_transfer",
+default_args=default_args,
+schedule_interval=None,  # Override to match your needs
+tags=["example"],
+) as dag:
+copy_selected_data = BigQueryToBigQueryOperator(

Review comment:
   I would prefer that each module have separate unit tests, a separate 
guide and separate system tests.  Otherwise it will be difficult for us to 
determine coverage. A few days ago one person found one test that was in the 
wrong file.
   https://github.com/apache/airflow/pull/8556/files





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kennknowles commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


kennknowles commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416287771



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> 
RT:
 class DataflowJobStatus:
 """
 Helper class with Dataflow job statuses.
+Reference: 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
 """
-JOB_STATE_DONE = "JOB_STATE_DONE"
+JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
 JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+JOB_STATE_DONE = "JOB_STATE_DONE"
 JOB_STATE_FAILED = "JOB_STATE_FAILED"
 JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
 JOB_STATE_PENDING = "JOB_STATE_PENDING"
-FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, 
JOB_STATE_STOPPED}
+SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, 
JOB_STATE_DRAINED}
+TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

Review comment:
   I do not know airflow that well. I just tried to see how these variables 
were used. I missed the place where they actually affect the Airflow result. It 
is a good idea to let the user say what they expect, and then a failure can be 
anything else.
   
   Example: we have had real use cases where we deliberately cancel jobs we no 
longer need, and for streaming jobs that can count as success.
   
   I think the only certain failed state is JOB_STATE_FAILED.
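
   A rough sketch of that idea (illustrative only, not code from this PR): let the caller name the terminal state they expect, and treat any other terminal state as a failure:
   ```
   FAILED_END_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_STOPPED"}
   SUCCEEDED_END_STATES = {"JOB_STATE_DONE", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"}
   TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

   def check_job_state(state, expected_terminal_state="JOB_STATE_DONE"):
       # The caller declares the terminal state they expect (e.g. JOB_STATE_CANCELLED
       # for a deliberately cancelled streaming job); any other terminal state fails.
       if state not in TERMINAL_STATES:
           return False  # still running, keep waiting
       if state != expected_terminal_state:
           raise RuntimeError("Job reached unexpected terminal state: %s" % state)
       return True
   ```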





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416284747



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -783,6 +794,77 @@ def cancel_job(
 name=job_name,
 job_id=job_id,
 location=location,
-poll_sleep=self.poll_sleep
+poll_sleep=self.poll_sleep,
+num_retries=self.num_retries,
 )
 jobs_controller.cancel()
+
+@GoogleBaseHook.fallback_to_default_project_id
+def start_sql_job(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+project_id: str,
+location: str = DEFAULT_DATAFLOW_LOCATION,
+on_new_job_id_callback: Optional[Callable[[str], None]] = None
+):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed.
+For more information, look at:
+
`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param on_new_job_id_callback: Callback called when the job ID is 
known.
+:type on_new_job_id_callback: callable
+:return: the new job object
+"""
+cmd = [
+'gcloud',
+'beta',
+'dataflow',
+'sql',
+'query',
+query,
+f'--project={project_id}',
+'--format=value(job.id)',
+f'--job-name={job_name}',
+f'--region={location}',
+*(self._options_to_args(options))
+]
+self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c 
in cmd]))
+with self.provide_authorized_gcloud():

Review comment:
   Is this really required if it is only for logs? subprocess.run does not 
need to escape them anyway.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj opened a new pull request #8597: Fix example for Local Filesystem Secrets Backend

2020-04-27 Thread GitBox


mik-laj opened a new pull request #8597:
URL: https://github.com/apache/airflow/pull/8597


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj opened a new pull request #8596: Add Local Filesystem Secret Backend (v1-10)

2020-04-27 Thread GitBox


mik-laj opened a new pull request #8596:
URL: https://github.com/apache/airflow/pull/8596


   Backport: https://github.com/apache/airflow/pull/8436
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] zhongjiajie commented on pull request #8591: Add http system test

2020-04-27 Thread GitBox


zhongjiajie commented on pull request #8591:
URL: https://github.com/apache/airflow/pull/8591#issuecomment-620322452


   I think it failed due to:
   ```log
   2020-04-27T18:10:16.7233125Z looking for now-outdated files... 
none found
   2020-04-27T18:10:16.9090056Z pickling environment... done
   2020-04-27T18:10:16.9090564Z checking consistency... 
/opt/airflow/docs/howto/operator/http/http.rst: WARNING: 
document isn't included in any toctree
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] zhongjiajie commented on pull request #8591: Add http system test

2020-04-27 Thread GitBox


zhongjiajie commented on pull request #8591:
URL: https://github.com/apache/airflow/pull/8591#issuecomment-620321096


   > @kaxil fixed the doc issues but wait for ci before you merge please 😁 
..did not test the docs. I also changed the task var names and I hope I 
refactored it correctly. (if there are issues I am gonna fix it tomorrow :))
   
   FYI, still failed 😢 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] dimberman opened a new pull request #8595: test pulling docker image from github

2020-04-27 Thread GitBox


dimberman opened a new pull request #8595:
URL: https://github.com/apache/airflow/pull/8595


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [ ] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416195651



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -548,23 +554,17 @@ def start_template_dataflow(
 :type location: str
 """
 name = self._build_dataflow_job_name(job_name, append_job_name)
-# Builds RuntimeEnvironment from variables dictionary
-# 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-environment = {}
-for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-'tempLocation', 'bypassTempDirValidation', 'machineType',
-'additionalExperiments', 'network', 'subnetwork', 
'additionalUserLabels']:
-if key in variables:
-environment.update({key: variables[key]})
-body = {"jobName": name,
-"parameters": parameters,
-"environment": environment}
+
 service = self.get_conn()
 request = service.projects().locations().templates().launch(  # 
pylint: disable=no-member
 projectId=project_id,
 location=location,
 gcsPath=dataflow_template,
-body=body
+body={
+"jobName": name,
+"parameters": parameters,
+"environment": variables

Review comment:
   I want to change names for more parameters in many places. I will 
probably do it with a special decorator that will allow me to do it cleverly.
   ```
   @rename_parameter({'variables': 'environment'})
   def start_template_dataflow(
   self,
   job_name: str,
   variables: Dict,
   parameters: Dict,
   dataflow_template: str,
   project_id: str,
   append_job_name: bool = True,
   on_new_job_id_callback: Optional[Callable[[str], None]] = None,
   location: str = DEFAULT_DATAFLOW_LOCATION
   ) -> Dict:
   ```
   That way, we won't have so much outdated code in the repository.
   
   From that perspective, this change will be much more problematic, and I would 
like to test it in various IDEs, etc.
   ![Screenshot 2020-04-28 at 00 31 
57](https://user-images.githubusercontent.com/12058428/80427302-bee93980-88e7-11ea-97bb-a8b0bdbec6f2.png)
   I would like to get similar messages when the user updates the operator.
   
   It looks simple but still needs testing and I will do it in a separate PR.
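   A rough sketch of what such a decorator could look like (hypothetical and untested, not the code that will land in the separate PR):
   ```
   import functools
   import warnings

   def rename_parameter(renames):
       # Hypothetical sketch of the decorator described above: map deprecated
       # keyword names to their new names and warn the caller on each use.
       def decorator(func):
           @functools.wraps(func)
           def wrapper(*args, **kwargs):
               for old, new in renames.items():
                   if old in kwargs:
                       warnings.warn(
                           "Parameter '%s' is deprecated; use '%s' instead." % (old, new),
                           DeprecationWarning, stacklevel=2,
                       )
                       kwargs[new] = kwargs.pop(old)
               return func(*args, **kwargs)
           return wrapper
       return decorator
   ```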
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] dinigo edited a comment on issue #8585: Integrate Google Analytics Reporting API from plugin

2020-04-27 Thread GitBox


dinigo edited a comment on issue #8585:
URL: https://github.com/apache/airflow/issues/8585#issuecomment-620267476


   I just checked the Orchestra project, which seems to be a "plug'n play" 
version of the Google operators we have in the master (I suppose it will be 
obsolete once the operators backport is officially released from 
apache-airflow). As it is basically a copy of the code in 
airflow/providers/google it also lacks this API.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] dinigo commented on issue #8585: Integrate Google Analytics Reporting API from plugin

2020-04-27 Thread GitBox


dinigo commented on issue #8585:
URL: https://github.com/apache/airflow/issues/8585#issuecomment-620267476


   I just checked the Orchestra project, which seems to be a "plug'n play" 
version of the Google operators we have in the master (I suppose it will be 
obsolete once the operators backport is officially released from apache-airflow)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416189250



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -783,6 +794,77 @@ def cancel_job(
 name=job_name,
 job_id=job_id,
 location=location,
-poll_sleep=self.poll_sleep
+poll_sleep=self.poll_sleep,
+num_retries=self.num_retries,
 )
 jobs_controller.cancel()
+
+@GoogleBaseHook.fallback_to_default_project_id
+def start_sql_job(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+project_id: str,
+location: str = DEFAULT_DATAFLOW_LOCATION,
+on_new_job_id_callback: Optional[Callable[[str], None]] = None
+):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed.
+For more information, look at:
+
`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param on_new_job_id_callback: Callback called when the job ID is 
known.
+:type on_new_job_id_callback: callable
+:return: the new job object
+"""
+cmd = [
+'gcloud',
+'beta',
+'dataflow',
+'sql',
+'query',
+query,
+f'--project={project_id}',
+'--format=value(job.id)',
+f'--job-name={job_name}',
+f'--region={location}',
+*(self._options_to_args(options))
+]
+self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c 
in cmd]))
+with self.provide_authorized_gcloud():

Review comment:
   These logs are available in the Airflow web UI, so a normal user can easily 
access them.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] dinigo commented on issue #8585: Integrate Google Analytics Reporting API from plugin

2020-04-27 Thread GitBox


dinigo commented on issue #8585:
URL: https://github.com/apache/airflow/issues/8585#issuecomment-620265270


   No, we lack the reporting API that lets you compose a report and download a 
dataset for this report.
   
   It's the easiest way to take data out of Google Analytics. It was there long 
before the BigQuery export, so a lot of companies' business logic relies on this 
export. I've seen it "cron"-ified very frequently.
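   For illustration, a minimal sketch of pulling such a report with the Analytics 
Reporting API v4 via google-api-python-client; the key file, view ID, date range 
and metric/dimension names below are placeholders, not taken from this thread:

```python
# Minimal sketch: compose a report and download its rows from the
# Google Analytics Reporting API v4. Assumes google-api-python-client,
# google-auth and a service-account key; all IDs and names are placeholders.
from google.oauth2 import service_account
from googleapiclient.discovery import build

credentials = service_account.Credentials.from_service_account_file(
    "key.json", scopes=["https://www.googleapis.com/auth/analytics.readonly"]
)
analytics = build("analyticsreporting", "v4", credentials=credentials)

response = analytics.reports().batchGet(
    body={
        "reportRequests": [
            {
                "viewId": "12345678",
                "dateRanges": [{"startDate": "7daysAgo", "endDate": "today"}],
                "metrics": [{"expression": "ga:sessions"}],
                "dimensions": [{"name": "ga:country"}],
            }
        ]
    }
).execute()

# Print one line per row: dimension values and metric values.
for report in response.get("reports", []):
    for row in report.get("data", {}).get("rows", []):
        print(row["dimensions"], row["metrics"][0]["values"])
```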



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416187351



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -783,6 +794,77 @@ def cancel_job(
 name=job_name,
 job_id=job_id,
 location=location,
-poll_sleep=self.poll_sleep
+poll_sleep=self.poll_sleep,
+num_retries=self.num_retries,
 )
 jobs_controller.cancel()
+
+@GoogleBaseHook.fallback_to_default_project_id
+def start_sql_job(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+project_id: str,
+location: str = DEFAULT_DATAFLOW_LOCATION,
+on_new_job_id_callback: Optional[Callable[[str], None]] = None
+):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed.
+For more information, look at:
+
`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param on_new_job_id_callback: Callback called when the job ID is 
known.
+:type on_new_job_id_callback: callable
+:return: the new job object
+"""
+cmd = [
+'gcloud',
+'beta',
+'dataflow',
+'sql',
+'query',
+query,
+f'--project={project_id}',
+'--format=value(job.id)',
+f'--job-name={job_name}',
+f'--region={location}',
+*(self._options_to_args(options))
+]
+self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c 
in cmd]))
+with self.provide_authorized_gcloud():

Review comment:
   This is only for logs. I used it to test this operator. A normal user will 
not copy it, but it may be helpful for debugging.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416184937



##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
 @provide_gcp_context(GCP_DATAFLOW_KEY)
 def test_run_example_dag_function(self):
-self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = 
f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# 
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+@provide_gcp_context(GCP_GCS_TRANSFER_KEY, 
project_id=GoogleSystemTest._project_id())
+def setUp(self) -> None:
+# Create a Cloud Storage bucket
+self.execute_cmd(["gsutil", "mb", 
f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+# Build image with pipeline
+with NamedTemporaryFile() as f:
+f.write(
+textwrap.dedent(
+"""\
+steps:

Review comment:
   The Cloud Build service provides a new virtual machine for each build. Each 
build consists of many steps, and each step is described by a container image. 
I personally would not worry about the large number of images, because the 
virtual machine is deleted after the build completes.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416185616



##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
 @provide_gcp_context(GCP_DATAFLOW_KEY)
 def test_run_example_dag_function(self):
-self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = 
f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# 
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+@provide_gcp_context(GCP_GCS_TRANSFER_KEY, 
project_id=GoogleSystemTest._project_id())
+def setUp(self) -> None:
+# Create a Cloud Storage bucket
+self.execute_cmd(["gsutil", "mb", 
f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+# Build image with pipeline
+with NamedTemporaryFile() as f:
+f.write(
+textwrap.dedent(
+"""\
+steps:

Review comment:
   I removed some build steps. 
   
https://github.com/apache/airflow/pull/8550/commits/aa29389cac91f29f5f37b268fa61c0a3eaabd663
   
   In order to build this Docker image I used Google Cloud Build. The Cloud 
Build service provides a new virtual machine for each build. Each build 
consists of many steps, and each step is described by a Docker image. The 
result of the build is a new Docker image.
   
   I personally would not worry about the large number of images, because the 
virtual machine is deleted after the build completes. Nothing runs on the 
local machine.
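   For illustration, a hedged sketch of submitting a config like the one above 
with `gcloud builds submit`; the config path and substitution value are 
placeholders, and the build itself runs on Cloud Build workers, not locally:

```python
# Hedged sketch: submit a Cloud Build config and let Cloud Build produce the
# image. The file name and substitution value are placeholders for illustration.
import subprocess

subprocess.run(
    [
        "gcloud", "builds", "submit",
        "--config", "cloudbuild.yaml",
        "--no-source",
        "--substitutions",
        "_EXAMPLE_REPO=https://github.com/GoogleCloudPlatform/java-docs-samples.git",
    ],
    check=True,
)
```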
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416185412



##
File path: airflow/providers/google/cloud/operators/dataflow.py
##
@@ -406,6 +406,71 @@ def on_kill(self) -> None:
 self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id)
 
 
+class DataflowStartFlexTemplateOperator(BaseOperator):
+"""
+Starts flex templates with the Dataflow  pipeline.
+
+:param body: The request body
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param gcp_conn_id: The connection ID to use connecting to Google Cloud
+Platform.
+:type gcp_conn_id: str
+:param delegate_to: The account to impersonate, if any.
+For this to work, the service account making the request must have
+domain-wide delegation enabled.
+:type delegate_to: str
+"""
+
+template_fields = ["body", 'location', 'project_id', 'gcp_conn_id']
+
+@apply_defaults
+def __init__(
+self,
+body: Dict,
+location: str,
+project_id: Optional[str] = None,
+gcp_conn_id: str = 'google_cloud_default',
+delegate_to: Optional[str] = None,
+*args,
+**kwargs
+) -> None:
+super().__init__(*args, **kwargs)
+self.body = body
+self.location = location
+self.project_id = project_id
+self.gcp_conn_id = gcp_conn_id
+self.delegate_to = delegate_to
+self.job_id = None
+self.hook: Optional[DataflowHook] = None
+
+def execute(self, context):
+self.hook = DataflowHook(
+gcp_conn_id=self.gcp_conn_id,
+delegate_to=self.delegate_to,
+)
+
+def set_current_job_id(job_id):
+self.job_id = job_id
+
+job = self.hook.start_flex_template(
+body=self.body,
+location=self.location,
+project_id=self.project_id,
+on_new_job_id_callback=set_current_job_id,
+)
+
+return job
+
+def on_kill(self) -> None:
+self.log.info("On kill.")
+if self.job_id:
+self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id)

Review comment:
   Good point. I will change it





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416184937



##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
 @provide_gcp_context(GCP_DATAFLOW_KEY)
 def test_run_example_dag_function(self):
-self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = 
f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# 
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+@provide_gcp_context(GCP_GCS_TRANSFER_KEY, 
project_id=GoogleSystemTest._project_id())
+def setUp(self) -> None:
+# Create a Cloud Storage bucket
+self.execute_cmd(["gsutil", "mb", 
f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+# Build image with pipeline
+with NamedTemporaryFile() as f:
+f.write(
+textwrap.dedent(
+"""\
+steps:

Review comment:
   The Cloud Build service will provide a new virtual machine for each 
build. Each build contains many steps. Each step is described by a container 
image. I personally would not worry about a large number of images, because 
after the build is completed, the virtual machine is deleted.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416181912



##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -15,9 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os

Review comment:
   We always try to write system tests and integrations simultaneously to 
ensure the best integration reliability. Thanks to this, every person on my 
team can easily test every integration.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416180769



##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
 @provide_gcp_context(GCP_DATAFLOW_KEY)
 def test_run_example_dag_function(self):
-self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = 
f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# 
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+@provide_gcp_context(GCP_GCS_TRANSFER_KEY, 
project_id=GoogleSystemTest._project_id())
+def setUp(self) -> None:
+# Create a Cloud Storage bucket
+self.execute_cmd(["gsutil", "mb", 
f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+# Build image with pipeline
+with NamedTemporaryFile() as f:
+f.write(
+textwrap.dedent(
+"""\
+steps:
+- name: gcr.io/cloud-builders/git
+  args: ['clone', '$_EXAMPLE_REPO', 'repo_dir']
+- name: gcr.io/cloud-builders/git
+  args: ['checkout', '$_EXAMPLE_COMMIT']
+  dir: 'repo_dir'
+- name: alpine
+  entrypoint: /bin/sh
+  dir: 'repo_dir/$_EXAMPLE_SUBDIR'

Review comment:
   ```suggestion
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416180636



##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
 @provide_gcp_context(GCP_DATAFLOW_KEY)
 def test_run_example_dag_function(self):
-self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = 
f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# 
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+@provide_gcp_context(GCP_GCS_TRANSFER_KEY, 
project_id=GoogleSystemTest._project_id())
+def setUp(self) -> None:
+# Create a Cloud Storage bucket
+self.execute_cmd(["gsutil", "mb", 
f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+# Build image with pipeline
+with NamedTemporaryFile() as f:
+f.write(
+textwrap.dedent(
+"""\
+steps:
+- name: gcr.io/cloud-builders/git
+  args: ['clone', '$_EXAMPLE_REPO', 'repo_dir']
+- name: gcr.io/cloud-builders/git
+  args: ['checkout', '$_EXAMPLE_COMMIT']
+  dir: 'repo_dir'
+- name: alpine
+  entrypoint: /bin/sh
+  dir: 'repo_dir/$_EXAMPLE_SUBDIR'
+- name: maven
+  args: ["mvn", "clean", "package"]
+  dir: 'repo_dir/$_EXAMPLE_SUBDIR'
+- name: alpine
+  entrypoint: "/bin/sh"
+  args: ["-c", "ls -lh target/*.jar"]
+  dir: 'repo_dir/$_EXAMPLE_SUBDIR'

Review comment:
   ```suggestion
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aaltay commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416178253



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -548,23 +554,17 @@ def start_template_dataflow(
 :type location: str
 """
 name = self._build_dataflow_job_name(job_name, append_job_name)
-# Builds RuntimeEnvironment from variables dictionary
-# 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-environment = {}
-for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-'tempLocation', 'bypassTempDirValidation', 'machineType',
-'additionalExperiments', 'network', 'subnetwork', 
'additionalUserLabels']:
-if key in variables:
-environment.update({key: variables[key]})
-body = {"jobName": name,
-"parameters": parameters,
-"environment": environment}
+
 service = self.get_conn()
 request = service.projects().locations().templates().launch(  # 
pylint: disable=no-member
 projectId=project_id,
 location=location,
 gcsPath=dataflow_template,
-body=body
+body={
+"jobName": name,
+"parameters": parameters,
+"environment": variables

Review comment:
   Got it. Thank you for cleaning things up and keeping them backward 
compatible.
   
   How about adding a new clean flag and marking the old one as deprecated?

##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -548,23 +554,17 @@ def start_template_dataflow(
 :type location: str
 """
 name = self._build_dataflow_job_name(job_name, append_job_name)
-# Builds RuntimeEnvironment from variables dictionary
-# 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-environment = {}
-for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-'tempLocation', 'bypassTempDirValidation', 'machineType',
-'additionalExperiments', 'network', 'subnetwork', 
'additionalUserLabels']:
-if key in variables:
-environment.update({key: variables[key]})
-body = {"jobName": name,
-"parameters": parameters,
-"environment": environment}
+
 service = self.get_conn()
 request = service.projects().locations().templates().launch(  # 
pylint: disable=no-member
 projectId=project_id,
 location=location,
 gcsPath=dataflow_template,
-body=body
+body={
+"jobName": name,
+"parameters": parameters,
+"environment": variables

Review comment:
   Got it. Thank you for cleaning things up and keeping them backward 
compatible.
   
   How about adding a new clean flag and marking the old one as deprecated?
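   For illustration, a minimal sketch of that kind of deprecation, assuming a new 
`environment` argument replacing the legacy `variables` one; both names are 
illustrative, not the final API:

```python
# Illustrative sketch of deprecating a renamed keyword argument.
# `environment` and `variables` are placeholder names, not the final API.
import warnings
from typing import Dict, Optional


def start_template_dataflow(
    environment: Optional[Dict] = None,
    variables: Optional[Dict] = None,  # legacy name, kept for compatibility
    **kwargs,
):
    if variables is not None:
        warnings.warn(
            "`variables` is deprecated, please use `environment` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # The new argument wins if both are supplied.
        environment = {**variables, **(environment or {})}
    environment = environment or {}
    ...
```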





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416172704



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -548,23 +554,17 @@ def start_template_dataflow(
 :type location: str
 """
 name = self._build_dataflow_job_name(job_name, append_job_name)
-# Builds RuntimeEnvironment from variables dictionary
-# 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-environment = {}
-for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-'tempLocation', 'bypassTempDirValidation', 'machineType',
-'additionalExperiments', 'network', 'subnetwork', 
'additionalUserLabels']:
-if key in variables:
-environment.update({key: variables[key]})
-body = {"jobName": name,
-"parameters": parameters,
-"environment": environment}
+
 service = self.get_conn()
 request = service.projects().locations().templates().launch(  # 
pylint: disable=no-member
 projectId=project_id,
 location=location,
 gcsPath=dataflow_template,
-body=body
+body={
+"jobName": name,
+"parameters": parameters,
+"environment": variables

Review comment:
   This is problematic, but someone once created a parameter and called it 
`variables`, and used the same name elsewhere. Now I am trying to maintain 
backward compatibility. In the next PR I will try to change the names of the 
arguments, but that will require more work to stay backward compatible. 
   
   Formerly the `variables` parameter contained the environment + project id + 
region. It was similar to the native pipeline, where the region was also passed 
as an argument: `python pipeline.py --region=europe-west-1`. However, this was 
changed when I introduced the changes required by the [GCP 
guidelines](https://docs.google.com/document/d/1_rTdJSLCt0eyrAylmmgYc3yZr-_h51fVlnvMmWqhCkY/edit).
Now it contains only environment parameters.
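   For illustration, a minimal sketch of how a legacy dict like that could be 
split so job-level keys no longer leak into the RuntimeEnvironment body; the 
helper name is made up:

```python
# Hedged sketch: split a legacy `variables` dict into RuntimeEnvironment keys
# and job-level settings (project/region), keeping old callers working.
from typing import Dict, Tuple

LEGACY_JOB_LEVEL_KEYS = {"project", "region"}


def split_legacy_variables(variables: Dict) -> Tuple[Dict, Dict]:
    environment = {k: v for k, v in variables.items() if k not in LEGACY_JOB_LEVEL_KEYS}
    job_level = {k: v for k, v in variables.items() if k in LEGACY_JOB_LEVEL_KEYS}
    return environment, job_level


environment, job_level = split_legacy_variables(
    {"project": "my-project", "region": "europe-west1", "maxWorkers": 10}
)
# environment -> {"maxWorkers": 10}
# job_level   -> {"project": "my-project", "region": "europe-west1"}
```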





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416177435



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -783,6 +794,77 @@ def cancel_job(
 name=job_name,
 job_id=job_id,
 location=location,
-poll_sleep=self.poll_sleep
+poll_sleep=self.poll_sleep,
+num_retries=self.num_retries,
 )
 jobs_controller.cancel()
+
+@GoogleBaseHook.fallback_to_default_project_id
+def start_sql_job(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+project_id: str,
+location: str = DEFAULT_DATAFLOW_LOCATION,
+on_new_job_id_callback: Optional[Callable[[str], None]] = None
+):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed.
+For more information, look at:
+
`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param on_new_job_id_callback: Callback called when the job ID is 
known.
+:type on_new_job_id_callback: callable
+:return: the new job object
+"""
+cmd = [
+'gcloud',
+'beta',
+'dataflow',
+'sql',
+'query',
+query,
+f'--project={project_id}',
+'--format=value(job.id)',
+f'--job-name={job_name}',
+f'--region={location}',
+*(self._options_to_args(options))
+]
+self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c 
in cmd]))
+with self.provide_authorized_gcloud():

Review comment:
   But this is only for logging? Do users normally copy-paste these commands 
out of the logs?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416176973



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: 
disable=too-many-arguments
 variables['job_name'] = name
 variables['region'] = location
 
-def label_formatter(labels_dict):
-return ['--labels={}={}'.format(key, value)
-for key, value in labels_dict.items()]
+if 'labels' in variables:
+variables['labels'] = ['{}={}'.format(key, value) for key, value 
in variables['labels'].items()]

Review comment:
   OK. Thank you for the explanation.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416176659



##
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 
'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+dag_id="example_gcp_dataflow_sql",
+default_args={
+"start_date": days_ago(1),
+},
+schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+start_sql = DataflowStartSqlJobOperator(
+task_id="start_sql_query",
+job_name=DATAFLOW_SQL_JOB_NAME,
+query=f"""
+SELECT
+sales_region as sales_region,
+count(state_id) as count_state
+FROM
+bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
   @ibzib - is there a public dataset that could be used?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aaltay commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416176147



##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
 @provide_gcp_context(GCP_DATAFLOW_KEY)
 def test_run_example_dag_function(self):
-self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = 
f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# 
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+@provide_gcp_context(GCP_GCS_TRANSFER_KEY, 
project_id=GoogleSystemTest._project_id())
+def setUp(self) -> None:
+# Create a Cloud Storage bucket
+self.execute_cmd(["gsutil", "mb", 
f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+# Build image with pipeline
+with NamedTemporaryFile() as f:
+f.write(
+textwrap.dedent(
+"""\
+steps:

Review comment:
   Ack. Does it require this many containers in the image?
   
   I am not very familiar with this. Is this a VM image with containers?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416176087



##
File path: airflow/providers/google/cloud/operators/dataflow.py
##
@@ -377,6 +394,7 @@ def __init__(
 self.poll_sleep = poll_sleep
 self.job_id = None
 self.hook: Optional[DataflowHook] = None
+self.options.update(dataflow_default_options)

Review comment:
   Good point. I will fix it. 
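   For illustration, a minimal sketch of the merge order the fix presumably 
wants, with user-supplied options taking precedence over 
`dataflow_default_options`; the values are made up:

```python
# Sketch of the intended precedence: user-supplied options should override
# dataflow_default_options, not the other way around.
dataflow_default_options = {"zone": "us-central1-f", "maxWorkers": 10}
options = {"maxWorkers": 2}  # user-supplied

merged = {**dataflow_default_options, **options}  # user values win
assert merged == {"zone": "us-central1-f", "maxWorkers": 2}
```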





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416175888



##
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##
@@ -52,32 +54,32 @@
 'stagingLocation': 'gs://test/staging',
 'labels': {'foo': 'bar'}
 }
-DATAFLOW_VARIABLES_TEMPLATE = {
-'project': 'test',
-'tempLocation': 'gs://test/temp',
-'zone': 'us-central1-f'
-}
 RUNTIME_ENV = {
-'tempLocation': 'gs://test/temp',
-'zone': 'us-central1-f',
-'numWorkers': 2,
-'maxWorkers': 10,
-'serviceAccountEmail': 'test@apache.airflow',
-'machineType': 'n1-standard-1',
 'additionalExperiments': ['exp_flag1', 'exp_flag2'],
-'network': 'default',
-'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK',
 'additionalUserLabels': {
 'name': 'wrench',
 'mass': '1.3kg',
 'count': '3'
-}
+},
+'bypassTempDirValidation': {},
+'ipConfiguration': 'WORKER_IP_PRIVATE',
+'kmsKeyName': (
+
'projects/TEST_PROJECT_ID/locations/TEST_LOCATIONS/keyRings/TEST_KEYRING/cryptoKeys/TEST_CRYPTOKEYS'
+),
+'maxWorkers': 10,
+'network': 'default',
+'numWorkers': 2,
+'serviceAccountEmail': 'test@apache.airflow',
+'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK',
+'tempLocation': 'gs://test/temp',
+'workerRegion': "test-region",
+'workerZone': 'test-zone',
+'zone': 'us-central1-f',
+'machineType': 'n1-standard-1',

Review comment:
   IP Configuration is also tested on line 65. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416172704



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -548,23 +554,17 @@ def start_template_dataflow(
 :type location: str
 """
 name = self._build_dataflow_job_name(job_name, append_job_name)
-# Builds RuntimeEnvironment from variables dictionary
-# 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-environment = {}
-for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-'tempLocation', 'bypassTempDirValidation', 'machineType',
-'additionalExperiments', 'network', 'subnetwork', 
'additionalUserLabels']:
-if key in variables:
-environment.update({key: variables[key]})
-body = {"jobName": name,
-"parameters": parameters,
-"environment": environment}
+
 service = self.get_conn()
 request = service.projects().locations().templates().launch(  # 
pylint: disable=no-member
 projectId=project_id,
 location=location,
 gcsPath=dataflow_template,
-body=body
+body={
+"jobName": name,
+"parameters": parameters,
+"environment": variables

Review comment:
   This is problematic, but someone once created a parameter and called it 
`variables`, and used the same name elsewhere. Now I am trying to maintain 
backward compatibility. In the next PR I will try to change the names of the 
arguments, but that will require more work to stay backward compatible. 
   Formerly the `variables` parameter contained the environment + project id + 
region. It was similar to the native pipeline, where the region was also passed 
as an argument: `python pipeline.py --region=europe-west-1`. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb opened a new pull request #8594: Reduce duplication of functions called in ci scripts

2020-04-27 Thread GitBox


ashb opened a new pull request #8594:
URL: https://github.com/apache/airflow/pull/8594


   Every place we called rebuild_ci_image_if_needed we were also calling
   preprepare_ci_build -- there's no need.
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416172704



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -548,23 +554,17 @@ def start_template_dataflow(
 :type location: str
 """
 name = self._build_dataflow_job_name(job_name, append_job_name)
-# Builds RuntimeEnvironment from variables dictionary
-# 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-environment = {}
-for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-'tempLocation', 'bypassTempDirValidation', 'machineType',
-'additionalExperiments', 'network', 'subnetwork', 
'additionalUserLabels']:
-if key in variables:
-environment.update({key: variables[key]})
-body = {"jobName": name,
-"parameters": parameters,
-"environment": environment}
+
 service = self.get_conn()
 request = service.projects().locations().templates().launch(  # 
pylint: disable=no-member
 projectId=project_id,
 location=location,
 gcsPath=dataflow_template,
-body=body
+body={
+"jobName": name,
+"parameters": parameters,
+"environment": variables

Review comment:
   This is problematic, but someone once created a parameter and called it 
variables. He used the same name elsewhere. Now I am trying to maintain 
backward compatibility. In the next PR I will try to correct the names of the 
arguments, but this will require more work to maintain backward compatibility.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416171580



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -532,7 +532,13 @@ def start_template_dataflow(
 
 :param job_name: The name of the job.

Review comment:
   Yes. It only affects templates.  Native pipelines were not affected by 
this issue.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416169086



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> 
RT:
 class DataflowJobStatus:
 """
 Helper class with Dataflow job statuses.
+Reference: 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
 """
-JOB_STATE_DONE = "JOB_STATE_DONE"
+JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
 JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+JOB_STATE_DONE = "JOB_STATE_DONE"
 JOB_STATE_FAILED = "JOB_STATE_FAILED"
 JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
 JOB_STATE_PENDING = "JOB_STATE_PENDING"
-FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, 
JOB_STATE_STOPPED}
+SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, 
JOB_STATE_DRAINED}
+TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

Review comment:
   Airflow does not have the ability to display full information about the 
status of the job in an external system. We only have two states - 
SUCCESS/FAILED.  What are you proposing then? Can the user specify expected 
end-states? 
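   For illustration, one hedged sketch of what letting the caller specify an 
expected end-state could look like; the function and the idea of an 
`expected_terminal_state` parameter are assumptions for discussion, not the 
hook's actual API:

```python
# Illustrative sketch: map Dataflow terminal states onto Airflow's binary
# success/failure, with an optional caller-supplied expected terminal state.
from typing import Optional

FAILED_END_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_STOPPED"}
SUCCEEDED_END_STATES = {"JOB_STATE_DONE", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"}
TERMINAL_STATES = FAILED_END_STATES | SUCCEEDED_END_STATES


def is_job_successful(state: str, expected_terminal_state: Optional[str] = None) -> bool:
    if state not in TERMINAL_STATES:
        raise ValueError(f"{state} is not a terminal state")
    if expected_terminal_state is not None:
        # The user declares which terminal state counts as success,
        # e.g. JOB_STATE_DRAINED for a drained streaming job.
        return state == expected_terminal_state
    return state in SUCCEEDED_END_STATES
```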





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil opened a new pull request #8593: Test deprecation warning for back-compat secrets

2020-04-27 Thread GitBox


kaxil opened a new pull request #8593:
URL: https://github.com/apache/airflow/pull/8593


   Based on https://github.com/apache/airflow/pull/8413#issuecomment-616535528
   
   Add contrib secrets to TestMovingCoreToContrib to test deprecation warning 
is raised
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416157606



##
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 
'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+dag_id="example_gcp_dataflow_sql",
+default_args={
+"start_date": days_ago(1),
+},
+schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+start_sql = DataflowStartSqlJobOperator(
+task_id="start_sql_query",
+job_name=DATAFLOW_SQL_JOB_NAME,
+query=f"""
+SELECT
+sales_region as sales_region,
+count(state_id) as count_state
+FROM
+bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
   We strive to maintain a high level of reliability for the GCP integrations. 
All integrations developed by my team must have system tests. System tests are 
a required criterion for internal review. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb edited a comment on pull request #8499: Optimize GitHub CI configuration

2020-04-27 Thread GitBox


ashb edited a comment on pull request #8499:
URL: https://github.com/apache/airflow/pull/8499#issuecomment-620246643


   Speaking of semantic commits - PR title != commit title here :grin:
   
   The only problem with that is some people less familiar with git aren't 
comfortable editing commits and force pushing, so the approach of Commitizen to 
have a pending check (but not a failing one) looks quite nice.
   
   Certainly for single-commit PRs anyway, the commit title should match the PR 
title, or at least prompt us committers to notice that it doesn't.
   
   cc @turbaszek 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on pull request #8499: Optimize GitHub CI configuration

2020-04-27 Thread GitBox


ashb commented on pull request #8499:
URL: https://github.com/apache/airflow/pull/8499#issuecomment-620246643


   Speaking of semantic commits - PR title != commit title here :grin:
   
   The only problem with that is some people less familiar with git aren't 
comfortable editing commits and force pushing, so the approach of Commitizen to 
have a pending check (but not a failing one) looks quite nice.
   
   Certainly for single-commit PRs anyway, the commit title should match the PR 
title, or at least prompt us committers to notice that it doesn't.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aaltay commented on a change in pull request #8531: Support all RuntimeEnvironment parameters in DataflowTemplatedJobStartOperator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8531:
URL: https://github.com/apache/airflow/pull/8531#discussion_r416140436



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -548,23 +554,17 @@ def start_template_dataflow(
 :type location: str
 """
 name = self._build_dataflow_job_name(job_name, append_job_name)
-# Builds RuntimeEnvironment from variables dictionary
-# 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
-environment = {}
-for key in ['numWorkers', 'maxWorkers', 'zone', 'serviceAccountEmail',
-'tempLocation', 'bypassTempDirValidation', 'machineType',
-'additionalExperiments', 'network', 'subnetwork', 
'additionalUserLabels']:
-if key in variables:
-environment.update({key: variables[key]})
-body = {"jobName": name,
-"parameters": parameters,
-"environment": environment}
+
 service = self.get_conn()
 request = service.projects().locations().templates().launch(  # 
pylint: disable=no-member
 projectId=project_id,
 location=location,
 gcsPath=dataflow_template,
-body=body
+body={
+"jobName": name,
+"parameters": parameters,
+"environment": variables

Review comment:
   What is the difference between parameters and variables?

##
File path: airflow/providers/google/cloud/operators/dataflow.py
##
@@ -377,6 +394,7 @@ def __init__(
 self.poll_sleep = poll_sleep
 self.job_id = None
 self.hook: Optional[DataflowHook] = None
+self.options.update(dataflow_default_options)

Review comment:
   Would not this override user provided options with default options?

##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -532,7 +532,13 @@ def start_template_dataflow(
 
 :param job_name: The name of the job.

Review comment:
   Is this bug ( #8300) only affecting templates?

##
File path: tests/providers/google/cloud/hooks/test_dataflow.py
##
@@ -52,32 +54,32 @@
 'stagingLocation': 'gs://test/staging',
 'labels': {'foo': 'bar'}
 }
-DATAFLOW_VARIABLES_TEMPLATE = {
-'project': 'test',
-'tempLocation': 'gs://test/temp',
-'zone': 'us-central1-f'
-}
 RUNTIME_ENV = {
-'tempLocation': 'gs://test/temp',
-'zone': 'us-central1-f',
-'numWorkers': 2,
-'maxWorkers': 10,
-'serviceAccountEmail': 'test@apache.airflow',
-'machineType': 'n1-standard-1',
 'additionalExperiments': ['exp_flag1', 'exp_flag2'],
-'network': 'default',
-'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK',
 'additionalUserLabels': {
 'name': 'wrench',
 'mass': '1.3kg',
 'count': '3'
-}
+},
+'bypassTempDirValidation': {},
+'ipConfiguration': 'WORKER_IP_PRIVATE',
+'kmsKeyName': (
+
'projects/TEST_PROJECT_ID/locations/TEST_LOCATIONS/keyRings/TEST_KEYRING/cryptoKeys/TEST_CRYPTOKEYS'
+),
+'maxWorkers': 10,
+'network': 'default',
+'numWorkers': 2,
+'serviceAccountEmail': 'test@apache.airflow',
+'subnetwork': 'regions/REGION/subnetworks/SUBNETWORK',
+'tempLocation': 'gs://test/temp',
+'workerRegion': "test-region",
+'workerZone': 'test-zone',
+'zone': 'us-central1-f',
+'machineType': 'n1-standard-1',

Review comment:
   maybe test private ip flag, since it was specifically mentioned in the 
issue.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416146984



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: 
disable=too-many-arguments
 variables['job_name'] = name
 variables['region'] = location
 
-def label_formatter(labels_dict):
-return ['--labels={}={}'.format(key, value)
-for key, value in labels_dict.items()]
+if 'labels' in variables:
+variables['labels'] = ['{}={}'.format(key, value) for key, value 
in variables['labels'].items()]

Review comment:
   It depends on the SDK that is used. These two SDKs require different 
argument formats.
   
   We have three related methods:
   * start_python_dataflow  
   * start_java_dataflow
   * _start_dataflow
   
   The first two methods are public and SDK-dependent. They are responsible for 
SDK-specific actions, e.g. environment preparation. 
   _start_dataflow is an internal method. It starts the system process and 
supervises its execution. I have the impression that this separation is helpful.
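   For illustration, a hedged sketch of the SDK-dependent label formatting being 
described; the exact flag formats below are assumptions for illustration, not 
taken verbatim from the hook:

```python
# Hedged sketch: the same labels dict rendered as CLI arguments for the two
# SDKs. The exact formats are assumptions, not a specification.
import json

labels = {"foo": "bar", "team": "data"}

# Python SDK style (assumed): one key=value pair per --labels flag.
python_args = [f"--labels={key}={value}" for key, value in labels.items()]

# Java SDK style (assumed): a single JSON object passed to --labels.
java_args = [f"--labels={json.dumps(labels)}"]

print(python_args)  # ['--labels=foo=bar', '--labels=team=data']
print(java_args)    # ['--labels={"foo": "bar", "team": "data"}']
```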





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416146984



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: 
disable=too-many-arguments
 variables['job_name'] = name
 variables['region'] = location
 
-def label_formatter(labels_dict):
-return ['--labels={}={}'.format(key, value)
-for key, value in labels_dict.items()]
+if 'labels' in variables:
+variables['labels'] = ['{}={}'.format(key, value) for key, value 
in variables['labels'].items()]

Review comment:
   It depends on the SDK that is used. These two SDKs require different 
argument formats.
   
   We have three related methods.
   * start_python_dataflow  
   * start_java_dataflow
   * _start_dataflow
   
   The first two methods are public and dependent on the SDK. They are 
responsible for introducing changes dependent on the SDK, e.g. environment 
preparation. 
   _start_dataflow is an internal method. This starts the system process and 
supervises execution. I have the impression that this separation is helpful.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416159089



##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -26,4 +37,198 @@
 class CloudDataflowExampleDagsSystemTest(GoogleSystemTest):
 @provide_gcp_context(GCP_DATAFLOW_KEY)
 def test_run_example_dag_function(self):
-self.run_dag('example_gcp_dataflow', CLOUD_DAG_FOLDER)
+self.run_dag("example_gcp_dataflow", CLOUD_DAG_FOLDER)
+
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+GCR_FLEX_TEMPLATE_IMAGE = 
f"gcr.io/{GCP_PROJECT_ID}/samples-dataflow-streaming-beam-sql:latest"
+
+# 
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
+GCS_TEMPLATE_PARTS = urlparse(GCS_FLEX_TEMPLATE_TEMPLATE_PATH)
+GCS_FLEX_TEMPLATE_BUCKET_NAME = GCS_TEMPLATE_PARTS.netloc
+
+
+EXAMPLE_FLEX_TEMPLATE_REPO = "GoogleCloudPlatform/java-docs-samples"
+EXAMPLE_FLEX_TEMPLATE_COMMIT = "deb0745be1d1ac1d133e1f0a7faa9413dbfbe5fe"
+EXAMPLE_FLEX_TEMPLATE_SUBDIR = "dataflow/flex-templates/streaming_beam_sql"
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_TRANSFER_KEY)
+class CloudDataflowExampleDagFlexTemplateJavagSystemTest(GoogleSystemTest):
+@provide_gcp_context(GCP_GCS_TRANSFER_KEY, 
project_id=GoogleSystemTest._project_id())
+def setUp(self) -> None:
+# Create a Cloud Storage bucket
+self.execute_cmd(["gsutil", "mb", 
f"gs://{GCS_FLEX_TEMPLATE_BUCKET_NAME}"])
+
+# Build image with pipeline
+with NamedTemporaryFile() as f:
+f.write(
+textwrap.dedent(
+"""\
+steps:

Review comment:
   I am building an image from your repository.
   
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416157606



##
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 
'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+dag_id="example_gcp_dataflow_sql",
+default_args={
+"start_date": days_ago(1),
+},
+schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+start_sql = DataflowStartSqlJobOperator(
+task_id="start_sql_query",
+job_name=DATAFLOW_SQL_JOB_NAME,
+query=f"""
+SELECT
+sales_region as sales_region,
+count(state_id) as count_state
+FROM
+bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
   We strive to maintain a high level of reliability for the GCP integrations.
All integrations developed by my team must have system tests; system tests are
a required criterion for internal review.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416154847



##
File path: airflow/providers/google/cloud/operators/dataflow.py
##
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
 self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed. It can be a dictionary with 
the following keys.
+
+For more information, look at:
+`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+
+:param options: dict
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param gcp_conn_id: The connection ID to use connecting to Google Cloud
+Platform.
+:type gcp_conn_id: str
+:param delegate_to: The account to impersonate, if any.
+For this to work, the service account making the request must have
+domain-wide delegation enabled.
+:type delegate_to: str
+"""
+
+template_fields = ["job_name", 'query', 'options', 'location', 
'project_id', 'gcp_conn_id']
+
+@apply_defaults
+def __init__(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+location: str = DEFAULT_DATAFLOW_LOCATION,
+project_id: Optional[str] = None,
+gcp_conn_id: str = 'google_cloud_default',
+delegate_to: Optional[str] = None,
+*args,
+**kwargs
+) -> None:
+super().__init__(*args, **kwargs)
+self.job_name = job_name
+self.query = query
+self.options = options
+self.location = location
+self.project_id = project_id
+self.gcp_conn_id = gcp_conn_id
+self.delegate_to = delegate_to
+self.job_id = None
+self.hook: Optional[DataflowHook] = None
+
+def execute(self, context):
+self.hook = DataflowHook(
+gcp_conn_id=self.gcp_conn_id,
+delegate_to=self.delegate_to,
+)
+
+def set_current_job_id(job_id):
+self.job_id = job_id
+
+job = self.hook.start_sql_job(
+job_name=self.job_name,
+query=self.query,
+options=self.options,
+location=self.location,
+project_id=self.project_id,
+on_new_job_id_callback=set_current_job_id,
+)
+
+return job
+
+def on_kill(self) -> None:
+self.log.info("On kill.")
+if self.job_id:
+self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id)

Review comment:
   Good point. I will skip jobs in the terminal state.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416153736



##
File path: airflow/providers/google/cloud/operators/dataflow.py
##
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
 self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed. It can be a dictionary with 
the following keys.
+
+For more information, look at:
+`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+
+:param options: dict
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param gcp_conn_id: The connection ID to use connecting to Google Cloud
+Platform.
+:type gcp_conn_id: str

Review comment:
   Airflow saves all credentials (MySQL, GCP, AWS, and others) in one database
table called `connection`. This is the ID of an entry in that table.
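
   For context, a minimal, hypothetical sketch of how such a connection ID is
referenced from the operator added in this PR (the job name and query below
are placeholders):
   ```python
   from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator

   # gcp_conn_id is only a reference: at runtime the hook looks up the
   # credentials stored under this ID in Airflow's `connection` table.
   start_sql = DataflowStartSqlJobOperator(
       task_id="start_sql_query",
       job_name="dataflow-sql",   # placeholder
       query="SELECT 1",          # placeholder
       options={},
       gcp_conn_id="google_cloud_default",
   )
   ```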





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416152856



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -783,6 +794,77 @@ def cancel_job(
 name=job_name,
 job_id=job_id,
 location=location,
-poll_sleep=self.poll_sleep
+poll_sleep=self.poll_sleep,
+num_retries=self.num_retries,
 )
 jobs_controller.cancel()
+
+@GoogleBaseHook.fallback_to_default_project_id
+def start_sql_job(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+project_id: str,
+location: str = DEFAULT_DATAFLOW_LOCATION,
+on_new_job_id_callback: Optional[Callable[[str], None]] = None
+):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed.
+For more information, look at:
+
`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param on_new_job_id_callback: Callback called when the job ID is 
known.
+:type on_new_job_id_callback: callable
+:return: the new job object
+"""
+cmd = [
+'gcloud',
+'beta',
+'dataflow',
+'sql',
+'query',
+query,
+f'--project={project_id}',
+'--format=value(job.id)',
+f'--job-name={job_name}',
+f'--region={location}',
+*(self._options_to_args(options))
+]
+self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c 
in cmd]))
+with self.provide_authorized_gcloud():
+proc = subprocess.run(  # pylint: disable=subprocess-run-check
+cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+self.log.info("Output: %s", proc.stdout.decode())
+self.log.info("Stderr: %s", proc.stderr.decode())

Review comment:
   `stderr` often contains developer information, not only errors. I will
change it to `log.warning`.
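
   A minimal sketch of the intended logging change (assumes the Cloud SDK's
`gcloud` binary is on PATH; the command is illustrative only):
   ```python
   import logging
   import subprocess

   log = logging.getLogger(__name__)

   # gcloud writes progress and informational messages to stderr even on
   # success, so stderr is logged at warning level rather than as an error.
   proc = subprocess.run(
       ["gcloud", "--version"],
       stdout=subprocess.PIPE,
       stderr=subprocess.PIPE,
       check=False,
   )
   log.info("Output: %s", proc.stdout.decode())
   log.warning("Stderr: %s", proc.stderr.decode())
   ```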





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on a change in pull request #8572: Improve template capabilities of EMR job and step operators

2020-04-27 Thread GitBox


ashb commented on a change in pull request #8572:
URL: https://github.com/apache/airflow/pull/8572#discussion_r416151252



##
File path: airflow/providers/amazon/aws/operators/emr_add_steps.py
##
@@ -38,13 +38,14 @@ class EmrAddStepsOperator(BaseOperator):
 :type cluster_states: list
 :param aws_conn_id: aws connection to uses
 :type aws_conn_id: str
-:param steps: boto3 style steps to be added to the jobflow. (templated)
-:type steps: list
+:param steps: boto3 style steps or reference to a steps file (must be 
'.json' or '.jinja2') to
+be added to the jobflow. (templated)
+:type steps: list|str
 :param do_xcom_push: if True, job_flow_id is pushed to XCom with key 
job_flow_id.
 :type do_xcom_push: bool
 """
 template_fields = ['job_flow_id', 'job_flow_name', 'cluster_states', 
'steps']
-template_ext = ()
+template_ext = ('.json', '.jinja2')

Review comment:
   Nothing else uses `.jinja2` in Airflow, so I think we shouldn't create a
new pattern for just this operator.
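
   For reference, a minimal sketch of how a `.json` entry in `template_ext` is
typically consumed (the DAG id and file name below are hypothetical):
   ```python
   from airflow import models
   from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
   from airflow.utils.dates import days_ago

   with models.DAG(
       dag_id="example_emr_add_steps_from_file",
       default_args={"start_date": days_ago(1)},
       schedule_interval=None,
   ) as dag:
       # Because '.json' is in template_ext, a `steps` value ending in '.json'
       # is treated as the path to a file (resolved against the DAG folder or
       # template_searchpath) whose contents are rendered by Jinja.
       add_steps = EmrAddStepsOperator(
           task_id="add_steps",
           job_flow_id="j-XXXXXXXXXXXXX",  # placeholder job flow id
           steps="steps.json",             # hypothetical templated steps file
       )
   ```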





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416150987



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -783,6 +794,77 @@ def cancel_job(
 name=job_name,
 job_id=job_id,
 location=location,
-poll_sleep=self.poll_sleep
+poll_sleep=self.poll_sleep,
+num_retries=self.num_retries,
 )
 jobs_controller.cancel()
+
+@GoogleBaseHook.fallback_to_default_project_id
+def start_sql_job(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+project_id: str,
+location: str = DEFAULT_DATAFLOW_LOCATION,
+on_new_job_id_callback: Optional[Callable[[str], None]] = None
+):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed.
+For more information, look at:
+
`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param on_new_job_id_callback: Callback called when the job ID is 
known.
+:type on_new_job_id_callback: callable
+:return: the new job object
+"""
+cmd = [
+'gcloud',
+'beta',
+'dataflow',
+'sql',
+'query',
+query,
+f'--project={project_id}',
+'--format=value(job.id)',
+f'--job-name={job_name}',
+f'--region={location}',
+*(self._options_to_args(options))
+]
+self.log.info("Executing command: %s", " ".join([shlex.quote(c) for c 
in cmd]))
+with self.provide_authorized_gcloud():

Review comment:
   Adds escape characters if needed.
   Example:
   If you want to display the contents of the `/tmp/` directory, you can use
the command `ls /tmp/`.
   If you want to display the contents of the `/tmp/i love pizza` directory,
you have to use the command `ls '/tmp/i love pizza'`; `ls /tmp/i love pizza`
is an incorrect command. The decision about quoting characters is made by
shlex.quote. It also supports the other cases required by sh, e.g. a quote
character inside an argument.
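
   A tiny, self-contained sketch of that behaviour:
   ```python
   import shlex

   # Arguments are quoted only when the shell would otherwise split or
   # reinterpret them.
   cmd = ["ls", "/tmp/i love pizza"]
   print(" ".join(shlex.quote(c) for c in cmd))
   # -> ls '/tmp/i love pizza'

   print(shlex.quote("it's"))
   # -> 'it'"'"'s'  (a quote character inside an argument is handled too)
   ```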
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416142108



##
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 
'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+dag_id="example_gcp_dataflow_sql",
+default_args={
+"start_date": days_ago(1),
+},
+schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+start_sql = DataflowStartSqlJobOperator(
+task_id="start_sql_query",
+job_name=DATAFLOW_SQL_JOB_NAME,
+query=f"""
+SELECT
+sales_region as sales_region,
+count(state_id) as count_state
+FROM
+bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table
+GROUP BY sales_region;
+""",
+options={
+"bigquery-project": GCP_PROJECT_ID,
+"bigquery-dataset": BQ_SQL_DATASET,
+"bigquery-table": "beam_output",
+'bigquery-write-disposition': "write-truncate",
+},
+location=DATAFLOW_SQL_LOCATION,
+do_xcom_push=True,

Review comment:
   I have asked @jaketf to review as well. Before this change is merged, it
will also be reviewed by at least one Apache Airflow committer.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416140501



##
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowStartSqlJobOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+BQ_SQL_DATASET = os.environ.get('DATAFLOW_BQ_SQL_DATASET', 
'airflow_dataflow_samples')
+DATAFLOW_SQL_JOB_NAME = os.environ.get('DATAFLOW_SQL_JOB_NAME', "dataflow-sql")
+DATAFLOW_SQL_LOCATION = os.environ.get('DATAFLOW_SQL_LOCATION', 'us-west1')
+
+with models.DAG(
+dag_id="example_gcp_dataflow_sql",
+default_args={
+"start_date": days_ago(1),
+},
+schedule_interval=None,  # Override to match your needs
+) as dag_sql:
+start_sql = DataflowStartSqlJobOperator(
+task_id="start_sql_query",
+job_name=DATAFLOW_SQL_JOB_NAME,
+query=f"""
+SELECT
+sales_region as sales_region,
+count(state_id) as count_state
+FROM
+bigquery.table.`{GCP_PROJECT_ID}`.beam_samples.beam_table

Review comment:
   This bucket is created in system tests. 
https://github.com/apache/airflow/pull/8553/files#
   Unfortunately, Dataflow SQL is not compatible with the public datasets I 
know.
   
   I got the following error when I referred to the public dataset.
   
   > Caused by: java.lang.UnsupportedOperationException: Field type 'NUMERIC' 
is not supported (field 'value')
   
   Here is the error message from the BigQuery console:
   ![Screenshot 2020-04-27 at 22 52 
46](https://user-images.githubusercontent.com/12058428/80419684-de796580-88d9-11ea-85bd-1d5473c13901.png)
   
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kennknowles commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


kennknowles commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416136954



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> 
RT:
 class DataflowJobStatus:
 """
 Helper class with Dataflow job statuses.
+Reference: 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
 """
-JOB_STATE_DONE = "JOB_STATE_DONE"
+JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
 JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+JOB_STATE_DONE = "JOB_STATE_DONE"
 JOB_STATE_FAILED = "JOB_STATE_FAILED"
 JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
 JOB_STATE_PENDING = "JOB_STATE_PENDING"
-FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, 
JOB_STATE_STOPPED}
+SUCCEEDED_END_STATES = {JOB_STATE_DONE, JOB_STATE_UPDATED, 
JOB_STATE_DRAINED}
+TERMINAL_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES

Review comment:
   Are these lists used anywhere else? I think that the success/fail 
distinction is artificial. You cannot really say if CANCELED is a failure or 
not. Probably the same with DRAINED and UPDATED. Whatever is looking at the job 
status probably wants the full details.

##
File path: airflow/providers/google/cloud/operators/dataflow.py
##
@@ -406,6 +406,88 @@ def on_kill(self) -> None:
 self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id)
 
 
+class DataflowStartSqlJobOperator(BaseOperator):
+"""
+Starts Dataflow SQL query.

Review comment:
   @ibzib would be a good reviewer here

##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -783,6 +794,77 @@ def cancel_job(
 name=job_name,
 job_id=job_id,
 location=location,
-poll_sleep=self.poll_sleep
+poll_sleep=self.poll_sleep,
+num_retries=self.num_retries,
 )
 jobs_controller.cancel()
+
+@GoogleBaseHook.fallback_to_default_project_id
+def start_sql_job(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+project_id: str,
+location: str = DEFAULT_DATAFLOW_LOCATION,
+on_new_job_id_callback: Optional[Callable[[str], None]] = None
+):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed.
+For more information, look at:
+
`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param on_new_job_id_callback: Callback called when the job ID is 
known.
+:type on_new_job_id_callback: callable
+:return: the new job object
+"""
+cmd = [
+'gcloud',
+'beta',

Review comment:
   It is not required.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aaltay commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r416133310



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -93,15 +93,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> 
RT:
 class DataflowJobStatus:
 """
 Helper class with Dataflow job statuses.
+Reference: 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

Review comment:
   This file is also modified in
https://github.com/apache/airflow/pull/8553. I am assuming these are the same
changes, so I am skipping this file in this review.

##
File path: airflow/providers/google/cloud/operators/dataflow.py
##
@@ -406,6 +406,71 @@ def on_kill(self) -> None:
 self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id)
 
 
+class DataflowStartFlexTemplateOperator(BaseOperator):
+"""
+Starts flex templates with the Dataflow  pipeline.
+
+:param body: The request body
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param gcp_conn_id: The connection ID to use connecting to Google Cloud
+Platform.
+:type gcp_conn_id: str
+:param delegate_to: The account to impersonate, if any.
+For this to work, the service account making the request must have
+domain-wide delegation enabled.
+:type delegate_to: str
+"""
+
+template_fields = ["body", 'location', 'project_id', 'gcp_conn_id']
+
+@apply_defaults
+def __init__(
+self,
+body: Dict,
+location: str,
+project_id: Optional[str] = None,
+gcp_conn_id: str = 'google_cloud_default',
+delegate_to: Optional[str] = None,
+*args,
+**kwargs
+) -> None:
+super().__init__(*args, **kwargs)
+self.body = body
+self.location = location
+self.project_id = project_id
+self.gcp_conn_id = gcp_conn_id
+self.delegate_to = delegate_to
+self.job_id = None
+self.hook: Optional[DataflowHook] = None
+
+def execute(self, context):
+self.hook = DataflowHook(
+gcp_conn_id=self.gcp_conn_id,
+delegate_to=self.delegate_to,
+)
+
+def set_current_job_id(job_id):
+self.job_id = job_id
+
+job = self.hook.start_flex_template(
+body=self.body,
+location=self.location,
+project_id=self.project_id,
+on_new_job_id_callback=set_current_job_id,
+)
+
+return job
+
+def on_kill(self) -> None:
+self.log.info("On kill.")
+if self.job_id:
+self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id)

Review comment:
   Do you need to call this if the job is no longer running?

##
File path: tests/providers/google/cloud/operators/test_dataflow_system.py
##
@@ -15,9 +15,20 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os

Review comment:
   This file reads more like an end-to-end test of various Dataflow system
features. Should it be a separate PR?

##
File path: 
airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
##
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google Cloud Dataflow service
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataflow import 
DataflowStartFlexTemplateOperator
+from airflow.utils.dates import days_ago
+
+GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+
+DATAFLOW_FLEX_TEMPLATE_JOB_NAME = 
os.environ.get('DATAFLOW_FLEX_TEMPLATE_JOB_NAME', f"dataflow-flex-template")
+
+# For simplicity we use the same topic name as the subscription name.
+PUBSUB_FLEX_TEMPLATE_TOPIC = 
os.environ.get('DATAFLOW_PUBSUB_FLE

[GitHub] [airflow] aaltay commented on pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


aaltay commented on pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#issuecomment-620222175


   /cc @kennknowles



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093913#comment-17093913
 ] 

ASF GitHub Bot commented on AIRFLOW-5156:
-

mik-laj commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416123217



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   https://requests.readthedocs.io/en/master/user/authentication/
   Is it possible to use this hook with other methods? I think it would be 
better if the user passed the auth object, not just the auth_type.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add other authentication mechanisms to HttpHook
> ---
>
> Key: AIRFLOW-5156
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5156
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.10.4
>Reporter: Joshua Kornblum
>Assignee: Rohit S S
>Priority: Minor
>
> It looks like the only supported authentication for HttpHooks is basic auth.
> The hook code shows 
> {quote}_if conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> requests library supports any auth that inherits AuthBase – in my scenario we 
> need ntlmauth for API on IIS server. 
> [https://2.python-requests.org/en/master/user/advanced/#custom-authentication]
> I would suggest option to pass auth object in constructor then add to if/else 
> control flow like
> {quote}_if self.auth is not None:_
>   _session.auth = self.auth_
> _elif conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> One would have to fetch the connection themselves and then fill out auth and 
> then pass that to hook which is flexible although a little awkard.
> {quote}api_conn = BaseHook().get_connection('my_api')
> auth = HttpNtlmAuth(api_conn.login, api_conn.password)
> HttpSensor(task_id='sensing', auth=auth, )
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] aaltay commented on a change in pull request #8553: Add DataflowStartSQLQuery operator

2020-04-27 Thread GitBox


aaltay commented on a change in pull request #8553:
URL: https://github.com/apache/airflow/pull/8553#discussion_r416125310



##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -93,15 +94,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> 
RT:
 class DataflowJobStatus:
 """
 Helper class with Dataflow job statuses.
+Reference: 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
 """
-JOB_STATE_DONE = "JOB_STATE_DONE"
+JOB_STATE_UNKNOWN = "JOB_STATE_UNKNOWN"
+JOB_STATE_STOPPED = "JOB_STATE_STOPPED"
 JOB_STATE_RUNNING = "JOB_STATE_RUNNING"
+JOB_STATE_DONE = "JOB_STATE_DONE"
 JOB_STATE_FAILED = "JOB_STATE_FAILED"
 JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
+JOB_STATE_UPDATED = "JOB_STATE_UPDATED"
+JOB_STATE_DRAINING = "JOB_STATE_DRAINING"
+JOB_STATE_DRAINED = "JOB_STATE_DRAINED"
 JOB_STATE_PENDING = "JOB_STATE_PENDING"
-FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
-SUCCEEDED_END_STATES = {JOB_STATE_DONE}
-END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
+JOB_STATE_CANCELLING = "JOB_STATE_CANCELLING"
+JOB_STATE_QUEUED = "JOB_STATE_QUEUED"
+FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED, 
JOB_STATE_STOPPED}

Review comment:
   JOB_STATE_STOPPED is not a failed state.  (See: 
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate)

##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -496,17 +517,15 @@ def start_java_dataflow(
 variables['jobName'] = name
 variables['region'] = location
 
-def label_formatter(labels_dict):
-return ['--labels={}'.format(
-json.dumps(labels_dict).replace(' ', ''))]
+if 'labels' in variables:
+variables['labels'] = json.dumps(variables['labels']).replace(' ', 
'')

Review comment:
   This is modifying user-provided input. Is this your intention?

##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -640,9 +659,8 @@ def start_python_dataflow(  # pylint: 
disable=too-many-arguments
 variables['job_name'] = name
 variables['region'] = location
 
-def label_formatter(labels_dict):
-return ['--labels={}={}'.format(key, value)
-for key, value in labels_dict.items()]
+if 'labels' in variables:
+variables['labels'] = ['{}={}'.format(key, value) for key, value 
in variables['labels'].items()]

Review comment:
   Why is this formatter different from the one at L521?
   
   Could we move all label formatting to the place where the Dataflow job is
triggered?

##
File path: airflow/providers/google/cloud/hooks/dataflow.py
##
@@ -783,6 +794,77 @@ def cancel_job(
 name=job_name,
 job_id=job_id,
 location=location,
-poll_sleep=self.poll_sleep
+poll_sleep=self.poll_sleep,
+num_retries=self.num_retries,
 )
 jobs_controller.cancel()
+
+@GoogleBaseHook.fallback_to_default_project_id
+def start_sql_job(
+self,
+job_name: str,
+query: str,
+options: Dict[str, Any],
+project_id: str,
+location: str = DEFAULT_DATAFLOW_LOCATION,
+on_new_job_id_callback: Optional[Callable[[str], None]] = None
+):
+"""
+Starts Dataflow SQL query.
+
+:param job_name: The unique name to assign to the Cloud Dataflow job.
+:type job_name: str
+:param query: The SQL query to execute.
+:type query: str
+:param options: Job parameters to be executed.
+For more information, look at:
+
`https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query
+`__
+command reference
+:param location: The location of the Dataflow job (for example 
europe-west1)
+:type location: str
+:param project_id: The ID of the GCP project that owns the job.
+If set to ``None`` or missing, the default project_id from the GCP 
connection is used.
+:type project_id: Optional[str]
+:param on_new_job_id_callback: Callback called when the job ID is 
known.
+:type on_new_job_id_callback: callable
+:return: the new job object
+"""
+cmd = [
+'gcloud',
+'beta',

Review comment:
   Is the beta still required, do you know?

##
File path: airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
##
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "

[GitHub] [airflow] mik-laj commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416123217



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   https://requests.readthedocs.io/en/master/user/authentication/
   Is it possible to use this hook with other methods? I think it would be 
better if the user passed the auth object, not just the auth_type.
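
   For illustration, a minimal sketch of passing a ready-made auth object
(rather than an auth class) to a requests session, using a hypothetical
token-based scheme:
   ```python
   import requests
   from requests.auth import AuthBase


   class TokenAuth(AuthBase):
       """Hypothetical custom auth: adds a bearer token header to each request."""

       def __init__(self, token: str) -> None:
           self.token = token

       def __call__(self, request):
           request.headers["Authorization"] = f"Bearer {self.token}"
           return request


   session = requests.Session()
   # The auth object may need more than a login/password pair, which is why
   # passing the object itself is more flexible than passing only a class.
   session.auth = TokenAuth("my-secret-token")
   ```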





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on issue #8541: "TypeError: can't pickle _thread.RLock objects" on usage of BigQueryOperator

2020-04-27 Thread GitBox


mik-laj commented on issue #8541:
URL: https://github.com/apache/airflow/issues/8541#issuecomment-620217086


   Does this problem still occur in Airflow in the master branch? We have 
introduced a lot of changes and improvements in GCP operators and I think that 
this problem may have already been solved.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093909#comment-17093909
 ] 

ASF GitHub Bot commented on AIRFLOW-5156:
-

mik-laj commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416123217



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   https://requests.readthedocs.io/en/master/user/authentication/
   Is it used with other methods?
   I think it would be better if the user passed the auth object, not just the 
auth_type.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add other authentication mechanisms to HttpHook
> ---
>
> Key: AIRFLOW-5156
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5156
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.10.4
>Reporter: Joshua Kornblum
>Assignee: Rohit S S
>Priority: Minor
>
> It looks like the only supported authentication for HttpHooks is basic auth.
> The hook code shows 
> {quote}_if conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> requests library supports any auth that inherits AuthBase – in my scenario we 
> need ntlmauth for API on IIS server. 
> [https://2.python-requests.org/en/master/user/advanced/#custom-authentication]
> I would suggest option to pass auth object in constructor then add to if/else 
> control flow like
> {quote}_if self.auth is not None:_
>   _session.auth = self.auth_
> _elif conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> One would have to fetch the connection themselves and then fill out auth and 
> then pass that to hook which is flexible although a little awkard.
> {quote}api_conn = BaseHook().get_connection('my_api')
> auth = HttpNtlmAuth(api_conn.login, api_conn.password)
> HttpSensor(task_id='sensing', auth=auth, )
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416123217



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   https://requests.readthedocs.io/en/master/user/authentication/
   Is it used with other methods?
   I think it would be better if the user passed the auth object, not just the 
auth_type.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on issue #8567: Support YAML input for CloudBuildCreateOperator

2020-04-27 Thread GitBox


mik-laj commented on issue #8567:
URL: https://github.com/apache/airflow/issues/8567#issuecomment-620212297


   @joppevos Reuse the current argument if possible.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093903#comment-17093903
 ] 

ASF GitHub Bot commented on AIRFLOW-5156:
-

jeffolsi commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416119369



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   The change makes sense to me since it uses the requests package, so it can
work with custom auth:
   https://2.python-requests.org/en/master/user/advanced/#custom-authentication
   The auth_type in this context is the auth object passed in the operator
constructor.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add other authentication mechanisms to HttpHook
> ---
>
> Key: AIRFLOW-5156
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5156
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.10.4
>Reporter: Joshua Kornblum
>Assignee: Rohit S S
>Priority: Minor
>
> It looks like the only supported authentication for HttpHooks is basic auth.
> The hook code shows 
> {quote}_if conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> requests library supports any auth that inherits AuthBase – in my scenario we 
> need ntlmauth for API on IIS server. 
> [https://2.python-requests.org/en/master/user/advanced/#custom-authentication]
> I would suggest option to pass auth object in constructor then add to if/else 
> control flow like
> {quote}_if self.auth is not None:_
>   _session.auth = self.auth_
> _elif conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> One would have to fetch the connection themselves and then fill out auth and 
> then pass that to hook which is flexible although a little awkard.
> {quote}api_conn = BaseHook().get_connection('my_api')
> auth = HttpNtlmAuth(api_conn.login, api_conn.password)
> HttpSensor(task_id='sensing', auth=auth, )
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] jeffolsi commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook

2020-04-27 Thread GitBox


jeffolsi commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416119369



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   The change makes sense to me since it uses the requests package, so it can
work with custom auth:
   https://2.python-requests.org/en/master/user/advanced/#custom-authentication
   The auth_type in this context is the auth object passed in the operator
constructor.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] joppevos commented on issue #8567: Support YAML input for CloudBuildCreateOperator

2020-04-27 Thread GitBox


joppevos commented on issue #8567:
URL: https://github.com/apache/airflow/issues/8567#issuecomment-620210294


   @mik-laj Happy to pick up this task. Do you mean adding an extra argument to 
be able to pass the filepath or adjusting the current `body` variable to also 
accept a filepath instead of only dict builds?
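
   A rough sketch of the second option, i.e. letting the existing `body`
argument accept either a dict or a file path (the helper below is an
assumption, not the final API):
   ```python
   from typing import Any, Dict, Union

   import yaml  # PyYAML; safe_load also handles plain JSON documents


   def resolve_build_body(body: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
       """Hypothetical helper: load the build body from a file when a path is given."""
       if isinstance(body, str) and body.endswith((".yaml", ".yml", ".json")):
           with open(body) as file:
               return yaml.safe_load(file)
       return body
   ```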



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #8533: Stop DockerSwarmOperator from pulling Docker images

2020-04-27 Thread GitBox


boring-cyborg[bot] commented on pull request #8533:
URL: https://github.com/apache/airflow/pull/8533#issuecomment-620207440


   Awesome work, congrats on your first merged pull request!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-5156) Add other authentication mechanisms to HttpHook

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093890#comment-17093890
 ] 

ASF GitHub Bot commented on AIRFLOW-5156:
-

mik-laj commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416110652



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   Other auth methods require a different number of arguments, e.g. OAuth2.
What is the purpose of this change?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add other authentication mechanisms to HttpHook
> ---
>
> Key: AIRFLOW-5156
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5156
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.10.4
>Reporter: Joshua Kornblum
>Assignee: Rohit S S
>Priority: Minor
>
> It looks like the only supported authentication for HttpHooks is basic auth.
> The hook code shows 
> {quote}_if conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> requests library supports any auth that inherits AuthBase – in my scenario we 
> need ntlmauth for API on IIS server. 
> [https://2.python-requests.org/en/master/user/advanced/#custom-authentication]
> I would suggest option to pass auth object in constructor then add to if/else 
> control flow like
> {quote}_if self.auth is not None:_
>   _session.auth = self.auth_
> _elif conn.login:_
>   _session.auth = (conn.login, conn.password)_
> {quote}
> One would have to fetch the connection themselves and then fill out auth and 
> then pass that to hook which is flexible although a little awkard.
> {quote}api_conn = BaseHook().get_connection('my_api')
> auth = HttpNtlmAuth(api_conn.login, api_conn.password)
> HttpSensor(task_id='sensing', auth=auth, )
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on a change in pull request #8429: [AIRFLOW-5156] Added auth type to HttpHook

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8429:
URL: https://github.com/apache/airflow/pull/8429#discussion_r416110652



##
File path: airflow/providers/http/hooks/http.py
##
@@ -66,7 +75,7 @@ def get_conn(self, headers=None):
 if conn.port:
 self.base_url = self.base_url + ":" + str(conn.port)
 if conn.login:
-session.auth = (conn.login, conn.password)
+session.auth = self.auth_type(conn.login, conn.password)

Review comment:
   Other auth methods require a different number of arguments, e.g. OAuth2.
What is the purpose of this change?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8591: Add http system test

2020-04-27 Thread GitBox


mik-laj commented on a change in pull request #8591:
URL: https://github.com/apache/airflow/pull/8591#discussion_r416109551



##
File path: tests/providers/http/operators/test_http_system.py
##
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+from tests.test_utils import AIRFLOW_MAIN_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+
+HTTP_DAG_FOLDER = os.path.join(
+AIRFLOW_MAIN_FOLDER, "airflow", "providers", "http", "example_dags"
+)
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.system("http")

Review comment:
   It is not a core operator; it is in the provider directory, which allows it
to be released independently from Airflow. It is common and mature, but its
usage is still developing. It is flexible enough to be extended with new
authentication mechanisms and more. Last week we had a valuable, non-doc
contribution to this operator:
   https://github.com/apache/airflow/pull/8429
   
   Here is the list of all core operators:
   ```
   airflow.operators.bash.BashOperator
   airflow.operators.branch_operator.BaseBranchOperator
   airflow.operators.check_operator.CheckOperator
   airflow.operators.check_operator.IntervalCheckOperator
   airflow.operators.check_operator.ThresholdCheckOperator
   airflow.operators.check_operator.ValueCheckOperator
   airflow.operators.dagrun_operator.TriggerDagRunOperator
   airflow.operators.dummy_operator.DummyOperator
   airflow.operators.generic_transfer.GenericTransfer
   airflow.operators.latest_only_operator.LatestOnlyOperator
   airflow.operators.presto_check_operator.PrestoCheckOperator
   airflow.operators.presto_check_operator.PrestoIntervalCheckOperator
   airflow.operators.presto_check_operator.PrestoValueCheckOperator
   airflow.operators.python.BranchPythonOperator
   airflow.operators.python.PythonOperator
   airflow.operators.python.PythonVirtualenvOperator
   airflow.operators.python.ShortCircuitOperator
   airflow.operators.subdag_operator.SubDagOperator
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] potiuk commented on pull request #8473: [AIRFLOW-8472]: `PATCH` for Databricks hook `_do_api_call`

2020-04-27 Thread GitBox


potiuk commented on pull request #8473:
URL: https://github.com/apache/airflow/pull/8473#issuecomment-620189429


   Great! Thanks, @dimonchik-suvorov!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library

2020-04-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093875#comment-17093875
 ] 

ASF GitHub Bot commented on AIRFLOW-6981:
-

potiuk commented on pull request #8575:
URL: https://github.com/apache/airflow/pull/8575#issuecomment-620186003


   You can read more about it in the contributing docs:
https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#generating-requirement-files
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Move Google Cloud Build from Discovery API to Python Library
> 
>
> Key: AIRFLOW-6981
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6981
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: gcp
>Affects Versions: 2.0.0
>Reporter: Ryan Yuan
>Assignee: Ryan Yuan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

