Hi Team,
I have a Spark Structured Streaming job that reads from Kafka and writes into
Elasticsearch via HTTP requests.
I want to validate each record read from Kafka, transform the payload according to
business needs, and write it into Elasticsearch.
Currently I use raw ES HTTP requests to push the data into Elasticsearch. Can someone
guide me on how to write the data into ES via a DataFrame instead?
*Code Snippet: *
// Source: subscribe to the `test` Kafka topic as a streaming DataFrame.
val dfInput = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
// Start from the newest offsets when no prior state exists.
.option("startingOffsets", "latest")
// NOTE(review): plain "group.id" is not a documented Structured Streaming
// Kafka-source option; the consumer group is normally set via "kafka.group.id"
// (Spark 3.0+). Verify this option actually takes effect — it may be ignored.
.option("group.id", sourceTopicGroupId)
// Do not fail the query if offsets have been aged out of Kafka.
.option("failOnDataLoss", "false")
// Cap the number of offsets consumed per micro-batch trigger.
.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
.load()
import spark.implicits._
// Decode the Kafka record payload: keep only the `value` column, cast to string.
// Result schema is a single StringType column named "value".
val resultDf = dfInput.selectExpr("CAST(value AS STRING) AS value")
// Sink: push each micro-batch row to Elasticsearch through a per-row HTTP call.
resultDf.writeStream.foreach(new ForeachWriter[Row] {
// No per-partition setup needed; accept every partition/epoch.
override def open(partitionId: Long, version: Long): Boolean = true
override def process(value: Row): Unit = {
// Column 0 is the Kafka value cast to String upstream; delegate to the
// external helper that validates/transforms the payload and writes to ES.
processEventsData(value.get(0).asInstanceOf[String], deviceIndex,
msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
}
// Nothing to release; any terminal error passed in is ignored here.
override def close(errorOrNull: Throwable): Unit = {
}
// NOTE(review): no checkpointLocation is configured on this query — TODO confirm
// whether recovery/exactly-once semantics are required for this sink.
}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination()
//"1 second"
}
Please suggest whether there is a better approach.
Thanks