Re: resetLostPartitions is blocked inside event listener
Thank you Ilya.

On Fri, Nov 8, 2019 at 12:43 AM Ilya Kasnacheev wrote:

> Hello!
>
> Event listeners are invoked synchronously from internal threads. If the
> partition reset has to happen on the same thread, then obviously there
> will be a deadlock.
>
> Cache listeners have the same property, i.e., you should avoid doing cache
> operations from them.
>
> This is a tradeoff between performance and usability which was resolved in
> favor of the former.
>
> Regards,
> --
> Ilya Kasnacheev
Re: resetLostPartitions is blocked inside event listener
Hello!

Event listeners are invoked synchronously from internal threads. If the partition reset has to happen on the same thread, then obviously there will be a deadlock.

Cache listeners have the same property, i.e., you should avoid doing cache operations from them.

This is a tradeoff between performance and usability which was resolved in favor of the former.

Regards,
--
Ilya Kasnacheev

Thu, Nov 7, 2019 at 20:30, Prasad Bhalerao wrote:

> Do you mean to say, spawn a different thread from the event listener and
> reset the lost partition in that thread?
>
> I tried this and it works.
>
> But I wanted to understand the reason: why does this call get blocked in
> the event listener?
>
> Thanks,
> Prasad
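The explanation above can be illustrated with a minimal sketch in plain Java, with no Ignite dependency. It is only an analogy under stated assumptions: a single-threaded executor stands in for the internal thread that delivers events, and the "reset" is modelled as a call that cannot finish until that same thread processes a follow-up task. The names `ListenerDeadlockDemo` and `resetCompletes` are hypothetical, and the real Ignite internals are more involved than this.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ListenerDeadlockDemo {

    /**
     * Runs a "listener" on a single internal thread and reports whether a blocking
     * "reset" completes. A 500 ms timeout replaces the endless wait of a real deadlock.
     *
     * @param callFromListener true: the listener calls the reset itself;
     *                         false: the listener hands the reset to another thread
     */
    static boolean resetCompletes(boolean callFromListener) throws Exception {
        ExecutorService internal = Executors.newSingleThreadExecutor(); // stand-in for the internal thread
        ExecutorService worker = Executors.newSingleThreadExecutor();   // separate application thread
        try {
            // The "reset": it only finishes once the internal thread has processed a task for it.
            Callable<Boolean> blockingReset = () -> {
                Future<?> processed = internal.submit(() -> { /* internal bookkeeping */ });
                try {
                    processed.get(500, TimeUnit.MILLISECONDS);
                    return true;
                } catch (TimeoutException e) {
                    return false; // the internal thread never got to the task
                }
            };

            // The "listener": it runs synchronously on the internal thread.
            Callable<Future<Boolean>> listener = () -> {
                if (callFromListener) {
                    // Blocks the only thread that could complete the reset.
                    return CompletableFuture.completedFuture(blockingReset.call());
                }
                // Returns at once; the reset waits on the worker thread instead.
                return worker.submit(blockingReset);
            };

            return internal.submit(listener).get().get();
        } finally {
            internal.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("reset called inside the listener completes: " + resetCompletes(true));
        System.out.println("reset handed to another thread completes: " + resetCompletes(false));
    }
}
```

In this sketch the first scenario is expected to time out, because the listener occupies the only thread able to finish the reset, while the second completes as soon as the listener returns.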
Re: resetLostPartitions is blocked inside event listener
Do you mean to say, spawn a different thread from the event listener and reset the lost partition in that thread?

I tried this and it works.

But I wanted to understand the reason: why does this call get blocked in the event listener?

Thanks,
Prasad

On Thu 7 Nov, 2019, 9:28 PM Ilya Kasnacheev wrote:

> Hello!
>
> It is not advisable to call any blocking methods from event listeners.
> Just fire resetLostPartitions from another thread.
>
> Regards,
> --
> Ilya Kasnacheev
Re: resetLostPartitions is blocked inside event listener
Hello!

It is not advisable to call any blocking methods from event listeners. Just fire resetLostPartitions from another thread.

Regards,
--
Ilya Kasnacheev

Thu, Nov 7, 2019 at 15:17, Akash Shinde wrote:

> Hi,
> I am trying to handle the lost partition scenario.
> I have written an event listener listening to the
> EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event.
> I want to reset the lost partition state of the cache after cache loading is done.
> *Issue:* ignite.resetLostPartitions(cacheName) is getting blocked and not
> completing.
>
> Please find the code for the event listener below. Can someone help with this?
> *Why is this resetLostPartitions call getting blocked?*
>
> public class IgniteEventListner implements IgnitePredicate<CacheRebalancingEvent> {
>     private static final Logger LOGGER = LoggerFactory.getLogger(IgniteEventListner.class);
>
>     private final Ignite ignite;
>
>     public IgniteEventListner(Ignite ignite) {
>         this.ignite = ignite;
>     }
>
>     @Override
>     public boolean apply(CacheRebalancingEvent evt) {
>         IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache =
>             ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name());
>         Collection<Integer> lostPartitions = cache.lostPartitions();
>         reloadCache(lostPartitions); // perform partition based cache loading
>
>         // Reset partitions (this is the call that blocks)
>         ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name()));
>
>         System.out.println("Check-1, Partition lost event processed");
>
>         return true;
>     }
> }
>
> *Cache Configuration*
>
> private CacheConfiguration<DefaultDataAffinityKey, AssetGroupData> assetGroupCacheCfg() {
>     CacheConfiguration<DefaultDataAffinityKey, AssetGroupData> assetGroupCacheCfg =
>         new CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name());
>     assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
>     assetGroupCacheCfg.setWriteThrough(false);
>     assetGroupCacheCfg.setReadThrough(false);
>     assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
>     assetGroupCacheCfg.setBackups(0);
>     assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED);
>     assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, AssetGroupData.class);
>     assetGroupCacheCfg.setSqlIndexMaxInlineSize(100);
>     RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
>     assetGroupCacheCfg.setAffinity(affinityFunction);
>     assetGroupCacheCfg.setStatisticsEnabled(true);
>     assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
>     return assetGroupCacheCfg;
> }
>
> *Ignite Configuration*
>
> private IgniteConfiguration getIgniteConfiguration() {
>     TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
>     String[] hosts = {"127.0.0.1:47500..47509"};
>     ipFinder.setAddresses(Arrays.asList(hosts));
>
>     TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
>     discoSpi.setIpFinder(ipFinder);
>
>     IgniteConfiguration cfg = new IgniteConfiguration();
>     cfg.setDiscoverySpi(discoSpi);
>     cfg.setIgniteInstanceName("springDataNode");
>     cfg.setPeerClassLoadingEnabled(false);
>     cfg.setRebalanceThreadPoolSize(4);
>
>     DataStorageConfiguration storageCfg = new DataStorageConfiguration();
>     DataRegionConfiguration regionConfiguration = new DataRegionConfiguration();
>     regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024);
>     regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024);
>     regionConfiguration.setMetricsEnabled(true);
>
>     storageCfg.setDefaultDataRegionConfiguration(regionConfiguration);
>     storageCfg.setStoragePath("c:/ignite-storage/storage");
>     storageCfg.setWalPath("c:/ignite-storage/storage/wal");
>     storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive");
>     storageCfg.setMetricsEnabled(true);
>     cfg.setDataStorageConfiguration(storageCfg);
>
>     cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST, EventType.EVT_NODE_FAILED);
>     cfg.setCacheConfiguration(getCacheConfigurations());
>     return cfg;
> }
>
> Thanks,
> Akash
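The advice to fire resetLostPartitions from another thread might look roughly like the sketch below. To keep it compilable without Ignite on the classpath, `EventPredicate` is a hypothetical stand-in for `IgnitePredicate`, and the recovery work is passed in as a plain `Runnable`; `OffloadingListener`, `recoveryThreadName`, and the thread name `partition-recovery` are likewise assumptions made for illustration. In the listener from this thread, the `Runnable` would contain the `reloadCache(...)` and `ignite.resetLostPartitions(...)` calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class OffloadingListenerDemo {

    /** Hypothetical stand-in for IgnitePredicate so the sketch compiles without Ignite. */
    interface EventPredicate<E> {
        boolean apply(E evt);
    }

    /** Listener that hands recovery work to its own executor and returns immediately. */
    static final class OffloadingListener<E> implements EventPredicate<E> {
        private final ExecutorService recoveryPool;
        private final Runnable recovery;

        OffloadingListener(ExecutorService recoveryPool, Runnable recovery) {
            this.recoveryPool = recoveryPool;
            this.recovery = recovery;
        }

        @Override
        public boolean apply(E evt) {
            // Only schedule the work here; the blocking calls (cache reload,
            // resetLostPartitions) run later on the recovery thread.
            recoveryPool.execute(recovery);
            return true; // same return value as the original listener
        }
    }

    /** Fires one fake event and returns the name of the thread that ran the recovery. */
    static String recoveryThreadName() throws InterruptedException {
        ExecutorService pool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "partition-recovery"));
        try {
            AtomicReference<String> ranOn = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);

            OffloadingListener<String> lsnr = new OffloadingListener<>(pool, () -> {
                ranOn.set(Thread.currentThread().getName());
                done.countDown();
            });

            lsnr.apply("EVT_CACHE_REBALANCE_PART_DATA_LOST"); // returns without waiting
            done.await(5, TimeUnit.SECONDS);
            return ranOn.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("recovery ran on: " + recoveryThreadName());
    }
}
```

A dedicated single-threaded executor is one possible choice here: it keeps recovery runs sequential if several events arrive close together, at the cost of having to shut the executor down when the node stops.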