https://github.com/JohnSnowLabs/spark-nlp#packages-cheatsheet

Change spark = sparknlp.start() to spark = sparknlp.start(spark32=True).

On Tue, 19 Apr 2022 at 21:10, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:

> Yes, there are some that have that issue.
>
> Please open a new issue at https://github.com/JohnSnowLabs/spark-nlp/issues and they will help you.
>
> On Tue, 19 Apr 2022 at 20:33, Xavier Gervilla <xavier.gervi...@datapta.com> wrote:
>
>> Thank you for your advice. I had little knowledge of Spark NLP and thought it could only be used with models that required training, so I assumed it didn't fit my project. I'm now trying to build the project again with Spark NLP, but when I try to load a pretrained model from JohnSnowLabs I get an error (py4j.protocol.Py4JJavaError: An error occurred while calling z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize).
>>
>> This is the new basic code to develop the project again:
>>
>>     spark = sparknlp.start()
>>
>>     pipelineName = 'analyze_sentiment'
>>
>>     # this is the line that generates the error
>>     pipeline = PretrainedPipeline(pipelineName, 'en')
>>
>>     rawTweets = spark.readStream.format('socket').option('host', 'localhost').option('port', 9008).load()
>>     allTweets = rawTweets.selectExpr('CAST(value AS STRING)').withColumnRenamed('value', 'text').dropDuplicates(['text'])
>>
>>     sentPred = pipeline.transform(allTweets)
>>     query = sentPred.writeStream.outputMode('complete').format('console').start()
>>     query.awaitTermination()
>>
>> Spark version is 3.2.1, Spark NLP version is 3.4.3 and Java version is 8. I've tried with a different model but the error is still the same, so what could be causing it?
>>
>> If this error is solved, I think Spark NLP will be the solution I was looking for to reduce memory consumption, so thank you again for suggesting it.
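The suggested fix ties the Spark NLP startup flag to the Spark version in use. As a minimal illustration (the helper name and the version rule are my own sketch, not part of the Spark NLP API), the decision for Spark NLP 3.4.x on Spark 3.2.1 looks like:

```python
def start_kwargs(spark_version: str) -> dict:
    # Spark NLP 3.4.x ships Spark 3.0/3.1 jars by default; on Spark 3.2.x
    # you opt into the 3.2-compatible jars with sparknlp.start(spark32=True).
    major, minor = (int(p) for p in spark_version.split(".")[:2])
    return {"spark32": True} if (major, minor) == (3, 2) else {}

print(start_kwargs("3.2.1"))  # {'spark32': True}
print(start_kwargs("3.1.2"))  # {}
```

In the failing code this would amount to spark = sparknlp.start(spark32=True), since the reporter is on Spark 3.2.1.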
>>
>> On 18 Apr 2022, at 21:07, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>>
>> When did SpaCy get support for Spark?
>>
>> Try Spark NLP <https://nlp.johnsnowlabs.com/>, it's made for Spark. They have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and they publish user guides at https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>>
>> On Mon, 18 Apr 2022 at 16:17, Sean Owen <sro...@gmail.com> wrote:
>>
>> It looks good; are you sure it even starts? The problem I see is that you send a copy of the model from the driver for every task. Try broadcasting the model instead. I'm not sure that resolves it, but it would be good practice.
>>
>> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <xavier.gervi...@datapta.com> wrote:
>>
>> Hi Team,
>> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>>
>> I'm developing a project that retrieves tweets on a 'host' app, streams them to Spark and, through different DataFrame operations, obtains the sentiment of the tweets and their entities by applying a sentiment model and a NER model respectively.
>>
>> The problem I've come across is that when applying the NER model, RAM consumption increases until the program stops with a memory error because there's no memory left to execute. In addition, on the Spark UI I've seen that there's only one executor running (the driver executor), but using htop in the terminal I can see that the 8 cores of the instance are running at 100%.
>>
>> The SparkSession is only configured to receive the tweets from the socket that connects to the second program, which sends the tweets. The DataFrame goes through some processing to obtain other properties of the tweet, such as its sentiment (which causes no error even with less than 8 GB of RAM), and then the NER model is applied.
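Sean's point about shipping the model with every task can be sketched without Spark at all: an object captured by a UDF closure is pickled into every serialized task, whereas a broadcast variable (sc.broadcast(model), read via bc.value) is shipped to each executor once. A rough, illustrative cost model (the class, sizes and counts below are made up for the sketch, not measurements):

```python
import pickle

# Stand-in for a heavyweight NER model (spaCy's en_core_web_sm is tens of MB).
class FakeModel:
    def __init__(self):
        self.weights = list(range(100_000))

model = FakeModel()
model_bytes = len(pickle.dumps(model))

# Without broadcasting: the model rides along with every serialized task.
num_tasks = 200
without_broadcast = model_bytes * num_tasks

# With sc.broadcast(model): each executor receives the model once and tasks
# read it through bc.value, so the transfer scales with executors, not tasks.
num_executors = 8
with_broadcast = model_bytes * num_executors

print(without_broadcast // with_broadcast)  # 25
```

With 200 tasks on 8 executors, broadcasting cuts the data shipped by 25x in this toy accounting; it also avoids re-deserializing the model per task.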
>>
>>     spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>>     rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>>     tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>
>>     # prior processing of the tweets
>>     sentDF = other_processing(tweets)
>>
>>     # obtaining the column that contains the list of entities from a tweet
>>     nerDF = ner_classification(sentDF)
>>
>> This is the code of the functions related to obtaining the NER, the "main call" and the UDF function:
>>
>>     nerModel = spacy.load("en_core_web_sm")
>>
>>     # main call, applies the UDF function to every tweet from the "tweet" column
>>     def ner_classification(words):
>>         ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>         words = words.withColumn("nerlist", ner_list("tweet"))
>>         return words
>>
>>     # UDF function
>>     def obtain_ner_udf(words):
>>         # if the tweet is empty return None
>>         if words == "":
>>             return None
>>         # else: apply the NER model (spaCy en_core_web_sm)
>>         entities = nerModel(words)
>>
>>         # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>>         return [word.text + '_' + word.label_ for word in entities.ents]
>>
>> And lastly I map each entity to the sentiment of its tweet and obtain the average sentiment of the entity and the number of appearances:
>>
>>     flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>>     flattenedNER.registerTempTable("df")
>>
>>     querySelect = "SELECT col AS entity, avg(sentiment) AS sentiment, count(col) AS count FROM df GROUP BY col"
>>     finalDF = spark.sql(querySelect)
>>
>>     query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>
>> The resulting DF is processed with a function that separates each column into a list and prints it:
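The UDF's return format above can be checked without loading spaCy by stubbing the entity objects (Ent here is a hypothetical stand-in for spaCy's Span, which exposes .text and .label_; the real code would iterate over nerModel(words).ents):

```python
from collections import namedtuple

# Hypothetical stand-in for a spaCy Span: only .text and .label_ are needed.
Ent = namedtuple("Ent", ["text", "label_"])

def format_entities(ents):
    # Mirrors the UDF's return value: ['entity1_label1', 'entity2_label2', ...]
    return [e.text + "_" + e.label_ for e in ents]

print(format_entities([Ent("Spark", "ORG"), Ent("Barcelona", "GPE")]))
# ['Spark_ORG', 'Barcelona_GPE']
```

This is the shape that explode(nerDF.nerlist) later flattens into one row per entity.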
>>
>>     def processBatch(df, epoch_id):
>>         entities = [str(t.entity) for t in df.select("entity").collect()]
>>         sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>>         counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>>
>>         print(entities, sentiments, counts)
>>
>> At first I tried other NER models from Flair, and they have the same effect: after printing the first batch, memory use starts increasing until it fails and stops the execution with a memory error. When applying a "simple" function on the UDF instead of the NER model, such as return words.split(), there's no such error, so it shouldn't be the ingested data causing the overload but the model.
>>
>> Is there a way to prevent the excessive RAM consumption? Why is only the driver executor running and no other executors are generated? How could I prevent it from collapsing when applying the NER model?
>>
>> Thanks in advance!
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
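For reference, the GROUP BY query feeding processBatch reduces to a per-entity average and count; a pure-Python equivalent (the function name and sample rows are my own, for illustration only) behaves like:

```python
from collections import defaultdict

def aggregate_entities(rows):
    # Mirrors: SELECT col AS entity, avg(sentiment) AS sentiment,
    #          count(col) AS count FROM df GROUP BY col
    sums = defaultdict(float)
    counts = defaultdict(int)
    for entity, sentiment in rows:
        sums[entity] += sentiment
        counts[entity] += 1
    return {e: (sums[e] / counts[e], counts[e]) for e in sums}

rows = [("Spark_ORG", 0.5), ("Spark_ORG", 1.0), ("Twitter_ORG", -0.25)]
print(aggregate_entities(rows))
# {'Spark_ORG': (0.75, 2), 'Twitter_ORG': (-0.25, 1)}
```

Note that with outputMode('complete') this aggregation state is kept for every entity ever seen across the whole stream, which by itself grows memory use over time.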