HI,

I have created custom kafka data streamer for my use case and i see
following exception.

java.lang.IllegalStateException: Data streamer has been closed.
        at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
        at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
        at
org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
        at
net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
        at
net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)



addMessage method is

 @Override
    protected void addMessage(T msg) {
    if (getMultipleTupleExtractor() == null){
            Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);

            if (e != null)
                getStreamer().addData(e);

        } else {
            Map<K, V> m = getMultipleTupleExtractor().extract(msg);
            if (m != null && !m.isEmpty()){
                getStreamer().addData(m);
            }
        }
    }


Do you see any issue ? Please let me know if you need any additional
information. thanks.

Thanks.

Reply via email to