http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java new file mode 100644 index 0000000..bad1b61 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/LatchAckMessage.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Message is used to send acks for {@link Latch} instances management. + */ +public class LatchAckMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Latch id. */ + private String latchId; + + /** Latch topology version. */ + private AffinityTopologyVersion topVer; + + /** Flag indicates that ack is final. */ + private boolean isFinal; + + /** + * Constructor. + * + * @param latchId Latch id. + * @param topVer Latch topology version. + * @param isFinal Final acknowledgement flag. + */ + public LatchAckMessage(String latchId, AffinityTopologyVersion topVer, boolean isFinal) { + this.latchId = latchId; + this.topVer = topVer; + this.isFinal = isFinal; + } + + /** + * Empty constructor for marshalling purposes. + */ + public LatchAckMessage() { + } + + /** + * @return Latch id. + */ + public String latchId() { + return latchId; + } + + /** + * @return Latch topology version. + */ + public AffinityTopologyVersion topVer() { + return topVer; + } + + /** + * @return {@code true} if ack is final. 
+ */ + public boolean isFinal() { + return isFinal; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeBoolean("isFinal", isFinal)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeString("latchId", latchId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + isFinal = reader.readBoolean("isFinal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + latchId = reader.readString("latchId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(LatchAckMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 135; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 7785605..33f84f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3525,6 +3525,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou U.error(log, "Failed to prepare transaction: " + this, e); } + catch (Throwable t) { + fut.onDone(t); + + throw t; + } if (err != null) fut.rollbackOnError(err); @@ -3544,6 +3549,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou U.error(log, "Failed to prepare transaction: " + this, e); } + catch (Throwable t) { + fut.onDone(t); + + throw t; + } if (err != null) fut.rollbackOnError(err); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 5cfd92d..68ec83d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -189,7 +189,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple freeList.saveMetadata(); long updCntr = store.updateCounter(); - int size = store.fullSize(); + long size = store.fullSize(); long rmvId = globalRemoveId().get(); PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); @@ -318,7 +318,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple partMetaId, updCntr, rmvId, - size, + (int)size, // TODO: Partition size may be long cntrsPageId, state == null ? -1 : (byte)state.ordinal(), pageCnt @@ -549,7 +549,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple final int grpId, final int partId, final int currAllocatedPageCnt, - final int partSize + final long partSize ) { if (part != null) { boolean reserved = part.reserve(); @@ -1301,7 +1301,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public int fullSize() { + @Override public long fullSize() { try { CacheDataStore delegate0 = init0(true); @@ -1313,7 +1313,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public int cacheSize(int cacheId) { + @Override public long cacheSize(int cacheId) { try { CacheDataStore delegate0 = init0(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 9bfaaf3..945ef48 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -490,7 +490,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement @Override public AffinityTopologyVersion topologyVersion() { AffinityTopologyVersion res = topVer; - if (res.equals(AffinityTopologyVersion.NONE)) { + if (res == null || res.equals(AffinityTopologyVersion.NONE)) { if (system()) { AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index fbdeca1..9fb8777 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -545,10 +545,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param topVer Topology version. * @return Future that will be completed when all ongoing transactions are finished. 
*/ - public IgniteInternalFuture<Boolean> finishTxs(AffinityTopologyVersion topVer) { + public IgniteInternalFuture<Boolean> finishLocalTxs(AffinityTopologyVersion topVer) { GridCompoundFuture<IgniteInternalTx, Boolean> res = new CacheObjectsReleaseFuture<>( - "Tx", + "LocalTx", topVer, new IgniteReducer<IgniteInternalTx, Boolean>() { @Override public boolean collect(IgniteInternalTx e) { @@ -561,8 +561,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { }); for (IgniteInternalTx tx : txs()) { - if (needWaitTransaction(tx, topVer)) + if (needWaitTransaction(tx, topVer)) { res.add(tx.finishFuture()); + } } res.markInitialized(); @@ -571,6 +572,29 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * Creates a future that will wait for finishing all tx updates on backups after all local transactions are finished. + * + * NOTE: + * As we send finish request to backup nodes after transaction successfully completed on primary node + * it's important to ensure that all updates from primary to backup are finished or at least remote transaction has been created on backup node. + * + * @param finishLocalTxsFuture Local transactions finish future. + * @param topVer Topology version. + * @return Future that will be completed when all ongoing transactions are finished. + */ + public IgniteInternalFuture<?> finishAllTxs(IgniteInternalFuture<?> finishLocalTxsFuture, AffinityTopologyVersion topVer) { + final GridCompoundFuture finishAllTxsFuture = new CacheObjectsReleaseFuture("AllTx", topVer); + + // After finishing all local updates, wait for finishing all tx updates on backups. + finishLocalTxsFuture.listen(future -> { + finishAllTxsFuture.add(cctx.mvcc().finishRemoteTxs(topVer)); + finishAllTxsFuture.markInitialized(); + }); + + return finishAllTxsFuture; + } + + /** * @param tx Transaction. * @param topVer Exchange version. * @return {@code True} if need wait transaction for exchange. 
@@ -1834,12 +1858,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Finish future for related remote transactions. */ @SuppressWarnings("unchecked") - public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) { - GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>(); + public IgniteInternalFuture<IgniteInternalTx> remoteTxFinishFuture(GridCacheVersion nearVer) { + GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> fut = new GridCompoundFuture<>(); for (final IgniteInternalTx tx : txs()) { if (!tx.local() && nearVer.equals(tx.nearXidVersion())) - fut.add((IgniteInternalFuture) tx.finishFuture()); + fut.add(tx.finishFuture()); } fut.markInitialized(); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java index 7263656..702b188 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDhtLocalPartitionAfterRemoveSelfTest.java @@ -76,7 +76,7 @@ public class CacheDhtLocalPartitionAfterRemoveSelfTest extends GridCommonAbstrac cache = grid(g).cache(DEFAULT_CACHE_NAME); for (GridDhtLocalPartition p : dht(cache).topology().localPartitions()) { - int size = p.dataStore().fullSize(); + long size = p.dataStore().fullSize(); assertTrue("Unexpected size: " + size, size <= 32); } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 468bbc8..6c570d7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -86,6 +86,7 @@ import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java new file mode 100644 index 0000000..52cd033 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteExchangeLatchManagerCoordinatorFailTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.datastructures; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.Lists; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.lang.IgniteBiClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +/** + * Tests for {@link ExchangeLatchManager} functionality when latch coordinator is failed. + */ +public class IgniteExchangeLatchManagerCoordinatorFailTest extends GridCommonAbstractTest { + /** */ + private static final String LATCH_NAME = "test"; + + /** 5 nodes. 
*/ + private final AffinityTopologyVersion latchTopVer = new AffinityTopologyVersion(5, 0); + + /** Wait before latch creation. */ + private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCreate = (mgr, syncLatch) -> { + try { + syncLatch.countDown(); + syncLatch.await(); + + Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer); + + distributedLatch.countDown(); + + distributedLatch.await(); + } catch (Exception e) { + log.error("Unexpected exception", e); + + return false; + } + + return true; + }; + + /** Wait before latch count down. */ + private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> beforeCountDown = (mgr, syncLatch) -> { + try { + Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer); + + syncLatch.countDown(); + syncLatch.await(); + + distributedLatch.countDown(); + + distributedLatch.await(); + } catch (Exception e) { + log.error("Unexpected exception ", e); + + return false; + } + + return true; + }; + + /** Wait after all operations are successful. */ + private final IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean> all = (mgr, syncLatch) -> { + try { + Latch distributedLatch = mgr.getOrCreate(LATCH_NAME, latchTopVer); + + distributedLatch.countDown(); + + syncLatch.countDown(); + + distributedLatch.await(); + + syncLatch.await(); + } catch (Exception e) { + log.error("Unexpected exception ", e); + + return false; + } + + return true; + }; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Test scenarios description: + * + * We have existing coordinator and 4 other nodes. + * Each node do following operations: + * 1) Create latch + * 2) Countdown latch + * 3) Await latch + * + * While nodes do the operations we shutdown coordinator and next oldest node become new coordinator. 
+ * We should check that new coordinator properly restored latch and all nodes finished latch completion successfully after that. + * + * Each node before coordinator shutdown can be in 3 different states: + * + * State {@link #beforeCreate} - Node didn't create latch yet. + * State {@link #beforeCountDown} - Node created latch but didn't count down it yet. + * State {@link #all} - Node created latch and count downed it. + * + * We should check important cases when future coordinator is in one of these states, and other 3 nodes have 3 different states. + */ + + /** + * Scenario 1: + * + * Node 1 state -> {@link #beforeCreate} + * Node 2 state -> {@link #beforeCountDown} + * Node 3 state -> {@link #all} + * Node 4 state -> {@link #beforeCreate} + */ + public void testCoordinatorFail1() throws Exception { + List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList( + beforeCreate, + beforeCountDown, + all, + beforeCreate + ); + + doTestCoordinatorFail(nodeStates); + } + + /** + * Scenario 2: + * + * Node 1 state -> {@link #beforeCountDown} + * Node 2 state -> {@link #beforeCountDown} + * Node 3 state -> {@link #all} + * Node 4 state -> {@link #beforeCreate} + */ + public void testCoordinatorFail2() throws Exception { + List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList( + beforeCountDown, + beforeCountDown, + all, + beforeCreate + ); + + doTestCoordinatorFail(nodeStates); + } + + /** + * Scenario 3: + * + * Node 1 state -> {@link #all} + * Node 2 state -> {@link #beforeCountDown} + * Node 3 state -> {@link #all} + * Node 4 state -> {@link #beforeCreate} + */ + public void testCoordinatorFail3() throws Exception { + List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeStates = Lists.newArrayList( + all, + beforeCountDown, + all, + beforeCreate + ); + + doTestCoordinatorFail(nodeStates); + } + + /** + * Test latch coordinator fail with specified scenarios. 
+ * + * @param nodeScenarios Node scenarios. + * @throws Exception If failed. + */ + private void doTestCoordinatorFail(List<IgniteBiClosure<ExchangeLatchManager, CountDownLatch, Boolean>> nodeScenarios) throws Exception { + IgniteEx crd = (IgniteEx) startGridsMultiThreaded(5); + crd.cluster().active(true); + + // Latch to synchronize node states. + CountDownLatch syncLatch = new CountDownLatch(5); + + GridCompoundFuture finishAllLatches = new GridCompoundFuture(); + + AtomicBoolean hasErrors = new AtomicBoolean(); + + for (int node = 1; node < 5; node++) { + IgniteEx grid = grid(node); + ExchangeLatchManager latchMgr = grid.context().cache().context().exchange().latch(); + final int stateIdx = node - 1; + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(() -> { + boolean success = nodeScenarios.get(stateIdx).apply(latchMgr, syncLatch); + if (!success) + hasErrors.set(true); + }, 1, "latch-runner-" + node); + + finishAllLatches.add(fut); + } + + finishAllLatches.markInitialized(); + + // Wait until all nodes reach their states. + while (syncLatch.getCount() != 1) { + Thread.sleep(10); + + if (hasErrors.get()) + throw new Exception("All nodes should complete latches without errors"); + } + + crd.close(); + + // Resume progress for all nodes. + syncLatch.countDown(); + + // Wait for distributed latch completion. 
+ finishAllLatches.get(5000); + + Assert.assertFalse("All nodes should complete latches without errors", hasErrors.get()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java new file mode 100644 index 0000000..63d772a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidationTest.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import 
org.apache.ignite.transactions.TransactionIsolation; + +/** + * + */ +public class GridCachePartitionsStateValidationTest extends GridCommonAbstractTest { + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** */ + private boolean clientMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + cfg.setCacheConfiguration(new CacheConfiguration(CACHE_NAME) + .setBackups(1) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + ); + + cfg.setCommunicationSpi(new SingleMessageInterceptorCommunicationSpi(2)); + + if (clientMode) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + clientMode = false; + } + + /** + * Test that partitions state validation works correctly. + * + * @throws Exception If failed. + */ + public void testValidationIfPartitionCountersAreInconsistent() throws Exception { + IgniteEx ignite = (IgniteEx) startGrids(2); + ignite.cluster().active(true); + + awaitPartitionMapExchange(); + + // Modify update counter for some partition. + for (GridDhtLocalPartition partition : ignite.cachex(CACHE_NAME).context().topology().localPartitions()) { + partition.updateCounter(100500L); + break; + } + + // Trigger exchange. + startGrid(2); + + awaitPartitionMapExchange(); + + // Nothing should happen (just log error message) and we're still able to put data to corrupted cache. + ignite.cache(CACHE_NAME).put(0, 0); + + stopAllGrids(); + } + + /** + * Test that all nodes send correct {@link GridDhtPartitionsSingleMessage} with consistent update counters. + * + * @throws Exception If failed. 
+     */
+    public void testPartitionCountersConsistencyOnExchange() throws Exception {
+        IgniteEx ignite = (IgniteEx) startGrids(4);
+        ignite.cluster().active(true);
+
+        awaitPartitionMapExchange();
+
+        final String atomicCacheName = "atomic-cache";
+        final String txCacheName = "tx-cache";
+
+        // Start one extra node (index 4) with the clientMode flag set; both caches are accessed through it.
+        clientMode = true;
+
+        Ignite client = startGrid(4);
+
+        clientMode = false;
+
+        IgniteCache atomicCache = client.getOrCreateCache(new CacheConfiguration<>(atomicCacheName)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        IgniteCache txCache = client.getOrCreateCache(new CacheConfiguration<>(txCacheName)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setBackups(2)
+            .setAffinity(new RendezvousAffinityFunction(false, 32))
+        );
+
+        for (int it = 0; it < 10; it++) {
+            SingleMessageInterceptorCommunicationSpi spi = (SingleMessageInterceptorCommunicationSpi) ignite.configuration().getCommunicationSpi();
+            spi.clear();
+
+            // Flag that tells both load loops to stop.
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            // Run atomic load: a single thread puts ever-increasing keys, put failures are ignored.
+            IgniteInternalFuture atomicLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
+                int k = 0;
+
+                while (!stop.get()) {
+                    k++;
+                    try {
+                        atomicCache.put(k, k);
+                    } catch (Exception ignored) {}
+                }
+            }, 1, "atomic-load");
+
+            // Run tx load: 4 threads update 5 random keys (sorted, range [0, 5)) per optimistic transaction.
+            IgniteInternalFuture txLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
+                final int txOps = 5;
+
+                while (!stop.get()) {
+                    List<Integer> randomKeys = Stream.generate(() -> ThreadLocalRandom.current().nextInt(5))
+                        .limit(txOps)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+                    // The transaction is started on the same node that owns txCache (the client),
+                    // otherwise the puts below would not be part of it.
+                    try (Transaction tx = client.transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
+                        for (Integer key : randomKeys)
+                            txCache.put(key, key);
+
+                        tx.commit();
+                    }
+                    catch (Exception ignored) { }
+                }
+            }, 4, "tx-load");
+
+            // Wait for some data.
+            Thread.sleep(1000);
+
+            // Prevent sending full message.
+            spi.blockFullMessage();
+
+            // Trigger exchange by stopping grid 3 in the background.
+            IgniteInternalFuture nodeStopFuture = GridTestUtils.runAsync(() -> stopGrid(3));
+
+            try {
+                spi.waitUntilAllSingleMessagesAreSent();
+
+                List<GridDhtPartitionsSingleMessage> interceptedMessages = spi.getMessages();
+
+                // Associate each message with an existing node UUID (by interception order, not by actual sender).
+                Map<UUID, GridDhtPartitionsSingleMessage> messagesMap = new HashMap<>();
+                for (int i = 0; i < interceptedMessages.size(); i++)
+                    messagesMap.put(grid(i + 1).context().localNodeId(), interceptedMessages.get(i));
+
+                GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(ignite.context().cache().context());
+
+                // Validate partition update counters. The validator returns the partitions whose counters differ,
+                // so consistent counters are reported as an empty map and the result must be checked explicitly.
+                Map<Integer, Map<UUID, Long>> atomicRes = validator.validatePartitionsUpdateCounters(
+                    ignite.cachex(atomicCacheName).context().topology(), messagesMap, Collections.emptySet());
+                Map<Integer, Map<UUID, Long>> txRes = validator.validatePartitionsUpdateCounters(
+                    ignite.cachex(txCacheName).context().topology(), messagesMap, Collections.emptySet());
+
+                assertTrue("Inconsistent update counters in " + atomicCacheName + ": " + atomicRes, atomicRes.isEmpty());
+                assertTrue("Inconsistent update counters in " + txCacheName + ": " + txRes, txRes.isEmpty());
+
+            } finally {
+                // Stop load and resume exchange.
+                spi.unblockFullMessage();
+
+                stop.set(true);
+
+                atomicLoadFuture.get();
+                txLoadFuture.get();
+                nodeStopFuture.get();
+            }
+
+            // Return grid to initial state.
+            startGrid(3);
+
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * SPI which intercepts single messages during exchange and can hold back full messages.
+     */
+    private static class SingleMessageInterceptorCommunicationSpi extends TcpCommunicationSpi {
+        /** Intercepted single messages. Static, so the list is shared by all instances of this SPI. */
+        private static final List<GridDhtPartitionsSingleMessage> messages = new CopyOnWriteArrayList<>();
+
+        /** Future completes when {@link #singleMessagesThreshold} messages are sent to coordinator. */
+        private static final GridFutureAdapter allSingleMessagesSent = new GridFutureAdapter();
+
+        /** A number of single messages we're waiting for send. */
+        private final int singleMessagesThreshold;
+
+        /** Latch which blocks full message sending; {@code null} until {@link #blockFullMessage()} is called. */
+        private volatile CountDownLatch blockFullMsgLatch;
+
+        /**
+         * Constructor.
+         *
+         * @param singleMessagesThreshold Number of intercepted single messages that completes the wait future.
+         */
+        private SingleMessageInterceptorCommunicationSpi(int singleMessagesThreshold) {
+            this.singleMessagesThreshold = singleMessagesThreshold;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            Message msg0 = ((GridIoMessage) msg).message();
+
+            if (msg0 instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg0;
+
+                // We're interested only in exchange messages caused by a node leaving, sent by non-client nodes.
+                if (singleMsg.exchangeId() != null && singleMsg.exchangeId().isLeft() && !singleMsg.client()) {
+                    messages.add(singleMsg);
+
+                    if (messages.size() == singleMessagesThreshold)
+                        allSingleMessagesSent.onDone();
+                }
+            }
+
+            // Read the volatile field once so the null check and the await use the same latch.
+            CountDownLatch latch = blockFullMsgLatch;
+
+            if (latch != null && msg0 instanceof GridDhtPartitionsFullMessage) {
+                try {
+                    latch.await();
+                }
+                catch (InterruptedException ignored) {
+                    // Still send the message, but keep the interruption visible to the caller.
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /** Forgets intercepted messages and resets the wait future. */
+        public void clear() {
+            messages.clear();
+            allSingleMessagesSent.reset();
+        }
+
+        /** @return Read-only view of the intercepted single messages. */
+        public List<GridDhtPartitionsSingleMessage> getMessages() {
+            return Collections.unmodifiableList(messages);
+        }
+
+        /** Makes subsequent full messages sent through this SPI instance wait until {@link #unblockFullMessage()}. */
+        public void blockFullMessage() {
+            blockFullMsgLatch = new CountDownLatch(1);
+        }
+
+        /** Releases full messages held back by {@link #blockFullMessage()}; does nothing if nothing was blocked. */
+        public void unblockFullMessage() {
+            CountDownLatch latch = blockFullMsgLatch;
+
+            if (latch != null)
+                latch.countDown();
+        }
+
+        /**
+         * Waits until the expected number of single messages is intercepted.
+         *
+         * @throws IgniteCheckedException If waiting on the future failed.
+         */
+        public void waitUntilAllSingleMessagesAreSent() throws IgniteCheckedException {
+            allSingleMessagesSent.get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
new file mode 100644
index 0000000..9ed8d54
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionsStateValidatorSelfTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+/**
+ * Test correct behaviour of {@link GridDhtPartitionsStateValidator} class.
+ */
+public class GridCachePartitionsStateValidatorSelfTest extends GridCommonAbstractTest {
+    /** Local node ID reported by the shared context mock. */
+    private final UUID localNodeId = UUID.randomUUID();
+    /** Shared cache context mock. */
+    private GridCacheSharedContext cctxMock;
+    /** Partition topology mock: 3 local partitions, every partition reported as OWNING. */
+    private GridDhtPartitionTopology topologyMock;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Prepare mocks.
+        cctxMock = Mockito.mock(GridCacheSharedContext.class);
+        Mockito.when(cctxMock.localNodeId()).thenReturn(localNodeId);
+
+        topologyMock = Mockito.mock(GridDhtPartitionTopology.class);
+        Mockito.when(topologyMock.partitionState(Matchers.any(), Matchers.anyInt())).thenReturn(GridDhtPartitionState.OWNING);
+        Mockito.when(topologyMock.groupId()).thenReturn(0);
+        Mockito.when(topologyMock.partitions()).thenReturn(3);
+
+        // Local partition i has update counter i + 1 and size i + 1.
+        List<GridDhtLocalPartition> localPartitions = Lists.newArrayList(
+            partitionMock(0, 1, 1),
+            partitionMock(1, 2, 2),
+            partitionMock(2, 3, 3)
+        );
+        Mockito.when(topologyMock.localPartitions()).thenReturn(localPartitions);
+        Mockito.when(topologyMock.currentLocalPartitions()).thenReturn(localPartitions);
+    }
+
+    /**
+     * @param id Partition id.
+     * @param updateCounter Value returned by {@code updateCounter()}.
+     * @param size Value returned by {@code fullSize()}.
+     * @return Partition mock with specified {@code id}, {@code updateCounter} and {@code size}.
+     */
+    private GridDhtLocalPartition partitionMock(int id, long updateCounter, long size) {
+        GridDhtLocalPartition partitionMock = Mockito.mock(GridDhtLocalPartition.class);
+        Mockito.when(partitionMock.id()).thenReturn(id);
+        Mockito.when(partitionMock.updateCounter()).thenReturn(updateCounter);
+        Mockito.when(partitionMock.fullSize()).thenReturn(size);
+        return partitionMock;
+    }
+
+    /**
+     * @param countersMap Partition update counters, registered under group id 0 (the id the topology mock reports).
+     * @return Message containing specified {@code countersMap}.
+     */
+    private GridDhtPartitionsSingleMessage fromUpdateCounters(Map<Integer, T2<Long, Long>> countersMap) {
+        GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+        msg.addPartitionUpdateCounters(0, countersMap);
+        return msg;
+    }
+
+    /**
+     * @param sizesMap Partition sizes, registered under group id 0 (the id the topology mock reports).
+     * @return Message containing specified {@code sizesMap}.
+     */
+    private GridDhtPartitionsSingleMessage fromCacheSizes(Map<Integer, Long> sizesMap) {
+        GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage();
+        msg.addPartitionSizes(0, sizesMap);
+        return msg;
+    }
+
+    /**
+     * Test partition update counters validation.
+     */
+    public void testPartitionCountersValidation() {
+        UUID remoteNode = UUID.randomUUID();
+        UUID ignoreNode = UUID.randomUUID();
+
+        // For partitions 0 and 2 we have inconsistent update counters
+        // (partition 2 is absent from the message and is expected to be reported with a zero counter).
+        Map<Integer, T2<Long, Long>> updateCountersMap = new HashMap<>();
+        updateCountersMap.put(0, new T2<>(2L, 2L));
+        updateCountersMap.put(1, new T2<>(2L, 2L));
+
+        // Form single messages map.
+        Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+        messages.put(remoteNode, fromUpdateCounters(updateCountersMap));
+        messages.put(ignoreNode, fromUpdateCounters(updateCountersMap));
+
+        GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock);
+
+        // (partId, (nodeId, updateCounter))
+        Map<Integer, Map<UUID, Long>> result = validator.validatePartitionsUpdateCounters(topologyMock, messages, Sets.newHashSet(ignoreNode));
+
+        // Check that validation result contains all necessary information.
+        // Boxed expected values are used so that a missing entry fails the assertion instead of throwing NPE.
+        Assert.assertEquals(2, result.size());
+        Assert.assertTrue(result.containsKey(0));
+        Assert.assertTrue(result.containsKey(2));
+        Assert.assertEquals(Long.valueOf(1L), result.get(0).get(localNodeId));
+        Assert.assertEquals(Long.valueOf(2L), result.get(0).get(remoteNode));
+        Assert.assertEquals(Long.valueOf(3L), result.get(2).get(localNodeId));
+        Assert.assertEquals(Long.valueOf(0L), result.get(2).get(remoteNode));
+    }
+
+    /**
+     * Test partition cache sizes validation.
+     */
+    public void testPartitionCacheSizesValidation() {
+        UUID remoteNode = UUID.randomUUID();
+        UUID ignoreNode = UUID.randomUUID();
+
+        // For partitions 0 and 2 we have inconsistent cache sizes (local sizes are 1, 2, 3; remote sizes are all 2).
+        Map<Integer, Long> cacheSizesMap = new HashMap<>();
+        cacheSizesMap.put(0, 2L);
+        cacheSizesMap.put(1, 2L);
+        cacheSizesMap.put(2, 2L);
+
+        // Form single messages map.
+        Map<UUID, GridDhtPartitionsSingleMessage> messages = new HashMap<>();
+        messages.put(remoteNode, fromCacheSizes(cacheSizesMap));
+        messages.put(ignoreNode, fromCacheSizes(cacheSizesMap));
+
+        GridDhtPartitionsStateValidator validator = new GridDhtPartitionsStateValidator(cctxMock);
+
+        // (partId, (nodeId, cacheSize))
+        Map<Integer, Map<UUID, Long>> result = validator.validatePartitionsSizes(topologyMock, messages, Sets.newHashSet(ignoreNode));
+
+        // Check that validation result contains all necessary information.
+        // Boxed expected values are used so that a missing entry fails the assertion instead of throwing NPE.
+        Assert.assertEquals(2, result.size());
+        Assert.assertTrue(result.containsKey(0));
+        Assert.assertTrue(result.containsKey(2));
+        Assert.assertEquals(Long.valueOf(1L), result.get(0).get(localNodeId));
+        Assert.assertEquals(Long.valueOf(2L), result.get(0).get(remoteNode));
+        Assert.assertEquals(Long.valueOf(3L), result.get(2).get(localNodeId));
+        Assert.assertEquals(Long.valueOf(2L), result.get(2).get(remoteNode));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
new file mode 100644
index 0000000..03ea0f7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticOnPartitionExchangeTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; +import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.T1;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Checks that all keys written by an optimistic transaction are readable from a node that joins the grid
+ * while the transaction's prepare and finish requests are being delayed by the communication SPI.
+ */
+public class TxOptimisticOnPartitionExchangeTest extends GridCommonAbstractTest {
+    /** Nodes count. */
+    private static final int NODES_CNT = 3;
+
+    /** Tx size. */
+    private static final int TX_SIZE = 20 * NODES_CNT;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Whether {@link TestCommunicationSpi} currently delays tx requests and logs outgoing messages. */
+    private static volatile boolean msgInterception;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODES_CNT);
+
+        awaitPartitionMapExchange();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCommunicationSpi(new TestCommunicationSpi(log()));
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration()
+            .setName(CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setCacheMode(PARTITIONED)
+            .setBackups(1));
+
+        return cfg;
+    }
+
+    /**
+     * Runs the scenario for SERIALIZABLE and READ_COMMITTED isolation, with and without keys
+     * whose primary is the node that starts the transaction.
+     *
+     * @throws Exception If failed.
+     */
+    public void testConsistencyOnPartitionExchange() throws Exception {
+        doTest(SERIALIZABLE, true);
+        doTest(READ_COMMITTED, true);
+        doTest(SERIALIZABLE, false);
+        doTest(READ_COMMITTED, false);
+    }
+
+    /**
+     * Starts an optimistic transaction on node 0, starts one more node while the transaction is in flight,
+     * then reads every transaction key from the new node and checks that none is missing.
+     *
+     * @param isolation {@link TransactionIsolation}.
+     * @param txInitiatorPrimary {@code false} if the transaction must not use keys whose primary is the node
+     *      that initiated it.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void doTest(final TransactionIsolation isolation, boolean txInitiatorPrimary) throws Exception {
+        final CountDownLatch txStarted = new CountDownLatch(1);
+
+        final IgniteCache cache = ignite(0).cache(CACHE_NAME);
+
+        final Map<Integer, Integer> txValues = new TreeMap<>();
+
+        ClusterNode node = ignite(0).cluster().node();
+
+        GridCacheAffinityManager affinity = ((IgniteCacheProxy)cache).context().affinity();
+
+        // Collect TX_SIZE keys, optionally skipping those whose primary is the initiating node.
+        for (int i = 0; txValues.size() < TX_SIZE; i++) {
+            if (!txInitiatorPrimary && node.equals(affinity.primaryByKey(i, NONE)))
+                continue;
+
+            txValues.put(i, i);
+        }
+
+        TestCommunicationSpi.init();
+
+        msgInterception = true;
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() {
+                try (Transaction tx = ignite(0).transactions().txStart(OPTIMISTIC, isolation)) {
+                    info(">>> TX started.");
+
+                    txStarted.countDown();
+
+                    cache.putAll(txValues);
+
+                    tx.commit();
+
+                    info(">>> TX committed.");
+                }
+
+                return null;
+            }
+        });
+
+        txStarted.await();
+
+        try {
+            info(">>> Grid starting.");
+
+            IgniteEx ignite = startGrid(NODES_CNT);
+
+            info(">>> Grid started.");
+
+            fut.get();
+
+            awaitPartitionMapExchange();
+
+            msgInterception = false;
+
+            IgniteCache<Object, Object> cacheStartedNode = ignite.cache(CACHE_NAME);
+
+            // Every key written by the transaction must be visible from the node that has just joined.
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                Set<Object> keys = cacheStartedNode.getAll(txValues.keySet()).keySet();
+
+                assertEquals(txValues.keySet(), new TreeSet<>(keys));
+
+                tx.commit();
+            }
+        }
+        finally {
+            msgInterception = false;
+
+            stopGrid(NODES_CNT);
+        }
+    }
+
+    /**
+     * Communication SPI that, while {@link #msgInterception} is set, postpones tx prepare and finish requests
+     * sent from node 0 and logs every outgoing message.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Partition single message sent from added node. */
+        private static volatile CountDownLatch partSingleMsgSentFromAddedNode;
+
+        /** Partition supply message sent count. */
+        private static final AtomicInteger partSupplyMsgSentCnt = new AtomicInteger();
+
+        /** Logger. */
+        private IgniteLogger log;
+
+        /**
+         * @param log Logger.
+         */
+        public TestCommunicationSpi(IgniteLogger log) {
+            this.log = log;
+        }
+
+        /**
+         * Resets the shared latch and the supply message counter before a test run.
+         */
+        public static void init() {
+            partSingleMsgSentFromAddedNode = new CountDownLatch(1);
+
+            partSupplyMsgSentCnt.set(0);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC
+        ) throws IgniteSpiException {
+            if (msgInterception) {
+                if (msg instanceof GridIoMessage) {
+                    final Message msg0 = ((GridIoMessage)msg).message();
+
+                    String locNodeId = ((IgniteEx)ignite).context().localNodeId().toString();
+
+                    // Node index is parsed from the last three characters of the local node ID.
+                    // NOTE(review): presumably test node IDs end with the grid index - verify against the test framework.
+                    int nodeIdx = Integer.parseInt(locNodeId.substring(locNodeId.length() - 3));
+
+                    if (nodeIdx == 0) {
+                        if (msg0 instanceof GridNearTxPrepareRequest || msg0 instanceof GridDhtTxPrepareRequest) {
+                            // Send prepare requests only after the added node has sent its partitions single message.
+                            GridTestUtils.runAsync(new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    partSingleMsgSentFromAddedNode.await();
+
+                                    sendMessage(node, msg, ackC, true);
+
+                                    return null;
+                                }
+                            });
+
+                            return;
+
+                        }
+                        else if (msg0 instanceof GridNearTxFinishRequest || msg0 instanceof GridDhtTxFinishRequest) {
+                            // Send finish requests only after supply messages stop being sent: wait up to 5 s
+                            // for the first one, then until 500 ms pass without a new one.
+                            GridTestUtils.runAsync(new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    final T1<Integer> i = new T1<>(0);
+
+                                    while (waitForCondition(new GridAbsPredicate() {
+                                        @Override public boolean apply() {
+                                            return partSupplyMsgSentCnt.get() > i.get();
+                                        }
+                                    }, i.get() == 0 ? 5_000 : 500))
+                                        i.set(partSupplyMsgSentCnt.get());
+
+                                    sendMessage(node, msg, ackC, true);
+
+                                    return null;
+                                }
+                            });
+
+                            return;
+                        }
+                    }
+                    else if (nodeIdx == NODES_CNT && msg0 instanceof GridDhtPartitionsSingleMessage)
+                        partSingleMsgSentFromAddedNode.countDown();
+
+                    if (msg0 instanceof GridDhtPartitionSupplyMessage)
+                        partSupplyMsgSentCnt.incrementAndGet();
+                }
+            }
+
+            sendMessage(node, msg, ackC, msgInterception);
+        }
+
+        /**
+         * Optionally logs the message and passes it to the parent SPI.
+         *
+         * @param node Node.
+         * @param msg Message.
+         * @param ackC Ack closure.
+         * @param logMsg Log Messages.
+         * @throws IgniteSpiException If the parent SPI failed to send the message.
+         */
+        private void sendMessage(
+            final ClusterNode node,
+            final Message msg,
+            final IgniteInClosure<IgniteException> ackC,
+            boolean logMsg
+        ) throws IgniteSpiException {
+            if (logMsg) {
+                String id = node.id().toString();
+                String locNodeId = ((IgniteEx)ignite).context().localNodeId().toString();
+
+                // Same guard as in the public overload: only GridIoMessage is unwrapped, anything else is logged as is.
+                Message msg0 = msg instanceof GridIoMessage ? ((GridIoMessage)msg).message() : msg;
+
+                log.info(
+                    String.format(">>> Output msg[type=%s, fromNode= %s, toNode=%s]",
+                        msg0.getClass().getSimpleName(),
+                        locNodeId.charAt(locNodeId.length() - 1),
+                        id.charAt(id.length() - 1)
+                    )
+                );
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index bb397f7..0612615 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -133,6 +133,8 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteCrossCacheT
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheAtomicNearCacheSelfTest;
 import
org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxExceptionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheGlobalLoadTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidationTest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionsStateValidatorSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheGetStoreErrorSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheNearTxExceptionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxExceptionSelfTest; @@ -292,6 +294,8 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class); suite.addTestSuite(CacheDeferredDeleteSanitySelfTest.class); suite.addTestSuite(CacheDeferredDeleteQueueTest.class); + suite.addTestSuite(GridCachePartitionsStateValidatorSelfTest.class); + suite.addTestSuite(GridCachePartitionsStateValidationTest.class); suite.addTest(IgniteCacheTcpClientDiscoveryTestSuite.suite()); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index f8add30..415479d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.ReplicatedTransactionalPessim import org.apache.ignite.internal.processors.cache.WalModeChangeAdvancedSelfTest; import 
org.apache.ignite.internal.processors.cache.WalModeChangeCoordinatorNotAffinityNodeSelfTest; import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest; +import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest; import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest; import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest; import org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest; @@ -40,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticT import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest; import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheAssignmentNodeRestartsTest; +import org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNearCacheTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest; @@ -93,6 +95,10 @@ public class IgniteCacheTestSuite6 extends TestSuite { suite.addTestSuite(PartitionedTransactionalOptimisticCacheGetsDistributionTest.class); suite.addTestSuite(PartitionedTransactionalPessimisticCacheGetsDistributionTest.class); + suite.addTestSuite(TxOptimisticOnPartitionExchangeTest.class); + + suite.addTestSuite(IgniteExchangeLatchManagerCoordinatorFailTest.class); + return suite; } }