Hi,

>> spark.range(10000).createOrReplaceTempView("test")
>> maximum_records_per_api_call = 40
>> total = spark.sql("SELECT * FROM test").count()
>> # round up so that a partial last batch is still counted
>> batch_count = (total + maximum_records_per_api_call - 1) // maximum_records_per_api_call
>> spark.sql(
>>     f"SELECT id, mod(monotonically_increasing_id(), {batch_count}) batch_id FROM test"
>> ).repartitionByRange("batch_id").createOrReplaceTempView("test_batch")


The above code can then be run with a UDF, as long as we are able to
control the parallelism with the help of the executor count and the task
CPU configuration.
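
To make the batch_id arithmetic concrete, here is a minimal sketch in plain Python rather than Spark. The helper name `assign_batches` is hypothetical, and it assumes consecutive ids; monotonically_increasing_id() only guarantees unique, increasing values, not consecutive ones, so in Spark the batch sizes would be approximate.

```python
from collections import defaultdict


def assign_batches(ids, max_records_per_call):
    """Group ids into batches, using id % batch_count as the batch id."""
    ids = list(ids)
    # Ceiling division: a partial batch still needs its own slot.
    batch_count = max(1, -(-len(ids) // max_records_per_call))
    batches = defaultdict(list)
    for i in ids:
        batches[i % batch_count].append(i)
    return dict(batches)


batches = assign_batches(range(10000), 40)
print(len(batches))                           # number of API calls needed
print(max(len(b) for b in batches.values()))  # size of the largest batch
```

With the 10000 ids and the limit of 40 used above, this gives 250 batches of 40 ids each. Rounding the batch count up keeps every batch within the limit when the total is not an exact multiple.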

But once again, this is unnecessary overkill.
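
For comparison, a rough sketch of the plain Python route could look like the following. The names `chunked`, `send_in_batches` and `fake_send` and the dummy token are made up for illustration; the request keys are the ones from the original question. In a real run, `send` would presumably wrap an HTTP POST such as the requests.post call in the quoted code.

```python
from itertools import islice


def chunked(records, size):
    """Yield lists of at most `size` items from any iterable."""
    it = iter(records)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def send_in_batches(records, token, send, batch_size=4000):
    """Build one request body per batch and hand it to `send`."""
    responses = []
    for chunk in chunked(records, batch_size):
        body = {
            "Token": token,
            "CustomerName": "",
            "Object": "",
            "Data": chunk,
        }
        responses.append(send(body))
    return responses


# Stand-in for the real HTTP call, so the sketch runs without a network.
sent = []


def fake_send(body):
    sent.append(body)
    return {"status": "ok", "count": len(body["Data"])}


records = [{"A": str(n)} for n in range(10)]
results = send_in_batches(records, token="dummy-token", send=fake_send, batch_size=4)
print([r["count"] for r in results])
```

Because `chunked` reads from an iterator, only one batch is held in memory at a time, which avoids building the whole payload on one machine at once.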


Regards,
Gourav Sengupta

On Mon, Jun 13, 2022 at 10:41 AM Sid <flinkbyhe...@gmail.com> wrote:

> Hi Gourav,
>
> Could you please provide me with some examples?
>
> On Mon, Jun 13, 2022 at 2:23 PM Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi,
>>
>> try to use mod of a monotonically increasing field and then use the
>> repartitionByRange function, and see whether Spark automatically serialises
>> it based on the number of executors that you put in the job.
>>
>> But once again, this is kind of overkill; for fetching data from an
>> API, a simple Python program works quite well.
>>
>>
>> Regards,
>> Gourav
>>
>> On Mon, Jun 13, 2022 at 9:28 AM Sid <flinkbyhe...@gmail.com> wrote:
>>
>>> Hi Gourav,
>>>
>>> Do you have any examples or links, please? That would help me to
>>> understand.
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I think that serialising data using Spark is overkill; why not use
>>>> normal Python?
>>>>
>>>> Also, have you tried repartition by range? That way you can use the
>>>> modulus operator to batch things up.
>>>>
>>>> Regards,
>>>> Gourav
>>>>
>>>>
>>>> On Mon, Jun 13, 2022 at 8:37 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am trying to hit POST APIs for the very first time using PySpark.
>>>>>
>>>>> My end goal is something like the below:
>>>>>
>>>>>
>>>>>    1. Generate the data.
>>>>>    2. Send the data in batches of 4k records, since the API can accept
>>>>>    4k records at once.
>>>>>    3. Each request would look like the below:
>>>>>
>>>>>    {
>>>>>        "Token": "",
>>>>>        "CustomerName": "",
>>>>>        "Object": "",
>>>>>        "Data": [{"A":"1"},{"A":"2"}]
>>>>>    }
>>>>>
>>>>>    4. The token will be generated first and then passed to the
>>>>>    'Token' key in the data.
>>>>>
>>>>> For the above goal, I initially wrote something like the below which
>>>>> gives a heap error because the data frame is getting created on the driver
>>>>> side, and the size of the records is a minimum of 1M.
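>>>>>             import json
>>>>>             import requests
>>>>>             from pyspark.sql import Window
>>>>>             from pyspark.sql.functions import col, lit, max, row_number, to_timestamp
>>>>>
>>>>>             df = modifiedData  # Assume it to be query results stored as a DF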
>>>>>
>>>>>             df = df.withColumn("uniqueID", lit("1"))
>>>>>
>>>>>             df = df.withColumn("row_num", row_number().over(
>>>>>                 Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>>>>             ))
>>>>>             tokenUrl = ""
>>>>>             # tokenUrl = ""
>>>>>             policyUrl = ""
>>>>>             tokenBody = {"Username": "", "Password": "", "CustomerName": ""}
>>>>>
>>>>>             def get_token(url, payload):
>>>>>                 token = None  # returned as-is if the request fails
>>>>>                 try:
>>>>>                     print("Getting Token")
>>>>>                     response = requests.request("POST", url, data=payload)
>>>>>                     data = response.json()
>>>>>                     if data['ErrorDescription'] == 'Success':
>>>>>                         token = data['Token']
>>>>>                         print(":::Token Generated::::")
>>>>>                     else:
>>>>>                         print('TokenNotGeneratedFrom: ')
>>>>>                         # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
>>>>>                 except Exception as e:
>>>>>                     print('TokenNotGeneratedFrom: ' + str(e))
>>>>>                     # raise TokenNotGeneratedFrom(500, str(e))
>>>>>                 return token
>>>>>
>>>>>             def call_to_cust_bulk_api(url, payload):
>>>>>                 print("Calling Bulk API")
>>>>>                 try:
>>>>>                     # TODO: write code...
>>>>>                     headers = {'content-type': 'application/json'}
>>>>>                     print(":::::::jsn load::::")
>>>>>                     # print(json.dumps(payload))
>>>>>                     # print(payload)
>>>>>                     response = requests.post(url, data=json.dumps(payload), headers=headers)
>>>>>                     # print(json.dumps(payload))
>>>>>                     data = response.json()
>>>>>                     return data
>>>>>                 except Exception as e:
>>>>>                     print('ExceptionInPushingDataTo: ' + str(e))
>>>>>                     # raise ExceptionInPushingDataTo(500, str(e))
>>>>>
>>>>>             total_count = df.count()
>>>>>             i = 1
>>>>>             while i <= total_count:
>>>>>                 rangeNum = i + 3999
>>>>>                 print("Range Num:::")
>>>>>                 print(rangeNum)
>>>>>                 df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
>>>>>                 df1.cache()
>>>>>                 maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>>>>>                 finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>>>>>                 print("finalDF count:::", finalDF.count())
>>>>>                 token = get_token(tokenUrl, tokenBody)
>>>>>
>>>>>                 result = json.loads(finalDF.toPandas().to_json(orient="records"))
>>>>>                 # token = get_token(tokenUrl, tokenBody)
>>>>>                 custRequestBody = {
>>>>>                     "Token": token,
>>>>>                     "CustomerName": "",
>>>>>                     "Object": "",
>>>>>                     "Data": result
>>>>>                 }
>>>>>
>>>>>                 # print("::::Customer Request Body::::::")
>>>>>                 # print(json.dumps(custRequestBody))
>>>>>                 response = call_to_cust_bulk_api(policyUrl, custRequestBody)
>>>>>                 print(response)
>>>>>                 finalDFStatus = finalDF.withColumn(
>>>>>                     "edl_timestamp", to_timestamp(lit(F.TimeNow()))
>>>>>                 ).withColumn("status_for_each_batch", lit(str(response)))
>>>>>
>>>>>
>>>>>                 print("Max Value:::")
>>>>>                 print(maxValue)
>>>>>                 print("Next I:::")
>>>>>                 i = rangeNum + 1
>>>>>                 print(i)
>>>>>
>>>>> This is my very first approach to hitting APIs with Spark. Could
>>>>> anyone please help me redesign the approach, or share some links or
>>>>> references that I can use to go deeper into this and correct my
>>>>> mistakes? How can I scale this?
>>>>>
>>>>>
>>>>> Any help is much appreciated.
>>>>>
>>>>> TIA,
>>>>> Sid
>>>>>
>>>>
