Unsubscribe

2016-12-16 Thread krishna ramachandran
Unsubscribe


Re: StreamingKMeans does not update cluster centroid locations

2016-02-19 Thread krishna ramachandran
Also, the cluster centroids I get in streaming mode (some with negative
values) do not make sense. If I use the same data and run in batch mode with

KMeans.train(sc.parallelize(parsedData), numClusters, numIterations)

the cluster centers are what you would expect.
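
For reference, a minimal sketch of the batch comparison (the input path,
numClusters, and numIterations below are placeholders, not my actual values):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// hypothetical batch run over the same points that were fed to the stream
val sc = new SparkContext(new SparkConf().setAppName("BatchKMeansCheck"))
val parsedData = sc.textFile("trainingDir").map(Vectors.parse).cache()

val numClusters = 3       // placeholder
val numIterations = 20    // placeholder
val batchModel = KMeans.train(parsedData, numClusters, numIterations)

// these centers come out as expected, unlike the streaming ones
batchModel.clusterCenters.foreach(println)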

Krishna



On Fri, Feb 19, 2016 at 12:49 PM, krishna ramachandran 
wrote:

> ok i will share a simple example soon. meantime you will be able to see
> this behavior using example here,
>
>
> https://github.com/apache/spark/blob/branch-1.2/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala
>
> slightly modify it to include
>
> model.latestModel.clusterCenters.foreach(println)
>
> (after model.trainOn)
>
> add new files to trainingDir periodically
>
> I have 3 dimensions per data-point - they look like these,
>
> [1, 1, 385.224145278]
>
> [3, 1, 384.752946389]
>
> [4,1, 3083.2778025]
>
> [2, 4, 6226.40232139]
>
> [1, 2, 785.84266]
>
> [5, 1, 6706.05424139]
>
> 
>
> and monitor. please let know if I missed something
>
> Krishna
>
>
>
>
>
> On Fri, Feb 19, 2016 at 10:59 AM, Bryan Cutler  wrote:
>
>> Can you share more of your code to reproduce this issue?  The model
>> should be updated with each batch, but can't tell what is happening from
>> what you posted so far.
>>
>> On Fri, Feb 19, 2016 at 10:40 AM, krishna ramachandran 
>> wrote:
>>
>>> Hi Bryan
>>> Agreed. It is a single statement to print the centers once for *every*
>>> streaming batch (4 secs) - remember this is in streaming mode and the
>>> receiver has fresh data every batch. That is, as the model is trained
>>> continuously so I expect the centroids to change with incoming streams (at
>>> least until convergence)
>>>
>>> But am seeing same centers always for the entire duration - ran the app
>>> for several hours with a custom receiver.
>>>
>>> Yes I am using the latestModel to predict using "labeled" test data. But
>>> also like to know where my centers are
>>>
>>> regards
>>> Krishna
>>>
>>>
>>>
>>> On Fri, Feb 19, 2016 at 10:18 AM, Bryan Cutler 
>>> wrote:
>>>
>>>> Could you elaborate where the issue is?  You say calling
>>>> model.latestModel.clusterCenters.foreach(println) doesn't show an updated
>>>> model, but that is just a single statement to print the centers once..
>>>>
>>>> Also, is there any reason you don't predict on the test data like this?
>>>>
>>>> model.predictOnValues(testData.map(lp => (lp.label,
>>>> lp.features))).print()
>>>>
>>>>
>>>>
>>>> On Thu, Feb 18, 2016 at 5:59 PM, ramach1776  wrote:
>>>>
>>>>> I have streaming application wherein I train the model using a
>>>>> receiver input
>>>>> stream in 4 sec batches
>>>>>
>>>>> val stream = ssc.receiverStream(receiver) //receiver gets new data
>>>>> every
>>>>> batch
>>>>> model.trainOn(stream.map(Vectors.parse))
>>>>> If I use
>>>>> model.latestModel.clusterCenters.foreach(println)
>>>>>
>>>>> the value of cluster centers remain unchanged from the very initial
>>>>> value it
>>>>> got during first iteration (when the streaming app started)
>>>>>
>>>>> when I use the model to predict cluster assignment with a labeled
>>>>> input the
>>>>> assignments change over time as expected
>>>>>
>>>>>   testData.transform {rdd =>
>>>>> rdd.map(lp => (lp.label,
>>>>> model.latestModel().predict(lp.features)))
>>>>>   }.print()
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/StreamingKMeans-does-not-update-cluster-centroid-locations-tp26275.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: StreamingKMeans does not update cluster centroid locations

2016-02-19 Thread krishna ramachandran
OK, I will share a simple example soon. In the meantime, you can see this
behavior using the example here:

https://github.com/apache/spark/blob/branch-1.2/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala

Slightly modify it to include

model.latestModel.clusterCenters.foreach(println)

(after model.trainOn)

Then add new files to trainingDir periodically.

I have 3 dimensions per data point; the points look like this:

[1, 1, 385.224145278]

[3, 1, 384.752946389]

[4,1, 3083.2778025]

[2, 4, 6226.40232139]

[1, 2, 785.84266]

[5, 1, 6706.05424139]



and monitor the output. Please let me know if I missed something.
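
For concreteness, a minimal sketch of the modified example (the directory
name, k, the dimension, and the decay factor are placeholders patterned on
the linked StreamingKMeansExample, not taken from my actual app):

import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingKMeansCenters")
val ssc = new StreamingContext(conf, Seconds(4))

val trainingData = ssc.textFileStream("trainingDir").map(Vectors.parse)

val model = new StreamingKMeans()
  .setK(3)                    // placeholder k
  .setDecayFactor(1.0)
  .setRandomCenters(3, 0.0)   // 3 dimensions, matching the points above

model.trainOn(trainingData)

// the added line
model.latestModel.clusterCenters.foreach(println)

ssc.start()
ssc.awaitTermination()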

Krishna





On Fri, Feb 19, 2016 at 10:59 AM, Bryan Cutler  wrote:

> Can you share more of your code to reproduce this issue?  The model should
> be updated with each batch, but can't tell what is happening from what you
> posted so far.
>
> On Fri, Feb 19, 2016 at 10:40 AM, krishna ramachandran 
> wrote:
>
>> Hi Bryan
>> Agreed. It is a single statement to print the centers once for *every*
>> streaming batch (4 secs) - remember this is in streaming mode and the
>> receiver has fresh data every batch. That is, as the model is trained
>> continuously so I expect the centroids to change with incoming streams (at
>> least until convergence)
>>
>> But am seeing same centers always for the entire duration - ran the app
>> for several hours with a custom receiver.
>>
>> Yes I am using the latestModel to predict using "labeled" test data. But
>> also like to know where my centers are
>>
>> regards
>> Krishna
>>
>>
>>
>> On Fri, Feb 19, 2016 at 10:18 AM, Bryan Cutler  wrote:
>>
>>> Could you elaborate where the issue is?  You say calling
>>> model.latestModel.clusterCenters.foreach(println) doesn't show an updated
>>> model, but that is just a single statement to print the centers once..
>>>
>>> Also, is there any reason you don't predict on the test data like this?
>>>
>>> model.predictOnValues(testData.map(lp => (lp.label,
>>> lp.features))).print()
>>>
>>>
>>>
>>> On Thu, Feb 18, 2016 at 5:59 PM, ramach1776  wrote:
>>>
>>>> I have streaming application wherein I train the model using a receiver
>>>> input
>>>> stream in 4 sec batches
>>>>
>>>> val stream = ssc.receiverStream(receiver) //receiver gets new data every
>>>> batch
>>>> model.trainOn(stream.map(Vectors.parse))
>>>> If I use
>>>> model.latestModel.clusterCenters.foreach(println)
>>>>
>>>> the value of cluster centers remain unchanged from the very initial
>>>> value it
>>>> got during first iteration (when the streaming app started)
>>>>
>>>> when I use the model to predict cluster assignment with a labeled input
>>>> the
>>>> assignments change over time as expected
>>>>
>>>>   testData.transform {rdd =>
>>>> rdd.map(lp => (lp.label,
>>>> model.latestModel().predict(lp.features)))
>>>>   }.print()
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/StreamingKMeans-does-not-update-cluster-centroid-locations-tp26275.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: StreamingKMeans does not update cluster centroid locations

2016-02-19 Thread krishna ramachandran
Hi Bryan
Agreed, it is a single statement, but it prints the centers once for *every*
streaming batch (4 seconds); remember this is in streaming mode and the
receiver has fresh data every batch. That is, the model is trained
continuously, so I expect the centroids to change with the incoming streams
(at least until convergence).
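
For what it's worth, the MLlib documentation describes the streaming k-means
update for each center roughly as

c_{t+1} = (c_t * n_t * a + x_t * m_t) / (n_t * a + m_t),   n_{t+1} = n_t + m_t

where c_t is the previous center, n_t the number of points assigned to it so
far, x_t the mean of the points assigned to it in the current batch, m_t the
count of those points, and a the decay factor. So any batch that assigns new
points to a cluster should move its center.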

But I am seeing the same centers for the entire duration; I ran the app for
several hours with a custom receiver.

Yes, I am using latestModel to predict on "labeled" test data, but I would
also like to know where my centers are.
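
For completeness, a sketch of how I could print the centers once per batch
instead of once at setup (assuming trainingData is the DStream[Vector] that
is passed to trainOn):

model.trainOn(trainingData)

// print the latest centers once per micro-batch
trainingData.foreachRDD { (rdd, time) =>
  println(s"cluster centers at $time:")
  model.latestModel.clusterCenters.foreach(println)
}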

regards
Krishna



On Fri, Feb 19, 2016 at 10:18 AM, Bryan Cutler  wrote:

> Could you elaborate where the issue is?  You say calling
> model.latestModel.clusterCenters.foreach(println) doesn't show an updated
> model, but that is just a single statement to print the centers once..
>
> Also, is there any reason you don't predict on the test data like this?
>
> model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
>
>
>
> On Thu, Feb 18, 2016 at 5:59 PM, ramach1776  wrote:
>
>> I have streaming application wherein I train the model using a receiver
>> input
>> stream in 4 sec batches
>>
>> val stream = ssc.receiverStream(receiver) //receiver gets new data every
>> batch
>> model.trainOn(stream.map(Vectors.parse))
>> If I use
>> model.latestModel.clusterCenters.foreach(println)
>>
>> the value of cluster centers remain unchanged from the very initial value
>> it
>> got during first iteration (when the streaming app started)
>>
>> when I use the model to predict cluster assignment with a labeled input
>> the
>> assignments change over time as expected
>>
>>   testData.transform {rdd =>
>> rdd.map(lp => (lp.label,
>> model.latestModel().predict(lp.features)))
>>   }.print()
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/StreamingKMeans-does-not-update-cluster-centroid-locations-tp26275.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: adding a split and union to a streaming application causes a big performance hit

2016-02-18 Thread krishna ramachandran
I tried these two global settings (and restarted the app) after enabling
caching for stream1:

conf.set("spark.streaming.unpersist", "true")

streamingContext.remember(Seconds(batchDuration * 4))

The batch duration is 4 seconds.

I am using Spark 1.4.1. The application runs for about 4-5 hours and then
hits an out-of-memory error.
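
For reference, a sketch of the relevant setup (stream1 stands in for the
DStream that is reused by the later union; persisting it with a serialized
storage level is just something to try against the memory pressure, not a
verified fix):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

conf.set("spark.streaming.unpersist", "true")
streamingContext.remember(Seconds(batchDuration * 4))   // batchDuration = 4

// reused DStream; a serialized level instead of the default cache() may
// reduce memory pressure
stream1.persist(StorageLevel.MEMORY_AND_DISK_SER)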

regards

Krishna

On Thu, Feb 18, 2016 at 4:54 AM, Ted Yu  wrote:

> bq. streamingContext.remember("duration") did not help
>
> Can you give a bit more detail on the above ?
> Did you mean the job encountered OOME later on ?
>
> Which Spark release are you using ?
>
> Cheers
>
> On Wed, Feb 17, 2016 at 6:03 PM, ramach1776  wrote:
>
>> We have a streaming application containing approximately 12 jobs every
>> batch,
>> running in streaming mode (4 sec batches). Each  job has several
>> transformations and 1 action (output to cassandra) which causes the
>> execution of the job (DAG)
>>
>> For example the first job,
>>
>> /job 1
>> ---> receive Stream A --> map --> filter -> (union with another stream B)
>> --> map -->/ groupbykey --> transform --> reducebykey --> map
>>
>> Likewise we go thro' few more transforms and save to database (job2,
>> job3...)
>>
>> Recently we added a new transformation further downstream wherein we union
>> the output of DStream from job 1 (in italics) with output from a new
>> transformation(job 5). It appears the whole execution thus far is repeated
>> which is redundant (I can see this in execution graph & also performance
>> ->
>> processing time).
>>
>> That is, with this additional transformation (union with a stream
>> processed
>> upstream) each batch runs as much as 2.5 times slower compared to runs
>> without the union. If I cache the DStream from job 1(italics), performance
>> improves substantially but hit out of memory errors within few hours.
>>
>> What is the recommended way to cache/unpersist in such a scenario? there
>> is
>> no dstream level "unpersist"
>> setting "spark.streaming.unpersist" to true and
>> streamingContext.remember("duration") did not help.
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/adding-a-split-and-union-to-a-streaming-application-cause-big-performance-hit-tp26259.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Streaming application: redundant DAG stage execution, performance, and caching

2016-02-16 Thread krishna ramachandran
We have a streaming application containing approximately 12 stages per batch,
running in streaming mode (4-second batches). Each stage persists its output
to Cassandra.

The pipeline stages:

Stage 1:

receive Stream A --> map --> filter --> (union with another stream B)
--> map --> groupByKey --> transform --> reduceByKey --> map

We go through a few more stages of transforms and save to the database.

Around stage 5, we union the output DStream from stage 1 (shown above) with
another stream (generated by a split during stage 2) and save that state.
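
A rough, self-contained sketch of the shape of this pipeline (socket streams
and simple word keys stand in for our real receivers and records; the point
is only that stage 1's lineage is re-executed by the stage 5 union unless it
is persisted):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("UnionSketch")
val ssc = new StreamingContext(conf, Seconds(4))

val streamA = ssc.socketTextStream("localhost", 9990)   // stand-in for Stream A
val streamB = ssc.socketTextStream("localhost", 9991)   // stand-in for Stream B

// stage 1: receive --> map --> filter --> union --> map --> reduce
val stage1 = streamA
  .map(line => (line.split(",")(0), 1L))
  .filter(_._1.nonEmpty)
  .union(streamB.map(line => (line.split(",")(0), 1L)))
  .reduceByKey(_ + _)

// persisting the reused DStream avoids recomputing its lineage in stage 5,
// at the cost of the memory pressure described below
stage1.persist(StorageLevel.MEMORY_AND_DISK_SER)

stage1.print()   // stands in for the save to Cassandra

// stage 5: union with a stream derived elsewhere; without the persist above,
// everything feeding stage1 is recomputed here
val stage5 = stage1.union(
  streamB.map(line => (line.split(",")(0), 1L)).reduceByKey(_ + _))
stage5.print()

ssc.start()
ssc.awaitTermination()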

It appears the whole execution up to that point is repeated, which is
redundant (I can see this in the execution graph and in the per-batch
processing time). Processing time per batch nearly doubles or triples.

This additional, redundant processing causes each batch to run as much as 2.5
times slower than runs without the union, even though for most batches the
union does not alter the original DStream (it is a union with an empty set).
If I cache the stage 1 DStream, performance improves substantially, but I hit
out-of-memory errors within a few hours.

What is the recommended way to cache/unpersist in such a scenario? There is
no DStream-level "unpersist".

setting "spark.streaming.unpersist" to true and
streamingContext.remember("duration") did not help. Still seeing out of
memory errors

Krishna