Hi Gourav,

Could you please provide me with some examples?

On Mon, Jun 13, 2022 at 2:23 PM

> Hi,
> try to use the mod of a monotonically increasing field and then use the
> repartitionByRange function, and see whether Spark automatically serialises
> it based on the number of executors that you put in the job.
> But once again, this is kind of overkill; for fetching data from an API,
> a simple Python program works quite well.
> Regards,
> Gourav
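
A minimal sketch of the bucketing idea above, in plain Python rather than Spark, assuming each record already carries a consecutive 0-based index. The names `bucket_by_mod` and `batch_by_div` are hypothetical. Taking the index modulo the number of buckets spreads records round-robin, while integer division by the batch size yields contiguous batches of a fixed size. A key computed this way is the kind of column one might pass to `repartitionByRange`; note that `monotonically_increasing_id` values are unique but not consecutive, so they would first need to be turned into a dense index.

```python
from collections import defaultdict


def bucket_by_mod(indices, num_buckets):
    """Group indices round-robin: index % num_buckets picks the bucket."""
    buckets = defaultdict(list)
    for idx in indices:
        buckets[idx % num_buckets].append(idx)
    return dict(buckets)


def batch_by_div(indices, batch_size):
    """Group indices into contiguous batches: index // batch_size picks the batch."""
    batches = defaultdict(list)
    for idx in indices:
        batches[idx // batch_size].append(idx)
    return dict(batches)


if __name__ == "__main__":
    ids = range(10)
    print(bucket_by_mod(ids, 3))  # round-robin buckets
    print(batch_by_div(ids, 4))   # contiguous batches
```

With a batch size of 4000, integer division is probably the closer match for an API that accepts 4k records per call, since every batch except the last is exactly full.
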
On Mon, Jun 13, 2022 at 9:28 AM
>> Hi Gourav,
>> Do you have any examples or links, please? That would help me to
>> understand.
>> Thanks,
>> Sid
On Mon, Jun 13, 2022 at 1:42 PM
>> gourav.sengu...@gmail.com> wrote:
>>> Hi,
>>> I think that serialising data using Spark is overkill; why not use
>>> plain Python?
>>> Also, have you tried repartitioning by range? That way you can use the
>>> modulus operator to batch things up.
>>> Regards,
>>> Gourav
On Mon, Jun 13, 2022 at 8:37 AM
>>>> Hi Team,
>>>> I am trying to hit POST APIs for the very first time using PySpark.
>>>> My end goal is something like the below:
>>>>    1. Generate the data.
>>>>    2. Send the data in batches of 4k records, since the API can accept
>>>>    4k records at once.
>>>>    3. Each record would look like the below:
>>>>    {
>>>>        "Token": "",
>>>>        "CustomerName": "",
>>>>        "Object": "",
>>>>        "Data": [{"A":"1"},{"A":"2"}]
>>>>    }
>>>>    4. The token will be generated first and then passed to the
>>>>    'Token' key in the data.
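
The request shape described in the list above can be sketched as a small helper. `build_payload` and `MAX_RECORDS` are hypothetical names; the field names come from the sample record, and the size check simply encodes the 4k figure stated in the list.

```python
import json

MAX_RECORDS = 4000  # batch size stated in the list above


def build_payload(token, customer_name, object_name, records):
    """Wrap one batch of records in the envelope shown in the sample record."""
    records = list(records)
    if len(records) > MAX_RECORDS:
        raise ValueError(f"batch has {len(records)} records, limit is {MAX_RECORDS}")
    return {
        "Token": token,
        "CustomerName": customer_name,
        "Object": object_name,
        "Data": records,
    }


if __name__ == "__main__":
    body = build_payload("example-token", "", "", [{"A": "1"}, {"A": "2"}])
    print(json.dumps(body))
```
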
>>>> For the above goal, I initially wrote something like the below, which
>>>> gives a heap error because the data frame is created on the driver
>>>> side, and there are at least 1M records.
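>>>>             import json
>>>>             import requests
>>>>             from pyspark.sql import Window
>>>>             from pyspark.sql.functions import col, lit, max, row_number, to_timestamp
>>>>
>>>>             df = modifiedData  # Assume it to be query results stored as a DF
>>>>             # A constant partition key puts every row into a single window partition
>>>>             df = df.withColumn("uniqueID", lit("1"))
>>>>             df = df.withColumn("row_num", row_number().over(
>>>>                 Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>>>             ))
>>>>             tokenUrl = ""
>>>>             # tokenUrl = ""
>>>>             policyUrl = ""
>>>>             tokenBody = {"Username": "", "Password": "", "CustomerName": ""}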
>>>>             def get_token(url, payload):
>>>>                 token = None  # stays None if the token request fails
>>>>                 try:
>>>>                     print("Getting Token")
>>>>                     response = requests.request("POST", url, data=payload)
>>>>                     data = response.json()
>>>>                     if data['ErrorDescription'] == 'Success':
>>>>                         token = data['Token']
>>>>                         print(":::Token Generated::::")
>>>>                     else:
>>>>                         print('TokenNotGeneratedFrom: ')
>>>>                         # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
>>>>                 except Exception as e:
>>>>                     print('TokenNotGeneratedFrom: ' + str(e))
>>>>                     # raise TokenNotGeneratedFrom(500, str(e))
>>>>                 return token
>>>>             def call_to_cust_bulk_api(url, payload):
>>>>                 print("Calling Bulk API")
>>>>                 try:
>>>>                     headers = {'content-type': 'application/json'}
>>>>                     print(":::::::jsn load::::")
>>>>                     # print(json.dumps(payload))
>>>>                     response = requests.post(url, data=json.dumps(payload), headers=headers)
>>>>                     data = response.json()
>>>>                     return data
>>>>                 except Exception as e:
>>>>                     print('ExceptionInPushingDataTo: ' + str(e))
>>>>                     # raise ExceptionInPushingDataTo(500, str(e))
>>>>             total_count = df.count()
>>>>             i = 1
>>>>             while i <= total_count:  # <= so a final single-row batch is not skipped
>>>>                 rangeNum = i + 3999
>>>>                 print("Range Num:::")
>>>>                 print(rangeNum)
>>>>                 df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
>>>>                 df1.cache()
>>>>                 maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>>>>                 finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>>>>                 print("finalDF count:::", finalDF.count())
>>>>                 token = get_token(tokenUrl, tokenBody)
>>>>                 # toPandas() pulls the whole batch onto the driver
>>>>                 result = json.loads(finalDF.toPandas().to_json(orient="records"))
>>>>                 custRequestBody = {
>>>>                     "Token": token,
>>>>                     "CustomerName": "",
>>>>                     "Object": "",
>>>>                     "Data": result
>>>>                 }
>>>>                 # print("::::Customer Request Body::::::")
>>>>                 # print(json.dumps(custRequestBody))
>>>>                 response = call_to_cust_bulk_api(policyUrl, custRequestBody)
>>>>                 print(response)
>>>>                 # F.TimeNow() is not defined in this snippet
>>>>                 finalDFStatus = finalDF.withColumn("edl_timestamp",
>>>>                     to_timestamp(lit(F.TimeNow()))).withColumn(
>>>>                     "status_for_each_batch",
>>>>                     lit(str(response)))
>>>>                 print("Max Value:::")
>>>>                 print(maxValue)
>>>>                 print("Next I:::")
>>>>                 i = rangeNum + 1
>>>>                 print(i)
>>>> This is my very first approach to hitting APIs with Spark. Could
>>>> anyone please help me redesign the approach, or share some links or
>>>> references that I can use to study this in depth and correct my
>>>> mistakes? How can I scale this?
>>>> Any help is much appreciated.
>>>> TIA,
>>>> Sid
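
One possible redesign, sketched under stated assumptions: instead of collecting each slice to the driver with `toPandas()`, group rows into batches where the data already lives and post each batch from there. The helpers `chunked` and `send_partition` are hypothetical and use only the standard library; `post_batch` stands in for a function like `call_to_cust_bulk_api` with the URL and token already bound. In PySpark, a function of this shape could be handed to `df.rdd.foreachPartition`, which passes an iterator over the rows of one partition.

```python
from itertools import islice


def chunked(rows, size):
    """Yield lists of at most `size` items from any iterator, one batch at a time."""
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def send_partition(rows, post_batch, batch_size=4000):
    """Post one partition's rows in batches; return the number of batches sent."""
    sent = 0
    for batch in chunked(rows, batch_size):
        post_batch(batch)  # hypothetical callable that performs the HTTP POST
        sent += 1
    return sent


if __name__ == "__main__":
    received = []  # stands in for the remote API in this demo
    rows = ({"A": str(n)} for n in range(9000))
    count = send_partition(rows, received.append)
    print(count, [len(b) for b in received])
```

With Spark this might be invoked as `df.rdd.foreachPartition(lambda rows: send_partition((r.asDict() for r in rows), post_batch))`. Each partition may end with a batch smaller than 4,000 records, so the number of requests depends on how the data is partitioned.
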

