http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index a42eb98..8469a7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryObjectEx; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; @@ -145,6 +146,9 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** */ private final GridQueryIndexing idx; + /** */ + private static final ThreadLocal<AffinityTopologyVersion> requestTopVer = new ThreadLocal<>(); + /** * @param ctx Kernal context. */ @@ -878,7 +882,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { sqlQry, F.asList(params), typeDesc, - idx.backupFilter(null, null, null)); + idx.backupFilter(null, requestTopVer.get(), null)); sendQueryExecutedEvent( sqlQry, @@ -964,7 +968,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { Object[] args = qry.getArgs(); final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), - idx.backupFilter(null, null, null)); + idx.backupFilter(null, requestTopVer.get(), null)); sendQueryExecutedEvent(sql, args); @@ -1815,6 +1819,20 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @param ver Version. + */ + public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) { + requestTopVer.set(ver); + } + + /** + * @return Affinity topology version of the current request. + */ + public static AffinityTopologyVersion getRequestAffinityTopologyVersion() { + return requestTopVer.get(); + } + + /** * Description of type property. */ private static class ClassProperty extends GridQueryProperty {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 415d632..00ea29e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -110,6 +111,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { /** Split size threshold. */ private static final int SPLIT_WARN_THRESHOLD = 1000; + /** Retry delay factor (ms). Retry delay = retryAttempt * RETRY_DELAY_MS */ + private static final long RETRY_DELAY_MS = 10; + /** {@code True} for internal tasks. */ private boolean internal; @@ -192,7 +196,19 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { private final Object affKey; /** */ - private final String affCache; + private final int affPartId; + + /** */ + private final String affCacheName; + + /** */ + private final int[] affCacheIds; + + /** */ + private AffinityTopologyVersion mapTopVer; + + /** */ + private int retryAttemptCnt; /** */ private final UUID subjId; @@ -308,12 +324,27 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { if (task instanceof AffinityTask) { AffinityTask affTask = (AffinityTask)task; + assert affTask.affinityCacheNames() != null : affTask; + assert affTask.partition() >= 0 : affTask; + + affPartId = affTask.partition(); + affCacheName = F.first(affTask.affinityCacheNames()); affKey = affTask.affinityKey(); - affCache = affTask.affinityCacheName(); + mapTopVer = affTask.topologyVersion(); + + affCacheIds = new int[affTask.affinityCacheNames().size()]; + int i = 0; + for (String cacheName : affTask.affinityCacheNames()) { + affCacheIds[i] = CU.cacheId(cacheName); + ++i; + } } else { + affPartId = -1; + affCacheName = null; affKey = null; - affCache = null; + mapTopVer = null; + affCacheIds = null; } } @@ -469,7 +500,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { // Nodes are ignored by affinity tasks. final List<ClusterNode> shuffledNodes = - affKey == null ? getTaskTopology() : Collections.<ClusterNode>emptyList(); + affCacheIds == null ? getTaskTopology() : Collections.<ClusterNode>emptyList(); // Load balancer. ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes); @@ -818,6 +849,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { return; } + boolean retry = false; synchronized (mux) { // If task is not waiting for responses, // then there is no point to proceed. @@ -829,54 +861,76 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { return; } - switch (plc) { - // Start reducing all results received so far. - case REDUCE: { - state = State.REDUCING; + if (res.retry()) { + // Retry is used only with affinity call / run. + assert affCacheIds != null; + retry = true; - break; - } + mapTopVer = U.max(res.getRetryTopologyVersion(), ctx.discovery().topologyVersionEx()); + affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer); - // Keep waiting if there are more responses to come, - // otherwise, reduce. - case WAIT: { - assert results.size() <= this.jobRes.size(); + if (affFut != null && !affFut.isDone()) { + waitForAffTop = true; - // If there are more results to wait for. - // If result cache is disabled, then we reduce - // when both collections are empty. - if (results.size() == this.jobRes.size()) { - plc = ComputeJobResultPolicy.REDUCE; - - // All results are received, proceed to reduce method. + jobRes.resetResponse(); + } + } else { + switch (plc) { + // Start reducing all results received so far. + case REDUCE: { state = State.REDUCING; + + break; } - break; - } + // Keep waiting if there are more responses to come, + // otherwise, reduce. + case WAIT: { + assert results.size() <= this.jobRes.size(); + + // If there are more results to wait for. + // If result cache is disabled, then we reduce + // when both collections are empty. + if (results.size() == this.jobRes.size()) { + plc = ComputeJobResultPolicy.REDUCE; - case FAILOVER: { - if (affKey != null) { - AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); + // All results are received, proceed to reduce method. + state = State.REDUCING; + } - affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer); + break; } - if (affFut != null && !affFut.isDone()) { - waitForAffTop = true; + case FAILOVER: { + if (affCacheIds != null) { + mapTopVer = ctx.discovery().topologyVersionEx(); - jobRes.resetResponse(); - } - else if (!failover(res, jobRes, getTaskTopology())) - plc = null; + affFut = ctx.cache().context().exchange().affinityReadyFuture(mapTopVer); + } + + if (affFut != null && !affFut.isDone()) { + waitForAffTop = true; - break; + jobRes.resetResponse(); + } + else if (!failover(res, jobRes, getTaskTopology())) + plc = null; + + break; + } } } } // Outside of synchronization. - if (plc != null && !waitForAffTop) { + if (retry && !waitForAffTop) { + // Handle retry + retryAttemptCnt++; + + final long wait = retryAttemptCnt * RETRY_DELAY_MS; + sendRetryRequest(wait, jobRes, res); + } + else if (plc != null && !waitForAffTop && !retry) { // Handle failover. if (plc == FAILOVER) sendFailoverRequest(jobRes); @@ -928,6 +982,36 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { } /** + * @param waitms Waitms. + * @param jRes Job result. + * @param resp Job responce. + */ + private void sendRetryRequest(final long waitms, final GridJobResultImpl jRes, final GridJobExecuteResponse resp) { + ctx.timeout().schedule(new Runnable() { + @Override public void run() { + ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + ClusterNode newNode = ctx.affinity().mapPartitionToNode(affCacheName, affPartId, + mapTopVer); + + if(!checkTargetNode(resp, jRes, newNode)) + return; + + sendRequest(jRes); + } + catch (Exception e) { + U.error(log, "Failed to re-map job or retry request [ses=" + ses + "]", e); + + finishTask(null, e); + } + } + }, false); + } + }, waitms, -1); + } + + /** * @param jobRes Job result. * @param results Existing job results. * @return Job result policy. @@ -1083,53 +1167,63 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { try { ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class); - // Map to a new node. - ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache); + ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affPartId, + affKey, affCacheName, mapTopVer); - if (node == null) { - String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" + - jobRes.getJob() + ", node=" + jobRes.getNode() + ']'; + return checkTargetNode(res, jobRes, node); + } + // Catch Throwable to protect against bad user code. + catch (Throwable e) { + String errMsg = "Failed to failover job due to undeclared user exception [job=" + + jobRes.getJob() + ", err=" + e + ']'; - if (log.isDebugEnabled()) - log.debug(msg); + U.error(log, errMsg, e); - Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException()); + finishTask(null, new ComputeUserUndeclaredException(errMsg, e)); - finishTask(null, e); + if (e instanceof Error) + throw (Error)e; - return false; - } + return false; + } + } + + /** + * @param res Execution response. + * @param jobRes Job result. + * @param node New target node. + * @return {@code True} if new target node is not null. + */ + private boolean checkTargetNode(GridJobExecuteResponse res, GridJobResultImpl jobRes, ClusterNode node) { + if (node == null) { + String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" + + jobRes.getJob() + ", node=" + jobRes.getNode() + ']'; if (log.isDebugEnabled()) - log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() + - ", job=" + jobRes.getJob() + ", resMsg=" + res + ']'); + log.debug(msg); + + Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException()); + + finishTask(null, e); + + return false; + } + if (log.isDebugEnabled()) + log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() + + ", job=" + jobRes.getJob() + ", resMsg=" + res + ']'); + + synchronized (mux) { jobRes.setNode(node); jobRes.resetResponse(); if (!resCache) { - synchronized (mux) { // Store result back in map before sending. this.jobRes.put(res.getJobId(), jobRes); - } } - - return true; } - // Catch Throwable to protect against bad user code. - catch (Throwable e) { - String errMsg = "Failed to failover job due to undeclared user exception [job=" + - jobRes.getJob() + ", err=" + e + ']'; - U.error(log, errMsg, e); - - finishTask(null, new ComputeUserUndeclaredException(errMsg, e)); - - if (e instanceof Error) - throw (Error)e; - - return false; - } + return true; } /** @@ -1227,7 +1321,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class); GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(), - res.getJobContext().getJobId(), null, null, null, null, null, null, false); + res.getJobContext().getJobId(), null, null, null, null, null, null, false, null); fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node)); @@ -1272,7 +1366,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { forceLocDep, ses.isFullSupport(), internal, - subjId); + subjId, + affCacheIds, + affPartId, + mapTopVer); if (loc) ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req); @@ -1319,7 +1416,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { } GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(), - res.getJobContext().getJobId(), null, null, null, null, null, null, false); + res.getJobContext().getJobId(), null, null, null, null, null, null, false, null); if (fakeErr == null) fakeErr = U.convertException(e); @@ -1351,7 +1448,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { // Artificial response in case if a job is waiting for a response from // non-existent node. GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(nodeId, ses.getId(), - jr.getJobContext().getJobId(), null, null, null, null, null, null, false); + jr.getJobContext().getJobId(), null, null, null, null, null, null, false, null); fakeRes.setFakeException(new ClusterTopologyException("Node has left grid: " + nodeId)); http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 269795b..a480b87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -9602,4 +9602,14 @@ public abstract class IgniteUtils { return "<failed to find active thread " + threadId + '>'; } + + /** + * @param t0 Comparable object. + * @param t1 Comparable object. + * @param <T> Comparable type. + * @return Maximal object o t0 and t1. + */ + public static <T extends Comparable<? super T>> T max(T t0, T t1) { + return t0.compareTo(t1) > 0 ? t0 : t1; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java index 1108ad1..b126db1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.failover; +import java.util.Collection; import java.util.List; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; @@ -58,14 +59,24 @@ public interface FailoverContext { public ClusterNode getBalancedNode(List<ClusterNode> top) throws IgniteException; /** - * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)} - * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}. + * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}, + * {@link IgniteCompute#affinityRun(Collection, Object, IgniteRunnable)}, + * {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)} + * and {@link IgniteCompute#affinityCall(Collection, Object, IgniteCallable)}. * * @return Affinity key. */ @Nullable public Object affinityKey(); /** + * Gets partition for {@link IgniteCompute#affinityRun(Collection, int, IgniteRunnable)} + * and {@link IgniteCompute#affinityCall(Collection, int, IgniteCallable)}. + * + * @return Partition number. + */ + public int partition(); + + /** * Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)} * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}. * http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java index 77b3745..63c990e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java @@ -23,9 +23,12 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.managers.failover.GridFailoverContextImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -189,7 +192,7 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, return null; } - if (ctx.affinityKey() != null) { + if (ctx.partition() >= 0) { Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT); if (affCallAttempt == null) @@ -205,7 +208,15 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, else { ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1); - return ignite.affinity(ctx.affinityCacheName()).mapKeyToNode(ctx.affinityKey()); + try { + return ((IgniteEx)ignite).context().affinity().mapPartitionToNode(ctx.affinityCacheName(), ctx.partition(), + ((GridFailoverContextImpl)ctx).affinityTopologyVersion()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get map job to node on failover: " + ctx, e); + + return null; + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java index a7cab3f..a484ec3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java @@ -464,7 +464,7 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest { ClusterNode node = F.first(prj.nodes()); - comp.affinityRun(null, keyForNode(aff, node), new TestRunnable()); + comp.affinityRun((String)null, keyForNode(aff, node), new TestRunnable()); return comp.future(); } @@ -483,7 +483,7 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest { ClusterNode node = F.first(prj.nodes()); - comp.affinityCall(null, keyForNode(aff, node), new TestCallable()); + comp.affinityCall((String)null, keyForNode(aff, node), new TestCallable()); return comp.future(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java index 356e002..fc94663 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionLocalJobMultipleArgumentsSelfTest.java @@ -88,7 +88,7 @@ public class GridProjectionLocalJobMultipleArgumentsSelfTest extends GridCommonA Collection<Integer> res = new ArrayList<>(); for (int i : F.asList(1, 2, 3)) { - res.add(grid().compute().affinityCall(null, i, new IgniteCallable<Integer>() { + res.add(grid().compute().affinityCall((String)null, i, new IgniteCallable<Integer>() { @Override public Integer call() { ids.add(this); @@ -106,7 +106,7 @@ public class GridProjectionLocalJobMultipleArgumentsSelfTest extends GridCommonA */ public void testAffinityRun() throws Exception { for (int i : F.asList(1, 2, 3)) { - grid().compute().affinityRun(null, i, new IgniteRunnable() { + grid().compute().affinityRun((String)null, i, new IgniteRunnable() { @Override public void run() { ids.add(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java index f804cb3..7997560 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskFailoverAffinityRunTest.java @@ -137,7 +137,7 @@ public class GridTaskFailoverAffinityRunTest extends GridCommonAbstractTest { Collection<IgniteFuture<?>> futs = new ArrayList<>(1000); for (int i = 0; i < 1000; i++) { - comp.affinityCall(null, i, new TestJob()); + comp.affinityCall((String)null, i, new TestJob()); IgniteFuture<?> fut0 = comp.future(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java index 48039a5..b595fee 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java @@ -82,7 +82,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest { IgniteCompute comp = ignite(0).compute(empty).withAsync(); - comp.affinityRun(null, 1, new FailRunnable()); + comp.affinityRun((String)null, 1, new FailRunnable()); checkFutureFails(comp); @@ -90,7 +90,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest { checkFutureFails(comp); - comp.affinityCall(null, 1, new FailCallable()); + comp.affinityCall((String)null, 1, new FailCallable()); checkFutureFails(comp); @@ -112,7 +112,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { - comp.affinityRun(null, 1, new FailRunnable()); + comp.affinityRun((String)null, 1, new FailRunnable()); return null; } @@ -129,7 +129,7 @@ public class IgniteComputeEmptyClusterGroupTest extends GridCommonAbstractTest { GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { - comp.affinityCall(null, 1, new FailCallable()); + comp.affinityCall((String)null, 1, new FailCallable()); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java index 2b54f6b..d5f084d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryAffinityKeySelfTest.java @@ -178,7 +178,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest { for (int i = 0; i < 1000; i++) { nodeId.set(null); - grid(0).compute().affinityRun(null, new TestObject(i), new IgniteRunnable() { + grid(0).compute().affinityRun((String)null, new TestObject(i), new IgniteRunnable() { @IgniteInstanceResource private Ignite ignite; @@ -189,7 +189,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest { assertEquals(aff.mapKeyToNode(i).id(), nodeId.get()); - grid(0).compute().affinityRun(null, new AffinityKey(0, i), new IgniteRunnable() { + grid(0).compute().affinityRun((String)null, new AffinityKey(0, i), new IgniteRunnable() { @IgniteInstanceResource private Ignite ignite; @@ -211,7 +211,7 @@ public class GridBinaryAffinityKeySelfTest extends GridCommonAbstractTest { for (int i = 0; i < 1000; i++) { nodeId.set(null); - grid(0).compute().affinityCall(null, new TestObject(i), new IgniteCallable<Object>() { + grid(0).compute().affinityCall((String)null, new TestObject(i), new IgniteCallable<Object>() { @IgniteInstanceResource private Ignite ignite; http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java index c84a2d0..706d8aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java @@ -121,7 +121,7 @@ public abstract class GridCacheAbstractUsersAffinityMapperSelfTest extends GridC startGrid(1); for (int i = 0; i < KEY_CNT; i++) - grid(i % 2).compute().affinityRun(null, new TestAffinityKey(1, "1"), new NoopClosure()); + grid(i % 2).compute().affinityRun((String)null, new TestAffinityKey(1, "1"), new NoopClosure()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java index 084be02..f953c47 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartStopConcurrentTest.java @@ -88,7 +88,7 @@ public class IgniteDynamicCacheStartStopConcurrentTest extends GridCommonAbstrac checkTopologyVersion(new AffinityTopologyVersion(NODES, minorVer)); - ignite(0).compute().affinityRun(null, 1, new IgniteRunnable() { + ignite(0).compute().affinityRun((String)null, 1, new IgniteRunnable() { @Override public void run() { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java index a8a2edf..97a3e0b 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java @@ -22,6 +22,7 @@ import java.util.Random; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeTaskSession; +import org.jetbrains.annotations.Nullable; /** * Failover test context. @@ -74,6 +75,11 @@ public class GridFailoverTestContext implements FailoverContext { } /** {@inheritDoc} */ + @Nullable @Override public int partition() { + return -1; + } + + /** {@inheritDoc} */ @Override public String affinityCacheName() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java new file mode 100644 index 0000000..28d297d --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAbstractTest.java @@ -0,0 +1,412 @@ +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + */ +public class IgniteCacheLockPartitionOnAffinityRunAbstractTest extends GridCacheAbstractSelfTest { + /** Count of affinity run threads. */ + protected static final int AFFINITY_THREADS_CNT = 10; + + /** Count of collocated objects. */ + protected static final int PERS_AT_ORG_CNT = 10_000; + + /** Name of the cache with special affinity function (all partition are placed on the first node). */ + protected static final String OTHER_CACHE_NAME = "otherCache"; + + /** Grid count. */ + protected static final int GRID_CNT = 4; + + /** Count of restarted nodes. */ + protected static final int RESTARTED_NODE_CNT = 2; + + /** Count of objects. */ + protected static final int ORGS_COUNT_PER_NODE = 2; + + /** Test duration. */ + protected static final long TEST_DURATION = 5 * 60_000; + + /** Test timeout. */ + protected static final long TEST_TIMEOUT = TEST_DURATION + 2 * 60_000; + + /** Timeout between restart of a node. */ + protected static final long RESTART_TIMEOUT = 3_000; + + /** Max failover attempts. */ + protected static final int MAX_FAILOVER_ATTEMPTS = 100; + + /** Organization ids. */ + protected static List<Integer> orgIds; + + /** Test end time. */ + protected static long endTime; + + /** Node restart thread future. */ + protected static IgniteInternalFuture<?> nodeRestartFut; + + /** Stop a test flag . */ + protected final AtomicBoolean stopRestartThread = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setMarshaller(new BinaryMarshaller()); + + AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi(); + failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS); + cfg.setFailoverSpi(failSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected Class<?>[] indexedTypes() { + return new Class<?>[] { + Integer.class, Organization.class, + Person.Key.class, Person.class, + Integer.class, Integer.class + }; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + info("Fill caches begin..."); + + fillCaches(); + + info("Caches are filled"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + grid(0).destroyCache(Organization.class.getSimpleName()); + grid(0).destroyCache(Person.class.getSimpleName()); + grid(0).destroyCache(OTHER_CACHE_NAME); + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopRestartThread.set(true); + if (nodeRestartFut != null) { + nodeRestartFut.get(); + nodeRestartFut = null; + } + + Thread.sleep(3_000); + + awaitPartitionMapExchange(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + endTime = System.currentTimeMillis() + TEST_DURATION; + + super.beforeTest(); + } + + /** + * @param cacheName Cache name. + * @throws Exception If failed. + */ + private void createCacheWithAffinity(String cacheName) throws Exception { + CacheConfiguration ccfg = cacheConfiguration(grid(0).name()); + ccfg.setName(cacheName); + + ccfg.setAffinity(new DummyAffinity()); + + grid(0).createCache(ccfg); + } + + /** + * @throws Exception If failed. + */ + private void fillCaches() throws Exception { + grid(0).createCache(Organization.class.getSimpleName()); + grid(0).createCache(Person.class.getSimpleName()); + + createCacheWithAffinity(OTHER_CACHE_NAME); + + awaitPartitionMapExchange(); + + orgIds = new ArrayList<>(ORGS_COUNT_PER_NODE * RESTARTED_NODE_CNT); + + for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i) + orgIds.addAll(primaryKeys(grid(i).cache(Organization.class.getSimpleName()), ORGS_COUNT_PER_NODE)); + + try ( + IgniteDataStreamer<Integer, Organization> orgStreamer = + grid(0).dataStreamer(Organization.class.getSimpleName()); + IgniteDataStreamer<Person.Key, Person> persStreamer = + grid(0).dataStreamer(Person.class.getSimpleName())) { + + int persId = 0; + for (int orgId : orgIds) { + Organization org = new Organization(orgId); + orgStreamer.addData(orgId, org); + + for (int persCnt = 0; persCnt < PERS_AT_ORG_CNT; ++persCnt, ++persId) { + Person pers = new Person(persId, orgId); + persStreamer.addData(pers.createKey(), pers); + } + } + } + + awaitPartitionMapExchange(); + } + + /** + * + */ + protected void beginNodesRestart() { + stopRestartThread.set(false); + nodeRestartFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int restartGrid = GRID_CNT - RESTARTED_NODE_CNT; + while (!stopRestartThread.get() && System.currentTimeMillis() < endTime) { + log.info("Restart grid: " + restartGrid); + stopGrid(restartGrid); + Thread.sleep(500); + startGrid(restartGrid); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !stopRestartThread.get(); + } + }, RESTART_TIMEOUT); + + restartGrid++; + if (restartGrid >= GRID_CNT) + restartGrid = GRID_CNT - RESTARTED_NODE_CNT; + awaitPartitionMapExchange(); + } + return null; + } + }, "restart-node"); + } + + /** + * @param ignite Ignite. + * @param orgId Org id. + * @param expReservations Expected reservations. + * @throws Exception If failed. + */ + protected static void checkPartitionsReservations(final IgniteEx ignite, int orgId, + final int expReservations) throws Exception { + int part = ignite.affinity(Organization.class.getSimpleName()).partition(orgId); + + final GridDhtLocalPartition pPers = ignite.context().cache() + .internalCache(Person.class.getSimpleName()).context().topology() + .localPartition(part, AffinityTopologyVersion.NONE, false); + + assertNotNull(pPers); + + final GridDhtLocalPartition pOrgs = ignite.context().cache() + .internalCache(Organization.class.getSimpleName()).context().topology() + .localPartition(part, AffinityTopologyVersion.NONE, false); + + assertNotNull(pOrgs); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return expReservations == pOrgs.reservations() && expReservations == pPers.reservations(); + } + }, 1000L); + assertEquals("Unexpected reservations count", expReservations, pOrgs.reservations()); + assertEquals("Unexpected reservations count", expReservations, pPers.reservations()); + } + + /** */ + private static class DummyAffinity extends RendezvousAffinityFunction { + /** + * Default constructor. + */ + public DummyAffinity() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); + + List<List<ClusterNode>> assign = new ArrayList<>(partitions()); + + for (int i = 0; i < partitions(); ++i) + assign.add(Collections.singletonList(nodes.get(0))); + + return assign; + } + } + + + /** + * Test class Organization. + */ + public static class Organization implements Serializable { + /** */ + @QuerySqlField(index = true) + private final int id; + + /** + * @param id ID. + */ + Organization(int id) { + this.id = id; + } + + /** + * @return id. + */ + int getId() { + return id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Organization.class, this); + } + } + + /** + * Test class Organization. + */ + public static class Person implements Serializable { + /** */ + @QuerySqlField + private final int id; + + /** */ + @QuerySqlField(index = true) + private final int orgId; + + /** + * @param id ID. + * @param orgId Organization ID. + */ + Person(int id, int orgId) { + this.id = id; + this.orgId = orgId; + } + + /** + * @return id. + */ + int getId() { + return id; + } + + /** + * @return organization id. + */ + int getOrgId() { + return orgId; + } + + /** + * @return Affinity key. + */ + public Person.Key createKey() { + return new Person.Key(id, orgId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + + /** + * + */ + static class Key implements Serializable { + /** Id. */ + private final int id; + + /** Org id. */ + @AffinityKeyMapped + protected final int orgId; + + /** + * @param id Id. + * @param orgId Org id. + */ + private Key(int id, int orgId) { + this.id = id; + this.orgId = orgId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + Person.Key key = (Person.Key)o; + + return id == key.id && orgId == key.orgId; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = id; + res = 31 * res + orgId; + return res; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/01800101/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java new file mode 100644 index 0000000..fb90c7e --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Test to validate https://issues.apache.org/jira/browse/IGNITE-2310 + */ +public class IgniteCacheLockPartitionOnAffinityRunAtomicCacheOpTest extends IgniteCacheLockPartitionOnAffinityRunAbstractTest { + /** Atomic cache. */ + private static final String ATOMIC_CACHE = "atomic"; + /** Transact cache. */ + private static final String TRANSACT_CACHE = "transact"; + /** Transact cache. */ + private static final long TEST_TIMEOUT = 10 * 60_000; + /** Keys count. */ + private static int KEYS_CNT = 100; + /** Keys count. */ + private static int PARTS_CNT = 16; + /** Key. */ + private static AtomicInteger key = new AtomicInteger(0); + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected void beginNodesRestart() { + stopRestartThread.set(false); + nodeRestartFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stopRestartThread.get() && System.currentTimeMillis() < endTime) { + log.info("Restart nodes"); + for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i) + stopGrid(i); + Thread.sleep(500); + for (int i = GRID_CNT - RESTARTED_NODE_CNT; i < GRID_CNT; ++i) + startGrid(i); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !stopRestartThread.get(); + } + }, RESTART_TIMEOUT); + } + return null; + } + }, "restart-node"); + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + ccfg.setBackups(0); + + return ccfg; + } + + /** + * @param cacheName Cache name. + * @param mode Atomicity mode. + * @throws Exception If failed. + */ + private void createCache(String cacheName, CacheAtomicityMode mode) throws Exception { + CacheConfiguration ccfg = cacheConfiguration(grid(0).name()); + ccfg.setName(cacheName); + + ccfg.setAtomicityMode(mode); + + ccfg.setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT)); + + grid(0).createCache(ccfg); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + key.set(0); + createCache(ATOMIC_CACHE, CacheAtomicityMode.ATOMIC); + createCache(TRANSACT_CACHE, CacheAtomicityMode.TRANSACTIONAL); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + grid(0).destroyCache(ATOMIC_CACHE); + grid(0).destroyCache(TRANSACT_CACHE); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testNotReservedAtomicCacheOp() throws Exception { + notReservedCacheOp(ATOMIC_CACHE); + } + + /** + * @throws Exception If failed. + */ + public void testNotReservedTxCacheOp() throws Exception { + notReservedCacheOp(TRANSACT_CACHE); + } + + /** + * @param cacheName Cache name. + * @throws Exception If failed. + */ + private void notReservedCacheOp(final String cacheName) throws Exception { + // Workaround for initial update job metadata. + grid(0).compute().affinityRun( + Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()), + new Integer(orgIds.get(0)), + new NotReservedCacheOpAffinityRun(0, 0, cacheName)); + + // Run restart threads: start re-balancing + beginNodesRestart(); + + grid(0).cache(cacheName).clear(); + + IgniteInternalFuture<Long> affFut = null; + try { + affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + for (int i = 0; i < PARTS_CNT; ++i) { + grid(0).compute().affinityRun( + Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()), + new Integer(i), + new NotReservedCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT, cacheName)); + } + } + }, AFFINITY_THREADS_CNT, "affinity-run"); + } + finally { + if (affFut != null) + affFut.get(); + + stopRestartThread.set(true); + nodeRestartFut.get(); + + Thread.sleep(5000); + + log.info("Final await. Timed out if failed"); + awaitPartitionMapExchange(); + + IgniteCache cache = grid(0).cache(cacheName); + cache.clear(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReservedPartitionCacheOp() throws Exception { + // Workaround for initial update job metadata. + grid(0).cache(Person.class.getSimpleName()).clear(); + grid(0).compute().affinityRun( + Arrays.asList(Person.class.getSimpleName(), Organization.class.getSimpleName()), + 0, + new ReservedPartitionCacheOpAffinityRun(0, 0)); + + // Run restart threads: start re-balancing + beginNodesRestart(); + + IgniteInternalFuture<Long> affFut = null; + try { + affFut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + for (int i = 0; i < PARTS_CNT; ++i) { + if (System.currentTimeMillis() >= endTime) + break; + + grid(0).compute().affinityRun( + Arrays.asList(Organization.class.getSimpleName(), Person.class.getSimpleName()), + new Integer(i), + new ReservedPartitionCacheOpAffinityRun(i, key.getAndIncrement() * KEYS_CNT)); + } + } + }, AFFINITY_THREADS_CNT, "affinity-run"); + } + finally { + if (affFut != null) + affFut.get(); + + stopRestartThread.set(true); + nodeRestartFut.get(); + + Thread.sleep(5000); + + log.info("Final await. Timed out if failed"); + awaitPartitionMapExchange(); + + IgniteCache cache = grid(0).cache(Person.class.getSimpleName()); + cache.clear(); + } + } + + /** */ + private static class NotReservedCacheOpAffinityRun implements IgniteRunnable { + /** Org id. */ + int orgId; + + /** Begin of key. */ + int keyBegin; + + /** Cache name. */ + private String cacheName; + + /** */ + @IgniteInstanceResource + private IgniteEx ignite; + + /** */ + @LoggerResource + private IgniteLogger log; + + /** */ + public NotReservedCacheOpAffinityRun() { + // No-op. + } + + /** + * @param orgId Organization. + * @param keyBegin Begin key value. + * @param cacheName Cache name. + */ + public NotReservedCacheOpAffinityRun(int orgId, int keyBegin, String cacheName) { + this.orgId = orgId; + this.keyBegin = keyBegin; + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public void run() { + log.info("Begin run " + keyBegin); + IgniteCache cache = ignite.cache(cacheName); + Map<Integer, Integer> vals = new HashMap<>(); + + for (int i = 0; i < KEYS_CNT; ++i) + cache.put(i + keyBegin, i + keyBegin); +// vals.put(i + keyBegin, i + keyBegin); + +// cache.putAll(vals); + log.info("End run " + keyBegin); + } + } + + /** */ + private static class ReservedPartitionCacheOpAffinityRun implements IgniteRunnable { + /** Org id. */ + int orgId; + + /** Begin of key. */ + int keyBegin; + + /** */ + @IgniteInstanceResource + private IgniteEx ignite; + + /** */ + @LoggerResource + private IgniteLogger log; + + /** */ + public ReservedPartitionCacheOpAffinityRun() { + // No-op. + } + + /** + * @param orgId Organization Id. + * @param keyBegin Begin key value; + */ + public ReservedPartitionCacheOpAffinityRun(int orgId, int keyBegin) { + this.orgId = orgId; + this.keyBegin = keyBegin; + } + + /** {@inheritDoc} */ + @Override public void run() { + log.info("Begin run " + keyBegin); + IgniteCache cache = ignite.cache(Person.class.getSimpleName()); + Map<Person.Key, Person> pers = new HashMap<>(); + + for (int i = 0; i < KEYS_CNT; ++i) { + Person p = new Person(i + keyBegin, orgId); +// pers.put(p.createKey(), p); + cache.put(p.createKey(), p); + } + +// cache.putAll(pers); + } + } +} \ No newline at end of file
