It looks good. Are you sure it even starts? The problem I see is that you send a copy of the model from the driver with every task. Try broadcasting the model instead. I'm not sure that resolves it, but it would be good practice.
On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <xavier.gervi...@datapta.com> wrote:

> Hi Team,
>
> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>
> I'm developing a project that retrieves tweets in a "host" app and streams them to Spark, where a series of DataFrame operations obtains the sentiment of each tweet and its entities by applying a sentiment model and a NER model, respectively.
>
> The problem I've come across is that when the NER model is applied, RAM consumption increases until the program stops with a memory error because there is no memory left to execute. In addition, the Spark UI shows only one executor running, the driver executor, yet htop in the terminal shows all 8 cores of the instance at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket connected to the second program, which sends the tweets. The DataFrame goes through some processing to obtain other properties of the tweet, such as its sentiment (which causes no error even with less than 8 GB of RAM), and then the NER is applied.
>
>     from pyspark.sql import SparkSession
>
>     spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>     rawTweets = (spark.readStream.format("socket")
>                  .option("host", "localhost")
>                  .option("port", 9008)
>                  .load())
>     tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>
>     # prior processing of the tweets
>     sentDF = other_processing(tweets)
>
>     # obtaining the column that contains the list of entities from a tweet
>     nerDF = ner_classification(sentDF)
>
> This is the code of the functions related to obtaining the NER: the "main call" and the UDF.
>
>     import spacy
>     from pyspark.sql.functions import udf
>     from pyspark.sql.types import ArrayType, StringType
>
>     nerModel = spacy.load("en_core_web_sm")
>
>     # main call: applies the UDF to every tweet in the "tweet" column
>     def ner_classification(words):
>         ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>         words = words.withColumn("nerlist", ner_list("tweet"))
>         return words
>
>     # UDF
>     def obtain_ner_udf(words):
>         # if the tweet is empty (or null), return None
>         if not words:
>             return None
>         # else: apply the NER model (spaCy en_core_web_sm)
>         entities = nerModel(words)
>
>         # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>         return [word.text + '_' + word.label_ for word in entities.ents]
>
> Lastly, I map each entity to the sentiment of its tweet and obtain the entity's average sentiment and its number of appearances.
>
>     from pyspark.sql.functions import explode
>
>     flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>     flattenedNER.createOrReplaceTempView("df")
>
>     querySelect = ("SELECT col AS entity, avg(sentiment) AS sentiment, "
>                    "count(col) AS count FROM df GROUP BY col")
>     finalDF = spark.sql(querySelect)
>
>     query = (finalDF.writeStream.foreachBatch(processBatch)
>              .outputMode("complete").start())
>
> The resulting DataFrame is processed by a function that puts each column into a list and prints it.
>
>     def processBatch(df, epoch_id):
>         # collect once so the three lists stay aligned row by row
>         rows = df.collect()
>         entities = [str(row["entity"]) for row in rows]
>         sentiments = [float(row["sentiment"]) for row in rows]
>         counts = [int(row["count"]) for row in rows]
>
>         print(entities, sentiments, counts)
>
> At first I tried other NER models from Flair, and they have the same effect: after the first batch is printed, memory use starts increasing until the execution fails and stops with a memory error. When I apply a "simple" function in the UDF instead of the NER model, such as "return words.split()", there is no such error, so the ingested data should not be what causes the overload; the model is.
>
> Is there a way to prevent the excessive RAM consumption? Why is there only the driver executor, with no other executors created? How could I prevent the job from collapsing when the NER model is applied?
>
> Thanks in advance!
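The thread does not show how the job is launched, so the following is an assumption. The Spark UI lists a single executor with the ID "driver" when the application runs in local mode, and `local[*]` (the spark-submit default when no master is given) runs one task thread per core, which is consistent with the 8 busy cores. If each concurrently running task also uses its own Python worker process that holds its own copy of the NER model, peak model memory grows roughly with the number of task slots. The sketch below only does that back-of-the-envelope arithmetic. `task_slots` and `estimated_model_memory_mb` are hypothetical helpers, and the per-copy model size is an input you would have to measure yourself.

```python
import os
import re
from typing import Optional


def task_slots(master: str, cpu_count: Optional[int] = None) -> int:
    """Concurrent task slots for a Spark local master URL.

    Handles "local", "local[N]", "local[*]" and the "local[N,F]" / "local[*,F]"
    forms (F is the allowed task failures and does not change the slot count).
    """
    if master == "local":
        return 1
    m = re.fullmatch(r"local\[(\d+|\*)(?:\s*,\s*\d+)?\]", master)
    if m is None:
        raise ValueError(f"not a local master URL: {master!r}")
    if m.group(1) == "*":
        # local[*] uses one thread per logical core.
        return cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    return int(m.group(1))


def estimated_model_memory_mb(master: str, model_mb: float,
                              cpu_count: Optional[int] = None) -> float:
    """Rough upper bound, assuming one Python worker (one model copy) per slot."""
    return task_slots(master, cpu_count) * model_mb
```

For instance, with a hypothetical 500 MB per model copy on the 8-core instance, `local[*]` gives 8 × 500 = 4000 MB. Building the session with `SparkSession.builder.master("local[2]")` would instead bound it near 1000 MB, trading throughput for memory.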