dataset.csv
id,tel_in_dataset
1,+3311111111
2,+3312224444
3,+3313333333
4,+3312225555
5,+3312226666
6,+3314444444
7,+3312227777
8,+3315555555

telephone_numbers.csv
tel
+3312224444
+3312225555
+3312226666
+3312227777



Start Spark with all of your CPU cores and RAM:

import os
import multiprocessing
from pyspark import SparkConf, SparkContext
from pyspark import pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, concat_ws, expr, lit, trim, regexp_replace
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

number_cores = int(multiprocessing.cpu_count())

mem_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024.0**3))  # e.g. 3.74


def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("local[{}]".format(number_cores))
    conf.set("spark.driver.memory", "{}g".format(memory_gb)).set(
        "spark.sql.adaptive.enabled", "True"
    ).set(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
    ).set(
        "spark.sql.repl.eagerEval.maxNumRows", "10000"
    )

    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()


spark = get_spark_session("My app", SparkConf())
spark.sparkContext.setLogLevel("ERROR")




# pandas API on Spark
tel_df = ps.read_csv("telephone_numbers.csv")

tel_df['tel'] = tel_df['tel'].astype(str)
tel_df['cleaned_tel'] = tel_df['tel'].str.replace('+', '', regex=False)

dataset_df = ps.read_csv("dataset.csv")
dataset_df['tel_in_dataset'] = dataset_df['tel_in_dataset'].astype(str)

dataset_df['cleaned_tel_in_dataset'] = dataset_df['tel_in_dataset'].str.replace('+', '', regex=False)

filtered_df = dataset_df[dataset_df['cleaned_tel_in_dataset'].isin(tel_df['cleaned_tel'].to_list())]

filtered_df.head()


   id  tel_in_dataset  cleaned_tel_in_dataset
1   2      3312224444              3312224444
3   4      3312225555              3312225555
4   5      3312226666              3312226666
6   7      3312227777              3312227777
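
If the reference list grows, to_list() materialises every number on the driver; the same filter can also be written as an inner join, which stays distributed. A minimal sketch, assuming the tel_df and dataset_df built above:

# Alternative to isin + to_list(): inner-join on the cleaned numbers,
# then keep only the dataset columns. Rows without a match are dropped.
filtered_df = dataset_df.merge(
    tel_df[["cleaned_tel"]],
    left_on="cleaned_tel_in_dataset",
    right_on="cleaned_tel",
    how="inner",
)[["id", "tel_in_dataset", "cleaned_tel_in_dataset"]]

filtered_df.head()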


# PySpark
tel_df = spark.read.csv("telephone_numbers.csv", header=True)
tel_df = tel_df.withColumn("cleaned_tel", regexp_replace(col("tel"), "\\+", ""))

dataset_df = spark.read.csv("dataset.csv", header=True)
dataset_df = dataset_df.withColumn("cleaned_tel_in_dataset", regexp_replace(col("tel_in_dataset"), "\\+", ""))

filtered_df = dataset_df.where(col("cleaned_tel_in_dataset").isin([row.cleaned_tel for row in tel_df.collect()]))

filtered_df.show()


+---+--------------+----------------------+
| id|tel_in_dataset|cleaned_tel_in_dataset|
+---+--------------+----------------------+
|  2|   +3312224444|            3312224444|
|  4|   +3312225555|            3312225555|
|  5|   +3312226666|            3312226666|
|  7|   +3312227777|            3312227777|
+---+--------------+----------------------+
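
With ~40,000 reference numbers the collect-and-isin approach above still works, but the same filter can be expressed as a broadcast left_semi join, so the list never has to be built on the driver. A minimal sketch, assuming the tel_df and dataset_df defined above:

from pyspark.sql.functions import broadcast

# Broadcast the small reference DataFrame; a left_semi join keeps only
# the rows of dataset_df whose cleaned number has a match, and none of
# tel_df's columns.
filtered_df = dataset_df.join(
    broadcast(tel_df),
    dataset_df["cleaned_tel_in_dataset"] == tel_df["cleaned_tel"],
    "left_semi",
)

filtered_df.show()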




On Sun, 2 Apr 2023 at 18:18, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi Phillipe,
>
> These are my thoughts besides comments from Sean
>
> Just to clarify, you receive a CSV file periodically and you already have
> a file that contains valid patterns for phone numbers (reference)
>
> In a pseudo language you can probe your csv DF against the reference DF
>
> // load your reference dataframe
> val reference_DF = sqlContext.parquetFile("path")
> // mark this smaller dataframe to be stored in memory
> reference_DF.cache()
>
> //Create a temp table
>
> reference_DF.createOrReplaceTempView("reference")
>
> // Do the same on the CSV, change the line below
>
> val csvDF = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "false").load("path")
>
> csvDF.cache()  // This may or may not work if the CSV is large, but it is worth trying
>
> csvDF.createOrReplaceTempView("csv")
>
> sqlContext.sql("JOIN Query").show
>
> If you prefer to broadcast the reference data, you must first collect it on 
> the driver before you broadcast it. This requires that your RDD fits in 
> memory on your driver (and executors).
>
> You can then play around with that join.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 2 Apr 2023 at 09:17, Philippe de Rochambeau <phi...@free.fr>
> wrote:
>
>> Many thanks, Mich.
>> Is « foreach » the best construct to look up items in a dataset such as
>> the « telephonedirectory » data set below?
>>
>> val telrdd = spark.sparkContext.parallelize(Seq("tel1", "tel2", "tel3", …)) // the telephone sequence
>>
>> // was read from a CSV file
>>
>> val ds = spark.read.parquet("/path/to/telephonedirectory")
>>
>>   telrdd.foreach(tel => {
>>     longAcc.select("*").rlike("+" + tel)
>>   })
>>
>>
>>
>>
>> On 1 Apr 2023, at 22:36, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>> This may help
>>
>> Spark rlike() Working with Regex Matching Example
>> <https://sparkbyexamples.com/spark/spark-rlike-regex-matching-examples/>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>>
>>
>>
>>
>>
>> On Sat, 1 Apr 2023 at 19:32, Philippe de Rochambeau <phi...@free.fr>
>> wrote:
>>
>>> Hello,
>>> I’m looking for an efficient way in Spark to search for a series of
>>> telephone numbers, contained in a CSV file, in a data set column.
>>>
>>> In pseudo code,
>>>
>>> for tel in [tel1, tel2, …. tel40,000]
>>>         search for tel in dataset using .like(« %tel% »)
>>> end for
>>>
>>> I’m using the like function because the telephone numbers in the data
>>> set may contain prefixes, such as « + »; e.g., « +3312224444 ».
>>>
>>> Any suggestions would be welcome.
>>>
>>> Many thanks.
>>>
>>> Philippe
>>>
>>>
>>>
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
