Hi Gourav,

Could you please provide me with some examples?
On Mon, Jun 13, 2022 at 2:23 PM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> Try to use mod of a monotonically increasing field and then use the
> repartitionByRange function, and see whether Spark automatically
> serialises it based on the number of executors that you put in the job.
>
> But once again, this is kind of overkill; for fetching data from an API,
> a simple Python program works quite well.
>
> Regards,
> Gourav
>
> On Mon, Jun 13, 2022 at 9:28 AM Sid <flinkbyhe...@gmail.com> wrote:
>
>> Hi Gourav,
>>
>> Do you have any examples or links, please? That would help me to
>> understand.
>>
>> Thanks,
>> Sid
>>
>> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I think that serialising data using Spark is overkill; why not use
>>> plain Python?
>>>
>>> Also, have you tried repartitionByRange? That way you can use the
>>> modulus operator to batch things up.
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Mon, Jun 13, 2022 at 8:37 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I am trying to hit a POST API for the very first time using PySpark.
>>>>
>>>> My end goal is something like the below:
>>>>
>>>> 1. Generate the data.
>>>> 2. Send the data in batches of 4k records, since the API can accept
>>>>    4k records in one call.
>>>> 3. Each request body would look like the below:
>>>>
>>>>    {
>>>>        "Token": "",
>>>>        "CustomerName": "",
>>>>        "Object": "",
>>>>        "Data": [{"A": "1"}, {"A": "2"}]
>>>>    }
>>>>
>>>> 4. The token will be generated first and then passed to the 'Token'
>>>>    key in the request body.
>>>>
>>>> For the above goal, I initially wrote something like the below, which
>>>> gives a heap error because each batch is collected on the driver
>>>> side, and the data is a minimum of 1M records.
>>>>
>>>> import json
>>>> import requests
>>>> from pyspark.sql.functions import col, current_timestamp, lit, max, row_number
>>>> from pyspark.sql.window import Window
>>>>
>>>> df = modifiedData  # Assume it to be query results stored as a DF
>>>>
>>>> # A constant partition key forces the whole dataset through a single
>>>> # window task just to get a sequential row number.
>>>> df = df.withColumn("uniqueID", lit("1"))
>>>> df = df.withColumn("row_num", row_number().over(
>>>>     Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))))
>>>>
>>>> tokenUrl = ""
>>>> policyUrl = ""
>>>> tokenBody = {"Username": "", "Password": "", "CustomerName": ""}
>>>>
>>>> def get_token(url, payload):
>>>>     try:
>>>>         print("Getting Token")
>>>>         response = requests.request("POST", url, data=payload)
>>>>         data = response.json()
>>>>         token = None  # stay defined even when the call fails
>>>>         if data['ErrorDescription'] == 'Success':
>>>>             token = data['Token']
>>>>             print(":::Token Generated::::")
>>>>         else:
>>>>             print('TokenNotGeneratedFrom: ')
>>>>             # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
>>>>         return token
>>>>     except Exception as e:
>>>>         print('TokenNotGeneratedFrom: ' + str(e))
>>>>         # raise TokenNotGeneratedFrom(500, str(e))
>>>>
>>>> def call_to_cust_bulk_api(url, payload):
>>>>     print("Calling Bulk API")
>>>>     try:
>>>>         headers = {'content-type': 'application/json'}
>>>>         response = requests.post(url, data=json.dumps(payload),
>>>>                                  headers=headers)
>>>>         data = response.json()
>>>>         return data
>>>>     except Exception as e:
>>>>         print('ExceptionInPushingDataTo: ' + str(e))
>>>>         # raise ExceptionInPushingDataTo(500, str(e))
>>>>
>>>> total_count = df.count()
>>>> i = 1
>>>> while i <= total_count:  # <= so the last partial batch is not skipped
>>>>     rangeNum = i + 3999
>>>>     print("Range Num:::", rangeNum)
>>>>     df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
>>>>     df1.cache()
>>>>     maxValue = df1.select(max(col("row_num")).alias("max_val")) \
>>>>         .toPandas()['max_val'].iloc[0]
>>>>     finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>>>>     print("finalDF count:::", finalDF.count())
>>>>     token = get_token(tokenUrl, tokenBody)
>>>>
>>>>     # toPandas() pulls the whole batch onto the driver; this is
>>>>     # where the heap pressure comes from.
>>>>     result = json.loads(finalDF.toPandas().to_json(orient="records"))
>>>>     custRequestBody = {
>>>>         "Token": token,
>>>>         "CustomerName": "",
>>>>         "Object": "",
>>>>         "Data": result
>>>>     }
>>>>
>>>>     response = call_to_cust_bulk_api(policyUrl, custRequestBody)
>>>>     print(response)
>>>>     # Note: finalDFStatus is built but never written anywhere.
>>>>     finalDFStatus = finalDF.withColumn("edl_timestamp",
>>>>                                        current_timestamp()).withColumn(
>>>>         "status_for_each_batch", lit(str(response)))
>>>>
>>>>     print("Max Value:::", maxValue)
>>>>     i = rangeNum + 1
>>>>     print("Next I:::", i)
>>>>
>>>> This is my very first approach to hitting APIs with Spark, so could
>>>> anyone please help me redesign the approach, or share some links or
>>>> references that I can study in depth and correct myself with? How can
>>>> I scale this?
>>>>
>>>> Any help is much appreciated.
>>>>
>>>> TIA,
>>>> Sid
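For reference, Gourav's "simple Python program" suggestion would look roughly like the below. This is a minimal sketch, assuming the query results already fit in one machine's memory as a list of dicts; `all_records` is a hypothetical name, and it reuses get_token, tokenUrl, tokenBody, and policyUrl from the code above.

    import json
    import requests

    def chunks(records, size=4000):
        # Yield successive slices of at most `size` records.
        for start in range(0, len(records), size):
            yield records[start:start + size]

    token = get_token(tokenUrl, tokenBody)
    for batch in chunks(all_records):  # all_records: list of dicts from the query
        body = {"Token": token, "CustomerName": "", "Object": "", "Data": batch}
        resp = requests.post(policyUrl, data=json.dumps(body),
                             headers={"content-type": "application/json"})
        print(resp.status_code)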
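Gourav's repartitionByRange-plus-mod idea might be sketched as below. The batch-key arithmetic is an assumption: monotonically_increasing_id() is not contiguous, so the groups come out only approximately 4k rows each; if the API's 4k limit is strict, derive the key from row_number instead.

    import math
    from pyspark.sql import functions as F

    BATCH_SIZE = 4000  # the API accepts 4k records per call
    num_batches = math.ceil(modifiedData.count() / BATCH_SIZE)

    # Derive a batch key with mod, then range-partition so rows sharing a
    # batch_id land in the same Spark partition.
    df = (modifiedData
          .withColumn("row_id", F.monotonically_increasing_id())
          .withColumn("batch_id", F.col("row_id") % num_batches))
    batched = df.repartitionByRange(num_batches, "batch_id")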
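To scale the POSTs themselves, one common pattern (not something named in this thread, so take it as a sketch) is to move the HTTP calls onto the executors with mapPartitions, so that no batch is ever collected on the driver. It assumes the requests library is installed on the executors, reuses get_token and the URLs from the code above, and `_send` and `post_partition` are hypothetical helper names; it picks up the `batched` frame from the previous sketch.

    import json
    import requests

    def _send(session, token, records):
        # POST one batch of at most 4k records; return only the status code.
        body = {"Token": token, "CustomerName": "", "Object": "", "Data": records}
        resp = session.post(policyUrl, data=json.dumps(body),
                            headers={"content-type": "application/json"})
        return resp.status_code

    def post_partition(rows):
        # Runs on an executor: buffer the partition into 4k-record chunks.
        session = requests.Session()
        token = get_token(tokenUrl, tokenBody)  # one token per partition
        buffer = []
        for row in rows:
            buffer.append(row.asDict())
            if len(buffer) == 4000:
                yield _send(session, token, buffer)
                buffer = []
        if buffer:  # trailing partial batch
            yield _send(session, token, buffer)

    # Only small per-batch status codes come back to the driver.
    statuses = batched.rdd.mapPartitions(post_partition).collect()

One caveat on this sketch: row.asDict() values must be JSON-serialisable, so timestamp or decimal columns may need casting to strings before the POST.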