Hello!

It is not advisable to call any blocking methods from event listeners. Just
fire resetLostPartitions from another thread.

Regards,
-- 
Ilya Kasnacheev


чт, 7 нояб. 2019 г. в 15:17, Akash Shinde <akashshi...@gmail.com>:

> Hi,
> I am trying to handle a lost-partition scenario.
> I have written an event listener that listens to the
> EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST event.
> I want to reset the cache's lost-partition state after cache loading is done.
> *Issue:* ignite.resetLostPartitions(cacheName) is getting blocked and never
> completes.
>
> Please find the code for the event listener below. Can someone help with
> this? *Why is this resetLostPartitions call getting blocked?*
>
> public class IgniteEventListner implements 
> IgnitePredicate<CacheRebalancingEvent> {
>    private static final Logger LOGGER = 
> LoggerFactory.getLogger(IgniteEventListner.class);
>
>   private final Ignite ignite;
>
>   public IgniteEventListner(Ignite ignite) {
>     this.ignite = ignite;
>   }
>
>   @Override
>   public boolean apply(CacheRebalancingEvent evt) {
>
>     IgniteCache<DefaultDataAffinityKey, AssetGroupData> cache = 
> ignite.getOrCreateCache(CacheName.ASSET_GROUP_CACHE.name());
>     Collection<Integer> lostPartitions = cache.lostPartitions();
>     reloadCache(lostPartitions); //perform partition based cache loading
>
>    * 
> ignite.resetLostPartitions(Arrays.asList(CacheName.ASSET_GROUP_CACHE.name()));
>   //Reset partitions*
>
>     System.out.println("Check-1, Partition lost event processed");
>
>     return true;
>   }
> }
>
> *Cache Configuration*
>
> private CacheConfiguration assetGroupCacheCfg() {
>     CacheConfiguration assetGroupCacheCfg = new 
> CacheConfiguration<>(CacheName.ASSET_GROUP_CACHE.name());
>     assetGroupCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
>     assetGroupCacheCfg.setWriteThrough(false);
>     assetGroupCacheCfg.setReadThrough(false);
>     assetGroupCacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
>     assetGroupCacheCfg.setBackups(0);
>     assetGroupCacheCfg.setCacheMode(CacheMode.PARTITIONED);
>     assetGroupCacheCfg.setIndexedTypes(DefaultDataAffinityKey.class, 
> AssetGroupData.class);
>     assetGroupCacheCfg.setSqlIndexMaxInlineSize(100);
>     RendezvousAffinityFunction affinityFunction = new 
> RendezvousAffinityFunction();
>     assetGroupCacheCfg.setAffinity(affinityFunction);
>     assetGroupCacheCfg.setStatisticsEnabled(true);
>    
> assetGroupCacheCfg.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE);
>     return assetGroupCacheCfg;
>   }
>
> *Ignite Configuration*
>
> private IgniteConfiguration getIgniteConfiguration() {
>
>   TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
>   String[] hosts = {"127.0.0.1:47500..47509"};
>   ipFinder.setAddresses(Arrays.asList(hosts));
>
>   TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
>   discoSpi.setIpFinder(ipFinder);
>
>   IgniteConfiguration cfg = new IgniteConfiguration();
>   cfg.setDiscoverySpi(discoSpi);
>   cfg.setIgniteInstanceName("springDataNode");
>   cfg.setPeerClassLoadingEnabled(false);
>   cfg.setRebalanceThreadPoolSize(4);
>   DataStorageConfiguration storageCfg = new DataStorageConfiguration();
>   DataRegionConfiguration regionConfiguration = new DataRegionConfiguration();
>   regionConfiguration.setInitialSize(3L * 1024 * 1024 * 1024);
>   regionConfiguration.setMaxSize(3L * 1024 * 1024 * 1024);
>   regionConfiguration.setMetricsEnabled(true);
>
>   storageCfg.setDefaultDataRegionConfiguration(regionConfiguration);
>   storageCfg.setStoragePath("c:/ignite-storage/storage");
>   storageCfg.setWalPath("c:/ignite-storage/storage/wal");
>   storageCfg.setWalArchivePath("c:/ignite-storage/storage/wal-archive");
>   storageCfg.setMetricsEnabled(true);
>   cfg.setDataStorageConfiguration(storageCfg);
>   
> cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST,EventType.EVT_NODE_FAILED);
>   cfg.setCacheConfiguration(getCacheConfigurations());
>   return cfg;
> }
>
>
> Thanks,
>
> Akash
>
>

Reply via email to