Hi,
I am reading spark streaming Kafka code.
In org.apache.spark.streaming.kafka.KafkaUtils file,
the function "createDirectStream" takes key class, value class, etc to
create classTag.
However, they are all implicit. I don't understand why they are implicit.
In fact, I can not find any other overloaded "createDirectStream" take
implicit parameters.
So what are these implicit ClassTags are used for ? Thank you.
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jssc: JavaStreamingContext,
keyClass: Class[K],
valueClass: Class[V],
keyDecoderClass: Class[KD],
valueDecoderClass: Class[VD],
recordClass: Class[R],
kafkaParams: JMap[String, String],
fromOffsets: JMap[TopicAndPartition, JLong],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaInputDStream[R] = {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
createDirectStream[K, V, KD, VD, R](
jssc.ssc,
Map(kafkaParams.toSeq: _*),
Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
cleanedHandler
)
}
--
Hao Ren
Data Engineer @ leboncoin
Paris, France