Hi Gourav,

Do you have any examples or links, please? That would help me to understand.

Thanks,
Sid

On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
> I think that serialising data using Spark is overkill; why not use
> normal Python?
>
> Also, have you tried repartitioning by range? That way you can use the
> modulus operator to batch things up.
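
One possible reading of the modulus suggestion above, as a minimal plain-Python sketch: record i goes to bucket i % num_batches, with num_batches chosen so that no bucket exceeds the 4k limit. The names batch_by_modulus and max_batch_size are illustrative and do not come from the thread; in Spark the same arithmetic could presumably be applied to a row-number column after repartitionByRange, which is not shown here.

```python
import math


def batch_by_modulus(records, max_batch_size=4000):
    """Assign record i to bucket i % num_batches so no bucket exceeds max_batch_size."""
    records = list(records)
    if not records:
        return []
    # Smallest number of buckets that keeps every bucket within the limit
    num_batches = math.ceil(len(records) / max_batch_size)
    batches = [[] for _ in range(num_batches)]
    for index, record in enumerate(records):
        batches[index % num_batches].append(record)
    return batches


# 10,000 dummy records shaped like the "Data" entries in the original post
batches = batch_by_modulus([{"A": str(n)} for n in range(10000)])
print([len(b) for b in batches])  # [3334, 3333, 3333]
```

Note that round-robin assignment does not keep neighbouring records together; if order within a batch matters, integer division (i // max_batch_size) would be the alternative.
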
>
> Regards,
> Gourav
>
>
> On Mon, Jun 13, 2022 at 8:37 AM Sid <flinkbyhe...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I am trying to hit POST APIs for the very first time using PySpark.
>>
>> My end goal is something like the below:
>>
>>
>>    1. Generate the data.
>>    2. Send the data in batches of 4k records, since the API can accept
>>    4k records at once.
>>    3. Each request would look like the below:
>>
>>    {
>>        "Token": "",
>>        "CustomerName": "",
>>        "Object": "",
>>        "Data": [{"A":"1"},{"A":"2"}]
>>    }
>>
>>    4. The token will be generated first and then passed to the
>>    'Token' key in the data.
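
The steps listed above can be sketched in plain Python as follows. This is only an illustration under assumptions: chunked and build_request_body are hypothetical helper names, "dummy-token" stands in for a real token, and the records are made-up data in the shape shown in the post.

```python
from itertools import islice


def chunked(records, size=4000):
    """Yield consecutive lists of at most `size` records from any iterable."""
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def build_request_body(token, batch, customer_name="", object_name=""):
    """Wrap one batch in the request shape described in the post."""
    return {
        "Token": token,
        "CustomerName": customer_name,
        "Object": object_name,
        "Data": batch,
    }


# 9,000 dummy records; a generator is used so nothing is held in memory twice
records = ({"A": str(n)} for n in range(1, 9001))
bodies = [build_request_body("dummy-token", batch) for batch in chunked(records)]
print([len(body["Data"]) for body in bodies])  # [4000, 4000, 1000]
```
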
>>
>> For the above goal, I initially wrote something like the below, which
>> gives a heap error because the data frame is getting created on the driver
>> side, and there are at least 1M records.
>>
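>>             # Imports this snippet relies on
>>             import json
>>             import requests
>>             from pyspark.sql import Window
>>             from pyspark.sql.functions import col, lit, max, row_number, to_timestamp
>>
>>             df = modifiedData  # Assume it to be query results stored as a DF
>>
>>             # A constant partition key puts every row into one window partition
>>             df = df.withColumn("uniqueID", lit("1"))
>>             df = df.withColumn("row_num", row_number().over(
>>                 Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>             ))
>>             tokenUrl = ""
>>             policyUrl = ""
>>             tokenBody = {"Username": "", "Password": "", "CustomerName": ""}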
>>
>>             def get_token(url, payload):
>>                 # Request a token; returns None if it could not be obtained
>>                 token = None
>>                 try:
>>                     print("Getting Token")
>>                     response = requests.request("POST", url, data=payload)
>>                     data = response.json()
>>                     if data['ErrorDescription'] == 'Success':
>>                         token = data['Token']
>>                         print(":::Token Generated::::")
>>                     else:
>>                         print('TokenNotGeneratedFrom: ')
>>                         # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
>>                 except Exception as e:
>>                     print('TokenNotGeneratedFrom: ' + str(e))
>>                     # raise TokenNotGeneratedFrom(500, str(e))
>>                 return token
>>
>>             def call_to_cust_bulk_api(url, payload):
>>                 # POST one batch as JSON; returns the parsed response, or None on failure
>>                 print("Calling Bulk API")
>>                 try:
>>                     headers = {'content-type': 'application/json'}
>>                     print(":::::::jsn load::::")
>>                     response = requests.post(url, data=json.dumps(payload), headers=headers)
>>                     data = response.json()
>>                     return data
>>                 except Exception as e:
>>                     print('ExceptionInPushingDataTo: ' + str(e))
>>                     # raise ExceptionInPushingDataTo(500, str(e))
>>
>>             total_count = df.count()
>>             i = 1
>>             while i <= total_count:  # <= so a final single-record batch is not skipped
>>                 rangeNum = i + 3999
>>                 print("Range Num:::")
>>                 print(rangeNum)
>>                 df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
>>                 df1.cache()
>>                 maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>>                 finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>>                 print("finalDF count:::", finalDF.count())
>>                 token = get_token(tokenUrl, tokenBody)
>>
>>                 result = json.loads(finalDF.toPandas().to_json(orient="records"))
>>                 custRequestBody = {
>>                     "Token": token,
>>                     "CustomerName": "",
>>                     "Object": "",
>>                     "Data": result
>>                 }
>>
>>                 response = call_to_cust_bulk_api(policyUrl, custRequestBody)
>>                 print(response)
>>                 finalDFStatus = finalDF.withColumn(
>>                     "edl_timestamp", to_timestamp(lit(F.TimeNow()))
>>                 ).withColumn("status_for_each_batch", lit(str(response)))
>>
>>
>>                 print("Max Value:::")
>>                 print(maxValue)
>>                 print("Next I:::")
>>                 i = rangeNum + 1
>>                 print(i)
>>
>> This is my very first attempt at hitting APIs with Spark. Could anyone
>> please help me redesign the approach, or share some links or references
>> that I can use to dig deeper and correct my mistakes?
>> How can I scale this?
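
One direction that may help with scaling, sketched under assumptions: rather than collecting rows on the driver with toPandas(), each partition's rows could be batched and posted where they live. The function below is plain Python so it runs without Spark; send_partition, fake_get_token and fake_post_batch are hypothetical names, and the two fakes stand in for the real token and bulk API calls.

```python
from itertools import islice


def send_partition(rows, get_token, post_batch, batch_size=4000):
    """Send one partition's rows in batches; returns the list of API responses."""
    iterator = iter(rows)
    responses = []
    token = None
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        if token is None:
            # Fetch the token lazily so empty partitions make no request.
            # Assumption: one token stays valid for the whole partition.
            token = get_token()
        body = {"Token": token, "CustomerName": "", "Object": "", "Data": batch}
        responses.append(post_batch(body))
    return responses


# Stand-ins for the real HTTP calls so the sketch runs without a network
sent_sizes = []


def fake_get_token():
    return "dummy-token"


def fake_post_batch(body):
    sent_sizes.append(len(body["Data"]))
    return {"ErrorDescription": "Success"}


rows = ({"A": str(n)} for n in range(10000))
responses = send_partition(rows, fake_get_token, fake_post_batch)
print(sent_sizes)  # [4000, 4000, 2000]
```

With PySpark, a function like this could presumably be passed to df.rdd.foreachPartition (or to mapPartitions if the responses are needed back), converting each Row with row.asDict() first. In that case the token and post callables run on the executors, so they have to be serialisable, and the API must tolerate several partitions posting concurrently.
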
>>
>>
>> Any help is much appreciated.
>>
>> TIA,
>> Sid
>>
>
