Re: [PR] Update salesforce.py - Salesforce: Implement fetching consumer_secret from Airflow Connections [airflow]

2025-01-23 Thread via GitHub


kanagaraj-dhanapal-89 commented on code in PR #45954:
URL: https://github.com/apache/airflow/pull/45954#discussion_r1927124200


##
providers/src/airflow/providers/salesforce/hooks/salesforce.py:
##
@@ -151,6 +151,7 @@ def conn(self) -> api.Salesforce:
             session=self.session,
             client_id=self._get_field(extras, "client_id") or None,
             consumer_key=self._get_field(extras, "consumer_key") or None,
+            consumer_secret=self._get_field(extras, "consumer_secret") or None,

Review Comment:
   Code implemented and documentation updated.
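
   For reference, a hedged sketch (connection id, login, and values below are hypothetical, not taken from the PR) of a connection whose extras the new `consumer_secret` lookup would pick up:

```python
import json

from airflow.models.connection import Connection

# Hypothetical connection; with this PR, SalesforceHook.conn reads
# "consumer_secret" from the Extra JSON via _get_field(extras, "consumer_secret").
conn = Connection(
    conn_id="salesforce_default",
    conn_type="salesforce",
    login="user@example.com",
    password="password-plus-security-token",
    extra=json.dumps(
        {
            "consumer_key": "EXAMPLE_CONSUMER_KEY",
            "consumer_secret": "EXAMPLE_CONSUMER_SECRET",  # new in this PR
        }
    ),
)
print(conn.get_uri())  # the URI form can be exported as an AIRFLOW_CONN_* env var
```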



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update salesforce.py - Salesforce: Implement fetching consumer_secret from Airflow Connections [airflow]

2025-01-23 Thread via GitHub


eladkal commented on code in PR #45954:
URL: https://github.com/apache/airflow/pull/45954#discussion_r1927083623


##
providers/src/airflow/providers/salesforce/hooks/salesforce.py:
##
@@ -151,6 +151,7 @@ def conn(self) -> api.Salesforce:
             session=self.session,
             client_id=self._get_field(extras, "client_id") or None,
             consumer_key=self._get_field(extras, "consumer_key") or None,
+            consumer_secret=self._get_field(extras, "consumer_secret") or None,

Review Comment:
   You'll also need to update the docs:
   
https://github.com/apache/airflow/blob/5c0e98cc770b4f055dbd1c0b60ccbd69f3166da7/docs/apache-airflow-providers-salesforce/connections/salesforce.rst#L46
   
   and the `get_connection_form_widgets` function.
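
   For anyone following along, a minimal sketch of that second ask, assuming the usual provider form-widget pattern (the `consumer_key` entry is shown only to situate the new field and is not copied from the hook):

```python
@classmethod
def get_connection_form_widgets(cls) -> dict:
    """Return connection form widgets for the Salesforce connection."""
    from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import PasswordField, StringField

    return {
        # Existing fields elided; one shown for context.
        "consumer_key": StringField(lazy_gettext("Consumer Key"), widget=BS3TextFieldWidget()),
        # New widget mirroring the consumer_secret extra added in this PR:
        "consumer_secret": PasswordField(
            lazy_gettext("Consumer Secret"), widget=BS3PasswordFieldWidget()
        ),
    }
```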
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update salesforce.py - Salesforce: Implement fetching consumer_secret from Airflow Connections [airflow]

2025-01-23 Thread via GitHub


kanagaraj-dhanapal-89 commented on code in PR #45954:
URL: https://github.com/apache/airflow/pull/45954#discussion_r1927076325


##
providers/src/airflow/providers/salesforce/hooks/salesforce.py:
##
@@ -151,6 +151,7 @@ def conn(self) -> api.Salesforce:
             session=self.session,
             client_id=self._get_field(extras, "client_id") or None,
             consumer_key=self._get_field(extras, "consumer_key") or None,
+            consumer_secret=self._get_field(extras, "consumer_secret") or None,

Review Comment:
   We have a custom Salesforce API fetch operator that fetches Salesforce objects using SOQL queries. To use this operator, we need to pass the consumer key from the Airflow connection's extras:
   
```python
from __future__ import annotations

import os

from pandas import DataFrame

from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.salesforce.hooks.salesforce import SalesforceHook
from airflow.utils.context import Context


class SalesforceApiDataExtractionOperator(BaseOperator):
    """Fetch Salesforce records with a SOQL query and upload them to GCS as a "~"-separated CSV."""

    def __init__(
        self,
        task_id: str,
        query: str,
        upload_path: str,
        gcp_conn_id: str,
        salesforce_conn_id: str = "salesforce_conn_id",
        include_deleted: bool = False,
        query_params: dict | None = None,
        **kwargs,
    ):
        self.upload_path = upload_path
        self.gcp_conn_id = gcp_conn_id
        self.task_id = task_id
        self.salesforce_conn_id = salesforce_conn_id
        self.include_deleted = include_deleted
        self.query_params = query_params
        self.query = query
        super().__init__(task_id=task_id, **kwargs)

    def __get_local_storage_location(self):
        # Stage files under $HOME/<task_id> before uploading to GCS.
        _base_loc = os.path.join(os.getenv("HOME"), self.task_id)
        if not os.path.exists(_base_loc):
            os.mkdir(_base_loc)
        return _base_loc

    def __clean_up(self, df):
        # Strip the "~" delimiter and line breaks from cell values so the
        # "~"-separated CSV written below stays well formed.
        self.log.info("started data clean up")
        df = df.replace({"~": " ", "\n": " ", "\r": ""}, regex=True)
        self.log.info("data clean up completed")
        return df

    def __write_local_file(self, df: DataFrame):
        file_name = os.path.basename(self.upload_path)
        local_folder_path = self.__get_local_storage_location().rstrip("/")
        local_file_path = f"{local_folder_path}/{file_name}"
        df.to_csv(local_file_path, header=False, sep="~")
        return local_file_path

    def __move_to_gcs(self, file_path):
        hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
        bucket_name = self.upload_path.replace("gs://", "").split("/")[0]
        self.log.info(f"bucket: {bucket_name}")
        gcs_file_path = "/".join(self.upload_path.replace("gs://", "").split("/")[1:])
        self.log.info(f"upload file: {gcs_file_path}")
        hook.upload(bucket_name=bucket_name, object_name=gcs_file_path, filename=file_path)

    def execute(self, context: Context):
        # Optionally append a runtime filter passed in via DAG params.
        if context["params"].get("sf_filter", None):
            query = self.query + " " + context["params"]["sf_filter"]
        else:
            query = self.query
        sf_hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
        results = sf_hook.make_query(
            query=query,
            include_deleted=self.include_deleted,
            query_params=self.query_params,
        )
        df = sf_hook.object_to_df(results["records"])
        df = self.__clean_up(df)
        file_path = self.__write_local_file(df)
        self.__move_to_gcs(file_path)
```
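
   A minimal usage sketch for the operator above; the DAG id, bucket path, and connection ids are hypothetical:

```python
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="salesforce_to_gcs_example",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    extract_accounts = SalesforceApiDataExtractionOperator(
        task_id="extract_accounts",
        query="SELECT Id, Name FROM Account",
        upload_path="gs://example-bucket/salesforce/accounts.csv",
        gcp_conn_id="google_cloud_default",
        salesforce_conn_id="salesforce_default",
    )
```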



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


