kanagaraj-dhanapal-89 commented on code in PR #45954:
URL: https://github.com/apache/airflow/pull/45954#discussion_r1927076325


##########
providers/src/airflow/providers/salesforce/hooks/salesforce.py:
##########
@@ -151,6 +151,7 @@ def conn(self) -> api.Salesforce:
             session=self.session,
             client_id=self._get_field(extras, "client_id") or None,
             consumer_key=self._get_field(extras, "consumer_key") or None,
+            consumer_secret=self._get_field(extras, "consumer_secret") or None,

Review Comment:
   We have a custom Salesforce API fetch operator that fetches Salesforce objects using SQL queries. To use this operator we need to pass the consumer key, and with this change the consumer secret as well, from the Airflow connection's `extra` field:
   
   ```python
   import os

   from pandas import DataFrame

   from airflow.models import BaseOperator
   from airflow.providers.google.cloud.hooks.gcs import GCSHook
   from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
   from airflow.utils.context import Context


   class SalesforceApiDataExtractionOperator(BaseOperator):
       """Runs a Salesforce query and uploads the result to GCS as a '~'-separated CSV."""

       def __init__(
           self,
           task_id: str,
           query: str,
           upload_path: str,
           gcp_conn_id: str,
           salesforce_conn_id: str = "salesforce_conn_id",
           include_deleted: bool = False,
           query_params: dict | None = None,
           **kwargs,
       ):
           super().__init__(task_id=task_id, **kwargs)
           self.upload_path = upload_path
           self.gcp_conn_id = gcp_conn_id
           self.salesforce_conn_id = salesforce_conn_id
           self.include_deleted = include_deleted
           self.query_params = query_params
           self.query = query

       def __get_local_storage_location(self):
           # Per-task scratch directory under $HOME.
           _base_loc = os.path.join(os.getenv("HOME"), self.task_id)
           os.makedirs(_base_loc, exist_ok=True)
           return _base_loc

       def __clean_up(self, df: DataFrame) -> DataFrame:
           self.log.info("started data clean up")
           # regex=True makes these substring replacements; without it,
           # DataFrame.replace only matches cells that equal the value exactly.
           df = (
               df.replace("~", " ", regex=True)
               .replace("\n", " ", regex=True)
               .replace("\r", "", regex=True)
           )
           self.log.info("data clean up completed")
           return df

       def __write_local_file(self, df: DataFrame) -> str:
           file_name = os.path.basename(self.upload_path)
           local_folder_path = self.__get_local_storage_location().rstrip("/")
           local_file_path = f"{local_folder_path}/{file_name}"
           df.to_csv(local_file_path, header=False, sep="~")
           return local_file_path

       def __move_to_gcs(self, file_path: str) -> None:
           hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
           # upload_path is a gs://bucket/object/path URI.
           bucket_name = self.upload_path.replace("gs://", "").split("/")[0]
           self.log.info("bucket: %s", bucket_name)
           gcs_file_path = "/".join(self.upload_path.replace("gs://", "").split("/")[1:])
           self.log.info("upload file: %s", gcs_file_path)
           hook.upload(bucket_name=bucket_name, object_name=gcs_file_path, filename=file_path)

       def execute(self, context: Context):
           # Optionally append a run-time filter passed in via DAG params.
           if context["params"].get("sf_filter"):
               query = self.query + " " + context["params"]["sf_filter"]
           else:
               query = self.query
           sf_hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
           results = sf_hook.make_query(
               query=query,
               include_deleted=self.include_deleted,
               query_params=self.query_params,
           )
           df = sf_hook.object_to_df(results["records"])
           df = self.__clean_up(df)
           file_path = self.__write_local_file(df)
           self.__move_to_gcs(file_path)
   ```
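   
   For illustration, here is a minimal sketch of how we wire the operator into a DAG; the DAG id, query, bucket path, and connection ids below are placeholders rather than our real pipeline. The Salesforce connection's `extra` JSON carries `client_id`, `consumer_key`, and (with this change) `consumer_secret`, matching the `_get_field` lookups in the diff above.
   
   ```python
   from datetime import datetime

   from airflow import DAG

   with DAG(
       dag_id="salesforce_to_gcs_example",  # placeholder DAG id
       start_date=datetime(2025, 1, 1),
       schedule=None,
       params={"sf_filter": ""},  # optional filter suffix appended to the query at run time
   ) as dag:
       extract_accounts = SalesforceApiDataExtractionOperator(
           task_id="extract_accounts",
           query="SELECT Id, Name FROM Account",  # placeholder query
           upload_path="gs://my-bucket/salesforce/accounts.csv",  # placeholder bucket/path
           gcp_conn_id="google_cloud_default",
           salesforce_conn_id="salesforce_conn_id",
       )
   ```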


