HI, Data streamer closed exception is very frequent. I did not see any explicit errors/exception about data streamer close. the excption i see only when message is getting added.
I have 4 node ignite cluster and each node have consumer to connection and push the message received to streamer. What if the node is down and re-joined when message is getting added cache. Following is the exception from logs - 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 - Exception while adding to streamer java.lang.IllegalStateException: Data streamer has been closed. at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360) at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507) at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498) at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:140) at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 - Message is ignored due to an error [msg=MessageAndMetadata(TestTopic,1,Message(magic = 0, attributes = 0, crc = 2111790081, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializer.StringDecoder@3f77f0b ,kafka.serializer.StringDecoder@67fd2da0)] java.lang.IllegalStateException: Cache has been closed or destroyed: PERSON_CACHE at org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(GridCacheGateway.java:160) at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.onEnter(IgniteCacheProxy.java:2103) at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.size(IgniteCacheProxy.java:826) at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:149) at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) I have attached the KafkaCacheDataStreamer class and let me know if you need any additional details. thanks.
KafkaCacheDataStreamer.java
Description: Binary data