Incremental model update
Hello - I have a question on how to handle incremental model updates in Spark.

We have a time series where we predict the future conditioned on the past. We can train a model offline on historical data and then use that model for prediction. But if the underlying process is non-stationary, the probability distribution changes over time. In such cases we need to update the model so that it reflects the current distribution. We have two options:

a. retrain periodically
b. update the model incrementally

(a) is expensive. What about (b)?

I think in Spark we have StreamingKMeans, which takes care of this incremental model update for k-means. Is this true? But what about other models that don't have these streaming counterparts? How do we handle such incremental model changes with those models when the underlying distribution changes?

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
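For k-means, the incremental option works by folding each micro-batch into the current centroids while discounting older data, so the model gradually forgets an outdated distribution. The sketch below is a simplified plain-Scala model of that idea; the object, case class and method names are hypothetical, and it is not Spark's StreamingKMeans code. In Spark the corresponding knob is the decay factor set with `StreamingKMeans.setDecayFactor`.

```scala
// Simplified, hypothetical model of a decay-weighted k-means update.
// This is NOT Spark's StreamingKMeans code; it is plain Scala for illustration.
object DecayingKMeansSketch {
  type Point = Array[Double]

  // A centroid plus the (decayed) number of points it summarizes.
  final case class Cluster(center: Point, weight: Double)

  private def sqDist(a: Point, b: Point): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  private def nearest(clusters: Vector[Cluster], p: Point): Int =
    clusters.indices.minBy(i => sqDist(clusters(i).center, p))

  /** Folds one micro-batch into the model.
    * decay = 1.0 never forgets; decay = 0.0 keeps only the latest batch. */
  def update(clusters: Vector[Cluster], batch: Seq[Point], decay: Double): Vector[Cluster] = {
    val assigned: Map[Int, Seq[Point]] = batch.groupBy(p => nearest(clusters, p))
    clusters.zipWithIndex.map { case (c, i) =>
      val discounted = c.weight * decay // shrink the influence of past data
      assigned.get(i) match {
        case None => c.copy(weight = discounted)
        case Some(pts) =>
          val m = pts.size.toDouble
          val newWeight = discounted + m
          // new center = (old center * discounted weight + sum of batch points) / new weight
          val center = Array.tabulate(c.center.length) { j =>
            (c.center(j) * discounted + pts.map(_(j)).sum) / newWeight
          }
          Cluster(center, newWeight)
      }
    }
  }
}
```

With a decay close to 1.0 the centroids move slowly and remember a long history; lowering it lets the model track a drifting distribution faster, at the cost of noisier centroids.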
outlier detection using StreamingKMeans
Hello - I am trying to implement an outlier detection application on streaming data. I am a newbie to Spark and would like some advice on a few points that confuse me.

I am thinking of using StreamingKMeans - is this a good choice? I have one stream of data and I need an online algorithm. But here are some questions that immediately come to mind:

1. I cannot do separate training, cross validation etc. Is it a good idea to do training and prediction online?

2. The data will be read from a Kafka stream in microbatches of (say) 3 seconds. I get a DStream on which I train and get the clusters. How can I decide on the number of clusters? Using StreamingKMeans, is there any way I can iterate on microbatches with different values of k to find the optimal one?

3. Even if I fix k, after training on every microbatch I get a DStream. How can I compute things like a clustering score on the DStream? StreamingKMeansModel has a computeCost function, but it takes an RDD. Maybe using DStream.foreachRDD { ... } can work, but I am not able to figure out how. How can we compute the cost of clustering for an unbounded stream of data? Is there an idiomatic way to handle this?

Or is StreamingKMeans not the right choice for anomaly detection in an online setting? Any suggestion will be welcome.

regards.

-- 
Debasish Ghosh
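One way to approach question 3: score each micro-batch separately and keep running totals, since a cost over an unbounded stream can only ever be a sum or average over the batches seen so far. The plain-Scala sketch below (hypothetical names, no Spark dependency) computes the same quantity that `KMeansModel.computeCost` reports for an RDD, namely the sum of squared distances to the nearest centroid. Inside Spark one would presumably call `computeCost` on each RDD from within `foreachRDD`, for example on the model returned by `StreamingKMeans.latestModel()`.

```scala
// Plain-Scala sketch of per-batch k-means cost with running totals.
object BatchCostSketch {
  type Point = Array[Double]

  def sqDist(a: Point, b: Point): Double =
    a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum

  // Sum of squared distances from each point to its nearest center.
  def cost(centers: Seq[Point], batch: Seq[Point]): Double =
    batch.map(p => centers.map(c => sqDist(c, p)).min).sum

  // Running totals across micro-batches; empty batches leave the totals unchanged.
  final case class RunningCost(total: Double = 0.0, points: Long = 0L) {
    def add(centers: Seq[Point], batch: Seq[Point]): RunningCost =
      if (batch.isEmpty) this
      else RunningCost(total + cost(centers, batch), points + batch.size)

    def meanCostPerPoint: Double = if (points == 0L) 0.0 else total / points
  }
}
```

Because the centroids move between batches, the running total mixes costs computed against different models; comparing the per-point cost over a recent window of batches is probably more informative than the all-time figure.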
Re: using StreamingKMeans
Thanks a lot for the response.

Regarding the sampling part - yeah, that's what I need to do if there's no way of tuning the number of clusters online.

I am using something like

    dstream.foreachRDD { rdd =>
      if (rdd.count() > 0) {
        // .. logic
      }
    }

It feels a little odd, but if that's the idiom then I will stick to it.

regards.

On Sat, Nov 19, 2016 at 10:52 PM, Cody Koeninger wrote:
> So I haven't played around with streaming k means at all, but given
> that no one responded to your message a couple of days ago, I'll say
> what I can.
>
> 1. Can you not sample out some % of the stream for training?
> 2. Can you run multiple streams at the same time with different values
> for k and compare their performance?
> 3. foreachRDD is fine in general, can't speak to the specifics.
> 4. If you haven't done any transformations yet on a direct stream,
> foreachRDD will give you a KafkaRDD. Checking if a KafkaRDD is empty
> is very cheap, it's done on the driver only because the beginning and
> ending offsets are known. So you should be able to skip empty
> batches.
>
> On Sat, Nov 19, 2016 at 10:46 AM, debasishg wrote:
> > Hello -
> >
> > I am trying to implement an outlier detection application on streaming data.
> > I am a newbie to Spark and hence would like some advice on the confusions
> > that I have ..
> >
> > I am thinking of using StreamingKMeans - is this a good choice ? I have one
> > stream of data and I need an online algorithm. But here are some questions
> > that immediately come to my mind ..
> >
> > 1. I cannot do separate training, cross validation etc. Is this a good idea
> > to do training and prediction online ?
> >
> > 2. The data will be read from the stream coming from Kafka in microbatches
> > of (say) 3 seconds. I get a DStream on which I train and get the clusters.
> > How can I decide on the number of clusters ? Using StreamingKMeans is there
> > any way I can iterate on microbatches with different values of k to find the
> > optimal one ?
> >
> > 3. Even if I fix k, after training on every microbatch I get a DStream. How
> > can I compute things like clustering score on the DStream ?
> > StreamingKMeansModel has a computeCost function but it takes an RDD. I can
> > use dstream.foreachRDD { // process RDD for the micro batch here } - is this
> > the idiomatic way ?
> >
> > 4. If I use dstream.foreachRDD { .. } and use functions like new
> > StandardScaler().fit(rdd) to do feature normalization, then it works when I
> > have data in the stream. But when the microbatch is empty (say I don't have
> > data for some time), the fit method throws exception as it gets an empty
> > collection. Things start working ok when data starts coming back to the
> > stream. But is this the way to go ?
> >
> > any suggestion will be welcome ..
> >
> > regards.

-- 
Debasish Ghosh
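Cody's points 1 and 4 combine naturally: skip empty micro-batches before fitting anything, and sample a fraction of each non-empty batch for training. In Spark the guard would be `if (!rdd.isEmpty()) { ... }` inside `foreachRDD`, which, per point 4, is cheap on an untransformed KafkaRDD. The sketch below models the same logic on plain Scala collections; all names are hypothetical, and its scaling uses the population standard deviation, which may differ from what MLlib's StandardScaler computes.

```scala
// Plain-Scala sketch: guard against empty batches and sample a training subset.
object BatchGuardSketch {
  type Point = Array[Double]

  // Per-feature mean and (population) standard deviation.
  final case class Scaling(mean: Point, std: Point) {
    def apply(p: Point): Point =
      Array.tabulate(p.length)(i => if (std(i) == 0.0) 0.0 else (p(i) - mean(i)) / std(i))
  }

  // Returns None for an empty batch instead of throwing like a fit on empty data.
  def fitScaling(batch: Seq[Point]): Option[Scaling] =
    if (batch.isEmpty) None
    else {
      val n = batch.size.toDouble
      val d = batch.head.length
      val mean = Array.tabulate(d)(j => batch.map(_(j)).sum / n)
      val std = Array.tabulate(d) { j =>
        math.sqrt(batch.map(p => (p(j) - mean(j)) * (p(j) - mean(j))).sum / n)
      }
      Some(Scaling(mean, std))
    }

  // Splits a batch into roughly `fraction` for training and the rest for scoring.
  def split(batch: Seq[Point], fraction: Double, seed: Long): (Seq[Point], Seq[Point]) = {
    val rng = new scala.util.Random(seed)
    val tagged = batch.map(p => (p, rng.nextDouble() < fraction)) // one draw per point
    (tagged.collect { case (p, true) => p }, tagged.collect { case (p, false) => p })
  }
}
```

Returning an `Option` makes the "no data this batch" case explicit, so the caller can simply keep the previous model instead of relying on an exception.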
Re: using StreamingKMeans
I'm looking for alternative suggestions for the case where we have one continuous stream of data. Offline training and online prediction can be one option if we have an alternate set of data to train on. But if it's one single stream, you don't have separate sets for training or cross validation.

So whatever data you get in each microbatch, you train on it and get the cluster centroids from the model. Then you apply some heuristic, such as the mean distance from the centroid, to detect outliers. So for every microbatch you get the outliers based on the model, and you can control the forgetfulness of the model through the decay factor that you specify for StreamingKMeans.

Suggestions?

regards.

On Sun, 20 Nov 2016 at 3:51 AM, ayan guha wrote:
> Curious why do you want to train your models every 3 secs?
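The per-microbatch heuristic described above can be sketched as follows: compute each point's distance to its nearest centroid and flag points that are unusually far relative to the rest of the batch. The cut-off `mean + z * std` and the parameter `z` are assumptions of this sketch, not something the thread specifies; the centroids would come from the StreamingKMeans model after the batch has been folded in. Names are hypothetical.

```scala
// Plain-Scala sketch of a per-batch "distance from nearest centroid" outlier rule.
object BatchOutlierSketch {
  type Point = Array[Double]

  def dist(a: Point, b: Point): Double =
    math.sqrt(a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum)

  /** Flags points whose nearest-centroid distance exceeds mean + z * std
    * of those distances within the same batch (z is an assumed tuning knob). */
  def outliers(centers: Seq[Point], batch: Seq[Point], z: Double): Seq[Point] =
    if (batch.isEmpty) Seq.empty
    else {
      val d = batch.map(p => centers.map(c => dist(c, p)).min)
      val mean = d.sum / d.size
      val std = math.sqrt(d.map(x => (x - mean) * (x - mean)).sum / d.size)
      batch.zip(d).collect { case (p, dp) if dp > mean + z * std => p }
    }
}
```

One caveat: the outliers themselves inflate the mean and standard deviation of the distances, so very large anomalies can partly mask smaller ones, and a small 3-second batch gives noisy statistics.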
Re: using StreamingKMeans
I share both of the concerns that you have expressed. And as I mentioned in my earlier mail, offline (batch) training is an option if I get a dataset without outliers. In that case I can train and have a model. I find the model parameters, which will be the mean distance to the centroid. Note that in training I will have only 1 cluster, as it's only normal data (no outliers). I can now pass these parameters to the prediction phase, which can work on streaming data. In the prediction phase I just compute the distance to the centroid for each point and flag the violating ones as outliers. This looks like a perfectly valid option if I get a dataset with no outliers to train on.

Now my question is: what then is the use case for StreamingKMeans? In the above scenario we use batch KMeans in the training phase, while in the prediction phase we just compute the distance. And how do we address the scenario where we have only one stream of data available?

regards.

On Sun, 20 Nov 2016 at 6:07 AM, ayan guha wrote:
> Here are 2 concerns I would have with the design (This discussion is
> mostly to validate my own understanding)
>
> 1. if you have outliers "before" running k-means, aren't your centroids
> get skewed? In other word, outliers by themselves may bias the cluster
> evaluation, isn't it?
> 2. Typically microbatches are small, like 3 sec in your case. in this
> window you may not have enough data to run any statistically significant
> operation, can you?
>
> My approach would have been: Run K-means on data without outliers (in
> batch mode). Determine the model, ie centroids in case of kmeans.
> Then load the model in your streaming app and just apply "outlier detection"
> function, which takes the form of
>
>     def detectOutlier(model,data):
>         /// your code, like mean distance etc
>         return T or F
>
> In response to your point about "alternate set of data", I would assume you
> would accumulate the data you are receiving from streaming over few weeks
> or months before running offline training.
>
> Am I missing something?
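The offline-train/online-predict approach discussed in this exchange can be sketched in plain Scala: with k = 1 the k-means centroid of the clean training data is simply its mean, and the streaming side only computes a distance per point. Instead of the mean distance mentioned above, this sketch uses a high quantile of the training distances as the cut-off; that choice, and all names here, are assumptions for illustration. `detectOutlier` mirrors the shape of ayan's pseudocode.

```scala
// Plain-Scala sketch: batch "training" on outlier-free data, per-point detection on the stream.
object OfflineCentroidSketch {
  type Point = Array[Double]

  final case class Model(centroid: Point, threshold: Double)

  def dist(a: Point, b: Point): Double =
    math.sqrt(a.indices.map(i => (a(i) - b(i)) * (a(i) - b(i))).sum)

  /** With k = 1 the centroid is the mean; the threshold is the given
    * quantile (0.0 to 1.0) of training distances to that centroid. */
  def train(normal: Seq[Point], quantile: Double): Model = {
    require(normal.nonEmpty, "need at least one training point")
    val n = normal.size
    val d = normal.head.length
    val centroid = Array.tabulate(d)(j => normal.map(_(j)).sum / n)
    val sorted = normal.map(p => dist(centroid, p)).sorted
    val idx = math.min(n - 1, math.max(0, math.ceil(quantile * n).toInt - 1))
    Model(centroid, sorted(idx))
  }

  /** Streaming side: no refitting, just a distance check per point. */
  def detectOutlier(model: Model, p: Point): Boolean =
    dist(model.centroid, p) > model.threshold
}
```

Because the threshold is fixed at training time, this version does not adapt if the distribution drifts; that is the gap periodic retraining or an incrementally updated model would have to fill.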
problem with kafka createDirectStream ..
Hello - I am facing some issues with the following snippet of code that reads from Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..) with Kafka 0.10.1 and Spark 2.0.1.

    // get the data from kafka
    val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
      KafkaUtils.createDirectStream[Array[Byte], (String, String)](
        streamingContext,
        PreferConsistent,
        Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
      )

    // label and vectorize the value
    val projected: DStream[(String, Vector)] = stream.map { record =>
      val (label, value) = record.value
      val vector = Vectors.dense(value.split(",").map(_.toDouble))
      (label, vector)
    }.transform(projectToLowerDimension)

In the above snippet, if I have the call to transform in the last line, I get the following exception ..

    Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
        at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)

The transform method does a PCA and gives the top 2 principal components ..

    private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
      if (rdd.isEmpty) rdd else {
        // reduce to 2 dimensions
        val pca = new PCA(2).fit(rdd.map(_._2))

        // Project vectors to the linear space spanned by the top 2 principal
        // components, keeping the label
        rdd.map(p => (p._1, pca.transform(p._2)))
      }
    }

However, if I remove the transform call, I can process everything correctly.

Any help will be most welcome ..

regards.
-- Debasish Ghosh
Re: problem with kafka createDirectStream ..
My assembly contains the 0.10.1 classes .. Here are the dependencies related to kafka & spark that my assembly has ..

    libraryDependencies ++= Seq(
      "org.apache.kafka" %  "kafka-streams"              % "0.10.0.0",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % spark,
      "org.apache.spark" %% "spark-core"                 % spark % "provided",
      "org.apache.spark" %% "spark-streaming"            % spark % "provided",
      "org.apache.spark" %% "spark-mllib"                % spark % "provided",
      "org.apache.spark" %% "spark-sql"                  % spark % "provided"
    )

regards.

On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger wrote:
> When you say 0.10.1 do you mean broker version only, or does your
> assembly contain classes from the 0.10.1 kafka consumer?
-- Debasish Ghosh
Re: problem with kafka createDirectStream ..
oops .. it's 0.10.0 .. sorry for the confusion ..

On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh wrote:
> My assembly contains the 0.10.1 classes ..