Hi all,

I am trying to implement a custom Spark DataSource for Salesforce by
implementing the Spark DataSource V2 interfaces. To query Salesforce
data in parallel, I am planning to use the Salesforce Bulk V1 API:
https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm

I am aware that there is an existing Salesforce Spark library by Springml,
but it doesn't support the authentication types I need and isn't compatible
with Spark 3.x.

I am not using the Salesforce Bulk V2 API because it is serial in nature:
you submit a query, Salesforce automatically creates the batches, and you
then have to poll for the results and iterate over the batches using the
batch locators returned in the response header.

I am planning to make it work like this:


   1. The user specifies the options numPartitions and dbtable. Using these,
   I will internally fetch the record count for that Salesforce object and
   deduce the chunk size to use for querying the data with PK chunking.
   https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm
   2. The Bulk API is asynchronous. You need to create a job and then create
   query batches within it. If you enable PK chunking and specify the chunk
   size you want, Salesforce automatically creates the batches for you. You
   then need to poll for and fetch the results of each batch using a
   batch-specific URL.
   3. I am planning to pass the batch ID to each executor using an
   InputPartition object. Each executor will then poll for and fetch the
   results for its batch (a rough sketch follows this list).
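
To make steps 1 and 3 concrete, here is a rough, untested sketch of what I
have in mind for the chunk-size calculation and the partition/reader-factory
side. All class and method names (PkChunking, SalesforceBatchPartition,
SalesforceBatchReaderFactory) are just placeholders, the reader body is
stubbed out, and the 250,000 cap on chunkSize is only what I understand
Salesforce's limit to be:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}

    object PkChunking {
      // Step 1 (driver side): derive a chunk size from the record count.
      // As far as I can tell, Salesforce caps chunkSize at 250,000 (default 100,000).
      def chunkSize(recordCount: Long, numPartitions: Int): Long =
        math.min(250000L, math.max(1L, math.ceil(recordCount.toDouble / numPartitions).toLong))
    }

    // Step 3: a serializable carrier shipped from the driver to each executor.
    case class SalesforceBatchPartition(jobId: String, batchId: String) extends InputPartition

    class SalesforceBatchReaderFactory(options: Map[String, String]) extends PartitionReaderFactory {
      override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
        val p = partition.asInstanceOf[SalesforceBatchPartition]
        new PartitionReader[InternalRow] {
          // The real reader would poll the batch status until Completed, then
          // stream the result set(s) and convert each record to an InternalRow.
          private val rows: Iterator[InternalRow] = Iterator.empty // placeholder
          override def next(): Boolean = rows.hasNext
          override def get(): InternalRow = rows.next()
          override def close(): Unit = ()
        }
      }
    }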

I am having trouble deciding how to create the Bulk job and submit the
batches on the driver node before dispatching the batch IDs to the
executors. I tried doing this in the implementation of the
planInputPartitions method of the Batch interface, but it turns out that it
is called 2-3 times per action (show, collect, etc.), thus creating
unnecessary Bulk jobs.
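
This is roughly the shape I have right now, which is why the duplicate jobs
show up. The helper methods are placeholders for the actual Bulk V1 REST
calls, and SalesforceBatchPartition / SalesforceBatchReaderFactory are the
placeholder classes from the sketch above:

    import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}

    class SalesforceBatch(options: Map[String, String]) extends Batch {
      override def planInputPartitions(): Array[InputPartition] = {
        // This method is invoked 2-3 times per action, so every extra
        // invocation submits another Bulk job.
        val jobId = createJobWithPkChunking(options)   // placeholder helper
        val batchIds = waitForBatchCreation(jobId)     // placeholder helper
        batchIds.map(id => SalesforceBatchPartition(jobId, id)).toArray[InputPartition]
      }

      override def createReaderFactory(): PartitionReaderFactory =
        new SalesforceBatchReaderFactory(options)

      // Stubs standing in for the real Bulk V1 REST calls.
      private def createJobWithPkChunking(opts: Map[String, String]): String = ???
      private def waitForBatchCreation(jobId: String): Seq[String] = ???
    }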

One potential solution is to maintain a set of hashed user options in the
static scope of the Batch implementation (using a companion object) and only
create the job if its hash isn't already in the set. However, I find this
solution very clumsy. Also, what happens if a user submits multiple actions
on the same DataFrame? I could maybe add a TTL for the set entries, but you
can see how it gets complicated.
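
In case it helps to see what I mean, the companion-object workaround would
look roughly like this (again just a sketch with illustrative names, keying
the map on a hash of the sorted options and keeping a timestamp in case I
later add a TTL):

    import scala.collection.mutable

    object SalesforceBatch {
      // optionsHash -> (jobId, creation timestamp); the timestamp is only
      // there so that entries could later be expired with a TTL.
      private val submittedJobs = mutable.Map.empty[Int, (String, Long)]

      def getOrCreateJob(options: Map[String, String], createJob: () => String): String =
        synchronized {
          val key = options.toSeq.sorted.hashCode()
          submittedJobs.getOrElseUpdate(key, (createJob(), System.currentTimeMillis()))._1
        }
    }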

Would really appreciate any pointers on the ideal way to achieve what I
want.

Regards,

Rohit Pant
