[GitHub] [airflow] Soonmok commented on issue #26865: logical_date

2022-12-16 Thread GitBox


Soonmok commented on issue #26865:
URL: https://github.com/apache/airflow/issues/26865#issuecomment-1356031865

   @frankbreetz Could you provide sample DAG code to reproduce this issue? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Soonmok commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Soonmok commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051306708


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   Okay. 
   According to the [issue](https://github.com/apache/airflow/issues/26865), the issuer triggered the DAG with config, so I assumed it was triggered with an `execution_date` set by `default_args`.
   But now I think there is another reason for this issue. I'll look into it more.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis closed issue #14962: Create EMR Notebook execution operators

2022-12-16 Thread GitBox


Taragolis closed issue #14962: Create EMR Notebook execution operators
URL: https://github.com/apache/airflow/issues/14962


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on issue #14962: Create EMR Notebook execution operators

2022-12-16 Thread GitBox


Taragolis commented on issue #14962:
URL: https://github.com/apache/airflow/issues/14962#issuecomment-1356004799

   Better late than never 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis closed pull request #28427: Update Dockerfile

2022-12-16 Thread GitBox


Taragolis closed pull request #28427: Update Dockerfile
URL: https://github.com/apache/airflow/pull/28427


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on pull request #28427: Update Dockerfile

2022-12-16 Thread GitBox


Taragolis commented on PR #28427:
URL: https://github.com/apache/airflow/pull/28427#issuecomment-1355994550

   @syed-gilani there should be strong reasons to add a new package as a runtime dependency to the prod image, which would be available for everyone.
   
   BTW, you've already been given information on how to extend/customise your own image:
   - https://github.com/apache/airflow/discussions/28385#discussioncomment-4429996
   - https://github.com/apache/airflow/discussions/28385#discussioncomment-4430075


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] syed-gilani opened a new pull request, #28427: Update Dockerfile

2022-12-16 Thread GitBox


syed-gilani opened a new pull request, #28427:
URL: https://github.com/apache/airflow/pull/28427

   Added git as a runtime dependency
   
   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Taragolis commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051297557


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   And this is also an incorrect assumption.
   
   `default_args` is designed for reusing the same parameters across tasks within a single DAG, and it is user-defined at the code level. By their nature these arguments do not change at runtime, except for selected arguments:
   
   1. `params` - [Params](https://airflow.apache.org/docs/apache-airflow/stable/concepts/params.html#params) can be defined at the DAG and Task level, so they are resolved during DAG initialisation
   2. `start_date` and `end_date` - https://airflow.apache.org/docs/apache-airflow/stable/faq.html#what-s-the-deal-with-start-date
   
   Those are all the exceptions. 
   
   
   `default_args` does not contain `logical_date` / `execution_date` unless the user defines it, and this might happen if they need to propagate these arguments to [TriggerDagRunOperator](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/trigger_dagrun/index.html#airflow.operators.trigger_dagrun.TriggerDagRunOperator) or [ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) without explicitly defining them during initialisation. But even in this case it would most probably be something like:
   
```python
with DAG(
    ...,
    default_args={
        "execution_date": "{{ logical_date }}"
    },
) as dag:
    ...
```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Soonmok commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Soonmok commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051289893


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   Thank you for the explanation. 
   Then, the issuer might have used `default_args` to pass `logical_date` (`execution_date`).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Soonmok commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Soonmok commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051291646


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   I've changed `conf` to `default_args`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Taragolis commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051289256


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   `dag_run.conf` is used in templates, and `default_args` parameters propagate to task arguments if they are not set explicitly.
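   To make the distinction concrete, here is a minimal hedged sketch (DAG and task ids are hypothetical, not from the PR): `default_args` entries are fixed at parse time and copied onto each task, while `dag_run.conf` only becomes available at runtime, through templates.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="conf_vs_default_args",  # hypothetical name
    start_date=datetime(2022, 1, 1),
    schedule=None,
    default_args={"retries": 2},  # parse-time value, propagated to every task
) as dag:
    # dag_run.conf is resolved only at runtime, via templating:
    BashOperator(
        task_id="show_conf",
        bash_command="echo {{ dag_run.conf.get('logical_date', 'not set') }}",
    )
```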



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Taragolis commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051288043


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   `default_args` != `dag_run.conf`
   
   1. 
https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#passing-parameters-when-triggering-dags
   2. 
https://airflow.apache.org/docs/apache-airflow/stable/tutorial/fundamentals.html#tasks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Soonmok commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Soonmok commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051282512


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   
https://github.com/apache/airflow/blob/401fc57e8ba1dddb041e0d777bb0277a09f227db/airflow/models/dag.py#L295
 
   The comment describes that when `default_args` values conflict with an operator's own arguments, the operator's arguments take precedence.
   Should the `execution_date` argument follow this rule?
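   As a hedged illustration of that precedence rule (DAG and task ids are hypothetical):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="default_args_precedence",  # hypothetical name
    start_date=datetime(2022, 1, 1),
    default_args={"retries": 3},
) as dag:
    inherits = EmptyOperator(task_id="inherits")  # gets retries=3 from default_args
    overrides = EmptyOperator(task_id="overrides", retries=0)  # explicit argument wins
```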



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[airflow] branch main updated (401fc57e8b -> 8e0df8881f)

2022-12-16 Thread taragolis
This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 401fc57e8b Restructure Docs  (#27235)
 add 8e0df8881f Add Amazon Elastic Container Registry (ECR) Hook (#28279)

No new revisions were added by this update.

Summary of changes:
 airflow/providers/amazon/aws/hooks/ecr.py  | 101 
 airflow/providers/amazon/provider.yaml |   7 ++
 ...azon-elastic-container-registry_light...@4x.png | Bin 0 -> 0 bytes
 docs/spelling_wordlist.txt |   1 +
 tests/providers/amazon/aws/hooks/test_ecr.py   | 103 +
 5 files changed, 212 insertions(+)
 create mode 100644 airflow/providers/amazon/aws/hooks/ecr.py
 create mode 100644 
docs/integration-logos/aws/amazon-elastic-container-registry_light...@4x.png
 create mode 100644 tests/providers/amazon/aws/hooks/test_ecr.py



[GitHub] [airflow] Taragolis merged pull request #28279: Add Amazon Elastic Container Registry (ECR) Hook

2022-12-16 Thread GitBox


Taragolis merged PR #28279:
URL: https://github.com/apache/airflow/pull/28279


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #28336: Fixed hanged KubernetesPodOperator

2022-12-16 Thread GitBox


Taragolis commented on code in PR #28336:
URL: https://github.com/apache/airflow/pull/28336#discussion_r1051281701


##
airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##
@@ -91,6 +98,63 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
     return container_status.state.terminated.message if container_status else None
 
 
+class PodLogsConsumer:
+    """
+    PodLogsConsumer is responsible for pulling pod logs from a stream with checking a container status before
+    reading data.
+    This class is a workaround for the issue https://github.com/apache/airflow/issues/23497
+    """
+
+    def __init__(
+        self,
+        response: HTTPResponse,
+        pod: V1Pod,
+        pod_manager: PodManager,
+        container_name: str,
+        timeout: int = 120,
+    ):
+        self.response = response
+        self.pod = pod
+        self.pod_manager = pod_manager
+        self.container_name = container_name
+        self.timeout = timeout
+
+    def __iter__(self) -> Generator[bytes, None, None]:
+        messages: list[bytes] = []
+        if self.logs_available():
+            for chunk in self.response.stream(amt=None, decode_content=True):
+                if b"\n" in chunk:
+                    chunks = chunk.split(b"\n")
+                    yield b"".join(messages) + chunks[0] + b"\n"
+                    for x in chunks[1:-1]:
+                        yield x + b"\n"
+                    if chunks[-1]:
+                        messages = [chunks[-1]]
+                    else:
+                        messages = []
+                else:
+                    messages.append(chunk)
+                if not self.logs_available():
+                    break
+        if messages:
+            yield b"".join(messages)
+
+    def logs_available(self):
+        remote_pod = self.pod_manager.read_pod(self.pod)
+        if container_is_running(pod=remote_pod, container_name=self.container_name):
+            return True
+        container_status = get_container_status(pod=remote_pod, container_name=self.container_name)
+        state = container_status.state if container_status else None
+        terminated = state.terminated if state else None
+        if terminated:
+            termination_time = terminated.finished_at
+            if termination_time:
+                termination_time_ts = datetime.fromtimestamp(termination_time.timestamp())
+                now_ts = datetime.fromtimestamp(datetime.now().timestamp())

Review Comment:
   Just wondering why you do the transformation datetime -> timestamp -> datetime?
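   For context on why that round trip is worth questioning, a small sketch of the standard-library behaviour: for an aware datetime, `datetime.fromtimestamp(dt.timestamp())` returns a naive datetime in local time, so the conversion strips timezone information rather than being a no-op.

```python
from datetime import datetime, timezone

aware = datetime(2022, 12, 16, 12, 0, tzinfo=timezone.utc)
naive = datetime.fromtimestamp(aware.timestamp())  # naive, expressed in local time

print(aware.tzinfo)  # UTC
print(naive.tzinfo)  # None -- the timezone information is gone
```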



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Soonmok commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Soonmok commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051279698


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   In this [tutorial](https://airflow.apache.org/docs/apache-airflow/1.10.1/tutorial.html), `default_args` doesn't provide `logical_date`, but the issuer was assigning `execution_date` to `default_args`. 
   
   I think we should validate the use of `default_args`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Soonmok commented on a diff in pull request #28398: raise ValueError when logical date is earlier than start_date

2022-12-16 Thread GitBox


Soonmok commented on code in PR #28398:
URL: https://github.com/apache/airflow/pull/28398#discussion_r1051280102


##
airflow/api/common/trigger_dag.py:
##
@@ -51,7 +51,11 @@ def _trigger_dag(
     if dag is None or dag_id not in dag_bag.dags:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
-    execution_date = execution_date if execution_date else timezone.utcnow()
+    if execution_date is None:
+        if conf and "logical_date" in conf:
+            execution_date = conf["logical_date"]

Review Comment:
   Maybe we can set a whitelist.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk closed issue #28372: Airflow KubernetesPodOperator task running despite no resources being available

2022-12-16 Thread GitBox


potiuk closed issue #28372: Airflow KubernetesPodOperator task running despite 
no resources being available
URL: https://github.com/apache/airflow/issues/28372


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #28372: Airflow KubernetesPodOperator task running despite no resources being available

2022-12-16 Thread GitBox


potiuk commented on issue #28372:
URL: https://github.com/apache/airflow/issues/28372#issuecomment-1355901746

   > > You can limit the number of concurrent task run by airflow thanks to 
airflow pool
   > > or concurrency settings on the dag
   > 
   > Ya perhaps this is not necessarily a bug from airflow then but more of a 
feature request. What you suggested is a workaround to my problem, thanks for 
the suggestion.
   
   > I think it would be pretty cool if Airflow scheduler was aware of 
resources + auto scaling capability of a cluster, and then schedule accordingly 
(i.e. keep running jobs, and schedule the remainder that no resources can 
possibly be allocated for).
   
   This is actually not a workaround. This is how you are supposed to limit resources in Airflow when you use the Kubernetes Pod Operator. 
   
   Using the Kubernetes Pod Operator and expecting Airflow to understand resource limits coming from autoscaling of the cluster it runs on would basically mean that Airflow would have to copy the whole logic of Kubernetes to know what it can / cannot schedule. I am not sure if you are aware that there are plenty of things Kubernetes takes into account when scheduling pods - and many of them have super complex logic. It's not only memory, but also affinities, anti-affinities, labels that do or do not match the nodes the pod could run on, and plenty of others. For example, imagine you have 20 KPOs each requiring a GPU and only 2 GPUs are available. And this is only one of the cases. Duplicating the whole logic of K8S in Airflow is not only difficult but also prone to errors, and it would mean that Airflow's KPO would be closely tied to a specific version of K8S, because new features of K8S are added with each release. What you ask for is not really feasible. 
   
   You might think it is simple for your specific case because you just **know** you have 2 CPUs per node and you know you have 6 of them in total, so it must be simple for Airflow to know it... But in fact Airflow would have to implement very complex logic to know it in the general case. And by providing the Pool you ACTUALLY pass your knowledge to Airflow, and it indeed knows what the limits are without performing all the complex and brittle K8s logic.
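   A hedged sketch of the pool approach (pool and task names are hypothetical; the pool would be created beforehand, e.g. with `airflow pools set k8s_pool 6 "K8s capacity"`):

```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

run_job = KubernetesPodOperator(
    task_id="run_job",
    name="run-job",
    image="busybox",
    cmds=["sh", "-c", "echo hello"],
    pool="k8s_pool",  # at most 6 such tasks run at once, regardless of cluster state
)
```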
   
   We do not really want to re-implement K8S in Airflow.
   
   But you can do better than manually allocating a fixed pool of resources for your workloads. And Airflow has you covered.
   
   If you really want to do scaling, then what you can do is use the Celery Executor running on K8S. As surprising as it is, this is a pretty good way to implement K8s auto-scaling. This is precisely what the Celery Executor was designed for - especially if you have relatively short tasks which are similar to each other in terms of complexity, the CeleryExecutor is the way to go rather than running tasks through KPOs. We have KEDA-based auto-scaling implemented in our Helm Chart, and if you run it on top of an auto-scaling K8S cluster, it will actually be able to handle autoscaling well. You can even combine it with long-running Kubernetes tasks by running the Celery Kubernetes Executor and choosing which tasks run where.
   
   Again - in this case you need to manage queues to direct your load, but those queues can dynamically grow in size if you want.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] ayman-albaz commented on issue #28372: Airflow KubernetesPodOperator task running despite no resources being available

2022-12-16 Thread GitBox


ayman-albaz commented on issue #28372:
URL: https://github.com/apache/airflow/issues/28372#issuecomment-1355872345

   > You can limit the number of concurrent task run by airflow thanks to 
airflow pool
   > 
   > or concurrency settings on the dag
   
   Ya perhaps this is not necessarily a bug from airflow then but more of a 
feature request. What you suggested is a workaround to my problem, thanks for 
the suggestion.
   
   I think it would be pretty cool if Airflow scheduler was aware of resources 
+ auto scaling capability of a cluster, and then schedule accordingly (i.e. 
keep running jobs, and schedule the remainder that no resources can possibly be 
allocated for).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] rkarish commented on pull request #28375: AIP-51 - Misc. Compatibility Checks

2022-12-16 Thread GitBox


rkarish commented on PR #28375:
URL: https://github.com/apache/airflow/pull/28375#issuecomment-1355848046

   Thanks for the feedback Niko! I will clean this up and let you know when I 
am finished.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] rkarish commented on a diff in pull request #28375: AIP-51 - Misc. Compatibility Checks

2022-12-16 Thread GitBox


rkarish commented on code in PR #28375:
URL: https://github.com/apache/airflow/pull/28375#discussion_r1051247458


##
airflow/executors/executor_loader.py:
##
@@ -167,10 +167,3 @@ def __load_local_kubernetes_executor(cls) -> BaseExecutor:
 
         local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
         return local_kubernetes_executor_cls(local_executor, kubernetes_executor)
-
-
-UNPICKLEABLE_EXECUTORS = (

Review Comment:
   I searched the repository for references and it looked like it was safe to 
delete, but I didn't think about other users in the community being dependent 
on this. I think that adding a comment here and waiting to clean this up at a 
later time is a safer approach.
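   A hedged sketch of what such a shim could look like, using a module-level `__getattr__` (PEP 562); the tuple contents are illustrative, not copied from the removed code:

```python
import warnings


def __getattr__(name):
    # Keep the removed constant importable while steering users to the new flag.
    if name == "UNPICKLEABLE_EXECUTORS":
        warnings.warn(
            "UNPICKLEABLE_EXECUTORS is deprecated; check the executor's "
            "is_picklable attribute instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return ("LocalExecutor", "SequentialExecutor", "DaskExecutor")
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```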



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] rkarish commented on a diff in pull request #28375: AIP-51 - Misc. Compatibility Checks

2022-12-16 Thread GitBox


rkarish commented on code in PR #28375:
URL: https://github.com/apache/airflow/pull/28375#discussion_r1051244879


##
airflow/executors/base_executor.py:
##
@@ -65,11 +65,13 @@ class BaseExecutor(LoggingMixin):
     """
 
     supports_ad_hoc_ti_run: bool = False
+    supports_sentry: bool = False
 
     job_id: None | int | str = None
     callback_sink: BaseCallbackSink | None = None
 
     is_local: bool = False
+    is_picklable: bool = False

Review Comment:
   I agree that this makes more sense. For some reason I went with the opposite.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[airflow] branch constraints-main updated: Updating constraints. Build id:

2022-12-16 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch constraints-main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/constraints-main by this push:
 new c4a89b18eb Updating constraints. Build id:
c4a89b18eb is described below

commit c4a89b18eb6ba85f2db17d9500155dcf7e7aa3d2
Author: Automated GitHub Actions commit 
AuthorDate: Fri Dec 16 23:36:59 2022 +

Updating constraints. Build id:

This update in constraints is automatically committed by the CI 
'constraints-push' step based on
HEAD of '' in ''
with commit sha .

All tests passed in this build so we determined we can push the updated 
constraints.

See 
https://github.com/apache/airflow/blob/main/README.md#installing-from-pypi for 
details.
---
 constraints-3.10.txt  | 24 
 constraints-3.7.txt   | 20 ++--
 constraints-3.8.txt   | 24 
 constraints-3.9.txt   | 24 
 constraints-no-providers-3.10.txt |  8 
 constraints-no-providers-3.7.txt  |  4 ++--
 constraints-no-providers-3.8.txt  |  8 
 constraints-no-providers-3.9.txt  |  8 
 constraints-source-providers-3.10.txt | 24 
 constraints-source-providers-3.7.txt  | 20 ++--
 constraints-source-providers-3.8.txt  | 24 
 constraints-source-providers-3.9.txt  | 24 
 12 files changed, 106 insertions(+), 106 deletions(-)

diff --git a/constraints-3.10.txt b/constraints-3.10.txt
index ba0c4773d5..feb30adce2 100644
--- a/constraints-3.10.txt
+++ b/constraints-3.10.txt
@@ -1,5 +1,5 @@
 #
-# This constraints file was automatically generated on 2022-12-15T17:38:59Z
+# This constraints file was automatically generated on 2022-12-16T23:36:18Z
 # via "eager-upgrade" mechanism of PIP. For the "main" branch of Airflow.
 # This variant of constraints install uses the HEAD of the branch version for 
'apache-airflow' but installs
 # the providers from PIP-released packages at the moment of the constraint 
generation.
@@ -50,7 +50,7 @@ aiofiles==0.8.0
 aiohttp==3.8.3
 aiosignal==1.3.1
 alabaster==0.7.12
-alembic==1.8.1
+alembic==1.9.0
 aliyun-python-sdk-core==2.13.36
 aliyun-python-sdk-kms==2.16.0
 amqp==5.1.1
@@ -158,7 +158,7 @@ azure-mgmt-datafactory==1.1.0
 azure-mgmt-datalake-nspkg==3.0.1
 azure-mgmt-datalake-store==0.5.0
 azure-mgmt-nspkg==3.0.2
-azure-mgmt-resource==21.2.1
+azure-mgmt-resource==22.0.0
 azure-nspkg==3.0.2
 azure-servicebus==7.8.1
 azure-storage-blob==12.14.1
@@ -173,9 +173,9 @@ billiard==3.6.4.0
 black==22.12.0
 bleach==5.0.1
 blinker==1.5
-boto3==1.26.30
+boto3==1.26.32
 boto==2.49.0
-botocore==1.29.30
+botocore==1.29.32
 bowler==0.9.0
 cachelib==0.9.0
 cachetools==4.2.2
@@ -185,7 +185,7 @@ celery==5.2.7
 certifi==2022.12.7
 cffi==1.15.1
 cfgv==3.3.1
-cfn-lint==0.72.3
+cfn-lint==0.72.5
 cgroupspy==0.2.2
 chardet==4.0.0
 charset-normalizer==2.1.1
@@ -209,7 +209,7 @@ cron-descriptor==1.2.32
 croniter==1.3.8
 cryptography==36.0.2
 curlify==2.2.1
-dask==2022.12.0
+dask==2022.12.1
 databricks-sql-connector==2.2.0
 datadog==0.44.0
 db-dtypes==1.0.5
@@ -217,7 +217,7 @@ decorator==5.1.1
 defusedxml==0.7.1
 dill==0.3.1.1
 distlib==0.3.6
-distributed==2022.12.0
+distributed==2022.12.1
 dnspython==2.2.1
 docker==6.0.1
 docopt==0.6.2
@@ -318,7 +318,7 @@ httplib2==0.20.4
 httpx==0.23.1
 humanize==4.4.0
 hvac==1.0.2
-identify==2.5.9
+identify==2.5.10
 idna==3.4
 ijson==3.1.4
 imagesize==1.4.1
@@ -342,7 +342,7 @@ json-merge-patch==0.2
 jsondiff==2.0.0
 jsonpatch==1.32
 jsonpath-ng==1.5.3
-jsonpickle==3.0.0
+jsonpickle==3.0.1
 jsonpointer==2.3
 jsonschema==4.17.3
 junit-xml==1.9
@@ -382,7 +382,7 @@ msrest==0.7.1
 msrestazure==0.6.4
 multi-key-dict==2.0.3
 multidict==6.0.3
-mypy-boto3-appflow==1.26.15
+mypy-boto3-appflow==1.26.32
 mypy-boto3-rds==1.26.29
 mypy-boto3-redshift-data==1.26.30
 mypy-extensions==0.4.3
@@ -532,7 +532,7 @@ snakebite-py3==3.0.5
 sniffio==1.3.0
 snowballstemmer==2.2.0
 snowflake-connector-python==2.9.0
-snowflake-sqlalchemy==1.4.5
+snowflake-sqlalchemy==1.4.4
 sortedcontainers==2.4.0
 soupsieve==2.3.2.post1
 sphinx-airflow-theme==0.0.11
diff --git a/constraints-3.7.txt b/constraints-3.7.txt
index 75d8a6f1e2..bf43ed299d 100644
--- a/constraints-3.7.txt
+++ b/constraints-3.7.txt
@@ -1,5 +1,5 @@
 #
-# This constraints file was automatically generated on 2022-12-15T17:39:19Z
+# This constraints file was automatically generated on 2022-12-16T23:36:56Z
 # via "eager-upgrade" mechanism of PIP. For the "main" branch of Airflow.
 # This variant of constraints install uses the HEAD of the branch version for 
'apache-airflow' but installs
 # the providers from PIP-released packages at the moment of the constraint 
generation.
@@ -50,7 +50,7 @@ aiofiles==0.8.0
 

[GitHub] [airflow] o-nikolas commented on a diff in pull request #28279: Add Amazon Elastic Container Registry (ECR) Hook

2022-12-16 Thread GitBox


o-nikolas commented on code in PR #28279:
URL: https://github.com/apache/airflow/pull/28279#discussion_r1051231747


##
airflow/providers/amazon/aws/hooks/ecr.py:
##
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import base64
+import logging
+from dataclasses import dataclass
+from datetime import datetime
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.utils.log.secrets_masker import mask_secret
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True)
+class EcrCredentials:
+    """Helper (frozen dataclass) for storing temporary ECR credentials."""
+
+    username: str
+    password: str
+    proxy_endpoint: str
+    expires_at: datetime
+
+    def __post_init__(self):
+        mask_secret(self.password)
+        logger.debug("Credentials to Amazon ECR %r expires at %s.", self.proxy_endpoint, self.expires_at)
+
+    @property
+    def registry(self) -> str:
+        """Return registry in appropriate `docker login` format."""
+        # https://github.com/docker/docker-py/issues/2256#issuecomment-824940506
+        return self.proxy_endpoint.replace("https://", "")
+
+
+class EcrHook(AwsBaseHook):
+    """
+    Interact with Amazon Elastic Container Registry (ECR)
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, **kwargs):
+        kwargs.pop("client_type", None)
+        super().__init__(client_type="ecr", **kwargs)
+
+    def get_temporary_credentials(self, registry_ids: list[str] | str | None = None) -> list[EcrCredentials]:
+        """Get temporary credentials for Amazon ECR.
+
+        Return list of :class:`~airflow.providers.amazon.aws.hooks.ecr.EcrCredentials`,
+        obtained credentials valid for 12 hours.
+
+        :param registry_ids: Either AWS Account ID or list of AWS Account IDs that are associated
+            with the registries for which obtain credentials. If you do not specify a registry,
+            the default registry is assumed.
+
+        .. seealso::
+            - `boto3 ECR client get_authorization_token method `_.
+        """
+        registry_ids = registry_ids or None

Review Comment:
   Yeah no concern I was just checking to be sure. You could re-arrange the if 
statements so that the falsey check happens first so that you wouldn't need 
this assignment, but the way you have it is also perfectly fine :+1: 
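   A hedged usage sketch for the new hook (the connection id is illustrative; the class and method names come from the diff above):

```python
from airflow.providers.amazon.aws.hooks.ecr import EcrHook

hook = EcrHook(aws_conn_id="aws_default")
credentials = hook.get_temporary_credentials()[0]  # default registry

print(credentials.registry)  # proxy endpoint without the https:// scheme
# credentials.username / credentials.password can be fed to `docker login`;
# the password is masked in task logs via mask_secret().
```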



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] o-nikolas commented on a diff in pull request #28375: AIP-51 - Misc. Compatibility Checks

2022-12-16 Thread GitBox


o-nikolas commented on code in PR #28375:
URL: https://github.com/apache/airflow/pull/28375#discussion_r1051226845


##
airflow/sentry.py:
##
@@ -78,14 +79,15 @@ class ConfiguredSentry(DummySentry):
     def __init__(self):
         """Initialize the Sentry SDK."""
         ignore_logger("airflow.task")
-        executor_name = conf.get("core", "EXECUTOR")
 
         sentry_flask = FlaskIntegration()
 
         # LoggingIntegration is set by default.
         integrations = [sentry_flask]
 
-        if executor_name == "CeleryExecutor":
+        executor_class, _ = ExecutorLoader.import_executor_cls(conf.get("core", "EXECUTOR"))

Review Comment:
   You can use the helper method I added for this: `ExecutorLoader.import_default_executor_cls()`
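   A hedged sketch of the suggested change (assuming the helper returns the same `(class, source)` tuple shape as `import_executor_cls`):

```python
executor_class, _ = ExecutorLoader.import_default_executor_cls()
```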



##
airflow/executors/executor_loader.py:
##
@@ -167,10 +167,3 @@ def __load_local_kubernetes_executor(cls) -> BaseExecutor:
 
         local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
         return local_kubernetes_executor_cls(local_executor, kubernetes_executor)
-
-
-UNPICKLEABLE_EXECUTORS = (

Review Comment:
   I wonder if we should actually keep this in for backwards compatibility 
(with a comment describing that it is deprecated) :thinking: i.e. someone in 
the community could be depending on this list existing. I wonder what others 
think about this.



##
airflow/executors/celery_kubernetes_executor.py:
##
@@ -38,6 +38,7 @@ class CeleryKubernetesExecutor(LoggingMixin):
     otherwise, CeleryExecutor is used.
     """
 
+    is_picklable: bool = True

Review Comment:
   You'll also need to set a value for `supports_sentry` in this class as well 
since it does not implement the `BaseExecutor` interface (see the discussion 
starting [from 
here](https://github.com/apache/airflow/issues/28276#issuecomment-1344899475) 
for more context). You'll need to ensure both new fields are set on all the 
executors which don't inherit from `BaseExecutor` (`LocalKubernetesExecutor` as 
well) and add the appropriate unit tests for those.



##
airflow/executors/base_executor.py:
##
@@ -65,11 +65,13 @@ class BaseExecutor(LoggingMixin):
     """
 
    supports_ad_hoc_ti_run: bool = False
+    supports_sentry: bool = False
 
     job_id: None | int | str = None
     callback_sink: BaseCallbackSink | None = None
 
     is_local: bool = False
+    is_picklable: bool = False

Review Comment:
   I think we want the default to be True, no? The previous code assumed all 
executors were picklable unless it was present in the list of 
`UNPICKLEABLE_EXECUTORS`. So any executor that goes through that code should be 
assumed to be picklable unless it says otherwise. WDYT?



##
airflow/executors/local_kubernetes_executor.py:
##
@@ -38,6 +38,7 @@ class LocalKubernetesExecutor(LoggingMixin):
     otherwise, LocalExecutor is used.
     """
 
+    is_picklable: bool = True

Review Comment:
   Do we actually want this to be True? The `LocalExecutor` is not picklable 
and so far we've gone with True for these only if all sub executors are True. 
WDYT?
   There is some context in a PR [comment 
here](https://github.com/apache/airflow/pull/28288/files#r1045147675) (from 
@pierrejeambrun's PR for `is_local`) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #28300: Add Public Interface description to Airflow documentation

2022-12-16 Thread GitBox


potiuk commented on PR #28300:
URL: https://github.com/apache/airflow/pull/28300#issuecomment-1355754952

   Hey everyone - I spoke to @TohnJhomas, and after we merged his big restructuring in #27235 we decided together to move all the changes for "customizing airflow" and "public-airflow-interface" to "administration-and-deployment". 
   
   This is mostly what it is all about - people writing DAGs are rarely concerned with changes that would involve breaking the API; it is more the people who are installing and administering Airflow that are concerned about it, but I think we can still discuss what the best place is if others think otherwise.
   
   Looking forward to merging that one after the big restructure of docs is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] amoGLingle commented on issue #24909: Airflow Scheduler Deadlock - Transaction not rolled back on Exception?

2022-12-16 Thread GitBox


amoGLingle commented on issue #24909:
URL: https://github.com/apache/airflow/issues/24909#issuecomment-1355743923

   thx


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[airflow] branch main updated (11f30a887c -> 401fc57e8b)

2022-12-16 Thread potiuk
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 11f30a887c Dont show task/run durations when there is no start_date 
(#28395)
 add 401fc57e8b Restructure Docs  (#27235)

No new revisions were added by this update.

Summary of changes:
 .github/boring-cyborg.yml  | 20 ++---
 RELEASE_NOTES.rst  |  4 +-
 airflow/hooks/subprocess.py|  2 +-
 airflow/providers/cncf/kubernetes/CHANGELOG.rst|  2 +-
 .../operators.rst  |  4 +-
 .../operators/index.rst|  2 +-
 .../core-extensions/auth-backends.rst  |  2 +-
 .../core-extensions/connections.rst|  2 +-
 .../core-extensions/logging.rst|  2 +-
 .../core-extensions/secrets-backends.rst   |  2 +-
 .../cluster-policies.rst   |  0
 .../dag-serialization.rst  |  2 +-
 .../index.rst  | 33 
 .../kubernetes.rst |  4 +-
 .../lineage.rst|  0
 .../listeners.rst  |  2 +-
 .../logging-monitoring/callbacks.rst   |  2 +-
 .../logging-monitoring/check-health.rst|  0
 .../logging-monitoring/errors.rst  |  0
 .../logging-monitoring/index.rst   |  0
 .../logging-monitoring/logging-architecture.rst|  6 +-
 .../logging-monitoring/logging-tasks.rst   |  0
 .../logging-monitoring/metrics.rst |  0
 .../logging-monitoring/tracking-user-activity.rst  |  2 +-
 .../modules_management.rst |  2 +-
 .../pools.rst  |  0
 .../priority-weight.rst|  0
 .../production-deployment.rst  | 12 +--
 .../scheduler.rst  |  8 +-
 .../security/access-control.rst|  2 +-
 .../security/api.rst   |  0
 .../security/flower.rst|  0
 .../security/index.rst |  0
 .../security/kerberos.rst  |  0
 .../security/secrets/fernet.rst|  0
 .../security/secrets/index.rst |  4 +-
 .../security/secrets/mask-sensitive-values.rst |  0
 .../security/secrets/secrets-backend/index.rst |  2 +-
 .../local-filesystem-secrets-backend.rst   |  0
 .../security/webserver.rst |  2 +-
 .../security/workload.rst  |  0
 .../connections.rst|  0
 .../dagfile-processing.rst |  0
 .../datasets.rst   |  0
 .../deferring.rst  |  2 +-
 .../dynamic-task-mapping.rst   |  0
 .../index.rst  | 43 ---
 .../{ => authoring-and-scheduling}/plugins.rst |  6 +-
 .../serializers.rst|  0
 .../timetable.rst  |  2 +-
 .../{ => authoring-and-scheduling}/timezone.rst|  4 +-
 docs/apache-airflow/best-practices.rst |  6 +-
 docs/apache-airflow/cli-and-env-variables-ref.rst  |  2 +-
 .../apache-airflow/{ => core-concepts}/dag-run.rst |  8 +-
 .../{concepts => core-concepts}/dags.rst   | 22 +++---
 .../{ => core-concepts}/executor/celery.rst|  4 +-
 .../executor/celery_kubernetes.rst |  0
 .../{ => core-concepts}/executor/dask.rst  |  2 +-
 .../{ => core-concepts}/executor/debug.rst |  2 +-
 .../{ => core-concepts}/executor/index.rst |  2 +-
 .../{ => core-concepts}/executor/kubernetes.rst| 10 +--
 .../{ => core-concepts}/executor/local.rst |  0
 .../executor/local_kubernetes.rst  |  0
 .../{ => core-concepts}/executor/sequential.rst|  0
 .../{concepts => core-concepts}/index.rst  | 18 +
 .../{concepts => core-concepts}/operators.rst  |  0
 .../{concepts => core-concepts}/overview.rst   | 12 +--
 .../{concepts => core-concepts}/params.rst |  0
 .../{concepts => core-concepts}/sensors.rst|  2 +-
 .../{concepts => core-concepts}/taskflow.rst   |  0
 .../{concepts => core-concepts}/tasks.rst  | 16 ++--
 .../{concepts => core-concepts}/variables.rst  |  0
 .../{concepts => core-concepts}/xcoms.rst  |  0
 docs/apache-airflow/faq.rst|  2 +-
 docs/apache-airflow/howto/connection.rst   |  8 +-
 docs/apache-airflow/howto/custom-operator.rst  |  4 +-
 docs/apache-airflow/howto/custom-view-plugin.rst   |  2 +-
 docs/apache-airflow/howto/customize-ui.rst |  2 +-
 

[GitHub] [airflow] potiuk merged pull request #27235: Restructure Docs

2022-12-16 Thread GitBox


potiuk merged PR #27235:
URL: https://github.com/apache/airflow/pull/27235


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #27235: Restructure Docs

2022-12-16 Thread GitBox


boring-cyborg[bot] commented on PR #27235:
URL: https://github.com/apache/airflow/pull/27235#issuecomment-1355725875

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #27235: Restructure Docs

2022-12-16 Thread GitBox


potiuk commented on PR #27235:
URL: https://github.com/apache/airflow/pull/27235#issuecomment-1355724573

   Let me merge it quickly before there are any conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] mhenc opened a new pull request, #28425: AIP-44: Add command for running standalone Internal API

2022-12-16 Thread GitBox


mhenc opened a new pull request, #28425:
URL: https://github.com/apache/airflow/pull/28425

   Introduce an `airflow internal-api` CLI command that starts an independent Internal API server.
   
   closes: #28266 
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #28367: mkdirs should set mode correctly

2022-12-16 Thread GitBox


potiuk commented on PR #28367:
URL: https://github.com/apache/airflow/pull/28367#issuecomment-1355637320

   +1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] vincbeck opened a new pull request, #28422: example_dms system test: fetch default VPC id from boto3

2022-12-16 Thread GitBox


vincbeck opened a new pull request, #28422:
URL: https://github.com/apache/airflow/pull/28422

   All resources in this system test use the default VPC. Instead of getting it as an external value, we should fetch it using `boto3`
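   A hedged sketch of fetching the default VPC id with boto3 (`isDefault` is the EC2 `DescribeVpcs` filter name; region and credentials are assumed to come from the environment):

```python
import boto3

ec2 = boto3.client("ec2")
response = ec2.describe_vpcs(Filters=[{"Name": "isDefault", "Values": ["true"]}])
default_vpc_id = response["Vpcs"][0]["VpcId"]  # assumes the account still has a default VPC
print(default_vpc_id)
```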


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] vincbeck opened a new pull request, #28421: example_eks_with_fargate_in_one_step: fetch VPC ID from external

2022-12-16 Thread GitBox


vincbeck opened a new pull request, #28421:
URL: https://github.com/apache/airflow/pull/28421

   The public subnet ID passed down to the system test must belong to the VPC used in the system test. Since the public subnet comes as an external value, the VPC ID should as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis closed issue #28419: Difficulty connecting to

2022-12-16 Thread GitBox


Taragolis closed issue #28419: Difficulty connecting to 
URL: https://github.com/apache/airflow/issues/28419


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on issue #28419: Difficulty connecting to

2022-12-16 Thread GitBox


boring-cyborg[bot] commented on issue #28419:
URL: https://github.com/apache/airflow/issues/28419#issuecomment-1355587386

   Thanks for opening your first issue here! Be sure to follow the issue 
template!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] archiero opened a new issue, #28419: Difficulty connecting to

2022-12-16 Thread GitBox


archiero opened a new issue, #28419:
URL: https://github.com/apache/airflow/issues/28419

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   I'm working with Apache-Airflow 2.2.2 using both MWAA in Amazon and my local 
Lubuntu machine (booting from a USB stick on a Windows 10 Surface Pro). I 
followed the guide below to connect to Snowflake, and it didn't work in either 
place, failing with the exact same error message: "[Errno -2] Name or service 
not known". I've attached the log file from my machine, the dag that fails, and 
the requirements file for both. 
   
   Side note: you'll notice that I have a Python operator that connects using 
the snowflake.connector module to log into the database. It works just fine, but 
my superiors have said that they'd prefer the SnowflakeHook or SnowflakeOperator
   
   
https://community.snowflake.com/s/article/How-to-connect-Apache-Airflow-to-Snowflake-and-schedule-queries-jobs
   
[ExampleFailureSnowflake.log](https://github.com/apache/airflow/files/10248044/ExampleFailureSnowflake.log)
   
[requirements.txt](https://github.com/apache/airflow/files/10248047/requirements.txt)
   
[test_snowflake_connection_dag.txt](https://github.com/apache/airflow/files/10248060/test_snowflake_connection_dag.txt)
   
   ### What you think should happen instead
   
   It should connect to Snowflake
   
   ### How to reproduce
   
   Run the dag with the same requirements file
   
   ### Operating System
   
   MWAA, and Lubuntu 22.10 on a USB stick with a main hard drive of Windows 10 Pro
   
   ### Versions of Apache Airflow Providers
   
   2.2.2
   
   ### Deployment
   
   MWAA
   
   ### Deployment details
   
   MWAA (main) and virtualenv (Python 3.10) on the Lubuntu stick
   
   ### Anything else
   
   It fails literally every time I attempt to use SnowflakeHook or 
SnowflakeOperator, but never when I use snowflake.connector.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] mhenc commented on issue #28270: AIP-44 Migrate DagFileProcessorManager._deactivate_stale_dags to Internal API

2022-12-16 Thread GitBox


mhenc commented on issue #28270:
URL: https://github.com/apache/airflow/issues/28270#issuecomment-135266

   Seems this method was refactored
   in https://github.com/apache/airflow/pull/21399 to
   DagFileProcessorManager._deactivate_stale_dags
   
   Updated the title


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #27235: Restructure Docs

2022-12-16 Thread GitBox


potiuk commented on PR #27235:
URL: https://github.com/apache/airflow/pull/27235#issuecomment-1355540427

   Approved workflow again :)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] eladkal commented on pull request #28024: Add AWS SageMaker operator to register a model's version

2022-12-16 Thread GitBox


eladkal commented on PR #28024:
URL: https://github.com/apache/airflow/pull/28024#issuecomment-1355539041

   I'll review this week


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #28380: Celery green threads incompatibility

2022-12-16 Thread GitBox


potiuk commented on issue #28380:
URL: https://github.com/apache/airflow/issues/28380#issuecomment-1355537151

   > @potiuk, I still have the issue. Nevertheless, the monkey patch warnings 
have disappeared.
   
   This is good news. At least it allows us to merge this change, because the 
warnings are clearly removed.
   
   > I am quite sure now that the celery broker or backend is responsible. The 
result backend database is not updated when some tasks finish, contrary to the 
metadata database...
   
   > What do you think?
   
   There are many things that could have caused the behaviour, but without any 
evidence I can only guess, and I have no prior similar experience to base it on. 
As discussed on Slack, the Celery executor is not supported in Breeze as a 
"working" feature (issue opened: https://github.com/apache/airflow/issues/28412). 
It might well be some issue in the way things are configured in Breeze. If you 
would like to explore it further and try to investigate how to implement it, 
that would be awesome. But this is not a high priority, really, and I would 
prefer that someone (like you) who is a celery user does some more investigation 
into how to make it work in Breeze. This happened in the past, and I would like 
more of the contributors to spend their time improving our dev env - it seems 
that someone who understands what gevent is and wants to use it has enough 
incentive to do more investigation. Happy to help with it, but I need more 
evidence and some deeper debugging from your side if we are to progress there.
   
   I am happy to get ideas bounced off me and to help you understand the ins and 
outs of Breeze to guide such a person. A discussion in the #breeze channel on 
Slack is likely the best place for that.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] vandonr-amz commented on pull request #28024: Add AWS SageMaker operator to register a model's version

2022-12-16 Thread GitBox


vandonr-amz commented on PR #28024:
URL: https://github.com/apache/airflow/pull/28024#issuecomment-1355533568

   hey @eladkal, do you think there are more things to change on this PR?
   if not, could you drop your `change requested` review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on pull request #28367: mkdirs should set mode correctly

2022-12-16 Thread GitBox


Taragolis commented on PR #28367:
URL: https://github.com/apache/airflow/pull/28367#issuecomment-1355515252

   Yep. If this issue still exists, it's better to fix it. It could be a 
situation where the initial issue was solved but the comments still exist.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #28318: Add FTPSFileTransmitOperator

2022-12-16 Thread GitBox


Taragolis commented on code in PR #28318:
URL: https://github.com/apache/airflow/pull/28318#discussion_r1051106575


##
tests/providers/ftp/operators/test_ftp.py:
##
@@ -184,3 +180,108 @@ def test_unequal_local_remote_file_paths(self):
 local_filepath=["/tmp/test1", "/tmp/test2"],
 remote_filepath="/tmp/test1",
 )
+
+
+class TestFTPSFileTransmitOperator:
+    def setup_method(self):
+        self.test_local_dir = "/tmp"
+        self.test_local_dir_int = "/tmp/interdir"
+        self.test_remote_dir = "/ftpshome"
+        self.test_remote_dir_int = "/ftpshome/interdir"
+        self.test_local_filename = "test_local_file"
+        self.test_remote_filename = "test_remote_file"
+        self.test_local_filepath = f"{self.test_local_dir}/{self.test_local_filename}"
+        self.test_remote_filepath = f"{self.test_remote_dir}/{self.test_remote_filename}"
+        self.test_local_filepath_int_dir = f"{self.test_local_dir_int}/{self.test_local_filename}"
+        self.test_remote_filepath_int_dir = f"{self.test_remote_dir_int}/{self.test_remote_filename}"
+
+    def teardown_method(self):
+        if os.path.exists(self.test_local_dir_int):
+            os.rmdir(self.test_local_dir_int)

Review Comment:
   We have almost completely migrated all tests from `unittest` to `pytest`.
   Right now most of the migration is only about getting rid of 
`unittest.TestCase` and its methods, and we still use the xunit style for test 
setup/teardown.
   
   In general it is better to use all the functionality provided by `pytest`, 
especially fixtures: those provided by pytest itself, by plugins, and 
user-defined ones.
   Out of the box, pytest supports two fixtures for working with temporary 
directories:
   1. `tmp_path` to create a unique temporary path (it can also be created 
during the tests)
   2. `tmp_path_factory` to create temporary directories
   
   see: https://docs.pytest.org/en/6.2.x/tmpdir.html
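   
   A minimal sketch of the fixture-based style, using stand-in file operations 
instead of the actual operator call:
   
   ```python
   import os
   
   
   def test_file_transfer_with_intermediate_dir_get(tmp_path):
       # tmp_path is a unique pathlib.Path per test; pytest prunes old base
       # directories automatically, so no teardown_method is needed.
       local_dir = tmp_path / "interdir"
       local_dir.mkdir()  # stand-in for the directory the operator would create
       local_filepath = local_dir / "test_local_file"
       local_filepath.write_text("payload")
       assert os.path.exists(local_filepath)
   ```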
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] dstandish opened a new pull request, #28417: Use object instead of array in config.yml for config template

2022-12-16 Thread GitBox


dstandish opened a new pull request, #28417:
URL: https://github.com/apache/airflow/pull/28417

   This makes it easier to navigate the document in an IDE.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #28416: Update index.rst

2022-12-16 Thread GitBox


boring-cyborg[bot] commented on PR #28416:
URL: https://github.com/apache/airflow/pull/28416#issuecomment-1355504787

   Congratulations on your first Pull Request and welcome to the Apache Airflow 
community! If you have any issues or are unsure about any anything please check 
our Contribution Guide 
(https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type 
annotations). Our [pre-commits]( 
https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks)
 will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in 
`docs/` directory). Adding a new operator? Check this short 
[guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst)
 Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze 
environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for 
testing locally, it's a heavy docker but it ships with a working Airflow and a 
lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get 
the final approval from Committers.
   - Please follow [ASF Code of 
Conduct](https://www.apache.org/foundation/policies/conduct) for all 
communication including (but not limited to) comments on Pull Requests, Mailing 
list and Slack.
   - Be sure to read the [Airflow Coding style]( 
https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it 
better .
   In case of doubts contact the developers at:
   Mailing List: d...@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] syed-gilani opened a new pull request, #28416: Update index.rst

2022-12-16 Thread GitBox


syed-gilani opened a new pull request, #28416:
URL: https://github.com/apache/airflow/pull/28416

   Added Rancher to the installation instructions
   
   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] avicol commented on pull request #28353: Fix the sql syntax in merge_data

2022-12-16 Thread GitBox


avicol commented on PR #28353:
URL: https://github.com/apache/airflow/pull/28353#issuecomment-1355503635

   > Your PR description doesn't match the code?
   
   Changed the title and description


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #28321: Add support in AWS Batch Operator for multinode jobs

2022-12-16 Thread GitBox


Taragolis commented on code in PR #28321:
URL: https://github.com/apache/airflow/pull/28321#discussion_r1051097899


##
airflow/providers/amazon/aws/operators/batch.py:
##
@@ -108,12 +110,13 @@ class BatchOperator(BaseOperator):
 "job_queue",
 "overrides",

Review Comment:
   I thought it was fine to rename this argument, because one day we might also 
add `eks_properties_overrides`, so keeping `overrides` might confuse users more 
than changing the attribute name.
   And the deprecation warning gives users time to change the arguments in their 
DAG code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] vandonr-amz opened a new pull request, #28415: add a step to delete docker image after test in example_sagemaker

2022-12-16 Thread GitBox


vandonr-amz opened a new pull request, #28415:
URL: https://github.com/apache/airflow/pull/28415

   the test setup creates a docker image that then stays in docker forever, 
eventually filling up the disk space available to docker (up to the point where 
it cannot build new images, and the test starts failing for weird reasons).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] vandonr-amz commented on a diff in pull request #28321: Add support in AWS Batch Operator for multinode jobs

2022-12-16 Thread GitBox


vandonr-amz commented on code in PR #28321:
URL: https://github.com/apache/airflow/pull/28321#discussion_r1051087293


##
airflow/providers/amazon/aws/operators/batch.py:
##
@@ -108,12 +110,13 @@ class BatchOperator(BaseOperator):
 "job_queue",
 "overrides",

Review Comment:
   given the fact that it was just a rename for readability, I wonder if it 
wouldn't be simpler to just keep the old name?
   but then it could be confusing to users...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #28321: Add support in AWS Batch Operator for multinode jobs

2022-12-16 Thread GitBox


Taragolis commented on code in PR #28321:
URL: https://github.com/apache/airflow/pull/28321#discussion_r1051085532


##
airflow/providers/amazon/aws/operators/batch.py:
##
@@ -108,12 +110,13 @@ class BatchOperator(BaseOperator):
 "job_queue",
 "overrides",

Review Comment:
   @camilleanne @vandonr-amz I recommend deprecating the old parameter but 
keeping it for a while, so users have time to change their code.
   It is much easier to achieve in a subclass of `BaseOperator` (this PR's 
case), because all arguments are keyword-only. 
   
   Some examples
   
   **Use the old attribute value only if the new attribute is not set**
   
https://github.com/apache/airflow/blob/11f30a887c77f9636e88e31dffd969056132ae8c/airflow/providers/slack/operators/slack_webhook.py#L95-L104
   
   **Use the old attribute value only if it equals the new attribute value or 
the new attribute is not set**
   
https://github.com/apache/airflow/blob/11f30a887c77f9636e88e31dffd969056132ae8c/airflow/providers/amazon/aws/operators/athena.py#L91-L101
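   
   A minimal sketch of that deprecation pattern with hypothetical names (not the 
actual PR code):
   
   ```python
   import warnings
   
   from airflow.models.baseoperator import BaseOperator
   
   
   class MyBatchOperator(BaseOperator):
       def __init__(self, *, container_overrides=None, overrides=None, **kwargs):
           if overrides is not None:
               # The old argument still works, but warns users to migrate.
               warnings.warn(
                   "`overrides` is deprecated, use `container_overrides` instead.",
                   DeprecationWarning,
                   stacklevel=2,
               )
               if container_overrides is None:
                   container_overrides = overrides
           super().__init__(**kwargs)
           self.container_overrides = container_overrides
   ```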






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk closed issue #24909: Airflow Scheduler Deadlock - Transaction not rolled back on Exception?

2022-12-16 Thread GitBox


potiuk closed issue #24909: Airflow Scheduler Deadlock - Transaction not rolled 
back on Exception?
URL: https://github.com/apache/airflow/issues/24909


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #24909: Airflow Scheduler Deadlock - Transaction not rolled back on Exception?

2022-12-16 Thread GitBox


potiuk commented on issue #24909:
URL: https://github.com/apache/airflow/issues/24909#issuecomment-1355469985

   > Question:
   > Does airflow have/support an official module/dag that does db cleanup?
   
   Look for `airflow db clean` command (added in 2.3 I think) 
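   
   A typical invocation looks something like this (check `airflow db clean 
--help` on your version for the exact flags):
   
   ```
   airflow db clean --clean-before-timestamp '2022-01-01 00:00:00+00:00' --dry-run
   ```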
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] victorjourne commented on issue #28380: Celery green threads incompatibility

2022-12-16 Thread GitBox


victorjourne commented on issue #28380:
URL: https://github.com/apache/airflow/issues/28380#issuecomment-1355456814

   @potiuk, I still have the issue. Nevertheless, the monkey patch warnings have 
disappeared. 
   Tested with Breeze `breeze start-airflow  --python 3.7 --backend postgres 
--postgres-version 13 --integration celery`, then launched the airflow worker 
and flower.
   ```
   #files/airflow-breeze-config/variables.env
   export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
   export AIRFLOW__CELERY__POOL=gevent
   export _AIRFLOW_PATCH_GEVENT=1
   ```
   
   I am quite sure now that the celery broker or backend is responsible. The 
result backend database is not updated when some tasks finish, contrary to the 
metadata database... 
   
   What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] eladkal commented on pull request #28397: A manual run can't look like a scheduled one

2022-12-16 Thread GitBox


eladkal commented on PR #28397:
URL: https://github.com/apache/airflow/pull/28397#issuecomment-1355445156

   tests are failing :(


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] carlsonp opened a new issue, #28414: Airflow >= 2.3.4: The CSRF session token is missing upon login

2022-12-16 Thread GitBox


carlsonp opened a new issue, #28414:
URL: https://github.com/apache/airflow/issues/28414

   ### Apache Airflow version
   
   2.5.0
   
   ### What happened
   
   I have a docker-compose local install of Airflow.  When I go to log in using 
LDAP, I get an error message: `The CSRF session token is missing`.  It's trying 
to `POST` to a `/login/` endpoint.  When I look at the request being submitted 
via my browser developer console, I see a value being set as part of the 
payload:
   
   ```
   csrf_token=redacted=myuser=secret
   ```
   
   I **don't** have an issue with Airflow `2.3.3`.  I've tested it with other 
versions such as `2.3.4`, `2.4.3`, and `2.5.0` and they all exhibit the same 
issue.
   
   Based on searching, some people talk about ensuring a common secret key is 
set to ensure communication between the Webserver and the Worker nodes.  I've 
tried setting the following environment variables in my docker-compose as part 
of the startup.  This didn't appear to fix the issue.  I also tried setting the 
number of workers to 1.
   
   ```
   AIRFLOW__LOGGING__LOGGING_LEVEL: DEBUG
   # 
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#workers
   # https://github.com/apache/airflow/issues/23512#issuecomment-1276644397
   AIRFLOW__WEBSERVER__WORKERS: 1
   # 
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#auth-backends
   # allows all requests to hit the API
   AIRFLOW__API__AUTH_BACKENDS: 
'airflow.api.auth.backend.default,airflow.api.auth.backend.session'
   # 
https://stackoverflow.com/questions/68889419/csrf-session-token-is-missing-in-airflow
   AIRFLOW__WEBSERVER__SECRET_KEY: 'superdupersecret'
   ```
   
   
   ### What you think should happen instead
   
   I should be able to login and get to the DAG screen.
   
   ### How to reproduce
   
   I have the following `webserver_config.py` file that works with Airflow 
`2.3.3`.
   
   ```
   import os
   from flask_appbuilder.security.manager import AUTH_LDAP
   
   # The authentication type
   AUTH_TYPE = AUTH_LDAP
   
   # Will allow user self registration
   AUTH_USER_REGISTRATION = True
   
   AUTH_USER_REGISTRATION_ROLE = "Admin"
   
   AUTH_LDAP_SERVER = "ldaps://redacted"
   AUTH_LDAP_BIND_USER = "CN=" + os.environ['LDAP_USER'] + 
",CN=Users,DC=redacted,DC=redacted,DC=redacted"
   AUTH_LDAP_BIND_PASSWORD = os.environ['LDAP_PASSWORD']
   AUTH_LDAP_SEARCH = "CN=Users,DC=redacted,DC=redacted,DC=redacted"
   AUTH_LDAP_SEARCH_FILTER = "(memberOf=cn=" + 
os.environ['LDAP_SECURITY_METAGROUP'] + 
",CN=Users,DC=redacted,DC=redacted,DC=redacted)"
   AUTH_LDAP_UID_FIELD = "sAMAccountName"
   
   AUTH_LDAP_FIRSTNAME_FIELD = "givenName"
   AUTH_LDAP_LASTTNAME_FIELD = "sn"
   
   # if we should replace ALL the user's roles each login, or only on 
registration
   AUTH_ROLES_SYNC_AT_LOGIN = True
   
   # force users to re-auth after 30min of inactivity (to keep roles in sync)
   PERMANENT_SESSION_LIFETIME = 1800
   ```
   
   This [discussion post](https://github.com/apache/airflow/discussions/26870) 
is *exactly* what I am experiencing.  They mentioned adjusting 
`webserver_config.py`.  Based on [the default config file in the 
repo](https://github.com/apache/airflow/blob/main/airflow/config_templates/default_webserver_config.py),
 I have made adjustments to the file.
   
   ```
   import os
   from flask_appbuilder.security.manager import AUTH_LDAP
   from airflow.www.fab_security.manager import AUTH_LDAP
   basedir = os.path.abspath(os.path.dirname(__file__))
   
   # I've tried enabling and disabling every combination of these two variables
   #CSRF_ENABLED = True
   #WTF_CSRF_ENABLED = True
   
   # The authentication type
   AUTH_TYPE = AUTH_LDAP
   ...
   ```
   
   This still doesn't work for me.
   
   I'm not sure what else to try.  [This 
change](https://github.com/apache/airflow/commit/48d4c5da19217174c8996b2882bb71f40381ae2c)
 seemed to make adjustments to the underlying FAB security system.  However, I 
can't find any examples in the official documentation or elsewhere for *new* 
working LDAP examples.
   
   Thank you.
   
   ### Operating System
   
   Linux (Ubuntu) within container
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   docker-compose on Windows
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] RachitSharma2001 commented on a diff in pull request #28318: Add FTPSFileTransmitOperator

2022-12-16 Thread GitBox


RachitSharma2001 commented on code in PR #28318:
URL: https://github.com/apache/airflow/pull/28318#discussion_r1051047298


##
tests/providers/ftp/operators/test_ftp.py:
##
@@ -184,3 +180,108 @@ def test_unequal_local_remote_file_paths(self):
 local_filepath=["/tmp/test1", "/tmp/test2"],
 remote_filepath="/tmp/test1",
 )
+
+
+class TestFTPSFileTransmitOperator:
+    def setup_method(self):
+        self.test_local_dir = "/tmp"
+        self.test_local_dir_int = "/tmp/interdir"
+        self.test_remote_dir = "/ftpshome"
+        self.test_remote_dir_int = "/ftpshome/interdir"
+        self.test_local_filename = "test_local_file"
+        self.test_remote_filename = "test_remote_file"
+        self.test_local_filepath = f"{self.test_local_dir}/{self.test_local_filename}"
+        self.test_remote_filepath = f"{self.test_remote_dir}/{self.test_remote_filename}"
+        self.test_local_filepath_int_dir = f"{self.test_local_dir_int}/{self.test_local_filename}"
+        self.test_remote_filepath_int_dir = f"{self.test_remote_dir_int}/{self.test_remote_filename}"
+
+    def teardown_method(self):
+        if os.path.exists(self.test_local_dir_int):
+            os.rmdir(self.test_local_dir_int)

Review Comment:
   @Taragolis The testing code itself does not create any files or directories. 
Rather, for one of the tests (`test_file_transfer_with_intermediate_dir_get`), 
the code that is being tested (from line 121 in `FTPFileTransmitOperator` in 
ftp.py) will create a new directory, so we need a teardown method to remove that 
created directory after the test. The FTPFileTransmitOperator is set up so that 
even if that directory already exists, it won't crash. Do you think it would be 
better to just mock out the creation of the directory from 
FTPFileTransmitOperator, so that during the tests no new directories are created?
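   
   If mocking is preferred, a sketch could look like this; the patch target is 
an assumption (it depends on how the operator creates the directory) and is 
shown purely as an illustration:
   
   ```python
   from pathlib import Path
   from unittest import mock
   
   # Patch directory creation so no real directory is made.
   with mock.patch("pathlib.Path.mkdir") as mock_mkdir:
       Path("/tmp/interdir").mkdir()  # stand-in for the operator's call
   mock_mkdir.assert_called_once()
   ```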



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] vandonr-amz commented on a diff in pull request #28321: Add support in AWS Batch Operator for multinode jobs

2022-12-16 Thread GitBox


vandonr-amz commented on code in PR #28321:
URL: https://github.com/apache/airflow/pull/28321#discussion_r1051041510


##
airflow/providers/amazon/aws/operators/batch.py:
##
@@ -108,12 +110,13 @@ class BatchOperator(BaseOperator):
 "job_queue",
 "overrides",

Review Comment:
   TIL the names here need to match not the parameter name in the ctor but the 
name of the attribute in the class, so:
   ```suggestion
   "container_overrides",
   ```
   
   Thinking about it, I wonder if this is a breaking change... I don't know too 
well how this works  
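   
   For context, a minimal illustration (hypothetical operator, not the PR code): 
Jinja templating resolves each name in `template_fields` against instance 
attributes, so the listed name must match the attribute, not the `__init__` 
parameter:
   
   ```python
   from airflow.models.baseoperator import BaseOperator
   
   
   class MyOperator(BaseOperator):
       template_fields = ("container_overrides",)  # must match the attribute name
   
       def __init__(self, *, overrides=None, **kwargs):
           super().__init__(**kwargs)
           # The parameter is stored under a different attribute name.
           self.container_overrides = overrides
   ```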



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] amoGLingle commented on issue #24909: Airflow Scheduler Deadlock - Transaction not rolled back on Exception?

2022-12-16 Thread GitBox


amoGLingle commented on issue #24909:
URL: https://github.com/apache/airflow/issues/24909#issuecomment-1355356024

   Hello,
   I think we found the culprit and can close this.
   We had been occasionally running the db cleanup dag that is part of
   https://github.com/teamclairvoyant/airflow-maintenance-dags
   There didn't seem to be a correlation, but the last time it ran, the system 
locked up within an hour.
   I did notice that there's an updated version that we weren't running, but we 
haven't bothered to install it:
   the risk of running it is too high.
   
   Question:
   Does airflow have/support an official module/dag that does db cleanup?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] potiuk opened a new issue, #28412: Make `--executor` flag works for `breeze start-airflow` command

2022-12-16 Thread GitBox


potiuk opened a new issue, #28412:
URL: https://github.com/apache/airflow/issues/28412

   ### Body
   
   Currently the `breeze start-airflow` command only uses LocalExecutor and 
starts the scheduler, triggerer and webserver. However, it would be nice if it 
accepted an `--executor` flag for other executors. This would make it much nicer 
to test some scenarios - mostly with regards to the Celery executor. 
   
   Such a flag could:
   
   * enable the `--celery` integration (with rabbitmq and redis)
   * also start a worker (optionally more than one) 
   * also start flower 
   
   
   
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[airflow] branch main updated (4d0fa01f72 -> 11f30a887c)

2022-12-16 Thread bbovenzi
This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


from 4d0fa01f72 Add documentation for task group mapping (#28001)
 add 11f30a887c Dont show task/run durations when there is no start_date 
(#28395)

No new revisions were added by this update.

Summary of changes:
 airflow/www/static/js/dag/InstanceTooltip.test.tsx | 14 +
 airflow/www/static/js/dag/InstanceTooltip.tsx  | 24 +-
 airflow/www/static/js/dag/details/dagRun/index.tsx | 14 +++--
 .../static/js/dag/details/taskInstance/Details.tsx | 16 ---
 airflow/www/yarn.lock  | 18 
 5 files changed, 49 insertions(+), 37 deletions(-)



[GitHub] [airflow] bbovenzi closed issue #28070: task duration in grid view is different when viewed at different times.

2022-12-16 Thread GitBox


bbovenzi closed issue #28070: task duration in grid view is different when 
viewed at different times.
URL: https://github.com/apache/airflow/issues/28070


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] uranusjr commented on a diff in pull request #28397: A manual run can't look like a scheduled one

2022-12-16 Thread GitBox


uranusjr commented on code in PR #28397:
URL: https://github.com/apache/airflow/pull/28397#discussion_r1051021322


##
airflow/models/dag.py:
##
@@ -2586,14 +2586,23 @@ def create_dagrun(
         else:
             data_interval = self.infer_automated_data_interval(logical_date)
 
+        if run_type is not None and not isinstance(run_type, DagRunType):
+            raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")
+
         if run_id:  # Infer run_type from run_id if needed.
             if not isinstance(run_id, str):
                 raise ValueError(f"`run_id` should be a str, not {type(run_id)}")
-            if not run_type:
-                run_type = DagRunType.from_run_id(run_id)
+            inferred_run_type = DagRunType.from_run_id(run_id)
+            if run_type is None:
+                # No explicit type given, use the inferred type.
+                run_type = inferred_run_type
+            elif run_type == DagRunType.MANUAL and inferred_run_type != DagRunType.MANUAL:

Review Comment:
   I considered this as well. Airflow only ever generates the run ID of an 
automated run with an appropriate format, so this is probably fine unless 
someone is deliberately manipulating Airflow to do otherwise (e.g. calling 
`create_dagrun` directly). I opted for the current approach to keep the impact 
as small as possible, but the alternative is also fine for me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] bbovenzi merged pull request #28395: Dont show task/run durations when there is no start_date

2022-12-16 Thread GitBox


bbovenzi merged PR #28395:
URL: https://github.com/apache/airflow/pull/28395


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] bbovenzi opened a new pull request, #28411: Fix calendar view for CronTriggerTimeTable dags

2022-12-16 Thread GitBox


bbovenzi opened a new pull request, #28411:
URL: https://github.com/apache/airflow/pull/28411

   We were checking the wrong instance when building the calendar data for 
CronTrigger dags and then getting stuck in an infinite loop.
   
   
   Fixes: https://github.com/apache/airflow/issues/27645
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] vandonr-amz commented on a diff in pull request #28321: Add support in AWS Batch Operator for multinode jobs

2022-12-16 Thread GitBox


vandonr-amz commented on code in PR #28321:
URL: https://github.com/apache/airflow/pull/28321#discussion_r1050994658


##
airflow/providers/amazon/aws/hooks/batch_client.py:
##
@@ -416,8 +416,43 @@ def get_job_awslogs_info(self, job_id: str) -> dict[str, str] | None:
 
         :param job_id: AWS Batch Job ID
         """
-        job_container_desc = self.get_job_description(job_id=job_id).get("container", {})
-        log_configuration = job_container_desc.get("logConfiguration", {})
+        job_desc = self.get_job_description(job_id=job_id)
+
+        job_node_properties = job_desc.get("nodeProperties", {})
+        job_container_desc = job_desc.get("container", {})
+
+        if job_node_properties:
+            job_node_range_properties = job_node_properties.get("nodeRangeProperties", {})
+            if len(job_node_range_properties) > 1:
+                self.log.warning(
+                    "AWS Batch job (%s) has more than one node group. Only returning logs from first group.",
+                    job_id,
+                )
+            log_configuration = (
+                job_node_range_properties[0].get("container", {}).get("logConfiguration", {})

Review Comment:
   Is it possible to have zero elements in the array? i.e. should we add a check 
on `len == 0` and a user-friendly error message?



##
tests/providers/amazon/aws/operators/test_batch.py:
##
@@ -187,6 +188,47 @@ def test_kill_job(self):
 self.client_mock.terminate_job.assert_called_once_with(jobId=JOB_ID, 
reason="Task killed by the user")
 
 
+class TestBatchOperatorTrimmedArgs:
+"""test class that does not inherit from unittest.TestCase"""

Review Comment:
   I added this class before @Taragolis removed `unittest.TestCase` in all AWS 
tests, this can be merged with the class above now :)



##
airflow/providers/amazon/aws/hooks/batch_client.py:
##
@@ -416,8 +416,43 @@ def get_job_awslogs_info(self, job_id: str) -> dict[str, str] | None:
 
         :param job_id: AWS Batch Job ID
         """
-        job_container_desc = self.get_job_description(job_id=job_id).get("container", {})
-        log_configuration = job_container_desc.get("logConfiguration", {})
+        job_desc = self.get_job_description(job_id=job_id)
+
+        job_node_properties = job_desc.get("nodeProperties", {})
+        job_container_desc = job_desc.get("container", {})
+
+        if job_node_properties:
+            job_node_range_properties = job_node_properties.get("nodeRangeProperties", {})
+            if len(job_node_range_properties) > 1:
+                self.log.warning(
+                    "AWS Batch job (%s) has more than one node group. Only returning logs from first group.",
+                    job_id,
+                )
+            log_configuration = (
+                job_node_range_properties[0].get("container", {}).get("logConfiguration", {})
+            )
+            # "logStreamName" value is not available in the "container" object for multinode jobs --
+            # it is available in the "attempts" object
+            job_attempts = job_desc.get("attempts", [])
+            if len(job_attempts):
+                if len(job_attempts) > 1:
+                    self.log.warning(
+                        "AWS Batch job (%s) has had more than one attempt. \
+                        Only returning logs from the most recent attempt.",
+                        job_id,
+                    )
+                awslogs_stream_name = job_attempts[-1].get("container", {}).get("logStreamName")
+            else:
+                awslogs_stream_name = None
+
+        elif job_container_desc:
+            log_configuration = job_container_desc.get("logConfiguration", {})
+            awslogs_stream_name = job_container_desc.get("logStreamName")
+        else:
+            self.log.warning(
+                "AWS Batch job (%s) is neither a container nor multinode job. Log info not found."

Review Comment:
   Maybe this could be an error log, considering the user-provided input is 
invalid for this kind of request?
   
   The other warning logs in this method are mostly informative (there are 
several node groups, which is important info for the user to know, but doesn't 
require any action); this one, I think, requires user action, and thus more 
attention.



##
tests/providers/amazon/aws/hooks/test_batch_client.py:
##
@@ -309,6 +326,40 @@ def test_job_splunk_logs(self, caplog):
         assert len(caplog.records) == 1
         assert "uses logDriver (splunk). AWS CloudWatch logging disabled." in caplog.messages[0]
 
+    def test_job_awslogs_multinode_job(self):
+        self.client_mock.describe_jobs.return_value = {
+            "jobs": [
+                {
+                    "jobId": JOB_ID,
+                    "attempts": [
+                        {"container": {"exitCode": 0, "logStreamName": "test/stream/attempt0"}},

[GitHub] [airflow] swapz-z commented on pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on PR #28282:
URL: https://github.com/apache/airflow/pull/28282#issuecomment-1355303154

   > 
   
   Please add the reviewers again; I was just trying to request a re-review from 
you, and the others got removed 🤦‍♂️


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #28354: Allow Users to disable SwaggerUI via configuration

2022-12-16 Thread GitBox


ephraimbuddy commented on code in PR #28354:
URL: https://github.com/apache/airflow/pull/28354#discussion_r1051006086


##
airflow/config_templates/config.yml:
##
@@ -1502,6 +1502,13 @@
   type: string
   example: "dagrun_cleared,failed"
   default: ~
+- name: enable_swagger_ui
+  description: |
+Boolean for running SwaggerUI in the webserver.
+  version_added: 2.5.1

Review Comment:
   ```suggestion
 version_added: 2.6.0
   ```
   ~~Should this be in config 'api' section, since swagger is about REST API?~~ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

2022-12-16 Thread GitBox


Taragolis commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1355246110

   Just my 50 cents based on past experience. In projects where I worked, we 
usually used callable objects instead of plain functions, for additional 
re-usability.
   
   Something like this
   
   ```python
   from abc import abstractmethod
   from typing import Any
   
   from airflow.models.taskinstance import Context
   from airflow.utils.log.logging_mixin import LoggingMixin
   
   
   class BaseCallback(LoggingMixin):
       """Base callback for Airflow tasks"""
   
       @abstractmethod
       def callback(self, context: Context) -> None:
           pass
   
       def __call__(self, context: Context):
           try:
               self.callback(context)
           except Exception:
               self.log.exception("Error during callback")
               raise
   ```
   
   So after that, we can parametrise our pipelines and create callbacks like this
   
   ```python
   from airflow.providers.slack.hooks.slack import SlackHook
   
   
   class SlackNotificationCallback(BaseCallback):
       def __init__(
           self,
           *,
           slack_conn_id: str,
           channel: str,
           template: str,
           template_type: str = "json",
           username: str = None,
           icon_emoji: str = None,
       ):
           super().__init__()
           self.slack_conn_id = slack_conn_id
           self.template = template
           self.template_type = template_type
           self.channel = channel
           self.username = username
           self.icon_emoji = icon_emoji
   
       def callback(self, context):
           hook = SlackHook(slack_conn_id=self.slack_conn_id)
           message = ...  # Some magic with Jinja here
           ...
   
           return hook.client.api_call("chat.postMessage", json=message)
   ```
   
   And use it in task by different way
   
   ```python
   from airflow.operators.empty import EmptyOperator
   
   task = EmptyOperator(
       task_id="awesome_task",
       on_failure_callback=SlackNotificationCallback(
           slack_conn_id="slack_api_conn",
           channel="#everything-fine",
           template="foo/bar/on-failure-slack.yml",
           template_type="yaml",
       ),
       on_retry_callback=SlackNotificationCallback(
           slack_conn_id="slack_api_conn",
           channel="#spam-nobody-read-it",
           template="foo/bar/on-retry-slack.yml",
           template_type="yaml",
       ),
   )
   ```
   
   The small limitations of this approach:
   1. If a task requires more than one callback, you need to create a middleware 
callback which runs the other callbacks in a loop (see the sketch below)
   2. Templated fields can't be used out of the box, so all the Jinja stuff has 
to be implemented manually
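   
   A minimal sketch of such a middleware callback, assuming the `BaseCallback` 
interface above (illustrative, not battle-tested code):
   
   ```python
   class CompositeCallback(BaseCallback):
       """Run several callbacks one after another."""
   
       def __init__(self, *callbacks: BaseCallback):
           super().__init__()
           self.callbacks = callbacks
   
       def callback(self, context: Context) -> None:
           for cb in self.callbacks:
               # BaseCallback.__call__ logs and re-raises, so a failing
               # callback stops the remaining ones; catch here if that is
               # not the desired behaviour.
               cb(context)
   ```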


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] eladkal commented on pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


eladkal commented on PR #28282:
URL: https://github.com/apache/airflow/pull/28282#issuecomment-1355226060

   I'm curious about this request.
   This operator doesn't feel natural to a workflow. It feels more like "I made 
a mistake and I want to fix it". I'm not sure if fixing it should involve 
Airflow, as it means writing a new dag with this operator and deploying it to 
handle the deletion.
   
   Would love to hear more thoughts on that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] Taragolis commented on a diff in pull request #28279: Add Amazon Elastic Container Registry (ECR) Hook

2022-12-16 Thread GitBox


Taragolis commented on code in PR #28279:
URL: https://github.com/apache/airflow/pull/28279#discussion_r1050933236


##
airflow/providers/amazon/aws/hooks/ecr.py:
##
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import base64
+import logging
+from dataclasses import dataclass
+from datetime import datetime
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+from airflow.utils.log.secrets_masker import mask_secret
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True)
+class EcrCredentials:
+    """Helper (frozen dataclass) for storing temporary ECR credentials."""
+
+    username: str
+    password: str
+    proxy_endpoint: str
+    expires_at: datetime
+
+    def __post_init__(self):
+        mask_secret(self.password)
+        logger.debug("Credentials to Amazon ECR %r expires at %s.", self.proxy_endpoint, self.expires_at)
+
+    @property
+    def registry(self) -> str:
+        """Return registry in appropriate `docker login` format."""
+        # https://github.com/docker/docker-py/issues/2256#issuecomment-824940506
+        return self.proxy_endpoint.replace("https://", "")
+
+
+class EcrHook(AwsBaseHook):
+    """
+    Interact with Amazon Elastic Container Registry (ECR)
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+    """
+
+    def __init__(self, **kwargs):
+        kwargs.pop("client_type", None)
+        super().__init__(client_type="ecr", **kwargs)
+
+    def get_temporary_credentials(self, registry_ids: list[str] | str | None = None) -> list[EcrCredentials]:
+        """Get temporary credentials for Amazon ECR.
+
+        Return list of :class:`~airflow.providers.amazon.aws.hooks.ecr.EcrCredentials`,
+        obtained credentials valid for 12 hours.
+
+        :param registry_ids: Either AWS Account ID or list of AWS Account IDs that are associated
+            with the registries for which obtain credentials. If you do not specify a registry,
+            the default registry is assumed.
+
+        .. seealso::
+            - `boto3 ECR client get_authorization_token method `_.
+        """
+        registry_ids = registry_ids or None

Review Comment:
   @o-nikolas @ferruzzi do we have any concerns about this part? Or can we merge 
these changes?
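   
   For reference, a minimal usage sketch of the hook as shown above (assuming it 
merges as-is; the connection id and region are placeholders):
   
   ```python
   hook = EcrHook(aws_conn_id="aws_default", region_name="us-east-1")
   credentials = hook.get_temporary_credentials()[0]
   print(credentials.registry)  # e.g. 123456789012.dkr.ecr.us-east-1.amazonaws.com
   # credentials.password is masked in task logs via mask_secret
   ```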



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] tatiana commented on a diff in pull request #28262: Hook for managing directories and files in Azure Data Lake Storage Gen2

2022-12-16 Thread GitBox


tatiana commented on code in PR #28262:
URL: https://github.com/apache/airflow/pull/28262#discussion_r1050906637


##
airflow/providers/microsoft/azure/hooks/adls_v2.py:
##
@@ -0,0 +1,307 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
+from azure.identity import ClientSecretCredential
+from azure.storage.filedatalake import (
+    DataLakeDirectoryClient,
+    DataLakeFileClient,
+    DataLakeServiceClient,
+    DirectoryProperties,
+    FileSystemClient,
+    FileSystemProperties,
+)
+from airflow.hooks.base import BaseHook
+
+
+class AdlsClientHook(BaseHook):

Review Comment:
   Would it make sense for this to be defined inside 
`airflow/providers/microsoft/azure/hooks/azure_data_lake`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] luanmorenomaciel commented on a diff in pull request #28262: Hook for managing directories and files in Azure Data Lake Storage Gen2

2022-12-16 Thread GitBox


luanmorenomaciel commented on code in PR #28262:
URL: https://github.com/apache/airflow/pull/28262#discussion_r1050905172


##
airflow/providers/microsoft/azure/hooks/adls.py:
##
@@ -0,0 +1,264 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
+from azure.identity import ClientSecretCredential
+from azure.storage.filedatalake import (
+DataLakeDirectoryClient,
+DataLakeFileClient,
+DataLakeServiceClient,
+DirectoryProperties,
+FileSystemClient,
+FileSystemProperties,
+)
+
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+
+
+class AzureDataLakeStorageClient(WasbHook):

Review Comment:
   @tatiana and @mateusoliveiraowshq as per our conversation, implementing the 
`AzureBaseHook` would be challenging since Azure unfortunately does not share 
the same connection client principles (BlobServiceClient & 
DataLakeServiceClient) between the WASB and ABFS protocols.
   
   [Azure Blob Storage - WASB](https://pypi.org/project/azure-storage-blob/)
   [Azure Data Lake Storage Gen2 - 
ABFS](https://pypi.org/project/azure-storage-file-datalake/)
   
   What we could do instead is to have the following hook structure:
   
   
[airflow.providers.microsoft.azure.hooks.wasb](https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/stable/_api/airflow/providers/microsoft/azure/hooks/wasb/index.html)
 (Already Exists)
   
   
[airflow.providers.microsoft.azure.hooks.azure_data_lake](https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/1.0.0/_api/airflow/providers/microsoft/azure/hooks/azure_data_lake/index.html#module-airflow.providers.microsoft.azure.hooks.azure_data_lake)
 (Suggested)
   
   Meaning that, as per the conversation with @bharanidharan14 and @eladkal, we 
would recommend inheriting the hook from 
**airflow.providers.microsoft.azure.hooks.azure_data_lake** instead of 
**airflow.providers.microsoft.azure.hooks.wasb**. That would make more sense 
since the WASB protocol has been marked as legacy, and the two implementation 
processes differ from each other.
   
   
   
   
![wasb-abfs](https://user-images.githubusercontent.com/20648427/208137163-382b616f-d14b-413c-afa6-0c7506db95e6.png)
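A sketch of the layout suggested above (all class names are hypothetical, not the final PR API): the Gen2 hook would live in, and inherit from, the `azure_data_lake` module rather than `wasb`:

```python
from airflow.providers.microsoft.azure.hooks.azure_data_lake import AzureDataLakeHook


class AzureDataLakeStorageV2Hook(AzureDataLakeHook):
    """Hypothetical ABFS (Gen2) hook defined alongside the existing ADLS hook."""

    conn_type = "adls_v2"  # placeholder connection type
```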
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] bbovenzi opened a new pull request, #28410: Separate callModal from dag.js

2022-12-16 Thread GitBox


bbovenzi opened a new pull request, #28410:
URL: https://github.com/apache/airflow/pull/28410

   Move `callModal()` to its own file and make it easier to follow how all the 
variables are used.
   This prevents us from importing `dag.js` into pages that already use 
`dag.js` and prevents multiple listeners from being mounted for the same thing, 
which caused multiple paused events to fire on every click.
   
   Fixes: https://github.com/apache/airflow/issues/20373
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] tatiana commented on a diff in pull request #28262: Hook for managing directories and files in Azure Data Lake Storage Gen2

2022-12-16 Thread GitBox


tatiana commented on code in PR #28262:
URL: https://github.com/apache/airflow/pull/28262#discussion_r1050892506


##
airflow/providers/microsoft/azure/hooks/adls.py:
##
@@ -0,0 +1,221 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
+from azure.identity import ClientSecretCredential
+from azure.storage.filedatalake import (
+DataLakeDirectoryClient,
+DataLakeFileClient,
+DataLakeServiceClient,
+FileSystemClient,
+)
+
+from airflow.hooks.base import BaseHook
+
+
+class AzureDataLakeStorageV2(BaseHook):
+
+conn_name_attr = "adls_v2_conn_id"
+default_conn_name = "adls_v2_default"
+conn_type = "adls_v2"
+hook_name = "Azure Data Lake Storage"
+
+@staticmethod
+def get_connection_form_widgets() -> dict[str, Any]:
+"""Returns connection widgets to add to connection form"""
+from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, 
BS3TextFieldWidget
+from flask_babel import lazy_gettext
+from wtforms import PasswordField, StringField
+
+return {
+"extra__adls_v2__connection_string": PasswordField(
+lazy_gettext("Blob Storage Connection String (optional)"), 
widget=BS3PasswordFieldWidget()
+),
+"extra__adls_v2__tenant_id": StringField(
+lazy_gettext("Tenant Id (Active Directory Auth)"), 
widget=BS3TextFieldWidget()
+),
+}
+
+@staticmethod
+def get_ui_field_behaviour() -> dict[str, Any]:
+"""Returns custom field behaviour"""
+return {
+"hidden_fields": ["schema", "port"],
+"relabeling": {
+"login": "Blob Storage Login (optional)",
+"password": "Blob Storage Key (optional)",
+"host": "Account Name (Active Directory Auth)",
+},
+"placeholders": {
+"login": "account name",
+"password": "secret",
+"host": "account url",
+"extra__adls_v2__connection_string": "connection string auth",
+"extra__adls_v2__tenant_id": "tenant",
+},
+}
+
+def __init__(self, adls_v2_conn_id: str = default_conn_name, public_read: 
bool = False) -> None:
+super().__init__()
+self.conn_id = adls_v2_conn_id
+self.public_read = public_read
+self.service_client = self.get_conn()
+
+def get_conn(self) -> DataLakeServiceClient:
+"""Return the DataLakeServiceClient object."""
+conn = self.get_connection(self.conn_id)
+extra = conn.extra_dejson or {}
+
+connection_string = extra.pop(
+"connection_string", 
extra.pop("extra__adls_v2__connection_string", None)
+)
+if connection_string:
+# connection_string auth takes priority
+return 
DataLakeServiceClient.from_connection_string(connection_string, **extra)
+
+tenant = extra.pop("tenant_id", extra.pop("extra__adls_v2__tenant_id", 
None))
+if tenant:
+# use Active Directory auth
+app_id = conn.login
+app_secret = conn.password
+token_credential = ClientSecretCredential(tenant, app_id, 
app_secret)
+return DataLakeServiceClient(
+account_url=f"https://{conn.login}.dfs.core.windows.net", credential=token_credential, **extra
+)
+credential = conn.password

Review Comment:
   That's a shame, @bharanidharan14 - I didn't realise things were so 
disconnected in Azure
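
For readers following the thread, a sketch of how the three auth paths in the `get_conn()` diff above map onto Airflow connection fields; every value below is a placeholder:

```python
import json

from airflow.models.connection import Connection

# 1. Connection-string auth (takes priority in get_conn()):
conn_str_auth = Connection(
    conn_id="adls_v2_default",
    conn_type="adls_v2",
    extra=json.dumps({"connection_string": "<storage-connection-string>"}),
)

# 2. Active Directory auth (tenant id plus app id/secret):
ad_auth = Connection(
    conn_id="adls_v2_default",
    conn_type="adls_v2",
    login="<app-id>",
    password="<app-secret>",
    extra=json.dumps({"tenant_id": "<tenant-id>"}),
)

# 3. Fallback: storage account name and account key.
account_key_auth = Connection(
    conn_id="adls_v2_default",
    conn_type="adls_v2",
    login="<account-name>",
    password="<account-key>",
)
```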



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] sfc-gh-madkins commented on pull request #24652: Add ``@task.snowpark`` decorator

2022-12-16 Thread GitBox


sfc-gh-madkins commented on PR #24652:
URL: https://github.com/apache/airflow/pull/24652#issuecomment-1355093571

   > @sfc-gh-madkins any thoughts from eng on why the older version dep for 
Apache Beam and if/when they would bump that up?
   
   I don't see any dependencies on Apache Beam in the Snowpark repo directly, 
only pyarrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] tatiana commented on a diff in pull request #28262: Hook for managing directories and files in Azure Data Lake Storage Gen2

2022-12-16 Thread GitBox


tatiana commented on code in PR #28262:
URL: https://github.com/apache/airflow/pull/28262#discussion_r1050879716


##
airflow/providers/microsoft/azure/hooks/adls.py:
##
@@ -0,0 +1,264 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
+from azure.identity import ClientSecretCredential
+from azure.storage.filedatalake import (
+DataLakeDirectoryClient,
+DataLakeFileClient,
+DataLakeServiceClient,
+DirectoryProperties,
+FileSystemClient,
+FileSystemProperties,
+)
+
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+
+
+class AzureDataLakeStorageClient(WasbHook):

Review Comment:
   @bharanidharan14 @eladkal  did we consider having an `AzureBaseHook` similar 
to `GoogleBaseHook`
   
https://github.com/apache/airflow/blob/main/airflow/providers/google/common/hooks/base_google.py
   This way, both `AzureDataLakeStorageHook` and `WasbHook` could inherit from 
it
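
To make the idea concrete, a rough sketch of what such a base could factor out; every name here is hypothetical and not part of the provider:

```python
from __future__ import annotations

from typing import Any

from airflow.hooks.base import BaseHook


class AzureBaseHook(BaseHook):
    """Hypothetical shared base, mirroring the role of GoogleBaseHook."""

    def __init__(self, conn_id: str) -> None:
        super().__init__()
        self.conn_id = conn_id

    def _shared_auth_kwargs(self) -> dict[str, Any]:
        # Centralise the connection-string / tenant-id / account-key
        # parsing that WasbHook and the ADLS hook currently duplicate.
        conn = self.get_connection(self.conn_id)
        return conn.extra_dejson or {}
```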



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] MrGeorgeOwl opened a new pull request, #28406: Add defer mode to GKECreateClusterOperator and GKEDeleteClusterOperator

2022-12-16 Thread GitBox


MrGeorgeOwl opened a new pull request, #28406:
URL: https://github.com/apache/airflow/pull/28406

   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050772669


##
tests/providers/amazon/aws/hooks/test_emr.py:
##
@@ -190,3 +190,100 @@ def test_get_cluster_id_by_name(self):
 no_match = hook.get_cluster_id_by_name("foo", ["RUNNING", "WAITING", 
"BOOTSTRAPPING"])
 
 assert no_match is None
+
+@mock_emr
+def test_send_cancel_steps_first_invocation(self):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "step_1",
+}
+]
+
+did_not_execute_response = hook.send_cancel_steps(
+steps_states=["PENDING", "RUNNING"],
+emr_cluster_id=job_flow_id,
+cancellation_option="SEND_INTERRUPT",
+steps=step,
+)
+
+assert did_not_execute_response is None
+
+@mock_emr
+@pytest.mark.parametrize("num_steps", [1, 2, 3, 4])
+def test_send_cancel_steps_on_pre_existing_step_name(self, num_steps):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+steps = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": f"step_{i}",
+}
+for i in range(num_steps)
+]
+
+retry_step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "retry_step_1",
+}
+]
+
+triggered = hook.add_job_flow_steps(job_flow_id=job_flow_id, 
steps=steps)
+
+triggered_steps = 
hook._get_list_of_steps_already_triggered(job_flow_id, ["PENDING", "RUNNING"])
+assert len(triggered_steps) == num_steps == len(triggered)
+
+cancel_steps = hook._cancel_list_of_steps_already_triggered(
+steps + retry_step, job_flow_id, ["PENDING", "RUNNING"]
+)
+
+assert len(cancel_steps) == num_steps
+
+with pytest.raises(NotImplementedError):

Review Comment:
   Actually, the moto library, which mocks AWS APIs, currently does not have an 
implementation of `cancel_steps` needed for verification. Check 
[here](https://github.com/spulec/moto/blob/860d8bf4b7d5223e2ac32a328122f3b48b86c103/moto/emr/responses.py#L102)
   However, in 
[this](https://github.com/apache/airflow/pull/28282/files#diff-12403a99d050969934d307ac584b2a414382e29c3a49d535be55bdc59e411213R261)
 test case I have checked, via the hook's methods, that the expected values are 
generated before they are sent to the boto3 cancel_steps method.
   
   I am not very familiar with moto, but I am curious whether there is a 
different way we could achieve this; let me know.
   @Taragolis @o-nikolas 
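
One possible workaround, sketched with `unittest.mock` instead of moto (a hypothetical test written against the `send_cancel_steps` signature used in this PR): patch the boto3 client away and assert on the arguments the hook produced.

```python
from unittest import mock


def test_send_cancel_steps_invokes_boto3(hook, job_flow_id, steps):
    # Stub the boto3 client so moto's unimplemented cancel_steps
    # endpoint is never reached.
    with mock.patch.object(hook, "get_conn") as get_conn:
        get_conn.return_value.cancel_steps.return_value = {"CancelStepsInfoList": []}
        hook.send_cancel_steps(
            steps_states=["PENDING", "RUNNING"],
            emr_cluster_id=job_flow_id,
            cancellation_option="SEND_INTERRUPT",
            steps=steps,
        )
        get_conn.return_value.cancel_steps.assert_called_once()
```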



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on PR #28282:
URL: https://github.com/apache/airflow/pull/28282#issuecomment-1354966147

   Hello @Taragolis, I accidentally removed @eladkal from the reviewers list.
   Now I don't have an option to add a reviewer. Could you add him back? :) 
   
   Also, let me know if I need to change anything beyond these files, like 
docs etc. Thanks in advance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050772669


##
tests/providers/amazon/aws/hooks/test_emr.py:
##
@@ -190,3 +190,100 @@ def test_get_cluster_id_by_name(self):
 no_match = hook.get_cluster_id_by_name("foo", ["RUNNING", "WAITING", 
"BOOTSTRAPPING"])
 
 assert no_match is None
+
+@mock_emr
+def test_send_cancel_steps_first_invocation(self):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "step_1",
+}
+]
+
+did_not_execute_response = hook.send_cancel_steps(
+steps_states=["PENDING", "RUNNING"],
+emr_cluster_id=job_flow_id,
+cancellation_option="SEND_INTERRUPT",
+steps=step,
+)
+
+assert did_not_execute_response is None
+
+@mock_emr
+@pytest.mark.parametrize("num_steps", [1, 2, 3, 4])
+def test_send_cancel_steps_on_pre_existing_step_name(self, num_steps):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+steps = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": f"step_{i}",
+}
+for i in range(num_steps)
+]
+
+retry_step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "retry_step_1",
+}
+]
+
+triggered = hook.add_job_flow_steps(job_flow_id=job_flow_id, 
steps=steps)
+
+triggered_steps = 
hook._get_list_of_steps_already_triggered(job_flow_id, ["PENDING", "RUNNING"])
+assert len(triggered_steps) == num_steps == len(triggered)
+
+cancel_steps = hook._cancel_list_of_steps_already_triggered(
+steps + retry_step, job_flow_id, ["PENDING", "RUNNING"]
+)
+
+assert len(cancel_steps) == num_steps
+
+with pytest.raises(NotImplementedError):

Review Comment:
   Actually, the moto library, which mocks AWS APIs, currently does not have an 
implementation of `cancel_steps` needed for verification. 
   However, in 
[this](https://github.com/apache/airflow/pull/28282/files#diff-12403a99d050969934d307ac584b2a414382e29c3a49d535be55bdc59e411213R261)
 test case I have checked, via the hook's methods, that the expected values are 
generated before they are sent to the boto3 cancel_steps method.
   
   I am not very familiar with moto, but I am curious whether there is a 
different way we could achieve this; let me know.
   @Taragolis @o-nikolas 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050772669


##
tests/providers/amazon/aws/hooks/test_emr.py:
##
@@ -190,3 +190,100 @@ def test_get_cluster_id_by_name(self):
 no_match = hook.get_cluster_id_by_name("foo", ["RUNNING", "WAITING", 
"BOOTSTRAPPING"])
 
 assert no_match is None
+
+@mock_emr
+def test_send_cancel_steps_first_invocation(self):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "step_1",
+}
+]
+
+did_not_execute_response = hook.send_cancel_steps(
+steps_states=["PENDING", "RUNNING"],
+emr_cluster_id=job_flow_id,
+cancellation_option="SEND_INTERRUPT",
+steps=step,
+)
+
+assert did_not_execute_response is None
+
+@mock_emr
+@pytest.mark.parametrize("num_steps", [1, 2, 3, 4])
+def test_send_cancel_steps_on_pre_existing_step_name(self, num_steps):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+steps = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": f"step_{i}",
+}
+for i in range(num_steps)
+]
+
+retry_step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "retry_step_1",
+}
+]
+
+triggered = hook.add_job_flow_steps(job_flow_id=job_flow_id, 
steps=steps)
+
+triggered_steps = 
hook._get_list_of_steps_already_triggered(job_flow_id, ["PENDING", "RUNNING"])
+assert len(triggered_steps) == num_steps == len(triggered)
+
+cancel_steps = hook._cancel_list_of_steps_already_triggered(
+steps + retry_step, job_flow_id, ["PENDING", "RUNNING"]
+)
+
+assert len(cancel_steps) == num_steps
+
+with pytest.raises(NotImplementedError):

Review Comment:
   Actually, the moto library, which mocks AWS APIs, currently does not have an 
implementation of `cancel_steps` needed for verification. 
   However, in 
[this](https://github.com/apache/airflow/pull/28282/files#diff-12403a99d050969934d307ac584b2a414382e29c3a49d535be55bdc59e411213R262)
 test case I have checked, via the hook's methods, that the expected values are 
generated before they are sent to the boto3 cancel_steps method.
   
   I am not very familiar with moto, but I am curious whether there is a 
different way we could achieve this; let me know.
   @Taragolis @o-nikolas 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050767976


##
airflow/providers/amazon/aws/hooks/emr.py:
##
@@ -202,6 +202,74 @@ def get_ui_field_behaviour() -> dict[str, Any]:
 },
 }
 
+def is_cluster_available(self, emr_cluster_id, cluster_states):
+response = self.get_conn().list_clusters(ClusterStates=cluster_states)
+matching_clusters = list(
+filter(lambda cluster: cluster["Id"] == emr_cluster_id, 
response["Clusters"])
+)
+
+if len(matching_clusters) == 1:
+emr_cluster_name = matching_clusters[0]["Name"]
+self.log.info("Found cluster name = %s id = %s", emr_cluster_name, 
emr_cluster_id)
+return True
+elif len(matching_clusters) > 1:
+raise AirflowException(f"More than one cluster found for Id 
{emr_cluster_id}")
+else:
+self.log.info("No cluster found for Id %s", emr_cluster_id)
+return False
+
+def _get_list_of_steps_already_triggered(
+self, cluster_id: str, step_states: list[str]
+) -> list[tuple[str, str]]:
+
+response = self.get_conn().list_steps(
+ClusterId=cluster_id,
+StepStates=step_states,
+)
+steps_name_id = [(step["Name"], step["Id"]) for step in 
response["Steps"]]
+print(steps_name_id)
+return steps_name_id
+
+def _cancel_list_of_steps_already_triggered(
+self, steps: list[dict], cluster_id: str, step_states: list[str]
+):
+names_list = self._get_list_of_steps_already_triggered(cluster_id, 
step_states)
+
+self.log.info(steps)
+
+steps_name_list = [step["Name"] for step in steps if "Name" in step]

Review Comment:
   Because of this, it got me looking, and I found that boto3 actually enforces 
it as a mandatory parameter when creating steps, so we can expect them to also 
have a Name. 
   I backed this up with a test case; find it 
[here](https://github.com/apache/airflow/pull/28282/files#diff-12403a99d050969934d307ac584b2a414382e29c3a49d535be55bdc59e411213R232)
 
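To illustrate the point, a small sketch of boto3's client-side validation (the cluster id is a placeholder; no credentials are needed because validation fails before any request is sent):

```python
import boto3
from botocore.exceptions import ParamValidationError

client = boto3.client("emr", region_name="us-east-1")
try:
    client.add_job_flow_steps(
        JobFlowId="j-XXXXXXXXXXXXX",  # placeholder cluster id
        Steps=[{"HadoopJarStep": {"Jar": "test.jar"}}],  # "Name" omitted
    )
except ParamValidationError as err:
    # botocore rejects this client-side: Name is a required member of
    # StepConfig, so the request never reaches AWS.
    print(err)
```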



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050773595


##
airflow/providers/amazon/aws/operators/emr.py:
##
@@ -71,6 +71,9 @@ def __init__(
 aws_conn_id: str = "aws_default",
 steps: list[dict] | str | None = None,
 wait_for_completion: bool = False,
+cancel_existing_steps: bool = True,

Review Comment:
   Handled, thanks. 
   Now all the added parameters are optional and will not break existing 
pipelines.



##
airflow/providers/amazon/aws/operators/emr.py:
##
@@ -71,6 +71,9 @@ def __init__(
 aws_conn_id: str = "aws_default",
 steps: list[dict] | str | None = None,
 wait_for_completion: bool = False,
+cancel_existing_steps: bool = True,
+steps_states: list[str],

Review Comment:
   Handled 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050772669


##
tests/providers/amazon/aws/hooks/test_emr.py:
##
@@ -190,3 +190,100 @@ def test_get_cluster_id_by_name(self):
 no_match = hook.get_cluster_id_by_name("foo", ["RUNNING", "WAITING", 
"BOOTSTRAPPING"])
 
 assert no_match is None
+
+@mock_emr
+def test_send_cancel_steps_first_invocation(self):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "step_1",
+}
+]
+
+did_not_execute_response = hook.send_cancel_steps(
+steps_states=["PENDING", "RUNNING"],
+emr_cluster_id=job_flow_id,
+cancellation_option="SEND_INTERRUPT",
+steps=step,
+)
+
+assert did_not_execute_response is None
+
+@mock_emr
+@pytest.mark.parametrize("num_steps", [1, 2, 3, 4])
+def test_send_cancel_steps_on_pre_existing_step_name(self, num_steps):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+steps = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": f"step_{i}",
+}
+for i in range(num_steps)
+]
+
+retry_step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "retry_step_1",
+}
+]
+
+triggered = hook.add_job_flow_steps(job_flow_id=job_flow_id, 
steps=steps)
+
+triggered_steps = 
hook._get_list_of_steps_already_triggered(job_flow_id, ["PENDING", "RUNNING"])
+assert len(triggered_steps) == num_steps == len(triggered)
+
+cancel_steps = hook._cancel_list_of_steps_already_triggered(
+steps + retry_step, job_flow_id, ["PENDING", "RUNNING"]
+)
+
+assert len(cancel_steps) == num_steps
+
+with pytest.raises(NotImplementedError):

Review Comment:
   Actually, the moto library, which mocks AWS APIs, currently does not have an 
implementation of `cancel_steps` needed for verification. 
   However, I have checked, via the hook's methods, that the expected values are 
generated before they are sent to the boto3 cancel_steps method.
   
   I am not very familiar with moto, but I am curious whether there is a 
different way we could achieve this; let me know.
   @Taragolis @o-nikolas 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #28397: A manual run can't look like a scheduled one

2022-12-16 Thread GitBox


ephraimbuddy commented on code in PR #28397:
URL: https://github.com/apache/airflow/pull/28397#discussion_r1050771058


##
airflow/models/dag.py:
##
@@ -2586,14 +2586,23 @@ def create_dagrun(
 else:
 data_interval = 
self.infer_automated_data_interval(logical_date)
 
+if run_type is not None and not isinstance(run_type, DagRunType):
+raise ValueError(f"`run_type` should be a DagRunType, not 
{type(run_type)}")
+
 if run_id:  # Infer run_type from run_id if needed.
 if not isinstance(run_id, str):
 raise ValueError(f"`run_id` should be a str, not 
{type(run_id)}")
-if not run_type:
-run_type = DagRunType.from_run_id(run_id)
+inferred_run_type = DagRunType.from_run_id(run_id)
+if run_type is None:
+# No explicit type given, use the inferred type.
+run_type = inferred_run_type
+elif run_type == DagRunType.MANUAL and inferred_run_type != 
DagRunType.MANUAL:

Review Comment:
   ```suggestion
   elif run_type != inferred_run_type:
   ```
   Should we do it this way?
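
For context on the check being discussed: `DagRunType.from_run_id` infers the type from the run_id prefix, so a manually triggered run with a `scheduled__...` run_id would otherwise masquerade as a scheduled one. A quick sketch of the inference behaviour:

```python
from airflow.utils.types import DagRunType

assert DagRunType.from_run_id("scheduled__2022-12-16T00:00:00+00:00") == DagRunType.SCHEDULED
assert DagRunType.from_run_id("manual__2022-12-16T00:00:00+00:00") == DagRunType.MANUAL
# Anything without a recognised prefix falls back to MANUAL:
assert DagRunType.from_run_id("my_custom_run_id") == DagRunType.MANUAL
```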



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050768849


##
tests/providers/amazon/aws/hooks/test_emr.py:
##
@@ -190,3 +190,100 @@ def test_get_cluster_id_by_name(self):
 no_match = hook.get_cluster_id_by_name("foo", ["RUNNING", "WAITING", 
"BOOTSTRAPPING"])
 
 assert no_match is None
+
+@mock_emr
+def test_send_cancel_steps_first_invocation(self):
+"""
+Test that we can resolve cluster id by cluster name.

Review Comment:
   Yup, updated the test case names and docstrings for each case according to 
the scenario it tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050768165


##
airflow/providers/amazon/aws/hooks/emr.py:
##
@@ -202,6 +202,74 @@ def get_ui_field_behaviour() -> dict[str, Any]:
 },
 }
 
+def is_cluster_available(self, emr_cluster_id, cluster_states):
+response = self.get_conn().list_clusters(ClusterStates=cluster_states)
+matching_clusters = list(
+filter(lambda cluster: cluster["Id"] == emr_cluster_id, 
response["Clusters"])
+)
+
+if len(matching_clusters) == 1:
+emr_cluster_name = matching_clusters[0]["Name"]
+self.log.info("Found cluster name = %s id = %s", emr_cluster_name, 
emr_cluster_id)
+return True
+elif len(matching_clusters) > 1:
+raise AirflowException(f"More than one cluster found for Id 
{emr_cluster_id}")
+else:
+self.log.info("No cluster found for Id %s", emr_cluster_id)
+return False
+
+def _get_list_of_steps_already_triggered(
+self, cluster_id: str, step_states: list[str]
+) -> list[tuple[str, str]]:
+
+response = self.get_conn().list_steps(
+ClusterId=cluster_id,
+StepStates=step_states,
+)
+steps_name_id = [(step["Name"], step["Id"]) for step in 
response["Steps"]]
+print(steps_name_id)

Review Comment:
   Handled this



##
tests/providers/amazon/aws/hooks/test_emr.py:
##
@@ -190,3 +190,100 @@ def test_get_cluster_id_by_name(self):
 no_match = hook.get_cluster_id_by_name("foo", ["RUNNING", "WAITING", 
"BOOTSTRAPPING"])
 
 assert no_match is None
+
+@mock_emr
+def test_send_cancel_steps_first_invocation(self):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "step_1",
+}
+]
+
+did_not_execute_response = hook.send_cancel_steps(
+steps_states=["PENDING", "RUNNING"],
+emr_cluster_id=job_flow_id,
+cancellation_option="SEND_INTERRUPT",
+steps=step,
+)
+
+assert did_not_execute_response is None
+
+@mock_emr
+@pytest.mark.parametrize("num_steps", [1, 2, 3, 4])
+def test_send_cancel_steps_on_pre_existing_step_name(self, num_steps):
+"""
+Test that we can resolve cluster id by cluster name.
+"""
+hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+job_flow = hook.create_job_flow(
+{"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+)
+
+job_flow_id = job_flow["JobFlowId"]
+
+steps = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": f"step_{i}",
+}
+for i in range(num_steps)
+]
+
+retry_step = [
+{
+"ActionOnFailure": "test_step",
+"HadoopJarStep": {
+"Args": ["test args"],
+"Jar": "test.jar",
+},
+"Name": "retry_step_1",
+}
+]
+
+triggered = hook.add_job_flow_steps(job_flow_id=job_flow_id, 
steps=steps)
+
+triggered_steps = 
hook._get_list_of_steps_already_triggered(job_flow_id, ["PENDING", "RUNNING"])
+assert len(triggered_steps) == num_steps == len(triggered)
+
+cancel_steps = hook._cancel_list_of_steps_already_triggered(
+steps + retry_step, job_flow_id, ["PENDING", "RUNNING"]
+)
+
+assert len(cancel_steps) == num_steps
+
+with pytest.raises(NotImplementedError):
+response = hook.send_cancel_steps(
+steps_states=["PENDING", "RUNNING"],
+emr_cluster_id=job_flow_id,
+cancellation_option="SEND_INTERRUPT",
+steps=steps + retry_step,
+)
+
+assert response
+
+# assert set([status['Status'] for status in 
response['CancelStepsInfoList'][0]]) \
+#== {'SUBMITTED'} or None
+#
+# assert [step['StepId'] for step in 
response['CancelStepsInfoList'][0] if
+# step['Status'] in ['SUBMITTED']] == [step_id for step_name, 
step_id in 

[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050767976


##
airflow/providers/amazon/aws/hooks/emr.py:
##
@@ -202,6 +202,74 @@ def get_ui_field_behaviour() -> dict[str, Any]:
 },
 }
 
+def is_cluster_available(self, emr_cluster_id, cluster_states):
+response = self.get_conn().list_clusters(ClusterStates=cluster_states)
+matching_clusters = list(
+filter(lambda cluster: cluster["Id"] == emr_cluster_id, 
response["Clusters"])
+)
+
+if len(matching_clusters) == 1:
+emr_cluster_name = matching_clusters[0]["Name"]
+self.log.info("Found cluster name = %s id = %s", emr_cluster_name, 
emr_cluster_id)
+return True
+elif len(matching_clusters) > 1:
+raise AirflowException(f"More than one cluster found for Id 
{emr_cluster_id}")
+else:
+self.log.info("No cluster found for Id %s", emr_cluster_id)
+return False
+
+def _get_list_of_steps_already_triggered(
+self, cluster_id: str, step_states: list[str]
+) -> list[tuple[str, str]]:
+
+response = self.get_conn().list_steps(
+ClusterId=cluster_id,
+StepStates=step_states,
+)
+steps_name_id = [(step["Name"], step["Id"]) for step in 
response["Steps"]]
+print(steps_name_id)
+return steps_name_id
+
+def _cancel_list_of_steps_already_triggered(
+self, steps: list[dict], cluster_id: str, step_states: list[str]
+):
+names_list = self._get_list_of_steps_already_triggered(cluster_id, 
step_states)
+
+self.log.info(steps)
+
+steps_name_list = [step["Name"] for step in steps if "Name" in step]

Review Comment:
   Because of this, it got me looking, and I found that boto3 actually enforces 
it as a mandatory parameter when creating steps, so we can expect them to also 
have a Name. 
   I backed this up with a test case; find it here 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] swapz-z commented on a diff in pull request #28282: Mark stepIds for cancel in EmrAddStepsOperator

2022-12-16 Thread GitBox


swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050766440


##
airflow/providers/amazon/aws/hooks/emr.py:
##
@@ -202,6 +202,74 @@ def get_ui_field_behaviour() -> dict[str, Any]:
 },
 }
 
+def is_cluster_available(self, emr_cluster_id, cluster_states):
+response = self.get_conn().list_clusters(ClusterStates=cluster_states)
+matching_clusters = list(
+filter(lambda cluster: cluster["Id"] == emr_cluster_id, 
response["Clusters"])
+)
+
+if len(matching_clusters) == 1:
+emr_cluster_name = matching_clusters[0]["Name"]
+self.log.info("Found cluster name = %s id = %s", emr_cluster_name, 
emr_cluster_id)
+return True
+elif len(matching_clusters) > 1:
+raise AirflowException(f"More than one cluster found for Id 
{emr_cluster_id}")
+else:
+self.log.info("No cluster found for Id %s", emr_cluster_id)
+return False
+
+def _get_list_of_steps_already_triggered(
+self, cluster_id: str, step_states: list[str]
+) -> list[tuple[str, str]]:
+
+response = self.get_conn().list_steps(
+ClusterId=cluster_id,
+StepStates=step_states,
+)
+steps_name_id = [(step["Name"], step["Id"]) for step in 
response["Steps"]]
+print(steps_name_id)
+return steps_name_id
+
+def _cancel_list_of_steps_already_triggered(
+self, steps: list[dict], cluster_id: str, step_states: list[str]
+):
+names_list = self._get_list_of_steps_already_triggered(cluster_id, 
step_states)
+
+self.log.info(steps)

Review Comment:
   Removed all the debug-style log statements that were used during my 
testing.
   Added info logs wherever necessary to give the user a better understanding 
of the flow of the operator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


