Hi Team,

We get the error below when we try to add an Elasticsearch sink:

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    ... 23 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    ... 27 more
Caused by: java.lang.IllegalArgumentException: cannot write xcontent for unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.

Here is the code for the sink:

workflow(filterItems(openTelSrc))
    .sinkTo(new Elasticsearch7SinkBuilder<TopologyDTO>()
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
        .setConnectionPassword("password")
        .setConnectionUsername("elastic")
        .setEmitter((element, context, indexer) ->
            indexer.add(createIndexRequest(element)))
        .build())
    .name("topology_sink");

private static IndexRequest createIndexRequest(TopologyDTO data) {
    Map<String, TopologyDTO> json = new HashMap<>();
    json.put("data", data);
    return Requests.indexRequest()
        .index("topology")
        .id(data.getUuid()) // here uuid is a String
        .source(json);
}

Any help would be greatly appreciated.

Thanks,
Tauseef