Hi Karthick,

A few points that may help you:

As stated in the URL you posted, "The function is non-deterministic because its 
result depends on partition IDs." Hence, the generated ID depends on the 
partition IDs. Based on the code snippet you provided, I didn't see any 
partition columns in the window specification. Therefore your generated IDs 
may vary between runs, because the partition ID assigned to a particular 
record may change from run to run.
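
To make the dependence on partition IDs concrete, here is a small pure-Python 
sketch of the layout described on the documentation page you linked: the 
partition ID goes in the upper 31 bits and the record number within each 
partition in the lower 33 bits. The helper names simulated_id and 
ids_for_layout are made up for illustration and are not part of Spark.

```python
def simulated_id(partition_id: int, row_in_partition: int) -> int:
    """Mimic the documented layout: partition ID above the lower 33 bits."""
    return (partition_id << 33) + row_in_partition


def ids_for_layout(rows_per_partition):
    """Return the IDs the records would get, in order, for a partition layout."""
    return [
        simulated_id(partition, row)
        for partition, count in enumerate(rows_per_partition)
        for row in range(count)
    ]


# The same six records, laid out in two different ways.
one_partition = ids_for_layout([6])
two_partitions = ids_for_layout([3, 3])

print(one_partition)   # [0, 1, 2, 3, 4, 5]
print(two_partitions)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

The fourth record gets ID 3 in the first layout and 8589934592 in the second, 
so the value a record receives depends on which partition it lands in.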

You mentioned that it worked on a small database. A small dataset may happen 
to fit into one partition and thus give you the same output by chance. Once 
your data scales up and is spread across multiple partitions, your results 
will likely differ. As for a solution, a first step is to partition your data 
explicitly before applying window functions.

Besides, to achieve what you want, you may use row_number() alone to generate 
the so-called index, or use monotonically_increasing_id() together with rank() 
to assign the IDs and rank them, which makes the result more deterministic.
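
Separately from the row_number() suggestion, another option that may be worth 
trying is to skip the join altogether and edit the JSON string inside each 
row, so no generated index is needed. The following is only a sketch under 
some assumptions: the "data" column holds one JSON object per row as a string, 
the key to drop sits at the top level, and the helper names drop_json_key and 
with_key_dropped are hypothetical.

```python
import json


def drop_json_key(json_text, key="fw_createdts"):
    """Return json_text with the top-level `key` removed; None stays None."""
    if json_text is None:
        return None
    record = json.loads(json_text)
    if isinstance(record, dict):
        record.pop(key, None)  # no error if the key is absent
    return json.dumps(record)


def with_key_dropped(df, column="data", key="fw_createdts"):
    """Apply drop_json_key to one string column of a Spark DataFrame."""
    # Imported here so the pure helper above can be tried without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    strip_key = F.udf(lambda text: drop_json_key(text, key), StringType())
    return df.withColumn(column, strip_key(F.col(column)))
```

Because each row is rewritten in place, the other columns stay attached to 
their own JSON value. Note that json.dumps may format the string differently 
from to_json (for example in spacing), so this is not a drop-in replacement if 
the exact text matters.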

Thank You & Best Regards
Winston Lai

________________________________
From: Karthick Nk <kcekarth...@gmail.com>
Sent: Friday, April 28, 2023 7:59:29 PM
To: user@spark.apache.org <user@spark.apache.org>
Subject: ***pyspark.sql.functions.monotonically_increasing_id()***

Hi @all,

I am using monotonically_increasing_id() in PySpark to remove one field from 
the JSON stored in one column of a Delta table. Please refer to the code 
below:

from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

df = spark.sql(f"SELECT * from {database}.{table}")
df1 = spark.read.json(df.rdd.map(lambda x: x.data), multiLine=True)
df1 = df1.drop('fw_createdts')
df1 = df1.selectExpr('to_json(struct(*)) as data')
df = df.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).withColumnRenamed("data", "data1")
df1 = df1.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
df = df.join(df1, on=["row_index"]).drop("row_index", "data1")
df.createOrReplaceTempView('tempdf')

Business Requirement:
Need to remove one key-value pair from the JSON field in the Delta table.

Steps done:
1. Read the data.
2. Read only the JSON data into a separate DataFrame, parsing it with 
spark.read.json.
3. Remove the unwanted column from that DataFrame.
4. Convert it back into a JSON field.
5. Join the two DataFrames using monotonically_increasing_id(), since there is 
no unique column shared between the JSON column and the other primary columns 
to map them on.
(Also, some of the fields present in the JSON field are also present at the 
primitive level, so I am not able to split the JSON field within the same 
DataFrame.)

Issue Faced:
1. For a small database and dataset, it works as expected.
2. For a big database and dataset, it does not work as expected: rows get 
mapped to different records in the same table.

When I referred to the documentation, I could see that the IDs are not 
consecutive. Is there any limit?
https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html#pyspark-sql-functions-monotonically-increasing-id

Could you explain whether there are any constraints on it?
How can we achieve this requirement using an alternate method?


Thanks in advance🙂
