Hi Anil,

Please attach the whole example.
I cannot find where the DataStreamer was closed in your case (where was
o.a.i.IgniteDataStreamer#close() invoked?).

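As for the cache size, it does not tell you anything on its own, because the
streamer buffers entries and sends them in batches, so entries may not be in
the cache until the streamer is flushed or closed.
And yes, checking each future, as Anton suggested, is a good idea.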
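
To illustrate the batching point, here is a minimal sketch using only the JDK.
ToyStreamer and ToyStreamerDemo are hypothetical names made up for this mail;
this is a toy model of a buffering streamer, not Ignite's implementation, and
a plain Map stands in for the cache. It only shows why the size can stay at 0
until a flush or close, why the futures are worth checking, and why addData()
fails after close().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Toy stand-in for a batching streamer (assumption: NOT Ignite's code). */
final class ToyStreamer<K, V> implements AutoCloseable {
    private final Map<K, V> cache;      // plain map standing in for the cache
    private final int batchSize;        // entries buffered before an automatic flush
    private final Map<K, V> buffer = new LinkedHashMap<>();
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();
    private boolean closed;

    ToyStreamer(Map<K, V> cache, int batchSize) {
        this.cache = cache;
        this.batchSize = batchSize;
    }

    /** Buffers one entry; the returned future completes once the entry is stored. */
    synchronized CompletableFuture<Void> addData(K key, V val) {
        if (closed)
            throw new IllegalStateException("Data streamer has been closed.");
        buffer.put(key, val);
        CompletableFuture<Void> fut = new CompletableFuture<>();
        pending.add(fut);
        if (buffer.size() >= batchSize)
            flush();
        return fut;
    }

    /** Writes all buffered entries to the cache and completes their futures. */
    synchronized void flush() {
        cache.putAll(buffer);
        buffer.clear();
        for (CompletableFuture<Void> f : pending)
            f.complete(null);
        pending.clear();
    }

    /** Flushes what is left, then rejects any further addData() calls. */
    @Override
    public synchronized void close() {
        if (closed)
            return;
        flush();
        closed = true;
    }
}

class ToyStreamerDemo {
    public static void main(String[] args) {
        Map<String, String> cache = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futs = new ArrayList<>();

        try (ToyStreamer<String, String> stmr = new ToyStreamer<>(cache, 100)) {
            futs.add(stmr.addData("id-1", "person-1"));
            // Still buffered: the batch is not full and nothing was flushed yet.
            System.out.println("size before close: " + cache.size()); // prints 0
        }

        // Check every future; join() returns null on success or throws on failure.
        for (CompletableFuture<Void> f : futs)
            f.join();

        System.out.println("size after close: " + cache.size()); // prints 1
    }
}
```

With the real IgniteDataStreamer the same idea applies: keep the futures
returned by addData(), and call flush() or close() before you look at the
cache size.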

On Thu, Nov 3, 2016 at 3:05 PM, Anton Vinogradov <avinogra...@gridgain.com>
wrote:

> Anil,
>
> getStreamer().addData() returns an IgniteFuture. You can check its
> result with fut.get().
> get() returns null if the data was streamed and stored, or throws an
> exception otherwise.
>
> On Thu, Nov 3, 2016 at 2:32 PM, Anil <anilk...@gmail.com> wrote:
>
>>
>> Yes, that is the only exception I see in the logs. I will try the debug
>> option. Thanks.
>>
>> Although the data streamer does not throw the exception every time,
>> IgniteCache#size() stays at 0 all the time. It is weird.
>>
>> 1.
>> for (Map.Entry<K, V> entry : m.entrySet()){
>>                 getStreamer().addData(entry.getKey(), entry.getValue());
>>
>>                 }
>>
>> 2.
>>
>> for (Map.Entry<K, V> entry : m.entrySet()){
>>                  cache.put((String)entry.getKey(), (Person)
>> entry.getValue());
>>                 }
>>
>> 3.
>> for (Map.Entry<K, V> entry : m.entrySet()){
>>                  cache.replace((String)entry.getKey(), (Person)
>> entry.getValue());
>>                  }
>>
>>
>> cache size with #1 & #3  is 0
>> cache size with #2 is 1 as expected.
>>
>> Have you seen a similar issue before?
>>
>> Thanks
>>
>>
>>
>> On 3 November 2016 at 16:33, Anton Vinogradov <avinogra...@gridgain.com>
>> wrote:
>>
>>> Anil,
>>>
>>> Is it the first and only exception in the logs?
>>>
>>> Is it possible to debug this?
>>> You can set a breakpoint at the first line of
>>> org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl#closeEx(boolean,
>>> org.apache.ignite.IgniteCheckedException).
>>> This will show you who is stopping the data streamer.
>>>
>>> On Thu, Nov 3, 2016 at 1:41 PM, Anil <anilk...@gmail.com> wrote:
>>>
>>>> Hi Anton,
>>>> No, the Ignite nodes look good.
>>>>
>>>> I have attached my KafkaCacheDataStreamer class and following is the
>>>> code to listen to the kafka topic. IgniteCache is created using java
>>>> configuration.
>>>>
>>>> I see that the cache size is still zero after adding entries to the cache
>>>> from KafkaCacheDataStreamer. I am not sure how to log whether the entries
>>>> were added to the cache or not.
>>>>
>>>> KafkaCacheDataStreamer<String, String, Person> kafkaStreamer = new
>>>> KafkaCacheDataStreamer<String, String, Person>();
>>>>
>>>>  Properties props = new Properties(); // kafka properties
>>>>  ConsumerConfig consumerConfig = new ConsumerConfig(props);
>>>>
>>>>     try {
>>>>     IgniteDataStreamer<String, Person> stmr =
>>>> ignite.dataStreamer(CacheManager.PERSON_CACHE);
>>>>        // allow overwriting cache data
>>>>        stmr.allowOverwrite(true);
>>>>
>>>>        kafkaStreamer.setIgnite(ignite);
>>>>        kafkaStreamer.setStreamer(stmr);
>>>>
>>>>        // set the topic
>>>>        kafkaStreamer.setTopic(kafkaConfig.getString("topic",
>>>> "TestTopic"));
>>>>
>>>>        // set the number of threads to process Kafka streams
>>>>        kafkaStreamer.setThreads(1);
>>>>
>>>>        // set Kafka consumer configurations
>>>>        kafkaStreamer.setConsumerConfig(consumerConfig);
>>>>
>>>>        // set decoders
>>>>        kafkaStreamer.setKeyDecoder(new StringDecoder(new
>>>> VerifiableProperties()));
>>>>        kafkaStreamer.setValueDecoder(new StringDecoder(new
>>>> VerifiableProperties()));
>>>>        kafkaStreamer.setMultipleTupleExtractor(new
>>>> StreamMultipleTupleExtractor<String, String, Person>() {
>>>>     @Override
>>>>     public Map<String, Person> extract(String msg) {
>>>>     Map<String, Person> entries = new HashMap<>();
>>>>     try {
>>>>     KafkaMessage request = Json.decodeValue(msg, KafkaMessage.class);
>>>>     IgniteCache<String, Person> cache = CacheManager.getCache();
>>>>
>>>>     if (CollectionUtils.isNotEmpty(request.getPersons())){
>>>>     String id = null;
>>>>     for (Person ib : request.getPersons()){
>>>>     if (StringUtils.isNotBlank(ib.getId())){
>>>>     id = ib.getId();
>>>>     if (null != ib.isDeleted() && Boolean.TRUE.equals(ib.isDeleted())){
>>>>     cache.remove(id);
>>>>     }else {
>>>>     // no need to store the id. so setting null.
>>>>     ib.setId(null);
>>>>     entries.put(id, ib);
>>>>     }
>>>>     }
>>>>     }
>>>>     }else {
>>>>
>>>>     }
>>>>     }catch (Exception ex){
>>>>     logger.error("Error while updating the cache - {} {} " ,msg, ex);
>>>>     }
>>>>
>>>>     return entries;
>>>>     }
>>>>     });
>>>>
>>>>        kafkaStreamer.start();
>>>>     }catch (Exception ex){
>>>>     logger.error("Error in kafka data streamer ", ex);
>>>>     }
>>>>
>>>>
>>>> Please let me know if you see any issues. Thanks.
>>>>
>>>> On 3 November 2016 at 15:59, Anton Vinogradov <avinogra...@gridgain.com> wrote:
>>>>
>>>>> Anil,
>>>>>
>>>>> Could you provide the getStreamer() code and full logs?
>>>>> Possibly the Ignite node was disconnected, and this caused the
>>>>> DataStreamer to close.
>>>>>
>>>>> On Thu, Nov 3, 2016 at 1:17 PM, Anil <anilk...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have created a custom Kafka data streamer for my use case and I see
>>>>>> the following exception.
>>>>>>
>>>>>> java.lang.IllegalStateException: Data streamer has been closed.
>>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
>>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:507)
>>>>>>         at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:498)
>>>>>>         at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java:128)
>>>>>>         at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)
>>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>>
>>>>>>
>>>>>> addMessage method is
>>>>>>
>>>>>>  @Override
>>>>>>     protected void addMessage(T msg) {
>>>>>>     if (getMultipleTupleExtractor() == null){
>>>>>>             Map.Entry<K, V> e = getSingleTupleExtractor().extract(msg);
>>>>>>
>>>>>>             if (e != null)
>>>>>>                 getStreamer().addData(e);
>>>>>>
>>>>>>         } else {
>>>>>>             Map<K, V> m = getMultipleTupleExtractor().extract(msg);
>>>>>>             if (m != null && !m.isEmpty()){
>>>>>>                 getStreamer().addData(m);
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>
>>>>>> Do you see any issue? Please let me know if you need any additional
>>>>>> information.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Vladislav Pyatkov
