You can use a combination of explode and distinct: explode the array column, join on the exploded value, then apply distinct to the result.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

# Create a SparkSession
spark = SparkSession.builder \
    .appName("JoinExample") \
    .getOrCreate()

sc = spark.sparkContext
# Set the log level to ERROR to reduce verbosity
sc.setLogLevel("ERROR")

# Create DataFrame df
data = [
    ["a"],
    ["b"],
    ["d"],
]
column_names = ["column1"]
df = spark.createDataFrame(data, column_names)
print("df:")
df.show()

# Create DataFrame df_1
df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
print("df_1:")
df_1.show()

# Explode the array column in df_1
exploded_df_1 = df_1.select(explode("data").alias("data"))

# Join with df using the exploded column
final_df = exploded_df_1.join(df, exploded_df_1.data == df.column1)

# Distinct to ensure only unique rows are returned from df_1
final_df = final_df.select("data").distinct()

print("Result:")
final_df.show()


Results in

df:
+-------+
|column1|
+-------+
|      a|
|      b|
|      d|
+-------+

df_1:
+---------+
|     data|
+---------+
|[a, b, c]|
|       []|
+---------+

Result:
+----+
|data|
+----+
|   a|
|   b|
+----+
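
Note that the result above contains the exploded values, not the original df_1 rows. If the goal is one row per matching df_1 record, a left semi join may be closer to what was asked. In PySpark that would look something like df_1.join(df, expr("array_contains(data, column1)"), "left_semi"); this call was not run as part of this thread, so treat it as an assumption to test. The sketch below is only a plain-Python model of the two join semantics on the same sample data (the function names are hypothetical, not Spark APIs):

```python
# Plain-Python model of the two join semantics (no Spark required).
# The sample rows mirror df and df_1 from the example above.
df_rows = ["a", "b", "d"]              # values of column1 in df
df_1_rows = [["a", "b", "c"], []]      # arrays in the data column of df_1


def inner_join_array_contains(arrays, values):
    """Inner join: one output row per (array, value) pair that matches."""
    return [(arr, val) for arr in arrays for val in values if val in arr]


def left_semi_join_array_contains(arrays, values):
    """Left semi join: each array at most once, kept if any value matches."""
    return [arr for arr in arrays if any(val in arr for val in values)]


inner_result = inner_join_array_contains(df_1_rows, df_rows)
semi_result = left_semi_join_array_contains(df_1_rows, df_rows)

print(inner_result)  # two rows: the first array is paired with "a" and with "b"
print(semi_result)   # one row: the first array, returned once
```

The inner join pairs the first array with both "a" and "b", which is the duplicate seen with array_contains; the semi join only checks whether a match exists, so no distinct step is needed afterwards.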

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Tue, 14 May 2024 at 13:19, Karthick Nk <kcekarth...@gmail.com> wrote:

> Hi All,
>
> Does anyone have an idea or suggestion for an alternative way to achieve
> this scenario?
>
> Thanks.
>
> On Sat, May 11, 2024 at 6:55 AM Damien Hawes <marley.ha...@gmail.com>
> wrote:
>
>> Right now, with the structure of your data, it isn't possible.
>>
>> The rows aren't duplicates of each other. "a" and "b" both exist in the
>> array. So Spark is correctly performing the join. It looks like you need to
>> find another way to model this data to get what you want to achieve.
>>
>> Are the values of "a" and "b" related to each other in any way?
>>
>> - Damien
>>
>> On Fri, 10 May 2024 at 18:08, Karthick Nk <kcekarth...@gmail.com> wrote:
>>
>>> Hi Mich,
>>>
>>> Thanks for the solution, but I am getting duplicate results when using
>>> array_contains. I have explained the scenario below. Could you help me
>>> with how to achieve this? I have tried different ways but could not
>>> achieve it.
>>>
>>> For example
>>>
>>> data = [
>>>     ["a"],
>>>     ["b"],
>>>     ["d"],
>>> ]
>>> column_names = ["column1"]
>>> df = spark.createDataFrame(data, column_names)
>>> df.display()
>>>
>>> [image: image.png]
>>>
>>> df_1 = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>>> df_1.display()
>>> [image: image.png]
>>>
>>>
>>> final_df = df_1.join(df, expr("array_contains(data, column1)"))
>>> final_df.display()
>>>
>>> Result:
>>> [image: image.png]
>>>
>>> But I need the result like below:
>>>
>>> [image: image.png]
>>>
>>> The reason is:
>>>
>>> df_1 has only two records, and only the first record has matching
>>> values.
>>> Because both records from df, i.e. *a, b*, are present in that first
>>> record, the join returns two records. My expectation is that it returns
>>> only one record: if any of the records from df is present in a df_1
>>> record, that df_1 record should be returned only once.
>>>
>>> Note:
>>> 1. We are able to filter the duplicate records by applying distinct on the
>>> ID field in the resultant df, but I don't think that is an effective
>>> way; I would rather change the array_contains step itself.
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Mar 1, 2024 at 4:11 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>>
>>>> This is what you want: how to join two DataFrames, with a string column
>>>> in one and an array of strings in the other, keeping only rows where the
>>>> string is present in the array.
>>>>
>>>> from pyspark.sql import SparkSession
>>>> from pyspark.sql import Row
>>>> from pyspark.sql.functions import expr
>>>>
>>>> spark = SparkSession.builder.appName("joins").getOrCreate()
>>>>
>>>> # df1 has a column combined_id holding an array of integers
>>>> data1 = [Row(combined_id=[1, 2, 3]), Row(combined_id=[4, 5, 6])]
>>>> # df2 has a column mr_id holding single integers
>>>> data2 = [Row(mr_id=2), Row(mr_id=5)]
>>>>
>>>> df1 = spark.createDataFrame(data1)
>>>> df2 = spark.createDataFrame(data2)
>>>>
>>>> df1.printSchema()
>>>> df2.printSchema()
>>>>
>>>> # Perform the join with array_contains. It takes two arguments: an
>>>> # array and a value. It returns True if the value exists as an element
>>>> # within the array, otherwise False.
>>>> joined_df = df1.join(df2, expr("array_contains(combined_id, mr_id)"))
>>>>
>>>> # Show the result
>>>> joined_df.show()
>>>>
>>>> root
>>>>  |-- combined_id: array (nullable = true)
>>>>  |    |-- element: long (containsNull = true)
>>>>
>>>> root
>>>>  |-- mr_id: long (nullable = true)
>>>>
>>>> +-----------+-----+
>>>> |combined_id|mr_id|
>>>> +-----------+-----+
>>>> |  [1, 2, 3]|    2|
>>>> |  [4, 5, 6]|    5|
>>>> +-----------+-----+
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Dad | Technologist | Solutions Architect | Engineer
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>> On Thu, 29 Feb 2024 at 20:50, Karthick Nk <kcekarth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have two dataframes with the structure below, and I have to join
>>>>> them. In one dataframe the join column is a string, and in the other
>>>>> the join column is an array of strings. We need an inner join that
>>>>> returns a row when the string value is present in the array of strings
>>>>> in the other dataframe.
>>>>>
>>>>>
>>>>> df1 = spark.sql("""
>>>>>     SELECT
>>>>>         mr.id as mr_id,
>>>>>         pv.id as pv_id,
>>>>>         array(mr.id, pv.id) as combined_id
>>>>>     FROM
>>>>>         table1 mr
>>>>>         INNER JOIN table2 pv ON pv.id = mr.recordid
>>>>>     WHERE
>>>>>         pv.id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>>>>>         or pv.id = 'a5f03625-6cc5-49df-95eb-df741fe9139b'
>>>>> """)
>>>>>
>>>>> # df1.display()
>>>>>
>>>>> # Your second query
>>>>> df2 = spark.sql("""
>>>>>     SELECT
>>>>>         id
>>>>>     FROM
>>>>>         table2
>>>>>     WHERE
>>>>>         id = '35122806-4cd2-4916-a149-24ea55c2dc36'
>>>>>
>>>>> """)
>>>>>
>>>>>
>>>>>
>>>>> Result data:
>>>>> 35122806-4cd2-4916-a149-24ea55c2dc36 only, because this record alone
>>>>> is common between the string and the array of strings.
>>>>>
>>>>> Can you share a sample snippet showing how to do the join for these two
>>>>> different data types?
>>>>>
>>>>> If any clarification is needed, please feel free to ask.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
