Hi Gourav,

Could you please provide me with some examples?

On Mon, Jun 13, 2022 at 2:23 PM Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> try to use mod of a monotonically increasing field and then use
> repartitionbyrange function, and see whether SPARK automatically serialises
> it based on the number of executors that you put in the job.
>
> But once again, this is kind of an overkill, for fetching data from a API,
> creating a simple python program works quite well.
>
>
> Regards,
> Gourav
>
> On Mon, Jun 13, 2022 at 9:28 AM Sid <flinkbyhe...@gmail.com> wrote:
>
>> Hi Gourav,
>>
>> Do you have any examples or links, please? That would help me to
>> understand.
>>
>> Thanks,
>> Sid
>>
>> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>> I think that serialising data using spark is an overkill, why not use
>>> normal python.
>>>
>>> Also have you tried repartition by range, that way you can use modulus
>>> operator to batch things up?
>>>
>>> Regards,
>>> Gourav
>>>
>>>
>>> On Mon, Jun 13, 2022 at 8:37 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am trying to hit the POST APIs for the very first time using Pyspark.
>>>>
>>>> My end goal is to achieve is something like the below:
>>>>
>>>>
>>>>    1.  Generate the data
>>>>    2. Send the data in the batch of 4k records in one batch since the
>>>>    API can accept the 4k records at once.
>>>>    3. The record would be as the below:
>>>>    4.
>>>>
>>>>    {
>>>>        "Token": "",
>>>>        "CustomerName": "",
>>>>        "Object": "",
>>>>        "Data": [{"A":"1"},{"A":"2"}]
>>>>    }
>>>>
>>>>    5. Token will be generated first then it would be passed to the
>>>>    'Token' key in the data.
>>>>
>>>> For the above goal, I initially wrote something like the below which
>>>> gives a heap error because the data frame is getting created on the driver
>>>> side, and the size of the records is a minimum of 1M.
>>>>            df = modifiedData # Assume it to be query results stored as
>>>> a DF
>>>>
>>>>             df = df.withColumn("uniqueID", lit("1"))
>>>>
>>>>             df = df.withColumn("row_num", row_number().over(
>>>>
>>>> Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>>>             ))
>>>>             tokenUrl = ""
>>>>             # tokenUrl = ""
>>>>             policyUrl = ""
>>>>             tokenBody = {"Username": "", "Password": "",
>>>> "CustomerName": ""}
>>>>
>>>>             def get_token(url, payload):
>>>>                 try:
>>>>                     print("Getting Token")
>>>>                     response = requests.request("POST", url,
>>>> data=payload)
>>>>                     data = response.json()
>>>>                     if data['ErrorDescription'] == 'Success':
>>>>                         token = data['Token']
>>>>                         print(":::Token Generated::::")
>>>>                     else:
>>>>                         print('TokenNotGeneratedFrom: ')
>>>>                         # raise TokenNotGeneratedFrom(500, 'Token not
>>>> Generated from ')
>>>>                     return token
>>>>                 except Exception as e:
>>>>                     print('TokenNotGeneratedFrom: ' + str(e))
>>>>                     # raise TokenNotGeneratedFrom(500, str(e))
>>>>
>>>>             def call_to_cust_bulk_api(url, payload):
>>>>                 print("Calling Bulk API")
>>>>                 try:
>>>>                     # TODO: write code...
>>>>                     headers = {'content-type': 'application/json'}
>>>>                     print(":::::::jsn load::::")
>>>>                     # print(json.dumps(payload))
>>>>                     # print(payload)
>>>>                     response = requests.post(url,
>>>> data=json.dumps(payload), headers=headers)
>>>>                     # print(json.dumps(payload))
>>>>                     data = response.json()
>>>>                     return data
>>>>                 except Exception as e:
>>>>                     print('ExceptionInPushingDataTo: ' + str(e))
>>>>                     # raise ExceptionInPushingDataTo(500, str(e))
>>>>
>>>>             total_count = df.count()
>>>>             i = 1
>>>>             while i < total_count:
>>>>                 rangeNum = i + 3999
>>>>                 print("Range Num:::")
>>>>                 print(rangeNum)
>>>>                 df1 = df.filter((col("row_num") >= i) & (col("row_num")
>>>> <= rangeNum))
>>>>                 df1.cache()
>>>>                 maxValue =
>>>> df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>>>>                 finalDF = df1.drop("row_num", "edl_timestamp",
>>>> "uniqueID")
>>>>                 print("finalDF count:::", finalDF.count())
>>>>                 token = get_token(tokenUrl, tokenBody)
>>>>
>>>>                 result =
>>>> json.loads((finalDF.toPandas().to_json(orient="records")))
>>>>                 # token = get_token(tokenUrl, tokenBody)
>>>>                 custRequestBody = {
>>>>                     "Token": token,
>>>>                     "CustomerName": "",
>>>>                     "Object": "",
>>>>                     "Data": result
>>>>                 }
>>>>
>>>>                 # print("::::Customer Request Body::::::")
>>>>                 # print(json.dumps(custRequestBody))
>>>>                 response = call_to_cust_bulk_api(policyUrl,
>>>> custRequestBody)
>>>>                 print(response)
>>>>                 finalDFStatus = finalDF.withColumn("edl_timestamp",
>>>> to_timestamp(lit(F.TimeNow()))).withColumn(
>>>>                     "status_for_each_batch",
>>>>                     lit(str(response)))
>>>>
>>>>
>>>>                 print("Max Value:::")
>>>>                 print(maxValue)
>>>>                 print("Next I:::")
>>>>                 i = rangeNum + 1
>>>>                 print(i)
>>>>
>>>> This is my very first approach to hitting the APIs with Spark. So,
>>>> could anyone please help me to redesign the approach, or can share some
>>>> links or references using which I can go to the depth of this and rectify
>>>> myself. How can I scale this?
>>>>
>>>>
>>>> Any help is much appreciated.
>>>>
>>>> TIA,
>>>> Sid
>>>>
>>>

Reply via email to