Hi,

 

I have a Spark Streaming task that basically does the following:

 

1. Read a batch using a custom receiver
2. Parse and apply transforms to the batch
3. Convert the raw fields to a bunch of features
4. Use a pre-built model to predict the class of each record in the batch
5. Output the result to a DB
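
Roughly, the driver side has the following shape (heavily simplified sketch; MyReceiver, parseRecord, extractFeatures, model and writeToDb are just placeholders standing in for the real pieces, and the 60-second batch interval is only an example value):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streaming-classifier")
val ssc  = new StreamingContext(conf, Seconds(60))       // batch interval (example value)

val raw      = ssc.receiverStream(new MyReceiver())      // 1. read via the custom receiver
val parsed   = raw.map(parseRecord)                      // 2. parse / apply transforms
val features = parsed.map(extractFeatures)               // 3. raw fields -> features

features.foreachRDD { rdd =>
  val predictions = model.predict(rdd)                   // 4. score with the pre-built model
  predictions.foreachPartition(writeToDb)                // 5. write the results to the DB
}

ssc.start()
ssc.awaitTermination()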

 

Everything was working fine and my streaming pipeline was pretty stable.

 

I realized that the results were wrong, as some of the features in the
records require cumulative data, for example the "Ratio of Dest IP" to the
"Total number of IPs" for a given "Source IP". These features are not correct
when computed on a single batch, because the batch only has a micro view of
the entire dataset. So I changed the code and inserted another step:

 

3a) Accumulate the data for a given "Source IP" over multiple batches. So far
so good.
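
To make the feature concrete, it is computed over the accumulated data roughly along these lines (illustrative sketch only; accumulatedDF and the column names src_ip, dst_ip, last_updated_time, etc. are made up for this example):

import org.apache.spark.sql.functions.{col, count, countDistinct}

// For each source IP, ratio of distinct destination IPs seen so far
// to the total number of records seen so far.
val perSip = accumulatedDF
  .groupBy("src_ip")
  .agg(
    countDistinct("dst_ip").as("distinct_dst_ips"),
    count("*").as("total_records")
  )
  .withColumn("dst_ip_ratio", col("distinct_dst_ips") / col("total_records"))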

 

To achieve this I use a DataFrame that is a "var" instead of a "val". As new
batches come in, I extract the "SIP"-based data and union it with the existing
DataFrame; I also do a bit of filtering, as I do not want my data to keep
growing in size over time (I keep only, say, 30 minutes' worth of data).
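
In other words, each batch interval I essentially rebuild the var, along these lines (simplified sketch; ejectTime is the cutoff roughly 30 minutes back, assumed here to be an epoch-millis Long):

import org.apache.spark.sql.DataFrame

// Keep only the last ~30 minutes of accumulated per-SIP data, add the new
// batch, and cache the result; the var is re-assigned every batch interval.
def updateAccumulated(srcHostsDF: DataFrame,
                      batInterDF: DataFrame,
                      ejectTime: Long): DataFrame =
  srcHostsDF
    .filter(srcHostsDF("last_updated_time") > ejectTime)
    .union(batInterDF)
    .persist()

// per batch: srcHostsDF = updateAccumulated(srcHostsDF, batInterDF, ejectTime)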

 

Now when I test the system, I see that the "processing time" for each batch
keeps continuously increasing. I can understand it going up until the 30-minute
mark, but beyond that point, as data gets filtered out based on time, the size
of the SIP RDD (DF) is almost constant, yet the processing time keeps
increasing. This eventually makes my streaming pipeline unstable, and the app
dies of OOM (the receiver executor gets bloated and dies).

 

I have tested this for almost a week now and this line,

srcHostsDF.filter(srcHostsDF("last_updated_time") > ejectTime)
          .union(batInterDF)
          .persist()

where "batInterDF" is the received batch and "srcHostsDF" is the DataFrame in
which I keep data across batches,

 

shows up in the Spark UI as taking longer and longer over time. The size of
"srcHostsDF" is fairly constant, so why should the time taken by persist go
up?

 

The other two calls that show up as increasing in time are srcHostsDF.count()
and srcHostsDF.rdd. Why should this be the case?

 

Any clues on what is happening?

 

I replaced the "persist" with a "repartition" and I still get similar results.
The image below shows the "executor memory" growth: the app starts a little
after 18:20, the neon green line is the "receiver executor", and the others
are the "process executors". There are 5 executors and 1 driver in all. One
tick before 19:00 is where the size of "srcHostsDF" stabilises.

 
[image: executor memory growth per executor over time]
 

Regards

-Ravi Gurram

 

 

 
