[ https://issues.apache.org/jira/browse/IGNITE-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Valentin Kulichenko resolved IGNITE-2145. ----------------------------------------- Resolution: Won't Fix {{EntryProcessor}} has to be invoked atomically, regardless of whether you're executing it synchronously or asynchronously. This means that the entry will be locked during the execution. If you need to execute a long-running task collocated with the data, you can use {{IgniteCompute.affinityRun()}} method instead, which sends a closure to the node where the entry is stored without locking it. > Async cache operations on primary node might block > -------------------------------------------------- > > Key: IGNITE-2145 > URL: https://issues.apache.org/jira/browse/IGNITE-2145 > Project: Ignite > Issue Type: Bug > Affects Versions: ignite-1.4 > Environment: Windows 8.1 64 bit > java version "1.8.0_51" > Java(TM) SE Runtime Environment (build 1.8.0_51-b16) > Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode) > Ignite 1.4.0 > Reporter: Avihai Berkovitz > > When executing async operations on caches the initial function call is > supposed to return immediately, without regard to cache state, network > traffic, etc. If we reference a cache key on a different node it works. But > if the primary node for a cache key is the same node that initiate the > operation everything is executed synchronously. > This is probably an optimization, but can have side effects. For example, if > a long running CacheProcessor is invoked on this key, every operation will > block. > Here is some code to illustrate the problem. Notice that there is a 4 seconds > delay between "Before async op" and "After async op". If you reverse the > isPrimary() condition it won't happen. > {code} > IgniteConfiguration igniteConfiguration = new IgniteConfiguration() > .setFailoverSpi(new AlwaysFailoverSpi()) > .setGridLogger(new Slf4jLogger()) > .setPeerClassLoadingEnabled(false) > .setDeploymentMode(DeploymentMode.CONTINUOUS); > Ignite ignite1 = Ignition.start(igniteConfiguration); > Ignite ignite2 = Ignition.start(igniteConfiguration.setGridName("2")); > assert ignite1.cluster().nodes().size() == 2; > assert ignite2.cluster().nodes().size() == 2; > CacheConfiguration<String, Integer> cacheConfiguration = new > CacheConfiguration<String, Integer>() > .setName("test") > .setCacheMode(CacheMode.PARTITIONED) > .setAtomicityMode(CacheAtomicityMode.ATOMIC) > > .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); > IgniteCache<String, Integer> cache1 = > ignite1.getOrCreateCache(cacheConfiguration); > IgniteCache<String, Integer> cache2 = > ignite2.getOrCreateCache(cacheConfiguration); > // Find key mapped to the first node > String key = IntStream.range(0, 100) > .mapToObj(value -> "HI" + value) > .filter(s -> > ignite1.affinity(cacheConfiguration.getName()).isPrimary(ignite1.cluster().localNode(), > s)) > .findFirst().get(); > // Run a blocked processor from node 2 > Thread thread = new Thread() { > @Override > public void run() { > System.out.println("Invoking blocked processor"); > cache2.invoke(key, (entry, arguments) -> { > System.out.println("IN INVOKE"); > try { > Thread.sleep(5 * 1000); > } catch (InterruptedException e) { > e.printStackTrace(); > } > return null; > }); > System.out.println("Blocked invoke returned"); > } > }; > thread.start(); > Thread.sleep(1000); > IgniteCache<String, Integer> async1 = cache1.withAsync(); > System.out.println("Before async op"); > //async1.put(key, 1); > //async1.get(key); > async1.containsKey(key); > System.out.println("After async op"); > assert async1.future().isDone(); > thread.join(); > ignite2.close(); > ignite1.close(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)