Team,
Presently I have added elasticsearch as a sink to a stream and inserting the
json data, the problem is when I restore the application in case of crash it
reprocess the data in between (meanwhile a backend application updates the
document in ES) and flink reinsert the document in ES and all update to ES
are lost .

I am trying for a update or insert in case document not found or do not
insert if document is already there.


I have tried by providing opType to elasticsearch builder, I am getting an
error message "document already exists" on my console, but it still updates
the value in elasticsearch

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
    .opType(OpType.CREATE)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to