problem with kafka createDirectStream ..

2016-12-09 Thread debasishg
Hello -

I am facing some issues with the following snippet of code that reads from
Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..) with
Kafka 0.10.1 and Spark 2.0.1.

// get the data from kafka
val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
  KafkaUtils.createDirectStream[Array[Byte], (String, String)](
    streamingContext,
    PreferConsistent,
    Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
  )

// label and vectorize the value
val projected: DStream[(String, Vector)] = stream.map { record =>
  val (label, value) = record.value
  val vector = Vectors.dense(value.split(",").map(_.toDouble))
  (label, vector)
}.transform(projectToLowerDimension)

In the above snippet, if I keep the call to transform in the last line, I get
the following exception ..

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
  at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
  at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
  at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
  at scala.collection.AbstractIterator.to(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)


The transform method runs a PCA and projects onto the top 2 principal components ..

private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
  if (rdd.isEmpty) rdd
  else {
    // reduce to 2 dimensions
    val pca = new PCA(2).fit(rdd.map(_._2))

    // project vectors to the linear space spanned by the top 2 principal
    // components, keeping the label
    rdd.map(p => (p._1, pca.transform(p._2)))
  }
}

However, if I remove the transform call, everything processes correctly.
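
One thing I have been wondering about (not verified): projectToLowerDimension runs more
than one Spark job over the same KafkaRDD (isEmpty, the PCA fit, and then the final map),
so the same cached KafkaConsumer may end up being used by more than one task at a time.
This is only a guess, but a sketch of the workaround I am planning to try - persisting
the RDD so that Kafka is read only once per batch - looks like this:

  private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
    // materialize the Kafka data once so that the subsequent actions
    // (isEmpty, the PCA fit, the final map) do not re-read from the cached consumer
    val cached = rdd.cache()
    if (cached.isEmpty) cached
    else {
      val pca = new PCA(2).fit(cached.map(_._2))
      cached.map { case (label, v) => (label, pca.transform(v)) }
    }
  }

(I have not yet handled unpersisting the cached RDD, and I am not sure this is the right
diagnosis - hence the question.)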

Any help will be most welcome ..

regards.
- Debasish






parallelizing model training ..

2016-11-22 Thread debasishg
Hello -

I have a question on parallelization of model training in Spark ..

Suppose I have this code fragment for training a model with KMeans ..

labeledData.foreachRDD { rdd =>
  val normalizedData: RDD[Vector] = normalize(rdd)
  val trainedModel: KMeansModel = trainModel(normalizedData, noOfClusters)
  //.. compute WCSSE
}

Here labeledData is a DStream that I fetched from Kafka.

Is there any way I can use the above fragment to train multiple models in
parallel, with different values of noOfClusters? e.g.

(1 to 100).foreach { i =>
  labeledData.foreachRDD { rdd =>
    val normalizedData: RDD[Vector] = normalize(rdd)
    val trainedModel: KMeansModel = trainModel(normalizedData, i)
    //.. compute WCSSE
  }
}

which would use all available CPUs in parallel for the training ..
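
To make the question concrete, this is roughly what I have in mind - just a sketch, using
my own normalize and trainModel helpers from above, and I am not sure it is the right
approach: cache the normalized RDD once per batch, then submit the training for each
cluster count as its own Spark job from a parallel collection so the jobs can run
concurrently.

  labeledData.foreachRDD { rdd =>
    val normalizedData: RDD[Vector] = normalize(rdd)
    normalizedData.cache()   // reuse the same data across all the trainings

    // each element of the parallel collection submits its own Spark job,
    // so the trainings for the different cluster counts can run concurrently
    val results: List[(Int, KMeansModel, Double)] = (1 to 100).par.map { k =>
      val model: KMeansModel = trainModel(normalizedData, k)
      (k, model, model.computeCost(normalizedData))   // WCSSE for this k
    }.toList

    normalizedData.unpersist()
  }

Would something like this actually keep the cluster busy, or is there a more idiomatic
way to do it?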

regards.






using StreamingKMeans

2016-11-19 Thread debasishg
Hello -

I am trying to implement an outlier detection application on streaming data.
I am a newbie to Spark, so I would like some advice on a few points that are
confusing me ..

I am thinking of using StreamingKMeans - is this a good choice? I have one
stream of data and I need an online algorithm. But here are some questions
that immediately come to mind ..

1. I cannot do separate training, cross-validation, etc. Is it a good idea to
do training and prediction online?

2. The data is read from Kafka in micro-batches of (say) 3 seconds, giving me
a DStream on which I train and get the clusters. How do I decide on the number
of clusters? With StreamingKMeans, is there any way to iterate over
micro-batches with different values of k to find the optimal one?

3. Even if I fix k, after training on every micro-batch I get a DStream. How
can I compute something like a clustering score on the DStream?
StreamingKMeansModel has a computeCost function, but it takes an RDD. I can use
dstream.foreachRDD { // process the RDD for the micro-batch here } - is this
the idiomatic way? (A sketch of what I mean is after this list.)

4. If I use dstream.foreachRDD { .. } and call functions like new
StandardScaler().fit(rdd) to do feature normalization, it works as long as
there is data in the stream. But when a micro-batch is empty (say I get no
data for some time), the fit method throws an exception because it receives an
empty collection. Things start working again once data comes back to the
stream. But is this the way to go? (A sketch with an empty-batch guard is
after this list.)
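
To make (3) and (4) concrete, below is a sketch of the shape I currently have in mind.
It is only an illustration - kafkaStream, parseVector, k and dim stand in for my actual
input stream, parsing code and parameters:

  // parse the Kafka stream into feature vectors
  val vectors: DStream[Vector] = kafkaStream.map(parseVector)

  // per-batch normalization, skipping empty micro-batches (question 4)
  val scaled: DStream[Vector] = vectors.transform { rdd =>
    if (rdd.isEmpty) rdd
    else new StandardScaler().fit(rdd).transform(rdd)
  }

  val model = new StreamingKMeans()
    .setK(k)                      // k fixed up front - this is part of question 2
    .setDecayFactor(1.0)
    .setRandomCenters(dim, 0.0)

  // cluster centers are updated incrementally on every micro-batch
  model.trainOn(scaled)

  // clustering score per micro-batch via foreachRDD (question 3)
  scaled.foreachRDD { rdd =>
    if (!rdd.isEmpty)
      println(s"WCSSE for this batch: ${model.latestModel().computeCost(rdd)}")
  }

Is this roughly the idiomatic shape, or am I fighting the API?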

any suggestion will be welcome ..

regards.






Incremental model update

2016-09-27 Thread debasishg
Hello -

I have a question on how to handle incremental model updates in Spark ML ..

We have a time series where we predict the future conditioned on the past.
We can train a model offline based on historical data and then use that
model during prediction. 

But if the underlying process is non-stationary, the probability distribution
changes with time. In such cases we need to update the model to reflect the
current distribution.

We have 2 options -

a. retrain periodically
b. update the model incrementally

Option a. is expensive.

What about option b.? I think Spark has StreamingKMeans, which handles this
kind of incremental model update for that clustering algorithm. Is this true?
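
For reference, this is how I understand the StreamingKMeans update to work - just a
sketch, with trainingData (a DStream[Vector]), numClusters and numDimensions as
placeholders. The decay factor is what seems relevant for the non-stationary case,
since it controls how quickly older batches are forgotten:

  val model = new StreamingKMeans()
    .setK(numClusters)
    .setDecayFactor(0.5)                   // < 1.0 discounts older data, so the model
                                           // can track a drifting distribution
    .setRandomCenters(numDimensions, 0.0)

  // cluster centers are re-estimated incrementally with every micro-batch
  model.trainOn(trainingData)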

But what about other algorithms that don't have streaming counterparts? How do
we handle incremental model updates with those when the underlying distribution
changes?

regards.



