Hello,

See if this works. It is taken from the Spark documentation:


// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()

// Each header is a (key, value) pair; the value is raw bytes
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]
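
As far as I know, the headers column is an array of structs with a string key and a binary value, so with row.getAs it would come back as a Seq[Row] rather than a Map, and the values still need decoding. Below is a minimal sketch of that decoding step in plain Scala, assuming the values are UTF-8 text. The names HeaderDecoding and decodeHeaders are hypothetical helpers for illustration, not part of Spark.

```scala
import java.nio.charset.StandardCharsets

object HeaderDecoding {
  // One Kafka header as a typed pair: (key, raw value bytes).
  type Header = (String, Array[Byte])

  // Hypothetical helper: decode header values as UTF-8 strings.
  // Kafka allows the same key to appear more than once, so values are
  // grouped per key and kept in their original order.
  // Assumption: a null headers array means "no headers" and yields an empty Map.
  def decodeHeaders(headers: Array[Header]): Map[String, Seq[String]] =
    Option(headers).getOrElse(Array.empty[Header])
      .toSeq
      .groupBy(_._1)
      .map { case (key, pairs) =>
        key -> pairs.map { case (_, bytes) =>
          // Header values may be null; pass that through unchanged.
          if (bytes == null) null else new String(bytes, StandardCharsets.UTF_8)
        }
      }
}
```

Grouping into Seq[String] per key is a deliberate choice here: converting straight to Map[String, String] would silently drop repeated keys.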


On Thu, 3 Dec 2020 at 18:22, <eugen.wintersber...@gmail.com> wrote:

> Hi folks,
>   I am trying to read the message headers from a Kafka structured stream
> which should be stored in a column named ``headers``.
> I try something like this:
>
> val stream = sparkSession.readStream.format("kafka")......load()
>
> stream.map(row => {
>
>  ...
>
>  val headers = row.getAs[HeaderT]("headers")
>
> ....
>
> })
>
>
> My question is: what would be *HeaderT*?
>
> Thanks in advance
>
>  Eugen
>
