HI, I have created custom kafka data streamer for my use case and i see following exception.
java.lang.IllegalStateException: Data streamer has been closed. at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360) at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507) at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498) at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128) at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) addMessage method is @Override protected void addMessage(T msg) { if (getMultipleTupleExtractor() == null){ Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg); if (e != null) getStreamer().addData(e); } else { Map<K, V> m = getMultipleTupleExtractor().extract(msg); if (m != null && !m.isEmpty()){ getStreamer().addData(m); } } } Do you see any issue ? Please let me know if you need any additional information. thanks. Thanks.