[ 
https://issues.apache.org/jira/browse/IGNITE-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentin Kulichenko resolved IGNITE-2145.
-----------------------------------------
    Resolution: Won't Fix

{{EntryProcessor}} has to be invoked atomically, regardless of whether you're 
executing it synchronously or asynchronously. This means that the entry will be 
locked during the execution.

If you need to execute a long-running task collocated with the data, you can 
use the {{IgniteCompute.affinityRun()}} method instead, which sends a closure 
to the node where the entry is stored without locking the entry.
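
A minimal sketch of that approach, assuming a cache named "test" (as in the 
reproducer quoted in this issue) and a hypothetical key "HI0". The class and 
job names are illustrative only, and a default single-node configuration is 
assumed:

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;

public class AffinityRunSketch {
    /** Hypothetical long-running job; it runs on the node that owns the key. */
    static class SlowJob implements IgniteRunnable {
        /** Injected by Ignite on the node that executes the job. */
        @IgniteInstanceResource
        private transient Ignite ignite;

        private final String key;

        SlowJob(String key) {
            this.key = key;
        }

        @Override
        public void run() {
            IgniteCache<String, Integer> cache = ignite.cache("test");

            // Ordinary cache read: the entry is not held locked afterwards.
            Integer current = cache.get(key);

            try {
                // Stand-in for the long-running work.
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            // Note: this get-then-put pair is NOT atomic, unlike an EntryProcessor.
            cache.put(key, current == null ? 1 : current + 1);
        }
    }

    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            IgniteCache<String, Integer> cache = ignite.getOrCreateCache("test");
            cache.put("HI0", 1);

            // Routes the job to the primary node for "HI0" in cache "test".
            ignite.compute().affinityRun("test", "HI0", new SlowJob("HI0"));
        }
    }
}
```

Other cache operations on the key are not blocked while the job sleeps, at the 
cost of losing the atomicity that {{EntryProcessor}} provides. The 
{{affinityRun()}} call itself still waits for the job to finish; in the 1.x API 
a non-blocking call goes through {{ignite.compute().withAsync()}}.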

> Async cache operations on primary node might block
> --------------------------------------------------
>
>                 Key: IGNITE-2145
>                 URL: https://issues.apache.org/jira/browse/IGNITE-2145
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: ignite-1.4
>         Environment: Windows 8.1 64 bit
> java version "1.8.0_51"
> Java(TM) SE Runtime Environment (build 1.8.0_51-b16)
> Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)
> Ignite 1.4.0
>            Reporter: Avihai Berkovitz
>
> When executing async operations on caches, the initial function call is 
> supposed to return immediately, regardless of cache state, network 
> traffic, etc. If we reference a cache key on a different node, it works. But 
> if the primary node for a cache key is the same node that initiates the 
> operation, everything is executed synchronously.
> This is probably an optimization, but it can have side effects. For example, 
> if a long-running CacheProcessor is invoked on this key, every operation will 
> block.
> Here is some code to illustrate the problem. Notice that there is a 4-second 
> delay between "Before async op" and "After async op". If you reverse the 
> isPrimary() condition, it won't happen.
> {code}
> IgniteConfiguration igniteConfiguration = new IgniteConfiguration()
>               .setFailoverSpi(new AlwaysFailoverSpi())
>               .setGridLogger(new Slf4jLogger())
>               .setPeerClassLoadingEnabled(false)
>               .setDeploymentMode(DeploymentMode.CONTINUOUS);
> Ignite ignite1 = Ignition.start(igniteConfiguration);
> Ignite ignite2 = Ignition.start(igniteConfiguration.setGridName("2"));
> assert ignite1.cluster().nodes().size() == 2;
> assert ignite2.cluster().nodes().size() == 2;
> CacheConfiguration<String, Integer> cacheConfiguration = new CacheConfiguration<String, Integer>()
>               .setName("test")
>               .setCacheMode(CacheMode.PARTITIONED)
>               .setAtomicityMode(CacheAtomicityMode.ATOMIC)
>               .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
> IgniteCache<String, Integer> cache1 = ignite1.getOrCreateCache(cacheConfiguration);
> IgniteCache<String, Integer> cache2 = ignite2.getOrCreateCache(cacheConfiguration);
> // Find key mapped to the first node
> String key = IntStream.range(0, 100)
>               .mapToObj(value -> "HI" + value)
>               .filter(s -> ignite1.affinity(cacheConfiguration.getName()).isPrimary(ignite1.cluster().localNode(), s))
>               .findFirst().get();
> // Run a blocked processor from node 2
> Thread thread = new Thread() {
>       @Override
>       public void run() {
>               System.out.println("Invoking blocked processor");
>               cache2.invoke(key, (entry, arguments) -> {
>                       System.out.println("IN INVOKE");
>                       try {
>                               Thread.sleep(5 * 1000);
>                       } catch (InterruptedException e) {
>                               e.printStackTrace();
>                       }
>                       return null;
>               });
>               System.out.println("Blocked invoke returned");
>       }
> };
> thread.start();
> Thread.sleep(1000);
> IgniteCache<String, Integer> async1 = cache1.withAsync();
> System.out.println("Before async op");
> //async1.put(key, 1);
> //async1.get(key);
> async1.containsKey(key);
> System.out.println("After async op");
> assert async1.future().isDone();
> thread.join();
> ignite2.close();
> ignite1.close();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
