kanagaraj-dhanapal-89 commented on code in PR #45954:
URL: https://github.com/apache/airflow/pull/45954#discussion_r1927076325
##########
providers/src/airflow/providers/salesforce/hooks/salesforce.py:
##########
@@ -151,6 +151,7 @@ def conn(self) -> api.Salesforce:
session=self.session,
client_id=self._get_field(extras, "client_id") or None,
consumer_key=self._get_field(extras, "consumer_key") or None,
+ consumer_secret=self._get_field(extras, "consumer_secret") or None,
Review Comment:
We have a custom Salesforce API fetch operator that extracts Salesforce
objects using SOQL queries. To use this operator, we need to pass the consumer
key (and consumer secret) from the Airflow Connection's `extra` field:
class SalesforceApiDataExtractionOperator(BaseOperator):
    """Fetch Salesforce records via a SOQL query and upload them to GCS.

    The records are loaded into a DataFrame, scrubbed of the ``~`` column
    separator and of embedded line breaks, written locally as a headerless
    ``~``-separated CSV, and uploaded to the ``gs://`` path in *upload_path*.

    :param task_id: unique task id; also used as the local staging folder name
    :param query: SOQL query to run against Salesforce
    :param upload_path: destination GCS URI, e.g. ``gs://bucket/dir/file.csv``
    :param gcp_conn_id: Airflow connection id for Google Cloud
    :param salesforce_conn_id: Airflow connection id for Salesforce
    :param include_deleted: if True, include soft-deleted records in the query
    :param query_params: extra arguments forwarded to the Salesforce API
    """

    def __init__(
        self,
        task_id: str,
        query: str,
        upload_path: str,
        gcp_conn_id: str,
        salesforce_conn_id: str = "salesforce_conn_id",
        include_deleted: bool = False,
        query_params: dict | None = None,
        **kwargs,
    ):
        # BaseOperator.__init__ assigns self.task_id, so no manual
        # self.task_id = task_id is needed; call super() first so the
        # operator is fully initialized before we attach our own state.
        super().__init__(task_id=task_id, **kwargs)
        self.upload_path = upload_path
        self.gcp_conn_id = gcp_conn_id
        self.salesforce_conn_id = salesforce_conn_id
        self.include_deleted = include_deleted
        self.query_params = query_params
        self.query = query

    def __get_local_storage_location(self):
        """Return (and create if needed) the local staging directory."""
        base_loc = os.path.join(os.getenv("HOME"), self.task_id)
        # makedirs(..., exist_ok=True) is idempotent and race-free, unlike
        # the exists()+mkdir pair, and also creates missing parent dirs.
        os.makedirs(base_loc, exist_ok=True)
        return base_loc

    def __clean_up(self, df):
        """Strip the CSV separator and line breaks from every cell.

        ``DataFrame.replace`` without ``regex=True`` matches whole cell
        values only; ``regex=True`` makes it a substring replacement, which
        is required so a ``~`` or newline inside a value cannot corrupt the
        ``~``-separated CSV written later.
        """
        self.log.info("started data clean up")
        df = df.replace({"~": " ", "\n": " ", "\r": ""}, regex=True)
        self.log.info("data clean up completed")
        return df

    def __write_local_file(self, df: DataFrame):
        """Write *df* as a headerless '~'-separated CSV; return its path."""
        file_name = os.path.basename(self.upload_path)
        local_folder_path = self.__get_local_storage_location().rstrip("/")
        local_file_path = f"{local_folder_path}/{file_name}"
        df.to_csv(local_file_path, header=False, sep="~")
        return local_file_path

    def __move_to_gcs(self, file_path):
        """Upload the local file to the bucket/object parsed from upload_path."""
        hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
        # "gs://bucket/a/b.csv" -> bucket="bucket", object="a/b.csv"
        stripped = self.upload_path.replace("gs://", "")
        bucket_name = stripped.split("/")[0]
        gcs_file_path = "/".join(stripped.split("/")[1:])
        # Lazy %-style args so formatting is skipped when the level is off.
        self.log.info("bucket: %s", bucket_name)
        self.log.info("upload file: %s", gcs_file_path)
        hook.upload(
            bucket_name=bucket_name,
            object_name=gcs_file_path,
            filename=file_path,
        )

    def execute(self, context: Context):
        # An optional run-time filter can be appended via the DAG param
        # "sf_filter" (e.g. a WHERE clause fragment).
        if context["params"].get("sf_filter", None):
            query = self.query + " " + context["params"]["sf_filter"]
        else:
            query = self.query
        sf_hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
        results = sf_hook.make_query(
            query=query,
            include_deleted=self.include_deleted,
            query_params=self.query_params,
        )
        df = sf_hook.object_to_df(results["records"])
        df = self.__clean_up(df)
        file_path = self.__write_local_file(df)
        self.__move_to_gcs(file_path)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]