Re: resetLostPartitions is blocked inside event listener

2019-11-14 Thread Akash Shinde
Thank you Ilya.

On Fri, Nov 8, 2019 at 12:43 AM Ilya Kasnacheev 
wrote:

> Hello!
>
> Event listeners are invoked synchronously from internal threads. If the
> partition reset has to happen on that same thread, there will obviously
> be a deadlock.
>
> Cache listeners have the same property, i.e., you should avoid doing cache
> operations from them.
>
> This is a tradeoff between performance and usability, which was resolved in
> favor of the former.
>
> Regards,
> --
> Ilya Kasnacheev
>
>
> On Thu, Nov 7, 2019 at 20:30, Prasad Bhalerao wrote:
>
>> Do you mean I should spawn a different thread from the event listener and
>> reset the lost partitions in that thread?
>>
>> I tried this and it works.
>>
>> But I wanted to understand the reason: why does this call get blocked in
>> the event listener?
>>
>> Thanks,
>> Prasad
>>
>> On Thu 7 Nov, 2019, 9:28 PM Ilya Kasnacheev wrote:
>>
>>> Hello!
>>>
>>> It is not advisable to call any blocking methods from event listeners.
>>> Just fire resetLostPartitions from another thread.
>>>
>>> Regards,
>>> --
>>> Ilya Kasnacheev

Re: resetLostPartitions is blocked inside event listener

2019-11-07 Thread Ilya Kasnacheev
Hello!

Event listeners are invoked synchronously from internal threads. If the
partition reset has to happen on that same thread, there will obviously be a
deadlock.

Cache listeners have the same property, i.e., you should avoid doing cache
operations from them.

This is a tradeoff between performance and usability, which was resolved in
favor of the former.
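
The same-thread situation described above can be reproduced without Ignite. What follows is only an analogy under stated assumptions: a single-thread executor stands in for the internal thread that invokes listeners, and the names SameThreadDeadlockDemo, waitFromSameThread and waitFromOtherThread are hypothetical. It does not show how Ignite is implemented internally.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SameThreadDeadlockDemo {

    /**
     * The "listener" runs on the only internal thread and then waits for work
     * that needs that same thread. Returns false because the wait times out.
     */
    static boolean waitFromSameThread(long timeoutMs) {
        ExecutorService internal = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> listenerCall = internal.submit(() -> {
                // Stand-in for the reset: it is queued behind the running listener.
                Future<?> reset = internal.submit(() -> { });
                try {
                    reset.get(timeoutMs, TimeUnit.MILLISECONDS);
                    return true;
                } catch (TimeoutException e) {
                    return false; // without a timeout this wait would never end
                }
            });
            return listenerCall.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            internal.shutdownNow();
        }
    }

    /** The same wait issued from a different thread (the caller) completes. */
    static boolean waitFromOtherThread(long timeoutMs) {
        ExecutorService internal = Executors.newSingleThreadExecutor();
        try {
            Future<?> reset = internal.submit(() -> { });
            reset.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            internal.shutdownNow();
        }
    }
}
```

The timeout exists only so the demonstration terminates. A blocking call without one, issued from the thread it depends on, would hang in the same way the listener did.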

Regards,
-- 
Ilya Kasnacheev


On Thu, Nov 7, 2019 at 20:30, Prasad Bhalerao wrote:

> Do you mean I should spawn a different thread from the event listener and
> reset the lost partitions in that thread?
>
> I tried this and it works.
>
> But I wanted to understand the reason: why does this call get blocked in
> the event listener?
>
> Thanks,
> Prasad
>
> On Thu 7 Nov, 2019, 9:28 PM Ilya Kasnacheev wrote:
>
>> Hello!
>>
>> It is not advisable to call any blocking methods from event listeners.
>> Just fire resetLostPartitions from another thread.
>>
>> Regards,
>> --
>> Ilya Kasnacheev


Re: resetLostPartitions is blocked inside event listener

2019-11-07 Thread Prasad Bhalerao
Do you mean I should spawn a different thread from the event listener and
reset the lost partitions in that thread?

I tried this and it works.

But I wanted to understand the reason: why does this call get blocked in the
event listener?

Thanks,
Prasad

On Thu 7 Nov, 2019, 9:28 PM Ilya Kasnacheev wrote:

> Hello!
>
> It is not advisable to call any blocking methods from event listeners.
> Just fire resetLostPartitions from another thread.
>
> Regards,
> --
> Ilya Kasnacheev


Re: resetLostPartitions is blocked inside event listener

2019-11-07 Thread Ilya Kasnacheev
Hello!

It is not advisable to call any blocking methods from event listeners. Just
fire resetLostPartitions from another thread.
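
One way to follow this advice is to have the listener hand the blocking work to its own executor and return at once. The sketch below is dependency-free and illustrative only: ListenerHandOff and shutdownAndAwait are hypothetical names, and the Runnable is a stand-in for the reloadCache(...) and ignite.resetLostPartitions(...) calls from the original listener.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ListenerHandOff {
    // Stand-in for the blocking recovery work (cache reload + partition reset).
    private final Runnable recovery;

    // Dedicated thread, so the thread that invokes the listener is never blocked.
    private final ExecutorService recoveryExecutor = Executors.newSingleThreadExecutor();

    ListenerHandOff(Runnable recovery) {
        this.recovery = recovery;
    }

    /** Mirrors the shape of the listener's apply(evt): schedule the work and return. */
    boolean apply(Object evt) {
        recoveryExecutor.submit(recovery);
        return true; // keep listening for further events
    }

    /** Stops accepting work and waits for queued recovery tasks to finish. */
    boolean shutdownAndAwait(long timeoutMs) {
        recoveryExecutor.shutdown();
        try {
            return recoveryExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A single-thread executor is assumed here so that recoveries for successive events run one after another instead of overlapping. Whether that ordering is what an application needs is a design decision this sketch does not settle.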

Regards,
-- 
Ilya Kasnacheev


On Thu, Nov 7, 2019 at 15:17, Akash Shinde wrote:

> Hi,
> I am trying to handle the lost-partition scenario.
> I have written an event listener that listens for the
> EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event.
> I want to reset the lost-partition state of the cache after cache loading is
> done.
> *Issue:* ignite.resetLostPartitions(cacheName) is getting blocked and does
> not complete.
>
> Please find the code for the event listener below. Can someone help with
> this? *Why is this resetLostPartitions call getting blocked?*
>
> public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> {
>   private static final Logger LOGGER =
>       LoggerFactory.getLogger(IgniteEventListner.class);
>
>   private final Ignite ignite;
>
>   public IgniteEventListner(Ignite ignite) {
>     this.ignite = ignite;
>   }
>
>   @Override
>   public boolean apply(CacheRebalancingEvent evt) {
>     IgniteCache cache =
>         ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name());
>     Collection<Integer> lostPartitions = cache.lostPartitions();
>     reloadCache(lostPartitions); // perform partition-based cache loading
>
>     // Reset partitions (this is the call that blocks)
>     ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name()));
>
>     System.out.println("Check-1, Partition lost event processed");
>
>     return true;
>   }
> }
>
> *Cache Configuration*
>
> private CacheConfiguration assetGroupCacheCfg() {
>   CacheConfiguration assetGroupCacheCfg =
>       new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name());
>   assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
>   assetGroupCacheCfg.setWriteThrough(false);
>   assetGroupCacheCfg.setReadThrough(false);
>   assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
>   assetGroupCacheCfg.setBackups(0);
>   assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED);
>   assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class);
>   assetGroupCacheCfg.setSqlIndexMaxInlineSize(100);
>   RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
>   assetGroupCacheCfg.setAffinity(affinityFunction);
>   assetGroupCacheCfg.setStatisticsEnabled(true);
>   assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
>   return assetGroupCacheCfg;
> }
>
> *Ignite Configuration*
>
> private IgniteConfiguration getIgniteConfiguration() {
>
>   TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
>   String[] hosts = {"127.0.0.1:47500..47509"};
>   ipFinder.setAddresses(Arrays.asList(hosts));
>
>   TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
>   discoSpi.setIpFinder(ipFinder);
>
>   IgniteConfiguration cfg = new IgniteConfiguration();
>   cfg.setDiscoverySpi(discoSpi);
>   cfg.setIgniteInstanceName("springDataNode");
>   cfg.setPeerClassLoadingEnabled(false);
>   cfg.setRebalanceThreadPoolSize(4);
>   DataStorageConfiguration storageCfg = new DataStorageConfiguration();
>   DataRegionConfiguration regionConfiguration = new DataRegionConfiguration();
>   regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024);
>   regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024);
>   regionConfiguration.setMetricsEnabled(true);
>
>   storageCfg.setDefaultDataRegionConfiguration(regionConfiguration);
>   storageCfg.setStoragePath("c:/ignite-storage/storage");
>   storageCfg.setWalPath("c:/ignite-storage/storage/wal");
>   storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive");
>   storageCfg.setMetricsEnabled(true);
>   cfg.setDataStorageConfiguration(storageCfg);
>   cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,
>       EventType.EVT_NODE_FAILED);
>   cfg.setCacheConfiguration(getCacheConfigurations());
>   return cfg;
> }
>
>
> Thanks,
>
> Akash
>
>