Hi Guys, 
I’m getting this error in KafkaWordCount;

TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234): 
java.lang.ClassCastException: [B cannot be cast to java.lang.String             
                                                                                
                                                         
        at 
org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfun$apply$1.apply(KafkaWordCount.scala:7


Some idea that could be?


Bellow the piece of code



val kafkaStream = {                                                             
                                                                                
                                          
        val kafkaParams = Map[String, String](                                  
                                                                                
  
                "zookeeper.connect" -> "achab3:2181",                           
                                                                                
      
                "group.id" -> "mygroup",                                        
                                                                                
                                 
                "zookeeper.connect.timeout.ms" -> "10000",                      
                                                                                
                                 
                "kafka.fetch.message.max.bytes" -> "4000000",                   
                                                                                
                                            
                "auto.offset.reset" -> "largest")                               
                                                                                
  
                                                                                
                                                                                
        
        val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap       
                                                                                
                                 
  //    val lines = KafkaUtils.createStream[String, String, DefaultDecoder, 
DefaultDecoder](ssc, kafkaParams, topicpMa                                      
                                      
p, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)                       
                                                                                
                                            
        val KafkaDStreams = (1 to numStreams).map {_ =>                         
                                                                                
    
        KafkaUtils.createStream[String, String, DefaultDecoder, 
DefaultDecoder](ssc, kafkaParams, topicpMap, storageLe
vel = StorageLevel.MEMORY_ONLY_SER).map(_._2)                                   
                                                                                
                                 
        }                                                                       
                                                                                
                                 
    val unifiedStream = ssc.union(KafkaDStreams)                                
                                      
    unifiedStream.repartition(sparkProcessingParallelism)                       
                                      
 }

Thanks Guys
-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155

Reply via email to