Hi, in the spirit of not fitting the solution to the problem, would it not be better to first create a producer for your job and use a broker like Kafka or Kinesis or Pulsar?
Regards,
Gourav Sengupta

On Sat, May 21, 2022 at 3:46 PM Rohit Pant <rpant1...@gmail.com> wrote:
> Hi all,
>
> I am trying to implement a custom Spark DataSource for Salesforce by
> implementing the Spark DataSource V2 interfaces. For querying Salesforce
> data in parallel I am planning to use the Salesforce Bulk V1 API:
> https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm
>
> I am aware that there is an existing Salesforce Spark library by Springml,
> but it doesn't support the authentication types I need and isn't
> compatible with Spark 3.x.
>
> I am not using the Bulk V2 Salesforce API as it is serial in nature: you
> submit a query, Salesforce automatically creates the batches, and you then
> need to poll for results and iterate over the batches using batch locators
> returned in the header.
>
> I am planning to make it work like this:
>
> 1. The user specifies the options numPartitions and dbtable. Using these,
>    I will internally fetch the record count for that Salesforce object and
>    derive the chunk size to be used for querying the data with PK chunking.
>    https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm
> 2. The Bulk API is asynchronous. You need to create a job and then create
>    query batches within the job. If you specify that you want to use PK
>    chunking, along with the chunk size you want, Salesforce automatically
>    creates the batches for you. You then need to poll for and fetch the
>    results of each batch using a batch-specific URL.
> 3. I am planning to pass the batch ID to each executor using an
>    InputPartition object. Each executor will then poll for and fetch the
>    results.
>
> I am having trouble deciding how to create the Bulk job and submit the
> batches on the driver node before dispatching the batch IDs to the
> executors. I tried doing this in my implementation of the
> planInputPartitions method of the Batch interface, but it turns out that
> it is called 2-3 times for each action (show, collect, etc.), thus
> creating unnecessary Bulk jobs.
>
> One potential solution that might work is maintaining a set of hashed user
> options in the static scope of my Batch implementation (using a companion
> object) and only creating the job if it doesn't already exist in the set.
> However, I find this solution very clumsy. Also, what happens if a user
> submits multiple actions on a DataFrame? I could also give the set entries
> a TTL, but you can see how it gets complicated.
>
> Would really appreciate any pointers on the ideal way to achieve what I
> want.
>
> Regards,
>
> Rohit Pant
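A rough Scala sketch of the driver-side planning described above. Only the Spark DataSource V2 interfaces (Batch, InputPartition, PartitionReaderFactory) are real APIs here; SalesforceBulkClient, createQueryJob, listBatchIds and the other Salesforce-facing names are hypothetical placeholders for calls into the Bulk V1 REST API, not an existing library. The idea is to create the Bulk job inside a lazy val, so that if Spark calls planInputPartitions() several times on the same Batch instance, the job and its PK-chunked batches are created only once; whether the same instance is actually reused across those calls depends on how Spark instantiates the scan, so treat this purely as a sketch, not a confirmed fix.

import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical thin wrapper over the Salesforce Bulk V1 REST calls -- placeholder, not a real library.
trait SalesforceBulkClient {
  def countRecords(sobject: String): Long
  def createQueryJob(sobject: String, pkChunkSize: Long): String   // submits job with Sforce-Enable-PKChunking: chunkSize=..., returns job id
  def listBatchIds(jobId: String): Seq[String]                     // batch ids that PK chunking created
}

// One Salesforce Bulk batch per Spark partition. InputPartition already extends Serializable.
case class SalesforceInputPartition(jobId: String, batchId: String) extends InputPartition

class SalesforceBatch(
    schema: StructType,
    options: Map[String, String],
    newClient: () => SalesforceBulkClient) extends Batch {

  // Evaluated at most once per Batch instance: repeated planInputPartitions()
  // calls on the same instance reuse the same job and batch ids.
  private lazy val partitions: Array[InputPartition] = {
    val client  = newClient()
    val sobject = options("dbtable")
    val chunk   = math.max(1L, client.countRecords(sobject) / options.getOrElse("numPartitions", "10").toLong)
    val jobId   = client.createQueryJob(sobject, chunk)
    client.listBatchIds(jobId).map(bid => SalesforceInputPartition(jobId, bid): InputPartition).toArray
  }

  override def planInputPartitions(): Array[InputPartition] = partitions

  override def createReaderFactory(): PartitionReaderFactory =
    new SalesforceReaderFactory(schema, options)   // defined in the next sketch
}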
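And a matching executor-side sketch for step 3 of the plan: the reader factory unpacks the job and batch ids from the InputPartition, and each partition reader polls its batch until completion before streaming the results. SalesforceBatchFetcher, awaitCompletion and fetchRows are again hypothetical placeholders for the Bulk V1 polling and result-download calls; how they are built from the user options (auth, instance URL) is deliberately left out.

import java.io.Closeable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical executor-side view of the Bulk client (placeholder names).
trait SalesforceBatchFetcher extends Closeable {
  def awaitCompletion(jobId: String, batchId: String): Unit             // poll until the batch reports Completed
  def fetchRows(jobId: String, batchId: String): Iterator[InternalRow]  // download result sets, parse into rows
}

class SalesforceReaderFactory(
    schema: StructType,
    options: Map[String, String]) extends PartitionReaderFactory {

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[SalesforceInputPartition]
    new SalesforcePartitionReader(p.jobId, p.batchId, schema, options)
  }
}

class SalesforcePartitionReader(
    jobId: String,
    batchId: String,
    schema: StructType,
    options: Map[String, String]) extends PartitionReader[InternalRow] {

  // Built lazily on the executor from `options` (auth, endpoint, ...); construction omitted in this sketch.
  private lazy val fetcher: SalesforceBatchFetcher = ???

  private lazy val rows: Iterator[InternalRow] = {
    fetcher.awaitCompletion(jobId, batchId)
    fetcher.fetchRows(jobId, batchId)
  }

  override def next(): Boolean = rows.hasNext
  override def get(): InternalRow = rows.next()
  override def close(): Unit = fetcher.close()
}

If the lazy-val assumption does not hold and Spark rebuilds the Batch for each call, the job creation would have to move further up (for example into the Table or Scan that owns the Batch), which is still simpler than the shared hash-set-with-TTL approach described in the message.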