Hi Team,

We get the error below when we try to add an Elasticsearch sink:
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    ... 23 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    ... 27 more
Caused by: java.lang.IllegalArgumentException: cannot write xcontent for unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.

Here is the code that sets up the sink:

workflow(filterItems(openTelSrc))
    .sinkTo(new Elasticsearch7SinkBuilder<TopologyDTO>()
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
        .setConnectionPassword("password")
        .setConnectionUsername("elastic")
        .setEmitter((element, context, indexer) -> indexer.add(createIndexRequest(element)))
        .build())
    .name("topology_sink");


private static IndexRequest createIndexRequest(TopologyDTO data) {
    Map<String, TopologyDTO> json = new HashMap<>();
    json.put("data", data);
    return Requests.indexRequest()
        .index("topology")
        .id(data.getUuid()) // uuid is a String
        .source(json);
}
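
A possible reading of the last "Caused by" line, offered as a sketch rather than a confirmed diagnosis: the map passed to source(...) holds a TopologyDTO object as its value, and the Elasticsearch client appears to serialize only plain values (strings, numbers, booleans, nested maps and lists), not arbitrary classes. The example below flattens the DTO into such a map first. The TopologyDTO here is a hypothetical stand-in, and its "name" field is an assumption for illustration; only getUuid() appears in the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TopologySourceSketch {

    // Hypothetical stand-in for com.hds.alta.pipeline.model.TopologyDTO.
    // The real class's fields are not shown in this post.
    static final class TopologyDTO {
        private final String uuid;
        private final String name; // assumed field, for illustration only

        TopologyDTO(String uuid, String name) {
            this.uuid = uuid;
            this.name = name;
        }

        String getUuid() {
            return uuid;
        }

        String getName() {
            return name;
        }
    }

    // Builds a document made only of plain values (String keys, String values,
    // nested Map), so no custom class is left inside the structure.
    static Map<String, Object> toSource(TopologyDTO data) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("uuid", data.getUuid());
        fields.put("name", data.getName());

        Map<String, Object> json = new LinkedHashMap<>();
        json.put("data", fields);
        return json;
    }

    public static void main(String[] args) {
        Map<String, Object> source = toSource(new TopologyDTO("abc-123", "node-a"));
        // In createIndexRequest, this map would take the place of the
        // Map<String, TopologyDTO> that is currently passed to .source(json).
        System.out.println(source); // prints {data={uuid=abc-123, name=node-a}}
    }
}
```

With many fields, converting the DTO through a JSON library instead of copying fields by hand may be more practical; that route is not shown here because it depends on which library the project already uses.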

Any help would be greatly appreciated.

Thanks,
Tauseef
