problem with kafka createDirectStream ..
Hello - I am facing an issue with the following snippet of code that reads from Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..) with Kafka 0.10.1 and Spark 2.0.1.

    // get the data from kafka
    val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
      KafkaUtils.createDirectStream[Array[Byte], (String, String)](
        streamingContext,
        PreferConsistent,
        Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
      )

    // label and vectorize the value
    val projected: DStream[(String, Vector)] = stream.map { record =>
      val (label, value) = record.value
      val vector = Vectors.dense(value.split(",").map(_.toDouble))
      (label, vector)
    }.transform(projectToLowerDimension)

In the above snippet, if I keep the call to transform on the last line, I get the following exception:

    Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
      at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
      at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
      at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
      at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
      at scala.collection.AbstractIterator.to(Iterator.scala:1336)
      at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
      at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)

The transform method does a PCA and keeps the top 2 principal components:

    private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
      if (rdd.isEmpty) rdd
      else {
        // reduce to 2 dimensions
        val pca = new PCA(2).fit(rdd.map(_._2))
        // Project vectors to the linear space spanned by the top 2 principal
        // components, keeping the label
        rdd.map(p => (p._1, pca.transform(p._2)))
      }
    }

However, if I remove the transform call, everything is processed correctly. Any help will be most welcome.

regards.
- Debasish
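P.S. One variant I am considering, though I have not verified that it makes any difference (so please treat it as a sketch, not a fix): caching the micro-batch before the PCA fit. My reasoning is that fit runs its own jobs over the RDD and the final map traverses it again, so the Kafka-backed RDD is otherwise read more than once within the same batch.

    private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
      if (rdd.isEmpty) rdd
      else {
        // cache so the Kafka-backed RDD is materialized once and both the
        // PCA fit and the projection read the cached copy
        val cached = rdd.cache()
        val pca = new PCA(2).fit(cached.map(_._2))
        cached.map(p => (p._1, pca.transform(p._2)))
      }
    }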
parallelizing model training ..
Hello - I have a question on parallelizing model training in Spark. Suppose I have this code fragment for training a model with KMeans:

    labeledData.foreachRDD { rdd =>
      val normalizedData: RDD[Vector] = normalize(rdd)
      val trainedModel: KMeansModel = trainModel(normalizedData, noOfClusters)
      // .. compute WCSSE
    }

Here labeledData is a DStream that I fetch from Kafka. Is there any way I can use the above fragment to train multiple models in parallel with different values of noOfClusters? e.g.

    (1 to 100).foreach { i =>
      labeledData.foreachRDD { rdd =>
        val normalizedData: RDD[Vector] = normalize(rdd)
        val trainedModel: KMeansModel = trainModel(normalizedData, i)
        // .. compute WCSSE
      }
    }

which would use all available CPUs in parallel for the training.

regards.
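P.S. A sketch of the direction I am thinking of (assuming normalize returns an RDD[Vector], and using org.apache.spark.mllib.clustering.KMeans directly instead of my trainModel helper; the 20 iterations are just a placeholder): cache the normalized batch once and submit the trainings for the different k values concurrently from the driver, e.g. via a parallel collection. Each KMeans.train call is a separate Spark job, so whether they actually overlap depends on the scheduler and the free cores. I do not know whether this is idiomatic.

    labeledData.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // cache so every value of k reuses the same normalized micro-batch
        val normalizedData: RDD[Vector] = normalize(rdd).cache()
        // submit the trainings concurrently from the driver
        (1 to 100).par.foreach { k =>
          val model: KMeansModel = KMeans.train(normalizedData, k, 20)
          val wcsse = model.computeCost(normalizedData)
          println(s"k=$k WCSSE=$wcsse")
        }
        normalizedData.unpersist()
      }
    }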
using StreamingKMeans
Hello - I am trying to implement an outlier detection application on streaming data. I am a newbie to Spark and hence would like some advice on the points that confuse me.

I am thinking of using StreamingKMeans - is this a good choice? I have one stream of data and I need an online algorithm. But here are some questions that immediately come to my mind:

1. I cannot do separate training, cross validation etc. Is it a good idea to do training and prediction online?

2. The data will be read from the stream coming from Kafka in micro-batches of (say) 3 seconds. I get a DStream on which I train and get the clusters. How can I decide on the number of clusters? Using StreamingKMeans, is there any way I can iterate on micro-batches with different values of k to find the optimal one?

3. Even if I fix k, after training on every micro-batch I get a DStream. How can I compute things like a clustering score on the DStream? StreamingKMeansModel has a computeCost function, but it takes an RDD. I can use dstream.foreachRDD { // process the RDD for the micro-batch here } - is this the idiomatic way?

4. If I use dstream.foreachRDD { .. } and functions like new StandardScaler().fit(rdd) to do feature normalization, then it works when I have data in the stream. But when the micro-batch is empty (say I don't have data for some time), the fit method throws an exception as it gets an empty collection. Things start working again when data comes back to the stream. But is this the way to go?

Any suggestion will be welcome.

regards.
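P.S. To make questions 3 and 4 concrete, here is roughly the shape of what I have (the stream names, numFeatures and k = 5 are placeholders), with an explicit empty-batch guard around the fit - the guard is the part I am unsure about:

    // set up the streaming model once
    val model = new StreamingKMeans()
      .setK(5)                              // placeholder - exactly what I don't know how to choose
      .setDecayFactor(1.0)
      .setRandomCenters(numFeatures, 0.0)

    // scale each micro-batch, skipping empty ones so fit() does not throw
    val scaled: DStream[Vector] = vectorStream.transform { rdd =>
      if (rdd.isEmpty()) rdd
      else new StandardScaler(withMean = true, withStd = true).fit(rdd).transform(rdd)
    }

    model.trainOn(scaled)

    // question 3: score each micro-batch against the latest model
    scaled.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val cost = model.latestModel().computeCost(rdd)
        println(s"WCSSE for this micro-batch: $cost")
      }
    }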
Incremental model update
Hello - I have a question on how to handle incremental model updates in Spark ML.

We have a time series where we predict the future conditioned on the past. We can train a model offline based on historical data and then use that model during prediction. But if the underlying process is non-stationary, the probability distribution changes with time. In such cases we need to update the model so that it reflects the current distribution. We have 2 options:

a. retrain periodically
b. update the model incrementally

a. is expensive. What about b.? I think in Spark we have StreamingKMeans, which takes care of this kind of incremental model update for clustering. Is this true? But what about other algorithms, e.g. classifiers, that don't have a streaming counterpart? How do we handle incremental model changes with those when the underlying distribution changes?

regards.
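P.S. For option b., here is my understanding of how the incremental update looks with StreamingKMeans (a sketch; k, numFeatures and the decay factor are placeholders). The decay factor seems to be the knob for a drifting distribution, since values below 1.0 gradually forget old batches:

    val model = new StreamingKMeans()
      .setK(5)                      // placeholder
      .setDecayFactor(0.8)          // < 1.0 forgets old data, so the centers can track drift
      .setRandomCenters(numFeatures, 0.0)

    model.trainOn(trainingStream)                  // centers are updated on every micro-batch
    val predictions = model.predictOn(testStream)  // predictions always use the latest centers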