Team,

I have added Elasticsearch as a sink to a stream and am inserting JSON data into it. The problem is that when I restore the application after a crash, Flink reprocesses the data in between. Meanwhile a backend application has updated the documents in ES, so when Flink reinserts the documents, all of those updates to ES are lost.
What I am trying to get is an update-or-insert: insert the document if it is not found, and do not insert it if it is already there. I have tried setting opType on the Elasticsearch request builder. I get a "document already exists" error message on my console, but the value in Elasticsearch is still updated:

    val jsonString = write(record)
    val rqst: IndexRequest = Requests.indexRequest
      .index(parameter.get("esIndexName"))
      .`type`(parameter.get("esIndexType"))
      .id(record.getApi_key + "_" + record.getOrder_id)
      .source(jsonString, XContentType.JSON)
      .opType(OpType.CREATE)
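For reference, the "insert only if the document is absent" behaviour described above can also be expressed as an update request with an upsert document instead of an index request. The following is only a minimal sketch under assumptions, not a verified fix for the behaviour reported here: the object InsertIfAbsent and its build method are hypothetical names, the import paths and the three-argument UpdateRequest constructor assume an Elasticsearch 5.x/6.x client that still uses mapping types, and it relies on the update API treating an empty partial document as a no-op when the document already exists.

```scala
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

// Hypothetical helper, not part of the original job.
object InsertIfAbsent {
  // Builds an update request that should leave an existing document untouched
  // and create it from `json` only when no document with this id exists.
  def build(index: String, docType: String, id: String, json: String): UpdateRequest =
    new UpdateRequest(index, docType, id)
      // Empty partial document: nothing to merge into an existing document.
      .doc("{}", XContentType.JSON)
      // Full document, used only when the id is not found.
      .upsert(json, XContentType.JSON)
}
```

In the sink function this request would be handed to the RequestIndexer in place of the IndexRequest, assuming the connector version in use accepts update requests.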