Hi Anil,

Please attach the whole example. I cannot find where the DataStreamer was closed in your case (where was o.a.i.IgniteDataStreamer#close() invoked?).
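To make the question about close() concrete, here is a toy Java model of a buffering streamer. It is not Ignite code: ToyStreamer, its batchSize parameter, and the HashMap standing in for the cache are all invented for illustration. It only sketches the general pattern: entries sit in a buffer until a flush, the per-entry future completes at that point, and adding data after close() fails with an IllegalStateException like the one in the stack trace. In Ignite the corresponding calls are IgniteDataStreamer#flush() and IgniteDataStreamer#close().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Toy model only: NOT Ignite code. All names here are hypothetical.
final class BufferingStreamerSketch {

    /** Stand-in for a data streamer that buffers entries and writes them in batches. */
    static final class ToyStreamer<K, V> implements AutoCloseable {
        private final Map<K, V> cache;   // stands in for the target cache
        private final int batchSize;     // buffer size that triggers an automatic flush
        private final Map<K, V> buffer = new LinkedHashMap<>();
        private final List<CompletableFuture<Void>> pending = new ArrayList<>();
        private boolean closed;

        ToyStreamer(Map<K, V> cache, int batchSize) {
            this.cache = cache;
            this.batchSize = batchSize;
        }

        /** Buffers one entry; the returned future completes once the entry is written. */
        CompletableFuture<Void> addData(K key, V val) {
            if (closed)
                throw new IllegalStateException("Data streamer has been closed.");

            CompletableFuture<Void> fut = new CompletableFuture<>();
            buffer.put(key, val);
            pending.add(fut);

            if (buffer.size() >= batchSize)
                flush();

            return fut;
        }

        /** Writes all buffered entries to the cache and completes their futures. */
        void flush() {
            cache.putAll(buffer);
            buffer.clear();

            for (CompletableFuture<Void> fut : pending)
                fut.complete(null);

            pending.clear();
        }

        /** Flushes what is left, then rejects any further addData() calls. */
        @Override
        public void close() {
            if (!closed) {
                flush();
                closed = true;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        ToyStreamer<String, String> stmr = new ToyStreamer<>(cache, 100);

        CompletableFuture<Void> fut = stmr.addData("1", "Anil");

        // The entry is still in the buffer, so the cache does not see it yet.
        System.out.println("size before close: " + cache.size());

        stmr.close();

        // close() flushed the buffer and completed the future.
        System.out.println("size after close: " + cache.size());
        System.out.println("future done: " + fut.isDone());

        try {
            stmr.addData("2", "Anton");
        } catch (IllegalStateException e) {
            System.out.println("after close: " + e.getMessage());
        }
    }
}
```

In this model a size check right after addData() sees nothing until a flush happens, so the place where the streamer is flushed or closed decides both when entries become visible and which later addData() calls fail.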
About the cache size: it does not tell you anything here, because the streamer buffers entries and sends them in batches. And yes, checking each future, as Anton said, is a good idea.

On Thu, Nov 3, 2016 at 3:05 PM, Anton Vinogradov <avinogra...@gridgain.com> wrote:

> Anil,
>
> getStreamer().addData() will return you an IgniteFuture. You can check its
> result with fut.get().
> get() will give you null if the data was streamed and stored, or throw an
> exception.
>
> On Thu, Nov 3, 2016 at 2:32 PM, Anil <anilk...@gmail.com> wrote:
>
>>
>> Yes, that is the only exception I see in the logs. I will try the debug option.
>> Thanks.
>>
>> Though the data streamer is not returning an exception all the time,
>> IgniteCache#size() remains 0 all the time. It is weird.
>>
>> 1.
>> for (Map.Entry<K, V> entry : m.entrySet()) {
>>     getStreamer().addData(entry.getKey(), entry.getValue());
>> }
>>
>> 2.
>> for (Map.Entry<K, V> entry : m.entrySet()) {
>>     cache.put((String) entry.getKey(), (Person) entry.getValue());
>> }
>>
>> 3.
>> for (Map.Entry<K, V> entry : m.entrySet()) {
>>     cache.replace((String) entry.getKey(), (Person) entry.getValue());
>> }
>>
>> cache size with #1 & #3 is 0
>> cache size with #2 is 1 as expected.
>>
>> Have you seen a similar issue before?
>>
>> Thanks
>>
>> On 3 November 2016 at 16:33, Anton Vinogradov <avinogra...@gridgain.com>
>> wrote:
>>
>>> Anil,
>>>
>>> Is it the first and only exception in the logs?
>>>
>>> Is it possible to debug this?
>>> You can set a breakpoint at the first line of
>>> org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl#closeEx(boolean,
>>> org.apache.ignite.IgniteCheckedException)
>>> This will tell you who is stopping the data streamer.
>>>
>>> On Thu, Nov 3, 2016 at 1:41 PM, Anil <anilk...@gmail.com> wrote:
>>>
>>>> Hi Anton,
>>>> No. The Ignite nodes look good.
>>>>
>>>> I have attached my KafkaCacheDataStreamer class, and the following is the
>>>> code to listen to the Kafka topic. IgniteCache is created using Java
>>>> configuration.
>>>>
>>>> I see that the cache size is zero after adding the entries to the cache from
>>>> KafkaCacheDataStreamer as well. I am not sure how to log whether the entries
>>>> were added to the cache or not.
>>>>
>>>> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer =
>>>>     new KafkaCacheDataStreamer<String, String, Person>();
>>>>
>>>> Properties props = new Properties(); // kafka properties
>>>> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>>>>
>>>> try {
>>>>     IgniteDataStreamer<String, Person> stmr =
>>>>         ignite.dataStreamer(CacheManager.PERSON_CACHE);
>>>>     // allow overwriting cache data
>>>>     stmr.allowOverwrite(true);
>>>>
>>>>     kafkaStreamer.setIgnite(ignite);
>>>>     kafkaStreamer.setStreamer(stmr);
>>>>
>>>>     // set the topic
>>>>     kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));
>>>>
>>>>     // set the number of threads to process Kafka streams
>>>>     kafkaStreamer.setThreads(1);
>>>>
>>>>     // set Kafka consumer configurations
>>>>     kafkaStreamer.setConsumerConfig(consumerConfig);
>>>>
>>>>     // set decoders
>>>>     kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
>>>>     kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
>>>>     kafkaStreamer.setMultipleTupleExtractor(new StreamMultipleTupleExtractor<String, String, Person>() {
>>>>         @Override
>>>>         public Map<String, Person> extract(String msg) {
>>>>             Map<String, Person> entries = new HashMap<>();
>>>>             try {
>>>>                 KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>>>>                 IgniteCache<String, Person> cache = CacheManager.getCache();
>>>>
>>>>                 if (CollectionUtils.isNotEmpty(request.getPersons())) {
>>>>                     String id = null;
>>>>                     for (Person ib : request.getPersons()) {
>>>>                         if (StringUtils.isNotBlank(ib.getId())) {
>>>>                             id = ib.getId();
>>>>                             if (null != ib.isDeleted() && Boolean.TRUE.equals(ib.isDeleted())) {
>>>>                                 cache.remove(id);
>>>>                             } else {
>>>>                                 // no need to store the id. so setting null.
>>>>                                 ib.setId(null);
>>>>                                 entries.put(id, ib);
>>>>                             }
>>>>                         }
>>>>                     }
>>>>                 } else {
>>>>
>>>>                 }
>>>>             } catch (Exception ex) {
>>>>                 logger.error("Error while updating the cache - {} {} ", msg, ex);
>>>>             }
>>>>
>>>>             return entries;
>>>>         }
>>>>     });
>>>>
>>>>     kafkaStreamer.start();
>>>> } catch (Exception ex) {
>>>>     logger.error("Error in kafka data streamer ", ex);
>>>> }
>>>>
>>>>
>>>> Please let me know if you see any issues. Thanks.
>>>>
>>>> On 3 November 2016 at 15:59, Anton Vinogradov <avinogra...@gridgain.com>
>>>> wrote:
>>>>
>>>>> Anil,
>>>>>
>>>>> Could you provide the getStreamer() code and full logs?
>>>>> Possibly the Ignite node was disconnected and this caused the DataStreamer
>>>>> to close.
>>>>>
>>>>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <anilk...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have created a custom Kafka data streamer for my use case and I see the
>>>>>> following exception.
>>>>>>
>>>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>>>>     at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>>>>>     at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>>
>>>>>> The addMessage method is:
>>>>>>
>>>>>> @Override
>>>>>> protected void addMessage(T msg) {
>>>>>>     if (getMultipleTupleExtractor() == null) {
>>>>>>         Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>>>>
>>>>>>         if (e != null)
>>>>>>             getStreamer().addData(e);
>>>>>>
>>>>>>     } else {
>>>>>>         Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>>>>         if (m != null && !m.isEmpty()) {
>>>>>>             getStreamer().addData(m);
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Do you see any issue? Please let me know if you need any additional
>>>>>> information.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

--
Vladislav Pyatkov