turbaszek commented on a change in pull request #11111:
URL: https://github.com/apache/airflow/pull/11111#discussion_r493917075
##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2109,3 +2127,268 @@ def execute(self, context: Any):
def on_kill(self):
if self.job_id and self.cancel_on_kill:
self.hook.cancel_job(job_id=self.job_id,
project_id=self.project_id, location=self.location)
+
+
+class GreatExpectationsBigQueryOperator(BaseOperator):
+ """
+ Use Great Expectations to validate data expectations against a
BigQuery table or the result of a SQL query.
+ The expectations need to be stored in a JSON file sitting in an
accessible GCS bucket. The validation results
+ are output to GCS in both JSON and HTML formats.
+ Here's the current list of expectations types:
+
https://docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html
+ Here's how to create expectations files:
+
https://docs.greatexpectations.io/en/latest/guides/tutorials/how_to_create_expectations.html
+
+ :param gcp_project: The GCP project which houses the GCS buckets
where the expectations files are stored
+ and where the validation files & data docs will be output (e.g.
HTML docs showing if the data matches
+ expectations).
+ :type gcp_project: str
+ :param expectations_file_name: The name of the JSON file containing
the expectations for the data.
+ :type expectations_file_name: str
+ :param gcs_bucket: Google Cloud Storage bucket where expectation
files are stored and where validation outputs
+ and data docs will be saved.
+ (e.g.
gs://<gcs_bucket>/<gcs_expectations_prefix>/<expectations_file_name>
+ gs://mybucket/myprefix/myexpectationsfile.json )
+ :type gcs_bucket: str
+ :param gcs_expectations_prefix: Google Cloud Storage prefix where the
expectations file can be found.
+ (e.g. 'ge/expectations')
+ :type gcs_expectations_prefix: str
+ :param gcs_validations_prefix: Google Cloud Storage prefix where the
validation output files should be saved.
+ (e.g. 'ge/validations')
+ :type gcs_validations_prefix: str
+ :param gcs_datadocs_prefix: Google Cloud Storage prefix where the
validation datadocs files should be saved.
+ (e.g. 'ge/datadocs')
+ :type gcs_datadocs_prefix: str
+ :param validation_type: For the set of data to be validated (i.e.
compared against expectations), is it already
+ sitting in a BigQuery table or do you want to validate the data
returned by a SQL query? The options are
+ 'TABLE' or 'SQL'.
+ :type validation_type: str
+ :param validation_type_input: The name of the BigQuery table
(dataset_name.table_name) if the validation_type
+ is 'TABLE' or the SQL query string if the validation_type is 'SQL'.
+ :type validation_type_input: str
+ :param bigquery_conn_id: Name of the BigQuery connection that contains
the connection and credentials
+ info needed to connect to BigQuery.
+ :type bigquery_conn_id: str
+ :param bq_dataset_name: The name of the BigQuery data set where any
temp tables will be created that are needed
+ as part of the GE validation process.
+ :type bq_dataset_name: str
+ :param send_alert_email: Send an alert email if one or more
expectations fail to be met? Defaults to True.
+ :type send_alert_email: boolean
+ :param include_datadocs_link_in_email: Include in the alert email a
link to the data doc in GCS that shows the
+ validation results? Defaults to False because there's extra setup
needed to serve HTML data docs stored in
+ GCS. When set to False, only a GCS path to the results are
included in the email.
+ Execute the following steps if you want a clickable link for the
data doc to be included in the
+ email:
+ 1) Set up a GAE app to serve the data docs by following the
instructions here:
+
https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_data_docs/how_to_host_and_share_data_docs_on_gcs.html
+ 2) Create an Airflow variable called
'great_expectations_datadocs_domain' with a value set to the domain
+ google creates in step 1 to serve the data docs (e.g.
'ge-data-docs-dot-my-gcp-project.ue.r.appspot.com').
+ This operator will look for it if
include_datadocs_link_in_email=True
+ :type include_datadocs_link_in_email: boolean
+ :param email_to: Email address to receive any alerts when
expectations are not met.
+ :type email_to: str
+ :param fail_if_expectations_not_met: Fail the Airflow task if
expectations are not met? Defaults to True.
+ :type fail_if_expectations_not_met: boolean
+ """
+
+ @apply_defaults
+ def __init__(self, gcp_project, expectations_file_name, gcs_bucket,
gcs_expectations_prefix,
+ gcs_validations_prefix, gcs_datadocs_prefix, validation_type,
validation_type_input,
+ bq_dataset_name, email_to, send_alert_email=True,
include_datadocs_link_in_email=False,
+ fail_if_expectations_not_met=True,
+ bigquery_conn_id='bigquery_default',
+ *args, **kwargs):
+ self.expectations_file_name = expectations_file_name
+ if validation_type.upper() not in VALID_TYPE:
+ raise AirflowException("argument 'validation_type' must be one of
%r." % VALID_TYPE)
+ self.validation_type = validation_type
+ self.validation_type_input = validation_type_input
+ self.bigquery_conn_id = bigquery_conn_id
+ self.bq_dataset_name = bq_dataset_name
+ self.gcp_project = gcp_project
+ self.gcs_bucket = gcs_bucket
+ self.gcs_expectations_prefix = gcs_expectations_prefix
+ self.gcs_validations_prefix = gcs_validations_prefix
+ self.gcs_datadocs_prefix = gcs_datadocs_prefix
+ self.email_to = email_to
+ self.send_alert_email = send_alert_email
+ self.include_datadocs_link_in_email = include_datadocs_link_in_email
+ self.fail_if_expectations_not_met = fail_if_expectations_not_met
+
+ super(GreatExpectationsBigQueryOperator, self).__init__(*args,
**kwargs)
+
+ def execute(self, context):
+ conn = BaseHook.get_connection(self.bigquery_conn_id)
+ connectionJson = json.loads(conn.extra)
+ log.info('####### validation_type_input
{}'.format(self.validation_type_input))
+
+ project_config = DataContextConfig(
+ config_version=2,
+ datasources={
+ "bq_datasource": {
+ "credentials": {
+ "url": "bigquery://" + connectionJson[
+ 'extra__google_cloud_platform__project'] + "/" +
self.bq_dataset_name + "?credentials_path=" +
+
connectionJson['extra__google_cloud_platform__key_path']
+ },
+ "class_name": "SqlAlchemyDatasource",
+ "module_name": "great_expectations.datasource",
+ "data_asset_type": {
+ "module_name": "great_expectations.dataset",
+ "class_name": "SqlAlchemyDataset"
+ }
+ }
+ },
+ expectations_store_name="expectations_GCS_store",
+ validations_store_name="validations_GCS_store",
+ evaluation_parameter_store_name="evaluation_parameter_store",
+ plugins_directory=None,
+ validation_operators={
+ "action_list_operator": {
+ "class_name": "ActionListValidationOperator",
+ "action_list": [
+ {
+ "name": "store_validation_result",
+ "action": {"class_name":
"StoreValidationResultAction"},
+ },
+ {
+ "name": "store_evaluation_params",
+ "action": {"class_name":
"StoreEvaluationParametersAction"},
+ },
+ {
+ "name": "update_data_docs",
+ "action": {"class_name": "UpdateDataDocsAction"},
+ },
+ ],
+ }
+ },
+ stores={
+ 'expectations_GCS_store': {
+ 'class_name': 'ExpectationsStore',
+ 'store_backend': {
+ 'class_name': 'TupleGCSStoreBackend',
+ 'project': self.gcp_project,
+ 'bucket': self.gcs_bucket,
+ 'prefix': self.gcs_expectations_prefix
+ }
+ },
+ 'validations_GCS_store': {
+ 'class_name': 'ValidationsStore',
+ 'store_backend': {
+ 'class_name': 'TupleGCSStoreBackend',
+ 'project': self.gcp_project,
+ 'bucket': self.gcs_bucket,
+ 'prefix': self.gcs_validations_prefix
+ }
+ },
+ "evaluation_parameter_store": {"class_name":
"EvaluationParameterStore"},
+ },
+ data_docs_sites={
+ "GCS_site": {
+ "class_name": "SiteBuilder",
+ "store_backend": {
+ "class_name": "TupleGCSStoreBackend",
+ "project": self.gcp_project,
+ "bucket": self.gcs_bucket,
+ 'prefix': self.gcs_datadocs_prefix
+ },
+ "site_index_builder": {
+ "class_name": "DefaultSiteIndexBuilder",
+ },
+ }
+ },
+ config_variables_file_path=None,
+ commented_map=None,
+ )
+ data_context = BaseDataContext(project_config=project_config)
+
+ # Tell GE how to fetch the batch of data that should be validated.
+ batch_kwargs = {
+ "datasource": "bq_datasource",
+ }
+ if self.validation_type == VALIDATIONS.SQL.name:
+ batch_kwargs["query"] = self.validation_type_input
+ batch_kwargs["data_asset_name"] = self.bq_dataset_name
+ batch_kwargs["bigquery_temp_table"] = self.get_temp_table_name()
+ elif self.validation_type == VALIDATIONS.TABLE.name:
+ batch_kwargs["table"] = self.validation_type_input
+ batch_kwargs["data_asset_name"] = self.bq_dataset_name
+
+ log.info("batch_kwargs: " + str(batch_kwargs))
+
+ log.info("Loading expectations...")
+ suite =
data_context.get_expectation_suite((self.expectations_file_name.rsplit(".",
1)[0]))
+
+ log.info("Getting the batch of data to be validated...")
+ batch = data_context.get_batch(batch_kwargs, suite)
+
+ run_id = {
+ "run_name": 'bq',
+ "run_time": datetime.datetime.now(datetime.timezone.utc)
+ }
+
+ log.info("Validating batch against expectations...")
+ results = data_context.run_validation_operator(
+ "action_list_operator",
+ assets_to_validate=[batch],
+ run_id=run_id)
+
+ validation_result_identifier = list(results['run_results'].keys())[0]
+ # For the given validation_result_identifier, get a link to the data
docs that were generated by Great
+ # Expectations as part of the validation.
+ data_docs_url = \
+
data_context.get_docs_sites_urls(resource_identifier=validation_result_identifier,
site_name='GCS_site')[0][
+ 'site_url']
+ log.info("Data docs url is: " + data_docs_url)
+ if results["success"]:
+ self.log.info('All expectations met')
+ else:
+ self.log.info('One or more expectations were not met.')
+ if self.send_alert_email:
+ self.log.info('Sending alert email...')
+ self.send_alert(data_docs_url)
+ if self.fail_if_expectations_not_met:
+ raise AirflowException('One or more expectations were not met')
+
+ # Generate a unique name for a temporary BQ table
+ def get_temp_table_name(self):
+ now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
+ name_start = "temp_great_expectations_" + now + '_'
+ full_name = name_start + ''.join(random.choices(string.ascii_uppercase
+
+ string.digits, k=5))
+ log.info("Generated name for temporary table: " + full_name)
+ return full_name
+
+ def send_alert(self, data_docs_url):
+ if self.include_datadocs_link_in_email:
+ # From an Airflow variable set by the user, get the domain name of
the service serving the data docs.
+ domain = Variable.get("great_expectations_datadocs_domain")
+ # Replace the domain returned by GE with the domain set up to
serve the data docs
+ parsed = urlsplit(data_docs_url)
+ new_url = parsed._replace(netloc=domain)
+ results = ' See the results <a href=' + new_url.geturl() +
'>here</a>.'
+ else:
+ # From the data docs url, pull out just the GCS path and send it
to the users in the email.
+ parsed = urlsplit(data_docs_url)
+ results = ' See the following GCS location for results:' +
parsed.path
+ email_content = '''
+ <html>
+ <head>
+ <meta charset="utf-8">
+ </head>
+ <body style="background-color: #fafafa; font-family: Roboto,
sans-serif=;">
+ <div style="width: 600px; margin:0 auto;">
+ <div style="background-color: white; border-top: 4px
solid #22a667; border-left: 1px solid #eee; border-right: 1px solid #eee;
border-radius: 6px 6px 0 0; height: 24px;"></div>
+ <div style="background-color: white; border-left:
1px solid #eee; border-right: 1px solid #eee; padding: 0 24px; overflow:
hidden;">
+ <div style="margin-left: 35px;">
+ Great Expectations Alert<br>
+ One or more data expectations were not met in
the {0} file. {1}
+ </div>
+ </body>
+ </html>
+ '''.format(self.expectations_file_name, results)
+ send_email(self.email_to, 'expectations in ' +
self.expectations_file_name + ' not met', email_content,
+ files=None, cc=None, bcc=None,
+ mime_subtype='mixed', mime_charset='us_ascii')
+ B
Review comment:
```suggestion
```
Not sure about this line: the stray `B` at the end of the file looks accidental, so I suggest removing it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org