http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 5a4ba14..283da80 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -1009,107 +1009,111 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC */ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr, boolean lostAllow, boolean wait) throws Exception { - if (wait) + if (wait) { GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { + @Override + public boolean apply() { return expEvts.size() == lsnr.size(); } }, 2000L); + } - Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size()); + synchronized (lsnr) { + Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size()); - for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet()) - prevMap.put(e.getKey(), new ArrayList<>(e.getValue())); + for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet()) + prevMap.put(e.getKey(), new ArrayList<>(e.getValue())); - List<T3<Object, Object, Object>> lostEvts = new ArrayList<>(); + List<T3<Object, Object, Object>> lostEvts = new ArrayList<>(); - for (T3<Object, Object, Object> exp : expEvts) { - List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1()); + for (T3<Object, Object, Object> exp : expEvts) { + List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1()); - if (F.eq(exp.get2(), exp.get3())) - continue; + if (F.eq(exp.get2(), exp.get3())) + continue; - if (rcvdEvts == null || rcvdEvts.isEmpty()) { - lostEvts.add(exp); + if (rcvdEvts == null || rcvdEvts.isEmpty()) { + lostEvts.add(exp); - continue; - } + continue; + } - Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator(); + Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator(); - boolean found = false; + boolean found = false; - while (iter.hasNext()) { - CacheEntryEvent<?, ?> e = iter.next(); + while (iter.hasNext()) { + CacheEntryEvent<?, ?> e = iter.next(); - if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue())) - && equalOldValue(e, exp)) { - found = true; + if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue())) + && equalOldValue(e, exp)) { + found = true; - iter.remove(); + iter.remove(); - break; + break; + } } - } - // Lost event is acceptable. - if (!found) - lostEvts.add(exp); - } + // Lost event is acceptable. + if (!found) + lostEvts.add(exp); + } - boolean dup = false; + boolean dup = false; - // Check duplicate. - if (!lsnr.evts.isEmpty()) { - for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) { - if (!evts.isEmpty()) { - for (CacheEntryEvent<?, ?> e : evts) { - boolean found = false; + // Check duplicate. + if (!lsnr.evts.isEmpty()) { + for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) { + if (!evts.isEmpty()) { + for (CacheEntryEvent<?, ?> e : evts) { + boolean found = false; - for (T3<Object, Object, Object> lostEvt : lostEvts) { - if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) { - found = true; + for (T3<Object, Object, Object> lostEvt : lostEvts) { + if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) { + found = true; - lostEvts.remove(lostEvt); + lostEvts.remove(lostEvt); - break; + break; + } } - } - if (!found) { - dup = true; + if (!found) { + dup = true; - break; + break; + } } } } - } - if (dup) { - for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) { - if (!e.isEmpty()) { - for (CacheEntryEvent<?, ?> event : e) - log.error("Got duplicate event: " + event); + if (dup) { + for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) { + if (!e.isEmpty()) { + for (CacheEntryEvent<?, ?> event : e) + log.error("Got duplicate event: " + event); + } } } } - } - if (!lostAllow && lostEvts.size() > 100) { - log.error("Lost event cnt: " + lostEvts.size()); + if (!lostAllow && lostEvts.size() > 100) { + log.error("Lost event cnt: " + lostEvts.size()); - for (T3<Object, Object, Object> e : lostEvts) - log.error("Lost event: " + e); + for (T3<Object, Object, Object> e : lostEvts) + log.error("Lost event: " + e); - fail("Lose events, see log for details."); - } + fail("Lose events, see log for details."); + } - log.error("Lost event cnt: " + lostEvts.size()); + log.error("Lost event cnt: " + lostEvts.size()); - expEvts.clear(); + expEvts.clear(); - lsnr.evts.clear(); - lsnr.vals.clear(); + lsnr.evts.clear(); + lsnr.vals.clear(); + } } /** @@ -2111,7 +2115,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @return Count events. */ - public int size() { + public synchronized int size() { int size = 0; for (List<CacheEntryEvent<?, ?>> e : evts.values())
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 5f5dfd4..db59a7f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -128,6 +129,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo cfg.setDiscoverySpi(disco); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java index b529b6c..49c6968 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; @@ -38,6 +39,7 @@ import org.apache.ignite.services.ServiceDescriptor; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -171,14 +173,20 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { for (int i = 0 ; i < NODES_CNT; i++) { log.info("Iteration: " + i); - Ignite ignite = grid(i); + final Ignite ignite = grid(i); ignite.services().deployNodeSingleton(SINGLETON_NAME, new TestService()); - ClusterGroup grp = ignite.cluster(); + final ClusterGroup grp = ignite.cluster(); assertEquals(NODES_CNT, grp.nodes().size()); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ignite.services(grp).serviceDescriptors().size() == 1; + } + }, 5000); + Collection<ServiceDescriptor> srvDscs = ignite.services(grp).serviceDescriptors(); assertEquals(1, srvDscs.size()); @@ -206,14 +214,20 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest { for (int i = 0 ; i < NODES_CNT; i++) { log.info("Iteration: " + i); - Ignite ignite = grid(i); + final Ignite ignite = grid(i); ignite.services(ignite.cluster().forClients()).deployNodeSingleton(SINGLETON_NAME, new TestService()); - ClusterGroup grp = ignite.cluster(); + final ClusterGroup grp = ignite.cluster(); assertEquals(NODES_CNT, grp.nodes().size()); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ignite.services(grp).serviceDescriptors().size() == 1; + } + }, 5000); + Collection<ServiceDescriptor> srvDscs = ignite.services(grp).serviceDescriptors(); assertEquals(1, srvDscs.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java index dfea37a..92b18ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java @@ -17,15 +17,18 @@ package org.apache.ignite.internal.processors.service; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; import org.apache.ignite.Ignition; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** @@ -49,10 +52,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { final Ignite ignite = startGrid(0); - Thread t = new Thread(new Runnable() { - @Override public void run() { - Thread.currentThread().setName("deploy-thread"); - + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { IgniteServices svcs = ignite.services(); IgniteServices services = svcs.withAsync(); @@ -67,13 +68,13 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { catch (IgniteException e) { finishLatch.countDown(); } - catch (Throwable e) { - log.error("Service deployment error: ", e); + finally { + finishLatch.countDown(); } - } - }); - t.start(); + return null; + } + }, "deploy-thread"); depLatch.await(); @@ -85,6 +86,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest { U.dumpThreads(log); assertTrue("Deploy future isn't completed", wait); + + fut.get(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java index 731b0c7..7bbf531 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; @@ -55,6 +56,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import org.jsr166.ConcurrentLinkedDeque8; @@ -90,6 +92,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac /** Initialized nodes */ private static final List<ClusterNode> nodes = new ArrayList<>(); + /** */ + private static GridTimeoutProcessor timeoutProcessor; + /** Flag indicating if listener should reject messages. */ private static boolean reject; @@ -472,6 +477,12 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < getSpiCount(); i++) { CommunicationSpi<Message> spi = newCommunicationSpi(); @@ -485,6 +496,8 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac GridSpiTestContext ctx = initSpiContext(); + ctx.timeoutProcessor(timeoutProcessor); + ctx.setLocalNode(node); info(">>> Initialized context: nodeId=" + ctx.localNode().id()); @@ -548,6 +561,14 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { + if (timeoutProcessor != null) { + timeoutProcessor.onKernalStop(true); + + timeoutProcessor.stop(true); + + timeoutProcessor = null; + } + for (CommunicationSpi<Message> spi : spis.values()) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 5af0596..0df7da6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -897,8 +897,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { */ public void testIpFinderCleaning() throws Exception { try { - ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024), - new InetSocketAddress("host2", 1024))); + ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024), + new InetSocketAddress("1.1.1.2", 1024))); Ignite g1 = startGrid(1); @@ -912,13 +912,19 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } }, timeout); + if (ipFinder.getRegisteredAddresses().size() != 1) { + log.error("Failed to wait for IP cleanup, will dump threads."); + + U.dumpThreads(log); + } + assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses(); // Check that missing addresses are returned back. ipFinder.unregisterAddresses(ipFinder.getRegisteredAddresses()); // Unregister valid address. - ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024), - new InetSocketAddress("host2", 1024))); + ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024), + new InetSocketAddress("1.1.1.2", 1024))); GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index e257a97..0bffe8b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -41,6 +41,8 @@ import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -89,6 +91,16 @@ public class GridSpiTestContext implements IgniteSpiContext { /** */ private MessageFactory factory; + /** */ + private GridTimeoutProcessor timeoutProcessor; + + /** + * @param timeoutProcessor Timeout processor. + */ + public void timeoutProcessor(GridTimeoutProcessor timeoutProcessor) { + this.timeoutProcessor = timeoutProcessor; + } + /** {@inheritDoc} */ @Override public Collection<ClusterNode> remoteNodes() { return rmtNodes; @@ -530,12 +542,14 @@ public class GridSpiTestContext implements IgniteSpiContext { /** {@inheritDoc} */ @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { - // No-op. + if (timeoutProcessor != null) + timeoutProcessor.addTimeoutObject(new GridSpiTimeoutObject(obj)); } /** {@inheritDoc} */ @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { - // No-op. + if (timeoutProcessor != null) + timeoutProcessor.removeTimeoutObject(new GridSpiTimeoutObject(obj)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java ---------------------------------------------------------------------- diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java index 77e2dae..4a84931 100644 --- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java +++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java @@ -38,14 +38,16 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteTransactions; -import org.apache.ignite.cache.CachePartialUpdateException; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.startup.servlet.ServletContextListenerStartup; import org.apache.ignite.transactions.Transaction; @@ -191,6 +193,9 @@ public class WebSessionFilter implements Filter { /** Transactions enabled flag. */ private boolean txEnabled; + /** */ + private int retries; + /** {@inheritDoc} */ @Override public void init(FilterConfig cfg) throws ServletException { ctx = cfg.getServletContext(); @@ -207,8 +212,6 @@ public class WebSessionFilter implements Filter { cfg.getInitParameter(WEB_SES_MAX_RETRIES_ON_FAIL_NAME_PARAM), ctx.getInitParameter(WEB_SES_MAX_RETRIES_ON_FAIL_NAME_PARAM)); - int retries; - try { retries = retriesStr != null ? Integer.parseInt(retriesStr) : DFLT_MAX_RETRIES_ON_FAIL; } @@ -226,10 +229,6 @@ public class WebSessionFilter implements Filter { log = webSesIgnite.log(); - if (webSesIgnite == null) - throw new IgniteException("Grid for web sessions caching is not started (is it configured?): " + - gridName); - cache = webSesIgnite.cache(cacheName); if (cache == null) @@ -409,41 +408,62 @@ public class WebSessionFilter implements Filter { WebSession cached = new WebSession(ses, true); - try { - while (true) { - try { - IgniteCache<String, WebSession> cache0; - - if (cached.getMaxInactiveInterval() > 0) { - long ttl = cached.getMaxInactiveInterval() * 1000; + for (int i = 0; i < retries; i++) { + try { + IgniteCache<String, WebSession> cache0; - ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl)); + if (cached.getMaxInactiveInterval() > 0) { + long ttl = cached.getMaxInactiveInterval() * 1000; - cache0 = cache.withExpiryPolicy(plc); - } - else - cache0 = cache; + ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl)); - WebSession old = cache0.getAndPutIfAbsent(sesId, cached); + cache0 = cache.withExpiryPolicy(plc); + } + else + cache0 = cache; - if (old != null) { - cached = old; + WebSession old = cache0.getAndPutIfAbsent(sesId, cached); - if (cached.isNew()) - cached = new WebSession(cached, false); - } + if (old != null) { + cached = old; - break; + if (cached.isNew()) + cached = new WebSession(cached, false); } - catch (CachePartialUpdateException e) { + + break; + } + catch (CacheException | IgniteException e) { + if (log.isDebugEnabled()) + log.debug(e.getMessage()); + + if (i == retries - 1) + throw new IgniteException("Failed to save session: " + sesId, e); + else { if (log.isDebugEnabled()) - log.debug(e.getMessage()); + log.debug("Failed to save session (will retry): " + sesId); + + IgniteFuture<?> retryFut = null; + + if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + + assert cause != null : e; + + retryFut = cause.retryReadyFuture(); + } + + if (retryFut != null) { + try { + retryFut.get(); + } + catch (IgniteException retryErr) { + throw new IgniteException("Failed to save session: " + sesId, retryErr); + } + } } } } - catch (CacheException e) { - throw new IgniteException("Failed to save session: " + sesId, e); - } return cached; } http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java ---------------------------------------------------------------------- diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java index 82f1633..b826031 100644 --- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java +++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java @@ -30,12 +30,14 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.CachePartialUpdateException; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -117,7 +119,7 @@ class WebSessionListener { break; } - catch (CachePartialUpdateException ignored) { + catch (CacheException | IgniteException e) { if (i == retries - 1) { U.warn(log, "Failed to apply updates for session (maximum number of retries exceeded) [sesId=" + sesId + ", retries=" + retries + ']'); @@ -125,12 +127,25 @@ class WebSessionListener { else { U.warn(log, "Failed to apply updates for session (will retry): " + sesId); - U.sleep(RETRY_DELAY); + IgniteFuture<?> retryFut = null; + + if (X.hasCause(e, ClusterTopologyException.class)) { + ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class); + + assert cause != null : e; + + retryFut = cause.retryReadyFuture(); + } + + if (retryFut != null) + retryFut.get(); + else + U.sleep(RETRY_DELAY); } } } } - catch (CacheException | IgniteInterruptedCheckedException e) { + catch (Exception e) { U.error(log, "Failed to update session attributes [id=" + sesId + ']', e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java index 4508edb..7a321d6 100644 --- a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java +++ b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java @@ -142,7 +142,6 @@ public class WebSessionSelfTest extends GridCommonAbstractTest { } assert idx != -1; - assert srv != null; stopServer(srv); @@ -181,7 +180,6 @@ public class WebSessionSelfTest extends GridCommonAbstractTest { } assert idx != -1; - assert srv != null; int port = TEST_JETTY_PORT + idx;