https://github.com/JohnSnowLabs/spark-nlp#packages-cheatsheet

Change spark = sparknlp.start()
to
spark = sparknlp.start(spark32=True)
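
A minimal, untested sketch of where the flag goes (pipeline name and language
taken from the code further down this thread):

import sparknlp
from sparknlp.pretrained import PretrainedPipeline

# spark32=True tells start() to load the Spark 3.2-compatible jars; the
# default Spark NLP 3.4.x artifact targets Spark 3.0/3.1, which is a likely
# cause of the pretrained-downloader error reported below.
spark = sparknlp.start(spark32=True)

pipeline = PretrainedPipeline('analyze_sentiment', lang='en')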


On Tue, Apr 19, 2022 at 9:10 PM Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

> Yes, a few users have hit that issue.
>
> Please open a new issue at
> https://github.com/JohnSnowLabs/spark-nlp/issues and they will help you.
>
>
>
>
> On Tue, Apr 19, 2022 at 8:33 PM Xavier Gervilla <
> xavier.gervi...@datapta.com> wrote:
>
>> Thank you for the advice! I had only limited knowledge of Spark NLP and
>> thought it could be used only with models that required training, so I
>> assumed it didn't fit my project. I'm now rebuilding the project with
>> Spark NLP, but when I try to load a pretrained model from JohnSnowLabs I
>> get an error (py4j.protocol.Py4JJavaError: An error occurred while calling
>> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize).
>>
>> This is the new basic code for rebuilding the project:
>>
>>
>> spark = sparknlp.start()
>>
>> pipelineName = 'analyze_sentiment'
>>
>> # this is the line that generates the error
>> pipeline = PretrainedPipeline(pipelineName, 'en')
>>
>> rawTweets = spark.readStream.format('socket').option('host', 'localhost').option('port', 9008).load()
>>
>> allTweets = rawTweets.selectExpr('CAST(value AS STRING)').withColumnRenamed('value', 'text').dropDuplicates(['text'])
>>
>> sentPred = pipeline.transform(allTweets)
>>
>> query = sentPred.writeStream.outputMode('complete').format('console').start()
>> query.awaitTermination()
>>
>> Spark version is 3.2.1, Spark NLP version is 3.4.3, and Java version is 8.
>> I've tried a different model but the error is still the same, so what
>> could be causing it?
>>
>> If this error gets solved, I think Spark NLP will be the solution I was
>> looking for to reduce memory consumption, so thank you again for
>> suggesting it.
>>
>>
>>
>> On Apr 18, 2022, at 9:07 PM, Bjørn Jørgensen <bjornjorgen...@gmail.com>
>> wrote:
>>
>> When did spaCy get support for Spark?
>>
>> Try Spark NLP <https://nlp.johnsnowlabs.com/>; it's made for Spark. They
>> have a lot of example notebooks at https://github.com/JohnSnowLabs/spark-nlp
>> and they publish user guides at
>> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>>
>>
>>
>>
>> On Mon, Apr 18, 2022 at 4:17 PM Sean Owen <sro...@gmail.com> wrote:
>>
>> It looks good; are you sure it even starts? The problem I see is that you
>> send a copy of the model from the driver for every task. Try broadcasting
>> the model instead, as sketched below. I'm not sure that resolves it, but
>> it would be good practice.
>>
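>> A rough sketch of that broadcast idea applied to the spaCy UDF quoted
>> below; whether a loaded spaCy pipeline pickles cleanly for broadcast can
>> vary, so a lazy per-worker load is shown as the fallback:
>>
>> from pyspark.sql.functions import udf
>> from pyspark.sql.types import ArrayType, StringType
>>
>> # Option 1: broadcast the loaded model once instead of shipping it
>> # inside every task closure (assumes the model object is picklable).
>> bcModel = spark.sparkContext.broadcast(nerModel)
>>
>> # Option 2: load lazily, once per Python worker process.
>> _model = None
>> def get_model():
>>     global _model
>>     if _model is None:
>>         import spacy
>>         _model = spacy.load("en_core_web_sm")
>>     return _model
>>
>> def obtain_ner_udf(words):
>>     if not words:
>>         return None
>>     entities = get_model()(words)  # or: bcModel.value(words)
>>     return [w.text + '_' + w.label_ for w in entities.ents]
>>
>> ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>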
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
>> xavier.gervi...@datapta.com> wrote:
>>
>>
>> Hi Team,
>> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>>
>> I'm developing a project that retrieves tweets in a 'host' app, streams
>> them to Spark, and through a series of DataFrame operations obtains the
>> sentiment of each tweet and its entities, applying a sentiment model and a
>> NER model respectively.
>>
>> The problem I've come across is that when the NER model is applied, RAM
>> consumption keeps increasing until the program stops with an out-of-memory
>> error. In addition, the Spark UI shows only one executor running, the
>> driver executor, yet htop on the terminal shows all 8 cores of the
>> instance executing at 100%.
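>>
>> That pattern matches local mode, where the whole job runs in a single JVM:
>> the Spark UI shows one "driver" executor while all local cores do the
>> work. A hedged sketch of such a session with an explicit driver-memory
>> setting (placeholder value, not a tested fix):
>>
>> from pyspark.sql import SparkSession
>>
>> spark = (SparkSession.builder
>>          .appName("TwitterStreamApp")
>>          .master("local[8]")                   # 8 local cores, one JVM
>>          .config("spark.driver.memory", "8g")  # placeholder value
>>          .getOrCreate())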
>>
>> The SparkSession is configured only to receive the tweets from the socket
>> connected to the second program, which sends them. The DataFrame goes
>> through some processing to obtain other properties of the tweet, such as
>> its sentiment (which causes no error even with less than 8 GB of RAM), and
>> then the NER model is applied.
>>
>> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>
>> # prior processing of the tweets
>> sentDF = other_processing(tweets)
>>
>> # obtaining the column that contains the list of entities from a tweet
>> nerDF = ner_classification(sentDF)
>>
>>
>> This is the code of the functions related to obtaining the NER: the
>> "main call" and the UDF.
>>
>> nerModel = spacy.load("en_core_web_sm")
>>
>> # main call: applies the UDF to every tweet from the "tweet" column
>> def ner_classification(words):
>>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>     words = words.withColumn("nerlist", ner_list("tweet"))
>>     return words
>>
>> # UDF function
>> def obtain_ner_udf(words):
>>     # if the tweet is empty, return None
>>     if words == "":
>>         return None
>>     # else apply the NER model (spaCy en_core_web_sm)
>>     entities = nerModel(words)
>>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>>     return [word.text + '_' + word.label_ for word in entities.ents]
>>
>>
>>
>> Lastly, I pair each entity with the sentiment of its tweet and obtain the
>> entity's average sentiment and its number of appearances.
>>
>> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>> flattenedNER.registerTempTable("df")
>>
>> querySelect = "SELECT col AS entity, avg(sentiment) AS sentiment, count(col) AS count FROM df GROUP BY col"
>> finalDF = spark.sql(querySelect)
>>
>> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>
>>
>> The resulting DataFrame is processed by a function that collects each
>> column into a list and prints them.
>>
>> def processBatch(df, epoch_id):
>>     entities = [str(t.entity) for t in df.select("entity").collect()]
>>     sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>>     counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>>
>>     print(entities, sentiments, counts)
>>
>>
>> At first I tried other NER models from Flair and they had the same
>> effect: after printing the first batch, memory use keeps increasing until
>> execution fails with the memory error. When I apply a "simple" function
>> such as return words.split() in the UDF instead of the NER model, there is
>> no such error, so it seems to be the model, not the ingested data, that
>> causes the overload.
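>>
>> The stand-in UDF for that test was roughly this (a sketch; only the
>> words.split() behaviour is from the test described above):
>>
>> def obtain_ner_udf(words):
>>     # placeholder body used only to isolate the memory issue
>>     return words.split() if words else None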
>>
>> Is there a way to prevent the excessive RAM consumption? Why is only the
>> driver executor running, with no other executors generated? How could I
>> keep the job from collapsing when the NER model is applied?
>>
>> Thanks in advance!
>>
>>
>>
>
>


-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297
