Hi Team,

I am trying to hit the POST APIs for the very first time using Pyspark.

My end goal is to achieve is something like the below:


   1.  Generate the data
   2. Send the data in the batch of 4k records in one batch since the API
   can accept the 4k records at once.
   3. The record would be as the below:
   4.

   {
       "Token": "",
       "CustomerName": "",
       "Object": "",
       "Data": [{"A":"1"},{"A":"2"}]
   }

   5. Token will be generated first then it would be passed to the 'Token'
   key in the data.

For the above goal, I initially wrote something like the below which gives
a heap error because the data frame is getting created on the driver side,
and the size of the records is a minimum of 1M.
           df = modifiedData # Assume it to be query results stored as a DF

            df = df.withColumn("uniqueID", lit("1"))

            df = df.withColumn("row_num", row_number().over(
                Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
            ))
            tokenUrl = ""
            # tokenUrl = ""
            policyUrl = ""
            tokenBody = {"Username": "", "Password": "", "CustomerName": ""}

            def get_token(url, payload):
                try:
                    print("Getting Token")
                    response = requests.request("POST", url, data=payload)
                    data = response.json()
                    if data['ErrorDescription'] == 'Success':
                        token = data['Token']
                        print(":::Token Generated::::")
                    else:
                        print('TokenNotGeneratedFrom: ')
                        # raise TokenNotGeneratedFrom(500, 'Token not
Generated from ')
                    return token
                except Exception as e:
                    print('TokenNotGeneratedFrom: ' + str(e))
                    # raise TokenNotGeneratedFrom(500, str(e))

            def call_to_cust_bulk_api(url, payload):
                print("Calling Bulk API")
                try:
                    # TODO: write code...
                    headers = {'content-type': 'application/json'}
                    print(":::::::jsn load::::")
                    # print(json.dumps(payload))
                    # print(payload)
                    response = requests.post(url, data=json.dumps(payload),
headers=headers)
                    # print(json.dumps(payload))
                    data = response.json()
                    return data
                except Exception as e:
                    print('ExceptionInPushingDataTo: ' + str(e))
                    # raise ExceptionInPushingDataTo(500, str(e))

            total_count = df.count()
            i = 1
            while i < total_count:
                rangeNum = i + 3999
                print("Range Num:::")
                print(rangeNum)
                df1 = df.filter((col("row_num") >= i) & (col("row_num") <=
rangeNum))
                df1.cache()
                maxValue =
df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
                finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
                print("finalDF count:::", finalDF.count())
                token = get_token(tokenUrl, tokenBody)

                result =
json.loads((finalDF.toPandas().to_json(orient="records")))
                # token = get_token(tokenUrl, tokenBody)
                custRequestBody = {
                    "Token": token,
                    "CustomerName": "",
                    "Object": "",
                    "Data": result
                }

                # print("::::Customer Request Body::::::")
                # print(json.dumps(custRequestBody))
                response = call_to_cust_bulk_api(policyUrl, custRequestBody)
                print(response)
                finalDFStatus = finalDF.withColumn("edl_timestamp",
to_timestamp(lit(F.TimeNow()))).withColumn(
                    "status_for_each_batch",
                    lit(str(response)))


                print("Max Value:::")
                print(maxValue)
                print("Next I:::")
                i = rangeNum + 1
                print(i)

This is my very first approach to hitting the APIs with Spark. So, could
anyone please help me to redesign the approach, or can share some links or
references using which I can go to the depth of this and rectify myself.
How can I scale this?


Any help is much appreciated.

TIA,
Sid

Reply via email to