[window aggregate][debug] Rows not dropping with watermark and window
Hi team,

With your help last week I was able to adapt the project I'm developing and apply sentiment analysis and NER extraction to streaming tweets. One of the next steps, to keep memory usage from growing without bound, is applying windows and watermarks so that tweets are discarded after some time. However, the metric "Aggregated Number Of Rows Dropped By Watermark" in the Spark UI is always 0.

This is the updated code I use to apply the sentiment and NER predictions and to add the timestamp column:

    sentPipeline = PretrainedPipeline('analyze_sentiment')
    nerPipeline = PretrainedPipeline('recognize_entities_dl')

    sentPred = sentPipeline.transform(tweets)
    nerPred = nerPipeline.transform(sentPred)
    tsCol = nerPred.withColumn('timestamp', current_timestamp())

After applying some transformations I generate two columns with the entity (entLab) and its sentiment (sentNum), and apply the watermark before starting the query:

    finalDF = resultDF.withWatermark("timestamp", "10 minutes") \
        .groupBy("entLab", window("timestamp", "5 minutes", "2 minutes")) \
        .agg(avg("sentNum").alias("avgSent"), count("sentNum").alias("countEnt")) \
        .select("entLab", "avgSent", "countEnt")

    query = finalDF.writeStream.queryName('treemapResult') \
        .foreachBatch(processBatch).outputMode("update") \
        .option("checkpointLocation", "/tmp/checkpoints").start()

Each processBatch call generates a plot from the selected values.

When I execute the program, RAM usage mostly stays around 7 GB but keeps increasing very slowly, and as mentioned above the Spark UI shows zero rows dropped by the watermark. I've tried changing the output mode to append (using complete would be the opposite of the goal), but the result is very similar.

Is there any problem with the declaration of the watermark? And how could I change it so that a plot is generated only after each window closes? Right now a plot is produced roughly every 80-90 seconds instead of at the two-minute interval between sliding windows.

Thank you in advance!
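In case the full pipeline obscures the issue, here is a stripped-down, self-contained sketch of the same pattern using the built-in rate source instead of the Twitter socket (the rate source already provides an event-time timestamp column; entLab and sentNum are faked from the counter, so everything here except the window/watermark parameters is a placeholder). If the behaviour shows up here too, the NLP pipeline itself can be ruled out:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import window, avg, count, expr

    spark = SparkSession.builder.appName("WatermarkRepro").getOrCreate()

    # the rate source emits (timestamp, value) rows at a fixed rate
    stream = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
    events = stream \
        .withColumn("entLab", expr("concat('ent_', cast(value % 5 as string))")) \
        .withColumn("sentNum", expr("cast(value % 3 as double)"))

    agg = events.withWatermark("timestamp", "10 minutes") \
        .groupBy("entLab", window("timestamp", "5 minutes", "2 minutes")) \
        .agg(avg("sentNum").alias("avgSent"), count("sentNum").alias("countEnt"))

    # append mode emits each window only once, after the watermark passes the
    # window's end; the processing-time trigger pins batches to the slide interval
    query = agg.writeStream.outputMode("append") \
        .trigger(processingTime="2 minutes") \
        .format("console").option("truncate", "false").start()
    query.awaitTermination()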
Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

    (("Julia","","Williams"),"OH","F"),
    (("Maria","Anne","Jones"),"NY","M"),
    (("Jen","Mary","Brown"),"NY","M"),
    (("Mike","Mary","Williams"),"OH","M")
    ]

    schema = StructType([
        StructField('name', StructType([
            StructField('firstname', StringType(), True),
            StructField('middlename', StringType(), True),
            StructField('lastname', StringType(), True)
        ])),
        StructField('state', StringType(), True),
        StructField('gender', StringType(), True)
    ])

    df_struct = spark.createDataFrame(data=data_struct, schema=schema)
    df_struct.printSchema()

    root
    |-- name: struct (nullable = true)
    |    |-- firstname: string (nullable = true)
    |    |-- middlename: string (nullable = true)
    |    |-- lastname: string (nullable = true)
    |-- state: string (nullable = true)
    |-- gender: string (nullable = true)

    df_struct_flat = flatten_test(df_struct, sep=":")
    df_struct_flat.printSchema()

    root
    |-- state: string (nullable = true)
    |-- gender: string (nullable = true)
    |-- name:firstname: string (nullable = true)
    |-- name:middlename: string (nullable = true)
    |-- name:lastname: string (nullable = true)
    """

    # compute complex fields (Array, Struct and Map types) in the schema
    complex_fields = dict([
        (field.name, field.dataType)
        for field in df.schema.fields
        if type(field.dataType) == ArrayType
        or type(field.dataType) == StructType
        or type(field.dataType) == MapType
    ])

    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        # print("Processing: " + col_name + " Type: " + str(type(complex_fields[col_name])))

        # if StructType, convert all sub-elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType, add the array elements as rows using the explode function,
        # i.e. explode arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # if MapType, convert all sub-elements to columns,
        # i.e. flatten maps
        elif type(complex_fields[col_name]) == MapType:
            keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = list(map(lambda row: row[0], keys_df.collect()))
            key_cols = list(map(lambda f: col(col_name).getItem(f)
                                .alias(str(col_name + sep + f)), keys))
            drop_column_list = [col_name]
            df = df.select([column for column in df.columns
                            if column not in drop_column_list] + key_cols)

        # recompute remaining complex fields in the schema
        complex_fields = dict([
            (field.name, field.dataType)
            for field in df.schema.fields
            if type(field.dataType) == ArrayType
            or type(field.dataType) == StructType
            or type(field.dataType) == MapType
        ])

    return df

Please use "reply all" on your emails so that the user group can see them. :)

On Wed, 20 Apr 2022 at 14:05, Xavier Gervilla <xavier.gervi...@datapta.com> wrote:

Changing this worked!

    spark = sparknlp.start(spark32=True)

I'm adapting the rest of the code now, understanding the new schema and debugging, and I've found https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/3.SparkNLP_Pretrained_Models.ipynb, which has many examples that have helped me get a clearer idea of how to implement the pipeline.
This is the schema that results after applying the pipeline and selecting only the sentiment and the entities obtained:

    |-- sentiment: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- annotatorType: string (nullable = true)
    |    |    |-- begin: integer (nullable = false)
    |    |    |-- end: integer (nullable = false)
    |    |    |-- result: string (nullable = true)
    |    |    |-- metadata: map (nullable = true)
    |    |    |    |-- key: string
    |    |    |    |-- value: string (valueContainsNull = true)
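To turn those annotation arrays into plain columns, my current plan is roughly the following (an untested sketch: I'm assuming the pipeline output DataFrame is called nerPred, that the entity annotations live in an "entities" column with the same struct as "sentiment", and that the NER label sits under the "entity" key of each chunk's metadata map):

    from pyspark.sql.functions import col, explode

    # one row per detected entity, keeping the tweet's sentiment annotations
    flatDF = nerPred.select(explode(col("entities")).alias("ent"), col("sentiment"))

    resultDF = flatDF.select(
        col("ent.result").alias("entText"),                      # entity text
        col("ent.metadata").getItem("entity").alias("entLab"),   # entity label (PER, ORG, ...)
        col("sentiment")
    )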
[Spark Streaming] [Debug] Memory error when using NER model in Python
Hi team,

(I've also posted this question on Stack Overflow: https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration)

I'm developing a project that retrieves tweets on a 'host' app, streams them to Spark, and, through a series of DataFrame operations, obtains the sentiment of the tweets and their entities by applying a sentiment model and a NER model respectively.

The problem I've come across is that when applying the NER model, RAM consumption keeps increasing until the program stops with a memory error because there's no memory left to execute. In addition, the Spark UI shows only one executor running (the driver executor), yet htop on the terminal shows all 8 cores of the instance running at 100%.

The SparkSession is only configured to receive the tweets from the socket that connects to the second program, the one that sends the tweets. The DataFrame goes through some processing to obtain other properties of the tweet, like its sentiment (which causes no error even with less than 8 GB of RAM), and then the NER model is applied:

    spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
    rawTweets = spark.readStream.format("socket") \
        .option("host", "localhost").option("port", 9008).load()
    tweets = rawTweets.selectExpr("CAST(value AS STRING)")

    # prior processing of the tweets
    sentDF = other_processing(tweets)

    # obtaining the column that contains the list of entities from a tweet
    nerDF = ner_classification(sentDF)

This is the code of the functions related to obtaining the NER: the "main call" and the UDF function.

    nerModel = spacy.load("en_core_web_sm")

    # main call, applies the UDF function to every tweet from the "tweet" column
    def ner_classification(words):
        ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
        words = words.withColumn("nerlist", ner_list("tweet"))
        return words

    # UDF function
    def obtain_ner_udf(words):
        # if the tweet is empty, return None
        if words == "":
            return None
        # else: apply the NER model (spaCy en_core_web_sm)
        entities = nerModel(words)
        # return a list of the form ['entity1_label1', 'entity2_label2', ...]
        return [word.text + '_' + word.label_ for word in entities.ents]

Lastly, I map each entity to the sentiment of its tweet and obtain the average sentiment and the number of appearances of each entity:

    flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
    flattenedNER.registerTempTable("df")

    querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
    finalDF = spark.sql(querySelect)

    query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()

The resulting DataFrame is processed by a function that collects each column into a list and prints it:

    def processBatch(df, epoch_id):
        entities = [str(t.entity) for t in df.select("entity").collect()]
        sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
        counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
        print(entities, sentiments, counts)

I initially tried other NER models from Flair, and they had the same effect: after printing the first batch, memory use starts increasing until the execution fails with a memory error. When I apply a "simple" function in the UDF instead of the NER model, such as return words.split(), there's no such error, so the ingested data shouldn't be what's causing the overload; the model is.

Is there a way to prevent the excessive RAM consumption?
Why is there only the driver executor, with no other executors being created? And how could I keep the application from collapsing when the NER model is applied?

Thanks in advance!
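In case it helps to reproduce, here is a minimal, self-contained batch version of the NER UDF (no streaming; the sample tweets are made up). If memory also grows with this version under repeated calls, the streaming side can be ruled out:

    import spacy
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType

    spark = SparkSession.builder.appName("NerRepro").getOrCreate()

    # loaded once on the driver and shipped to the workers with the UDF closure
    nerModel = spacy.load("en_core_web_sm")

    def obtain_ner(words):
        # empty tweets produce no entities
        if not words:
            return None
        entities = nerModel(words)
        return [w.text + '_' + w.label_ for w in entities.ents]

    ner_list = udf(obtain_ner, ArrayType(StringType()))

    df = spark.createDataFrame(
        [("Apple is looking at buying a U.K. startup",), ("",)],
        ["tweet"])
    df.withColumn("nerlist", ner_list("tweet")).show(truncate=False)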