What do you mean by overkill here? I tried the below way to iterate over the 4k records under a while loop; however, it runs only for the first record. What could be wrong here? I am going through a few SO posts where users found the below approach faster than the withColumn approach:

finalDF = finalDF.select("meta").rdd.map(
    lambda x: call_to_cust_bulk_api(policyUrl, x[0])).toDF()
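For reference, a rough sketch of how the per-row call above could instead batch records inside each partition and make one POST per 4k chunk. This is untested: it reuses call_to_cust_bulk_api and policyUrl from the code quoted further down, and assumes each "meta" value is already a JSON-serialisable dict.

from itertools import islice
from pyspark.sql import Row

def post_in_batches(rows, batch_size=4000):
    # Pull only the column we need and group it into chunks of up to 4000.
    metas = (row["meta"] for row in rows)
    while True:
        chunk = list(islice(metas, batch_size))
        if not chunk:
            break
        # One bulk POST per chunk instead of one call per record.
        resp = call_to_cust_bulk_api(policyUrl, chunk)
        yield Row(batch_size=len(chunk), response=str(resp))

statusDF = finalDF.select("meta").rdd.mapPartitions(post_in_batches).toDF()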
On Mon, Jun 13, 2022 at 4:13 PM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> Hi,
>
> >> spark.range(10000).createOrReplaceTempView("test")
> >> maximum_records_per_api_call = 40
> >> batch_count = spark.sql("SELECT * FROM test").count() / maximum_records_per_api_call
> >> spark.sql("SELECT id, mod(monotonically_increasing_id(), batch_count) batch_id FROM test").repartitionByRange("batch_id").createOrReplaceTempView("test_batch")
>
> the above code should then be able to be run with a udf, as long as we are
> able to control the parallelism with the help of executor count and task
> cpu configuration.
>
> But once again, this is just unnecessary overkill.
>
> Regards,
> Gourav Sengupta
>
> On Mon, Jun 13, 2022 at 10:41 AM Sid <flinkbyhe...@gmail.com> wrote:
>
>> Hi Gourav,
>>
>> Could you please provide me with some examples?
>>
>> On Mon, Jun 13, 2022 at 2:23 PM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> try to use mod of a monotonically increasing field and then use the
>>> repartitionByRange function, and see whether Spark automatically serialises
>>> it based on the number of executors that you put in the job.
>>>
>>> But once again, this is kind of an overkill; for fetching data from an
>>> API, creating a simple Python program works quite well.
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Mon, Jun 13, 2022 at 9:28 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>
>>>> Hi Gourav,
>>>>
>>>> Do you have any examples or links, please? That would help me to
>>>> understand.
>>>>
>>>> Thanks,
>>>> Sid
>>>>
>>>> On Mon, Jun 13, 2022 at 1:42 PM Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I think that serialising data using Spark is overkill; why not use
>>>>> normal Python?
>>>>>
>>>>> Also, have you tried repartition by range? That way you can use the
>>>>> modulus operator to batch things up.
>>>>>
>>>>> Regards,
>>>>> Gourav
>>>>>
>>>>> On Mon, Jun 13, 2022 at 8:37 AM Sid <flinkbyhe...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I am trying to hit POST APIs for the very first time using Pyspark.
>>>>>>
>>>>>> My end goal is to achieve something like the below:
>>>>>>
>>>>>> 1. Generate the data.
>>>>>> 2. Send the data in batches of 4k records, since the API can accept
>>>>>>    4k records at once.
>>>>>> 3. Each record would be as below:
>>>>>>
>>>>>>    {
>>>>>>        "Token": "",
>>>>>>        "CustomerName": "",
>>>>>>        "Object": "",
>>>>>>        "Data": [{"A":"1"},{"A":"2"}]
>>>>>>    }
>>>>>>
>>>>>> 4. The token will be generated first and then passed to the 'Token'
>>>>>>    key in the data.
>>>>>>
>>>>>> For the above goal, I initially wrote something like the below, which
>>>>>> gives a heap error because the data frame is getting created on the
>>>>>> driver side, and the size of the records is a minimum of 1M.
>>>>>> df = modifiedData  # Assume it to be query results stored as a DF
>>>>>>
>>>>>> df = df.withColumn("uniqueID", lit("1"))
>>>>>>
>>>>>> df = df.withColumn("row_num", row_number().over(
>>>>>>     Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
>>>>>> ))
>>>>>> tokenUrl = ""
>>>>>> # tokenUrl = ""
>>>>>> policyUrl = ""
>>>>>> tokenBody = {"Username": "", "Password": "", "CustomerName": ""}
>>>>>>
>>>>>> def get_token(url, payload):
>>>>>>     try:
>>>>>>         print("Getting Token")
>>>>>>         response = requests.request("POST", url, data=payload)
>>>>>>         data = response.json()
>>>>>>         if data['ErrorDescription'] == 'Success':
>>>>>>             token = data['Token']
>>>>>>             print(":::Token Generated::::")
>>>>>>         else:
>>>>>>             print('TokenNotGeneratedFrom: ')
>>>>>>             # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
>>>>>>         return token
>>>>>>     except Exception as e:
>>>>>>         print('TokenNotGeneratedFrom: ' + str(e))
>>>>>>         # raise TokenNotGeneratedFrom(500, str(e))
>>>>>>
>>>>>> def call_to_cust_bulk_api(url, payload):
>>>>>>     print("Calling Bulk API")
>>>>>>     try:
>>>>>>         # TODO: write code...
>>>>>>         headers = {'content-type': 'application/json'}
>>>>>>         print(":::::::jsn load::::")
>>>>>>         # print(json.dumps(payload))
>>>>>>         # print(payload)
>>>>>>         response = requests.post(url, data=json.dumps(payload), headers=headers)
>>>>>>         # print(json.dumps(payload))
>>>>>>         data = response.json()
>>>>>>         return data
>>>>>>     except Exception as e:
>>>>>>         print('ExceptionInPushingDataTo: ' + str(e))
>>>>>>         # raise ExceptionInPushingDataTo(500, str(e))
>>>>>>
>>>>>> total_count = df.count()
>>>>>> i = 1
>>>>>> while i < total_count:
>>>>>>     rangeNum = i + 3999
>>>>>>     print("Range Num:::")
>>>>>>     print(rangeNum)
>>>>>>     df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
>>>>>>     df1.cache()
>>>>>>     maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
>>>>>>     finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
>>>>>>     print("finalDF count:::", finalDF.count())
>>>>>>     token = get_token(tokenUrl, tokenBody)
>>>>>>
>>>>>>     result = json.loads((finalDF.toPandas().to_json(orient="records")))
>>>>>>     # token = get_token(tokenUrl, tokenBody)
>>>>>>     custRequestBody = {
>>>>>>         "Token": token,
>>>>>>         "CustomerName": "",
>>>>>>         "Object": "",
>>>>>>         "Data": result
>>>>>>     }
>>>>>>
>>>>>>     # print("::::Customer Request Body::::::")
>>>>>>     # print(json.dumps(custRequestBody))
>>>>>>     response = call_to_cust_bulk_api(policyUrl, custRequestBody)
>>>>>>     print(response)
>>>>>>     finalDFStatus = finalDF.withColumn("edl_timestamp", to_timestamp(lit(F.TimeNow()))).withColumn(
>>>>>>         "status_for_each_batch",
>>>>>>         lit(str(response)))
>>>>>>
>>>>>>     print("Max Value:::")
>>>>>>     print(maxValue)
>>>>>>     print("Next I:::")
>>>>>>     i = rangeNum + 1
>>>>>>     print(i)
>>>>>>
>>>>>> This is my very first approach to hitting the APIs with Spark. So,
>>>>>> could anyone please help me to redesign the approach, or share some
>>>>>> links or references using which I can go to the depth of this and
>>>>>> rectify myself? How can I scale this?
>>>>>>
>>>>>> Any help is much appreciated.
>>>>>>
>>>>>> TIA,
>>>>>> Sid
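For completeness, here is one way the batch_id / repartitionByRange suggestion earlier in the thread might look in DataFrame code. This is only an untested sketch: batch sizes are approximate because monotonically_increasing_id() is not contiguous, and it reuses call_to_cust_bulk_api and policyUrl from the code above.

from pyspark.sql import functions as F

maximum_records_per_api_call = 4000
num_batches = int(df.count() / maximum_records_per_api_call) + 1

# Tag each row with a batch number and co-locate each batch in one partition.
batched = (df
           .withColumn("batch_id", F.monotonically_increasing_id() % num_batches)
           .repartitionByRange(num_batches, "batch_id"))

def push_partition(rows):
    # One bulk POST per partition (i.e. per range of batch_id values).
    payload = [row.asDict() for row in rows]
    if payload:
        yield str(call_to_cust_bulk_api(policyUrl, payload))

statuses = batched.drop("batch_id").rdd.mapPartitions(push_partition).collect()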
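And, for comparison with the "simple Python program" comment earlier in the thread, a plain-Python sketch of the same batching without Spark. The records list, token, and URL are placeholders; get_token and the request body shape are taken from the code above.

import json
import requests

def post_in_chunks(records, token, policy_url, chunk_size=4000):
    # Split the full record list into chunks of up to 4000 and POST each one.
    statuses = []
    for start in range(0, len(records), chunk_size):
        body = {
            "Token": token,
            "CustomerName": "",
            "Object": "",
            "Data": records[start:start + chunk_size],
        }
        resp = requests.post(policy_url, data=json.dumps(body),
                             headers={"content-type": "application/json"})
        statuses.append(resp.status_code)
    return statuses

# Example usage (placeholders):
# statuses = post_in_chunks(all_records, get_token(tokenUrl, tokenBody), policyUrl)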