Hi Jainshasha, I need to read each row from the DataFrame and make some changes to it before inserting it into ES.
Thanks,
Siva

On Mon, Oct 5, 2020 at 8:06 PM jainshasha <jainsha...@gmail.com> wrote:
> Hi Siva,
>
> To emit data into ES from a Spark structured streaming job, you need to use
> an Elasticsearch jar that provides a sink for structured streaming. You can
> use my branch, where we have integrated ES with Spark 3.0 and Scala 2.12:
> https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0
>
> From it you need to build three jars:
> elasticsearch-hadoop-sql
> elasticsearch-hadoop-core
> elasticsearch-hadoop-mr
> which enable writing data into ES from Spark structured streaming.
>
> In your application you can sink the data like this; remember that with ES
> only the "append" output mode of structured streaming is supported:
>
> val esQuery = aggregatedDF
>   .writeStream
>   .outputMode("append")
>   .format("org.elasticsearch.spark.sql")
>   .option("checkpointLocation", kafkaCheckpointDirPath + "/es")
>   .start("aggregation-job-index-latest-1")
>
> Let me know if you face any issues, will be happy to help you :)
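To sketch what "change each row before inserting into ES" could look like on top of the sink from the quoted mail: the snippet below is a minimal, hypothetical example. The column names (`userId`, `clicks`), the `Event` case class, the transformations, and the index/checkpoint names are all assumptions for illustration; only the `org.elasticsearch.spark.sql` format and the append-only output mode come from the thread above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical row shape for the typed variant below.
case class Event(userId: String, clicks: Long)

def writeToEs(spark: SparkSession, aggregatedDF: DataFrame,
              kafkaCheckpointDirPath: String) = {
  import spark.implicits._

  // Per-row changes before the sink, expressed as column operations
  // (assumed columns; replace with your own logic):
  val transformed = aggregatedDF
    .withColumn("clicks", col("clicks") * 2)       // example per-row change
    .withColumn("ingestedAt", current_timestamp()) // example enrichment

  // Alternatively, row by row with a typed map, if the change is easier
  // to express in plain Scala:
  // val transformed = aggregatedDF.as[Event]
  //   .map(e => e.copy(clicks = e.clicks * 2))
  //   .toDF()

  transformed
    .writeStream
    .outputMode("append")                          // ES sink: append only
    .format("org.elasticsearch.spark.sql")
    .option("checkpointLocation", kafkaCheckpointDirPath + "/es")
    .start("aggregation-job-index-latest-1")       // target ES index
}
```

Transformations applied to the streaming DataFrame before `writeStream` run on every micro-batch, so each incoming row is modified before it reaches the ES sink.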