HI Anton, Thanks for responding. i will check if i can reproduce with issue with reproducer.
I had to implement my own kafka streamer because of https://issues.apache.org/jira/browse/IGNITE-4140 I suspect there is a problem when node rejoins the cluster and streamer is already closed and not recreated. Correct ? In the above case, kafka streamer tries to getStreamer and push the data but streamer is not available. Thanks. On 11 November 2016 at 14:00, Anton Vinogradov <a...@apache.org> wrote: > Anil, > > Unfortunately, > at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac > heDataStreamer.java:149) > does not fits on attached sources. > > But, > java.lang.IllegalStateException: Cache has been closed or destroyed: > PERSON_CACHE > is a reason of closed datastreamer. > > It it possible to write reproducible example or to attach both (full, all) > logs and sourcess? > > > BTW, we already have Kafka streamer, why you decided to reimplement it? > > > > On Wed, Nov 9, 2016 at 5:39 PM, Anil <anilk...@gmail.com> wrote: > >> Would there be any issues because of size of data ? >> i loaded around 80 gb on 4 node cluster. each node is of 8 CPU and 32 GB >> RAM configuration. >> >> and cache configuration - >> >> CacheConfiguration<String, Person> pConfig = new >> CacheConfiguration<String, Person>(); >> pConfig.setName("Person_Cache"); >> pConfig.setIndexedTypes(String.class, Person.class); >> pConfig.setBackups(1); >> pConfig.setCacheMode(CacheMode.PARTITIONED); >> pConfig.setCopyOnRead(false); >> pConfig.setSwapEnabled(true); >> pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); >> pConfig.setSqlOnheapRowCacheSize(100_000); >> pConfig.setOffHeapMaxMemory(10 * 1024 * 1024 * 1024); >> pConfig.setStartSize(2000000); >> pConfig.setStatisticsEnabled(true); >> >> Thanks for your help. >> >> On 9 November 2016 at 19:56, Anil <anilk...@gmail.com> wrote: >> >>> HI, >>> >>> Data streamer closed exception is very frequent. I did not see any >>> explicit errors/exception about data streamer close. the excption i see >>> only when message is getting added. >>> >>> I have 4 node ignite cluster and each node have consumer to connection >>> and push the message received to streamer. >>> >>> What if the node is down and re-joined when message is getting added >>> cache. >>> >>> Following is the exception from logs - >>> >>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 - >>> Exception while adding to streamer >>> java.lang.IllegalStateException: Data streamer has been closed. >>> at org.apache.ignite.internal.processors.datastreamer.DataStrea >>> merImpl.enterBusy(DataStreamerImpl.java:360) >>> at org.apache.ignite.internal.processors.datastreamer.DataStrea >>> merImpl.addData(DataStreamerImpl.java:507) >>> at org.apache.ignite.internal.processors.datastreamer.DataStrea >>> merImpl.addData(DataStreamerImpl.java:498) >>> at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac >>> heDataStreamer.java:140) >>> at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDat >>> aStreamer.java:197) >>> at java.util.concurrent.Executors$RunnableAdapter.call(Executor >>> s.java:511) >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >>> Executor.java:1142) >>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >>> lExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:745) >>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 - >>> Message is ignored due to an error >>> [msg=MessageAndMetadata(TestTopic,1,Message(magic >>> = 0, attributes = 0, crc = 2111790081, key = null, payload = >>> java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializ >>> er.StringDecoder@3f77f0b,kafka.serializer.StringDecoder@67fd2da0)] >>> java.lang.IllegalStateException: Cache has been closed or destroyed: >>> PERSON_CACHE >>> at org.apache.ignite.internal.processors.cache.GridCacheGateway >>> .enter(GridCacheGateway.java:160) >>> at org.apache.ignite.internal.processors.cache.IgniteCacheProxy >>> .onEnter(IgniteCacheProxy.java:2103) >>> at org.apache.ignite.internal.processors.cache.IgniteCacheProxy >>> .size(IgniteCacheProxy.java:826) >>> at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCac >>> heDataStreamer.java:149) >>> at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDat >>> aStreamer.java:197) >>> at java.util.concurrent.Executors$RunnableAdapter.call(Executor >>> s.java:511) >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >>> Executor.java:1142) >>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >>> lExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:745) >>> >>> I have attached the KafkaCacheDataStreamer class and let me know if you >>> need any additional details. thanks. >>> >>> >> >