kanagaraj-dhanapal-89 commented on code in PR #45954:
URL: https://github.com/apache/airflow/pull/45954#discussion_r1927076325
##########
providers/src/airflow/providers/salesforce/hooks/salesforce.py:
##########
```diff
@@ -151,6 +151,7 @@ def conn(self) -> api.Salesforce:
             session=self.session,
             client_id=self._get_field(extras, "client_id") or None,
             consumer_key=self._get_field(extras, "consumer_key") or None,
+            consumer_secret=self._get_field(extras, "consumer_secret") or None,
```

Review Comment:
We have a custom Salesforce API fetch operator that fetches Salesforce objects using SQL queries. To use this operator, we need to pass the consumer key from the Airflow connection's extras:

```python
from __future__ import annotations

import os

from pandas import DataFrame

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
from airflow.utils.context import Context


class SalesforceApiDataExtractionOperator(BaseOperator):
    """Fetch Salesforce records with a query, write them to a local CSV, and upload the file to GCS."""

    def __init__(
        self,
        task_id: str,
        query: str,
        upload_path: str,
        gcp_conn_id: str,
        salesforce_conn_id: str = "salesforce_conn_id",
        include_deleted: bool = False,
        query_params: dict | None = None,
        **kwargs,
    ):
        self.upload_path = upload_path
        self.gcp_conn_id = gcp_conn_id
        self.task_id = task_id
        self.salesforce_conn_id = salesforce_conn_id
        self.include_deleted = include_deleted
        self.query_params = query_params
        self.query = query
        super().__init__(task_id=task_id, **kwargs)

    def __get_local_storage_location(self):
        _base_loc = os.path.join(os.getenv("HOME"), self.task_id)
        if not os.path.exists(_base_loc):
            os.mkdir(_base_loc)
        return _base_loc

    def __clean_up(self, df):
        self.log.info("started data clean up")
        df = df.replace("~", " ").replace("\n", " ").replace("\r", "")
        self.log.info("data clean up completed")
        return df

    def __write_local_file(self, df: DataFrame):
        file_name = os.path.basename(self.upload_path)
        local_folder_path = self.__get_local_storage_location().rstrip("/")
        local_file_path = f"{local_folder_path}/{file_name}"
        df.to_csv(local_file_path, header=False, sep="~")
        return local_file_path

    def __move_to_gcs(self, file_path):
        hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
        bucket_name = self.upload_path.replace("gs://", "").split("/")[0]
        self.log.info(f"bucket: {bucket_name}")
        gcs_file_path = "/".join(self.upload_path.replace("gs://", "").split("/")[1:])
        self.log.info(f"upload file: {gcs_file_path}")
        hook.upload(bucket_name=bucket_name, object_name=gcs_file_path, filename=file_path)

    def execute(self, context: Context):
        # Allow an optional run-time filter to be appended via DAG params.
        if context["params"].get("sf_filter", None):
            query = self.query + " " + context["params"]["sf_filter"]
        else:
            query = self.query
        sf_hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
        results = sf_hook.make_query(
            query=query,
            include_deleted=self.include_deleted,
            query_params=self.query_params,
        )
        df = sf_hook.object_to_df(results["records"])
        df = self.__clean_up(df)
        file_path = self.__write_local_file(df)
        self.__move_to_gcs(file_path)
```
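For context, a Salesforce connection carrying those extras could be defined roughly as in the minimal sketch below. The conn id, credentials, and values are hypothetical; the extras keys simply mirror the `_get_field` lookups in the hook, including the `consumer_secret` this PR adds:

```python
import json

from airflow.models import Connection

# Hypothetical connection; only the extras keys matter for this discussion.
salesforce_conn = Connection(
    conn_id="salesforce_conn_id",
    conn_type="salesforce",
    login="integration.user@example.com",
    password="not-a-real-password",
    extra=json.dumps(
        {
            "client_id": "my-connected-app-client-id",
            "consumer_key": "my-connected-app-consumer-key",
            # Read by the line added in this PR and forwarded to simple_salesforce.
            "consumer_secret": "my-connected-app-consumer-secret",
        }
    ),
)
```

With a connection like that, `SalesforceHook(salesforce_conn_id="salesforce_conn_id")` resolves the secret through `_get_field`, which is the code path the added line exercises.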