Hi,

In the spirit of not fitting the solution to the problem, would it not be
better to first create a producer for your job and use a broker like Kafka,
Kinesis, or Pulsar?


Regards,
Gourav Sengupta

On Sat, May 21, 2022 at 3:46 PM Rohit Pant <rpant1...@gmail.com> wrote:

> Hi all,
>
> I am trying to implement a custom Spark DataSource for Salesforce by
> implementing the Spark DataSource V2 interfaces. To query Salesforce data
> in parallel, I am planning to use the Salesforce Bulk V1 API:
> https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm
>
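> To make the shape of this concrete, here is a rough outline of the DSv2
> entry points I have in mind (Scala, Spark 3.x). The class and option names
> are only illustrative, not a working implementation:
>
>   import java.util
>   import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
>   import org.apache.spark.sql.connector.expressions.Transform
>   import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
>   import org.apache.spark.sql.types.StructType
>   import org.apache.spark.sql.util.CaseInsensitiveStringMap
>
>   class SalesforceTableProvider extends TableProvider {
>     // Infer the schema by describing the Salesforce object named in "dbtable".
>     override def inferSchema(options: CaseInsensitiveStringMap): StructType = ???
>
>     override def getTable(schema: StructType, partitioning: Array[Transform],
>                           properties: util.Map[String, String]): Table =
>       new SalesforceTable(schema, properties)
>   }
>
>   class SalesforceTable(tableSchema: StructType, props: util.Map[String, String])
>       extends Table with SupportsRead {
>     override def name(): String = props.get("dbtable")
>     override def schema(): StructType = tableSchema
>     override def capabilities(): util.Set[TableCapability] =
>       util.EnumSet.of(TableCapability.BATCH_READ)
>
>     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
>       new ScanBuilder {
>         // The Scan would expose toBatch(); the Batch / InputPartition side
>         // is sketched further below.
>         override def build(): Scan = ???
>       }
>   }
>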
> I am aware that there is an existing Salesforce Spark library by Springml,
> but it doesn't support the authentication types I need and isn't compatible
> with Spark 3.x.
>
> I am not using the Bulk V2 Salesforce API as it is serial in nature: you
> submit a query, Salesforce automatically creates the batches, and you then
> need to poll for the results and iterate over the batches using the batch
> locators returned in the response headers.
>
> I am planning to make it work like this:
>
>
>    1. The user specifies the options numPartitions and dbtable. Using
>    these, I will internally fetch the record count for that Salesforce
>    object and deduce the chunk size to use when querying the data with PK
>    chunking:
>    https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm
>    2. The Bulk API is asynchronous. You need to create a job and then
>    create query batches within it. If you enable PK chunking and specify
>    the chunk size you want, Salesforce automatically creates the batches
>    for you. You then need to poll for and fetch the results of each batch
>    using a batch-specific URL.
>    3. I am planning to pass the batch ID to each executor using an
>    InputPartition object (see the sketch just after this list). Each
>    executor will then poll for and fetch the results.
>
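> Roughly, the executor side of step 3 would look like the sketch below.
> SalesforceBulkClient and its methods are just placeholders for a thin
> wrapper around the Bulk V1 REST calls (create job, list batches, poll
> batch state, download results), not real library APIs:
>
>   import org.apache.spark.sql.catalyst.InternalRow
>   import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
>
>   // Placeholder for the REST wrapper; all method names here are made up.
>   trait SalesforceBulkClient {
>     def createPkChunkedQueryJob(sObject: String, chunkSize: Int): String  // returns jobId
>     def listBatchIds(jobId: String): Seq[String]
>     def awaitBatchCompletion(jobId: String, batchId: String): Unit
>     def fetchBatchResults(jobId: String, batchId: String): Iterator[InternalRow]
>     def close(): Unit
>   }
>   object SalesforceBulkClient {
>     def apply(connConf: Map[String, String]): SalesforceBulkClient = ???  // real HTTP client goes here
>   }
>
>   // One input partition per PK-chunked batch that Salesforce created.
>   case class SalesforceBatchPartition(jobId: String, batchId: String) extends InputPartition
>
>   class SalesforceReaderFactory(connConf: Map[String, String]) extends PartitionReaderFactory {
>     override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
>       val p = partition.asInstanceOf[SalesforceBatchPartition]
>       new SalesforceBatchReader(connConf, p.jobId, p.batchId)
>     }
>   }
>
>   class SalesforceBatchReader(connConf: Map[String, String], jobId: String, batchId: String)
>       extends PartitionReader[InternalRow] {
>     private val client = SalesforceBulkClient(connConf)
>
>     // Wait until Salesforce marks this batch Completed, then stream its result rows.
>     private lazy val rows: Iterator[InternalRow] = {
>       client.awaitBatchCompletion(jobId, batchId)
>       client.fetchBatchResults(jobId, batchId)
>     }
>
>     override def next(): Boolean = rows.hasNext
>     override def get(): InternalRow = rows.next()
>     override def close(): Unit = client.close()
>   }
>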
> I am having trouble deciding how I would go about creating the Bulk job
> and submitting the batches on the driver node before dispatching the batch
> IDs to the executors. I tried doing this in the implementation of the
> planInputPartitions method of the Batch interface, but it turns out that
> it is called 2-3 times per action (show, collect, etc.), thus creating
> unnecessary Bulk jobs.
>
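> For reference, this is roughly where that logic sits at the moment,
> reusing the placeholder SalesforceBulkClient, SalesforceBatchPartition and
> SalesforceReaderFactory from the sketch above:
>
>   import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
>
>   class SalesforceBatch(connConf: Map[String, String], sObject: String, chunkSize: Int)
>       extends Batch {
>
>     // Problem: Spark may call planInputPartitions() two or three times per
>     // action, and each call currently creates a brand-new Bulk job.
>     override def planInputPartitions(): Array[InputPartition] = {
>       val client = SalesforceBulkClient(connConf)
>       // Create the Bulk V1 query job with PK chunking enabled, using the
>       // chunk size derived from the record count.
>       val jobId = client.createPkChunkedQueryJob(sObject, chunkSize)
>       // Salesforce creates the batches itself; we only collect their IDs.
>       client.listBatchIds(jobId)
>         .map(batchId => SalesforceBatchPartition(jobId, batchId): InputPartition)
>         .toArray
>     }
>
>     override def createReaderFactory(): PartitionReaderFactory =
>       new SalesforceReaderFactory(connConf)
>   }
>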
> One potential solution that might work is maintaining a set of hashed user
> options in the static scope of my Batch implementation (using a companion
> object) and only creating the job if it doesn't already exist in the set.
> However, I find this solution very clumsy. Also, what happens if a user
> submits multiple actions on the same DataFrame? I could maybe add a TTL
> for the set entries, but you can see how it gets complicated.
>
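> In code, the clumsy workaround I am describing would be something along
> these lines (hashing of the user options and any TTL/eviction are left
> out):
>
>   object SalesforceBatch {
>     // Static scope shared by all SalesforceBatch instances on the driver:
>     // remembers which option sets already have a Bulk job, so repeated
>     // planInputPartitions() calls reuse it instead of creating a new one.
>     private val jobsByOptionsHash =
>       scala.collection.concurrent.TrieMap.empty[Int, String]   // optionsHash -> jobId
>
>     def getOrCreateJob(optionsHash: Int, createJob: () => String): String =
>       jobsByOptionsHash.getOrElseUpdate(optionsHash, createJob())
>   }
>
> Then planInputPartitions would call
> SalesforceBatch.getOrCreateJob(connConf.hashCode, () => client.createPkChunkedQueryJob(...))
> instead of creating the job directly, but the lifetime of those entries is
> exactly the part I don't know how to manage cleanly.
>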
> I would really appreciate any pointers on the ideal way to achieve what I
> want.
>
> Regards,
>
> Rohit Pant
>
