Glad to hear that it works :)

Your dataframe is nested with both map, array and struct.

I`m using this function to flatten a nested dataframe to rows and columns.

from pyspark.sql.types import *
from pyspark.sql.functions import *


def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.
        .. versionadded:: x.X.X

        Parameters
        ----------
        sep : str
            Delimiter for flatted columns. Default `_`

        Notes
        -----
        Don`t use `.` as `sep`
        It won't work on nested data frames with more than one level.
        And you will have to use `columns.name`.

        Flattening Map Types will have to find every key in the column.
        This can be slow.

        Examples
        --------

        data_mixed = [
            {
                "state": "Florida",
                "shortname": "FL",
                "info": {"governor": "Rick Scott"},
                "counties": [
                    {"name": "Dade", "population": 12345},
                    {"name": "Broward", "population": 40000},
                    {"name": "Palm Beach", "population": 60000},
                ],
            },
            {
                "state": "Ohio",
                "shortname": "OH",
                "info": {"governor": "John Kasich"},
                "counties": [
                    {"name": "Summit", "population": 1234},
                    {"name": "Cuyahoga", "population": 1337},
                ],
            },
        ]

        data_mixed = spark.createDataFrame(data=data_mixed)

        data_mixed.printSchema()

        root
        |-- counties: array (nullable = true)
        |    |-- element: map (containsNull = true)
        |    |    |-- key: string
        |    |    |-- value: string (valueContainsNull = true)
        |-- info: map (nullable = true)
        |    |-- key: string
        |    |-- value: string (valueContainsNull = true)
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)


        data_mixed_flat = flatten_test(df, sep=":")
        data_mixed_flat.printSchema()
        root
        |-- shortname: string (nullable = true)
        |-- state: string (nullable = true)
        |-- counties:name: string (nullable = true)
        |-- counties:population: string (nullable = true)
        |-- info:governor: string (nullable = true)




        data = [
            {
                "id": 1,
                "name": "Cole Volk",
                "fitness": {"height": 130, "weight": 60},
            },
            {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
            {
                "id": 2,
                "name": "Faye Raker",
                "fitness": {"height": 130, "weight": 60},
            },
        ]


        df = spark.createDataFrame(data=data)

        df.printSchema()

        root
        |-- fitness: map (nullable = true)
        |    |-- key: string
        |    |-- value: long (valueContainsNull = true)
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)

        df_flat = flatten_test(df, sep=":")

        df_flat.printSchema()

        root
        |-- id: long (nullable = true)
        |-- name: string (nullable = true)
        |-- fitness:height: long (nullable = true)
        |-- fitness:weight: long (nullable = true)

        data_struct = [
                (("James",None,"Smith"),"OH","M"),
                (("Anna","Rose",""),"NY","F"),
                (("Julia","","Williams"),"OH","F"),
                (("Maria","Anne","Jones"),"NY","M"),
                (("Jen","Mary","Brown"),"NY","M"),
                (("Mike","Mary","Williams"),"OH","M")
                ]


        schema = StructType([
            StructField('name', StructType([
                StructField('firstname', StringType(), True),
                StructField('middlename', StringType(), True),
                StructField('lastname', StringType(), True)
                ])),
            StructField('state', StringType(), True),
            StructField('gender', StringType(), True)
            ])

        df_struct = spark.createDataFrame(data = data_struct, schema =
schema)

        df_struct.printSchema()

        root
        |-- name: struct (nullable = true)
        |    |-- firstname: string (nullable = true)
        |    |-- middlename: string (nullable = true)
        |    |-- lastname: string (nullable = true)
        |-- state: string (nullable = true)
        |-- gender: string (nullable = true)

        df_struct_flat = flatten_test(df_struct, sep=":")

        df_struct_flat.printSchema()

        root
        |-- state: string (nullable = true)
        |-- gender: string (nullable = true)
        |-- name:firstname: string (nullable = true)
        |-- name:middlename: string (nullable = true)
        |-- name:lastname: string (nullable = true)
        """
    # compute Complex Fields (Arrays, Structs and Map Types) in Schema
    complex_fields = dict([(field.name, field.dataType)
                            for field in df.schema.fields
                            if type(field.dataType) == ArrayType
                            or type(field.dataType) == StructType
                            or type(field.dataType) == MapType])

    while len(complex_fields) !=0:
        col_name = list(complex_fields.keys())[0]
        #print ("Processing :"+col_name+" Type :
"+str(type(complex_fields[col_name])))

        # if StructType then convert all sub elements to columns.
        # i.e. flatten structs
        if (type(complex_fields[col_name]) == StructType):
            expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
            for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType then add the Array Elements as Rows using the
explode function
        # i.e. explode Arrays
        elif (type(complex_fields[col_name]) == ArrayType):
            df = df.withColumn(col_name, explode_outer(col_name))

        # if MapType then convert all sub elements to columns.
        # i.e. flatten
        elif (type(complex_fields[col_name]) == MapType):
            keys_df =
df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = list(map(lambda row: row[0], keys_df.collect()))
            key_cols = list(map(lambda f: col(col_name).getItem(f)
            .alias(str(col_name + sep + f)), keys))
            drop_column_list = [col_name]
            df = df.select([col_name for col_name in df.columns
            if col_name not in drop_column_list] + key_cols)

        # recompute remaining Complex Fields in Schema
        complex_fields = dict([(field.name, field.dataType)
                            for field in df.schema.fields
                            if type(field.dataType) == ArrayType
                            or type(field.dataType) == StructType
                            or type(field.dataType) == MapType])

    return df



Please use "answer to all" on your emails so that the user group can see
the emails. :)


ons. 20. apr. 2022 kl. 14:05 skrev Xavier Gervilla <
xavier.gervi...@datapta.com>:

> Changing this worked!
> *spark = sparknlp.start(spark32=True)*
>
> I'm adapting the rest of the code now, understanding the new schema and
> debugging and I've found this page
> <https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/3.SparkNLP_Pretrained_Models.ipynb>
> with many examples that has helped get a clearer idea of how to implement
> the pipeline.
>
> This is the schema resulting after applying the pipeline and selecting
> only the sentiment and the entities obtained:
>
> * |-- sentiment: array (nullable = true)*
>
> * |    |-- element: struct (containsNull = true)*
>
> * |    |    |-- annotatorType: string (nullable = true)*
>
> * |    |    |-- begin: integer (nullable = false)*
>
> * |    |    |-- end: integer (nullable = false)*
>
> * |    |    |-- result: string (nullable = true)*
>
> * |    |    |-- metadata: map (nullable = true)*
>
> * |    |    |    |-- key: string*
>
> * |    |    |    |-- value: string (valueContainsNull = true)*
>
> * |    |    |-- embeddings: array (nullable = true)*
>
> * |    |    |    |-- element: float (containsNull = false)*
>
> * |-- entities: array (nullable = true)*
>
> * |    |-- element: struct (containsNull = true)*
>
> * |    |    |-- annotatorType: string (nullable = true)*
>
> * |    |    |-- begin: integer (nullable = false)*
>
> * |    |    |-- end: integer (nullable = false)*
>
> * |    |    |-- result: string (nullable = true)*
>
> * |    |    |-- metadata: map (nullable = true)*
>
> * |    |    |    |-- key: string*
>
> * |    |    |    |-- value: string (valueContainsNull = true)*
>
> * |    |    |-- embeddings: array (nullable = true)*
> * |    |    |    |-- element: float (containsNull = false)*
>
> I have used explode on both to change from an array per row to a single
> sentiment per row, which works fine, but I'm having trouble handling
> null/None values, specifically both with result and metadata. On sentiment
> it's the equivalent of 'negative'/'positive'/'na' and {confidence -> value}
> for example whereas on entities it's 'entity predicted' and {entity ->
> label, sentence -> value, chunk -> value}, with label being MISC, ORG...
> I've tried using drop like this
> *df.drop(col('nerEx.result') == None)*
> but I still get null/None values.
>
> Thank you again for your help, this is more related to specific debugging
> but your suggestion of using SparkNLP was very useful.
>
>
>
> ---- Activado Tue, 19 Apr 2022 21:41:26 +0200 *Bjørn Jørgensen
> <bjornjorgen...@gmail.com <bjornjorgen...@gmail.com>>* escribió ----
>
> https://github.com/JohnSnowLabs/spark-nlp#packages-cheatsheet
>
> *change spark = sparknlp.start()*
> to
> spark = sparknlp.start(spark32=True)
>
>
> tir. 19. apr. 2022 kl. 21:10 skrev Bjørn Jørgensen <
> bjornjorgen...@gmail.com>:
>
> Yes, there are some that have that issue.
>
> Please open a new issue at
> https://github.com/JohnSnowLabs/spark-nlp/issues and they will help you.
>
>
>
>
> tir. 19. apr. 2022 kl. 20:33 skrev Xavier Gervilla <
> xavier.gervi...@datapta.com>:
>
>
> Thank you for your advice, I had small knowledge of Spark NLP and I
> thought it was only possible to use with models that required training and
> therefore my project wasn’t the case. I'm trying now to build the project
> again with SparkNLP but when I try to load a pretrained model from
> JohnSnowLabs I get an error (*py4j.protocol.Py4JJavaError: An error
> occurred while calling
> z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.*
> ).
>
> This is the new basic code to develop the project again:
>
>
> *spark = sparknlp.start()*
>
> *pipelineName = 'analyze_sentiment'*
>
>
> *pipeline = PretrainedPipeline(pipelineName, 'en') #this is the line that
> generates the error*
>
> *rawTweets = spark.readStream.format('socket').option('host',
> 'localhost').option('port',9008).load()*
>
> *allTweets = rawTweets.selectExpr('CAST(value AS
> STRING)').withColumnRenamed('value', 'text').dropDuplicates('text')*
>
>
> *sentPred = pipeline.transform(allTweets)*
>
> *query =
> sentPred.writeStream.outputMode('complete').format('console').start()*
> *query.awaitTermination()*
>
> Spark version is 3.2.1 and SparkNLP version is 3.4.3, while Java version
> is 8. I've tried with a different model but the error is still the same, so
> what could be causing it?
>
> If this error is solved I think SparkNLP will be the solution I was
> looking for to reduce memory consumption so thank you again for suggesting
> it.
>
>
>
> El 18 abr 2022, a las 21:07, Bjørn Jørgensen <bjornjorgen...@gmail.com>
> escribió:
>
> When did SpaCy have support for Spark?
>
> Try Spark NLP <https://nlp.johnsnowlabs.com/> it`s made for spark. They
> have a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and
> they public user guides at
> https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
>
>
>
>
> man. 18. apr. 2022 kl. 16:17 skrev Sean Owen <sro...@gmail.com>:
>
> It looks good, are you sure it even starts? the problem I see is that you
> send a copy of the model from the driver for every task. Try broadcasting
> the model instead. I'm not sure if that resolves it but would be a good
> practice.
>
> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
> xavier.gervi...@datapta.com> wrote:
>
>
> Hi Team,
> <https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration>
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and with different operations with DataFrames obtains the
> Sentiment of the tweets and their entities applying a Sentiment model and a
> NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> *spark = SparkSession.builder.appName(**"TwitterStreamApp"**).getOrCreate()
> rawTweets = spark.readStream.**format**(**"socket"**).option(**"host"**, 
> **"localhost"**).option(**"port"**,**9008**).load()
> tweets = rawTweets.selectExpr(**"CAST(value AS STRING)"**)
>
> **#prior processing of the tweets**
> sentDF = other_processing(tweets)
>
> **#obtaining the column that contains the list of entities from a tweet**
> nerDF = ner_classification(sentDF)*
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> *nerModel = spacy.load(**"en_core_web_sm"**)
>
> **#main call, applies the UDF function to every tweet from the "tweet" 
> column**def* *ner_classification**(**words**):
>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>     words = words.withColumn(**"nerlist"**, ner_list(**"tweet"**))
>     **return** words
>
> **#udf function**def* *obtain_ner_udf**(**words**):
>     **#if the tweet is empty return None*
>     *if** words == **""**:
>         **return* *None*
>     *#else: applying the NER model (Spacy en_core_web_sm)**
>     entities = nerModel(words)
>
>     **#returns a list of the form ['entity1_label1', 'entity2_label2',...]*
>     *return** [ word.text + **'_'** + word.label_ **for** word **in** 
> entities.ents ]*
>
>
>
> And lastly I map each entity with the sentiment from its tweet and obtain
> the average sentiment of the entity and the number of appearances.
>
> *flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
> flattenedNER.registerTempTable(**"df"**)
>
>
> querySelect = **"SELECT col as entity, avg(sentiment) as sentiment, 
> count(col) as count FROM df GROUP BY col"**
> finalDF = spark.sql(querySelect)
>
> query = 
> finalDF.writeStream.foreachBatch(processBatch).outputMode(**"complete"**).start()*
>
>
> The resulting DF is processed with a function that separates each column
> in a list and prints it.
>
> *def* *processBatch(df,* *epoch_id):*    *entities* *=* *[str(t.entity)* 
> *for* *t* *in* *df.select("entity").collect()]*
>     *sentiments* *=* *[float(t.sentiment)* *for* *t* *in* 
> *df.select("sentiment").collect()]*
>     *counts* *=* *[int(row.asDict()['count'])* *for* *row* *in* 
> *df.select("count").collect()]*
>
> *    print(**entities,* *sentiments,* *counts)*
>
>
> At first I tried with other NER models from Flair they have the same
> effect, after printing the first batch memory use starts increasing until
> it fails and stops the execution because of the memory error. When applying
> a "simple" function instead of the NER model, such as *return
> words.split()* on the UDF there's no such error so the data ingested
> should not be what's causing the overload but the model.
>
> Is there a way to prevent the excessive RAM consumption? Why is there only
> the driver executor and no other executors are generated? How could I
> prevent it from collapsing when applying the NER model?
>
> Thanks in advance!
>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>
>
>
>
>
>
>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>
>
>
>
>
>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

Reply via email to