Merge branch master into ignite-3477-master
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3eb05de5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3eb05de5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3eb05de5 Branch: refs/heads/ignite-3477-master Commit: 3eb05de5e2ee0d14567167bfe8547441cae69523 Parents: 8122099 aeacad6 Author: Alexey Goncharuk <[email protected]> Authored: Tue Apr 11 13:16:03 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Apr 11 13:16:03 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 8 +- .../GridCachePartitionExchangeManager.java | 6 +- .../processors/cache/GridCachePreloader.java | 4 +- .../cache/GridCachePreloaderAdapter.java | 4 +- .../dht/preloader/GridDhtPartitionDemander.java | 2 +- .../dht/preloader/GridDhtPartitionSupplier.java | 8 +- .../GridDhtPartitionSupplyMessage.java | 103 +++-- .../GridDhtPartitionSupplyMessageV2.java | 422 ------------------- .../dht/preloader/GridDhtPreloader.java | 2 +- .../resources/META-INF/classnames.properties | 1 - .../cache/ClusterStateAbstractTest.java | 10 +- .../CacheLateAffinityAssignmentTest.java | 6 +- .../IgniteCacheReadFromBackupTest.java | 6 +- .../atomic/IgniteCacheAtomicProtocolTest.java | 3 +- 14 files changed, 84 insertions(+), 501 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index b80ad04,7c2599a..f7f0aff --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@@ -332,10 -537,8 +334,10 @@@ class GridDhtPartitionSupplier if (!reply(node, d, s, scId)) return; - s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(), + s = new GridDhtPartitionSupplyMessage(d.updateSequence(), - cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled()); + cctx.cacheId(), + d.topologyVersion(), + cctx.deploymentEnabled()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index cc30321,a01be28..ee461ab --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@@ -66,9 -64,9 +64,14 @@@ public class GridDhtPartitionSupplyMess @GridDirectCollection(int.class) private Collection<Integer> missed; ++ /** Partitions for which we were able to get historical iterator. */ ++ @GridToStringInclude ++ @GridDirectCollection(int.class) ++ private Collection<Integer> clean; ++ /** Entries. */ @GridDirectMap(keyType = int.class, valueType = CacheEntryInfoCollection.class) - private Map<Integer, CacheEntryInfoCollection> infos = new HashMap<>(); + private Map<Integer, CacheEntryInfoCollection> infos; /** Message size. */ @GridDirectTransient @@@ -159,6 -142,6 +147,25 @@@ } /** ++ * @param p Partition to clean. ++ */ ++ void clean(int p) { ++ if (clean == null) ++ clean = new HashSet<>(); ++ ++ if (clean.add(p)) ++ msgSize += 4; ++ } ++ ++ /** ++ * @param p Partition to check. ++ * @return Check result. ++ */ ++ boolean isClean(int p) { ++ return clean != null && clean.contains(p); ++ } ++ ++ /** * @param p Missed partition. */ void missed(int p) { @@@ -288,7 -274,7 +298,7 @@@ switch (writer.state()) { case 3: - if (!writer.writeBoolean("ack", ack)) - if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG)) ++ if (!writer.writeCollection("clean", clean, MessageCollectionItemType.INT)) return false; writer.incrementState(); @@@ -312,13 -298,7 +322,13 @@@ writer.incrementState(); case 7: - if (!writer.writeLong("updateSeq", updateSeq)) ++ if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 8: - if (!writer.writeInt("workerId", workerId)) + if (!writer.writeLong("updateSeq", updateSeq)) return false; writer.incrementState(); @@@ -340,7 -320,7 +350,7 @@@ switch (reader.state()) { case 3: - ack = reader.readBoolean("ack"); - infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false); ++ clean = reader.readCollection("clean", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@@ -372,15 -352,7 +382,15 @@@ reader.incrementState(); case 7: - updateSeq = reader.readLong("updateSeq"); ++ topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: - workerId = reader.readInt("workerId"); + updateSeq = reader.readLong("updateSeq"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --cc modules/core/src/main/resources/META-INF/classnames.properties index 473f176,8c5a72e..335a33f --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@@ -763,9 -740,9 +763,8 @@@ org.apache.ignite.internal.processors.c org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap -org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$1 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier$SupplyContextPhase org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage - org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$1 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture$2 http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java index f095e79,0000000..ce7829a mode 100644,000000..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClusterStateAbstractTest.java @@@ -1,439 -1,0 +1,439 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; - import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; ++import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +@SuppressWarnings("TooBroadScope") +public abstract class ClusterStateAbstractTest extends GridCommonAbstractTest { + /** Entry count. */ + public static final int ENTRY_CNT = 5000; + + /** */ + public static final int GRID_CNT = 4; + + /** */ + private static final String CACHE_NAME = "cache1"; + + /** */ + private static final Collection<Class> forbidden = new GridConcurrentHashSet<>(); + + /** */ + private static AtomicReference<Exception> errEncountered = new AtomicReference<>(); + + /** */ + private boolean activeOnStart = true; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setActiveOnStart(activeOnStart); + + cfg.setCacheConfiguration(cacheConfiguration(CACHE_NAME)); + + if (client) + cfg.setClientMode(true); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + return cfg; + } + + /** + * @param cacheName Cache name. + * @return Cache configuration. + */ + protected abstract CacheConfiguration cacheConfiguration(String cacheName); + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + forbidden.clear(); + + Exception err = errEncountered.getAndSet(null); + + if (err != null) + throw err; + } + + /** + * @throws Exception if failed. + */ + public void testDynamicCacheStart() throws Exception { + activeOnStart = false; + - forbidden.add(GridDhtPartitionSupplyMessageV2.class); ++ forbidden.add(GridDhtPartitionSupplyMessage.class); + forbidden.add(GridDhtPartitionDemandMessage.class); + + startGrids(GRID_CNT); + + checkInactive(GRID_CNT); + + forbidden.clear(); + + grid(0).active(true); + + IgniteCache<Object, Object> cache2 = grid(0).createCache(new CacheConfiguration<>("cache2")); + + for (int k = 0; k < ENTRY_CNT; k++) + cache2.put(k, k); + + grid(0).active(false); + + checkInactive(GRID_CNT); + + stopAllGrids(); + } + + /** + * @throws Exception if failed. + */ + public void testNoRebalancing() throws Exception { + activeOnStart = false; + - forbidden.add(GridDhtPartitionSupplyMessageV2.class); ++ forbidden.add(GridDhtPartitionSupplyMessage.class); + forbidden.add(GridDhtPartitionDemandMessage.class); + + startGrids(GRID_CNT); + + checkInactive(GRID_CNT); + + forbidden.clear(); + + grid(0).active(true); + + awaitPartitionMapExchange(); + + final IgniteCache<Object, Object> cache = grid(0).cache(CACHE_NAME); + + for (int k = 0; k < ENTRY_CNT; k++) + cache.put(k, k); + + for (int g = 0; g < GRID_CNT; g++) { + // Tests that state changes are propagated to existing and new nodes. + assertTrue(grid(g).active()); + + IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME); + + for (int k = 0; k < ENTRY_CNT; k++) + assertEquals(k, cache0.get(k)); + } + + // Check that new node startup and shutdown works fine after activation. + startGrid(GRID_CNT); + startGrid(GRID_CNT + 1); + + for (int g = 0; g < GRID_CNT + 2; g++) { + IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME); + + for (int k = 0; k < ENTRY_CNT; k++) + assertEquals("Failed for [grid=" + g + ", key=" + k + ']', k, cache0.get(k)); + } + + stopGrid(GRID_CNT + 1); + + for (int g = 0; g < GRID_CNT + 1; g++) + grid(g).cache(CACHE_NAME).rebalance().get(); + + stopGrid(GRID_CNT); + + for (int g = 0; g < GRID_CNT; g++) { + IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME); + + for (int k = 0; k < ENTRY_CNT; k++) + assertEquals(k, cache0.get(k)); + } + + grid(0).active(false); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (int g = 0; g < GRID_CNT; g++) { + if (grid(g).active()) + return false; + } + + return true; + } + }, 5000); + + checkInactive(GRID_CNT); + - forbidden.add(GridDhtPartitionSupplyMessageV2.class); ++ forbidden.add(GridDhtPartitionSupplyMessage.class); + forbidden.add(GridDhtPartitionDemandMessage.class); + + // Should stop without exchange. + stopAllGrids(); + } + + /** + * @throws Exception if failed. + */ + public void testActivationFromClient() throws Exception { - forbidden.add(GridDhtPartitionSupplyMessageV2.class); ++ forbidden.add(GridDhtPartitionSupplyMessage.class); + forbidden.add(GridDhtPartitionDemandMessage.class); + + activeOnStart = false; + + startGrids(GRID_CNT); + + client = true; + + startGrid(GRID_CNT); + + checkInactive(GRID_CNT + 1); + + Ignite cl = grid(GRID_CNT); + + forbidden.clear(); + + cl.active(true); + + awaitPartitionMapExchange(); + + IgniteCache<Object, Object> cache = cl.cache(CACHE_NAME); + + for (int k = 0; k < ENTRY_CNT; k++) + cache.put(k, k); + + for (int g = 0; g < GRID_CNT + 1; g++) { + // Tests that state changes are propagated to existing and new nodes. + assertTrue(grid(g).active()); + + IgniteCache<Object, Object> cache0 = grid(g).cache(CACHE_NAME); + + for (int k = 0; k < ENTRY_CNT; k++) + assertEquals(k, cache0.get(k)); + } + + cl.active(false); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (int g = 0; g < GRID_CNT + 1; g++) { + if (grid(g).active()) + return false; + } + + return true; + } + }, 5000); + + checkInactive(GRID_CNT + 1); + } + + /** + * Tests that state doesn't change until all acquired locks are released. + * + * @throws Exception If fails. + */ + public void testDeactivationWithPendingLock() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-4931"); + + startGrids(GRID_CNT); + + final CountDownLatch finishedLatch = new CountDownLatch(1); + + Lock lock = grid(0).cache(CACHE_NAME).lock(1); + + IgniteInternalFuture<?> fut; + + lock.lock(); + + try { + fut = multithreadedAsync(new Runnable() { + @Override public void run() { + grid(1).active(false); + + finishedLatch.countDown(); + } + }, 1); + + U.sleep(2000); + + assert !fut.isDone(); + + boolean hasActive = false; + + for (int g = 0; g < GRID_CNT; g++) { + IgniteEx grid = grid(g); + + if (grid.active()) { + hasActive = true; + + break; + } + + } + + assertTrue(hasActive); + } + finally { + lock.unlock(); + } + + fut.get(getTestTimeout(), TimeUnit.MILLISECONDS); + + checkInactive(GRID_CNT); + + finishedLatch.await(); + } + + /** + * Tests that state doesn't change until all pending transactions are finished. + * + * @throws Exception If fails. + */ + public void testDeactivationWithPendingTransaction() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-4931"); + + startGrids(GRID_CNT); + + final CountDownLatch finishedLatch = new CountDownLatch(1); + + final Ignite ignite0 = grid(0); + + final IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME); + + IgniteInternalFuture<?> fut; + + try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache0.get(1); + + fut = multithreadedAsync(new Runnable() { + @Override public void run() { + ignite0.active(false); + + finishedLatch.countDown(); + } + }, 1); + + U.sleep(2000); + + assert !fut.isDone(); + + boolean hasActive = false; + + for (int g = 0; g < GRID_CNT; g++) { + IgniteEx grid = grid(g); + + if (grid.active()) { + hasActive = true; + + break; + } + + } + + assertTrue(hasActive); + + cache0.put(1, 2); + + tx.commit(); + } + + fut.get(getTestTimeout(), TimeUnit.MILLISECONDS); + + checkInactive(GRID_CNT); + + ignite0.active(true); + + for (int g = 0; g < GRID_CNT; g++) + assertEquals(2, grid(g).cache(CACHE_NAME).get(1)); + + finishedLatch.await(); + } + + /** + * + */ + private void checkInactive(int cnt) { + for (int g = 0; g < cnt; g++) + assertFalse(grid(g).active()); + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + checkForbidden((GridIoMessage)msg); + + super.sendMessage(node, msg, ackC); + } + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + checkForbidden((GridIoMessage)msg); + + super.sendMessage(node, msg); + } + + /** + * @param msg Message to check. + */ + private void checkForbidden(GridIoMessage msg) { + if (forbidden.contains(msg.message().getClass())) { + IgniteSpiException err = new IgniteSpiException("Message is forbidden for this test: " + msg.message()); + + // Set error in case if this exception is not visible to the user code. + errEncountered.compareAndSet(null, err); + + throw err; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb05de5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ----------------------------------------------------------------------
