Re: How is union() implemented? Need to implement column bind

2022-04-18 Thread Sean Owen
A join is the natural answer, but this is a 10114-way join, which will probably
choke just planning it, let alone doing all the shuffling of huge data. You
could maybe tune your way out of it, but I'm not optimistic. It's just huge.

You could go off-road and lower-level to take advantage of the structure of
the data. You effectively want "column bind". There is no such operation in
Spark. (union is 'row bind'.) You could do this with zipPartitions, which is
in the RDD API and, to my surprise, not in the Python API but exists in
Scala. And R (!). If you can read several RDDs of data, you can use this
method to pair all their corresponding values and ultimately get rows of
10114 values out. In fact that is how sparklyr implements cbind on Spark,
FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
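
The closest thing available from PySpark is RDD.zip, which pairs corresponding
elements of two RDDs. A minimal sketch of the idea, with made-up file names and
zip's alignment requirements as a real constraint:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cbind-zip-sketch").getOrCreate()
sc = spark.sparkContext

# Hypothetical file names; each file holds one column vector as "row_id,value"
# lines, already in the same row-id order.
rdd_a = sc.textFile("col_a.csv").map(lambda line: line.split(",")[1])
rdd_b = sc.textFile("col_b.csv").map(lambda line: line.split(",")[1])

# zip() requires both RDDs to have the same number of partitions and the same
# number of elements per partition; plain textFile reads won't guarantee that,
# so a deterministic repartition/sort may be needed first.
paired = rdd_a.zip(rdd_b)   # RDD of (value_a, value_b) pairs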

The issue I see is that you can only zip a few at a time; you don't want to
zip 10114 of them. Perhaps you have to do that iteratively, and I don't
know if that is going to run into the same huge-plan issues.

I like the pivot idea. If you can read the individual files as data rows
(maybe list all the file names, parallelize the list with Spark, and write a
UDF that reads the data for each file to generate the rows), then you can emit
(file, index, value), group by index, and pivot on file (I think?), and that
should be about it; something like the sketch below. I don't think it needs
additional hashing or anything. Not sure how fast it is, but it seems more
direct than the join as well.
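
Not tested at this scale, but a rough PySpark sketch of that (file, index,
value) idea, using the row id as the index and input_file_name() instead of a
file-reading UDF; the directory layout here is made up:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("pivot-cbind-sketch").getOrCreate()

# Hypothetical layout: every file under cols/ holds (row_id, value) for one
# column vector, and all files share the same row ids.
long_df = (spark.read.csv("cols/*.csv")
           .toDF("row_id", "value")
           .withColumn("file", F.input_file_name()))

# Passing the distinct file names explicitly keeps the ~10114 pivot values
# under control (the default spark.sql.pivotMaxValues limit is 10000).
files = [r["file"] for r in long_df.select("file").distinct().collect()]

wide_df = (long_df
           .groupBy("row_id")
           .pivot("file", files)
           .agg(F.first("value")))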

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson wrote:

> Hi, I have a hard problem.
>
>
>
> I have 10114 column vectors, each in a separate file. Each file has 2
> columns: the row id and the numeric values. The row ids are identical and in
> sort order. All the column vectors have the same number of rows; there are
> over 5 million rows. I need to combine them into a single table. The row
> ids are very long strings. The column names are about 20 chars long.
>
>
>
> My current implementation uses join. This takes a long time on a cluster
> with 2 workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes,
> and I mean totally dead, start over. Checkpoints do not seem to help; it
> still crashes and needs to be restarted from scratch. What is really
> surprising is that the final file size is only 213 GB! The way I got that
> file was to copy all the column vectors to a single big-iron machine and
> use the Unix cut and paste utilities. It took about 44 min to run once I
> got all the data moved around. It was very tedious and error prone, and I
> had to move a lot of data around; not a particularly reproducible process.
> I will need to rerun this three more times on different data sets of about
> the same size.
>
>
>
> I noticed that Spark has a union() function. It implements row bind. Any
> idea how it is implemented? Is it just map-reduce under the covers?
>
>
>
> My thought was
>
>    1. load each column vector
>    2. maybe I need to replace the really long row id strings with integers
>    3. convert the column vectors into row vectors using pivot (i.e. matrix
>    transpose)
>    4. union all the row vectors into a single table
>    5. pivot the table back so I have the correct column vectors
>
>
>
> I could replace the row ids and column names with integers if needed, and
> restore them later.
>
>
>
> Maybe I would be better off using many small machines? I assume memory is
> the limiting resource, not CPU; I notice that memory usage will reach 100%.
> I added several TB of local SSD, but I am not convinced that Spark is using
> the local disk.
>
>
>
>
>
> Will this perform better than join?
>
>
>
>    - The rows before the final pivot will be very, very wide (over 5
>    million columns)
>    - There will only be 10114 rows before the pivot
>
>
>
> I assume the pivots will shuffle all the data. I assume pivoting the
> individual column vectors is trivial; the full-table pivot will be
> expensive, however it will only need to be done once.
>
>
>
>
>
>
>
> Comments and suggestions appreciated
>
>
>
> Andy
>
>
>
>
>


How is union() implemented? Need to implement column bind

2022-04-18 Thread Andrew Davidson
Hi, I have a hard problem.

I have 10114 column vectors, each in a separate file. Each file has 2 columns: 
the row id and the numeric values. The row ids are identical and in sort order. 
All the column vectors have the same number of rows; there are over 5 million 
rows. I need to combine them into a single table. The row ids are very long 
strings. The column names are about 20 chars long.

My current implementation uses join. This takes a long time on a cluster with 2 
workers totaling 192 vCPUs and 2.8 TB of memory. It often crashes, and I mean 
totally dead, start over. Checkpoints do not seem to help; it still crashes and 
needs to be restarted from scratch. What is really surprising is that the final 
file size is only 213 GB! The way I got that file was to copy all the column 
vectors to a single big-iron machine and use the Unix cut and paste utilities. 
It took about 44 min to run once I got all the data moved around. It was very 
tedious and error prone, and I had to move a lot of data around; not a 
particularly reproducible process. I will need to rerun this three more times 
on different data sets of about the same size.
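
For reference, a minimal sketch of what a join-based column bind typically
looks like in PySpark (file names are hypothetical, not necessarily the actual
implementation described above):

from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-cbind-sketch").getOrCreate()

# Hypothetical file names; each CSV holds (row_id, value) for one column vector.
files = ["col_0001.csv", "col_0002.csv"]  # ... 10114 in total

dfs = [spark.read.csv(f).toDF("row_id", f"col_{i}") for i, f in enumerate(files)]

# Folding the joins like this produces a 10114-way join plan, which is what
# becomes enormous to plan and to shuffle.
wide = reduce(lambda left, right: left.join(right, on="row_id"), dfs)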

I noticed that Spark has a union() function. It implements row bind. Any idea 
how it is implemented? Is it just map-reduce under the covers?

My thought was (a rough sketch of the union step follows the list):

  1.  load each column vector
  2.  maybe I need to replace the really long row id strings with integers
  3.  convert the column vectors into row vectors using pivot (i.e. matrix 
transpose)
  4.  union all the row vectors into a single table
  5.  pivot the table back so I have the correct column vectors
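
A minimal PySpark sketch of the union step (step 4), with made-up file names
and the transpose steps left out. union() is a narrow operation: it
concatenates the partitions of its inputs rather than doing a map-reduce style
shuffle, but a 10114-way union still yields a very large query plan.

from functools import reduce
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession.builder.appName("union-sketch").getOrCreate()

# Hypothetical inputs: one DataFrame per file, all with identical schemas.
files = ["col_0001.csv", "col_0002.csv"]  # ... 10114 in total
dfs = [spark.read.csv(f).toDF("row_id", "value") for f in files]

# Row bind: union concatenates partitions of its inputs, no shuffle by itself.
stacked = reduce(DataFrame.unionByName, dfs)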


I could replace the row ids and column names with integers if needed, and 
restore them later.

Maybe I would be better off using many small machines? I assume memory is the 
limiting resource, not CPU; I notice that memory usage will reach 100%. I added 
several TB of local SSD, but I am not convinced that Spark is using the local 
disk.


Will this perform better than join?


  *   The rows before the final pivot will be very, very wide (over 5 million 
columns)
  *   There will only be 10114 rows before the pivot

I assume the pivots will shuffle all the data. I assume pivoting the individual 
column vectors is trivial; the full-table pivot will be expensive, however it 
will only need to be done once.



Comments and suggestions appreciated

Andy




Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Bjørn Jørgensen
When did spaCy get support for Spark?

Try Spark NLP; it's made for Spark. They have
a lot of notebooks at https://github.com/JohnSnowLabs/spark-nlp and they
publish user guides at
https://towardsdatascience.com/introduction-to-spark-nlp-foundations-and-basic-components-part-i-c83b7629ed59
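
Not something from the original post, just a rough sketch of what the Spark NLP
route can look like; the pipeline name, the expected "text" input column, and
the "entities" output column are my recollection of the Spark NLP docs, so
treat them as assumptions:

import sparknlp
from sparknlp.pretrained import PretrainedPipeline

# Starts a SparkSession with the Spark NLP jar attached.
spark = sparknlp.start()

# A pretrained NER pipeline; it expects the input DataFrame to have a "text" column.
pipeline = PretrainedPipeline("recognize_entities_dl", lang="en")

df = spark.createDataFrame([("Google was founded in California",)], ["text"])
result = pipeline.transform(df)

# The recognized entities come back as annotation structs; .result holds the strings.
result.selectExpr("text", "entities.result as entities").show(truncate=False)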




On Mon, Apr 18, 2022 at 16:17 Sean Owen wrote:

> It looks good; are you sure it even starts? The problem I see is that you
> send a copy of the model from the driver for every task. Try broadcasting
> the model instead. I'm not sure if that resolves it, but it would be a good
> practice.
>
> On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla <
> xavier.gervi...@datapta.com> wrote:
>
>> Hi Team,
>> 
>>
>> I'm developing a project that retrieves tweets on a 'host' app, streams
>> them to Spark, and through a series of DataFrame operations obtains the
>> sentiment of the tweets and their entities by applying a sentiment model
>> and a NER model respectively.
>>
>> The problem I've come across is that when applying the NER model, the RAM
>> consumption increases until the program stops with a memory error because
>> there's no memory left to execute. In addition, on SparkUI I've seen that
>> there's only one executor running, the executor driver, but using htop on
>> the terminal I see that the 8 cores of the instance are executing at 100%.
>>
>> The SparkSession is only configured to receive the tweets from the socket
>> that connects with the second program that sends the tweets. The DataFrame
>> goes through some processing to obtain other properties of the tweet like
>> its sentiment (which causes no error even with less than 8GB of RAM) and
>> then the NER is applied.
>>
>> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
>> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
>> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>>
>> # prior processing of the tweets
>> sentDF = other_processing(tweets)
>>
>> # obtaining the column that contains the list of entities from a tweet
>> nerDF = ner_classification(sentDF)
>>
>>
>> This is the code of the functions related to obtaining the NER, the "main
>> call" and the UDF function.
>>
>> nerModel = spacy.load("en_core_web_sm")
>>
>> # main call, applies the UDF function to every tweet from the "tweet" column
>> def ner_classification(words):
>>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>>     words = words.withColumn("nerlist", ner_list("tweet"))
>>     return words
>>
>> # udf function
>> def obtain_ner_udf(words):
>>     # if the tweet is empty return None
>>     if words == "":
>>         return None
>>     # else: applying the NER model (spaCy en_core_web_sm)
>>     entities = nerModel(words)
>>
>>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>>     return [word.text + '_' + word.label_ for word in entities.ents]
>>
>>
>>
>> And lastly I map each entity with the sentiment from its tweet and obtain
>> the average sentiment of the entity and the number of appearances.
>>
>> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
>> flattenedNER.registerTempTable("df")
>>
>> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
>> finalDF = spark.sql(querySelect)
>>
>> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>>
>>
>> The resulting DF is processed with a function that collects each column
>> into a list and prints them.
>>
>> def processBatch(df, epoch_id):
>>     entities = [str(t.entity) for t in df.select("entity").collect()]
>>     sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>>     counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>>
>>     print(entities, sentiments, counts)
>>
>>
>> At first I tried other NER models from Flair and they have the same
>> effect: after printing the first batch, memory use starts increasing until
>> it fails and stops the execution because of the memory error. When applying
>> a "simple" function instead of the NER model, such as return
>> words.split(), on the UDF there's no such error, so the data ingested
>> should not be what's causing the overload but the model.
>>
>> Is there a way to prevent the excessive RAM consumption? Why is there
>> only the driver executor and no other executors are generated? How could I
>> prevent it from collapsing when applying the NER model?
>>
>> Thanks in advance!

Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Sean Owen
It looks good; are you sure it even starts? The problem I see is that you
send a copy of the model from the driver for every task. Try broadcasting
the model instead. I'm not sure if that resolves it, but it would be a good
practice.
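
A minimal sketch of the broadcast suggestion applied to the spaCy UDF, assuming
the loaded spaCy pipeline pickles cleanly (if it doesn't, the usual fallback is
loading it lazily once per executor); names here are illustrative:

import spacy
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.appName("ner-broadcast-sketch").getOrCreate()

# Load the model once on the driver and broadcast it, so each executor keeps
# one cached copy instead of a copy shipped inside every task closure.
bc_nlp = spark.sparkContext.broadcast(spacy.load("en_core_web_sm"))

def obtain_ner(text):
    if not text:
        return None
    doc = bc_nlp.value(text)  # reuse the per-executor copy of the model
    return [ent.text + "_" + ent.label_ for ent in doc.ents]

ner_udf = udf(obtain_ner, ArrayType(StringType()))
# e.g. tweets.withColumn("nerlist", ner_udf("tweet"))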

On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla wrote:

> Hi Team,
> 
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark, and through a series of DataFrame operations obtains the
> sentiment of the tweets and their entities by applying a sentiment model
> and a NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>
> # prior processing of the tweets
> sentDF = other_processing(tweets)
>
> # obtaining the column that contains the list of entities from a tweet
> nerDF = ner_classification(sentDF)
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> nerModel = spacy.load("en_core_web_sm")
>
> # main call, applies the UDF function to every tweet from the "tweet" column
> def ner_classification(words):
>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>     words = words.withColumn("nerlist", ner_list("tweet"))
>     return words
>
> # udf function
> def obtain_ner_udf(words):
>     # if the tweet is empty return None
>     if words == "":
>         return None
>     # else: applying the NER model (spaCy en_core_web_sm)
>     entities = nerModel(words)
>
>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>     return [word.text + '_' + word.label_ for word in entities.ents]
>
>
>
> And lastly I map each entity with the sentiment from its tweet and obtain
> the average sentiment of the entity and the number of appearances.
>
> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
> flattenedNER.registerTempTable("df")
>
> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
> finalDF = spark.sql(querySelect)
>
> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>
>
> The resulting DF is processed with a function that collects each column
> into a list and prints them.
>
> def processBatch(df, epoch_id):
>     entities = [str(t.entity) for t in df.select("entity").collect()]
>     sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>     counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>
>     print(entities, sentiments, counts)
>
>
> At first I tried other NER models from Flair and they have the same
> effect: after printing the first batch, memory use starts increasing until
> it fails and stops the execution because of the memory error. When applying
> a "simple" function instead of the NER model, such as return
> words.split(), on the UDF there's no such error, so the data ingested
> should not be what's causing the overload but the model.
>
> Is there a way to prevent the excessive RAM consumption? Why is there only
> the driver executor and no other executors are generated? How could I
> prevent it from collapsing when applying the NER model?
>
> Thanks in advance!
>
>


[Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Xavier Gervilla
Hi Team,
https://stackoverflow.com/questions/71841814/is-there-a-way-to-prevent-excessive-ram-consumption-with-the-spark-configuration



I'm developing a project that retrieves tweets on a 'host' app, streams them to 
Spark, and through a series of DataFrame operations obtains the sentiment of 
the tweets and their entities by applying a sentiment model and a NER model 
respectively.



The problem I've come across is that when applying the NER model, the RAM 
consumption increases until the program stops with a memory error because 
there's no memory left to execute. In addition, on SparkUI I've seen that 
there's only one executor running, the executor driver, but using htop on the 
terminal I see that the 8 cores of the instance are executing at 100%.




The SparkSession is only configured to receive the tweets from the socket that 
connects with the second program that sends the tweets. The DataFrame goes 
through some processing to obtain other properties of the tweet like its 
sentiment (which causes no error even with less than 8GB of RAM) and then the 
NER is applied.

spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
tweets = rawTweets.selectExpr("CAST(value AS STRING)")

# prior processing of the tweets
sentDF = other_processing(tweets)

# obtaining the column that contains the list of entities from a tweet
nerDF = ner_classification(sentDF)



This is the code of the functions related to obtaining the NER, the "main call" 
and the UDF function.

nerModel = spacy.load("en_core_web_sm")

# main call, applies the UDF function to every tweet from the "tweet" column
def ner_classification(words):
    ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
    words = words.withColumn("nerlist", ner_list("tweet"))
    return words

# udf function
def obtain_ner_udf(words):
    # if the tweet is empty return None
    if words == "":
        return None
    # else: applying the NER model (spaCy en_core_web_sm)
    entities = nerModel(words)

    # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
    return [word.text + '_' + word.label_ for word in entities.ents]






And lastly I map each entity with the sentiment from its tweet and 
obtain the average sentiment of the entity and the number of 
appearances.



flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
flattenedNER.registerTempTable("df")

querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
finalDF = spark.sql(querySelect)

query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()



The resulting DF is processed with a function that collects each column into a 
list and prints them.



def processBatch(df, epoch_id):
    entities = [str(t.entity) for t in df.select("entity").collect()]
    sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
    counts = [int(row.asDict()['count']) for row in df.select("count").collect()]

    print(entities, sentiments, counts)





At first I tried other NER models from Flair and they have the same effect: 
after printing the first batch, memory use starts increasing until it fails and 
stops the execution because of the memory error. When applying a "simple" 
function instead of the NER model, such as return words.split(), on the UDF 
there's no such error, so the data ingested should not be what's causing the 
overload but the model.



Is there a way to prevent the excessive RAM consumption? Why is there only the 
driver executor and no other executors are generated? How could I prevent it 
from collapsing when applying the NER model?



Thanks in advance!

[Spark Web UI] Integrating Keycloak SSO

2022-04-18 Thread Solomon, Brad
As outlined at https://issues.apache.org/jira/browse/SPARK-38693 and 
https://stackoverflow.com/q/71667296/7954504, we are attempting to integrate 
Keycloak Single Sign On with the Spark Web UI.

However, Spark errors out with:

spark_1 | 22/03/29 18:43:24 INFO KeycloakDeployment: Loaded URLs from 
http://REDACTED/auth/realms/master/.well-known/openid-configuration
spark_1 | 22/03/29 18:43:24 WARN HttpChannel: /
spark_1 | java.lang.IllegalStateException: No SessionManager

This appears to be caused by Spark not using a Jetty SessionManager. (See the 
first link above for configuration of Spark.)

Are there any workarounds to make this integration succeed, and if so, what 
would they look like? Would it be possible to use a Jetty Adapter with Spark, 
and if so, how?

Component: Spark UI, authentication
Scenario: How-to
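
For reference, the kind of wiring being attempted (per the JIRA above) can be
expressed through Spark's documented spark.ui.filters mechanism. This is only a
sketch of that configuration, not a verified workaround for the SessionManager
error, and the filter class, init-param name, and file path are assumptions:

from pyspark.sql import SparkSession

# spark.ui.filters takes any javax servlet Filter on the driver classpath;
# filter init-params are passed as spark.<filter class>.param.<name>=<value>.
# The Keycloak servlet adapter jar would have to be added to the driver classpath.
spark = (SparkSession.builder
         .appName("ui-keycloak-sketch")
         .config("spark.ui.filters",
                 "org.keycloak.adapters.servlet.KeycloakOIDCFilter")
         .config("spark.org.keycloak.adapters.servlet.KeycloakOIDCFilter.param.keycloak.config.file",
                 "/opt/keycloak/keycloak.json")
         .getOrCreate())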
