nuclearpinguin commented on a change in pull request #7692: [AIRFLOW-6732] Add GoogleAdsHook and GoogleAdsToGcsOperator URL: https://github.com/apache/airflow/pull/7692#discussion_r391087275
########## File path: airflow/providers/google/marketing_platform/operators/google_ads.py ########## @@ -0,0 +1,136 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +This module contains Google Ad to GCS operators. +""" +import csv +from operator import attrgetter +from tempfile import NamedTemporaryFile +from typing import Dict, List + +from airflow.models import BaseOperator +from airflow.providers.google.cloud.hooks.gcs import GCSHook +from airflow.providers.google.marketing_platform.hooks.google_ads import GoogleAdsHook +from airflow.utils.decorators import apply_defaults + + +class GoogleAdsToGcsOperator(BaseOperator): + """ + Fetches the daily results from the Google Ads API for 1-n clients + Converts and saves the data as a temporary CSV file + Uploads the CSV to Google Cloud Storage + + :param client_ids: Google Ads client IDs to query + :type client_ids: List[str] + :param query: Google Ads Query Language API query + :type query: str + :param attributes: List of Google Ads Row attributes to extract + :type attributes: List[str] + :param bucket: The GCS bucket to upload to + :type bucket: str + :param obj: GCS path to save the object. Must be the full file path (ex. 
`path/to/file.txt`) + :type obj: str + :param gcp_conn_id: Airflow Google Cloud Platform connection ID + :type gcp_conn_id: str + :param google_ads_conn_id: Airflow Google Ads connection ID + :type google_ads_conn_id: str + :param page_size: The number of results per API page request. Max 10,000 + :type page_size: int + :param gzip: Option to compress local file or file data for upload + :type gzip: bool + + :Example: + google_ads_query = \""" + SELECT segments.date, customer.id, campaign.id, metrics.impressions + FROM ad_group_ad + WHERE segments.date >= '2020-01-01' + \""" + + GoogleAdsToGcsOperator( + client_ids=["123456", "11223344"], + query=google_ads_query, + attributes=[ + "segments.date.value", + "customer.id.value", + "campaign.id.value", + "metrics.impressions.value"], + bucket="bucket_name", + obj="the_path/to_save_the_file_to/results.csv" + ) + """ + + template_fields = ("client_ids", "query", "attributes", "bucket", "obj") + + @apply_defaults + def __init__( + self, + client_ids: List[str], + query: str, + attributes: List[str], + bucket: str, + obj: str, + gcp_conn_id: str = "google_cloud_default", + google_ads_conn_id: str = "google_ads_default", + page_size: int = 10000, + gzip: bool = False, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.client_ids = client_ids + self.query = query + self.attributes = attributes + self.bucket = bucket + self.obj = obj + self.gcp_conn_id = gcp_conn_id + self.google_ads_conn_id = google_ads_conn_id + self.page_size = page_size + self.gzip = gzip + + def execute(self, context: Dict): + """ + Gets the authenticated Google Ads service and queries the API for each client + Converts the returned Google Ad Rows objects to CSV and uploads to GCS + """ + + service = GoogleAdsHook(self.gcp_conn_id, self.google_ads_conn_id) + rows = service.search( + client_ids=self.client_ids, query=self.query, page_size=self.page_size + ) + + try: + getter = attrgetter(*self.attributes) + converted_rows = 
[getter(row) for row in rows] Review comment: Nice one! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
