Anil,

Unfortunately, the stack frame
  at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:149)
does not match the attached sources.

However,
java.lang.IllegalStateException: Cache has been closed or destroyed: PERSON_CACHE
is the reason the data streamer was closed.

Is it possible to write a reproducible example, or to attach both the full
logs and all of the sources?


BTW, we already have a Kafka streamer. Why did you decide to reimplement it?
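
A side note on the quoted configuration, separate from the streamer exception: the argument 10 * 1024 * 1024 * 1024 passed to setOffHeapMaxMemory is built only from int literals, so Java evaluates it in 32-bit arithmetic before widening it to long. The sketch below shows the effect in plain Java, with no Ignite dependency. The class name OffHeapSizeCheck and the helper gibibytes are hypothetical names used only for illustration, and whether this value is related to the closed cache is not something the logs shown here can confirm.

```java
class OffHeapSizeCheck {
    /**
     * Hypothetical helper: converts GiB to bytes in long arithmetic.
     * Math.multiplyExact throws ArithmeticException instead of silently wrapping.
     */
    static long gibibytes(long n) {
        return Math.multiplyExact(n, 1024L * 1024L * 1024L);
    }

    public static void main(String[] args) {
        // Every operand is an int, so the product wraps around in 32 bits
        // and only then is widened to long.
        long wrapped = 10 * 1024 * 1024 * 1024;

        // A single long operand (10L) makes the whole product long arithmetic.
        long intended = 10L * 1024 * 1024 * 1024;

        System.out.println("int arithmetic:  " + wrapped);   // -2147483648
        System.out.println("long arithmetic: " + intended);  // 10737418240
        System.out.println("helper:          " + gibibytes(10));
    }
}
```

Writing the size as 10L * 1024 * 1024 * 1024 would pass the intended 10 GiB to the setter.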



On Wed, Nov 9, 2016 at 5:39 PM, Anil <anilk...@gmail.com> wrote:

> Could the size of the data cause any issues?
> I loaded around 80 GB on a 4-node cluster. Each node has 8 CPUs and 32 GB
> of RAM.
>
> The cache configuration is:
>
> CacheConfiguration<String, Person> pConfig = new
> CacheConfiguration<String, Person>();
> pConfig.setName("Person_Cache");
> pConfig.setIndexedTypes(String.class, Person.class);
> pConfig.setBackups(1);
> pConfig.setCacheMode(CacheMode.PARTITIONED);
> pConfig.setCopyOnRead(false);
> pConfig.setSwapEnabled(true);
> pConfig.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
> pConfig.setSqlOnheapRowCacheSize(100_000);
> pConfig.setOffHeapMaxMemory(10 * 1024 * 1024 * 1024);
> pConfig.setStartSize(2000000);
> pConfig.setStatisticsEnabled(true);
>
> Thanks for your help.
>
> On 9 November 2016 at 19:56, Anil <anilk...@gmail.com> wrote:
>
>> Hi,
>>
>> The "data streamer closed" exception is very frequent. I did not see any
>> explicit errors or exceptions about the data streamer being closed. I see
>> the exception only when a message is being added.
>>
>> I have a 4-node Ignite cluster, and each node has a consumer that connects
>> and pushes each received message to the streamer.
>>
>> What happens if a node goes down and rejoins while a message is being
>> added to the cache?
>>
>> The following exceptions are from the logs:
>>
>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:146 - Exception while adding to streamer
>> java.lang.IllegalStateException: Data streamer has been closed.
>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>         at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:140)
>>         at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>> 2016-11-09 05:55:55 ERROR pool-6-thread-1 KafkaCacheDataStreamer:200 - Message is ignored due to an error [msg=MessageAndMetadata(TestTopic,1,Message(magic = 0, attributes = 0, crc = 2111790081, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=1155 cap=1155]),2034,kafka.serializer.StringDecoder@3f77f0b,kafka.serializer.StringDecoder@67fd2da0)]
>> java.lang.IllegalStateException: Cache has been closed or destroyed: PERSON_CACHE
>>         at org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(GridCacheGateway.java:160)
>>         at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.onEnter(IgniteCacheProxy.java:2103)
>>         at org.apache.ignite.internal.processors.cache.IgniteCacheProxy.size(IgniteCacheProxy.java:826)
>>         at com.test.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:149)
>>         at com.test.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:197)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> I have attached the KafkaCacheDataStreamer class. Let me know if you
>> need any additional details. Thanks.
>>
>>
>
