
If you want help I'd suggest copying the full code, you just shared the
config part.

On the other hand, if you are doing a project *now* I'd also suggest
using Structured
I'm sure you would get better support than using DStreams.


On Tue, 15 Jun 2021 at 08:39, Muhammed Favas <
favas.muham...@expeedsoftware.com> wrote:

> Hi All,
> Can some one help me how to resolve this?. It is very important to achieve
> my project objective
> *Regards,*
> *Favas  *
> *From:* Muhammed Favas
> *Sent:* Thursday, June 10, 2021 15:26 PM
> *To:* user@spark.apache.org
> *Subject:* Apply window function on data consumed from Kafka topic
> Hi,
>                 I have a requirement to create a spark streaming job that
> get data from kafka broker and need to apply window function on the data
> coming into the spark context.
> This is how I connected to kafka from spark
> val kafkaParams = *Map*[String, Object](
>   "bootstrap.servers" -> "<my-srvername>",
>   "key.deserializer" -> * classOf*[StringDeserializer],
>   "value.deserializer" -> * classOf*[StringDeserializer],
>   "group.id" -> "7b37787c-20e7-4614-98ba-6f4212e07bf0",
>   "auto.offset.reset" -> "earliest",
>   "enable.auto.commit" -> (true: java.lang.Boolean)
> )
> val topics = *Array*("7b37787c-20e7-4614-98ba-6f4212e07bf0")
> val inputMsg = KafkaUtils.*createDirectStream*[String,String](
>   ssc,
>   *PreferConsistent*,
>   *Subscribe*[String, String](topics, kafkaParams)
> )
> The variable “inputMsg” is of type
> “InputDStream[ConsumerRecord[String,String]]”
> When I used window function “inputMsg.window(Minute(1))”, it throws an
> error like below
> ERROR DirectKafkaInputDStream: Kafka ConsumerRecord is not serializable.
> Use .map to extract fields before calling .persist or .window
> Can some one help me on how to use widow function on spark streaming using
> kafka?
> *Regards,*
> *Favas  *

Reply via email to