Re: Spark Streaming ElasticSearch
Hi Siva,

In that case you can use the Structured Streaming foreach / foreachBatch functions, which let you process each record (or each micro-batch) and write it to a sink of your choice.

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
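For illustration, the foreachBatch approach might look roughly like the sketch below. It is not taken from the thread: the "rate" source stands in for the real input stream, the `transform` helper, the index name "my-index", the `es.nodes` address, and the checkpoint path are all hypothetical, and it assumes an elasticsearch-hadoop / elasticsearch-spark jar compatible with your Spark and Scala versions is on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object EsForeachBatchSketch {

  // Hypothetical per-row change applied before indexing:
  // adds a column derived from the existing "value" column.
  def transform(batch: DataFrame): DataFrame =
    batch.withColumn("value_doubled", col("value") * 2)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("es-foreach-batch-sketch")
      .getOrCreate()

    // Stand-in source: the built-in "rate" source emits (timestamp, value) rows.
    val input = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // Declared as a Scala function value so that the Scala overload of
    // foreachBatch is selected without ambiguity on Scala 2.12.
    val writeBatch: (DataFrame, Long) => Unit = (batch, _) => {
      transform(batch)
        .write
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "localhost:9200") // assumed ES address
        .mode("append")
        .save("my-index")                     // hypothetical index name
    }

    val query = input.writeStream
      .foreachBatch(writeBatch)
      .option("checkpointLocation", "/tmp/checkpoints/es-sketch") // assumed path
      .start()

    query.awaitTermination()
  }
}
```

Because each micro-batch arrives as an ordinary DataFrame, the write inside foreachBatch goes through the batch ES connector, so this route should not depend on the connector offering a streaming sink.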
Re: Spark Streaming ElasticSearch
Hi Jainshasha,

I need to read each row from the DataFrame and make some changes to it before inserting it into ES.

Thanks,
Siva

On Mon, Oct 5, 2020 at 8:06 PM jainshasha wrote:
> Hi Siva,
>
> To emit data into ES from a Spark Structured Streaming job, you need to use
> an Elasticsearch jar that provides a sink for Structured Streaming. For this
> you can use my branch, where we have integrated ES with Spark 3.0 and
> Scala 2.12:
> https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0
>
> You also need to build three jars:
>   elasticsearch-hadoop-sql
>   elasticsearch-hadoop-core
>   elasticsearch-hadoop-mr
> These are what write the data into ES from Structured Streaming.
>
> In your application you can sink the data as shown here. Note that the ES
> sink supports only the append output mode of Structured Streaming.
>
>   val esDf = aggregatedDF
>     .writeStream
>     .outputMode("append")
>     .format("org.elasticsearch.spark.sql")
>     .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
>     .start("aggregation-job-index-latest-1")
>
> Let me know if you face any issues, I will be happy to help you :)
Re: Spark Streaming ElasticSearch
Hi Siva,

To emit data into ES from a Spark Structured Streaming job, you need to use an Elasticsearch jar that provides a sink for Structured Streaming. For this you can use my branch, where we have integrated ES with Spark 3.0 and Scala 2.12:
https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0

You also need to build three jars:
  elasticsearch-hadoop-sql
  elasticsearch-hadoop-core
  elasticsearch-hadoop-mr
These are what write the data into ES from Structured Streaming.

In your application you can sink the data as shown here. Note that the ES sink supports only the append output mode of Structured Streaming.

  // CHECKPOINTLOCATION and kafkaCheckpointDirPath are not defined in this
  // snippet; presumably they come from elsewhere in the job.
  val esDf = aggregatedDF
    .writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
    .start("aggregation-job-index-latest-1")

Let me know if you face any issues, I will be happy to help you :)