[ https://issues.apache.org/jira/browse/IGNITE-5960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472110#comment-16472110 ]
Alexey Kuznetsov commented on IGNITE-5960: ------------------------------------------ [~agoncharuk] > Note that it would be cleaner to acquire the listeners map after the > partition update counter is incremented, however, the listeners map is used > in the needVal flag evaluation. Yeah, I tried to acquire the listeners map *after* the partition update counter is incremented: To do so, we firstly need to move _needVal_ and _readFromStore_ flags evaluation after update counter is incremented. But _readFromStore_ flag must be evaluated strongly *before* partition counter is incremented, see _GridCacheMapEntry.AtomicCacheUpdateClosure#call_ where we load data from store. So, we cannot acquire the listeners map *after* the partition update counter is incremented. > Currently it is possible that {{evtOldVal}} will be {{null}} if we read >{{null}} from {{updateListeners}} in the first place. In my fix, query entry must be filtered out if we read {{null}} from {{updateListeners}} in the first place(this fixes the essential bug). To filter out entry, _newVal_ and _oldVal_ must be passed as nulls to _cctx.continuousQueries().onEntryUpdated_ , see _CacheContinuousQueryManager#onEntryUpdated_ , _CacheContinuousQueryManager#skipUpdateEvent_ . Let's consider again the scenario: 1) T1 updates E1 (updateCounter gets incremented to 1); 2) T2 updates E2 (updateCounter gets incremented to 2); 3) T2 finishes update and exits GridCacheMapEntry::innerUpdate method; In this point we have CacheContinuousQueryEventBuffer#currentPartitionCounter equals 2 4) user adds continuous query listener; 5) T1 proceeds, picks up listener (not null thanks to the fix) and notifies listener; Batch#startCntr equals 2 (currentPartitionCounter() returns 2) so entry E1 would be filtered out of Batch (E1 update counter would be smaller than Batch#startCntr) PS E1 also can be sent to remote node(if we have remote listener installed) and processed in CacheContinuousQueryPartitionRecovery#collectEntries, but it would be filtered out there. 6) T3 updates E3 (updateCounter gets incremented to 3) and notifies listener; After 6) user's listener would be notified only once by key 3. After listener is set by user, entry E1 must be filtered out. Are you agree with such changes ? > Ignite Continuous Query (Queries 3): > CacheContinuousQueryConcurrentPartitionUpdateTest::testConcurrentUpdatesAndQueryStartAtomic > is flaky > ----------------------------------------------------------------------------------------------------------------------------------------- > > Key: IGNITE-5960 > URL: https://issues.apache.org/jira/browse/IGNITE-5960 > Project: Ignite > Issue Type: Bug > Affects Versions: 2.1 > Reporter: Sergey Chugunov > Assignee: Alexey Kuznetsov > Priority: Major > Labels: MakeTeamcityGreenAgain, test-failure > Fix For: 2.6 > > > According to [TC > history|http://ci.ignite.apache.org/project.html?projectId=Ignite20Tests&testNameId=6546112007182082024&tab=testDetails&branch_Ignite20Tests=%3Cdefault%3E] > test is flaky. > It is possible to reproduce it locally, sample run shows 9 failed tests out > of 30 overall executed. > Test fails with jUnit assertion check: > {noformat} > junit.framework.AssertionFailedError: > Expected :10000 > Actual :0 > <Click to see difference> > at junit.framework.Assert.fail(Assert.java:57) > at junit.framework.Assert.failNotEquals(Assert.java:329) > at junit.framework.Assert.assertEquals(Assert.java:78) > at junit.framework.Assert.assertEquals(Assert.java:234) > at junit.framework.Assert.assertEquals(Assert.java:241) > at junit.framework.TestCase.assertEquals(TestCase.java:409) > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryConcurrentPartitionUpdateTest.concurrentUpdatesAndQueryStart(CacheContinuousQueryConcurrentPartitionUpdateTest.java:385) > at > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryConcurrentPartitionUpdateTest.testConcurrentUpdatesAndQueryStartTx(CacheContinuousQueryConcurrentPartitionUpdateTest.java:245) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at junit.framework.TestCase.runTest(TestCase.java:176) > at > org.apache.ignite.testframework.junits.GridAbstractTest.runTestInternal(GridAbstractTest.java:2000) > at > org.apache.ignite.testframework.junits.GridAbstractTest.access$000(GridAbstractTest.java:132) > at > org.apache.ignite.testframework.junits.GridAbstractTest$5.run(GridAbstractTest.java:1915) > at java.lang.Thread.run(Thread.java:745) > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)