I have a DirectStream that processes data from Kafka:

    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams1, topics1.toSet)
    directKafkaStream.foreachRDD { rdd =>
      // each RDD of a direct stream can be cast to read its Kafka offsets
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ...
    }

When I add a second DirectStream and take the union of the two, it no longer
works. I thought the result would be the same type, but I get a ClassCastException:


    val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams1, topics1.toSet)
    val directKafkaStream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams2, topics2.toSet)
    val kafkaStream = directKafkaStream.union(directKafkaStream2)

    kafkaStream.foreachRDD { rdd =>
      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // --> ClassCastException

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.rdd.UnionRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
    at com.produban.metrics.MetricsSpark$$anonfun$main$1.apply(MetricsSpark.scala:72)

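I had assumed that rdd.union(rdd2) returns the same type of RDD, but judging
by the exception the result is a UnionRDD, which cannot be cast to HasOffsetRanges.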
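
To make the question concrete, here is a minimal sketch of what I think is going on. It uses stand-in classes only: Rdd, HasOffsets, KafkaLikeRdd and UnionLikeRdd are hypothetical names, not the real Spark types, and it assumes that a union is a separate wrapper class around its parents, as the stack trace suggests. It shows the cast succeeding on a parent and failing on the union, and the offsets being read from the parents before they are combined.

```scala
// Stand-in types only: none of these are the real Spark classes.
trait Rdd { def records: Seq[String] }

// Hypothetical mirror of HasOffsetRanges: only the Kafka-backed RDD mixes it in.
trait HasOffsets { def offsetRanges: Seq[(String, Long, Long)] }

final class KafkaLikeRdd(val records: Seq[String],
                         val offsetRanges: Seq[(String, Long, Long)])
  extends Rdd with HasOffsets

// A union wraps its parents in a different class, so it does not carry
// the HasOffsets trait even when every parent does.
final class UnionLikeRdd(val parents: Seq[Rdd]) extends Rdd {
  def records: Seq[String] = parents.flatMap(_.records)
}

object UnionCastSketch {
  def main(args: Array[String]): Unit = {
    val a = new KafkaLikeRdd(Seq("a1", "a2"), Seq(("topic1", 0L, 2L)))
    val b = new KafkaLikeRdd(Seq("b1"), Seq(("topic2", 5L, 6L)))

    // Read the offsets while the RDDs are still the Kafka-backed type.
    val offsets = Seq(a, b).flatMap(_.offsetRanges)

    val union: Rdd = new UnionLikeRdd(Seq(a, b))

    // The same cast that works on a parent fails on the union.
    val castFailed =
      try { union.asInstanceOf[HasOffsets].offsetRanges; false }
      catch { case _: ClassCastException => true }

    println(s"records=${union.records}, offsets=$offsets, castFailed=$castFailed")
  }
}
```

If this model is right, the offsets in my real job would have to be read from each direct stream separately before the union, but I have not verified that against Spark itself.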
