Hi Enrico,

Thanks for helping me to understand the mistakes.

My end goal is to achieve something like the below:


   1. Generate the data.
   2. Send the data in batches of 4k records, since the API can accept
   4k records in one call.
   3. Each request body would be as below:

   {
       "Token": "",
       "CustomerName": "",
       "Object": "",
       "Data": [{"A":"1"},{"A":"2"}]
   }

   4. A token will be generated first and then passed to the 'Token' key
   in the request body (a rough sketch follows below).
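
To make the shape concrete, here is a minimal sketch of how I picture one
batch request being assembled (the URLs, credentials and field values are
placeholders, not the real ones):

    import json
    import requests

    def build_batch_request(token, records):
        # records is a list of dicts like [{"A": "1"}, {"A": "2"}]
        return {
            "Token": token,
            "CustomerName": "",   # placeholder
            "Object": "",         # placeholder
            "Data": records,
        }

    # hypothetical endpoints, only to illustrate the flow
    token_resp = requests.post("https://example.com/token",
                               data={"Username": "", "Password": "",
                                     "CustomerName": ""})
    token = token_resp.json()["Token"]

    batch = [{"A": "1"}, {"A": "2"}]   # up to 4k records per call
    payload = build_batch_request(token, batch)
    requests.post("https://example.com/bulk", data=json.dumps(payload),
                  headers={"content-type": "application/json"})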

For the above goal, I initially wrote something like the below, which gives
a heap error because the pandas data frame is created on the driver side
and the dataset has a minimum of 1M records.
            # imports assumed for this snippet
            import json
            import requests
            from pyspark.sql import functions as F
            from pyspark.sql.functions import col, lit, max, row_number
            from pyspark.sql.window import Window

            df = modifiedData

            df = df.withColumn("uniqueID", lit("1"))

            df = df.withColumn("row_num", row_number().over(
                Window.partitionBy(col("uniqueID")).orderBy(col("uniqueID"))
            ))
            tokenUrl = ""
            # tokenUrl = ""
            policyUrl = ""
            tokenBody = {"Username": "", "Password": "", "CustomerName": ""}

            def get_token(url, payload):
                # fetch an auth token before each batch is sent
                try:
                    print("Getting Token")
                    response = requests.request("POST", url, data=payload)
                    data = response.json()
                    if data['ErrorDescription'] == 'Success':
                        token = data['Token']
                        print(":::Token Generated::::")
                    else:
                        token = None
                        print('TokenNotGeneratedFrom: ')
                        # raise TokenNotGeneratedFrom(500, 'Token not Generated from ')
                    return token
                except Exception as e:
                    print('TokenNotGeneratedFrom: ' + str(e))
                    # raise TokenNotGeneratedFrom(500, str(e))

            def call_to_cust_bulk_api(url, payload):
                # POST one batch payload to the bulk API and return the parsed response
                print("Calling Bulk API")
                try:
                    headers = {'content-type': 'application/json'}
                    print(":::::::jsn load::::")
                    # print(json.dumps(payload))
                    # print(payload)
                    response = requests.post(url, data=json.dumps(payload), headers=headers)
                    data = response.json()
                    return data
                except Exception as e:
                    print('ExceptionInPushingDataTo: ' + str(e))
                    # raise ExceptionInPushingDataTo(500, str(e))

            total_count = df.count()
            i = 1
            while i < total_count:
                rangeNum = i + 3999
                print("Range Num:::")
                print(rangeNum)
                # slice the next 4k rows by row_num
                df1 = df.filter((col("row_num") >= i) & (col("row_num") <= rangeNum))
                df1.cache()
                maxValue = df1.select(max(col("row_num")).alias("max_val")).toPandas()['max_val'].iloc[0]
                finalDF = df1.drop("row_num", "edl_timestamp", "uniqueID")
                print("finalDF count:::", finalDF.count())
                token = get_token(tokenUrl, tokenBody)

                # toPandas() collects the whole batch to the driver
                result = json.loads(finalDF.toPandas().to_json(orient="records"))
                # token = get_token(tokenUrl, tokenBody)
                custRequestBody = {
                    "Token": token,
                    "CustomerName": "",
                    "Object": "",
                    "Data": result
                }

                # print("::::Customer Request Body::::::")
                # print(json.dumps(custRequestBody))
                response = call_to_cust_bulk_api(policyUrl, custRequestBody)
                print(response)
                # record a load timestamp and the API response status for this batch
                finalDFStatus = finalDF.withColumn("edl_timestamp", F.current_timestamp()).withColumn(
                    "status_for_each_batch",
                    lit(str(response)))


                print("Max Value:::")
                print(maxValue)
                print("Next I:::")
                i = rangeNum + 1
                print(i)
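
From what I have read so far, one direction might be to push the POST calls
down to the executors with foreachPartition, after repartitioning so that
each partition holds roughly one batch. Below is only a rough sketch of what
I mean (it reuses the helper names above plus a BATCH_SIZE constant I made
up), and I am not at all sure it is the right way:

    BATCH_SIZE = 4000

    def send_partition(rows):
        # runs on the executors; each partition posts its rows in chunks of 4k
        token = get_token(tokenUrl, tokenBody)
        batch = []
        for row in rows:
            batch.append(row.asDict())
            if len(batch) == BATCH_SIZE:
                call_to_cust_bulk_api(policyUrl, {"Token": token,
                                                  "CustomerName": "",
                                                  "Object": "",
                                                  "Data": batch})
                batch = []
        if batch:
            # send the last, partially filled batch
            call_to_cust_bulk_api(policyUrl, {"Token": token,
                                              "CustomerName": "",
                                              "Object": "",
                                              "Data": batch})

    cleaned = df.drop("row_num", "edl_timestamp", "uniqueID")
    num_partitions = (cleaned.count() + BATCH_SIZE - 1) // BATCH_SIZE or 1
    cleaned.repartition(num_partitions).foreachPartition(send_partition)

Would something along these lines avoid collecting everything on the driver,
or is there a better pattern for this?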

This is my very first attempt at hitting APIs from Spark. So, could you
please help me redesign the approach, or share some links or references
that I can use to dig deeper into this and correct myself? How can I scale
this?


Any help is much appreciated. Thank you so much again for your time.

TIA,
Sid




On Fri, Jun 10, 2022 at 8:54 PM Enrico Minack <i...@enrico.minack.dev>
wrote:

> Hi,
>
> This adds a column with value "1" (string) *in all rows*:
>
> df = df.withColumn("uniqueID", lit("1"))
>
> This counts the rows for all rows that have the same uniqueID, *which are
> all rows*. The window does not make much sense.
> And it orders all rows that have the same uniqueID by uniqueID. Does not
> make much sense either.
> df = df.withColumn("row_num", row_number().over( Window.partitionBy(col(
> "uniqueID")).orderBy(col("uniqueID")) ))
>
> Then it looks like it takes the first 4000 rows (row_num from 1 to 4000)
> and tries to send them via HTTP POST. Then it moves the range by one and
> sends row 2 to 4001 (mostly overlapped with the first POST).
>
> It is not clear if the "Data" field is meant to be all rows or only a
> single row. Either way, this is not what happens. Please consider the
> difference between a Column and a DataFrame in Spark. This is very
> different from Pandas.
>
> I think you have to rethink your approach. Using Spark means big data.
> This approach is iterative and single-threaded.
>
> Enrico
>
>
> On 10.06.22 at 16:01, Sid wrote:
>
> Hi Enrico,
>
> Thanks for your time. Much appreciated.
>
> I am expecting the payload to be a JSON string per record, like the one
> below:
>
> {"A":"some_value","B":"some_value"}
>
> Where A and B are the columns in my dataset.
>
>
> On Fri, Jun 10, 2022 at 6:09 PM Enrico Minack <i...@enrico.minack.dev>
> wrote:
>
>> Sid,
>>
>> just recognized you are using Python API here. Then
>> struct(*colsListToBePassed)) should be correct, given it takes a list of
>> strings.
>>
>> Your method call_to_cust_bulk_api takes argument payload, which is a
>> Column. This is then used in custRequestBody. That is a pretty strange use
>> of a column expression. What do you expect print(payload) to be?
>>
>> I recommend splitting that complex command into multiple commands to find
>> out what "an error of column not iterable" refers to.
>>
>> Enrico
>>
>>
>> On 10.06.22 at 13:39, Enrico Minack wrote:
>>
>> Hi Sid,
>>
>> finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
>>                  .withColumn("status_for_batch", 
>> call_to_cust_bulk_api(policyUrl, to_json(struct(*colsListToBePassed))))
>>
>> You are calling withColumn with the result of call_to_cust_bulk_api as
>> the second argument. That result looks like it is of type string. But
>> withColumn expects type Column. You can turn that string into a Column
>> using lit:
>>
>> finalDF = finalDF.repartition(finalDF.rdd.getNumPartitions())
>>                  .withColumn("status_for_batch", 
>> lit(call_to_cust_bulk_api(policyUrl, to_json(struct(*colsListToBePassed)))))
>>
>>
>> You are saying that gives you an error of column not iterable. I reckon
>> the struct(*colsListToBePassed)) is wrong.
>>
>> Method struct requires a single string followed by a list of strings.
>> Given your colsListToBePassed is a list of strings, this does not work.
>> Try:
>>
>>   struct(colsListToBePassed.head, colsListToBePassed.tail: _*))
>>
>> Alternatively, struct requires a list of Column, so try this:
>>
>>   struct(colsListToBePassed.map(col): _*))
>>
>> The API is pretty clear about the types it expects.
>>
>>
>> If you are still getting errors, please paste the code and the error.
>>
>> Enrico
>>
>>
>>
>> On 09.06.22 at 21:31, Sid wrote:
>>
>> Hi Experts,
>>
>> I am facing one problem while passing a column to the method.  The
>> problem is described in detail here:
>>
>>
>> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>>
>> TIA,
>> Sid
>>
>>
>>
>>
>
