This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 42e13e1  Remove all deprecation warnings in providers (#17900)
42e13e1 is described below

commit 42e13e1a5a4c97a2085ddf96f7d93e7bf71949b8
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon Aug 30 13:48:24 2021 +0200

    Remove all deprecation warnings in providers (#17900)
    
    There were a number of deprecation warnings when just importing
    certain providers or their examples. Some of them came from still
    using 1.10 classes, some from still using `apply_defaults`, and
    some from a badly implemented fallback mechanism to support
    backwards compatibility.
    
    This has all been fixed and our documentation generation step
    for providers in CI will also fail in case there are some new
    DeprecationWarnings generated, which means that they will have
    to be fixed before merging.
    
    While checking this, we found that the new BigQuery `table_resource`
    approach had some inconsistencies when passing the new
    `table_resource` parameter. Some deprecated parameters should be
    None by default; otherwise you'd have to explicitly set them to
    None when `table_resource` is set. When it is not set, the
    backwards-compatible behaviour is maintained - the parameters get
    the same default values as before.
    
    Several deprecation warnings had the wrong stack level - this has
    been fixed as well.
    
    The "legitimate" warnings are now filtered out and they are only
    filtered out for the appropriate modules we know they are generated
    from or in case the warnings result from direct import of the
    deprecated module.
---
 .../amazon/aws/operators/dms_create_task.py        |   2 -
 .../amazon/aws/operators/dms_delete_task.py        |   2 -
 .../amazon/aws/operators/dms_describe_tasks.py     |   2 -
 .../amazon/aws/operators/dms_start_task.py         |   2 -
 .../amazon/aws/operators/dms_stop_task.py          |   2 -
 .../amazon/aws/operators/emr_containers.py         |   2 -
 airflow/providers/amazon/aws/sensors/dms_task.py   |   3 -
 airflow/providers/apache/drill/operators/drill.py  |   2 -
 .../backcompat/backwards_compat_converters.py      |   6 +-
 .../example_dags/example_bigquery_operations.py    |  38 +-
 .../cloud/example_dags/example_bigquery_queries.py |  40 +-
 .../cloud/example_dags/example_bigquery_sensors.py |  12 +-
 .../cloud/example_dags/example_datacatalog.py      |   2 +-
 .../google/cloud/example_dags/example_dataflow.py  |  51 +--
 .../google/cloud/example_dags/example_dataproc.py  |  22 +-
 .../example_dags/example_facebook_ads_to_gcs.py    |  12 +-
 .../cloud/example_dags/example_gdrive_to_gcs.py    |   4 +-
 .../cloud/example_dags/example_presto_to_gcs.py    |  64 ++-
 .../example_dags/example_salesforce_to_gcs.py      |  12 +-
 .../cloud/example_dags/example_trino_to_gcs.py     |  64 ++-
 airflow/providers/google/cloud/hooks/dataproc.py   |  22 +-
 .../providers/google/cloud/operators/bigquery.py   |  26 +-
 .../providers/google/cloud/operators/dataproc.py   |   6 +-
 airflow/providers/google/cloud/sensors/dataproc.py |   2 +-
 .../google/cloud/transfers/bigquery_to_mssql.py    |   2 -
 .../google/cloud/utils/mlengine_operator_utils.py  |   8 +-
 .../firebase/example_dags/example_firestore.py     |  32 +-
 .../example_dags/example_singularity.py            |   2 +-
 dev/import_all_classes.py                          |  37 +-
 dev/provider_packages/prepare_provider_packages.py | 435 ++++++++++++---------
 .../operators/cloud/bigquery.rst                   |  17 -
 tests/always/test_project_structure.py             |  46 +--
 .../google/cloud/operators/test_mlengine_utils.py  |  25 +-
 .../cloud/utils/test_mlengine_operator_utils.py    |  44 +--
 34 files changed, 607 insertions(+), 441 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/dms_create_task.py 
b/airflow/providers/amazon/aws/operators/dms_create_task.py
index 675070a..6eac3d5 100644
--- a/airflow/providers/amazon/aws/operators/dms_create_task.py
+++ b/airflow/providers/amazon/aws/operators/dms_create_task.py
@@ -20,7 +20,6 @@ from typing import Optional
 
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
-from airflow.utils.decorators import apply_defaults
 
 
 class DmsCreateTaskOperator(BaseOperator):
@@ -68,7 +67,6 @@ class DmsCreateTaskOperator(BaseOperator):
         "create_task_kwargs": "json",
     }
 
-    @apply_defaults
     def __init__(
         self,
         *,
diff --git a/airflow/providers/amazon/aws/operators/dms_delete_task.py 
b/airflow/providers/amazon/aws/operators/dms_delete_task.py
index fb3cda7..6f12d2b 100644
--- a/airflow/providers/amazon/aws/operators/dms_delete_task.py
+++ b/airflow/providers/amazon/aws/operators/dms_delete_task.py
@@ -20,7 +20,6 @@ from typing import Optional
 
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
-from airflow.utils.decorators import apply_defaults
 
 
 class DmsDeleteTaskOperator(BaseOperator):
@@ -45,7 +44,6 @@ class DmsDeleteTaskOperator(BaseOperator):
     template_ext = ()
     template_fields_renderers = {}
 
-    @apply_defaults
     def __init__(
         self,
         *,
diff --git a/airflow/providers/amazon/aws/operators/dms_describe_tasks.py 
b/airflow/providers/amazon/aws/operators/dms_describe_tasks.py
index 9f41947..cc97044 100644
--- a/airflow/providers/amazon/aws/operators/dms_describe_tasks.py
+++ b/airflow/providers/amazon/aws/operators/dms_describe_tasks.py
@@ -20,7 +20,6 @@ from typing import Optional
 
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
-from airflow.utils.decorators import apply_defaults
 
 
 class DmsDescribeTasksOperator(BaseOperator):
@@ -41,7 +40,6 @@ class DmsDescribeTasksOperator(BaseOperator):
     template_ext = ()
     template_fields_renderers = {'describe_tasks_kwargs': 'json'}
 
-    @apply_defaults
     def __init__(
         self,
         *,
diff --git a/airflow/providers/amazon/aws/operators/dms_start_task.py 
b/airflow/providers/amazon/aws/operators/dms_start_task.py
index 50ae6ad..a2ce635 100644
--- a/airflow/providers/amazon/aws/operators/dms_start_task.py
+++ b/airflow/providers/amazon/aws/operators/dms_start_task.py
@@ -20,7 +20,6 @@ from typing import Optional
 
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
-from airflow.utils.decorators import apply_defaults
 
 
 class DmsStartTaskOperator(BaseOperator):
@@ -54,7 +53,6 @@ class DmsStartTaskOperator(BaseOperator):
     template_ext = ()
     template_fields_renderers = {'start_task_kwargs': 'json'}
 
-    @apply_defaults
     def __init__(
         self,
         *,
diff --git a/airflow/providers/amazon/aws/operators/dms_stop_task.py 
b/airflow/providers/amazon/aws/operators/dms_stop_task.py
index c7c1aa2..ea45b58 100644
--- a/airflow/providers/amazon/aws/operators/dms_stop_task.py
+++ b/airflow/providers/amazon/aws/operators/dms_stop_task.py
@@ -20,7 +20,6 @@ from typing import Optional
 
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
-from airflow.utils.decorators import apply_defaults
 
 
 class DmsStopTaskOperator(BaseOperator):
@@ -41,7 +40,6 @@ class DmsStopTaskOperator(BaseOperator):
     template_ext = ()
     template_fields_renderers = {}
 
-    @apply_defaults
     def __init__(
         self,
         *,
diff --git a/airflow/providers/amazon/aws/operators/emr_containers.py 
b/airflow/providers/amazon/aws/operators/emr_containers.py
index ca3c936..9466c32 100644
--- a/airflow/providers/amazon/aws/operators/emr_containers.py
+++ b/airflow/providers/amazon/aws/operators/emr_containers.py
@@ -27,7 +27,6 @@ except ImportError:
 
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.emr_containers import EMRContainerHook
-from airflow.utils.decorators import apply_defaults
 
 
 class EMRContainerOperator(BaseOperator):
@@ -63,7 +62,6 @@ class EMRContainerOperator(BaseOperator):
     template_fields = ["name", "virtual_cluster_id", "execution_role_arn", 
"release_label", "job_driver"]
     ui_color = "#f9c915"
 
-    @apply_defaults
     def __init__(  # pylint: disable=too-many-arguments
         self,
         *,
diff --git a/airflow/providers/amazon/aws/sensors/dms_task.py 
b/airflow/providers/amazon/aws/sensors/dms_task.py
index 39af362..32ed4f9 100644
--- a/airflow/providers/amazon/aws/sensors/dms_task.py
+++ b/airflow/providers/amazon/aws/sensors/dms_task.py
@@ -21,7 +21,6 @@ from typing import Iterable, Optional
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.dms import DmsHook
 from airflow.sensors.base import BaseSensorOperator
-from airflow.utils.decorators import apply_defaults
 
 
 class DmsTaskBaseSensor(BaseSensorOperator):
@@ -45,7 +44,6 @@ class DmsTaskBaseSensor(BaseSensorOperator):
     template_fields = ['replication_task_arn']
     template_ext = ()
 
-    @apply_defaults
     def __init__(
         self,
         replication_task_arn: str,
@@ -104,7 +102,6 @@ class DmsTaskCompletedSensor(DmsTaskBaseSensor):
     template_fields = ['replication_task_arn']
     template_ext = ()
 
-    @apply_defaults
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.target_statuses = ['stopped']
diff --git a/airflow/providers/apache/drill/operators/drill.py 
b/airflow/providers/apache/drill/operators/drill.py
index 459c623..2c33011 100644
--- a/airflow/providers/apache/drill/operators/drill.py
+++ b/airflow/providers/apache/drill/operators/drill.py
@@ -21,7 +21,6 @@ import sqlparse
 
 from airflow.models import BaseOperator
 from airflow.providers.apache.drill.hooks.drill import DrillHook
-from airflow.utils.decorators import apply_defaults
 
 
 class DrillOperator(BaseOperator):
@@ -48,7 +47,6 @@ class DrillOperator(BaseOperator):
     template_ext = ('.sql',)
     ui_color = '#ededed'
 
-    @apply_defaults
     def __init__(
         self,
         *,
diff --git 
a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py 
b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index 7c28105..4c6404f 100644
--- 
a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++ 
b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -23,8 +23,6 @@ from kubernetes.client import ApiClient, models as k8s
 from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources
 from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import 
PodRuntimeInfoEnv
-from airflow.providers.cncf.kubernetes.backcompat.volume import Volume
-from airflow.providers.cncf.kubernetes.backcompat.volume_mount import 
VolumeMount
 
 
 def _convert_kube_model_object(obj, old_class, new_class):
@@ -54,6 +52,8 @@ def convert_volume(volume) -> k8s.V1Volume:
     :param volume:
     :return: k8s.V1Volume
     """
+    from airflow.providers.cncf.kubernetes.backcompat.volume import Volume
+
     return _convert_kube_model_object(volume, Volume, k8s.V1Volume)
 
 
@@ -64,6 +64,8 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
     :param volume_mount:
     :return: k8s.V1VolumeMount
     """
+    from airflow.providers.cncf.kubernetes.backcompat.volume_mount import 
VolumeMount
+
     return _convert_kube_model_object(volume_mount, VolumeMount, 
k8s.V1VolumeMount)
 
 
diff --git 
a/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py 
b/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
index a72ff36..e3e96fe 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
+++ b/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
@@ -33,7 +33,6 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
     BigQueryDeleteTableOperator,
     BigQueryGetDatasetOperator,
     BigQueryGetDatasetTablesOperator,
-    BigQueryPatchDatasetOperator,
     BigQueryUpdateDatasetOperator,
     BigQueryUpdateTableOperator,
     BigQueryUpdateTableSchemaOperator,
@@ -134,14 +133,26 @@ with models.DAG(
     # [START howto_operator_bigquery_create_external_table]
     create_external_table = BigQueryCreateExternalTableOperator(
         task_id="create_external_table",
+        table_resource={
+            "tableReference": {
+                "projectId": PROJECT_ID,
+                "datasetId": DATASET_NAME,
+                "tableId": "external_table",
+            },
+            "schema": {
+                "fields": [
+                    {"name": "name", "type": "STRING"},
+                    {"name": "post_abbr", "type": "STRING"},
+                ]
+            },
+            "externalDataConfiguration": {
+                "sourceFormat": "CSV",
+                "compression": "NONE",
+                "csvOptions": {"skipLeadingRows": 1},
+            },
+        },
         bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
         source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
-        destination_project_dataset_table=f"{DATASET_NAME}.external_table",
-        skip_leading_rows=1,
-        schema_fields=[
-            {"name": "name", "type": "STRING"},
-            {"name": "post_abbr", "type": "STRING"},
-        ],
     )
     # [END howto_operator_bigquery_create_external_table]
 
@@ -191,17 +202,6 @@ with models.DAG(
     )
     # [END howto_operator_bigquery_update_table]
 
-    # [START howto_operator_bigquery_patch_dataset]
-    patch_dataset = BigQueryPatchDatasetOperator(
-        task_id="patch_dataset",
-        dataset_id=DATASET_NAME,
-        dataset_resource={
-            "friendlyName": "Patched Dataset",
-            "description": "Patched dataset",
-        },
-    )
-    # [END howto_operator_bigquery_patch_dataset]
-
     # [START howto_operator_bigquery_update_dataset]
     update_dataset = BigQueryUpdateDatasetOperator(
         task_id="update_dataset",
@@ -216,7 +216,7 @@ with models.DAG(
     )
     # [END howto_operator_bigquery_delete_dataset]
 
-    create_dataset >> patch_dataset >> update_dataset >> get_dataset >> 
get_dataset_result >> delete_dataset
+    create_dataset >> update_dataset >> get_dataset >> get_dataset_result >> 
delete_dataset
 
     (
         update_dataset
diff --git 
a/airflow/providers/google/cloud/example_dags/example_bigquery_queries.py 
b/airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
index 24788c9..06946fa 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
+++ b/airflow/providers/google/cloud/example_dags/example_bigquery_queries.py
@@ -29,7 +29,6 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateEmptyTableOperator,
     BigQueryDeleteDatasetOperator,
-    BigQueryExecuteQueryOperator,
     BigQueryGetDataOperator,
     BigQueryInsertJobOperator,
     BigQueryIntervalCheckOperator,
@@ -125,25 +124,40 @@ for location in [None, LOCATION]:
         )
         # [END howto_operator_bigquery_select_job]
 
-        execute_insert_query = BigQueryExecuteQueryOperator(
-            task_id="execute_insert_query", sql=INSERT_ROWS_QUERY, 
use_legacy_sql=False, location=location
+        execute_insert_query = BigQueryInsertJobOperator(
+            task_id="execute_insert_query",
+            configuration={
+                "query": {
+                    "query": INSERT_ROWS_QUERY,
+                    "useLegacySql": False,
+                }
+            },
+            location=location,
         )
 
-        bigquery_execute_multi_query = BigQueryExecuteQueryOperator(
+        bigquery_execute_multi_query = BigQueryInsertJobOperator(
             task_id="execute_multi_query",
-            sql=[
-                f"SELECT * FROM {DATASET_NAME}.{TABLE_2}",
-                f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_2}",
-            ],
-            use_legacy_sql=False,
+            configuration={
+                "query": {
+                    "query": [
+                        f"SELECT * FROM {DATASET_NAME}.{TABLE_2}",
+                        f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_2}",
+                    ],
+                    "useLegacySql": False,
+                }
+            },
             location=location,
         )
 
-        execute_query_save = BigQueryExecuteQueryOperator(
+        execute_query_save = BigQueryInsertJobOperator(
             task_id="execute_query_save",
-            sql=f"SELECT * FROM {DATASET_NAME}.{TABLE_1}",
-            use_legacy_sql=False,
-            destination_dataset_table=f"{DATASET_NAME}.{TABLE_2}",
+            configuration={
+                "query": {
+                    "query": f"SELECT * FROM {DATASET_NAME}.{TABLE_1}",
+                    "useLegacySql": False,
+                    "destinationTable": f"{DATASET_NAME}.{TABLE_2}",
+                }
+            },
             location=location,
         )
 
diff --git 
a/airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py 
b/airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py
index 4423b1c..cde6cd9 100644
--- a/airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py
+++ b/airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py
@@ -27,7 +27,7 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateEmptyTableOperator,
     BigQueryDeleteDatasetOperator,
-    BigQueryExecuteQueryOperator,
+    BigQueryInsertJobOperator,
 )
 from airflow.providers.google.cloud.sensors.bigquery import (
     BigQueryTableExistenceSensor,
@@ -80,8 +80,14 @@ with models.DAG(
     )
     # [END howto_sensor_bigquery_table]
 
-    execute_insert_query = BigQueryExecuteQueryOperator(
-        task_id="execute_insert_query", sql=INSERT_ROWS_QUERY, 
use_legacy_sql=False
+    execute_insert_query = BigQueryInsertJobOperator(
+        task_id="execute_insert_query",
+        configuration={
+            "query": {
+                "query": INSERT_ROWS_QUERY,
+                "useLegacySql": False,
+            }
+        },
     )
 
     # [START howto_sensor_bigquery_table_partition]
diff --git a/airflow/providers/google/cloud/example_dags/example_datacatalog.py 
b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
index a764c11..3457805 100644
--- a/airflow/providers/google/cloud/example_dags/example_datacatalog.py
+++ b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
@@ -22,6 +22,7 @@ Example Airflow DAG that interacts with Google Data Catalog 
service
 from google.cloud.datacatalog_v1beta1 import FieldType, TagField, 
TagTemplateField
 
 from airflow import models
+from airflow.models.baseoperator import chain
 from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.datacatalog import (
     CloudDataCatalogCreateEntryGroupOperator,
@@ -47,7 +48,6 @@ from airflow.providers.google.cloud.operators.datacatalog 
import (
     CloudDataCatalogUpdateTagTemplateOperator,
 )
 from airflow.utils.dates import days_ago
-from airflow.utils.helpers import chain
 
 PROJECT_ID = "polidea-airflow"
 LOCATION = "us-central1"
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py 
b/airflow/providers/google/cloud/example_dags/example_dataflow.py
index 6e58ff4..1761cba 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -25,11 +25,13 @@ from urllib.parse import urlparse
 
 from airflow import models
 from airflow.exceptions import AirflowException
+from airflow.providers.apache.beam.operators.beam import (
+    BeamRunJavaPipelineOperator,
+    BeamRunPythonPipelineOperator,
+)
 from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
 from airflow.providers.google.cloud.operators.dataflow import (
     CheckJobRunning,
-    DataflowCreateJavaJobOperator,
-    DataflowCreatePythonJobOperator,
     DataflowTemplatedJobStartOperator,
 )
 from airflow.providers.google.cloud.sensors.dataflow import (
@@ -66,17 +68,18 @@ with models.DAG(
 ) as dag_native_java:
 
     # [START howto_operator_start_java_job_jar_on_gcs]
-    start_java_job = DataflowCreateJavaJobOperator(
+    start_java_job = BeamRunJavaPipelineOperator(
         task_id="start-java-job",
         jar=GCS_JAR,
-        job_name='{{task.task_id}}',
-        options={
+        pipeline_options={
             'output': GCS_OUTPUT,
         },
-        poll_sleep=10,
         job_class='org.apache.beam.examples.WordCount',
-        check_if_running=CheckJobRunning.IgnoreJob,
-        location='europe-west3',
+        dataflow_config={
+            "check_if_running": CheckJobRunning.IgnoreJob,
+            "location": 'europe-west3',
+            "poll_sleep": 10,
+        },
     )
     # [END howto_operator_start_java_job_jar_on_gcs]
 
@@ -88,16 +91,18 @@ with models.DAG(
         filename="/tmp/dataflow-{{ ds_nodash }}.jar",
     )
 
-    start_java_job_local = DataflowCreateJavaJobOperator(
+    start_java_job_local = BeamRunJavaPipelineOperator(
         task_id="start-java-job-local",
         jar="/tmp/dataflow-{{ ds_nodash }}.jar",
-        job_name='{{task.task_id}}',
-        options={
+        pipeline_options={
             'output': GCS_OUTPUT,
         },
-        poll_sleep=10,
         job_class='org.apache.beam.examples.WordCount',
-        check_if_running=CheckJobRunning.WaitForRun,
+        dataflow_config={
+            "check_if_running": CheckJobRunning.WaitForRun,
+            "location": 'europe-west3',
+            "poll_sleep": 10,
+        },
     )
     jar_to_local >> start_java_job_local
     # [END howto_operator_start_java_job_local_jar]
@@ -111,27 +116,25 @@ with models.DAG(
 ) as dag_native_python:
 
     # [START howto_operator_start_python_job]
-    start_python_job = DataflowCreatePythonJobOperator(
+    start_python_job = BeamRunPythonPipelineOperator(
         task_id="start-python-job",
         py_file=GCS_PYTHON,
         py_options=[],
-        job_name='{{task.task_id}}',
-        options={
+        pipeline_options={
             'output': GCS_OUTPUT,
         },
         py_requirements=['apache-beam[gcp]==2.21.0'],
         py_interpreter='python3',
         py_system_site_packages=False,
-        location='europe-west3',
+        dataflow_config={'location': 'europe-west3'},
     )
     # [END howto_operator_start_python_job]
 
-    start_python_job_local = DataflowCreatePythonJobOperator(
+    start_python_job_local = BeamRunPythonPipelineOperator(
         task_id="start-python-job-local",
         py_file='apache_beam.examples.wordcount',
         py_options=['-m'],
-        job_name='{{task.task_id}}',
-        options={
+        pipeline_options={
             'output': GCS_OUTPUT,
         },
         py_requirements=['apache-beam[gcp]==2.14.0'],
@@ -147,19 +150,17 @@ with models.DAG(
     tags=['example'],
 ) as dag_native_python_async:
     # [START howto_operator_start_python_job_async]
-    start_python_job_async = DataflowCreatePythonJobOperator(
+    start_python_job_async = BeamRunPythonPipelineOperator(
         task_id="start-python-job-async",
         py_file=GCS_PYTHON,
         py_options=[],
-        job_name='{{task.task_id}}',
-        options={
+        pipeline_options={
             'output': GCS_OUTPUT,
         },
         py_requirements=['apache-beam[gcp]==2.25.0'],
         py_interpreter='python3',
         py_system_site_packages=False,
-        location='europe-west3',
-        wait_until_finished=False,
+        dataflow_config={"location": 'europe-west3', "wait_until_finished": 
False},
     )
     # [END howto_operator_start_python_job_async]
 
diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py 
b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index 914df0e..9694eb8 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -170,7 +170,7 @@ with models.DAG("example_gcp_dataproc", 
start_date=days_ago(1), schedule_interva
         update_mask=UPDATE_MASK,
         graceful_decommission_timeout=TIMEOUT,
         project_id=PROJECT_ID,
-        location=REGION,
+        region=REGION,
     )
     # [END how_to_cloud_dataproc_update_cluster_operator]
 
@@ -179,7 +179,7 @@ with models.DAG("example_gcp_dataproc", 
start_date=days_ago(1), schedule_interva
         task_id="create_workflow_template",
         template=WORKFLOW_TEMPLATE,
         project_id=PROJECT_ID,
-        location=REGION,
+        region=REGION,
     )
     # [END how_to_cloud_dataproc_create_workflow_template]
 
@@ -190,24 +190,24 @@ with models.DAG("example_gcp_dataproc", 
start_date=days_ago(1), schedule_interva
     # [END how_to_cloud_dataproc_trigger_workflow_template]
 
     pig_task = DataprocSubmitJobOperator(
-        task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
+        task_id="pig_task", job=PIG_JOB, region=REGION, project_id=PROJECT_ID
     )
     spark_sql_task = DataprocSubmitJobOperator(
-        task_id="spark_sql_task", job=SPARK_SQL_JOB, location=REGION, 
project_id=PROJECT_ID
+        task_id="spark_sql_task", job=SPARK_SQL_JOB, region=REGION, 
project_id=PROJECT_ID
     )
 
     spark_task = DataprocSubmitJobOperator(
-        task_id="spark_task", job=SPARK_JOB, location=REGION, 
project_id=PROJECT_ID
+        task_id="spark_task", job=SPARK_JOB, region=REGION, 
project_id=PROJECT_ID
     )
 
     # [START cloud_dataproc_async_submit_sensor]
     spark_task_async = DataprocSubmitJobOperator(
-        task_id="spark_task_async", job=SPARK_JOB, location=REGION, 
project_id=PROJECT_ID, asynchronous=True
+        task_id="spark_task_async", job=SPARK_JOB, region=REGION, 
project_id=PROJECT_ID, asynchronous=True
     )
 
     spark_task_async_sensor = DataprocJobSensor(
         task_id='spark_task_async_sensor_task',
-        location=REGION,
+        region=REGION,
         project_id=PROJECT_ID,
         dataproc_job_id=spark_task_async.output,
         poke_interval=10,
@@ -216,20 +216,20 @@ with models.DAG("example_gcp_dataproc", 
start_date=days_ago(1), schedule_interva
 
     # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
     pyspark_task = DataprocSubmitJobOperator(
-        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, 
project_id=PROJECT_ID
+        task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, 
project_id=PROJECT_ID
     )
     # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
 
     sparkr_task = DataprocSubmitJobOperator(
-        task_id="sparkr_task", job=SPARKR_JOB, location=REGION, 
project_id=PROJECT_ID
+        task_id="sparkr_task", job=SPARKR_JOB, region=REGION, 
project_id=PROJECT_ID
     )
 
     hive_task = DataprocSubmitJobOperator(
-        task_id="hive_task", job=HIVE_JOB, location=REGION, 
project_id=PROJECT_ID
+        task_id="hive_task", job=HIVE_JOB, region=REGION, project_id=PROJECT_ID
     )
 
     hadoop_task = DataprocSubmitJobOperator(
-        task_id="hadoop_task", job=HADOOP_JOB, location=REGION, 
project_id=PROJECT_ID
+        task_id="hadoop_task", job=HADOOP_JOB, region=REGION, 
project_id=PROJECT_ID
     )
 
     # [START how_to_cloud_dataproc_delete_cluster_operator]
diff --git 
a/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py 
b/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
index e5d67f0..920935d 100644
--- a/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
@@ -27,7 +27,7 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateEmptyTableOperator,
     BigQueryDeleteDatasetOperator,
-    BigQueryExecuteQueryOperator,
+    BigQueryInsertJobOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.facebook_ads_to_gcs import 
FacebookAdsReportToGcsOperator
@@ -105,10 +105,14 @@ with models.DAG(
         write_disposition='WRITE_TRUNCATE',
     )
 
-    read_data_from_gcs_many_chunks = BigQueryExecuteQueryOperator(
+    read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
         task_id="read_data_from_gcs_many_chunks",
-        sql=f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`",
-        use_legacy_sql=False,
+        configuration={
+            "query": {
+                "query": f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`",
+                "useLegacySql": False,
+            }
+        },
     )
 
     delete_bucket = GCSDeleteBucketOperator(
diff --git 
a/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py 
b/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
index 3a604681..974fa66 100644
--- a/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
@@ -44,8 +44,8 @@ with models.DAG(
         task_id="upload_gdrive_object_to_gcs",
         folder_id=FOLDER_ID,
         file_name=FILE_NAME,
-        destination_bucket=BUCKET,
-        destination_object=OBJECT,
+        bucket_name=BUCKET,
+        object_name=OBJECT,
     )
     # [END upload_gdrive_to_gcs]
     detect_file >> upload_gdrive_to_gcs
diff --git 
a/airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py 
b/airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
index 759c429..cf82a48 100644
--- a/airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py
@@ -26,7 +26,7 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateExternalTableOperator,
     BigQueryDeleteDatasetOperator,
-    BigQueryExecuteQueryOperator,
+    BigQueryInsertJobOperator,
 )
 from airflow.providers.google.cloud.transfers.presto_to_gcs import 
PrestoToGCSOperator
 from airflow.utils.dates import days_ago
@@ -84,16 +84,37 @@ with models.DAG(
         task_id="create_external_table_multiple_types",
         bucket=GCS_BUCKET,
         source_objects=[f"{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
-        source_format="NEWLINE_DELIMITED_JSON",
-        
destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_MULTIPLE_TYPES)}",
+        table_resource={
+            "tableReference": {
+                "projectId": GCP_PROJECT_ID,
+                "datasetId": DATASET_NAME,
+                "tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
+            },
+            "schema": {
+                "fields": [
+                    {"name": "name", "type": "STRING"},
+                    {"name": "post_abbr", "type": "STRING"},
+                ]
+            },
+            "externalDataConfiguration": {
+                "sourceFormat": "NEWLINE_DELIMITED_JSON",
+                "compression": "NONE",
+                "csvOptions": {"skipLeadingRows": 1},
+            },
+        },
         schema_object=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
     )
     # [END howto_operator_create_external_table_multiple_types]
 
-    read_data_from_gcs_multiple_types = BigQueryExecuteQueryOperator(
+    read_data_from_gcs_multiple_types = BigQueryInsertJobOperator(
         task_id="read_data_from_gcs_multiple_types",
-        sql=f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.{safe_name(SOURCE_MULTIPLE_TYPES)}`",
-        use_legacy_sql=False,
+        configuration={
+            "query": {
+                "query": f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}."
+                f"{safe_name(SOURCE_MULTIPLE_TYPES)}`",
+                "useLegacySql": False,
+            }
+        },
     )
 
     # [START howto_operator_presto_to_gcs_many_chunks]
@@ -111,17 +132,38 @@ with models.DAG(
     create_external_table_many_chunks = BigQueryCreateExternalTableOperator(
         task_id="create_external_table_many_chunks",
         bucket=GCS_BUCKET,
+        table_resource={
+            "tableReference": {
+                "projectId": GCP_PROJECT_ID,
+                "datasetId": DATASET_NAME,
+                "tableId": f"{safe_name(SOURCE_CUSTOMER_TABLE)}",
+            },
+            "schema": {
+                "fields": [
+                    {"name": "name", "type": "STRING"},
+                    {"name": "post_abbr", "type": "STRING"},
+                ]
+            },
+            "externalDataConfiguration": {
+                "sourceFormat": "NEWLINE_DELIMITED_JSON",
+                "compression": "NONE",
+                "csvOptions": {"skipLeadingRows": 1},
+            },
+        },
         source_objects=[f"{safe_name(SOURCE_CUSTOMER_TABLE)}.*.json"],
-        source_format="NEWLINE_DELIMITED_JSON",
-        
destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_CUSTOMER_TABLE)}",
         schema_object=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
     )
 
     # [START howto_operator_read_data_from_gcs_many_chunks]
-    read_data_from_gcs_many_chunks = BigQueryExecuteQueryOperator(
+    read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
         task_id="read_data_from_gcs_many_chunks",
-        sql=f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.{safe_name(SOURCE_CUSTOMER_TABLE)}`",
-        use_legacy_sql=False,
+        configuration={
+            "query": {
+                "query": f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}."
+                f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
+                "useLegacySql": False,
+            }
+        },
     )
     # [END howto_operator_read_data_from_gcs_many_chunks]
 
diff --git 
a/airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py 
b/airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py
index a49b267..be28864 100644
--- a/airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_salesforce_to_gcs.py
@@ -25,7 +25,7 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateEmptyTableOperator,
     BigQueryDeleteDatasetOperator,
-    BigQueryExecuteQueryOperator,
+    BigQueryInsertJobOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import 
GCSToBigQueryOperator
@@ -98,10 +98,14 @@ with models.DAG(
         write_disposition='WRITE_TRUNCATE',
     )
 
-    read_data_from_gcs = BigQueryExecuteQueryOperator(
+    read_data_from_gcs = BigQueryInsertJobOperator(
         task_id="read_data_from_gcs",
-        sql=f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`",
-        use_legacy_sql=False,
+        configuration={
+            "query": {
+                "query": f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}`",
+                "useLegacySql": False,
+            }
+        },
     )
 
     delete_bucket = GCSDeleteBucketOperator(
diff --git 
a/airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py 
b/airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py
index 209c51e..0d8ef02 100644
--- a/airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_trino_to_gcs.py
@@ -26,7 +26,7 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateExternalTableOperator,
     BigQueryDeleteDatasetOperator,
-    BigQueryExecuteQueryOperator,
+    BigQueryInsertJobOperator,
 )
 from airflow.providers.google.cloud.transfers.trino_to_gcs import 
TrinoToGCSOperator
 from airflow.utils.dates import days_ago
@@ -83,17 +83,38 @@ with models.DAG(
     create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
         task_id="create_external_table_multiple_types",
         bucket=GCS_BUCKET,
+        table_resource={
+            "tableReference": {
+                "projectId": GCP_PROJECT_ID,
+                "datasetId": DATASET_NAME,
+                "tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
+            },
+            "schema": {
+                "fields": [
+                    {"name": "name", "type": "STRING"},
+                    {"name": "post_abbr", "type": "STRING"},
+                ]
+            },
+            "externalDataConfiguration": {
+                "sourceFormat": "NEWLINE_DELIMITED_JSON",
+                "compression": "NONE",
+                "csvOptions": {"skipLeadingRows": 1},
+            },
+        },
         source_objects=[f"{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
-        source_format="NEWLINE_DELIMITED_JSON",
-        
destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_MULTIPLE_TYPES)}",
         schema_object=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
     )
     # [END howto_operator_create_external_table_multiple_types]
 
-    read_data_from_gcs_multiple_types = BigQueryExecuteQueryOperator(
+    read_data_from_gcs_multiple_types = BigQueryInsertJobOperator(
         task_id="read_data_from_gcs_multiple_types",
-        sql=f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.{safe_name(SOURCE_MULTIPLE_TYPES)}`",
-        use_legacy_sql=False,
+        configuration={
+            "query": {
+                "query": f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}."
+                f"{safe_name(SOURCE_MULTIPLE_TYPES)}`",
+                "useLegacySql": False,
+            }
+        },
     )
 
     # [START howto_operator_trino_to_gcs_many_chunks]
@@ -111,17 +132,38 @@ with models.DAG(
     create_external_table_many_chunks = BigQueryCreateExternalTableOperator(
         task_id="create_external_table_many_chunks",
         bucket=GCS_BUCKET,
+        table_resource={
+            "tableReference": {
+                "projectId": GCP_PROJECT_ID,
+                "datasetId": DATASET_NAME,
+                "tableId": f"{safe_name(SOURCE_CUSTOMER_TABLE)}",
+            },
+            "schema": {
+                "fields": [
+                    {"name": "name", "type": "STRING"},
+                    {"name": "post_abbr", "type": "STRING"},
+                ]
+            },
+            "externalDataConfiguration": {
+                "sourceFormat": "NEWLINE_DELIMITED_JSON",
+                "compression": "NONE",
+                "csvOptions": {"skipLeadingRows": 1},
+            },
+        },
         source_objects=[f"{safe_name(SOURCE_CUSTOMER_TABLE)}.*.json"],
-        source_format="NEWLINE_DELIMITED_JSON",
-        
destination_project_dataset_table=f"{DATASET_NAME}.{safe_name(SOURCE_CUSTOMER_TABLE)}",
         schema_object=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
     )
 
     # [START howto_operator_read_data_from_gcs_many_chunks]
-    read_data_from_gcs_many_chunks = BigQueryExecuteQueryOperator(
+    read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
         task_id="read_data_from_gcs_many_chunks",
-        sql=f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.{safe_name(SOURCE_CUSTOMER_TABLE)}`",
-        use_legacy_sql=False,
+        configuration={
+            "query": {
+                "query": f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}."
+                f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
+                "useLegacySql": False,
+            }
+        },
     )
     # [END howto_operator_read_data_from_gcs_many_chunks]
 
diff --git a/airflow/providers/google/cloud/hooks/dataproc.py 
b/airflow/providers/google/cloud/hooks/dataproc.py
index 0f138d7..e353ef7 100644
--- a/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/airflow/providers/google/cloud/hooks/dataproc.py
@@ -216,7 +216,7 @@ class DataprocHook(GoogleBaseHook):
                 "Parameter `location` will be deprecated. "
                 "Please provide value through `region` parameter instead.",
                 DeprecationWarning,
-                stacklevel=1,
+                stacklevel=2,
             )
             region = location
         client_options = None
@@ -236,7 +236,7 @@ class DataprocHook(GoogleBaseHook):
                 "Parameter `location` will be deprecated. "
                 "Please provide value through `region` parameter instead.",
                 DeprecationWarning,
-                stacklevel=1,
+                stacklevel=2,
             )
             region = location
         client_options = None
@@ -256,7 +256,7 @@ class DataprocHook(GoogleBaseHook):
                 "Parameter `location` will be deprecated. "
                 "Please provide value through `region` parameter instead.",
                 DeprecationWarning,
-                stacklevel=1,
+                stacklevel=2,
             )
             region = location
         client_options = None
@@ -587,7 +587,7 @@ class DataprocHook(GoogleBaseHook):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -647,7 +647,7 @@ class DataprocHook(GoogleBaseHook):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -712,7 +712,7 @@ class DataprocHook(GoogleBaseHook):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -771,7 +771,7 @@ class DataprocHook(GoogleBaseHook):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -819,7 +819,7 @@ class DataprocHook(GoogleBaseHook):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -878,7 +878,7 @@ class DataprocHook(GoogleBaseHook):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -935,7 +935,7 @@ class DataprocHook(GoogleBaseHook):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -1010,7 +1010,7 @@ class DataprocHook(GoogleBaseHook):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
 
diff --git a/airflow/providers/google/cloud/operators/bigquery.py 
b/airflow/providers/google/cloud/operators/bigquery.py
index 4a0f6c0..fe853c2 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -489,6 +489,9 @@ class BigQueryExecuteQueryOperator(BaseOperator):
     Executes BigQuery SQL queries in a specific BigQuery database.
     This operator does not assert idempotency.
 
+    This operator is deprecated.
+    Please use 
:class:`airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator`
+
     :param sql: the sql code to be executed (templated)
     :type sql: Can receive a str representing a sql statement,
         a list of str (sql statements), or reference to a template file.
@@ -1084,14 +1087,14 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
         *,
         bucket: str,
         source_objects: List,
-        destination_project_dataset_table: str,
+        destination_project_dataset_table: str = None,
         table_resource: Optional[Dict[str, Any]] = None,
         schema_fields: Optional[List] = None,
         schema_object: Optional[str] = None,
-        source_format: str = 'CSV',
-        compression: str = 'NONE',
-        skip_leading_rows: int = 0,
-        field_delimiter: str = ',',
+        source_format: Optional[str] = None,
+        compression: Optional[str] = None,
+        skip_leading_rows: Optional[int] = None,
+        field_delimiter: Optional[str] = None,
         max_bad_records: int = 0,
         quote_character: Optional[str] = None,
         allow_quoted_newlines: bool = False,
@@ -1140,6 +1143,14 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
                 DeprecationWarning,
                 stacklevel=2,
             )
+            if not source_format:
+                source_format = 'CSV'
+            if not compression:
+                compression = 'NONE'
+            if not skip_leading_rows:
+                skip_leading_rows = 0
+            if not field_delimiter:
+                field_delimiter = ","
 
         if table_resource and kwargs_passed:
             raise ValueError("You provided both `table_resource` and exclusive 
keywords arguments.")
@@ -1579,9 +1590,8 @@ class BigQueryPatchDatasetOperator(BaseOperator):
     This operator is used to patch dataset for your Project in BigQuery.
     It only replaces fields that are provided in the submitted dataset 
resource.
 
-    .. seealso::
-        For more information on how to use this operator, take a look at the 
guide:
-        :ref:`howto/operator:BigQueryPatchDatasetOperator`
+    This operator is deprecated.
+    Please use 
:class:`airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateDatasetOperator`
 
     :param dataset_id: The id of dataset. Don't need to provide,
         if datasetId in dataset_reference.
diff --git a/airflow/providers/google/cloud/operators/dataproc.py 
b/airflow/providers/google/cloud/operators/dataproc.py
index 7988aa5..96d3ced 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1673,7 +1673,7 @@ class 
DataprocCreateWorkflowTemplateOperator(BaseOperator):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -1974,7 +1974,7 @@ class DataprocSubmitJobOperator(BaseOperator):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
@@ -2116,7 +2116,7 @@ class DataprocUpdateClusterOperator(BaseOperator):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
diff --git a/airflow/providers/google/cloud/sensors/dataproc.py 
b/airflow/providers/google/cloud/sensors/dataproc.py
index 68b4c70..2bcfbe1 100644
--- a/airflow/providers/google/cloud/sensors/dataproc.py
+++ b/airflow/providers/google/cloud/sensors/dataproc.py
@@ -63,7 +63,7 @@ class DataprocJobSensor(BaseSensorOperator):
                     "Parameter `location` will be deprecated. "
                     "Please provide value through `region` parameter instead.",
                     DeprecationWarning,
-                    stacklevel=1,
+                    stacklevel=2,
                 )
                 region = location
             else:
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py 
b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
index 553a8bf..a384a0f 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
@@ -23,7 +23,6 @@ from google.cloud.bigquery.table import TableReference
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
 from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
-from airflow.utils.decorators import apply_defaults
 
 
 class BigQueryToMsSqlOperator(BaseOperator):
@@ -89,7 +88,6 @@ class BigQueryToMsSqlOperator(BaseOperator):
         'impersonation_chain',
     )
 
-    @apply_defaults
     def __init__(
         self,
         *,
diff --git a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py 
b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
index 5551d8f..90c05bb 100644
--- a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
+++ b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
@@ -30,8 +30,8 @@ import dill
 from airflow import DAG
 from airflow.exceptions import AirflowException
 from airflow.operators.python import PythonOperator
+from airflow.providers.apache.beam.operators.beam import 
BeamRunPythonPipelineOperator
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
-from airflow.providers.google.cloud.operators.dataflow import 
DataflowCreatePythonJobOperator
 from airflow.providers.google.cloud.operators.mlengine import 
MLEngineStartBatchPredictionJobOperator
 
 T = TypeVar("T", bound=Callable)
@@ -242,11 +242,11 @@ def create_evaluate_ops(
     )
 
     metric_fn_encoded = base64.b64encode(dill.dumps(metric_fn, 
recurse=True)).decode()
-    evaluate_summary = DataflowCreatePythonJobOperator(
+    evaluate_summary = BeamRunPythonPipelineOperator(
         task_id=(task_prefix + "-summary"),
         py_file=os.path.join(os.path.dirname(__file__), 
'mlengine_prediction_summary.py'),
-        dataflow_default_options=dataflow_options,
-        options={
+        default_pipeline_options=dataflow_options,
+        pipeline_options={
             "prediction_path": prediction_path,
             "metric_fn_encoded": metric_fn_encoded,
             "metric_keys": ','.join(metric_keys),
diff --git 
a/airflow/providers/google/firebase/example_dags/example_firestore.py 
b/airflow/providers/google/firebase/example_dags/example_firestore.py
index df0bb3a..041b266 100644
--- a/airflow/providers/google/firebase/example_dags/example_firestore.py
+++ b/airflow/providers/google/firebase/example_dags/example_firestore.py
@@ -51,7 +51,7 @@ from airflow.providers.google.cloud.operators.bigquery import 
(
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateExternalTableOperator,
     BigQueryDeleteDatasetOperator,
-    BigQueryExecuteQueryOperator,
+    BigQueryInsertJobOperator,
 )
 from airflow.providers.google.firebase.operators.firestore import 
CloudFirestoreExportDatabaseOperator
 from airflow.utils import dates
@@ -99,19 +99,39 @@ with models.DAG(
     create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
         task_id="create_external_table",
         bucket=BUCKET_NAME,
+        table_resource={
+            "tableReference": {
+                "projectId": GCP_PROJECT_ID,
+                "datasetId": DATASET_NAME,
+                "tableId": "firestore_data",
+            },
+            "schema": {
+                "fields": [
+                    {"name": "name", "type": "STRING"},
+                    {"name": "post_abbr", "type": "STRING"},
+                ]
+            },
+            "externalDataConfiguration": {
+                "sourceFormat": "DATASTORE_BACKUP",
+                "compression": "NONE",
+                "csvOptions": {"skipLeadingRows": 1},
+            },
+        },
         source_objects=[
             f"{EXPORT_PREFIX}/all_namespaces/kind_{EXPORT_COLLECTION_ID}"
             f"/all_namespaces_kind_{EXPORT_COLLECTION_ID}.export_metadata"
         ],
-        source_format="DATASTORE_BACKUP",
-        
destination_project_dataset_table=f"{GCP_PROJECT_ID}.{DATASET_NAME}.firestore_data",
     )
     # [END howto_operator_create_external_table_multiple_types]
 
-    read_data_from_gcs_multiple_types = BigQueryExecuteQueryOperator(
+    read_data_from_gcs_multiple_types = BigQueryInsertJobOperator(
         task_id="execute_query",
-        sql=f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.firestore_data`",
-        use_legacy_sql=False,
+        configuration={
+            "query": {
+                "query": f"SELECT COUNT(*) FROM 
`{GCP_PROJECT_ID}.{DATASET_NAME}.firestore_data`",
+                "useLegacySql": False,
+            }
+        },
     )
 
     # Firestore
diff --git a/airflow/providers/singularity/example_dags/example_singularity.py 
b/airflow/providers/singularity/example_dags/example_singularity.py
index b37cab7..83c9a7b 100644
--- a/airflow/providers/singularity/example_dags/example_singularity.py
+++ b/airflow/providers/singularity/example_dags/example_singularity.py
@@ -19,7 +19,7 @@
 from datetime import timedelta
 
 from airflow import DAG
-from airflow.operators.bash_operator import BashOperator
+from airflow.operators.bash import BashOperator
 from airflow.providers.singularity.operators.singularity import 
SingularityOperator
 from airflow.utils.dates import days_ago
 
diff --git a/dev/import_all_classes.py b/dev/import_all_classes.py
index a1611f3..67a76c8 100755
--- a/dev/import_all_classes.py
+++ b/dev/import_all_classes.py
@@ -20,8 +20,10 @@ import importlib
 import pkgutil
 import sys
 import traceback
+import warnings
 from inspect import isclass
-from typing import List, Set
+from typing import List, Set, Tuple
+from warnings import WarningMessage
 
 from rich import print
 
@@ -32,7 +34,7 @@ def import_all_classes(
     provider_ids: List[str] = None,
     print_imports: bool = False,
     print_skips: bool = False,
-) -> List[str]:
+) -> Tuple[List[str], List[WarningMessage]]:
     """
     Imports all classes in providers packages. This method loads and imports
     all the classes found in providers, so that we can find all the subclasses
@@ -43,7 +45,7 @@ def import_all_classes(
     :param provider_ids - provider ids that should be loaded.
     :param print_imports - if imported class should also be printed in output
     :param print_skips - if skipped classes should also be printed in output
-    :return: list of all imported classes
+    :return: tuple of list of all imported classes and all warnings generated
     """
     imported_classes = []
     tracebacks = []
@@ -63,6 +65,7 @@ def import_all_classes(
         if any(provider_prefix in exception_string for provider_prefix in 
provider_prefixes):
             tracebacks.append(exception_string)
 
+    all_warnings: List[WarningMessage] = []
     for modinfo in pkgutil.walk_packages(path=paths, prefix=prefix, 
onerror=onerror):
         if not any(modinfo.name.startswith(provider_prefix) for 
provider_prefix in provider_prefixes):
             if print_skips:
@@ -74,12 +77,16 @@ def import_all_classes(
                 printed_packages.add(package_to_print)
                 print(f"Importing package: {package_to_print}")
         try:
-            _module = importlib.import_module(modinfo.name)
-            for attribute_name in dir(_module):
-                class_name = modinfo.name + "." + attribute_name
-                attribute = getattr(_module, attribute_name)
-                if isclass(attribute):
-                    imported_classes.append(class_name)
+            with warnings.catch_warnings(record=True) as w:
+                warnings.filterwarnings("always", category=DeprecationWarning)
+                _module = importlib.import_module(modinfo.name)
+                for attribute_name in dir(_module):
+                    class_name = modinfo.name + "." + attribute_name
+                    attribute = getattr(_module, attribute_name)
+                    if isclass(attribute):
+                        imported_classes.append(class_name)
+            if w:
+                all_warnings.extend(w)
         except Exception:
             exception_str = traceback.format_exc()
             tracebacks.append(exception_str)
@@ -96,7 +103,7 @@ def import_all_classes(
             print("[red]----------------------------------------[/]", 
file=sys.stderr)
         sys.exit(1)
     else:
-        return imported_classes
+        return imported_classes, all_warnings
 
 
 if __name__ == '__main__':
@@ -109,10 +116,18 @@ if __name__ == '__main__':
     print()
     print(f"Walking all packages in {args.path} with prefix {args.prefix}")
     print()
-    classes = import_all_classes(print_imports=True, print_skips=True, 
paths=args.path, prefix=args.prefix)
+    classes, warns = import_all_classes(
+        print_imports=True, print_skips=True, paths=args.path, 
prefix=args.prefix
+    )
     if len(classes) == 0:
         print("[red]Something is seriously wrong - no classes imported[/]")
         sys.exit(1)
+    if warns:
+        print("[yellow]There were warnings generated during the import[/]")
+        for w in warns:
+            one_line_message = str(w.message).replace('\n', ' ')
+            print(f"[yellow]{w.filename}:{w.lineno}: {one_line_message}[/]")
+
     print()
     print(f"[green]SUCCESS: All provider packages are importable! Imported 
{len(classes)} classes.[/]")
     print()
diff --git a/dev/provider_packages/prepare_provider_packages.py 
b/dev/provider_packages/prepare_provider_packages.py
index 23c51c7..b45ee3c 100755
--- a/dev/provider_packages/prepare_provider_packages.py
+++ b/dev/provider_packages/prepare_provider_packages.py
@@ -31,6 +31,7 @@ import subprocess
 import sys
 import tempfile
 import textwrap
+import warnings
 from contextlib import contextmanager
 from copy import deepcopy
 from datetime import datetime, timedelta
@@ -43,20 +44,15 @@ from typing import Any, Dict, Iterable, List, NamedTuple, 
Optional, Set, Tuple,
 
 import click
 import jsonschema
-import yaml
 from github import Github, PullRequest, UnknownObjectException
 from packaging.version import Version
-from rich import print
 from rich.console import Console
 from rich.progress import Progress
 from rich.syntax import Syntax
 
-ALL_PYTHON_VERSIONS = ["3.6", "3.7", "3.8", "3.9"]
+from airflow.utils.yaml import safe_load
 
-try:
-    from yaml import CSafeLoader as SafeLoader
-except ImportError:
-    from yaml import SafeLoader
+ALL_PYTHON_VERSIONS = ["3.6", "3.7", "3.8", "3.9"]
 
 INITIAL_CHANGELOG_CONTENT = """
 
@@ -118,6 +114,8 @@ logger = logging.getLogger(__name__)
 
 PY3 = sys.version_info[0] == 3
 
+console = Console(width=400, color_system="standard")
+
 
 @click.group(context_settings={'help_option_names': ['-h', '--help'], 
'max_content_width': 500})
 def cli():
@@ -172,12 +170,12 @@ def with_group(title):
     
https://docs.github.com/en/free-pro-team@latest/actions/reference/workflow-commands-for-github-actions#grouping-log-lines
     """
     if os.environ.get('GITHUB_ACTIONS', 'false') != "true":
-        print("[blue]" + "#" * 10 + ' ' + title + ' ' + "#" * 10 + "[/]")
+        console.print("[blue]" + "#" * 10 + ' ' + title + ' ' + "#" * 10 + 
"[/]")
         yield
         return
-    print(f"::group::{title}")
+    console.print(f"::group::{title}")
     yield
-    print("::endgroup::")
+    console.print("::endgroup::")
 
 
 class EntityType(Enum):
@@ -189,11 +187,8 @@ class EntityType(Enum):
 
 
 class EntityTypeSummary(NamedTuple):
-    entities: Set[str]
-    new_entities: List[str]
-    moved_entities: Dict[str, str]
+    entities: List[str]
     new_entities_table: str
-    moved_entities_table: str
     wrong_entities: List[Tuple[type, str]]
 
 
@@ -221,12 +216,12 @@ ENTITY_NAMES = {
     EntityType.Secrets: "Secrets",
 }
 
-TOTALS: Dict[EntityType, List[int]] = {
-    EntityType.Operators: [0, 0],
-    EntityType.Hooks: [0, 0],
-    EntityType.Sensors: [0, 0],
-    EntityType.Transfers: [0, 0],
-    EntityType.Secrets: [0, 0],
+TOTALS: Dict[EntityType, int] = {
+    EntityType.Operators: 0,
+    EntityType.Hooks: 0,
+    EntityType.Sensors: 0,
+    EntityType.Transfers: 0,
+    EntityType.Secrets: 0,
 }
 
 OPERATORS_PATTERN = r".*Operator$"
@@ -301,14 +296,6 @@ def 
get_target_providers_package_folder(provider_package_id: str) -> str:
 
 DEPENDENCIES_JSON_FILE = os.path.join(PROVIDERS_PATH, "dependencies.json")
 
-MOVED_ENTITIES: Dict[EntityType, Dict[str, str]] = {
-    EntityType.Operators: {value[0]: value[1] for value in 
tests.deprecated_classes.OPERATORS},
-    EntityType.Sensors: {value[0]: value[1] for value in 
tests.deprecated_classes.SENSORS},
-    EntityType.Hooks: {value[0]: value[1] for value in 
tests.deprecated_classes.HOOKS},
-    EntityType.Secrets: {value[0]: value[1] for value in 
tests.deprecated_classes.SECRETS},
-    EntityType.Transfers: {value[0]: value[1] for value in 
tests.deprecated_classes.TRANSFERS},
-}
-
 
 def get_pip_package_name(provider_package_id: str) -> str:
     """
@@ -565,49 +552,19 @@ def find_all_entities(
     return VerifiedEntities(all_entities=found_entities, 
wrong_entities=wrong_entities)
 
 
-def convert_new_classes_to_table(
-    entity_type: EntityType, new_entities: List[str], full_package_name: str
-) -> str:
+def convert_classes_to_table(entity_type: EntityType, entities: List[str], 
full_package_name: str) -> str:
     """
     Converts new entities tp a markdown table.
 
-    :param entity_type: list of entities to convert to markup
-    :param new_entities: list of new entities
+    :param entity_type: entity type to convert to markup
+    :param entities: list of entities
     :param full_package_name: name of the provider package
     :return: table of new classes
     """
     from tabulate import tabulate
 
     headers = [f"New Airflow 2.0 {entity_type.value.lower()}: 
`{full_package_name}` package"]
-    table = [(get_class_code_link(full_package_name, class_name, "main"),) for 
class_name in new_entities]
-    return tabulate(table, headers=headers, tablefmt="pipe")
-
-
-def convert_moved_classes_to_table(
-    entity_type: EntityType,
-    moved_entities: Dict[str, str],
-    full_package_name: str,
-) -> str:
-    """
-    Converts moved entities to a markdown table
-    :param entity_type: type of entities -> operators, sensors etc.
-    :param moved_entities: dictionary of moved entities `to -> from`
-    :param full_package_name: name of the provider package
-    :return: table of moved classes
-    """
-    from tabulate import tabulate
-
-    headers = [
-        f"Airflow 2.0 {entity_type.value.lower()}: `{full_package_name}` 
package",
-        "Airflow 1.10.* previous location (usually `airflow.contrib`)",
-    ]
-    table = [
-        (
-            get_class_code_link(full_package_name, to_class, "main"),
-            get_class_code_link("airflow", moved_entities[to_class], 
"v1-10-stable"),
-        )
-        for to_class in sorted(moved_entities.keys())
-    ]
+    table = [(get_class_code_link(full_package_name, class_name, "main"),) for 
class_name in entities]
     return tabulate(table, headers=headers, tablefmt="pipe")
 
 
@@ -618,8 +575,7 @@ def get_details_about_classes(
     full_package_name: str,
 ) -> EntityTypeSummary:
     """
-    Splits the set of entities into new and moved, depending on their presence 
in the dict of objects
-    retrieved from the test_contrib_to_core. Updates all_entities with the 
split class.
+    Get details about entities.
 
     :param entity_type: type of entity (Operators, Hooks etc.)
     :param entities: set of entities found
@@ -627,30 +583,14 @@ def get_details_about_classes(
     :param full_package_name: full package name
     :return:
     """
-    dict_of_moved_classes = MOVED_ENTITIES[entity_type]
-    new_entities = []
-    moved_entities = {}
-    for obj in entities:
-        if obj in dict_of_moved_classes:
-            moved_entities[obj] = dict_of_moved_classes[obj]
-            del dict_of_moved_classes[obj]
-        else:
-            new_entities.append(obj)
-    new_entities.sort()
-    TOTALS[entity_type][0] += len(new_entities)
-    TOTALS[entity_type][1] += len(moved_entities)
+    all_entities = list(entities)
+    all_entities.sort()
+    TOTALS[entity_type] += len(all_entities)
     return EntityTypeSummary(
-        entities=entities,
-        new_entities=new_entities,
-        moved_entities=moved_entities,
-        new_entities_table=convert_new_classes_to_table(
-            entity_type=entity_type,
-            new_entities=new_entities,
-            full_package_name=full_package_name,
-        ),
-        moved_entities_table=convert_moved_classes_to_table(
+        entities=all_entities,
+        new_entities_table=convert_classes_to_table(
             entity_type=entity_type,
-            moved_entities=moved_entities,
+            entities=all_entities,
             full_package_name=full_package_name,
         ),
         wrong_entities=wrong_entities,
@@ -701,9 +641,9 @@ def print_wrong_naming(entity_type: EntityType, 
wrong_classes: List[Tuple[type,
     :param wrong_classes: list of wrong entities
     """
     if wrong_classes:
-        print(f"\n[red]There are wrongly named entities of type 
{entity_type}:[/]\n", file=sys.stderr)
+        console.print(f"\n[red]There are wrongly named entities of type 
{entity_type}:[/]\n")
         for wrong_entity_type, message in wrong_classes:
-            print(f"{wrong_entity_type}: {message}", file=sys.stderr)
+            console.print(f"{wrong_entity_type}: {message}")
 
 
 def get_package_class_summary(
@@ -1050,10 +990,9 @@ def check_if_release_version_ok(
             current_release_version = (datetime.today() + 
timedelta(days=5)).strftime('%Y.%m.%d')
     if previous_release_version:
         if Version(current_release_version) < 
Version(previous_release_version):
-            print(
+            console.print(
                 f"[red]The release {current_release_version} must be not less 
than "
-                f"{previous_release_version} - last release for the 
package[/]",
-                file=sys.stderr,
+                f"{previous_release_version} - last release for the package[/]"
             )
             raise Exception("Bad release version")
     return current_release_version, previous_release_version
@@ -1088,7 +1027,7 @@ def make_sure_remote_apache_exists_and_fetch(git_update: 
bool, verbose: bool):
     try:
         check_remote_command = ["git", "remote", "get-url", HTTPS_REMOTE]
         if verbose:
-            print(f"Running command: '{' '.join(check_remote_command)}'")
+            console.print(f"Running command: '{' 
'.join(check_remote_command)}'")
         subprocess.check_call(
             check_remote_command,
             stdout=subprocess.DEVNULL,
@@ -1108,19 +1047,19 @@ def 
make_sure_remote_apache_exists_and_fetch(git_update: bool, verbose: bool):
                 "https://github.com/apache/airflow.git";,
             ]
             if verbose:
-                print(f"Running command: '{' '.join(remote_add_command)}'")
+                console.print(f"Running command: '{' 
'.join(remote_add_command)}'")
             try:
                 subprocess.check_output(
                     remote_add_command,
                     stderr=subprocess.STDOUT,
                 )
             except subprocess.CalledProcessError as ex:
-                print("[red]Error: when adding remote:[/]", ex)
+                console.print("[red]Error: when adding remote:[/]", ex)
         else:
             raise
     if verbose:
-        print("Fetching full history and tags from remote. ")
-        print("This might override your local tags!")
+        console.print("Fetching full history and tags from remote. ")
+        console.print("This might override your local tags!")
     is_shallow_repo = (
         subprocess.check_output(["git", "rev-parse", 
"--is-shallow-repository"], stderr=subprocess.DEVNULL)
         == 'true'
@@ -1128,13 +1067,13 @@ def 
make_sure_remote_apache_exists_and_fetch(git_update: bool, verbose: bool):
     fetch_command = ["git", "fetch", "--tags", "--force", HTTPS_REMOTE]
     if is_shallow_repo:
         if verbose:
-            print(
+            console.print(
                 "This will also unshallow the repository, "
                 "making all history available and increasing storage!"
             )
         fetch_command.append("--unshallow")
     if verbose:
-        print(f"Running command: '{' '.join(fetch_command)}'")
+        console.print(f"Running command: '{' '.join(fetch_command)}'")
     subprocess.check_call(
         fetch_command,
         stderr=subprocess.DEVNULL,
@@ -1163,7 +1102,7 @@ def get_git_log_command(
         git_cmd.append(from_commit)
     git_cmd.extend(['--', '.'])
     if verbose:
-        print(f"Command to run: '{' '.join(git_cmd)}'")
+        console.print(f"Command to run: '{' '.join(git_cmd)}'")
     return git_cmd
 
 
@@ -1261,13 +1200,13 @@ def check_if_classes_are_properly_named(
             _, class_name = class_full_name.rsplit(".", maxsplit=1)
             error_encountered = False
             if not is_camel_case_with_acronyms(class_name):
-                print(
+                console.print(
                     f"[red]The class {class_full_name} is wrongly named. The "
                     f"class name should be CamelCaseWithACRONYMS ![/]"
                 )
                 error_encountered = True
             if not class_name.endswith(class_suffix):
-                print(
+                console.print(
                     f"[red]The class {class_full_name} is wrongly named. It is 
one of the {entity_type.value}"
                     f" so it should end with {class_suffix}[/]"
                 )
@@ -1296,7 +1235,7 @@ def 
validate_provider_info_with_runtime_schema(provider_info: Dict[str, Any]) ->
     try:
         jsonschema.validate(provider_info, schema=schema)
     except jsonschema.ValidationError as ex:
-        print("[red]Provider info not validated against runtime schema[/]")
+        console.print("[red]Provider info not validated against runtime 
schema[/]")
         raise Exception(
             "Error when validating schema. The schema must be compatible with "
             + "airflow/provider_info.schema.json.",
@@ -1316,7 +1255,7 @@ def get_provider_yaml(provider_package_id: str) -> 
Dict[str, Any]:
     if not os.path.exists(provider_yaml_file_name):
         raise Exception(f"The provider.yaml file is missing: 
{provider_yaml_file_name}")
     with open(provider_yaml_file_name) as provider_file:
-        provider_yaml_dict = yaml.load(provider_file, SafeLoader)
+        provider_yaml_dict = safe_load(provider_file)
     return provider_yaml_dict
 
 
@@ -1339,7 +1278,6 @@ def get_version_tag(version: str, provider_package_id: 
str, version_suffix: str
 
 def print_changes_table(changes_table):
     syntax = Syntax(changes_table, "rst", theme="ansi_dark")
-    console = Console(width=200)
     console.print(syntax)
 
 
@@ -1360,14 +1298,14 @@ def get_all_changes_for_package(
     current_version = versions[0]
     current_tag_no_suffix = get_version_tag(current_version, 
provider_package_id)
     if verbose:
-        print(f"Checking if tag '{current_tag_no_suffix}' exist.")
+        console.print(f"Checking if tag '{current_tag_no_suffix}' exist.")
     if not subprocess.call(
         get_git_tag_check_command(current_tag_no_suffix),
         cwd=source_provider_package_path,
         stderr=subprocess.DEVNULL,
     ):
         if verbose:
-            print(f"The tag {current_tag_no_suffix} exists.")
+            console.print(f"The tag {current_tag_no_suffix} exists.")
         # The tag already exists
         changes = subprocess.check_output(
             get_git_log_command(verbose, HEAD_OF_HTTPS_REMOTE, 
current_tag_no_suffix),
@@ -1389,21 +1327,23 @@ def get_all_changes_for_package(
                         universal_newlines=True,
                     )
                     if not changes_since_last_doc_only_check:
-                        print()
-                        print("[yellow]The provider has doc-only changes since 
the last release. Skipping[/]")
+                        console.print()
+                        console.print(
+                            "[yellow]The provider has doc-only changes since 
the last release. Skipping[/]"
+                        )
                         # Returns 66 in case of doc-only changes
                         sys.exit(66)
                 except subprocess.CalledProcessError:
                     # ignore when the commit mentioned as last doc-only change 
is obsolete
                     pass
-            print(f"[yellow]The provider {provider_package_id} has changes 
since last release[/]")
-            print()
-            print(
+            console.print(f"[yellow]The provider {provider_package_id} has 
changes since last release[/]")
+            console.print()
+            console.print(
                 "[yellow]Please update version in "
                 f"'airflow/providers/{provider_package_id.replace('-','/')}/'"
                 "provider.yaml'[/]\n"
             )
-            print("[yellow]Or mark the changes as doc-only[/]")
+            console.print("[yellow]Or mark the changes as doc-only[/]")
             changes_table, array_of_changes = convert_git_changes_to_table(
                 "UNKNOWN",
                 changes,
@@ -1413,14 +1353,16 @@ def get_all_changes_for_package(
             print_changes_table(changes_table)
             return False, array_of_changes[0], changes_table
         else:
-            print(f"No changes for {provider_package_id}")
+            console.print(f"No changes for {provider_package_id}")
             return False, None, ""
     if verbose:
-        print("The tag does not exist. ")
+        console.print("The tag does not exist. ")
     if len(versions) == 1:
-        print(f"The provider '{provider_package_id}' has never been released 
but it is ready to release!\n")
+        console.print(
+            f"The provider '{provider_package_id}' has never been released but 
it is ready to release!\n"
+        )
     else:
-        print(f"New version of the '{provider_package_id}' package is ready to 
be released!\n")
+        console.print(f"New version of the '{provider_package_id}' package is 
ready to be released!\n")
     next_version_tag = HEAD_OF_HTTPS_REMOTE
     changes_table = ''
     current_version = versions[0]
@@ -1565,7 +1507,7 @@ def confirm(message: str):
     """
     answer = ""
     while answer not in ["y", "n", "q"]:
-        print(f"[yellow]{message}[Y/N/Q]?[/] ", end='')
+        console.print(f"[yellow]{message}[Y/N/Q]?[/] ", end='')
         answer = input("").lower()
     if answer == "q":
         # Returns 65 in case user decided to quit
@@ -1576,7 +1518,7 @@ def confirm(message: str):
 def mark_latest_changes_as_documentation_only(
     provider_details: ProviderPackageDetails, latest_change: Change
 ):
-    print(
+    console.print(
         f"Marking last change: {latest_change.short_hash} and all above 
changes since the last release "
         "as doc-only changes!"
     )
@@ -1626,11 +1568,11 @@ def update_release_notes(
             if interactive and not confirm("Provider marked for release. 
Proceed?"):
                 return False
         elif not latest_change:
-            print()
-            print(
+            console.print()
+            console.print(
                 f"[yellow]Provider: {provider_package_id} - skipping 
documentation generation. No changes![/]"
             )
-            print()
+            console.print()
             return False
         else:
             if interactive and confirm("Are those changes 
documentation-only?"):
@@ -1666,9 +1608,9 @@ def update_setup_files(
         current_release_version=current_release_version,
         version_suffix=version_suffix,
     )
-    print()
-    print(f"Generating setup files for {provider_package_id}")
-    print()
+    console.print()
+    console.print(f"Generating setup files for {provider_package_id}")
+    console.print()
     prepare_setup_py_file(jinja_context)
     prepare_setup_cfg_file(jinja_context)
     prepare_get_provider_info_py_file(jinja_context, provider_package_id)
@@ -1685,9 +1627,9 @@ def replace_content(file_path, old_text, new_text, 
provider_package_id):
                 copyfile(file_path, temp_file_path)
             with open(file_path, "wt") as readme_file:
                 readme_file.write(new_text)
-            print()
-            print(f"Generated {file_path} file for the {provider_package_id} 
provider")
-            print()
+            console.print()
+            console.print(f"Generated {file_path} file for the 
{provider_package_id} provider")
+            console.print()
             if old_text != "":
                 subprocess.call(["diff", "--color=always", temp_file_path, 
file_path])
         finally:
@@ -1838,9 +1780,9 @@ def verify_provider_package(provider_package_id: str) -> 
str:
     :return: None
     """
     if provider_package_id not in get_provider_packages():
-        print(f"[red]Wrong package name: {provider_package_id}[/]")
-        print("Use one of:")
-        print(get_provider_packages())
+        console.print(f"[red]Wrong package name: {provider_package_id}[/]")
+        console.print("Use one of:")
+        console.print(get_provider_packages())
         raise Exception(f"The package {provider_package_id} is not a provider 
package.")
 
 
@@ -1848,17 +1790,16 @@ def verify_changelog_exists(package: str) -> str:
     provider_details = get_provider_details(package)
     changelog_path = 
os.path.join(provider_details.source_provider_package_path, "CHANGELOG.rst")
     if not os.path.isfile(changelog_path):
-        print(f"[red]ERROR: Missing ${changelog_path}[/]")
-        print("Please add the file with initial content:")
-        print()
+        console.print(f"[red]ERROR: Missing ${changelog_path}[/]")
+        console.print("Please add the file with initial content:")
+        console.print()
         syntax = Syntax(
             INITIAL_CHANGELOG_CONTENT,
             "rst",
             theme="ansi_dark",
         )
-        console = Console(width=200)
         console.print(syntax)
-        print()
+        console.print()
         raise Exception(f"Missing {changelog_path}")
     return changelog_path
 
@@ -1868,7 +1809,7 @@ def list_providers_packages():
     """List all provider packages."""
     providers = get_all_providers()
     for provider in providers:
-        print(provider)
+        console.print(provider)
 
 
 @cli.command()
@@ -1894,7 +1835,7 @@ def update_package_documentation(
     provider_package_id = package_id
     verify_provider_package(provider_package_id)
     with with_group(f"Update release notes for package '{provider_package_id}' 
"):
-        print("Updating documentation for the latest release version.")
+        console.print("Updating documentation for the latest release version.")
         make_sure_remote_apache_exists_and_fetch(git_update, verbose)
         if not update_release_notes(
             provider_package_id, version_suffix, force=force, verbose=verbose, 
interactive=interactive
@@ -1906,7 +1847,7 @@ def update_package_documentation(
 def tag_exists_for_version(provider_package_id: str, current_tag: str, 
verbose: bool):
     provider_details = get_provider_details(provider_package_id)
     if verbose:
-        print(f"Checking if tag `{current_tag}` exists.")
+        console.print(f"Checking if tag `{current_tag}` exists.")
     if not subprocess.call(
         get_git_tag_check_command(current_tag),
         cwd=provider_details.source_provider_package_path,
@@ -1914,10 +1855,10 @@ def tag_exists_for_version(provider_package_id: str, 
current_tag: str, verbose:
         stdout=subprocess.DEVNULL,
     ):
         if verbose:
-            print(f"Tag `{current_tag}` exists.")
+            console.print(f"Tag `{current_tag}` exists.")
         return True
     if verbose:
-        print(f"Tag `{current_tag}` does not exist.")
+        console.print(f"Tag `{current_tag}` does not exist.")
     return False
 
 
@@ -1936,7 +1877,7 @@ def generate_setup_files(version_suffix: str, git_update: 
bool, package_id: str,
     with with_group(f"Generate setup files for '{provider_package_id}'"):
         current_tag = get_current_tag(provider_package_id, version_suffix, 
git_update, verbose)
         if tag_exists_for_version(provider_package_id, current_tag, verbose):
-            print(f"[yellow]The tag {current_tag} exists. Not preparing the 
package.[/]")
+            console.print(f"[yellow]The tag {current_tag} exists. Not 
preparing the package.[/]")
             # Returns 1 in case of skipped package
             sys.exit(1)
         else:
@@ -1944,7 +1885,7 @@ def generate_setup_files(version_suffix: str, git_update: 
bool, package_id: str,
                 provider_package_id,
                 version_suffix,
             ):
-                print(f"[green]Generated regular package setup files for 
{provider_package_id}[/]")
+                console.print(f"[green]Generated regular package setup files 
for {provider_package_id}[/]")
             else:
                 # Returns 64 in case of skipped package
                 sys.exit(64)
@@ -1962,7 +1903,7 @@ def get_current_tag(provider_package_id: str, suffix: 
str, git_update: bool, ver
 
 def cleanup_remnants(verbose: bool):
     if verbose:
-        print("Cleaning remnants")
+        console.print("Cleaning remnants")
     files = glob.glob("*.egg-info")
     for file in files:
         shutil.rmtree(file, ignore_errors=True)
@@ -1976,11 +1917,11 @@ def verify_setup_py_prepared(provider_package):
         setup_content = f.read()
     search_for = f"providers-{provider_package.replace('.','-')} for Apache 
Airflow"
     if search_for not in setup_content:
-        print(
+        console.print(
             f"[red]The setup.py is probably prepared for another package. "
             f"It does not contain [bold]{search_for}[/bold]![/]"
         )
-        print(
+        console.print(
             
f"\nRun:\n\n[bold]./dev/provider_packages/prepare_provider_packages.py "
             f"generate-setup-files {provider_package}[/bold]\n"
         )
@@ -2022,15 +1963,15 @@ def build_provider_packages(
         with with_group(f"Prepare provider package for 
'{provider_package_id}'"):
             current_tag = get_current_tag(provider_package_id, version_suffix, 
git_update, verbose)
             if tag_exists_for_version(provider_package_id, current_tag, 
verbose):
-                print(f"[yellow]The tag {current_tag} exists. Skipping the 
package.[/]")
+                console.print(f"[yellow]The tag {current_tag} exists. Skipping 
the package.[/]")
                 return False
-            print(f"Changing directory to ${TARGET_PROVIDER_PACKAGES_PATH}")
+            console.print(f"Changing directory to 
${TARGET_PROVIDER_PACKAGES_PATH}")
             os.chdir(TARGET_PROVIDER_PACKAGES_PATH)
             cleanup_remnants(verbose)
             provider_package = package_id
             verify_setup_py_prepared(provider_package)
 
-            print(f"Building provider package: {provider_package} in format 
{package_format}")
+            console.print(f"Building provider package: {provider_package} in 
format {package_format}")
             command = ["python3", "setup.py", "build", "--build-temp", 
tmp_build_dir]
             if version_suffix is not None:
                 command.extend(['egg_info', '--tag-build', version_suffix])
@@ -2038,13 +1979,15 @@ def build_provider_packages(
                 command.append("sdist")
             if package_format in ['wheel', 'both']:
                 command.extend(["bdist_wheel", "--bdist-dir", tmp_dist_dir])
-            print(f"Executing command: '{' '.join(command)}'")
+            console.print(f"Executing command: '{' '.join(command)}'")
             try:
                 subprocess.check_call(command, stdout=subprocess.DEVNULL)
             except subprocess.CalledProcessError as ex:
-                print(ex.output.decode())
+                console.print(ex.output.decode())
                 raise Exception("The command returned an error %s", command)
-            print(f"[green]Prepared provider package {provider_package} in 
format {package_format}[/]")
+            console.print(
+                f"[green]Prepared provider package {provider_package} in 
format {package_format}[/]"
+            )
     finally:
         shutil.rmtree(tmp_build_dir, ignore_errors=True)
         shutil.rmtree(tmp_dist_dir, ignore_errors=True)
@@ -2057,35 +2000,148 @@ def 
verify_provider_classes_for_single_provider(imported_classes: List[str], pro
     total, bad = check_if_classes_are_properly_named(entity_summaries)
     bad += sum(len(entity_summary.wrong_entities) for entity_summary in 
entity_summaries.values())
     if bad != 0:
-        print()
-        print(f"[red]There are {bad} errors of {total} entities for 
{provider_package_id}[/]")
-        print()
+        console.print()
+        console.print(f"[red]There are {bad} errors of {total} entities for 
{provider_package_id}[/]")
+        console.print()
     return total, bad
 
 
-def summarise_total_vs_bad(total: int, bad: int):
-    """Summarises Bad/Good class names for providers"""
+def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: 
List[warnings.WarningMessage]) -> bool:
+    """Summarises Bad/Good class names for providers and warnings"""
+    raise_error = False
     if bad == 0:
-        print()
-        print(f"[green]All good! All {total} entities are properly named[/]")
-        print()
-        print("Totals:")
-        print()
-        print("New:")
-        print()
-        for entity in EntityType:
-            print(f"{entity.value}: {TOTALS[entity][0]}")
-        print()
-        print("Moved:")
-        print()
+        console.print()
+        console.print(f"[green]OK: All {total} entities are properly named[/]")
+        console.print()
+        console.print("Totals:")
+        console.print()
         for entity in EntityType:
-            print(f"{entity.value}: {TOTALS[entity][1]}")
-        print()
+            console.print(f"{entity.value}: {TOTALS[entity]}")
+        console.print()
     else:
-        print()
-        print(f"[red]There are in total: {bad} entities badly named out of 
{total} entities[/]")
-        print()
-        raise Exception("Badly names entities")
+        console.print()
+        console.print(
+            f"[red]ERROR! There are in total: {bad} entities badly named out 
of {total} entities[/]"
+        )
+        console.print()
+        raise_error = True
+    if warns:
+        console.print()
+        console.print("[red]Unknown warnings generated:[/]")
+        console.print()
+        for w in warns:
+            one_line_message = str(w.message).replace('\n', ' ')
+            
console.print(f"{w.filename}:{w.lineno}:[yellow]{one_line_message}[/]")
+        console.print()
+        console.print(f"[red]ERROR! There were {len(warns)} warnings generated 
during the import[/]")
+        console.print()
+        console.print("[yellow]Ideally, fix it, so that no warnings are 
generated during import.[/]")
+        console.print("[yellow]There are two cases that are legitimate 
deprecation warnings though:[/]")
+        console.print("[yellow] 1) when you deprecate whole module or class 
and replace it in provider[/]")
+        console.print("[yellow] 2) when 3rd-party module generates Deprecation 
and you cannot upgrade it[/]")
+        console.print()
+        console.print(
+            "[yellow]In case 1), add the deprecation message to "
+            "the KNOWN_DEPRECATED_DIRECT_IMPORTS in 
prepare_provider_packages.py[/]"
+        )
+        console.print(
+            "[yellow]In case 2), add the deprecation message together with 
module it generates to "
+            "the KNOWN_DEPRECATED_MESSAGES in prepare_provider_packages.py[/]"
+        )
+        console.print()
+        raise_error = True
+    else:
+        console.print()
+        console.print("[green]OK: No warnings generated[/]")
+        console.print()
+
+    if raise_error:
+        console.print("[red]Please fix the problems listed above [/]")
+        return False
+    return True
+
+
+# The set of known deprecation messages.
+# It contains tuples of "message" and the module that generates the warning - 
so that when the
+# same warning is generated by a different module, it is not treated as a "known" 
warning.
+KNOWN_DEPRECATED_MESSAGES: Set[Tuple[str, str]] = {
+    (
+        'This version of Apache Beam has not been sufficiently tested on 
Python 3.9. '
+        'You may encounter bugs or missing features.',
+        "apache_beam",
+    ),
+    (
+        "Using or importing the ABCs from 'collections' instead of from 
'collections.abc' is deprecated since"
+        " Python 3.3, and in 3.10 it will stop working",
+        "apache_beam",
+    ),
+    (
+        "Using or importing the ABCs from 'collections' instead of from 
'collections.abc' is deprecated since"
+        " Python 3.3, and in 3.10 it will stop working",
+        "dns",
+    ),
+    (
+        'pyarrow.HadoopFileSystem is deprecated as of 2.0.0, please use 
pyarrow.fs.HadoopFileSystem instead.',
+        "papermill",
+    ),
+    (
+        "You have an incompatible version of 'pyarrow' installed (4.0.1), 
please install a version that "
+        "adheres to: 'pyarrow<3.1.0,>=3.0.0; extra == \"pandas\"'",
+        "apache_beam",
+    ),
+    (
+        "You have an incompatible version of 'pyarrow' installed (4.0.1), 
please install a version that "
+        "adheres to: 'pyarrow<5.1.0,>=5.0.0; extra == \"pandas\"'",
+        "snowflake",
+    ),
+    ("dns.hash module will be removed in future versions. Please use hashlib 
instead.", "dns"),
+    ("PKCS#7 support in pyOpenSSL is deprecated. You should use the APIs in 
cryptography.", "eventlet"),
+    ("PKCS#12 support in pyOpenSSL is deprecated. You should use the APIs in 
cryptography.", "eventlet"),
+    (
+        "the imp module is deprecated in favour of importlib; see the module's 
documentation"
+        " for alternative uses",
+        "hdfs",
+    ),
+    ("This operator is deprecated. Please use 
`airflow.providers.tableau.operators.tableau`.", "salesforce"),
+    (
+        "You have an incompatible version of 'pyarrow' installed (4.0.1), 
please install a version that"
+        " adheres to: 'pyarrow<3.1.0,>=3.0.0; extra == \"pandas\"'",
+        "snowflake",
+    ),
+    ("SelectableGroups dict interface is deprecated. Use select.", "kombu"),
+    ("The module cloudant is now deprecated. The replacement is ibmcloudant.", 
"cloudant"),
+}
+
+# The set of warning messages generated by directly importing some deprecated 
modules. We should only
+# ignore those messages when the warnings are generated directly by importlib 
- which means that
+# we imported the module directly during the module walk by the importlib library.
+KNOWN_DEPRECATED_DIRECT_IMPORTS: Set[str] = {
+    "This module is deprecated. Please use 
`airflow.providers.amazon.aws.hooks.dynamodb`.",
+    "This module is deprecated. Please use 
`airflow.providers.tableau.operators.tableau_refresh_workbook`.",
+    "This module is deprecated. Please use 
`airflow.providers.tableau.sensors.tableau_job_status`.",
+    "This module is deprecated. Please use 
`airflow.providers.tableau.hooks.tableau`.",
+    "This module is deprecated. Please use 
`kubernetes.client.models.V1Volume`.",
+    "This module is deprecated. Please use 
`kubernetes.client.models.V1VolumeMount`.",
+    'numpy.ufunc size changed, may indicate binary incompatibility. Expected 
192 from C header,'
+    ' got 216 from PyObject',
+}
+
+
+def filter_known_warnings(warn: warnings.WarningMessage) -> bool:
+    msg_string = str(warn.message).replace("\n", " ")
+    for m in KNOWN_DEPRECATED_MESSAGES:
+        expected_package_string = "/" + m[1] + "/"
+        if msg_string == m[0] and warn.filename.find(expected_package_string) 
!= -1:
+            return False
+    return True
+
+
+def filter_direct_importlib_warning(warn: warnings.WarningMessage) -> bool:
+    msg_string = str(warn.message).replace("\n", " ")
+    for m in KNOWN_DEPRECATED_DIRECT_IMPORTS:
+        if msg_string == m and warn.filename.find("/importlib/") != -1:
+            return False
+    return True
 
 
 @cli.command()
@@ -2093,9 +2149,9 @@ def verify_provider_classes():
     """Verifies names for all provider classes."""
     with with_group("Verifies names for all provider classes"):
         provider_ids = get_all_providers()
-        imported_classes = import_all_classes(
+        imported_classes, warns = import_all_classes(
             provider_ids=provider_ids,
-            print_imports=False,
+            print_imports=True,
             paths=[PROVIDERS_PATH],
             prefix="airflow.providers.",
         )
@@ -2107,7 +2163,10 @@ def verify_provider_classes():
             )
             total += inc_total
             bad += inc_bad
-        summarise_total_vs_bad(total, bad)
+        warns = list(filter(filter_known_warnings, warns))
+        warns = list(filter(filter_direct_importlib_warning, warns))
+        if not summarise_total_vs_bad_and_warnings(total, bad, warns):
+            sys.exit(1)
 
 
 def find_insertion_index_for_version(content: List[str], version: str) -> 
Tuple[int, bool]:
@@ -2199,12 +2258,14 @@ def _update_changelog(package_id: str, verbose: bool) 
-> bool:
             verbose,
         )
         if not proceed:
-            print(f"[yellow]The provider {package_id} is not being released. 
Skipping the package.[/]")
+            console.print(
+                f"[yellow]The provider {package_id} is not being released. 
Skipping the package.[/]"
+            )
             return True
         generate_new_changelog(package_id, provider_details, changelog_path, 
changes)
-        print()
-        print(f"Update index.rst for {package_id}")
-        print()
+        console.print()
+        console.print(f"Update index.rst for {package_id}")
+        console.print()
         update_index_rst(jinja_context, package_id, 
provider_details.documentation_provider_package_path)
         return False
 
@@ -2217,7 +2278,7 @@ def generate_new_changelog(package_id, provider_details, 
changelog_path, changes
     insertion_index, append = 
find_insertion_index_for_version(current_changelog_lines, latest_version)
     if append:
         if not changes:
-            print(
+            console.print(
                 f"[green]The provider {package_id} changelog for 
`{latest_version}` "
                 "has first release. Not updating the changelog.[/]"
             )
@@ -2226,7 +2287,7 @@ def generate_new_changelog(package_id, provider_details, 
changelog_path, changes
             change for change in changes[0] if change.pr and "(#" + change.pr 
+ ")" not in current_changelog
         ]
         if not new_changes:
-            print(
+            console.print(
                 f"[green]The provider {package_id} changelog for 
`{latest_version}` "
                 "has no new changes. Not updating the changelog.[/]"
             )
@@ -2250,15 +2311,16 @@ def generate_new_changelog(package_id, 
provider_details, changelog_path, changes
     new_changelog_lines.extend(current_changelog_lines[insertion_index:])
     diff = "\n".join(difflib.context_diff(current_changelog_lines, 
new_changelog_lines, n=5))
     syntax = Syntax(diff, "diff")
-    console = Console(width=200)
     console.print(syntax)
     if not append:
-        print(
+        console.print(
             f"[green]The provider {package_id} changelog for 
`{latest_version}` "
             "version is missing. Generating fresh changelog.[/]"
         )
     else:
-        print(f"[green]Appending the provider {package_id} changelog for" 
f"`{latest_version}` version.[/]")
+        console.print(
+            f"[green]Appending the provider {package_id} changelog for" 
f"`{latest_version}` version.[/]"
+        )
     with open(changelog_path, "wt") as changelog:
         changelog.write("\n".join(new_changelog_lines))
         changelog.write("\n")
@@ -2340,7 +2402,6 @@ def generate_issue_content(package_ids: List[str], 
github_token: str, suffix: st
             excluded_prs = [int(pr) for pr in excluded_pr_list.split(",")]
         else:
             excluded_prs = []
-        console = Console(width=200, color_system="standard")
         all_prs: Set[int] = set()
         provider_prs: Dict[str, List[int]] = {}
         for package_id in package_ids:
diff --git a/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst 
b/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
index b99971a..5cd2548 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst
@@ -99,23 +99,6 @@ method only replaces fields that are provided in the 
submitted Table resource.
     :start-after: [START howto_operator_bigquery_update_table]
     :end-before: [END howto_operator_bigquery_update_table]
 
-.. _howto/operator:BigQueryPatchDatasetOperator:
-
-Patch dataset
-"""""""""""""
-
-To patch a dataset in BigQuery you can use
-:class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryPatchDatasetOperator`.
-
-Note, this operator only replaces fields that are provided in the submitted 
dataset
-resource.
-
-.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_bigquery_patch_dataset]
-    :end-before: [END howto_operator_bigquery_patch_dataset]
-
 .. _howto/operator:BigQueryUpdateDatasetOperator:
 
 Update dataset
diff --git a/tests/always/test_project_structure.py 
b/tests/always/test_project_structure.py
index d4d8645..5b40d74 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -162,39 +162,39 @@ class 
TestGoogleProviderProjectStructure(unittest.TestCase):
         ('ads', 'ads_to_gcs'),
     }
 
-    MISSING_EXAMPLES_FOR_OPERATORS = {
-        # Deprecated operator. Ignore it.
+    # Those operators are deprecated and we do not need examples for them
+    DEPRECATED_OPERATORS = {
         
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
         '.CloudDataTransferServiceS3ToGCSOperator',
-        # Deprecated operator. Ignore it.
         
'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
         '.CloudDataTransferServiceGCSToGCSOperator',
-        # Deprecated operator. Ignore it.
         
'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWtihPrefixExistenceSensor',
-        # Base operator. Ignore it.
-        
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHadoopJobOperator',
-        
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator',
-        # Base operator. Ignore it
-        
'airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkJobOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitSparkSqlJobOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHiveJobOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitPigJobOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitPySparkJobOperator',
-        
'airflow.providers.google.cloud.operators.mlengine.MLEngineTrainingCancelJobOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.mlengine.MLEngineManageModelOperator',
-        # Deprecated operator. Ignore it
         
'airflow.providers.google.cloud.operators.mlengine.MLEngineManageVersionOperator',
+        
'airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator',
+        
'airflow.providers.google.cloud.operators.bigquery.BigQueryPatchDatasetOperator',
+        
'airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator',
+        
'airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator',
+    }
+
+    # Those operators should not have examples as they are never used 
standalone (they are abstract)
+    BASE_OPERATORS = {
+        
'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator',
+        
'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator',
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocJobBaseOperator',
+    }
+
+    # Please add the examples for those operators at the earliest convenience :)
+    MISSING_EXAMPLES_FOR_OPERATORS = {
+        
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
+        
'airflow.providers.google.cloud.operators.mlengine.MLEngineTrainingCancelJobOperator',
         
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetStoredInfoTypeOperator',
         
'airflow.providers.google.cloud.operators.dlp.CloudDLPReidentifyContentOperator',
         
'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDeidentifyTemplateOperator',
@@ -217,8 +217,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
         
'airflow.providers.google.cloud.operators.dlp.CloudDLPRedactImageOperator',
         
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreDeleteOperationOperator',
         
'airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator',
-        # Base operator. Ignore it
-        
'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator',
         'airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor',
         
'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor',
     }
@@ -299,7 +297,11 @@ class 
TestGoogleProviderProjectStructure(unittest.TestCase):
                 print("example_paths=", example_paths)
                 operators_paths = 
set(get_classes_from_file(f"{ROOT_FOLDER}/{filepath}"))
                 missing_operators.extend(operators_paths - example_paths)
-        assert set(missing_operators) == self.MISSING_EXAMPLES_FOR_OPERATORS
+        full_set = set()
+        full_set.update(self.MISSING_EXAMPLES_FOR_OPERATORS)
+        full_set.update(self.DEPRECATED_OPERATORS)
+        full_set.update(self.BASE_OPERATORS)
+        assert set(missing_operators) == full_set
 
     @parameterized.expand(
         itertools.product(["_system.py", "_system_helper.py"], ["operators", 
"sensors", "transfers"])
diff --git a/tests/providers/google/cloud/operators/test_mlengine_utils.py 
b/tests/providers/google/cloud/operators/test_mlengine_utils.py
index 37a753a..27f0886 100644
--- a/tests/providers/google/cloud/operators/test_mlengine_utils.py
+++ b/tests/providers/google/cloud/operators/test_mlengine_utils.py
@@ -106,34 +106,16 @@ class TestCreateEvaluateOps(unittest.TestCase):
             )
             assert success_message['predictionOutput'] == result
 
-        with patch(
-            'airflow.providers.google.cloud.operators.dataflow.DataflowHook'
-        ) as mock_dataflow_hook, patch(
-            'airflow.providers.google.cloud.operators.dataflow.BeamHook'
-        ) as mock_beam_hook:
-            dataflow_hook_instance = mock_dataflow_hook.return_value
-            dataflow_hook_instance.start_python_dataflow.return_value = None
+        with patch('airflow.providers.apache.beam.operators.beam.BeamHook') as 
mock_beam_hook:
             beam_hook_instance = mock_beam_hook.return_value
             summary.execute(None)
-            mock_dataflow_hook.assert_called_once_with(
-                gcp_conn_id='google_cloud_default',
-                delegate_to=None,
-                poll_sleep=10,
-                drain_pipeline=False,
-                cancel_timeout=600,
-                wait_until_finished=None,
-                impersonation_chain=None,
-            )
-            mock_beam_hook.assert_called_once_with(runner="DataflowRunner")
+            mock_beam_hook.assert_called_once_with(runner="DirectRunner")
             beam_hook_instance.start_python_pipeline.assert_called_once_with(
                 variables={
                     'prediction_path': 'gs://legal-bucket/fake-output-path',
                     'labels': {'airflow-version': TEST_VERSION},
                     'metric_keys': 'err',
                     'metric_fn_encoded': self.metric_fn_encoded,
-                    'project': 'test-project',
-                    'region': 'us-central1',
-                    'job_name': mock.ANY,
                 },
                 py_file=mock.ANY,
                 py_options=[],
@@ -142,9 +124,6 @@ class TestCreateEvaluateOps(unittest.TestCase):
                 py_system_site_packages=False,
                 process_line_callback=mock.ANY,
             )
-            dataflow_hook_instance.wait_for_done.assert_called_once_with(
-                job_name=mock.ANY, location='us-central1', job_id=mock.ANY, 
multiple_jobs=False
-            )
 
         with 
patch('airflow.providers.google.cloud.utils.mlengine_operator_utils.GCSHook') 
as mock_gcs_hook:
             hook_instance = mock_gcs_hook.return_value
diff --git a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py 
b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
index 5e19c17..b50815c 100644
--- a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
+++ b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
@@ -27,8 +27,8 @@ import pytest
 from airflow.exceptions import AirflowException
 from airflow.models import DAG
 from airflow.operators.python import PythonOperator
+from airflow.providers.apache.beam.operators.beam import 
BeamRunPythonPipelineOperator
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
-from airflow.providers.google.cloud.operators.dataflow import 
DataflowCreatePythonJobOperator
 from airflow.providers.google.cloud.utils.mlengine_operator_utils import 
create_evaluate_ops
 
 TASK_PREFIX = "test-task-prefix"
@@ -93,8 +93,8 @@ def validate_err_and_count(summary):
 
 class TestMlengineOperatorUtils(unittest.TestCase):
     @mock.patch.object(PythonOperator, "set_upstream")
-    @mock.patch.object(DataflowCreatePythonJobOperator, "set_upstream")
-    def test_create_evaluate_ops(self, mock_dataflow, mock_python):
+    @mock.patch.object(BeamRunPythonPipelineOperator, "set_upstream")
+    def test_create_evaluate_ops(self, mock_beam_pipeline, mock_python):
         result = create_evaluate_ops(
             task_prefix=TASK_PREFIX,
             data_format=DATA_FORMAT,
@@ -111,7 +111,7 @@ class TestMlengineOperatorUtils(unittest.TestCase):
 
         evaluate_prediction, evaluate_summary, evaluate_validation = result
 
-        mock_dataflow.assert_called_once_with(evaluate_prediction)
+        mock_beam_pipeline.assert_called_once_with(evaluate_prediction)
         mock_python.assert_called_once_with(evaluate_summary)
 
         assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id
@@ -124,17 +124,17 @@ class TestMlengineOperatorUtils(unittest.TestCase):
         assert MODEL_URI == evaluate_prediction._uri
 
         assert TASK_PREFIX_SUMMARY == evaluate_summary.task_id
-        assert DATAFLOW_OPTIONS == evaluate_summary.dataflow_default_options
-        assert PREDICTION_PATH == evaluate_summary.options["prediction_path"]
-        assert METRIC_FN_ENCODED == 
evaluate_summary.options["metric_fn_encoded"]
-        assert METRIC_KEYS_EXPECTED == evaluate_summary.options["metric_keys"]
+        assert DATAFLOW_OPTIONS == evaluate_summary.default_pipeline_options
+        assert PREDICTION_PATH == 
evaluate_summary.pipeline_options["prediction_path"]
+        assert METRIC_FN_ENCODED == 
evaluate_summary.pipeline_options["metric_fn_encoded"]
+        assert METRIC_KEYS_EXPECTED == 
evaluate_summary.pipeline_options["metric_keys"]
 
         assert TASK_PREFIX_VALIDATION == evaluate_validation.task_id
         assert PREDICTION_PATH == 
evaluate_validation.templates_dict["prediction_path"]
 
     @mock.patch.object(PythonOperator, "set_upstream")
-    @mock.patch.object(DataflowCreatePythonJobOperator, "set_upstream")
-    def test_create_evaluate_ops_model_and_version_name(self, mock_dataflow, 
mock_python):
+    @mock.patch.object(BeamRunPythonPipelineOperator, "set_upstream")
+    def test_create_evaluate_ops_model_and_version_name(self, 
mock_beam_pipeline, mock_python):
         result = create_evaluate_ops(
             task_prefix=TASK_PREFIX,
             data_format=DATA_FORMAT,
@@ -152,7 +152,7 @@ class TestMlengineOperatorUtils(unittest.TestCase):
 
         evaluate_prediction, evaluate_summary, evaluate_validation = result
 
-        mock_dataflow.assert_called_once_with(evaluate_prediction)
+        mock_beam_pipeline.assert_called_once_with(evaluate_prediction)
         mock_python.assert_called_once_with(evaluate_summary)
 
         assert TASK_PREFIX_PREDICTION == evaluate_prediction.task_id
@@ -166,16 +166,16 @@ class TestMlengineOperatorUtils(unittest.TestCase):
         assert VERSION_NAME == evaluate_prediction._version_name
 
         assert TASK_PREFIX_SUMMARY == evaluate_summary.task_id
-        assert DATAFLOW_OPTIONS == evaluate_summary.dataflow_default_options
-        assert PREDICTION_PATH == evaluate_summary.options["prediction_path"]
-        assert METRIC_FN_ENCODED == 
evaluate_summary.options["metric_fn_encoded"]
-        assert METRIC_KEYS_EXPECTED == evaluate_summary.options["metric_keys"]
+        assert DATAFLOW_OPTIONS == evaluate_summary.default_pipeline_options
+        assert PREDICTION_PATH == 
evaluate_summary.pipeline_options["prediction_path"]
+        assert METRIC_FN_ENCODED == 
evaluate_summary.pipeline_options["metric_fn_encoded"]
+        assert METRIC_KEYS_EXPECTED == 
evaluate_summary.pipeline_options["metric_keys"]
 
         assert TASK_PREFIX_VALIDATION == evaluate_validation.task_id
         assert PREDICTION_PATH == 
evaluate_validation.templates_dict["prediction_path"]
 
     @mock.patch.object(PythonOperator, "set_upstream")
-    @mock.patch.object(DataflowCreatePythonJobOperator, "set_upstream")
+    @mock.patch.object(BeamRunPythonPipelineOperator, "set_upstream")
     def test_create_evaluate_ops_dag(self, mock_dataflow, mock_python):
         result = create_evaluate_ops(
             task_prefix=TASK_PREFIX,
@@ -204,18 +204,18 @@ class TestMlengineOperatorUtils(unittest.TestCase):
         assert VERSION_NAME == evaluate_prediction._version_name
 
         assert TASK_PREFIX_SUMMARY == evaluate_summary.task_id
-        assert DATAFLOW_OPTIONS == evaluate_summary.dataflow_default_options
-        assert PREDICTION_PATH == evaluate_summary.options["prediction_path"]
-        assert METRIC_FN_ENCODED == 
evaluate_summary.options["metric_fn_encoded"]
-        assert METRIC_KEYS_EXPECTED == evaluate_summary.options["metric_keys"]
+        assert DATAFLOW_OPTIONS == evaluate_summary.default_pipeline_options
+        assert PREDICTION_PATH == 
evaluate_summary.pipeline_options["prediction_path"]
+        assert METRIC_FN_ENCODED == 
evaluate_summary.pipeline_options["metric_fn_encoded"]
+        assert METRIC_KEYS_EXPECTED == 
evaluate_summary.pipeline_options["metric_keys"]
 
         assert TASK_PREFIX_VALIDATION == evaluate_validation.task_id
         assert PREDICTION_PATH == 
evaluate_validation.templates_dict["prediction_path"]
 
     @mock.patch.object(GCSHook, "download")
     @mock.patch.object(PythonOperator, "set_upstream")
-    @mock.patch.object(DataflowCreatePythonJobOperator, "set_upstream")
-    def test_apply_validate_fn(self, mock_dataflow, mock_python, 
mock_download):
+    @mock.patch.object(BeamRunPythonPipelineOperator, "set_upstream")
+    def test_apply_validate_fn(self, mock_beam_pipeline, mock_python, 
mock_download):
         result = create_evaluate_ops(
             task_prefix=TASK_PREFIX,
             data_format=DATA_FORMAT,

Reply via email to