Hi Gourav, Do you have any examples or links, please? That would help me to understand.
Thanks,
Sid

On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> I think that serialising data using Spark is overkill; why not use
> normal Python?
>
> Also, have you tried repartitioning by range? That way you can use the
> modulus operator to batch things up.
>
> Regards,
> Gourav
>
> On Mon, Jun 13, 2022 at 8:37 AM Sid <flinkbyhe...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I am trying to hit POST APIs for the very first time using PySpark.
>>
>> My end goal is something like the below:
>>
>> 1. Generate the data.
>> 2. Send the data in batches of 4k records, since the API can accept
>>    4k records at once.
>> 3. Each record would look like the below:
>>
>>    {
>>        "Token": "",
>>        "CustomerName": "",
>>        "Object": "",
>>        "Data": [{"A":"1"},{"A":"2"}]
>>    }
>>
>> 4. The token will be generated first and then passed to the 'Token'
>>    key in the data.
>>
>> For the above goal, I initially wrote something like the below, which
>> gives a heap error because the data frame is getting created on the driver
>> side, and the size of the records is a minimum of 1M.
>>
>> df = modifiedData  # Assume it to be query results stored as a DF
>>
>> df = df.withColumn("uniqueID", lit("1"))
>>
>> df = df.withColumn("row_num", row_number().over(
>>     Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>> ))
>> tokenUrl = ""
>> # tokenUrl = ""
>> policyUrl = ""
>> tokenBody = {"Username": "", "Password": "", "CustomerName": ""}
>>
>> def get_token(url, payload):
>>     try:
>>         print("Getting Token")
>>         response = requests.request("POST", url, data=payload)
>>         data = response.json()
>>         if data['ErrorDescription'] == 'Success':
>>             token = data['Token']
>>             print(":::Token Generated::::")
>>         else:
>>             print('TokenNotGeneratedFrom: ')
>>             # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
>>         return token
>>     except Exception as e:
>>         print('TokenNotGeneratedFrom: ' + str(e))
>>         # raise TokenNotGeneratedFrom(500, str(e))
>>
>> def call_to_cust_bulk_api(url, payload):
>>     print("Calling Bulk API")
>>     try:
>>         # TODO: write code...
>>         headers = {'content-type': 'application/json'}
>>         print(":::::::jsn load::::")
>>         # print(json.dumps(payload))
>>         # print(payload)
>>         response = requests.post(url, data=json.dumps(payload), headers=headers)
>>         # print(json.dumps(payload))
>>         data = response.json()
>>         return data
>>     except Exception as e:
>>         print('ExceptionInPushingDataTo: ' + str(e))
>>         # raise ExceptionInPushingDataTo(500, str(e))
>>
>> total_count = df.count()
>> i = 1
>> while i < total_count:
>>     rangeNum = i + 3999
>>     print("Range Num:::")
>>     print(rangeNum)
>>     df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
>>     df1.cache()
>>     maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>>     finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>>     print("finalDF count:::", finalDF.count())
>>     token = get_token(tokenUrl, tokenBody)
>>
>>     result = json.loads((finalDF.toPandas().to_json(orient="records")))
>>     # token = get_token(tokenUrl, tokenBody)
>>
>>     custRequestBody = {
>>         "Token": token,
>>         "CustomerName": "",
>>         "Object": "",
>>         "Data": result
>>     }
>>
>>     # print("::::Customer Request Body::::::")
>>     # print(json.dumps(custRequestBody))
>>     response = call_to_cust_bulk_api(policyUrl, custRequestBody)
>>     print(response)
>>     finalDFStatus = finalDF.withColumn("edl_timestamp",
>>         to_timestamp(lit(F.TimeNow()))).withColumn(
>>         "status_for_each_batch",
>>         lit(str(response)))
>>
>>     print("Max Value:::")
>>     print(maxValue)
>>     print("Next I:::")
>>     i = rangeNum + 1
>>     print(i)
>>
>> This is my very first approach to hitting the APIs with Spark. So, could
>> anyone please help me to redesign the approach, or can share some links or
>> references using which I can go to the depth of this and rectify myself.
>> How can I scale this?
>>
>> Any help is much appreciated.
>>
>> TIA,
>> Sid
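
For reference, here is a minimal sketch of the "normal Python" batching idea from the quoted reply. It is not code from anyone in this thread. The helper names (batched, build_body, send_partition) and the injected get_token/post callables are hypothetical, and it assumes one token can be reused for every batch in a partition. It only shows how rows can be grouped into chunks of at most 4k and wrapped in the request shape from the original post, without a global row_number or a toPandas() call on the driver.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List

BATCH_SIZE = 4000  # the thread says the API accepts 4k records per call


def batched(rows: Iterable[dict], size: int = BATCH_SIZE) -> Iterator[List[dict]]:
    """Yield consecutive lists of at most `size` rows, one chunk at a time."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def build_body(token: str, records: List[dict]) -> dict:
    """Wrap one batch in the request shape shown in the original post."""
    return {"Token": token, "CustomerName": "", "Object": "", "Data": records}


def send_partition(
    rows: Iterable[dict],
    get_token: Callable[[], str],
    post: Callable[[dict], object],
    size: int = BATCH_SIZE,
) -> List[object]:
    """Post every batch of one partition and return the responses in order.

    Assumption: a single token may be reused for all batches in the partition.
    """
    token = get_token()
    return [post(build_body(token, chunk)) for chunk in batched(rows, size)]
```

On the Spark side, something like df.rdd.mapPartitions could call send_partition with each Row converted through row.asDict(), so the batches are built and posted on the executors instead of the driver. Whether that suits the API's rate limits and token lifetime is something to check separately.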