When building the request, you should build an UpdateRequest, like the 
following snippet:

import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

val doc: String = ???
val targetIndex: String = ???
val indexType: Option[String] = ???

new UpdateRequest()
  .index(targetIndex)
  .`type`(indexType.getOrElse("_doc"))
  .id(id)
  .upsert(doc, XContentType.JSON)
  .doc(doc, XContentType.JSON)
  .docAsUpsert(true)

I'm not entirely sure if you need both "doc" and "upsert" fields, as I think 
this depends on the Elasticsearch you're using.

________________________________
De: ApoorvK <apoorv.upadh...@razorpay.com>
Enviat el: dilluns, 10 de febrer de 2020 15:33
Per a: user@flink.apache.org <user@flink.apache.org>
Tema: Flink Elasticsearch upsert document in ES

Team,
Presently I have added elasticsearch as a sink to a stream and inserting the
json data, the problem is when I restore the application in case of crash it
reprocess the data in between (meanwhile a backend application updates the
document in ES) and flink reinsert the document in ES and all update to ES
are lost .

I am trying for a update or insert in case document not found or do not
insert if document is already there.


I have tried by providing opType to elasticsearch builder, I am getting an
error message "document already exists" on my console, but it still updates
the value in elasticsearch

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
    .opType(OpType.CREATE)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

________________________________

Este mensaje y sus adjuntos se dirigen exclusivamente a su destinatario, puede 
contener informaci?n privilegiada o confidencial y es para uso exclusivo de la 
persona o entidad de destino. Si no es usted. el destinatario indicado, queda 
notificado de que la lectura, utilizaci?n, divulgaci?n y/o copia sin 
autorizaci?n puede estar prohibida en virtud de la legislaci?n vigente. Si ha 
recibido este mensaje por error, le rogamos que nos lo comunique inmediatamente 
por esta misma v?a y proceda a su destrucci?n.

The information contained in this transmission is privileged and confidential 
information intended only for the use of the individual or entity named above. 
If the reader of this message is not the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this communication 
is strictly prohibited. If you have received this transmission in error, do not 
read it. Please immediately reply to the sender that you have received this 
communication in error and then delete it.

Esta mensagem e seus anexos se dirigem exclusivamente ao seu destinat?rio, pode 
conter informa??o privilegiada ou confidencial e ? para uso exclusivo da pessoa 
ou entidade de destino. Se n?o ? vossa senhoria o destinat?rio indicado, fica 
notificado de que a leitura, utiliza??o, divulga??o e/ou c?pia sem autoriza??o 
pode estar proibida em virtude da legisla??o vigente. Se recebeu esta mensagem 
por erro, rogamos-lhe que nos o comunique imediatamente por esta mesma via e 
proceda a sua destrui??o

Reply via email to