[ https://issues.apache.org/jira/browse/IGNITE-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15969120#comment-15969120 ]
Konstantin Dudkov commented on IGNITE-4982: ------------------------------------------- Semen, I added two methods to GridDhtAtomicAbstractUpdateRequest: int obsoleteSize() & KeyCacheObject obsoleteKey(int idx) with a new structure in the request and did a small refactoring in GridNearAtomicCache#processDhtAtomicUpdateRequest. Please review. Thanks. > GridCacheAbstractRemoveFailureTest fail > --------------------------------------- > > Key: IGNITE-4982 > URL: https://issues.apache.org/jira/browse/IGNITE-4982 > Project: Ignite > Issue Type: Bug > Components: cache > Reporter: Konstantin Dudkov > Assignee: Konstantin Dudkov > Fix For: 2.0 > > > GridCacheAbstractRemoveFailureTest (and some child tests) fails. Reproducer: > {code:java} > import java.util.Collection; > import java.util.HashSet; > import java.util.Map; > import java.util.concurrent.Callable; > import java.util.concurrent.CyclicBarrier; > import java.util.concurrent.ThreadLocalRandom; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.TimeoutException; > import java.util.concurrent.atomic.AtomicBoolean; > import java.util.concurrent.atomic.AtomicLong; > import java.util.concurrent.atomic.AtomicReference; > import javax.cache.CacheException; > import org.apache.ignite.Ignite; > import org.apache.ignite.IgniteCache; > import org.apache.ignite.IgniteException; > import org.apache.ignite.IgniteTransactions; > import org.apache.ignite.cache.CacheAtomicWriteOrderMode; > import org.apache.ignite.cache.CacheAtomicityMode; > import org.apache.ignite.cache.CacheMode; > import org.apache.ignite.configuration.CacheConfiguration; > import org.apache.ignite.configuration.IgniteConfiguration; > import org.apache.ignite.configuration.NearCacheConfiguration; > import org.apache.ignite.internal.IgniteInternalFuture; > import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; > import org.apache.ignite.internal.util.lang.GridTuple; > import org.apache.ignite.internal.util.typedef.F; > import 
org.apache.ignite.internal.util.typedef.T2; > import org.apache.ignite.internal.util.typedef.X; > import org.apache.ignite.internal.util.typedef.internal.U; > import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; > import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; > import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; > import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; > import org.apache.ignite.testframework.GridTestUtils; > import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; > import org.apache.ignite.transactions.Transaction; > import org.apache.ignite.transactions.TransactionConcurrency; > import org.apache.ignite.transactions.TransactionIsolation; > import org.jsr166.ConcurrentHashMap8; > import static > org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; > import static org.apache.ignite.cache.CacheMode.PARTITIONED; > import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; > import static > org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; > import static > org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; > import static > org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; > import static > org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; > /** > * Tests that removes are not lost when topology changes. > */ > public abstract class GridCacheAbstractRemoveFailureTest extends > GridCommonAbstractTest { > /** IP finder. */ > private static final TcpDiscoveryIpFinder IP_FINDER = new > TcpDiscoveryVmIpFinder(true); > /** */ > private static final int GRID_CNT = 3; > /** Keys count. */ > private static final int KEYS_CNT = 10_000; > /** Test duration. */ > private static final long DUR = 90 * 1000L; > /** Cache data assert frequency. */ > private static final long ASSERT_FREQ = 10_000; > /** Kill delay. 
*/ > private static final T2<Integer, Integer> KILL_DELAY = new T2<>(2000, > 5000); > /** Start delay. */ > private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, > 5000); > /** */ > private static String sizePropVal; > /** {@inheritDoc} */ > @Override protected IgniteConfiguration getConfiguration(String > igniteInstanceName) throws Exception { > IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); > > ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER).setForceServerMode(true); > if (testClientNode() && > getTestIgniteInstanceName(0).equals(igniteInstanceName)) > cfg.setClientMode(true); > > ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); > return cfg; > } > /** {@inheritDoc} */ > @Override protected void beforeTestsStarted() throws Exception { > // Need to increase value set in GridAbstractTest > sizePropVal = > System.getProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE); > System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000"); > startGrids(GRID_CNT); > } > /** {@inheritDoc} */ > @Override protected void afterTestsStopped() throws Exception { > super.afterTestsStopped(); > System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, > sizePropVal != null ? sizePropVal : ""); > stopAllGrids(); > } > /** {@inheritDoc} */ > @Override protected long getTestTimeout() { > return DUR + 60_000; > } > /** > * @return Cache mode. > */ > protected abstract CacheMode cacheMode(); > /** > * @return Cache atomicity mode. > */ > protected abstract CacheAtomicityMode atomicityMode(); > /** > * @return Near cache configuration. > */ > protected abstract NearCacheConfiguration nearCache(); > /** > * @return Atomic cache write order mode. > */ > protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { > return null; > } > /** > * @return {@code True} if test updates from client node. > */ > protected boolean testClientNode() { > return false; > } > /** > * @throws Exception If failed. 
> */ > public void testPutAndRemove() throws Exception { > putAndRemove(DUR, null, null); > } > /** > * @throws Exception If failed. > */ > public void testPutAndRemovePessimisticTx() throws Exception { > if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL) > return; > putAndRemove(30_000, PESSIMISTIC, REPEATABLE_READ); > } > /** > * @throws Exception If failed. > */ > public void testPutAndRemoveOptimisticSerializableTx() throws Exception { > if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL) > return; > putAndRemove(30_000, OPTIMISTIC, SERIALIZABLE); > } > /** > * @param duration Test duration. > * @param txConcurrency Transaction concurrency if test explicit > transaction. > * @param txIsolation Transaction isolation if test explicit transaction. > * @throws Exception If failed. > */ > private void putAndRemove(long duration, > final TransactionConcurrency txConcurrency, > final TransactionIsolation txIsolation) throws Exception { > assertEquals(testClientNode(), (boolean) > grid(0).configuration().isClientMode()); > grid(0).destroyCache(null); > CacheConfiguration<Integer, Integer> ccfg = new > CacheConfiguration<>(); > ccfg.setWriteSynchronizationMode(FULL_SYNC); > ccfg.setCacheMode(cacheMode()); > if (cacheMode() == PARTITIONED) > ccfg.setBackups(1); > ccfg.setAtomicityMode(atomicityMode()); > ccfg.setAtomicWriteOrderMode(atomicWriteOrderMode()); > ccfg.setNearConfiguration(nearCache()); > final IgniteCache<Integer, Integer> sndCache0 = > grid(0).createCache(ccfg); > final AtomicBoolean stop = new AtomicBoolean(); > final AtomicLong cntr = new AtomicLong(); > final AtomicLong errCntr = new AtomicLong(); > // Expected values in cache. 
> final Map<Integer, GridTuple<Integer>> expVals = new > ConcurrentHashMap8<>(); > final AtomicReference<CyclicBarrier> cmp = new AtomicReference<>(); > IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new > Callable<Void>() { > @Override public Void call() throws Exception { > Thread.currentThread().setName("update-thread"); > ThreadLocalRandom rnd = ThreadLocalRandom.current(); > IgniteTransactions txs = > sndCache0.unwrap(Ignite.class).transactions(); > while (!stop.get()) { > for (int i = 0; i < 100; i++) { > int key = rnd.nextInt(KEYS_CNT); > boolean put = rnd.nextInt(0, 100) > 10; > while (true) { > try { > if (put) { > boolean failed = false; > if (txConcurrency != null) { > try (Transaction tx = > txs.txStart(txConcurrency, txIsolation)) { > sndCache0.put(key, i); > tx.commit(); > } > catch (CacheException | > IgniteException e) { > if (!X.hasCause(e, > ClusterTopologyCheckedException.class)) { > log.error("Unexpected error: > " + e); > throw e; > } > failed = true; > } > } > else > sndCache0.put(key, i); > if (!failed) > expVals.put(key, F.t(i)); > } > else { > boolean failed = false; > if (txConcurrency != null) { > try (Transaction tx = > txs.txStart(txConcurrency, txIsolation)) { > sndCache0.remove(key); > tx.commit(); > } > catch (CacheException | > IgniteException e) { > if (!X.hasCause(e, > ClusterTopologyCheckedException.class)) { > log.error("Unexpected error: > " + e); > throw e; > } > failed = true; > } > } > else > sndCache0.remove(key); > if (!failed) > expVals.put(key, F.<Integer>t(null)); > } > break; > } > catch (CacheException e) { > if (put) > log.error("Put failed [key=" + key + ", > val=" + i + ']', e); > else > log.error("Remove failed [key=" + key + > ']', e); > errCntr.incrementAndGet(); > } > } > } > cntr.addAndGet(100); > CyclicBarrier barrier = cmp.get(); > if (barrier != null) { > log.info("Wait data check."); > barrier.await(60_000, TimeUnit.MILLISECONDS); > log.info("Finished wait data check."); > } > } > return null; 
> } > }); > IgniteInternalFuture<?> killFut = GridTestUtils.runAsync(new > Callable<Void>() { > @Override public Void call() throws Exception { > Thread.currentThread().setName("restart-thread"); > while (!stop.get()) { > U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2())); > killAndRestart(stop); > CyclicBarrier barrier = cmp.get(); > if (barrier != null) { > log.info("Wait data check."); > barrier.await(60_000, TimeUnit.MILLISECONDS); > log.info("Finished wait data check."); > } > } > return null; > } > }); > try { > long stopTime = duration + U.currentTimeMillis() ; > long nextAssert = U.currentTimeMillis() + ASSERT_FREQ; > while (U.currentTimeMillis() < stopTime) { > long start = System.nanoTime(); > long ops = cntr.longValue(); > U.sleep(1000); > long diff = cntr.longValue() - ops; > double time = (System.nanoTime() - start) / 1_000_000_000d; > long opsPerSecond = (long)(diff / time); > log.info("Operations/second: " + opsPerSecond); > if (U.currentTimeMillis() >= nextAssert) { > CyclicBarrier barrier = new CyclicBarrier(3, new > Runnable() { > @Override public void run() { > try { > cmp.set(null); > log.info("Checking cache content."); > assertCacheContent(expVals); > log.info("Finished check cache content."); > } > catch (Throwable e) { > log.error("Unexpected error: " + e, e); > throw e; > } > } > }); > log.info("Start cache content check."); > cmp.set(barrier); > try { > barrier.await(60_000, TimeUnit.MILLISECONDS); > } > catch (TimeoutException e) { > U.dumpThreads(log); > fail("Failed to check cache content: " + e); > } > log.info("Cache content check done."); > nextAssert = System.currentTimeMillis() + ASSERT_FREQ; > } > } > } > finally { > stop.set(true); > } > killFut.get(); > updateFut.get(); > log.info("Test finished. Update errors: " + errCntr.get()); > } > /** > * @param stop Stop flag. > * @throws Exception If failed. 
> */ > private void killAndRestart(AtomicBoolean stop) throws Exception { > if (stop.get()) > return; > int idx = random(1, GRID_CNT + 1); > log.info("Killing node " + idx); > stopGrid(idx); > U.sleep(random(START_DELAY.get1(), START_DELAY.get2())); > log.info("Restarting node " + idx); > startGrid(idx); > if (stop.get()) > return; > U.sleep(1000); > } > /** > * @param expVals Expected values in cache. > */ > @SuppressWarnings({"TooBroadScope", "ConstantIfStatement"}) > private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) > { > assert !expVals.isEmpty(); > Collection<Integer> failedKeys = new HashSet<>(); > for (int i = 0; i < GRID_CNT; i++) { > Ignite ignite = grid(i); > IgniteCache<Integer, Integer> cache = ignite.cache(null); > for (Map.Entry<Integer, GridTuple<Integer>> expVal : > expVals.entrySet()) { > Integer val = cache.get(expVal.getKey()); > if (!F.eq(expVal.getValue().get(), val)) { > failedKeys.add(expVal.getKey()); > boolean primary = > affinity(cache).isPrimary(ignite.cluster().localNode(), expVal.getKey()); > boolean backup = > affinity(cache).isBackup(ignite.cluster().localNode(), expVal.getKey()); > log.error("Unexpected cache data [exp=" + expVal + > ", actual=" + val + > ", nodePrimary=" + primary + > ", nodeBackup=" + backup + > ", nodeIdx" + i + > ", nodeId=" + ignite.cluster().localNode().id() + > ']'); > } > } > } > assertTrue("Unexpected data for keys: " + failedKeys, > failedKeys.isEmpty()); > } > /** > * @param min Min possible value. > * @param max Max possible value (exclusive). > * @return Random value. > */ > private static int random(int min, int max) { > if (max == min) > return max; > return ThreadLocalRandom.current().nextInt(min, max); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)