This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1bf8938fdfcb061dd234d91b9747168f48baa670 Author: Bowrna <mailbow...@gmail.com> AuthorDate: Mon Jun 6 20:50:35 2022 +0530 docker new system test (#23167) (cherry picked from commit 06856337a51139d66b1a39544e276e477c6b5ea1) --- airflow/providers/docker/example_dags/__init__.py | 17 ---- .../docker/example_dags/example_docker.py | 51 ---------- .../example_dags/example_docker_copy_data.py | 101 ------------------- docs/apache-airflow-providers-docker/index.rst | 14 ++- docs/apache-airflow/tutorial_taskflow_api.rst | 4 +- tests/system/providers/docker/example_docker.py | 63 ++++++++++++ .../providers/docker/example_docker_copy_data.py | 110 +++++++++++++++++++++ .../providers/docker}/example_docker_swarm.py | 30 ++++-- .../example_taskflow_api_etl_docker_virtualenv.py | 39 +++++--- 9 files changed, 236 insertions(+), 193 deletions(-) diff --git a/airflow/providers/docker/example_dags/__init__.py b/airflow/providers/docker/example_dags/__init__.py deleted file mode 100644 index 217e5db960..0000000000 --- a/airflow/providers/docker/example_dags/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/providers/docker/example_dags/example_docker.py b/airflow/providers/docker/example_dags/example_docker.py deleted file mode 100644 index 83f6744883..0000000000 --- a/airflow/providers/docker/example_dags/example_docker.py +++ /dev/null @@ -1,51 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-from datetime import datetime, timedelta - -from airflow import DAG -from airflow.operators.bash import BashOperator -from airflow.providers.docker.operators.docker import DockerOperator - -dag = DAG( - 'docker_sample', - default_args={'retries': 1}, - schedule_interval=timedelta(minutes=10), - start_date=datetime(2021, 1, 1), - catchup=False, -) - -t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag) - -t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) - -t3 = DockerOperator( - docker_url='tcp://localhost:2375', # Set your docker URL - command='/bin/sleep 30', - image='centos:latest', - network_mode='bridge', - task_id='docker_op_tester', - dag=dag, -) - - -t4 = BashOperator(task_id='print_hello', bash_command='echo "hello world!!!"', dag=dag) - - -t1 >> t2 -t1 >> t3 -t3 >> t4 diff --git a/airflow/providers/docker/example_dags/example_docker_copy_data.py b/airflow/providers/docker/example_dags/example_docker_copy_data.py deleted file mode 100644 index 5ce78d02cd..0000000000 --- a/airflow/providers/docker/example_dags/example_docker_copy_data.py +++ /dev/null @@ -1,101 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -This sample "listen to directory". move the new file and print it, -using docker-containers. -The following operators are being used: DockerOperator, -BashOperator & ShortCircuitOperator. -TODO: Review the workflow, change it accordingly to - your environment & enable the code. 
-""" - -from datetime import datetime, timedelta - -from docker.types import Mount - -from airflow import DAG -from airflow.operators.bash import BashOperator -from airflow.operators.python import ShortCircuitOperator -from airflow.providers.docker.operators.docker import DockerOperator - -dag = DAG( - "docker_sample_copy_data", - default_args={"retries": 1}, - schedule_interval=timedelta(minutes=10), - start_date=datetime(2021, 1, 1), - catchup=False, -) - -locate_file_cmd = """ - sleep 10 - find {{params.source_location}} -type f -printf "%f\n" | head -1 -""" - -t_view = BashOperator( - task_id="view_file", - bash_command=locate_file_cmd, - do_xcom_push=True, - params={"source_location": "/your/input_dir/path"}, - dag=dag, -) - -t_is_data_available = ShortCircuitOperator( - task_id="check_if_data_available", - python_callable=lambda task_output: not task_output == "", - op_kwargs=dict(task_output=t_view.output), - dag=dag, -) - -t_move = DockerOperator( - api_version="1.19", - docker_url="tcp://localhost:2375", # replace it with swarm/docker endpoint - image="centos:latest", - network_mode="bridge", - mounts=[ - Mount(source="/your/host/input_dir/path", target="/your/input_dir/path", type="bind"), - Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind"), - ], - command=[ - "/bin/bash", - "-c", - "/bin/sleep 30; " - "/bin/mv {{ params.source_location }}/" + str(t_view.output) + " {{ params.target_location }};" - "/bin/echo '{{ params.target_location }}/" + f"{t_view.output}';", - ], - task_id="move_data", - do_xcom_push=True, - params={"source_location": "/your/input_dir/path", "target_location": "/your/output_dir/path"}, - dag=dag, -) - -t_print = DockerOperator( - api_version="1.19", - docker_url="tcp://localhost:2375", - image="centos:latest", - mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")], - command=f"cat {t_move.output}", - task_id="print", - dag=dag, -) - -t_is_data_available.set_downstream(t_move) -t_move.set_downstream(t_print) - -# Task dependencies created via `XComArgs`: -# t_view >> t_is_data_available diff --git a/docs/apache-airflow-providers-docker/index.rst b/docs/apache-airflow-providers-docker/index.rst index 3fa5c2b9c0..fc4ae5b6e6 100644 --- a/docs/apache-airflow-providers-docker/index.rst +++ b/docs/apache-airflow-providers-docker/index.rst @@ -39,7 +39,7 @@ Content :maxdepth: 1 :caption: Resources - Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/docker/example_dags> + Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/docker> PyPI Repository <https://pypi.org/project/apache-airflow-providers-docker/> Installing from sources <installing-providers-from-sources> @@ -84,3 +84,15 @@ PIP package Version required ================== ================== .. include:: ../../airflow/providers/docker/CHANGELOG.rst + +DockerOperator +-------------- +Use the +:class:`~airflow.providers.docker.operators.docker.DockerOperator` +to execute command in Docker container. + +.. 
exampleinclude:: /../../tests/system/providers/docker/example_docker.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_docker] + :end-before: [END howto_operator_docker] diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst index 0904465e98..00b3e43a2a 100644 --- a/docs/apache-airflow/tutorial_taskflow_api.rst +++ b/docs/apache-airflow/tutorial_taskflow_api.rst @@ -233,7 +233,7 @@ image must have a working Python installed and take in a bash command as the ``c Below is an example of using the ``@task.docker`` decorator to run a Python task. -.. exampleinclude:: /../../airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py +.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py :language: python :dedent: 4 :start-after: [START transform_docker] @@ -257,7 +257,7 @@ environment on the same machine, you can use the ``@task.virtualenv`` decorator decorator will allow you to create a new virtualenv with custom libraries and even a different Python version to run your function. -.. exampleinclude:: /../../airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py +.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py :language: python :dedent: 4 :start-after: [START extract_virtualenv] diff --git a/tests/system/providers/docker/example_docker.py b/tests/system/providers/docker/example_docker.py new file mode 100644 index 0000000000..8a00c408b7 --- /dev/null +++ b/tests/system/providers/docker/example_docker.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import os +from datetime import datetime + +from airflow import models +from airflow.operators.bash import BashOperator +from airflow.providers.docker.operators.docker import DockerOperator + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = 'docker_test' + +with models.DAG( + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "docker"], +) as dag: + t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag) + t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) + # [START howto_operator_docker] + t3 = DockerOperator( + docker_url='unix://var/run/docker.sock', # Set your docker URL + command='/bin/sleep 30', + image='centos:latest', + network_mode='bridge', + task_id='docker_op_tester', + dag=dag, + ) + # [END howto_operator_docker] + + t4 = BashOperator(task_id='print_hello', bash_command='echo "hello world!!!"', dag=dag) + # t1 >> t2 + # t1 >> t3 + # t3 >> t4 + + ( + # TEST BODY + t1 + >> [t2, t3] + >> t4 + ) + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/docker/example_docker_copy_data.py b/tests/system/providers/docker/example_docker_copy_data.py new file mode 100644 index 0000000000..d709bf14d5 --- /dev/null +++ b/tests/system/providers/docker/example_docker_copy_data.py @@ -0,0 +1,110 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +This sample "listens to a directory", moves the new file, and prints it +using Docker containers. +The following operators are being used: DockerOperator, +BashOperator & ShortCircuitOperator. +TODO: Review the workflow, adapt it to + your environment and enable the code.
+""" +import os +from datetime import datetime + +from docker.types import Mount + +from airflow import models +from airflow.operators.bash import BashOperator +from airflow.operators.python import ShortCircuitOperator +from airflow.providers.docker.operators.docker import DockerOperator + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = 'docker_sample_copy_data' + +with models.DAG( + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "docker"], +) as dag: + + locate_file_cmd = """ + sleep 10 + find {{params.source_location}} -type f -printf "%f\n" | head -1 + """ + + t_view = BashOperator( + task_id="view_file", + bash_command=locate_file_cmd, + do_xcom_push=True, + params={"source_location": "/your/input_dir/path"}, + dag=dag, + ) + + t_is_data_available = ShortCircuitOperator( + task_id="check_if_data_available", + python_callable=lambda task_output: not task_output == "", + op_kwargs=dict(task_output=t_view.output), + dag=dag, + ) + + t_move = DockerOperator( + api_version="1.19", + docker_url="tcp://localhost:2375", # replace it with swarm/docker endpoint + image="centos:latest", + network_mode="bridge", + mounts=[ + Mount(source="/your/host/input_dir/path", target="/your/input_dir/path", type="bind"), + Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind"), + ], + command=[ + "/bin/bash", + "-c", + "/bin/sleep 30; " + "/bin/mv {{ params.source_location }}/" + str(t_view.output) + " {{ params.target_location }};" + "/bin/echo '{{ params.target_location }}/" + f"{t_view.output}';", + ], + task_id="move_data", + do_xcom_push=True, + params={"source_location": "/your/input_dir/path", "target_location": "/your/output_dir/path"}, + dag=dag, + ) + + t_print = DockerOperator( + api_version="1.19", + docker_url="tcp://localhost:2375", + image="centos:latest", + mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")], + command=f"cat {t_move.output}", + task_id="print", + dag=dag, + ) + + ( + # TEST BODY + t_is_data_available + >> t_move + >> t_print + ) + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/docker/example_dags/example_docker_swarm.py b/tests/system/providers/docker/example_docker_swarm.py similarity index 67% rename from airflow/providers/docker/example_dags/example_docker_swarm.py rename to tests/system/providers/docker/example_docker_swarm.py index 365a4b44a9..d216f11b94 100644 --- a/airflow/providers/docker/example_dags/example_docker_swarm.py +++ b/tests/system/providers/docker/example_docker_swarm.py @@ -15,24 +15,38 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-from datetime import datetime, timedelta +import os +from datetime import datetime -from airflow import DAG +from airflow import models from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator -dag = DAG( - 'docker_swarm_sample', - schedule_interval=timedelta(minutes=10), +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = 'docker_swarm_dag' + +with models.DAG( + dag_id=DAG_ID, + schedule_interval='@once', start_date=datetime(2021, 1, 1), catchup=False, -) + tags=['example', "docker"], +) as dag: -with dag as dag: t1 = DockerSwarmOperator( api_version='auto', - docker_url='tcp://localhost:2375', # Set your docker URL + docker_url='unix://var/run/docker.sock', # Set your docker URL command='/bin/sleep 10', image='centos:latest', auto_remove=True, task_id='sleep_with_swarm', ) + + ( + # TEST BODY + t1 + ) + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py b/tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py similarity index 77% rename from airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py rename to tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py index c16588c10c..dc0f56dbb6 100644 --- a/airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py +++ b/tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py @@ -19,15 +19,17 @@ # [START tutorial] # [START import_module] +import os from datetime import datetime -from airflow.decorators import dag, task +from airflow import models +from airflow.decorators import task # [END import_module] +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = 'docker_taskflow' -# [START instantiate_dag] -@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) def tutorial_taskflow_api_etl_docker_virtualenv(): """ ### TaskFlow API Tutorial Documentation @@ -37,7 +39,6 @@ def tutorial_taskflow_api_etl_docker_virtualenv(): located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html) """ - # [END instantiate_dag] # [START extract_virtualenv] @task.virtualenv( @@ -98,14 +99,26 @@ def tutorial_taskflow_api_etl_docker_virtualenv(): # [END main_flow] -# The try/except here is because Airflow versions less than 2.2.0 doesn't support -# @task.docker decorator and we use this dag in CI test. Thus, in order not to -# break the CI test, we added this try/except here. -try: - # [START dag_invocation] - tutorial_etl_dag = tutorial_taskflow_api_etl_docker_virtualenv() - # [END dag_invocation] -except AttributeError: - pass +with models.DAG( + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "docker"], +) as dag: + # The try/except here is because Airflow versions less than 2.2.0 don't support + # the @task.docker decorator and we use this DAG in CI tests. Thus, in order not to + # break the CI tests, we added this try/except here.
+ try: + # [START dag_invocation] + tutorial_etl_dag = tutorial_taskflow_api_etl_docker_virtualenv() + # [END dag_invocation] + except AttributeError: + pass + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) # [END tutorial]
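
Note on the docs changes above: tutorial_taskflow_api.rst now pulls its ``@task.docker`` and ``@task.virtualenv`` snippets from tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py, but the bodies of those marked regions do not appear in the hunks of this commit. Below is a minimal sketch of what such decorated tasks look like; the image name, requirements, and function bodies are illustrative assumptions, not the committed code.

# Illustrative sketch only (assumes Airflow >= 2.2 with the Docker provider installed).
# The image, requirements, and function bodies are examples, not the committed code.
from airflow.decorators import task


@task.virtualenv(
    use_dill=True,
    system_site_packages=False,
    requirements=["funcsigs"],
)
def extract():
    # Runs in a freshly created virtualenv with the listed requirements;
    # imports must happen inside the function body.
    import json

    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


@task.docker(image="python:3.9-slim", multiple_outputs=True)
def transform(order_data_dict: dict):
    # Runs inside a Docker container started from the given image.
    total_order_value = sum(order_data_dict.values())
    return {"total_order_value": total_order_value}

The exampleinclude directives added to the docs render only the region between the corresponding [START ...] and [END ...] markers of the referenced system test file.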