Anil,

getStreamer().addData() returns an IgniteFuture. You can check its result by calling fut.get(): get() returns null if the data was streamed and stored, and throws an exception otherwise.
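
A minimal sketch of that check, written against the plain JDK so that it compiles without Ignite on the classpath. Two things here are assumptions and not Ignite API: `Streamer` is a hypothetical stand-in for the two streamer calls involved (addData and flush), and `CompletableFuture` stands in for IgniteFuture, with join() playing the role of get(). The sketch flushes before waiting, on the assumption that a future may not complete while its entry is still sitting in the streamer's buffer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class StreamerFutureCheck {
    /** Hypothetical stand-in for the streamer calls used in this thread (not the Ignite interface). */
    interface Streamer<K, V> {
        CompletableFuture<?> addData(K key, V val);

        void flush();
    }

    /** Adds every entry, flushes, then waits on each future. Returns the keys whose future failed. */
    static <K, V> List<K> streamAndVerify(Streamer<K, V> streamer, Map<K, V> entries) {
        // Keep one future per key so a failure can be traced back to its entry.
        Map<K, CompletableFuture<?>> futs = new LinkedHashMap<>();

        for (Map.Entry<K, V> e : entries.entrySet())
            futs.put(e.getKey(), streamer.addData(e.getKey(), e.getValue()));

        // Assumption: buffered entries are only sent on flush, so flush before waiting.
        streamer.flush();

        List<K> failed = new ArrayList<>();

        for (Map.Entry<K, CompletableFuture<?>> f : futs.entrySet()) {
            try {
                // Returns normally once the entry is stored; throws an unchecked exception on failure.
                f.getValue().join();
            }
            catch (RuntimeException ex) {
                System.err.println("Entry " + f.getKey() + " was not stored: " + ex);
                failed.add(f.getKey());
            }
        }

        return failed;
    }

    private StreamerFutureCheck() {
        // Utility class, no instances.
    }
}
```

In your addMessage() you would do the same thing with the real types: keep the IgniteFuture returned by each addData() call, call flush() on the streamer, then call get() on every future and log whatever it throws. An empty result means every entry was acknowledged, so if the cache size is still 0 after that, the problem is somewhere other than the streamer.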
On Thu, Nov 3, 2016 at 2:32 PM, Anil <anilk...@gmail.com> wrote:

> Yes, that is the only exception I see in the logs. I will try the debug
> option. Thanks.
>
> Though the data streamer does not throw the exception every time,
> IgniteCache#size() stays at 0 all the time. It is weird.
>
> 1.
> for (Map.Entry<K, V> entry : m.entrySet()) {
>     getStreamer().addData(entry.getKey(), entry.getValue());
> }
>
> 2.
> for (Map.Entry<K, V> entry : m.entrySet()) {
>     cache.put((String) entry.getKey(), (Person) entry.getValue());
> }
>
> 3.
> for (Map.Entry<K, V> entry : m.entrySet()) {
>     cache.replace((String) entry.getKey(), (Person) entry.getValue());
> }
>
> Cache size with #1 and #3 is 0.
> Cache size with #2 is 1, as expected.
>
> Have you seen a similar issue before?
>
> Thanks
>
> On 3 November 2016 at 16:33, Anton Vinogradov <avinogra...@gridgain.com>
> wrote:
>
>> Anil,
>>
>> Is it the first and only exception in the logs?
>>
>> Is it possible to debug this?
>> You can set a breakpoint at the first line of
>> org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl#closeEx(boolean,
>> org.apache.ignite.IgniteCheckedException).
>> This will tell you who is stopping the data streamer.
>>
>> On Thu, Nov 3, 2016 at 1:41 PM, Anil <anilk...@gmail.com> wrote:
>>
>>> Hi Anton,
>>>
>>> No. The Ignite nodes look good.
>>>
>>> I have attached my KafkaCacheDataStreamer class, and the following is the
>>> code that listens to the Kafka topic. The IgniteCache is created using Java
>>> configuration.
>>>
>>> I see that the cache size is zero after adding the entries to the cache
>>> from KafkaCacheDataStreamer as well. I am not sure how to log whether the
>>> entries were added to the cache or not.
>>>
>>> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer =
>>>     new KafkaCacheDataStreamer<String, String, Person>();
>>>
>>> Properties props = new Properties(); // kafka properties
>>> ConsumerConfig consumerConfig = new ConsumerConfig(props);
>>>
>>> try {
>>>     IgniteDataStreamer<String, Person> stmr =
>>>         ignite.dataStreamer(CacheManager.PERSON_CACHE);
>>>     // allow overwriting cache data
>>>     stmr.allowOverwrite(true);
>>>
>>>     kafkaStreamer.setIgnite(ignite);
>>>     kafkaStreamer.setStreamer(stmr);
>>>
>>>     // set the topic
>>>     kafkaStreamer.setTopic(kafkaConfig.getString("topic", "TestTopic"));
>>>
>>>     // set the number of threads to process Kafka streams
>>>     kafkaStreamer.setThreads(1);
>>>
>>>     // set Kafka consumer configurations
>>>     kafkaStreamer.setConsumerConfig(consumerConfig);
>>>
>>>     // set decoders
>>>     kafkaStreamer.setKeyDecoder(new StringDecoder(new VerifiableProperties()));
>>>     kafkaStreamer.setValueDecoder(new StringDecoder(new VerifiableProperties()));
>>>     kafkaStreamer.setMultipleTupleExtractor(
>>>         new StreamMultipleTupleExtractor<String, String, Person>() {
>>>             @Override
>>>             public Map<String, Person> extract(String msg) {
>>>                 Map<String, Person> entries = new HashMap<>();
>>>                 try {
>>>                     KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>>>                     IgniteCache<String, Person> cache = CacheManager.getCache();
>>>
>>>                     if (CollectionUtils.isNotEmpty(request.getPersons())) {
>>>                         String id = null;
>>>                         for (Person ib : request.getPersons()) {
>>>                             if (StringUtils.isNotBlank(ib.getId())) {
>>>                                 id = ib.getId();
>>>                                 if (null != ib.isDeleted() && Boolean.TRUE.equals(ib.isDeleted())) {
>>>                                     cache.remove(id);
>>>                                 } else {
>>>                                     // no need to store the id. so setting null.
>>>                                     ib.setId(null);
>>>                                     entries.put(id, ib);
>>>                                 }
>>>                             }
>>>                         }
>>>                     } else {
>>>
>>>                     }
>>>                 } catch (Exception ex) {
>>>                     logger.error("Error while updating the cache - {} {} ", msg, ex);
>>>                 }
>>>
>>>                 return entries;
>>>             }
>>>         });
>>>
>>>     kafkaStreamer.start();
>>> } catch (Exception ex) {
>>>     logger.error("Error in kafka data streamer ", ex);
>>> }
>>>
>>> Please let me know if you see any issues. Thanks.
>>>
>>> On 3 November 2016 at 15:59, Anton Vinogradov <avinogra...@gridgain.com>
>>> wrote:
>>>
>>>> Anil,
>>>>
>>>> Could you provide the getStreamer() code and full logs?
>>>> Possibly the Ignite node was disconnected, and this caused the
>>>> DataStreamer to close.
>>>>
>>>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <anilk...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have created a custom Kafka data streamer for my use case, and I see
>>>>> the following exception.
>>>>>
>>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>>>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>>>     at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>>>>     at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> The addMessage method is:
>>>>>
>>>>> @Override
>>>>> protected void addMessage(T msg) {
>>>>>     if (getMultipleTupleExtractor() == null) {
>>>>>         Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>>>
>>>>>         if (e != null)
>>>>>             getStreamer().addData(e);
>>>>>     } else {
>>>>>         Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>>>         if (m != null && !m.isEmpty()) {
>>>>>             getStreamer().addData(m);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Do you see any issue? Please let me know if you need any additional
>>>>> information.
>>>>>
>>>>> Thanks.