Hi all,

I am trying to implement a custom Spark data source for Salesforce using the Spark DataSource V2 interfaces. To query Salesforce data in parallel, I am planning to use the Salesforce Bulk V1 API: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm
I am aware that there is an existing Salesforce Spark library by Springml, but it doesn't support the authentication types I need and isn't compatible with Spark 3.x. I am not using the Bulk V2 API because it is serial in nature: you submit a query, Salesforce automatically creates the batches, and you then poll for the results and iterate over the batches using batch locators returned in the response headers.

I am planning to make it work like this:

1. The user specifies the options numPartitions and dbtable. Using these, I will internally fetch the record count for that Salesforce object and deduce the chunk size to use when querying the data with PK chunking: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm

2. The Bulk API is asynchronous: you create a job and then create query batches within it. If you enable PK chunking and specify the chunk size you want, Salesforce automatically creates the batches for you. You then poll for and fetch the results of each batch via a batch-specific URL.

3. I plan to pass the batch ID to each executor in an InputPartition object. Each executor will then poll for and fetch the results of its batch.

I am having trouble deciding how to create the Bulk job and submit the batches on the driver node before dispatching the batch IDs to the executors. I tried doing this in my implementation of the Batch interface's planInputPartitions method, but it turns out that it is called 2-3 times per action (show, collect, etc.), creating unnecessary Bulk jobs.

One potential solution is to maintain a set of hashed user options in the static scope of my Batch implementation (using a companion object) and only create the job if it doesn't already exist in the set. However, I find this solution very clumsy.
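For step 1, the chunk-size deduction could be sketched roughly as below (a minimal sketch in Java for illustration; the class and method names are my own, and I'm assuming the documented Salesforce PK-chunking ceiling of 250,000 records per chunk):

```java
// Sketch: derive a PK chunk size from the object's record count and the
// user-supplied numPartitions option, so that roughly numPartitions batches
// are created. Names here are illustrative, not from any real library.
public final class ChunkSizer {
    // Salesforce caps PK chunking at 250,000 records per chunk
    // (the default is 100,000).
    private static final long MAX_CHUNK_SIZE = 250_000L;

    public static long chunkSize(long recordCount, int numPartitions) {
        // Ceiling division, so the last partition isn't silently dropped.
        long size = (recordCount + numPartitions - 1) / numPartitions;
        // Clamp to [1, MAX_CHUNK_SIZE]; an oversized request would be
        // rejected or re-split by Salesforce anyway.
        return Math.min(Math.max(size, 1L), MAX_CHUNK_SIZE);
    }
}
```

The resulting value would go into the `Sforce-Enable-PKChunking` header (`chunkSize=...`) when creating the Bulk job; note that if the clamp kicks in, Salesforce will create more batches than numPartitions, so the partition count reported to Spark should be derived from the actual batches returned.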
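For what it's worth, the "static scope" workaround described above could look something like this (a hedged sketch, written in Java where a static field plays the role of a Scala companion object; `createBulkJob` is a hypothetical callback standing in for the real Bulk V1 job creation and batch submission):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Sketch of a process-wide registry that creates the Bulk job at most once
// per distinct set of read options, so repeated planInputPartitions calls
// for the same scan reuse the same job instead of creating new ones.
public final class BulkJobRegistry {
    // Keyed by a stable fingerprint/hash of the user-supplied options.
    private static final Map<String, String> JOBS = new ConcurrentHashMap<>();

    // createBulkJob runs at most once per key and returns the Bulk job ID;
    // computeIfAbsent gives the de-duplication atomically.
    public static String getOrCreateJob(String optionsKey,
                                        Supplier<String> createBulkJob) {
        return JOBS.computeIfAbsent(optionsKey, k -> createBulkJob.get());
    }
}
```

This captures the de-duplication but, as noted, not the lifecycle questions: nothing here evicts entries when an action finishes, which is exactly where the TTL complication comes in.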
Also, what happens if a user submits multiple actions on the same DataFrame? I could add a TTL for the set entries, but you can see how this gets complicated. I would really appreciate any pointers on the ideal way to achieve this.

Regards,
Rohit Pant