This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 7a93b19138 D401 Support - Providers: DaskExecutor to Github (Inclusive) (#34935) 7a93b19138 is described below commit 7a93b1913845710eb67ab4670c1be9e9382c030b Author: D. Ferruzzi <ferru...@amazon.com> AuthorDate: Mon Oct 16 00:49:36 2023 -0700 D401 Support - Providers: DaskExecutor to Github (Inclusive) (#34935) --- airflow/providers/databricks/hooks/databricks.py | 46 +++++++++++----------- .../providers/databricks/hooks/databricks_base.py | 10 ++--- .../providers/databricks/hooks/databricks_sql.py | 5 ++- .../providers/databricks/operators/databricks.py | 8 ++-- .../databricks/operators/databricks_repos.py | 8 ++-- .../databricks/operators/databricks_sql.py | 2 +- .../databricks/sensors/databricks_partition.py | 8 ++-- .../providers/databricks/sensors/databricks_sql.py | 4 +- airflow/providers/databricks/utils/databricks.py | 2 +- airflow/providers/datadog/hooks/datadog.py | 8 ++-- airflow/providers/dbt/cloud/hooks/dbt.py | 40 +++++++++---------- airflow/providers/dbt/cloud/operators/dbt.py | 4 +- airflow/providers/dbt/cloud/sensors/dbt.py | 8 ++-- airflow/providers/dbt/cloud/triggers/dbt.py | 2 +- airflow/providers/dbt/cloud/utils/openlineage.py | 4 +- airflow/providers/docker/hooks/docker.py | 4 +- .../providers/elasticsearch/hooks/elasticsearch.py | 8 ++-- .../providers/elasticsearch/log/es_task_handler.py | 8 ++-- airflow/providers/exasol/hooks/exasol.py | 3 +- airflow/providers/facebook/ads/hooks/ads.py | 4 +- airflow/providers/ftp/hooks/ftp.py | 20 +++++----- airflow/providers/ftp/operators/ftp.py | 2 +- airflow/providers/github/hooks/github.py | 4 +- airflow/providers/github/sensors/github.py | 6 +-- 24 files changed, 110 insertions(+), 108 deletions(-) diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index 06ba762327..5d3f714b44 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -196,7 +196,7 @@ class DatabricksHook(BaseDatabricksHook): def run_now(self, json: dict) -> int: """ - Utility function to call the ``api/2.1/jobs/run-now`` endpoint. + Call the ``api/2.1/jobs/run-now`` endpoint. :param json: The data used in the body of the request to the ``run-now`` endpoint. :return: the run_id as an int @@ -206,7 +206,7 @@ class DatabricksHook(BaseDatabricksHook): def submit_run(self, json: dict) -> int: """ - Utility function to call the ``api/2.1/jobs/runs/submit`` endpoint. + Call the ``api/2.1/jobs/runs/submit`` endpoint. :param json: The data used in the body of the request to the ``submit`` endpoint. :return: the run_id as an int @@ -223,7 +223,7 @@ class DatabricksHook(BaseDatabricksHook): page_token: str | None = None, ) -> list[dict[str, Any]]: """ - Lists the jobs in the Databricks Job Service. + List the jobs in the Databricks Job Service. :param limit: The limit/batch size used to retrieve jobs. :param offset: The offset of the first job to return, relative to the most recently created job. @@ -274,7 +274,7 @@ class DatabricksHook(BaseDatabricksHook): def find_job_id_by_name(self, job_name: str) -> int | None: """ - Finds job id by its name. If there are multiple jobs with the same name, raises AirflowException. + Find job id by its name; if there are multiple jobs with the same name, raise AirflowException. :param job_name: The name of the job to look up. :return: The job_id as an int or None if no job was found. 
@@ -295,7 +295,7 @@ class DatabricksHook(BaseDatabricksHook): self, batch_size: int = 25, pipeline_name: str | None = None, notebook_path: str | None = None ) -> list[dict[str, Any]]: """ - Lists the pipelines in Databricks Delta Live Tables. + List the pipelines in Databricks Delta Live Tables. :param batch_size: The limit/batch size used to retrieve pipelines. :param pipeline_name: Optional name of a pipeline to search. Cannot be combined with path. @@ -334,7 +334,7 @@ class DatabricksHook(BaseDatabricksHook): def find_pipeline_id_by_name(self, pipeline_name: str) -> str | None: """ - Finds pipeline id by its name. If multiple pipelines with the same name, raises AirflowException. + Find pipeline id by its name; if multiple pipelines with the same name, raise AirflowException. :param pipeline_name: The name of the pipeline to look up. :return: The pipeline_id as a GUID string or None if no pipeline was found. @@ -354,7 +354,7 @@ class DatabricksHook(BaseDatabricksHook): def get_run_page_url(self, run_id: int) -> str: """ - Retrieves run_page_url. + Retrieve run_page_url. :param run_id: id of the run :return: URL of the run page @@ -376,7 +376,7 @@ class DatabricksHook(BaseDatabricksHook): def get_job_id(self, run_id: int) -> int: """ - Retrieves job_id from run_id. + Retrieve job_id from run_id. :param run_id: id of the run :return: Job id for given Databricks run @@ -387,7 +387,7 @@ class DatabricksHook(BaseDatabricksHook): def get_run_state(self, run_id: int) -> RunState: """ - Retrieves run state of the run. + Retrieve run state of the run. Please note that any Airflow tasks that call the ``get_run_state`` method will result in failure unless you have enabled xcom pickling. This can be done using the following @@ -454,7 +454,7 @@ class DatabricksHook(BaseDatabricksHook): def get_run_state_lifecycle(self, run_id: int) -> str: """ - Returns the lifecycle state of the run. + Return the lifecycle state of the run. :param run_id: id of the run :return: string with lifecycle state @@ -463,7 +463,7 @@ class DatabricksHook(BaseDatabricksHook): def get_run_state_result(self, run_id: int) -> str: """ - Returns the resulting state of the run. + Return the resulting state of the run. :param run_id: id of the run :return: string with resulting state @@ -472,7 +472,7 @@ class DatabricksHook(BaseDatabricksHook): def get_run_state_message(self, run_id: int) -> str: """ - Returns the state message for the run. + Return the state message for the run. :param run_id: id of the run :return: string with state message @@ -481,7 +481,7 @@ class DatabricksHook(BaseDatabricksHook): def get_run_output(self, run_id: int) -> dict: """ - Retrieves run output of the run. + Retrieve run output of the run. :param run_id: id of the run :return: output of the run @@ -492,7 +492,7 @@ class DatabricksHook(BaseDatabricksHook): def cancel_run(self, run_id: int) -> None: """ - Cancels the run. + Cancel the run. :param run_id: id of the run """ @@ -501,7 +501,7 @@ class DatabricksHook(BaseDatabricksHook): def cancel_all_runs(self, job_id: int) -> None: """ - Cancels all active runs of a job. The runs are canceled asynchronously. + Cancel all active runs of a job asynchronously. :param job_id: The canonical identifier of the job to cancel all runs of """ @@ -510,7 +510,7 @@ class DatabricksHook(BaseDatabricksHook): def delete_run(self, run_id: int) -> None: """ - Deletes a non-active run. + Delete a non-active run. 
:param run_id: id of the run """ @@ -527,7 +527,7 @@ class DatabricksHook(BaseDatabricksHook): def get_cluster_state(self, cluster_id: str) -> ClusterState: """ - Retrieves run state of the cluster. + Retrieve run state of the cluster. :param cluster_id: id of the cluster :return: state of the cluster @@ -561,7 +561,7 @@ class DatabricksHook(BaseDatabricksHook): def start_cluster(self, json: dict) -> None: """ - Starts the cluster. + Start the cluster. :param json: json dictionary containing cluster specification. """ @@ -569,7 +569,7 @@ class DatabricksHook(BaseDatabricksHook): def terminate_cluster(self, json: dict) -> None: """ - Terminates the cluster. + Terminate the cluster. :param json: json dictionary containing cluster specification. """ @@ -597,7 +597,7 @@ class DatabricksHook(BaseDatabricksHook): def update_repo(self, repo_id: str, json: dict[str, Any]) -> dict: """ - Updates given Databricks Repos. + Update given Databricks Repos. :param repo_id: ID of Databricks Repos :param json: payload @@ -608,7 +608,7 @@ class DatabricksHook(BaseDatabricksHook): def delete_repo(self, repo_id: str): """ - Deletes given Databricks Repos. + Delete given Databricks Repos. :param repo_id: ID of Databricks Repos :return: @@ -618,7 +618,7 @@ class DatabricksHook(BaseDatabricksHook): def create_repo(self, json: dict[str, Any]) -> dict: """ - Creates a Databricks Repos. + Create a Databricks Repos. :param json: payload :return: @@ -628,7 +628,7 @@ class DatabricksHook(BaseDatabricksHook): def get_repo_by_path(self, path: str) -> str | None: """ - Obtains Repos ID by path. + Obtain Repos ID by path. :param path: path to a repository :return: Repos ID if it exists, None if doesn't. diff --git a/airflow/providers/databricks/hooks/databricks_base.py b/airflow/providers/databricks/hooks/databricks_base.py index 3fec74c343..dfbd464b5e 100644 --- a/airflow/providers/databricks/hooks/databricks_base.py +++ b/airflow/providers/databricks/hooks/databricks_base.py @@ -175,7 +175,7 @@ class BaseDatabricksHook(BaseHook): @staticmethod def _parse_host(host: str) -> str: """ - This function is resistant to incorrect connection settings provided by users, in the host field. + Parse host field data; this function is resistant to incorrect connection settings provided by users. For example -- when users supply ``https://xx.cloud.databricks.com`` as the host, we must strip out the protocol to get the host.:: @@ -215,7 +215,7 @@ class BaseDatabricksHook(BaseHook): return AsyncRetrying(**self.retry_args) def _get_sp_token(self, resource: str) -> str: - """Function to get Service Principal token.""" + """Get Service Principal token.""" sp_token = self.oauth_tokens.get(resource) if sp_token and self._is_oauth_token_valid(sp_token): return sp_token["access_token"] @@ -287,7 +287,7 @@ class BaseDatabricksHook(BaseHook): def _get_aad_token(self, resource: str) -> str: """ - Function to get AAD token for given resource. + Get AAD token for given resource. Supports managed identity or service principal auth. :param resource: resource to issue token to @@ -441,7 +441,7 @@ class BaseDatabricksHook(BaseHook): @staticmethod def _is_oauth_token_valid(token: dict, time_key="expires_on") -> bool: """ - Utility function to check if an OAuth token is valid and hasn't expired yet. + Check if an OAuth token is valid and hasn't expired yet. 
:param sp_token: dict with properties of OAuth token :param time_key: name of the key that holds the time of expiration @@ -556,7 +556,7 @@ class BaseDatabricksHook(BaseHook): wrap_http_errors: bool = True, ): """ - Utility function to perform an API call with retries. + Perform an API call with retries. :param endpoint_info: Tuple of method and endpoint :param json: Parameters for this API call. diff --git a/airflow/providers/databricks/hooks/databricks_sql.py b/airflow/providers/databricks/hooks/databricks_sql.py index c592142b73..c0bf5f1cad 100644 --- a/airflow/providers/databricks/hooks/databricks_sql.py +++ b/airflow/providers/databricks/hooks/databricks_sql.py @@ -101,7 +101,7 @@ class DatabricksSqlHook(BaseDatabricksHook, DbApiHook): return endpoint def get_conn(self) -> Connection: - """Returns a Databricks SQL connection object.""" + """Return a Databricks SQL connection object.""" if not self._http_path: if self._sql_endpoint_name: endpoint = self._get_sql_endpoint_by_name(self._sql_endpoint_name) @@ -178,7 +178,8 @@ class DatabricksSqlHook(BaseDatabricksHook, DbApiHook): split_statements: bool = True, return_last: bool = True, ) -> T | list[T] | None: - """Runs a command or a list of commands. + """ + Run a command or a list of commands. Pass a list of SQL statements to the SQL parameter to get them to execute sequentially. diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py index 8551c8d43f..fd9e256230 100644 --- a/airflow/providers/databricks/operators/databricks.py +++ b/airflow/providers/databricks/operators/databricks.py @@ -44,7 +44,7 @@ XCOM_RUN_PAGE_URL_KEY = "run_page_url" def _handle_databricks_operator_execution(operator, hook, log, context) -> None: """ - Handles the Airflow + Databricks lifecycle logic for a Databricks operator. + Handle the Airflow + Databricks lifecycle logic for a Databricks operator. :param operator: Databricks operator being handled :param context: Airflow context @@ -102,7 +102,7 @@ def _handle_databricks_operator_execution(operator, hook, log, context) -> None: def _handle_deferrable_databricks_operator_execution(operator, hook, log, context) -> None: """ - Handles the Airflow + Databricks lifecycle logic for deferrable Databricks operators. + Handle the Airflow + Databricks lifecycle logic for deferrable Databricks operators. 
:param operator: Databricks async operator being handled :param context: Airflow context @@ -320,7 +320,7 @@ class DatabricksSubmitRunOperator(BaseOperator): deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs, ) -> None: - """Creates a new ``DatabricksSubmitRunOperator``.""" + """Create a new ``DatabricksSubmitRunOperator``.""" super().__init__(**kwargs) self.json = json or {} self.databricks_conn_id = databricks_conn_id @@ -621,7 +621,7 @@ class DatabricksRunNowOperator(BaseOperator): deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs, ) -> None: - """Creates a new ``DatabricksRunNowOperator``.""" + """Create a new ``DatabricksRunNowOperator``.""" super().__init__(**kwargs) self.json = json or {} self.databricks_conn_id = databricks_conn_id diff --git a/airflow/providers/databricks/operators/databricks_repos.py b/airflow/providers/databricks/operators/databricks_repos.py index ad9be926a0..67fd49323b 100644 --- a/airflow/providers/databricks/operators/databricks_repos.py +++ b/airflow/providers/databricks/operators/databricks_repos.py @@ -80,7 +80,7 @@ class DatabricksReposCreateOperator(BaseOperator): databricks_retry_delay: int = 1, **kwargs, ) -> None: - """Creates a new ``DatabricksReposCreateOperator``.""" + """Create a new ``DatabricksReposCreateOperator``.""" super().__init__(**kwargs) self.databricks_conn_id = databricks_conn_id self.databricks_retry_limit = databricks_retry_limit @@ -125,7 +125,7 @@ class DatabricksReposCreateOperator(BaseOperator): def execute(self, context: Context): """ - Creates a Databricks Repo. + Create a Databricks Repo. :param context: context :return: Repo ID @@ -194,7 +194,7 @@ class DatabricksReposUpdateOperator(BaseOperator): databricks_retry_delay: int = 1, **kwargs, ) -> None: - """Creates a new ``DatabricksReposUpdateOperator``.""" + """Create a new ``DatabricksReposUpdateOperator``.""" super().__init__(**kwargs) self.databricks_conn_id = databricks_conn_id self.databricks_retry_limit = databricks_retry_limit @@ -266,7 +266,7 @@ class DatabricksReposDeleteOperator(BaseOperator): databricks_retry_delay: int = 1, **kwargs, ) -> None: - """Creates a new ``DatabricksReposDeleteOperator``.""" + """Create a new ``DatabricksReposDeleteOperator``.""" super().__init__(**kwargs) self.databricks_conn_id = databricks_conn_id self.databricks_retry_limit = databricks_retry_limit diff --git a/airflow/providers/databricks/operators/databricks_sql.py b/airflow/providers/databricks/operators/databricks_sql.py index b40a1da115..a9354000b0 100644 --- a/airflow/providers/databricks/operators/databricks_sql.py +++ b/airflow/providers/databricks/operators/databricks_sql.py @@ -244,7 +244,7 @@ class DatabricksCopyIntoOperator(BaseOperator): validate: bool | int | None = None, **kwargs, ) -> None: - """Creates a new ``DatabricksSqlOperator``.""" + """Create a new ``DatabricksSqlOperator``.""" super().__init__(**kwargs) if files is not None and pattern is not None: raise AirflowException("Only one of 'pattern' or 'files' should be specified") diff --git a/airflow/providers/databricks/sensors/databricks_partition.py b/airflow/providers/databricks/sensors/databricks_partition.py index 9c74891018..d056cea73a 100644 --- a/airflow/providers/databricks/sensors/databricks_partition.py +++ b/airflow/providers/databricks/sensors/databricks_partition.py @@ -110,7 +110,7 @@ class DatabricksPartitionSensor(BaseSensorOperator): super().__init__(**kwargs) def _sql_sensor(self, sql): - """Executes 
the supplied SQL statement using the hook object.""" + """Execute the supplied SQL statement using the hook object.""" hook = self._get_hook sql_result = hook.run( sql, @@ -121,7 +121,7 @@ class DatabricksPartitionSensor(BaseSensorOperator): @cached_property def _get_hook(self) -> DatabricksSqlHook: - """Creates and returns a DatabricksSqlHook object.""" + """Create and return a DatabricksSqlHook object.""" return DatabricksSqlHook( self.databricks_conn_id, self._http_path, @@ -166,7 +166,7 @@ class DatabricksPartitionSensor(BaseSensorOperator): escape_key: bool = False, ) -> str: """ - Queries the table for available partitions. + Query the table for available partitions. Generates the SQL query based on the partition data types. * For a list, it prepares the SQL in the format: @@ -225,7 +225,7 @@ class DatabricksPartitionSensor(BaseSensorOperator): return formatted_opts.strip() def poke(self, context: Context) -> bool: - """Checks the table partitions and returns the results.""" + """Check the table partitions and return the results.""" partition_result = self._check_table_partitions() self.log.debug("Partition sensor result: %s", partition_result) if partition_result: diff --git a/airflow/providers/databricks/sensors/databricks_sql.py b/airflow/providers/databricks/sensors/databricks_sql.py index 532af07613..5f215eed3f 100644 --- a/airflow/providers/databricks/sensors/databricks_sql.py +++ b/airflow/providers/databricks/sensors/databricks_sql.py @@ -83,7 +83,7 @@ class DatabricksSqlSensor(BaseSensorOperator): client_parameters: dict[str, Any] | None = None, **kwargs, ) -> None: - """Creates DatabricksSqlSensor object using the specified input arguments.""" + """Create DatabricksSqlSensor object using the specified input arguments.""" self.databricks_conn_id = databricks_conn_id self._http_path = http_path self._sql_warehouse_name = sql_warehouse_name @@ -115,7 +115,7 @@ class DatabricksSqlSensor(BaseSensorOperator): ) def _get_results(self) -> bool: - """Uses the Databricks SQL hook and runs the specified SQL query.""" + """Use the Databricks SQL hook and run the specified SQL query.""" if not (self._http_path or self._sql_warehouse_name): # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1 message = ( diff --git a/airflow/providers/databricks/utils/databricks.py b/airflow/providers/databricks/utils/databricks.py index cac38b8fd9..0635017b28 100644 --- a/airflow/providers/databricks/utils/databricks.py +++ b/airflow/providers/databricks/utils/databricks.py @@ -51,7 +51,7 @@ def normalise_json_content(content, json_path: str = "json") -> str | bool | lis def validate_trigger_event(event: dict): """ - Validates correctness of the event received from DatabricksExecutionTrigger. + Validate correctness of the event received from DatabricksExecutionTrigger. See: :class:`~airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger`. """ diff --git a/airflow/providers/datadog/hooks/datadog.py b/airflow/providers/datadog/hooks/datadog.py index eae101eabd..4db7abd3ca 100644 --- a/airflow/providers/datadog/hooks/datadog.py +++ b/airflow/providers/datadog/hooks/datadog.py @@ -78,7 +78,7 @@ class DatadogHook(BaseHook, LoggingMixin): interval: int | None = None, ) -> dict[str, Any]: """ - Sends a single datapoint metric to Datadog. + Send a single datapoint metric to Datadog. 
:param metric_name: The name of the metric :param datapoint: A single integer or float related to the metric @@ -122,7 +122,7 @@ class DatadogHook(BaseHook, LoggingMixin): device_name: list[str] | None = None, ) -> dict[str, Any]: """ - Posts an event to datadog (processing finished, potentially alerts, other issues). + Post an event to datadog (processing finished, potentially alerts, other issues). Think about this as a means to maintain persistence of alerts, rather than alerting itself. @@ -160,7 +160,7 @@ class DatadogHook(BaseHook, LoggingMixin): @staticmethod def get_connection_form_widgets() -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import StringField @@ -174,7 +174,7 @@ class DatadogHook(BaseHook, LoggingMixin): @staticmethod def get_ui_field_behaviour() -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["schema", "login", "password", "port", "extra"], "relabeling": {"host": "Events host name"}, diff --git a/airflow/providers/dbt/cloud/hooks/dbt.py b/airflow/providers/dbt/cloud/hooks/dbt.py index db70abf120..b37a9ca829 100644 --- a/airflow/providers/dbt/cloud/hooks/dbt.py +++ b/airflow/providers/dbt/cloud/hooks/dbt.py @@ -42,7 +42,7 @@ if TYPE_CHECKING: def fallback_to_default_account(func: Callable) -> Callable: """ - Decorator which provides a fallback value for ``account_id``. + Provide a fallback value for ``account_id``. If the ``account_id`` is None or not passed to the decorated function, the value will be taken from the configured dbt Cloud Airflow Connection. @@ -113,7 +113,7 @@ class DbtCloudJobRunStatus(Enum): @classmethod def check_is_valid(cls, statuses: int | Sequence[int] | set[int]): - """Validates input statuses are a known value.""" + """Validate input statuses are a known value.""" if isinstance(statuses, (Sequence, Set)): for status in statuses: cls(status) @@ -122,7 +122,7 @@ class DbtCloudJobRunStatus(Enum): @classmethod def is_terminal(cls, status: int) -> bool: - """Checks if the input status is that of a terminal type.""" + """Check if the input status is that of a terminal type.""" cls.check_is_valid(statuses=status) return status in cls.TERMINAL_STATUSES.value @@ -137,7 +137,7 @@ T = TypeVar("T", bound=Any) def provide_account_id(func: T) -> T: """ - Decorator which provides a fallback value for ``account_id``. + Provide a fallback value for ``account_id``. If the ``account_id`` is None or not passed to the decorated function, the value will be taken from the configured dbt Cloud Airflow Connection. @@ -176,7 +176,7 @@ class DbtCloudHook(HttpHook): @staticmethod def get_ui_field_behaviour() -> dict[str, Any]: - """Builds custom field behavior for the dbt Cloud connection form in the Airflow UI.""" + """Build custom field behavior for the dbt Cloud connection form in the Airflow UI.""" return { "hidden_fields": ["schema", "port", "extra"], "relabeling": {"login": "Account ID", "password": "API Token", "host": "Tenant"}, @@ -224,7 +224,7 @@ class DbtCloudHook(HttpHook): self, run_id: int, account_id: int | None = None, include_related: list[str] | None = None ) -> Any: """ - Uses Http async call to retrieve metadata for a specific run of a dbt Cloud job. + Use Http async call to retrieve metadata for a specific run of a dbt Cloud job. 
:param run_id: The ID of a dbt Cloud job run. :param account_id: Optional. The ID of a dbt Cloud account. @@ -247,7 +247,7 @@ class DbtCloudHook(HttpHook): self, run_id: int, account_id: int | None = None, include_related: list[str] | None = None ) -> int: """ - Retrieves the status for a specific run of a dbt Cloud job. + Retrieve the status for a specific run of a dbt Cloud job. :param run_id: The ID of a dbt Cloud job run. :param account_id: Optional. The ID of a dbt Cloud account. @@ -317,7 +317,7 @@ class DbtCloudHook(HttpHook): def list_accounts(self) -> list[Response]: """ - Retrieves all of the dbt Cloud accounts the configured API token is authorized to access. + Retrieve all of the dbt Cloud accounts the configured API token is authorized to access. :return: List of request responses. """ @@ -326,7 +326,7 @@ class DbtCloudHook(HttpHook): @fallback_to_default_account def get_account(self, account_id: int | None = None) -> Response: """ - Retrieves metadata for a specific dbt Cloud account. + Retrieve metadata for a specific dbt Cloud account. :param account_id: Optional. The ID of a dbt Cloud account. :return: The request response. @@ -336,7 +336,7 @@ class DbtCloudHook(HttpHook): @fallback_to_default_account def list_projects(self, account_id: int | None = None) -> list[Response]: """ - Retrieves metadata for all projects tied to a specified dbt Cloud account. + Retrieve metadata for all projects tied to a specified dbt Cloud account. :param account_id: Optional. The ID of a dbt Cloud account. :return: List of request responses. @@ -346,7 +346,7 @@ class DbtCloudHook(HttpHook): @fallback_to_default_account def get_project(self, project_id: int, account_id: int | None = None) -> Response: """ - Retrieves metadata for a specific project. + Retrieve metadata for a specific project. :param project_id: The ID of a dbt Cloud project. :param account_id: Optional. The ID of a dbt Cloud account. @@ -362,7 +362,7 @@ class DbtCloudHook(HttpHook): project_id: int | None = None, ) -> list[Response]: """ - Retrieves metadata for all jobs tied to a specified dbt Cloud account. + Retrieve metadata for all jobs tied to a specified dbt Cloud account. If a ``project_id`` is supplied, only jobs pertaining to this project will be retrieved. @@ -381,7 +381,7 @@ class DbtCloudHook(HttpHook): @fallback_to_default_account def get_job(self, job_id: int, account_id: int | None = None) -> Response: """ - Retrieves metadata for a specific job. + Retrieve metadata for a specific job. :param job_id: The ID of a dbt Cloud job. :param account_id: Optional. The ID of a dbt Cloud account. @@ -438,7 +438,7 @@ class DbtCloudHook(HttpHook): order_by: str | None = None, ) -> list[Response]: """ - Retrieves metadata for all dbt Cloud job runs for an account. + Retrieve metadata for all dbt Cloud job runs for an account. If a ``job_definition_id`` is supplied, only metadata for runs of that specific job are pulled. @@ -465,7 +465,7 @@ class DbtCloudHook(HttpHook): self, run_id: int, account_id: int | None = None, include_related: list[str] | None = None ) -> Response: """ - Retrieves metadata for a specific run of a dbt Cloud job. + Retrieve metadata for a specific run of a dbt Cloud job. :param run_id: The ID of a dbt Cloud job run. :param account_id: Optional. The ID of a dbt Cloud account. @@ -480,7 +480,7 @@ class DbtCloudHook(HttpHook): def get_job_run_status(self, run_id: int, account_id: int | None = None) -> int: """ - Retrieves the status for a specific run of a dbt Cloud job. 
+ Retrieve the status for a specific run of a dbt Cloud job. :param run_id: The ID of a dbt Cloud job run. :param account_id: Optional. The ID of a dbt Cloud account. @@ -504,7 +504,7 @@ class DbtCloudHook(HttpHook): timeout: int = 60 * 60 * 24 * 7, ) -> bool: """ - Waits for a dbt Cloud job run to match an expected status. + Wait for a dbt Cloud job run to match an expected status. :param run_id: The ID of a dbt Cloud job run. :param account_id: Optional. The ID of a dbt Cloud account. @@ -555,7 +555,7 @@ class DbtCloudHook(HttpHook): self, run_id: int, account_id: int | None = None, step: int | None = None ) -> list[Response]: """ - Retrieves a list of the available artifact files generated for a completed run of a dbt Cloud job. + Retrieve a list of the available artifact files generated for a completed run of a dbt Cloud job. By default, this returns artifacts from the last step in the run. To list artifacts from other steps in the run, use the ``step`` parameter. @@ -576,7 +576,7 @@ class DbtCloudHook(HttpHook): self, run_id: int, path: str, account_id: int | None = None, step: int | None = None ) -> Response: """ - Retrieves a list of the available artifact files generated for a completed run of a dbt Cloud job. + Retrieve a list of the available artifact files generated for a completed run of a dbt Cloud job. By default, this returns artifacts from the last step in the run. To list artifacts from other steps in the run, use the ``step`` parameter. @@ -604,7 +604,7 @@ class DbtCloudHook(HttpHook): step: int | None = None, ): """ - Retrieves a list of chosen artifact files generated for a step in completed run of a dbt Cloud job. + Retrieve a list of chosen artifact files generated for a step in completed run of a dbt Cloud job. By default, this returns artifacts from the last step in the run. This takes advantage of the asynchronous calls to speed up the retrieval. diff --git a/airflow/providers/dbt/cloud/operators/dbt.py b/airflow/providers/dbt/cloud/operators/dbt.py index 2b37fa144a..d43931cf89 100644 --- a/airflow/providers/dbt/cloud/operators/dbt.py +++ b/airflow/providers/dbt/cloud/operators/dbt.py @@ -190,7 +190,7 @@ class DbtCloudRunJobOperator(BaseOperator): def execute_complete(self, context: Context, event: dict[str, Any]) -> int: """ - Callback for when the trigger fires - returns immediately. + Execute when the trigger fires - returns immediately. Relies on trigger to throw an exception, otherwise it assumes execution was successful. """ @@ -220,7 +220,7 @@ class DbtCloudRunJobOperator(BaseOperator): def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage: """ - Implementing _on_complete because job_run needs to be triggered first in execute method. + Implement _on_complete because job_run needs to be triggered first in execute method. This should send additional events only if operator `wait_for_termination` is set to True. """ diff --git a/airflow/providers/dbt/cloud/sensors/dbt.py b/airflow/providers/dbt/cloud/sensors/dbt.py index 4f371d9c1f..63f3ab6ac1 100644 --- a/airflow/providers/dbt/cloud/sensors/dbt.py +++ b/airflow/providers/dbt/cloud/sensors/dbt.py @@ -130,7 +130,8 @@ class DbtCloudJobRunSensor(BaseSensorOperator): ) def execute_complete(self, context: Context, event: dict[str, Any]) -> int: - """Callback for when the trigger fires - returns immediately. + """ + Execute when the trigger fires - returns immediately. This relies on trigger to throw an exception, otherwise it assumes execution was successful. 
@@ -144,12 +145,13 @@ class DbtCloudJobRunSensor(BaseSensorOperator): return int(event["run_id"]) def get_openlineage_facets_on_complete(self, task_instance) -> OperatorLineage: - """Implementing _on_complete because job_run needs to be triggered first in execute method.""" + """Implement _on_complete because job_run needs to be triggered first in execute method.""" return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance) class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor): - """This class is deprecated. + """ + This class is deprecated. Please use :class:`airflow.providers.dbt.cloud.sensor.dbt.DbtCloudJobRunSensor` with ``deferrable=True``. diff --git a/airflow/providers/dbt/cloud/triggers/dbt.py b/airflow/providers/dbt/cloud/triggers/dbt.py index eaca3c12f8..4f6688a293 100644 --- a/airflow/providers/dbt/cloud/triggers/dbt.py +++ b/airflow/providers/dbt/cloud/triggers/dbt.py @@ -52,7 +52,7 @@ class DbtCloudRunJobTrigger(BaseTrigger): self.poll_interval = poll_interval def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes DbtCloudRunJobTrigger arguments and classpath.""" + """Serialize DbtCloudRunJobTrigger arguments and classpath.""" return ( "airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger", { diff --git a/airflow/providers/dbt/cloud/utils/openlineage.py b/airflow/providers/dbt/cloud/utils/openlineage.py index f25df02efa..6a0934d412 100644 --- a/airflow/providers/dbt/cloud/utils/openlineage.py +++ b/airflow/providers/dbt/cloud/utils/openlineage.py @@ -32,7 +32,7 @@ def generate_openlineage_events_from_dbt_cloud_run( operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: TaskInstance ) -> OperatorLineage: """ - Common method generating OpenLineage events from the DBT Cloud run. + Generate OpenLineage events from the DBT Cloud run. This function retrieves information about a DBT Cloud run, including the associated job, project, and execution details. 
It processes the run's artifacts, such as the manifest and run results, @@ -83,7 +83,7 @@ def generate_openlineage_events_from_dbt_cloud_run( catalog = operator.hook.get_job_run_artifact(operator.run_id, path="catalog.json").json()["data"] async def get_artifacts_for_steps(steps, artifacts): - """Gets artifacts for a list of steps concurrently.""" + """Get artifacts for a list of steps concurrently.""" tasks = [ operator.hook.get_job_run_artifacts_concurrently( run_id=operator.run_id, diff --git a/airflow/providers/docker/hooks/docker.py b/airflow/providers/docker/hooks/docker.py index 47f3b95907..768ea9f942 100644 --- a/airflow/providers/docker/hooks/docker.py +++ b/airflow/providers/docker/hooks/docker.py @@ -168,7 +168,7 @@ class DockerHook(BaseHook): @staticmethod def get_connection_form_widgets() -> dict[str, Any]: - """Returns connection form widgets.""" + """Return connection form widgets.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import BooleanField, StringField @@ -183,7 +183,7 @@ class DockerHook(BaseHook): @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["schema"], "relabeling": { diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py index 10801eca2c..6c93586892 100644 --- a/airflow/providers/elasticsearch/hooks/elasticsearch.py +++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py @@ -90,7 +90,7 @@ class ElasticsearchSQLHook(DbApiHook): self.connection = connection def get_conn(self) -> ESConnection: - """Returns a elasticsearch connection object.""" + """Return an elasticsearch connection object.""" conn_id = getattr(self, self.conn_name_attr) conn = self.connection or self.get_connection(conn_id) @@ -172,19 +172,19 @@ class ElasticsearchPythonHook(BaseHook): self.es_conn_args = es_conn_args or {} def _get_elastic_connection(self): - """Returns the Elasticsearch client.""" + """Return the Elasticsearch client.""" client = Elasticsearch(self.hosts, **self.es_conn_args) return client @cached_property def get_conn(self): - """Returns the Elasticsearch client (cached).""" + """Return the Elasticsearch client (cached).""" return self._get_elastic_connection() def search(self, query: dict[Any, Any], index: str = "_all") -> dict: """ - Returns results matching a query using Elasticsearch DSL. + Return results matching a query using Elasticsearch DSL. :param index: str: The index you want to query :param query: dict: The query you want to run diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index ab07e12487..6c97e25f3b 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -164,9 +164,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix @staticmethod def format_url(host: str) -> str: """ - Formats the given host string to ensure it starts with 'http'. - - Checks if the host string represents a valid URL. + Format the given host string to ensure it starts with 'http' and check if it represents a valid URL. :params host: The host string to format and check. 
""" @@ -454,7 +452,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str: """ - Creates an address for an external log collecting service. + Create an address for an external log collecting service. :param task_instance: task instance object :param try_number: task instance try_number to read logs from. @@ -471,7 +469,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix def _resolve_nested(self, hit: dict[Any, Any], parent_class=None) -> type[Hit]: """ - Resolves nested hits from Elasticsearch by iteratively navigating the `_nested` field. + Resolve nested hits from Elasticsearch by iteratively navigating the `_nested` field. The result is used to fetch the appropriate document class to handle the hit. diff --git a/airflow/providers/exasol/hooks/exasol.py b/airflow/providers/exasol/hooks/exasol.py index ffadf46072..98955a2579 100644 --- a/airflow/providers/exasol/hooks/exasol.py +++ b/airflow/providers/exasol/hooks/exasol.py @@ -138,7 +138,8 @@ class ExasolHook(DbApiHook): @staticmethod def get_description(statement: ExaStatement) -> Sequence[Sequence]: - """Copied implementation from DB2-API wrapper. + """ + Get description; copied implementation from DB2-API wrapper. For more info, see https://github.com/exasol/pyexasol/blob/master/docs/DBAPI_COMPAT.md#db-api-20-wrapper diff --git a/airflow/providers/facebook/ads/hooks/ads.py b/airflow/providers/facebook/ads/hooks/ads.py index 074dd1154e..b8b9c8526d 100644 --- a/airflow/providers/facebook/ads/hooks/ads.py +++ b/airflow/providers/facebook/ads/hooks/ads.py @@ -73,7 +73,7 @@ class FacebookAdsReportingHook(BaseHook): self.client_required_fields = ["app_id", "app_secret", "access_token", "account_id"] def _get_service(self) -> FacebookAdsApi: - """Returns Facebook Ads Client using a service account.""" + """Return Facebook Ads Client using a service account.""" config = self.facebook_ads_config return FacebookAdsApi.init( app_id=config["app_id"], @@ -84,7 +84,7 @@ class FacebookAdsReportingHook(BaseHook): @cached_property def multiple_accounts(self) -> bool: - """Checks whether provided account_id in the Facebook Ads Connection is provided as a list.""" + """Check whether provided account_id in the Facebook Ads Connection is provided as a list.""" return isinstance(self.facebook_ads_config["account_id"], list) @cached_property diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py index ea94b32c47..eb77fd3fa4 100644 --- a/airflow/providers/ftp/hooks/ftp.py +++ b/airflow/providers/ftp/hooks/ftp.py @@ -55,7 +55,7 @@ class FTPHook(BaseHook): self.close_conn() def get_conn(self) -> ftplib.FTP: - """Returns a FTP connection object.""" + """Return an FTP connection object.""" if self.conn is None: params = self.get_connection(self.ftp_conn_id) pasv = params.extra_dejson.get("passive", True) @@ -65,7 +65,7 @@ class FTPHook(BaseHook): return self.conn def close_conn(self): - """Closes the connection; an error will occur if the connection was never opened.""" + """Close the connection; an error will occur if the connection was never opened.""" conn = self.conn conn.quit() self.conn = None @@ -83,7 +83,7 @@ class FTPHook(BaseHook): def list_directory(self, path: str) -> list[str]: """ - Returns a list of files on the remote system. + Return a list of files on the remote system. 
:param path: full path to the remote directory to list """ @@ -95,7 +95,7 @@ class FTPHook(BaseHook): def create_directory(self, path: str) -> None: """ - Creates a directory on the remote system. + Create a directory on the remote system. :param path: full path to the remote directory to create """ @@ -104,7 +104,7 @@ class FTPHook(BaseHook): def delete_directory(self, path: str) -> None: """ - Deletes a directory on the remote system. + Delete a directory on the remote system. :param path: full path to the remote directory to delete """ @@ -119,7 +119,7 @@ class FTPHook(BaseHook): block_size: int = 8192, ) -> None: """ - Transfers the remote file to a local location. + Transfer the remote file to a local location. If local_full_path_or_buffer is a string path, the file will be put at that location; if it is a file-like buffer, the file will @@ -221,7 +221,7 @@ class FTPHook(BaseHook): def delete_file(self, path: str) -> None: """ - Removes a file on the FTP Server. + Remove a file on the FTP Server. :param path: full path to the remote file """ @@ -240,7 +240,7 @@ class FTPHook(BaseHook): def get_mod_time(self, path: str) -> datetime.datetime: """ - Returns a datetime object representing the last time the file was modified. + Return a datetime object representing the last time the file was modified. :param path: remote file path """ @@ -255,7 +255,7 @@ class FTPHook(BaseHook): def get_size(self, path: str) -> int | None: """ - Returns the size of a file (in bytes). + Return the size of a file (in bytes). :param path: remote file path """ @@ -277,7 +277,7 @@ class FTPSHook(FTPHook): """Interact with FTPS.""" def get_conn(self) -> ftplib.FTP: - """Returns a FTPS connection object.""" + """Return an FTPS connection object.""" if self.conn is None: params = self.get_connection(self.ftp_conn_id) pasv = params.extra_dejson.get("passive", True) diff --git a/airflow/providers/ftp/operators/ftp.py b/airflow/providers/ftp/operators/ftp.py index 4489839d8f..5118aa667c 100644 --- a/airflow/providers/ftp/operators/ftp.py +++ b/airflow/providers/ftp/operators/ftp.py @@ -139,7 +139,7 @@ class FTPFileTransmitOperator(BaseOperator): def get_openlineage_facets_on_start(self): """ - Returns OpenLineage datasets. + Return OpenLineage datasets. 
Dataset will have the following structure: input: file://hostname/path diff --git a/airflow/providers/github/hooks/github.py b/airflow/providers/github/hooks/github.py index 5b89830dee..b4a4732850 100644 --- a/airflow/providers/github/hooks/github.py +++ b/airflow/providers/github/hooks/github.py @@ -47,7 +47,7 @@ class GithubHook(BaseHook): self.get_conn() def get_conn(self) -> GithubClient: - """Function that initiates a new GitHub connection with token and hostname (for GitHub Enterprise).""" + """Initiate a new GitHub connection with token and hostname (for GitHub Enterprise).""" if self.client is not None: return self.client @@ -70,7 +70,7 @@ class GithubHook(BaseHook): @staticmethod def get_ui_field_behaviour() -> dict: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["schema", "port", "login", "extra"], "relabeling": {"host": "GitHub Enterprise URL (Optional)", "password": "GitHub Access Token"}, diff --git a/airflow/providers/github/sensors/github.py b/airflow/providers/github/sensors/github.py index 868af7361c..f742121b05 100644 --- a/airflow/providers/github/sensors/github.py +++ b/airflow/providers/github/sensors/github.py @@ -91,13 +91,13 @@ class BaseGithubRepositorySensor(GithubSensor): ) def poke(self, context: Context) -> bool: - """Function that the sensors defined while deriving this class should override.""" + """Check sensor status; sensors defined while deriving this class should override.""" raise AirflowException("Override me.") class GithubTagSensor(BaseGithubRepositorySensor): """ - Monitors a github tag for its creation. + Monitor a github tag for its creation. :param github_conn_id: reference to a pre-defined GitHub Connection :param tag_name: name of the tag to be monitored @@ -128,7 +128,7 @@ class GithubTagSensor(BaseGithubRepositorySensor): return GithubSensor.poke(self, context=context) def tag_checker(self, repo: Any) -> bool | None: - """Checking existence of Tag in a Repository.""" + """Check existence of Tag in a Repository.""" result = None try: if repo is not None and self.tag_name is not None: