Re: StreamingKMeans does not update cluster centroid locations
Also, the cluster centroids I get in streaming mode (some with negative values) do not make sense. If I use the same data and run batch KMeans.train(sc.parallelize(parsedData), numClusters, numIterations), the cluster centers are what you would expect.

Krishna

On Fri, Feb 19, 2016 at 12:49 PM, krishna ramachandran wrote:
> OK, I will share a simple example soon. Meantime you will be able to see
> this behavior using the example here:
>
> https://github.com/apache/spark/blob/branch-1.2/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala
>
> Slightly modify it to include
>
>     model.latestModel.clusterCenters.foreach(println)
>
> (after model.trainOn), add new files to trainingDir periodically, and
> monitor. I have 3 dimensions per data point - they look like these:
>
> [1, 1, 385.224145278]
> [3, 1, 384.752946389]
> [4, 1, 3083.2778025]
> [2, 4, 6226.40232139]
> [1, 2, 785.84266]
> [5, 1, 6706.05424139]
>
> Please let me know if I missed something.
>
> Krishna
>
> On Fri, Feb 19, 2016 at 10:59 AM, Bryan Cutler wrote:
>> Can you share more of your code to reproduce this issue? The model
>> should be updated with each batch, but I can't tell what is happening
>> from what you posted so far.
>>
>> On Fri, Feb 19, 2016 at 10:40 AM, krishna ramachandran wrote:
>>> Hi Bryan,
>>> Agreed. It is a single statement to print the centers once for *every*
>>> streaming batch (4 secs) - remember this is in streaming mode and the
>>> receiver has fresh data every batch. That is, the model is trained
>>> continuously, so I expect the centroids to change with incoming
>>> streams (at least until convergence).
>>>
>>> But I am seeing the same centers for the entire duration - I ran the
>>> app for several hours with a custom receiver.
>>>
>>> Yes, I am using the latestModel to predict using "labeled" test data.
>>> But I would also like to know where my centers are.
>>>
>>> regards,
>>> Krishna
>>>
>>> On Fri, Feb 19, 2016 at 10:18 AM, Bryan Cutler wrote:
>>>> Could you elaborate where the issue is? You say calling
>>>> model.latestModel.clusterCenters.foreach(println) doesn't show an
>>>> updated model, but that is just a single statement to print the
>>>> centers once.
>>>>
>>>> Also, is there any reason you don't predict on the test data like
>>>> this?
>>>>
>>>>     model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
>>>>
>>>> On Thu, Feb 18, 2016 at 5:59 PM, ramach1776 wrote:
>>>>> I have a streaming application wherein I train the model using a
>>>>> receiver input stream in 4 sec batches:
>>>>>
>>>>>     val stream = ssc.receiverStream(receiver) // receiver gets new data every batch
>>>>>     model.trainOn(stream.map(Vectors.parse))
>>>>>
>>>>> If I use
>>>>>
>>>>>     model.latestModel.clusterCenters.foreach(println)
>>>>>
>>>>> the cluster centers remain unchanged from the initial values they
>>>>> got during the first iteration (when the streaming app started).
>>>>>
>>>>> When I use the model to predict cluster assignments with labeled
>>>>> input, the assignments change over time as expected:
>>>>>
>>>>>     testData.transform { rdd =>
>>>>>       rdd.map(lp => (lp.label, model.latestModel().predict(lp.features)))
>>>>>     }.print()
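For context on why the centers are expected to move: StreamingKMeans updates each cluster center every batch with a forgetful weighted average of the old center and the mean of the batch's points, as described in the MLlib clustering docs. Below is a minimal sketch of that per-cluster update rule in plain Python - a simulation for illustration, not the actual Spark Scala implementation; `update_center` and its parameter names are made up for this example.

```python
def update_center(center, weight, batch_points, decay=1.0):
    """One StreamingKMeans-style forgetful update for a single cluster.

    center       -- current centroid, as a list of floats
    weight       -- number of points the centroid has absorbed so far
    batch_points -- points assigned to this cluster in the new batch
    decay        -- forgetfulness factor a (1.0 = remember all history)
    """
    m = len(batch_points)
    if m == 0:
        # No new points: the centroid stays put, old history just decays.
        return center, weight * decay
    dims = len(center)
    batch_mean = [sum(p[d] for p in batch_points) / m for d in range(dims)]
    new_weight = weight * decay + m
    # Weighted average of the old centroid and the new batch mean.
    new_center = [
        (center[d] * weight * decay + batch_mean[d] * m) / new_weight
        for d in range(dims)
    ]
    return new_center, new_weight

# A centroid at the origin that has absorbed 5 points is pulled toward a
# new batch of 2 points at (10, 10):
c, w = update_center([0.0, 0.0], 5.0, [[10.0, 10.0], [10.0, 10.0]])
print(c, w)  # centroid moves to ~[2.857, 2.857], weight becomes 7.0
```

If the printed centers never move even though batches carry new points, something upstream (e.g. the files never reaching trainingDir, or parsing failures) is worth ruling out, because the update rule itself shifts the centroid on every non-empty batch.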
Re: adding a split and union to a streaming application cause big performance hit
I tried these 2 global settings (and restarted the app) after enabling cache for stream1:

    conf.set("spark.streaming.unpersist", "true")
    streamingContext.remember(Seconds(batchDuration * 4))

Batch duration is 4 sec. Using spark-1.4.1. The application runs for about 4-5 hrs, then I see an out of memory error.

regards,
Krishna

On Thu, Feb 18, 2016 at 4:54 AM, Ted Yu wrote:
> bq. streamingContext.remember("duration") did not help
>
> Can you give a bit more detail on the above?
> Did you mean the job encountered OOME later on?
>
> Which Spark release are you using?
>
> Cheers
>
> On Wed, Feb 17, 2016 at 6:03 PM, ramach1776 wrote:
>> We have a streaming application containing approximately 12 jobs every
>> batch, running in streaming mode (4 sec batches). Each job has several
>> transformations and 1 action (output to cassandra) which causes the
>> execution of the job (DAG).
>>
>> For example, the first job:
>>
>>     job 1
>>     ---> receive Stream A --> map --> filter --> (union with another stream B)
>>     --> map --> groupByKey --> transform --> reduceByKey --> map
>>
>> Likewise we go through a few more transforms and save to the database
>> (job 2, job 3, ...).
>>
>> Recently we added a new transformation further downstream wherein we
>> union the output of the DStream from job 1 (shown above) with the
>> output from a new transformation (job 5). It appears the whole
>> execution thus far is repeated, which is redundant (I can see this in
>> the execution graph and also in performance -> processing time).
>>
>> That is, with this additional transformation (union with a stream
>> processed upstream) each batch runs as much as 2.5 times slower than
>> runs without the union. If I cache the DStream from job 1, performance
>> improves substantially, but we hit out of memory errors within a few
>> hours.
>>
>> What is the recommended way to cache/unpersist in such a scenario?
>> There is no DStream-level "unpersist". Setting
>> "spark.streaming.unpersist" to true and
>> streamingContext.remember("duration") did not help.
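For readers following along: remember(duration) bounds how long Spark Streaming keeps generated RDDs around before they become eligible for cleanup. A rough sketch in Python of that kind of time-bounded eviction - purely illustrative of the idea, not Spark's actual bookkeeping; the class and method names here are invented for the example.

```python
from collections import OrderedDict

class TimeBoundedCache:
    """Keep cached batch results only within a 'remember' window.

    Mimics the idea behind streamingContext.remember(duration): anything
    older than remember_secs, relative to the newest batch, is evicted.
    """
    def __init__(self, remember_secs):
        self.remember_secs = remember_secs
        self.entries = OrderedDict()  # batch_time -> cached result

    def put(self, batch_time, result):
        self.entries[batch_time] = result
        self._evict(batch_time)

    def _evict(self, now):
        cutoff = now - self.remember_secs
        for t in list(self.entries):
            if t < cutoff:
                del self.entries[t]  # old batch falls out of the window

cache = TimeBoundedCache(remember_secs=16)  # e.g. 4 batches of 4 sec each
for t in (0, 4, 8, 12, 16, 20):
    cache.put(t, "rdd@%d" % t)
print(sorted(cache.entries))  # only batches within the last 16 sec remain
```

If memory still grows without bound despite such a window, the leak is usually in state held outside the windowed RDDs (e.g. an explicitly cached stream that is never released), which matches the symptom described in this thread.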
streaming application redundant dag stage execution/performance/caching
We have a streaming application containing approximately 12 stages every batch, running in streaming mode (4 sec batches). Each stage persists output to cassandra.

The pipeline stages:

    stage 1
    ---> receive Stream A --> map --> filter --> (union with another stream B)
    --> map --> groupByKey --> transform --> reduceByKey --> map

We go through a few more stages of transforms and save to the database. Around stage 5, we union the output of the DStream from stage 1 (in red) with another stream (generated by a split during stage 2) and save that state.

It appears the whole execution thus far is repeated, which is redundant (I can see this in the execution graph and also in performance -> processing time). Processing time per batch nearly doubles or triples. This additional, redundant processing causes each batch to run as much as 2.5 times slower than runs without the union - and for most batches the union does not even alter the original DStream (union with an empty set).

If I cache the DStream (the red block's output), performance improves substantially, but we hit out of memory errors within a few hours.

What is the recommended way to cache/unpersist in such a scenario? There is no DStream-level "unpersist". Setting "spark.streaming.unpersist" to true and streamingContext.remember("duration") did not help; still seeing out of memory errors.

Krishna