Hi,
I'm trying to use KafkaUtils.createDirectStream to read avro messages from
Kafka but something is off with my type arguments:

val avroStream = KafkaUtils.createDirectStream[AvroKey[GenericRecord],
GenericRecord, NullWritable, AvroInputFormat[GenericRecord]](ssc,
kafkaParams, topicSet)

I'm getting the following error:
<console>:47: error: type arguments
[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],org.apache.avro.generic.GenericRecord,org.apache.hadoop.io.NullWritable,org.apache.avro.mapred.AvroInputFormat[org.apache.avro.generic.GenericRecord]]
conform to the bounds of none of the overloaded alternatives of
 value createDirectStream: [K, V, KD <: kafka.serializer.Decoder[K], VD <:
kafka.serializer.Decoder[V]](jssc:
org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass:
Class[K], valueClass: Class[V], keyDecoderClass: Class[KD],
valueDecoderClass: Class[VD], kafkaParams: java.util.Map[String,String],
topics:
java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[K,V]
<and> [K, V, KD <: kafka.serializer.Decoder[K], VD <:
kafka.serializer.Decoder[V]](ssc:
org.apache.spark.streaming.StreamingContext, kafkaParams:
Map[String,String], topics: Set[String])(implicit evidence$19:
scala.reflect.ClassTag[K], implicit evidence$20: scala.reflect.ClassTag[V],
implicit evidence$21: scala.reflect.ClassTag[KD], implicit evidence$22:
scala.reflect.ClassTag[VD])org.apache.spark.streaming.dstream.InputDStream[(K,
V)]

What am I doing wrong?

Thank you.
Daniel

Reply via email to