This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 28de326 Rename FileToWasbOperator to LocalFilesystemToWasbOperator (#18109) 28de326 is described below commit 28de326d6192bcb4871d5c2ea85857b022aaabd5 Author: eladkal <45845474+elad...@users.noreply.github.com> AuthorDate: Thu Sep 9 16:18:55 2021 +0300 Rename FileToWasbOperator to LocalFilesystemToWasbOperator (#18109) --- airflow/contrib/operators/file_to_wasb.py | 26 +++++++--- ...le_file_to_wasb.py => example_local_to_wasb.py} | 6 +-- airflow/providers/microsoft/azure/provider.yaml | 3 ++ .../microsoft/azure/transfers/file_to_wasb.py | 59 +++------------------- .../{file_to_wasb.py => local_to_wasb.py} | 2 +- dev/provider_packages/prepare_provider_packages.py | 1 + tests/deprecated_classes.py | 2 +- ...{test_file_to_wasb.py => test_local_to_wasb.py} | 12 ++--- ...wasb_system.py => test_local_to_wasb_system.py} | 8 +-- 9 files changed, 47 insertions(+), 72 deletions(-) diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py index 2af0476..eb6275d 100644 --- a/airflow/contrib/operators/file_to_wasb.py +++ b/airflow/contrib/operators/file_to_wasb.py @@ -15,17 +15,31 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -This module is deprecated. -Please use :mod:`airflow.providers.microsoft.azure.transfers.file_to_wasb`. -""" +"""This module is deprecated. Please use :mod:`airflow.providers.microsoft.azure.transfers.local_to_wasb`.""" import warnings -from airflow.providers.microsoft.azure.transfers.file_to_wasb import FileToWasbOperator # noqa +from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator warnings.warn( - "This module is deprecated. Please use `airflow.providers.microsoft.azure.transfers.file_to_wasb`.", + "This module is deprecated. 
Please use `airflow.providers.microsoft.azure.transfers.local_to_wasb`.", DeprecationWarning, stacklevel=2, ) + + +class FileToWasbOperator(LocalFilesystemToWasbOperator): + """ + This class is deprecated. + Please use `airflow.providers.microsoft.azure.transfers.local_to_wasb.LocalFilesystemToWasbOperator`. + """ + + def __init__(self, *args, **kwargs): + warnings.warn( + """This class is deprecated. + Please use + `airflow.providers.microsoft.azure.transfers.local_to_wasb.LocalFilesystemToWasbOperator`.""", + DeprecationWarning, + stacklevel=2, + ) + super().__init__(*args, **kwargs) diff --git a/airflow/providers/microsoft/azure/example_dags/example_file_to_wasb.py b/airflow/providers/microsoft/azure/example_dags/example_local_to_wasb.py similarity index 84% rename from airflow/providers/microsoft/azure/example_dags/example_file_to_wasb.py rename to airflow/providers/microsoft/azure/example_dags/example_local_to_wasb.py index 0ffc1e4..af5e4ed 100644 --- a/airflow/providers/microsoft/azure/example_dags/example_file_to_wasb.py +++ b/airflow/providers/microsoft/azure/example_dags/example_local_to_wasb.py @@ -19,13 +19,13 @@ import os from airflow.models import DAG from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator -from airflow.providers.microsoft.azure.transfers.file_to_wasb import FileToWasbOperator +from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator from airflow.utils.dates import days_ago PATH_TO_UPLOAD_FILE = os.environ.get('AZURE_PATH_TO_UPLOAD_FILE', 'example-text.txt') -with DAG("example_file_to_wasb", schedule_interval="@once", start_date=days_ago(2)) as dag: - upload = FileToWasbOperator( +with DAG("example_local_to_wasb", schedule_interval="@once", start_date=days_ago(2)) as dag: + upload = LocalFilesystemToWasbOperator( task_id="upload_file", file_path=PATH_TO_UPLOAD_FILE, container_name="mycontainer", blob_name='myblob' ) delete = 
WasbDeleteBlobOperator(task_id="delete_file", container_name="mycontainer", blob_name="myblob") diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index 3622489..a0bc93f 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -148,6 +148,9 @@ transfers: - source-integration-name: Local target-integration-name: Microsoft Azure Blob Storage python-module: airflow.providers.microsoft.azure.transfers.file_to_wasb + - source-integration-name: Local + target-integration-name: Microsoft Azure Blob Storage + python-module: airflow.providers.microsoft.azure.transfers.local_to_wasb - source-integration-name: Microsoft Azure Blob Storage target-integration-name: Google Cloud Storage (GCS) how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/azure_blob_to_gcs.rst diff --git a/airflow/providers/microsoft/azure/transfers/file_to_wasb.py b/airflow/providers/microsoft/azure/transfers/file_to_wasb.py index 0d00ded..3979ad4 100644 --- a/airflow/providers/microsoft/azure/transfers/file_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/file_to_wasb.py @@ -16,57 +16,14 @@ # specific language governing permissions and limitations # under the License. # -from typing import Optional +"""This module is deprecated. Please use :mod:`airflow.providers.microsoft.azure.transfers.local_to_wasb`.""" -from airflow.models import BaseOperator -from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +import warnings +from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator # noqa -class FileToWasbOperator(BaseOperator): - """ - Uploads a file to Azure Blob Storage. - - :param file_path: Path to the file to load. (templated) - :type file_path: str - :param container_name: Name of the container. (templated) - :type container_name: str - :param blob_name: Name of the blob. 
(templated) - :type blob_name: str - :param wasb_conn_id: Reference to the wasb connection. - :type wasb_conn_id: str - :param load_options: Optional keyword arguments that - `WasbHook.load_file()` takes. - :type load_options: Optional[dict] - """ - - template_fields = ('file_path', 'container_name', 'blob_name') - - def __init__( - self, - *, - file_path: str, - container_name: str, - blob_name: str, - wasb_conn_id: str = 'wasb_default', - load_options: Optional[dict] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - if load_options is None: - load_options = {} - self.file_path = file_path - self.container_name = container_name - self.blob_name = blob_name - self.wasb_conn_id = wasb_conn_id - self.load_options = load_options - - def execute(self, context: dict) -> None: - """Upload a file to Azure Blob Storage.""" - hook = WasbHook(wasb_conn_id=self.wasb_conn_id) - self.log.info( - 'Uploading %s to wasb://%s as %s', - self.file_path, - self.container_name, - self.blob_name, - ) - hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options) +warnings.warn( + "This module is deprecated. Please use `airflow.providers.microsoft.azure.transfers.local_to_wasb`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/microsoft/azure/transfers/file_to_wasb.py b/airflow/providers/microsoft/azure/transfers/local_to_wasb.py similarity index 97% copy from airflow/providers/microsoft/azure/transfers/file_to_wasb.py copy to airflow/providers/microsoft/azure/transfers/local_to_wasb.py index 0d00ded..5da137d 100644 --- a/airflow/providers/microsoft/azure/transfers/file_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/local_to_wasb.py @@ -22,7 +22,7 @@ from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.wasb import WasbHook -class FileToWasbOperator(BaseOperator): +class LocalFilesystemToWasbOperator(BaseOperator): """ Uploads a file to Azure Blob Storage. 
diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index b45ee3c..5f18588 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -2117,6 +2117,7 @@ KNOWN_DEPRECATED_MESSAGES: Set[Tuple[str, str]] = { # we imported it directly during module walk by the importlib library KNOWN_DEPRECATED_DIRECT_IMPORTS: Set[str] = { "This module is deprecated. Please use `airflow.providers.amazon.aws.hooks.dynamodb`.", + "This module is deprecated. Please use `airflow.providers.microsoft.azure.transfers.local_to_wasb`.", "This module is deprecated. Please use `airflow.providers.tableau.operators.tableau_refresh_workbook`.", "This module is deprecated. Please use `airflow.providers.tableau.sensors.tableau_job_status`.", "This module is deprecated. Please use `airflow.providers.tableau.hooks.tableau`.", diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py index 314c148..e7b34ca 100644 --- a/tests/deprecated_classes.py +++ b/tests/deprecated_classes.py @@ -1717,7 +1717,7 @@ TRANSFERS = [ 'airflow.operators.mssql_to_hive.MsSqlToHiveTransfer', ), ( - 'airflow.providers.microsoft.azure.transfers.file_to_wasb.FileToWasbOperator', + 'airflow.providers.microsoft.azure.transfers.local_to_wasb.LocalFilesystemToWasbOperator', 'airflow.contrib.operators.file_to_wasb.FileToWasbOperator', ), ( diff --git a/tests/providers/microsoft/azure/transfers/test_file_to_wasb.py b/tests/providers/microsoft/azure/transfers/test_local_to_wasb.py similarity index 81% rename from tests/providers/microsoft/azure/transfers/test_file_to_wasb.py rename to tests/providers/microsoft/azure/transfers/test_local_to_wasb.py index 73deaae..790db81 100644 --- a/tests/providers/microsoft/azure/transfers/test_file_to_wasb.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_wasb.py @@ -22,10 +22,10 @@ import unittest from unittest import mock from 
airflow.models.dag import DAG -from airflow.providers.microsoft.azure.transfers.file_to_wasb import FileToWasbOperator +from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator -class TestFileToWasbOperator(unittest.TestCase): +class TestLocalFilesystemToWasbOperator(unittest.TestCase): _config = { 'file_path': 'file', @@ -40,7 +40,7 @@ class TestFileToWasbOperator(unittest.TestCase): self.dag = DAG('test_dag_id', default_args=args) def test_init(self): - operator = FileToWasbOperator(task_id='wasb_operator_1', dag=self.dag, **self._config) + operator = LocalFilesystemToWasbOperator(task_id='wasb_operator_1', dag=self.dag, **self._config) assert operator.file_path == self._config['file_path'] assert operator.container_name == self._config['container_name'] assert operator.blob_name == self._config['blob_name'] @@ -48,15 +48,15 @@ class TestFileToWasbOperator(unittest.TestCase): assert operator.load_options == {} assert operator.retries == self._config['retries'] - operator = FileToWasbOperator( + operator = LocalFilesystemToWasbOperator( task_id='wasb_operator_2', dag=self.dag, load_options={'timeout': 2}, **self._config ) assert operator.load_options == {'timeout': 2} - @mock.patch('airflow.providers.microsoft.azure.transfers.file_to_wasb.WasbHook', autospec=True) + @mock.patch('airflow.providers.microsoft.azure.transfers.local_to_wasb.WasbHook', autospec=True) def test_execute(self, mock_hook): mock_instance = mock_hook.return_value - operator = FileToWasbOperator( + operator = LocalFilesystemToWasbOperator( task_id='wasb_sensor', dag=self.dag, load_options={'timeout': 2}, **self._config ) operator.execute(None) diff --git a/tests/providers/microsoft/azure/transfers/test_file_to_wasb_system.py b/tests/providers/microsoft/azure/transfers/test_local_to_wasb_system.py similarity index 85% rename from tests/providers/microsoft/azure/transfers/test_file_to_wasb_system.py rename to 
tests/providers/microsoft/azure/transfers/test_local_to_wasb_system.py index 5be49a5..5ec6d4d 100644 --- a/tests/providers/microsoft/azure/transfers/test_file_to_wasb_system.py +++ b/tests/providers/microsoft/azure/transfers/test_local_to_wasb_system.py @@ -19,7 +19,7 @@ import os import pytest -from airflow.providers.microsoft.azure.example_dags.example_file_to_wasb import PATH_TO_UPLOAD_FILE +from airflow.providers.microsoft.azure.example_dags.example_local_to_wasb import PATH_TO_UPLOAD_FILE from tests.test_utils.azure_system_helpers import ( AZURE_DAG_FOLDER, AzureSystemTest, @@ -33,7 +33,7 @@ CREDENTIALS_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY) @pytest.mark.backend('postgres', 'mysql') @pytest.mark.credential_file(WASB_DEFAULT_KEY) -class FileToWasbSystem(AzureSystemTest): +class LocalToWasbSystem(AzureSystemTest): def setUp(self): super().setUp() with open(PATH_TO_UPLOAD_FILE, 'w+') as file: @@ -44,5 +44,5 @@ class FileToWasbSystem(AzureSystemTest): super().tearDown() @provide_wasb_default_connection(CREDENTIALS_PATH) - def test_run_example_file_to_wasb(self): - self.run_dag('example_file_to_wasb', AZURE_DAG_FOLDER) + def test_run_example_local_to_wasb(self): + self.run_dag('example_local_to_wasb', AZURE_DAG_FOLDER)