Glad to hear that it works :) Your dataframe is nested with map, array, and struct types.
I'm using this function to flatten a nested dataframe into rows and columns:

from pyspark.sql.types import *
from pyspark.sql.functions import *


def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.

    .. versionadded:: x.X.X

    Parameters
    ----------
    sep : str
        Delimiter for flattened columns. Default `_`.

    Notes
    -----
    Don't use `.` as `sep`: it won't work on nested dataframes with more
    than one level, and you would have to reference columns as
    `column.name`. Flattening MapType columns requires finding every
    distinct key in the column, which can be slow.

    Examples
    --------
    data_mixed = [
        {
            "state": "Florida",
            "shortname": "FL",
            "info": {"governor": "Rick Scott"},
            "counties": [
                {"name": "Dade", "population": 12345},
                {"name": "Broward", "population": 40000},
                {"name": "Palm Beach", "population": 60000},
            ],
        },
        {
            "state": "Ohio",
            "shortname": "OH",
            "info": {"governor": "John Kasich"},
            "counties": [
                {"name": "Summit", "population": 1234},
                {"name": "Cuyahoga", "population": 1337},
            ],
        },
    ]
    data_mixed = spark.createDataFrame(data=data_mixed)
    data_mixed.printSchema()
    root
     |-- counties: array (nullable = true)
     |    |-- element: map (containsNull = true)
     |    |    |-- key: string
     |    |    |-- value: string (valueContainsNull = true)
     |-- info: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)
     |-- shortname: string (nullable = true)
     |-- state: string (nullable = true)

    data_mixed_flat = flatten_test(data_mixed, sep=":")
    data_mixed_flat.printSchema()
    root
     |-- shortname: string (nullable = true)
     |-- state: string (nullable = true)
     |-- counties:name: string (nullable = true)
     |-- counties:population: string (nullable = true)
     |-- info:governor: string (nullable = true)

    data = [
        {
            "id": 1,
            "name": "Cole Volk",
            "fitness": {"height": 130, "weight": 60},
        },
        {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
        {
            "id": 2,
            "name": "Faye Raker",
            "fitness": {"height": 130, "weight": 60},
        },
    ]
    df = spark.createDataFrame(data=data)
    df.printSchema()
    root
     |-- fitness: map (nullable = true)
     |    |-- key: string
     |    |-- value: long (valueContainsNull = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)

    df_flat = flatten_test(df, sep=":")
    df_flat.printSchema()
    root
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- fitness:height: long (nullable = true)
     |-- fitness:weight: long (nullable = true)

    data_struct = [
        (("James", None, "Smith"), "OH", "M"),
        (("Anna", "Rose", ""), "NY", "F"),
        (("Julia", "", "Williams"), "OH", "F"),
        (("Maria", "Anne", "Jones"), "NY", "M"),
        (("Jen", "Mary", "Brown"), "NY", "M"),
        (("Mike", "Mary", "Williams"), "OH", "M"),
    ]
    schema = StructType([
        StructField('name', StructType([
            StructField('firstname', StringType(), True),
            StructField('middlename', StringType(), True),
            StructField('lastname', StringType(), True),
        ])),
        StructField('state', StringType(), True),
        StructField('gender', StringType(), True),
    ])
    df_struct = spark.createDataFrame(data=data_struct, schema=schema)
    df_struct.printSchema()
    root
     |-- name: struct (nullable = true)
     |    |-- firstname: string (nullable = true)
     |    |-- middlename: string (nullable = true)
     |    |-- lastname: string (nullable = true)
     |-- state: string (nullable = true)
     |-- gender: string (nullable = true)

    df_struct_flat = flatten_test(df_struct, sep=":")
    df_struct_flat.printSchema()
    root
     |-- state: string (nullable = true)
     |-- gender: string (nullable = true)
     |-- name:firstname: string (nullable = true)
     |-- name:middlename: string (nullable = true)
     |-- name:lastname: string (nullable = true)
    """
    # compute the complex fields (arrays, structs and maps) in the schema
    complex_fields = dict(
        (field.name, field.dataType)
        for field in df.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType, MapType))
    )

    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        # print("Processing: " + col_name + " Type: " + str(type(complex_fields[col_name])))

        # StructType: convert every sub-field to its own column,
        # i.e. flatten structs
        if isinstance(complex_fields[col_name], StructType):
            expanded = [
                col(col_name + '.' + k).alias(col_name + sep + k)
                for k in [n.name for n in complex_fields[col_name]]
            ]
            df = df.select("*", *expanded).drop(col_name)

        # ArrayType: add the array elements as rows using explode_outer,
        # i.e. explode arrays
        elif isinstance(complex_fields[col_name], ArrayType):
            df = df.withColumn(col_name, explode_outer(col_name))

        # MapType: collect the distinct keys and turn each into a column,
        # i.e. flatten maps
        elif isinstance(complex_fields[col_name], MapType):
            keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = [row[0] for row in keys_df.collect()]
            key_cols = [
                col(col_name).getItem(k).alias(str(col_name + sep + k))
                for k in keys
            ]
            df = df.select([c for c in df.columns if c != col_name] + key_cols)

        # recompute the remaining complex fields in the schema
        complex_fields = dict(
            (field.name, field.dataType)
            for field in df.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType, MapType))
        )

    return df
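For your dataframe, the call would look something like this (a sketch; I'm assuming sentPred is still the name of the dataframe holding the sentiment and entities columns you posted):

# arrays are exploded to one row per element, structs become one column
# per field, and maps one column per distinct key, e.g. sentiment_result,
# sentiment_metadata_confidence, entities_result, ...
sentPred_flat = flatten_test(sentPred, sep="_")
sentPred_flat.printSchema()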
Please use "reply to all" on your emails so that the user group can see them. :)

On Wed, 20 Apr 2022 at 14:05, Xavier Gervilla <xavier.gervi...@datapta.com> wrote:

> Changing this worked!
>
> spark = sparknlp.start(spark32=True)
>
> I'm adapting the rest of the code now, understanding the new schema and
> debugging, and I've found this page
> <https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/3.SparkNLP_Pretrained_Models.ipynb>
> with many examples that have helped me get a clearer idea of how to
> implement the pipeline.
>
> This is the schema that results from applying the pipeline and selecting
> only the sentiment and the entities obtained:
>
>  |-- sentiment: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- annotatorType: string (nullable = true)
>  |    |    |-- begin: integer (nullable = false)
>  |    |    |-- end: integer (nullable = false)
>  |    |    |-- result: string (nullable = true)
>  |    |    |-- metadata: map (nullable = true)
>  |    |    |    |-- key: string
>  |    |    |    |-- value: string (valueContainsNull = true)
>  |    |    |-- embeddings: array (nullable = true)
>  |    |    |    |-- element: float (containsNull = false)
>  |-- entities: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- annotatorType: string (nullable = true)
>  |    |    |-- begin: integer (nullable = false)
>  |    |    |-- end: integer (nullable = false)
>  |    |    |-- result: string (nullable = true)
>  |    |    |-- metadata: map (nullable = true)
>  |    |    |    |-- key: string
>  |    |    |    |-- value: string (valueContainsNull = true)
>  |    |    |-- embeddings: array (nullable = true)
>  |    |    |    |-- element: float (containsNull = false)
>
> I have used explode on both to go from an array per row to a single
> sentiment per row, which works fine, but I'm having trouble handling
> null/None values, specifically in result and metadata. On sentiment,
> result is the equivalent of 'negative'/'positive'/'na' and metadata is
> {confidence -> value}, whereas on entities result is the predicted entity
> and metadata is {entity -> label, sentence -> value, chunk -> value},
> with label being MISC, ORG, etc. I've tried using drop like this:
>
> df.drop(col('nerEx.result') == None)
>
> but I still get null/None values.
>
> Thank you again for your help. This is more of a specific debugging
> issue, but your suggestion of using Spark NLP was very useful.
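A note on that drop attempt: drop() removes columns, it never filters rows, and an == None comparison evaluates to SQL NULL rather than True. A minimal sketch of filtering the rows out instead, assuming nerEx is the exploded entities column from your pipeline:

from pyspark.sql.functions import col

# keep only the rows where the exploded struct actually has a result
df = df.filter(col('nerEx.result').isNotNull())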
> On Tue, 19 Apr 2022 at 21:41 (+0200), Bjørn Jørgensen
> <bjornjorgen...@gmail.com> wrote:
>
> https://github.com/JohnSnowLabs/spark-nlp#packages-cheatsheet
>
> Change
>
> spark = sparknlp.start()
>
> to
>
> spark = sparknlp.start(spark32=True)
>
>
> On Tue, 19 Apr 2022 at 21:10, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>
> Yes, there are some that have that issue.
>
> Please open a new issue at
> https://github.com/JohnSnowLabs/spark-nlp/issues and they will help you.
>
>
> On Tue, 19 Apr 2022 at 20:33, Xavier Gervilla <xavier.gervi...@datapta.com> wrote:
>
> Thank you for your advice. I had little knowledge of Spark NLP and I
> thought it could only be used with models that required training, which
> wasn't my project's case. I'm now trying to build the project again with
> Spark NLP, but when I try to load a pretrained model from JohnSnowLabs I
> get an error (py4j.protocol.Py4JJavaError: An error occurred while calling
> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize).
>
> This is the new basic code for the project:
>
> spark = sparknlp.start()
> pipelineName = 'analyze_sentiment'
>
> pipeline = PretrainedPipeline(pipelineName, 'en')  # this is the line that generates the error
>
> rawTweets = spark.readStream.format('socket').option('host', 'localhost').option('port', 9008).load()
> allTweets = rawTweets.selectExpr('CAST(value AS STRING)').withColumnRenamed('value', 'text').dropDuplicates(['text'])
>
> sentPred = pipeline.transform(allTweets)
>
> query = sentPred.writeStream.outputMode('complete').format('console').start()
> query.awaitTermination()
>
> The Spark version is 3.2.1 and the Spark NLP version is 3.4.3, while the
> Java version is 8. I've tried with a different model but the error is
> still the same, so what could be causing it?
>
> If this error is solved, I think Spark NLP will be the solution I was
> looking for to reduce memory consumption, so thank you again for
> suggesting it.
>
>
> On 18 Apr 2022, at 21:07, Bjørn Jørgensen <bjornjorgen...@gmail.com> wrote:
>
> When did spaCy get support for Spark?
>
> Try Spark NLP <https://nlp.johnsnowlabs.com/>; it's made for Spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they publish user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
> On Mon, 18 Apr 2022 at 16:17, Sean Owen <sro...@gmail.com> wrote:
>
> It looks good; are you sure it even starts? The problem I see is that you
> send a copy of the model from the driver with every task. Try broadcasting
> the model instead. I'm not sure if that resolves it, but it would be good
> practice.
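A minimal sketch of that broadcast idea, reusing the UDF from the original message below (assuming spark is the running SparkSession and that the spaCy Language object pickles cleanly in your version; if it doesn't, load the model lazily inside the function instead):

import spacy
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# load the model once on the driver and broadcast it, instead of
# capturing it in the UDF closure and shipping it with every task
bc_model = spark.sparkContext.broadcast(spacy.load("en_core_web_sm"))

def obtain_ner_udf(words):
    if not words:
        return None
    entities = bc_model.value(words)  # use the broadcast copy on the worker
    return [word.text + '_' + word.label_ for word in entities.ents]

ner_list = udf(obtain_ner_udf, ArrayType(StringType()))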
> On Mon, 18 Apr 2022 at 09:10, Xavier Gervilla <xavier.gervi...@datapta.com> wrote:
>
> Hi Team,
> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and, with different operations on DataFrames, obtains the
> sentiment of the tweets and their entities by applying a sentiment model
> and a NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on the Spark UI I've seen
> that there's only one executor running (the driver), but using htop on the
> terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet, like
> its sentiment (which causes no error even with less than 8 GB of RAM), and
> then the NER is applied.
>
> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>
> # prior processing of the tweets
> sentDF = other_processing(tweets)
>
> # obtaining the column that contains the list of entities from a tweet
> nerDF = ner_classification(sentDF)
>
> This is the code of the functions related to obtaining the NER: the
> "main call" and the UDF function.
>
> nerModel = spacy.load("en_core_web_sm")
>
> # main call, applies the UDF function to every tweet from the "tweet" column
> def ner_classification(words):
>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>     words = words.withColumn("nerlist", ner_list("tweet"))
>     return words
>
> # UDF function
> def obtain_ner_udf(words):
>     # if the tweet is empty return None
>     if words == "":
>         return None
>     # else apply the NER model (spaCy en_core_web_sm)
>     entities = nerModel(words)
>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>     return [word.text + '_' + word.label_ for word in entities.ents]
>
> Lastly, I map each entity to the sentiment of its tweet and obtain the
> average sentiment of the entity and its number of appearances.
>
> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
> flattenedNER.registerTempTable("df")
>
> querySelect = "SELECT col AS entity, avg(sentiment) AS sentiment, count(col) AS count FROM df GROUP BY col"
> finalDF = spark.sql(querySelect)
>
> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>
> The resulting DataFrame is processed with a function that collects each
> column into a list and prints it:
>
> def processBatch(df, epoch_id):
>     entities = [str(t.entity) for t in df.select("entity").collect()]
>     sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>     counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>     print(entities, sentiments, counts)
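As an aside, that function runs three separate collect() jobs per batch; a single collect gives the same output (a sketch):

def processBatch(df, epoch_id):
    # one collect instead of three; each Row carries all three columns,
    # and indexing by name avoids the Row.count method name clash
    rows = df.collect()
    entities = [str(row['entity']) for row in rows]
    sentiments = [float(row['sentiment']) for row in rows]
    counts = [int(row['count']) for row in rows]
    print(entities, sentiments, counts)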
> At first I tried with other NER models from Flair; they have the same
> effect: after printing the first batch, memory use starts increasing until
> the execution fails and stops with the memory error. When applying a
> "simple" function such as return words.split() in the UDF instead of the
> NER model, there's no such error, so it should be the model, not the data
> ingested, that causes the overload.
>
> Is there a way to prevent the excessive RAM consumption? Why is there
> only the driver executor and no other executors are generated? How could I
> prevent it from collapsing when applying the NER model?
>
> Thanks in advance!


--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297