Hi,

I am seeing the "Data streamer has been closed" exception very frequently.
I do not see any explicit error or exception about the data streamer being
closed; the exception appears only when a message is being added.

I have a 4-node Ignite cluster, and each node has a Kafka consumer that
pushes every message it receives to the streamer.

What happens if a node goes down and rejoins while a message is being added
to the cache?

The following exceptions are from the logs:

2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 - Exception while adding to streamer
java.lang.IllegalStateException: Data streamer has been closed.
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
        at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
        at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:140)
        at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 - Message is ignored due to an error [msg=MessageAndMetadata(TestTopic,1,Message(magic = 0, attributes = 0, crc = 2111790081, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializer.StringDecoder@3f77f0b,kafka.serializer.StringDecoder@67fd2da0)]
java.lang.IllegalStateException: Cache has been closed or destroyed: PERSON_CACHE
        at org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(GridCacheGateway.java:160)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.onEnter(IgniteCacheProxy.java:2103)
        at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.size(IgniteCacheProxy.java:826)
        at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:149)
        at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I have attached the KafkaCacheDataStreamer class. Let me know if you need
any additional details. Thanks.

Attachment: KafkaCacheDataStreamer.java
