Hi,
I think that serialising data using spark is an overkill, why not use
normal python.

Also have you tried repartition by range, that way you can use modulus
operator to batch things up?

Regards,
Gourav


On Mon, Jun 13, 2022 at 8:37 AM Sid <flinkbyhe...@gmail.com> wrote:

> Hi Team,
>
> I am trying to hit the POST APIs for the very first time using Pyspark.
>
> My end goal is to achieve is something like the below:
>
>
>    1.  Generate the data
>    2. Send the data in the batch of 4k records in one batch since the API
>    can accept the 4k records at once.
>    3. The record would be as the below:
>    4.
>
>    {
>        "Token": "",
>        "CustomerName": "",
>        "Object": "",
>        "Data": [{"A":"1"},{"A":"2"}]
>    }
>
>    5. Token will be generated first then it would be passed to the
>    'Token' key in the data.
>
> For the above goal, I initially wrote something like the below which gives
> a heap error because the data frame is getting created on the driver side,
> and the size of the records is a minimum of 1M.
>            df = modifiedData # Assume it to be query results stored as a DF
>
>             df = df.withColumn("uniqueID", lit("1"))
>
>             df = df.withColumn("row_num", row_number().over(
>
> Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>             ))
>             tokenUrl = ""
>             # tokenUrl = ""
>             policyUrl = ""
>             tokenBody = {"Username": "", "Password": "", "CustomerName":
> ""}
>
>             def get_token(url, payload):
>                 try:
>                     print("Getting Token")
>                     response = requests.request("POST", url, data=payload)
>                     data = response.json()
>                     if data['ErrorDescription'] == 'Success':
>                         token = data['Token']
>                         print(":::Token Generated::::")
>                     else:
>                         print('TokenNotGeneratedFrom: ')
>                         # raise TokenNotGeneratedFrom(500, 'Token not
> Generated from ')
>                     return token
>                 except Exception as e:
>                     print('TokenNotGeneratedFrom: ' + str(e))
>                     # raise TokenNotGeneratedFrom(500, str(e))
>
>             def call_to_cust_bulk_api(url, payload):
>                 print("Calling Bulk API")
>                 try:
>                     # TODO: write code...
>                     headers = {'content-type': 'application/json'}
>                     print(":::::::jsn load::::")
>                     # print(json.dumps(payload))
>                     # print(payload)
>                     response = requests.post(url,
> data=json.dumps(payload), headers=headers)
>                     # print(json.dumps(payload))
>                     data = response.json()
>                     return data
>                 except Exception as e:
>                     print('ExceptionInPushingDataTo: ' + str(e))
>                     # raise ExceptionInPushingDataTo(500, str(e))
>
>             total_count = df.count()
>             i = 1
>             while i < total_count:
>                 rangeNum = i + 3999
>                 print("Range Num:::")
>                 print(rangeNum)
>                 df1 = df.filter((col("row_num") >= i) & (col("row_num") <=
> rangeNum))
>                 df1.cache()
>                 maxValue =
> df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>                 finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>                 print("finalDF count:::", finalDF.count())
>                 token = get_token(tokenUrl, tokenBody)
>
>                 result =
> json.loads((finalDF.toPandas().to_json(orient="records")))
>                 # token = get_token(tokenUrl, tokenBody)
>                 custRequestBody = {
>                     "Token": token,
>                     "CustomerName": "",
>                     "Object": "",
>                     "Data": result
>                 }
>
>                 # print("::::Customer Request Body::::::")
>                 # print(json.dumps(custRequestBody))
>                 response = call_to_cust_bulk_api(policyUrl,
> custRequestBody)
>                 print(response)
>                 finalDFStatus = finalDF.withColumn("edl_timestamp",
> to_timestamp(lit(F.TimeNow()))).withColumn(
>                     "status_for_each_batch",
>                     lit(str(response)))
>
>
>                 print("Max Value:::")
>                 print(maxValue)
>                 print("Next I:::")
>                 i = rangeNum + 1
>                 print(i)
>
> This is my very first approach to hitting the APIs with Spark. So, could
> anyone please help me to redesign the approach, or can share some links or
> references using which I can go to the depth of this and rectify myself.
> How can I scale this?
>
>
> Any help is much appreciated.
>
> TIA,
> Sid
>

Reply via email to