http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f05fdf7,cd4c55c..0309fa7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@@ -403,7 -400,7 +404,8 @@@ public abstract class IgniteTxLocalAdap boolean skipVals, boolean needVer, boolean keepBinary, + boolean recovery, + final ExpiryPolicy expiryPlc, final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c ) { assert cacheCtx.isLocal() : cacheCtx.name(); @@@ -431,9 -430,11 +435,9 @@@ continue; try { - T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + EntryGetResult res = entry.innerGetVersioned( null, this, - /*readSwap*/true, - /*unmarshal*/true, /*update-metrics*/!skipVals, /*event*/!skipVals, CU.subjectId(this, cctx), @@@ -1246,9 -1222,11 +1254,9 @@@ F.first(txEntry.entryProcessors()) : null; if (needVer) { - T2<CacheObject, GridCacheVersion> res = txEntry.cached().innerGetVersioned( + EntryGetResult res = txEntry.cached().innerGetVersioned( null, this, - /*swap*/true, - /*unmarshal*/true, /*update-metrics*/true, /*event*/!skipVals, CU.subjectId(this, cctx), @@@ -1499,7 -1486,7 +1511,8 @@@ skipVals, needReadVer, !deserializeBinary, + recovery, + expiryPlc, new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { if (isRollbackOnly()) { @@@ -1675,10 -1665,12 +1692,10 @@@ F.first(txEntry.entryProcessors()) : null; if (needVer) { - T2<CacheObject, GridCacheVersion> res = cached.innerGetVersioned( + EntryGetResult res = cached.innerGetVersioned( null, IgniteTxLocalAdapter.this, - /*swap*/cacheCtx.isSwapOrOffheapEnabled(), - /*unmarshal*/true, - /*update-metrics*/true, + /**update-metrics*/true, /*event*/!skipVals, CU.subjectId(IgniteTxLocalAdapter.this, cctx), transformClo, @@@ -2041,7 -2035,7 +2061,8 @@@ /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, retval, keepBinary, - expiryPlc); ++ expiryPlc, + recovery); } return new GridFinishedFuture<>(); @@@ -2212,7 -2205,7 +2233,8 @@@ /*read through*/(invokeMap != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, retval, keepBinary, - expiryPlc); ++ expiryPlc, + recovery); } return new GridFinishedFuture<>(); @@@ -2232,8 -2225,7 +2254,9 @@@ * @param hasFilters {@code True} if filters not empty. * @param readThrough Read through flag. * @param retval Return value flag. + * @param keepBinary Keep binary flag. + * @param expiryPlc Expiry policy. + * @param recovery Recovery flag. * @return Load future. */ private IgniteInternalFuture<Void> loadMissing( @@@ -2248,8 -2240,7 +2271,9 @@@ final boolean readThrough, final boolean retval, final boolean keepBinary, - final ExpiryPolicy expiryPlc) { ++ final ExpiryPolicy expiryPlc, + final boolean recovery + ) { GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c = new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { @Override public void apply(KeyCacheObject key, @@@ -2323,7 -2314,7 +2347,8 @@@ /*skipVals*/singleRmv, needReadVer, keepBinary, + recovery, + expiryPlc, c); }
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index d6b09e4,f5687a0..cf1e7e2 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@@ -192,6 -194,6 +194,7 @@@ public interface IgniteTxLocalEx extend boolean skipVals, boolean needVer, boolean keepBinary, + boolean recovery, + final ExpiryPolicy expiryPlc, GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 124cb4b,d1c8b2d..a32b9d8 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@@ -121,8 -139,27 +139,27 @@@ public class ClusterProcessor extends G } } + + /** + * @param vals collection to seek through. + */ + private Boolean findLastFlag(Collection<Serializable> vals) { + Boolean flag = null; + + for (Serializable ser : vals) { + if (ser != null) { + Map<String, Object> map = (Map<String, Object>) ser; + + if (map.containsKey(ATTR_UPDATE_NOTIFIER_STATUS)) + flag = (Boolean) map.get(ATTR_UPDATE_NOTIFIER_STATUS); + } + } + + return flag; + } + /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart(boolean activeOnStart) throws IgniteCheckedException { if (notifyEnabled.get()) { try { verChecker = new GridUpdateNotifier(ctx.gridName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 87d54a1,0000000..247d469 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@@ -1,947 -1,0 +1,955 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cluster; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterGroupAdapter; +import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.ClusterState; +import org.apache.ignite.internal.processors.cache.ChangeGlobalStateMessage; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteRunnable; +import org.apache.ignite.resources.IgniteInstanceResource; ++import org.apache.ignite.spi.discovery.DiscoveryDataBag; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.ClusterState.ACTIVE; +import static org.apache.ignite.internal.processors.cache.ClusterState.INACTIVE; +import static org.apache.ignite.internal.processors.cache.ClusterState.TRANSITION; + +/** + * + */ +public class GridClusterStateProcessor extends GridProcessorAdapter { + /** Global status. */ + private volatile ClusterState globalState; + + /** Action context. */ + private volatile ChangeGlobalStateContext lastCgsCtx; + + /** Local action future. */ + private final AtomicReference<GridChangeGlobalStateFuture> cgsLocFut = new AtomicReference<>(); + + /** Process. */ + @GridToStringExclude + private GridCacheProcessor cacheProc; + + /** Shared context. */ + @GridToStringExclude + private GridCacheSharedContext<?, ?> sharedCtx; + + //todo may be add init latch + + /** Listener. */ + private final GridLocalEventListener lsr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt != null; + + final DiscoveryEvent e = (DiscoveryEvent)evt; + + assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this; + + final GridChangeGlobalStateFuture f = cgsLocFut.get(); + + if (f != null) + f.initFut.listen(new CI1<IgniteInternalFuture>() { + @Override public void apply(IgniteInternalFuture fut) { + f.onDiscoveryEvent(e); + } + }); + } + }; + + /** + * @param ctx Kernal context. + */ + public GridClusterStateProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start(boolean activeOnStart) throws IgniteCheckedException { + super.start(activeOnStart); + + globalState = activeOnStart ? ACTIVE : INACTIVE; + cacheProc = ctx.cache(); + sharedCtx = cacheProc.context(); + + sharedCtx.io().addHandler(0, + GridChangeGlobalStateMessageResponse.class, + new CI2<UUID, GridChangeGlobalStateMessageResponse>() { + @Override public void apply(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { + processChangeGlobalStateResponse(nodeId, msg); + } + }); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateMessage.class, new CustomEventListener<ChangeGlobalStateMessage>() { + @Override public void onCustomEvent( + AffinityTopologyVersion topVer, ClusterNode snd, ChangeGlobalStateMessage msg) { + assert topVer != null; + assert snd != null; + assert msg != null; + + boolean activate = msg.activate(); + + ChangeGlobalStateContext actx = lastCgsCtx; + + if (actx != null && globalState == TRANSITION) { + GridChangeGlobalStateFuture f = cgsLocFut.get(); + + if (log.isDebugEnabled()) + log.debug("Concurrent " + prettyStr(activate) + " [id=" + + ctx.localNodeId() + " topVer=" + topVer + " actx=" + actx + ", msg=" + msg + "]"); + + if (f != null && f.requestId.equals(msg.requestId())) + f.onDone(new IgniteCheckedException( + "Concurrent change state, now in progress=" + (activate) + + ", initiatingNodeId=" + actx.initiatingNodeId + + ", you try=" + (prettyStr(activate)) + ", locNodeId=" + ctx.localNodeId() + )); + + msg.concurrentChangeState(); + } + else { + if (log.isInfoEnabled()) + log.info("Create " + prettyStr(activate) + " context [id=" + + ctx.localNodeId() + " topVer=" + topVer + ", reqId=" + + msg.requestId() + ", initiatingNodeId=" + msg.initiatorNodeId() + "]"); + + lastCgsCtx = new ChangeGlobalStateContext( + msg.requestId(), + msg.initiatorNodeId(), + msg.getDynamicCacheChangeBatch(), + msg.activate()); + + globalState = TRANSITION; + } + } + }); + + ctx.event().addLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + super.stop(cancel); + + sharedCtx.io().removeHandler(0, GridChangeGlobalStateMessageResponse.class); + ctx.event().removeLocalEventListener(lsr, EVT_NODE_LEFT, EVT_NODE_FAILED); + + IgniteCheckedException stopErr = new IgniteInterruptedCheckedException( + "Node is stopping: " + ctx.gridName()); + + GridChangeGlobalStateFuture f = cgsLocFut.get(); + + if (f != null) + f.onDone(stopErr); + + cgsLocFut.set(null); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { + return DiscoveryDataExchangeType.STATE_PROC; + } + + /** {@inheritDoc} */ - @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { - return globalState; ++ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { ++ dataBag.addJoiningNodeData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState); + } + + /** {@inheritDoc} */ - @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { - if (ctx.localNodeId().equals(joiningNodeId)) - globalState = (ClusterState)data; ++ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { ++ dataBag.addGridCommonData(DiscoveryDataExchangeType.STATE_PROC.ordinal(), globalState); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) { ++ globalState = (ClusterState)data.commonData(); ++ } ++ ++ /** {@inheritDoc} */ ++ @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) { ++ // No-op. + } + + /** + * + */ + public IgniteInternalFuture<?> changeGlobalState(final boolean activate) { + if (cacheProc.transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null) + throw new IgniteException("Cannot " + prettyStr(activate) + " cluster, because cache locked on transaction."); + + if ((this.globalState == ACTIVE && activate) || (this.globalState == INACTIVE && !activate)) + return new GridFinishedFuture<>(); + + final UUID requestId = UUID.randomUUID(); + + final GridChangeGlobalStateFuture cgsFut = new GridChangeGlobalStateFuture(requestId, activate, ctx); + + if (!cgsLocFut.compareAndSet(null, cgsFut)) { + GridChangeGlobalStateFuture locF = cgsLocFut.get(); + + if (locF.activate == activate) + return locF; + else + return new GridFinishedFuture<>(new IgniteException( + "fail " + prettyStr(activate) + ", because now in progress" + prettyStr(locF.activate))); + } + + try { + if (ctx.clientNode()) { + AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); + + IgniteCompute comp = ((ClusterGroupAdapter)ctx.cluster().get().forServers()) + .compute().withAsync(); + + if (log.isInfoEnabled()) + log.info("Send " + prettyStr(activate) + " request from client node [id=" + + ctx.localNodeId() + " topVer=" + topVer + " ]"); + + comp.run(new ClientChangeGlobalStateComputeRequest(activate)); + + comp.future().listen(new CI1<IgniteFuture>() { + @Override public void apply(IgniteFuture fut) { + try { + fut.get(); + + cgsFut.onDone(); + } + catch (Exception e) { + cgsFut.onDone(e); + } + } + }); + } + else { + List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); + + DynamicCacheChangeRequest changeGlobalStateReq = new DynamicCacheChangeRequest( + requestId, null, ctx.localNodeId()); + + changeGlobalStateReq.state(activate ? ACTIVE : INACTIVE); + + reqs.add(changeGlobalStateReq); + + reqs.addAll(activate ? cacheProc.startAllCachesRequests() : cacheProc.stopAllCachesRequests()); + + ChangeGlobalStateMessage changeGlobalStateMsg = new ChangeGlobalStateMessage( + requestId, ctx.localNodeId(), activate, new DynamicCacheChangeBatch(reqs)); + + try { + ctx.discovery().sendCustomEvent(changeGlobalStateMsg); + + if (ctx.isStopping()) + cgsFut.onDone(new IgniteCheckedException("Failed to execute " + prettyStr(activate) + " request, " + + "node is stopping.")); + } + catch (IgniteCheckedException e) { + log.error("Fail create or send change global state request." + cgsFut, e); + + cgsFut.onDone(e); + } + } + } + catch (IgniteCheckedException e) { + log.error("Fail create or send change global state request." + cgsFut, e); + + cgsFut.onDone(e); + } + + return cgsFut; + } + + /** + * + */ + public boolean active() { + ChangeGlobalStateContext actx = lastCgsCtx; + + if (actx != null && !actx.activate && globalState == TRANSITION) + return true; + + if (actx != null && actx.activate && globalState == TRANSITION) + return false; + + return globalState == ACTIVE; + } + + /** + * @param reqs Requests. + */ + public boolean changeGlobalState( + Collection<DynamicCacheChangeRequest> reqs, + AffinityTopologyVersion topVer + ) { + assert !F.isEmpty(reqs); + assert topVer != null; + + for (DynamicCacheChangeRequest req : reqs) + if (req.globalStateChange()) { + ChangeGlobalStateContext cgsCtx = lastCgsCtx; + + assert cgsCtx != null : "reqs: " + Arrays.toString(reqs.toArray()); + + cgsCtx.topologyVersion(topVer); + + return true; + } + + + return false; + } + + /** + * Invoke from exchange future. + */ + public Exception onChangeGlobalState() { + GridChangeGlobalStateFuture f = cgsLocFut.get(); + + ChangeGlobalStateContext cgsCtx = lastCgsCtx; + + assert cgsCtx != null; + + if (f != null) + f.setRemaining(cgsCtx.topVer); + + return cgsCtx.activate ? onActivate(cgsCtx) : onDeActivate(cgsCtx); + } + + /** + * @param exs Exs. + */ + public void onFullResponseMessage(Map<UUID, Exception> exs) { + assert !F.isEmpty(exs); + + ChangeGlobalStateContext actx = lastCgsCtx; + + actx.setFail(); + + // revert change if activation request fail + if (actx.activate) { + try { + cacheProc.onKernalStopCaches(true); + + cacheProc.stopCaches(true); + + sharedCtx.affinity().removeAllCacheInfo(); + + if (!ctx.clientNode()) { + sharedCtx.database().onDeActivate(ctx); + + if (sharedCtx.pageStore() != null) + sharedCtx.pageStore().onDeActivate(ctx); + + if (sharedCtx.wal() != null) + sharedCtx.wal().onDeActivate(ctx); + } + } + catch (Exception e) { + for (Map.Entry<UUID, Exception> entry : exs.entrySet()) + e.addSuppressed(entry.getValue()); + + log.error("Fail while revert activation request changes", e); + } + } + else { + //todo revert change if deactivate request fail + } + + globalState = actx.activate ? INACTIVE : ACTIVE; + + GridChangeGlobalStateFuture af = cgsLocFut.get(); + + if (af != null && af.requestId.equals(actx.requestId)) { + IgniteCheckedException e = new IgniteCheckedException("see suppressed"); + + for (Map.Entry<UUID, Exception> entry : exs.entrySet()) + e.addSuppressed(entry.getValue()); + + af.onDone(e); + } + } + + /** + * + */ + private Exception onActivate(ChangeGlobalStateContext cgsCtx) { + final boolean client = ctx.clientNode(); + + if (log.isInfoEnabled()) + log.info("Start activation process [nodeId=" + this.ctx.localNodeId() + ", client=" + client + + ", topVer=" + cgsCtx.topVer + "]"); + + Collection<CacheConfiguration> cfgs = new ArrayList<>(); + + for (DynamicCacheChangeRequest req : cgsCtx.batch.requests()) { + if (req.startCacheConfiguration() != null) + cfgs.add(req.startCacheConfiguration()); + } + + try { + if (!client) { + sharedCtx.database().lock(); + + IgnitePageStoreManager pageStore = sharedCtx.pageStore(); + + if (pageStore != null) + pageStore.onActivate(ctx); + + if (sharedCtx.wal() != null) + sharedCtx.wal().onActivate(ctx); + + sharedCtx.database().initDataBase(); + + for (CacheConfiguration cfg : cfgs) { + if (CU.isSystemCache(cfg.getName())) + if (pageStore != null) + pageStore.initializeForCache(cfg); + } + + for (CacheConfiguration cfg : cfgs) { + if (!CU.isSystemCache(cfg.getName())) + if (pageStore != null) + pageStore.initializeForCache(cfg); + } + + sharedCtx.database().onActivate(ctx); + } + + if (log.isInfoEnabled()) + log.info("Success activate wal, dataBase, pageStore [nodeId=" + + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); + + return null; + } + catch (Exception e) { + log.error("Fail activate wal, dataBase, pageStore [nodeId=" + ctx.localNodeId() + ", client=" + client + + ", topVer=" + cgsCtx.topVer + "]", e); + + if (!ctx.clientNode()) + sharedCtx.database().unLock(); + + return e; + } + } + + /** + * + */ + public Exception onDeActivate(ChangeGlobalStateContext cgsCtx) { + final boolean client = ctx.clientNode(); + + if (log.isInfoEnabled()) + log.info("Start deactivate process [id=" + ctx.localNodeId() + ", client=" + + client + ", topVer=" + cgsCtx.topVer + "]"); + + try { + ctx.dataStructures().onDeActivate(ctx); + + ctx.service().onDeActivate(ctx); + + if (log.isInfoEnabled()) + log.info("Success deactivate services, dataStructures, database, pageStore, wal [id=" + ctx.localNodeId() + ", client=" + + client + ", topVer=" + cgsCtx.topVer + "]"); + + return null; + } + catch (Exception e) { + log.error("DeActivation fail [nodeId=" + ctx.localNodeId() + ", client=" + client + + ", topVer=" + cgsCtx.topVer + "]", e); + + return e; + } + finally { + if (!client) + sharedCtx.database().unLock(); + } + } + + /** + * + */ + private void onFinalActivate(final ChangeGlobalStateContext cgsCtx) { + IgniteInternalFuture<?> asyncActivateFut = ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + boolean client = ctx.clientNode(); + + Exception e = null; + + try { - ctx.marshallerContext().onMarshallerCacheStarted(ctx); - + if (!ctx.config().isDaemon()) + ctx.cacheObjects().onUtilityCacheStarted(); + + ctx.service().onUtilityCacheStarted(); + + ctx.service().onActivate(ctx); + + ctx.dataStructures().onActivate(ctx); + + if (log.isInfoEnabled()) + log.info("Success final activate [nodeId=" + + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); + } + catch (Exception ex) { + e = ex; + + log.error("Fail activate finished [nodeId=" + ctx.localNodeId() + ", client=" + client + + ", topVer=" + GridClusterStateProcessor.this.lastCgsCtx.topVer + "]", ex); + } + finally { + globalState = ACTIVE; + + sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, e); + + GridClusterStateProcessor.this.lastCgsCtx = null; + } + } + }); + + cgsCtx.setAsyncActivateFut(asyncActivateFut); + } + + /** + * + */ + public void onFinalDeActivate(ChangeGlobalStateContext cgsCtx) { + final boolean client = ctx.clientNode(); + + if (log.isInfoEnabled()) + log.info("Success final deactivate [nodeId=" + + ctx.localNodeId() + ", client=" + client + ", topVer=" + cgsCtx.topVer + "]"); + + Exception ex = null; + + try { + if (!client) { + sharedCtx.database().onDeActivate(ctx); + + if (sharedCtx.pageStore() != null) + sharedCtx.pageStore().onDeActivate(ctx); + + if (sharedCtx.wal() != null) + sharedCtx.wal().onDeActivate(ctx); + + sharedCtx.affinity().removeAllCacheInfo(); + } + } + catch (Exception e) { + ex = e; + } + finally { + globalState = INACTIVE; + } + + sendChangeGlobalStateResponse(cgsCtx.requestId, cgsCtx.initiatingNodeId, ex); + + this.lastCgsCtx = null; + } + + /** + * + */ + public void onExchangeDone() { + ChangeGlobalStateContext cgsCtx = lastCgsCtx; + + assert cgsCtx != null; + + if (!cgsCtx.isFail()) { + if (cgsCtx.activate) + onFinalActivate(cgsCtx); + else + onFinalDeActivate(cgsCtx); + } + else + lastCgsCtx = null; + } + + /** + * @param initNodeId Initialize node id. + * @param ex Exception. + */ + private void sendChangeGlobalStateResponse(UUID requestId, UUID initNodeId, Exception ex) { + assert requestId != null; + assert initNodeId != null; + + try { + GridChangeGlobalStateMessageResponse actResp = new GridChangeGlobalStateMessageResponse(requestId, ex); + + if (log.isDebugEnabled()) + log.debug("Send change global state response [nodeId=" + ctx.localNodeId() + + ", topVer=" + ctx.discovery().topologyVersionEx() + ", response=" + actResp + "]"); + + if (ctx.localNodeId().equals(initNodeId)) + processChangeGlobalStateResponse(ctx.localNodeId(), actResp); + else + sharedCtx.io().send(initNodeId, actResp, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + log.error("Fail send change global state response to " + initNodeId, e); + } + } + + /** + * @param msg Message. + */ + private void processChangeGlobalStateResponse(final UUID nodeId, final GridChangeGlobalStateMessageResponse msg) { + assert nodeId != null; + assert msg != null; + + if (log.isDebugEnabled()) + log.debug("Received activation response [requestId=" + msg.getRequestId() + + ", nodeId=" + nodeId + "]"); + + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) { + U.warn(log, "Received activation response from unknown node (will ignore) [requestId=" + + msg.getRequestId() + ']'); + + return; + } + + UUID requestId = msg.getRequestId(); + + final GridChangeGlobalStateFuture fut = cgsLocFut.get(); + + if (fut != null && !fut.isDone() && requestId.equals(fut.requestId)) { + fut.initFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + fut.onResponse(nodeId, msg); + } + }); + } + } + + + + /** + * @param activate Activate. + */ + private String prettyStr(boolean activate) { + return activate ? "activate" : "deactivate"; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridClusterStateProcessor.class, this); + } + + /** + * + */ + private static class GridChangeGlobalStateFuture extends GridFutureAdapter { + /** Request id. */ + @GridToStringInclude + private final UUID requestId; + + /** Activate. */ + private final boolean activate; + + /** Nodes. */ + @GridToStringInclude + private final Set<UUID> remaining = new HashSet<>(); + + /** Responses. */ + @GridToStringInclude + private final Map<UUID, GridChangeGlobalStateMessageResponse> resps = new HashMap<>(); + + /** Context. */ + @GridToStringExclude + private final GridKernalContext ctx; + + /** */ + @GridToStringExclude + private final Object mux = new Object(); + + /** */ + @GridToStringInclude + private final GridFutureAdapter initFut = new GridFutureAdapter(); + + /** Grid logger. */ + @GridToStringExclude + private final IgniteLogger log; + + /** + * + */ + public GridChangeGlobalStateFuture(UUID requestId, boolean activate, GridKernalContext ctx) { + this.requestId = requestId; + this.activate = activate; + this.ctx = ctx; + this.log = ctx.log(getClass()); + } + + /** + * @param event Event. + */ + public void onDiscoveryEvent(DiscoveryEvent event) { + assert event != null; + + if (isDone()) + return; + + boolean allReceived = false; + + synchronized (mux) { + if (remaining.remove(event.eventNode().id())) + allReceived = remaining.isEmpty(); + } + + if (allReceived) + onAllReceived(); + } + + /** + * + */ + public void setRemaining(AffinityTopologyVersion topVer) { + Collection<ClusterNode> nodes = ctx.discovery().nodes(topVer); + + List<UUID> ids = new ArrayList<>(nodes.size()); + + for (ClusterNode n : nodes) + ids.add(n.id()); + + if (log.isDebugEnabled()) + log.debug("Setup remaining node [id=" + ctx.localNodeId() + ", client=" + + ctx.clientNode() + ", topVer=" + ctx.discovery().topologyVersionEx() + + ", nodes=" + Arrays.toString(ids.toArray()) + "]"); + + synchronized (mux) { + remaining.addAll(ids); + } + + initFut.onDone(); + } + + /** + * @param msg Activation message response. + */ + public void onResponse(UUID nodeId, GridChangeGlobalStateMessageResponse msg) { + assert msg != null; + + if (isDone()) + return; + + boolean allReceived = false; + + synchronized (mux) { + if (remaining.remove(nodeId)) + allReceived = remaining.isEmpty(); + + resps.put(nodeId, msg); + } + + if (allReceived) + onAllReceived(); + } + + /** + * + */ + private void onAllReceived() { + Throwable e = new Throwable(); + + boolean fail = false; + + for (Map.Entry<UUID, GridChangeGlobalStateMessageResponse> entry : resps.entrySet()) { + GridChangeGlobalStateMessageResponse r = entry.getValue(); + + if (r.getError() != null) { + fail = true; + + e.addSuppressed(r.getError()); + } + } + + if (fail) + onDone(e); + else + onDone(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + ctx.state().cgsLocFut.set(null); + + return super.onDone(res, err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridChangeGlobalStateFuture.class, this); + } + } + + /** + * + * + */ + private static class ChangeGlobalStateContext { + /** Request id. */ + private final UUID requestId; + + /** Initiating node id. */ + private final UUID initiatingNodeId; + + /** Batch requests. */ + private final DynamicCacheChangeBatch batch; + + /** Activate. */ + private final boolean activate; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Fail. */ + private boolean fail; + + /** Async activate future. */ + private IgniteInternalFuture<?> asyncActivateFut; + + /** + * + */ + public ChangeGlobalStateContext( + UUID requestId, + UUID initiatingNodeId, + DynamicCacheChangeBatch batch, + boolean activate + ) { + this.requestId = requestId; + this.batch = batch; + this.activate = activate; + this.initiatingNodeId = initiatingNodeId; + } + + /** + * @param topVer Topology version. + */ + public void topologyVersion(AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + + /** + * + */ + private void setFail() { + fail = true; + } + + /** + * + */ + private boolean isFail() { + return fail; + } + + /** + * + */ + public IgniteInternalFuture<?> getAsyncActivateFut() { + return asyncActivateFut; + } + + /** + * @param asyncActivateFut Async activate future. + */ + public void setAsyncActivateFut(IgniteInternalFuture<?> asyncActivateFut) { + this.asyncActivateFut = asyncActivateFut; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ChangeGlobalStateContext.class, this); + } + } + + /** + * + */ + private static class ClientChangeGlobalStateComputeRequest implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; + + /** Activation. */ + private final boolean activation; + + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * + */ + private ClientChangeGlobalStateComputeRequest(boolean activation) { + this.activation = activation; + } + + /** {@inheritDoc} */ + @Override public void run() { + ignite.active(activation); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 9012214,f97fc14..df2f7d9 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@@ -1925,59 -1920,24 +1923,59 @@@ public class DataStreamerImpl<K, V> imp ExpiryPolicy plc = cctx.expiry(); - for (Entry<KeyCacheObject, CacheObject> e : entries) { - try { - e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); + Collection<Integer> reservedParts = new HashSet<>(); + Collection<Integer> ignoredParts = new HashSet<>(); - GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer); + try { + for (Entry<KeyCacheObject, CacheObject> e : entries) { + cctx.shared().database().checkpointReadLock(); - if (plc != null) { - ttl = CU.toTtl(plc.getExpiryForCreation()); + try { + e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); - if (ttl == CU.TTL_ZERO) - continue; - else if (ttl == CU.TTL_NOT_CHANGED) - ttl = 0; + if (!cctx.isLocal()) { + int p = cctx.affinity().partition(e.getKey()); - expiryTime = CU.toExpireTime(ttl); - } + if (ignoredParts.contains(p)) + continue; + + if (!reservedParts.contains(p)) { + GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, true); + + if (!part.reserve()) { + ignoredParts.add(p); + + continue; + } + else { + // We must not allow to read from RENTING partitions. + if (part.state() == GridDhtPartitionState.RENTING) { + part.release(); + + ignoredParts.add(p); + + continue; + } + + reservedParts.add(p); + } + } + } + + GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer); + + if (plc != null) { + ttl = CU.toTtl(plc.getExpiryForCreation()); + + if (ttl == CU.TTL_ZERO) + continue; + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; + + expiryTime = CU.toExpireTime(ttl); + } - boolean primary = cctx.affinity().primary(cctx.localNode(), entry.key(), topVer); + boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer); entry.initialValue(e.getValue(), ver, http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 0000000,fdea869..3d2f86e mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@@ -1,0 -1,363 +1,363 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.marshaller; + + import java.util.Iterator; + import java.util.List; + import java.util.Map; + import java.util.UUID; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.CopyOnWriteArrayList; + import java.util.concurrent.ExecutorService; + import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.cluster.ClusterNode; + import org.apache.ignite.events.DiscoveryEvent; + import org.apache.ignite.events.Event; + import org.apache.ignite.internal.GridKernalContext; + import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; + import org.apache.ignite.internal.MarshallerContextImpl; + import org.apache.ignite.internal.managers.communication.GridIoManager; + import org.apache.ignite.internal.managers.communication.GridMessageListener; + import org.apache.ignite.internal.managers.discovery.CustomEventListener; + import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; + import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; + import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; + import org.apache.ignite.internal.processors.GridProcessorAdapter; + import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + import org.apache.ignite.internal.processors.closure.GridClosureProcessor; + import org.apache.ignite.internal.util.future.GridFutureAdapter; + import org.apache.ignite.internal.util.typedef.internal.U; + import org.apache.ignite.lang.IgniteFuture; + import org.apache.ignite.spi.discovery.DiscoveryDataBag; + import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData; + import org.jetbrains.annotations.Nullable; + import org.jsr166.ConcurrentHashMap8; + + import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; + import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; + import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.MARSHALLER_PROC; + import static org.apache.ignite.internal.GridTopic.TOPIC_MAPPING_MARSH; + import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; + + /** + * Processor responsible for managing custom {@link DiscoveryCustomMessage} + * events for exchanging marshalling mappings between nodes in grid. + * + * In particular it processes two flows: + * <ul> + * <li> + * Some node, server or client, wants to add new mapping for some class. + * In that case a pair of {@link MappingProposedMessage} and {@link MappingAcceptedMessage} events is used. + * </li> + * <li> + * As discovery events are delivered to clients asynchronously, + * client node may not have some mapping when server nodes in the grid are already allowed to use the mapping. + * In that situation client sends a {@link MissingMappingRequestMessage} request + * and processor handles it as well as {@link MissingMappingResponseMessage} message. + * </li> + * </ul> + */ + public class GridMarshallerMappingProcessor extends GridProcessorAdapter { + /** */ + private final MarshallerContextImpl marshallerCtx; + + /** */ + private final GridClosureProcessor closProc; + + /** */ + private final List<MappingUpdatedListener> mappingUpdatedLsnrs = new CopyOnWriteArrayList<>(); + + /** */ + private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchangeSyncMap + = new ConcurrentHashMap8<>(); + + /** */ + private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>(); + + /** + * @param ctx Kernal context. + */ + public GridMarshallerMappingProcessor(GridKernalContext ctx) { + super(ctx); + + marshallerCtx = ctx.marshallerContext(); + + closProc = ctx.closure(); + } + + /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { ++ @Override public void start(boolean activeOnStart) throws IgniteCheckedException { + GridDiscoveryManager discoMgr = ctx.discovery(); + GridIoManager ioMgr = ctx.io(); + + MarshallerMappingTransport transport = new MarshallerMappingTransport( + ctx, + mappingExchangeSyncMap, + clientReqSyncMap + ); + marshallerCtx.onMarshallerProcessorStarted(ctx, transport); + + discoMgr.setCustomEventListener(MappingProposedMessage.class, new MarshallerMappingExchangeListener()); + + discoMgr.setCustomEventListener(MappingAcceptedMessage.class, new MappingAcceptedListener()); + + if (ctx.clientNode()) + ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingResponseListener()); + else + ioMgr.addMessageListener(TOPIC_MAPPING_MARSH, new MissingMappingRequestListener(ioMgr)); + + if (ctx.clientNode()) + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + DiscoveryEvent evt0 = (DiscoveryEvent) evt; + + if (!ctx.isStopping()) { + for (ClientRequestFuture fut : clientReqSyncMap.values()) + fut.onNodeLeft(evt0.eventNode().id()); + } + } + }, EVT_NODE_LEFT, EVT_NODE_FAILED); + } + + /** + * Adds a listener to be notified when mapping changes. + * + * @param mappingUpdatedListener listener for mapping updated events. + */ + public void addMappingUpdatedListener(MappingUpdatedListener mappingUpdatedListener) { + mappingUpdatedLsnrs.add(mappingUpdatedListener); + } + + /** + * Gets an iterator over all current mappings. + * + * @return Iterator over current mappings. + */ + public Iterator<Map.Entry<Byte, Map<Integer, String>>> currentMappings() { + return marshallerCtx.currentMappings(); + } + + /** + * + */ + private final class MissingMappingRequestListener implements GridMessageListener { + /** */ + private final GridIoManager ioMgr; + + /** + * @param ioMgr Io manager. + */ + MissingMappingRequestListener(GridIoManager ioMgr) { + this.ioMgr = ioMgr; + } + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof MissingMappingRequestMessage : msg; + + MissingMappingRequestMessage msg0 = (MissingMappingRequestMessage) msg; + + byte platformId = msg0.platformId(); + int typeId = msg0.typeId(); + + String resolvedClsName = marshallerCtx.resolveMissedMapping(platformId, typeId); + + try { + ioMgr.send( + nodeId, + TOPIC_MAPPING_MARSH, + new MissingMappingResponseMessage(platformId, typeId, resolvedClsName), + SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send missing mapping response.", e); + } + } + } + + /** + * + */ + private final class MissingMappingResponseListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof MissingMappingResponseMessage : msg; + + MissingMappingResponseMessage msg0 = (MissingMappingResponseMessage) msg; + + byte platformId = msg0.platformId(); + int typeId = msg0.typeId(); + String resolvedClsName = msg0.className(); + + MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, null); + + GridFutureAdapter<MappingExchangeResult> fut = clientReqSyncMap.get(item); + + if (fut != null) { + if (resolvedClsName != null) { + marshallerCtx.onMissedMappingResolved(item, resolvedClsName); + + fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName)); + } + else + fut.onDone(MappingExchangeResult.createFailureResult( + new IgniteCheckedException( + "Failed to resolve mapping [platformId: " + + platformId + + ", typeId: " + + typeId + "]"))); + } + } + } + + /** + * + */ + private final class MarshallerMappingExchangeListener implements CustomEventListener<MappingProposedMessage> { + /** {@inheritDoc} */ + @Override public void onCustomEvent( + AffinityTopologyVersion topVer, + ClusterNode snd, + MappingProposedMessage msg + ) { + if (!ctx.isStopping()) { + if (msg.duplicated()) + return; + + if (!msg.inConflict()) { + MarshallerMappingItem item = msg.mappingItem(); + String conflictingName = marshallerCtx.onMappingProposed(item); + + if (conflictingName != null) { + if (conflictingName.equals(item.className())) + msg.markDuplicated(); + else + msg.conflictingWithClass(conflictingName); + } + } + else { + UUID origNodeId = msg.origNodeId(); + + if (origNodeId.equals(ctx.localNodeId())) { + GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(msg.mappingItem()); + + assert fut != null: msg; + + fut.onDone(MappingExchangeResult.createFailureResult( + duplicateMappingException(msg.mappingItem(), msg.conflictingClassName()))); + } + } + } + } + + /** + * @param mappingItem Mapping item. + * @param conflictingClsName Conflicting class name. + */ + private IgniteCheckedException duplicateMappingException( + MarshallerMappingItem mappingItem, + String conflictingClsName + ) { + return new IgniteCheckedException("Duplicate ID [platformId=" + + mappingItem.platformId() + + ", typeId=" + + mappingItem.typeId() + + ", oldCls=" + + conflictingClsName + + ", newCls=" + + mappingItem.className() + "]"); + } + } + + /** + * + */ + private final class MappingAcceptedListener implements CustomEventListener<MappingAcceptedMessage> { + /** {@inheritDoc} */ + @Override public void onCustomEvent( + AffinityTopologyVersion topVer, + ClusterNode snd, + MappingAcceptedMessage msg + ) { + final MarshallerMappingItem item = msg.getMappingItem(); + marshallerCtx.onMappingAccepted(item); + + closProc.runLocalSafe(new Runnable() { + @Override public void run() { + for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs) + lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className()); + } + }); + + GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item); + + if (fut != null) + fut.onDone(MappingExchangeResult.createSuccessfulResult(item.className())); + } + } + + /** {@inheritDoc} */ + @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { + if (!dataBag.commonDataCollectedFor(MARSHALLER_PROC.ordinal())) + dataBag.addGridCommonData(MARSHALLER_PROC.ordinal(), marshallerCtx.getCachedMappings()); + } + + /** {@inheritDoc} */ + @Override public void onGridDataReceived(GridDiscoveryData data) { + List<Map<Integer, MappedName>> mappings = (List<Map<Integer, MappedName>>) data.commonData(); + + if (mappings != null) { + for (int i = 0; i < mappings.size(); i++) { + Map<Integer, MappedName> map; + + if ((map = mappings.get(i)) != null) + marshallerCtx.onMappingDataReceived((byte) i, map); + } + } + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + cancelFutures(MappingExchangeResult.createFailureResult(new IgniteClientDisconnectedCheckedException( + ctx.cluster().clientReconnectFuture(), + "Failed to propose or request mapping, client node disconnected."))); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + marshallerCtx.onMarshallerProcessorStop(); + + cancelFutures(MappingExchangeResult.createExchangeDisabledResult()); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { + return MARSHALLER_PROC; + } + + /** + * @param res Response. + */ + private void cancelFutures(MappingExchangeResult res) { + for (GridFutureAdapter<MappingExchangeResult> fut : mappingExchangeSyncMap.values()) + fut.onDone(res); + + for (GridFutureAdapter<MappingExchangeResult> fut : clientReqSyncMap.values()) + fut.onDone(res); + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 5fcd2eb,ea8f361..efd1926 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@@ -55,15 -44,12 +55,17 @@@ import org.apache.ignite.configuration. import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; -import org.apache.ignite.internal.binary.*; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; + import org.apache.ignite.internal.processors.platform.plugin.cache.PlatformCachePluginConfiguration; -import org.apache.ignite.platform.dotnet.*; +import org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction; +import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration; +import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration; +import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative; +import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration; + import org.apache.ignite.plugin.CachePluginConfiguration; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMBean; @@@ -821,19 -865,19 +850,19 @@@ public class PlatformConfigurationUtil * Write query entity. * * @param writer Writer. -- * @param queryEntity Query entity. ++ * @param qryEntity Query entity. */ -- private static void writeQueryEntity(BinaryRawWriter writer, QueryEntity queryEntity) { -- assert queryEntity != null; ++ private static void writeQueryEntity(BinaryRawWriter writer, QueryEntity qryEntity) { ++ assert qryEntity != null; -- writer.writeString(queryEntity.getKeyType()); -- writer.writeString(queryEntity.getValueType()); ++ writer.writeString(qryEntity.getKeyType()); ++ writer.writeString(qryEntity.getValueType()); // Fields -- LinkedHashMap<String, String> fields = queryEntity.getFields(); ++ LinkedHashMap<String, String> fields = qryEntity.getFields(); if (fields != null) { -- Set<String> keyFields = queryEntity.getKeyFields(); ++ Set<String> keyFields = qryEntity.getKeyFields(); writer.writeInt(fields.size()); @@@ -847,7 -891,7 +876,7 @@@ writer.writeInt(0); // Aliases -- Map<String, String> aliases = queryEntity.getAliases(); ++ Map<String, String> aliases = qryEntity.getAliases(); if (aliases != null) { writer.writeInt(aliases.size()); @@@ -861,7 -905,7 +890,7 @@@ writer.writeInt(0); // Indexes -- Collection<QueryIndex> indexes = queryEntity.getIndexes(); ++ Collection<QueryIndex> indexes = qryEntity.getIndexes(); if (indexes != null) { writer.writeInt(indexes.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java index 772813f,fa33d3a..456bb86 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java @@@ -67,13 -65,13 +67,13 @@@ public class CachePluginManager extend public CachePluginManager(GridKernalContext ctx, CacheConfiguration cfg) { this.ctx = ctx; this.cfg = cfg; - + - for (PluginProvider p : ctx.plugins().allProviders()) { - CachePluginContext pluginCtx = new GridCachePluginContext(ctx, cfg); + if (cfg.getPluginConfigurations() != null) { + for (CachePluginConfiguration cachePluginCfg : cfg.getPluginConfigurations()) { + CachePluginContext pluginCtx = new GridCachePluginContext(ctx, cfg, cachePluginCfg); - CachePluginProvider provider = p.createCacheProvider(pluginCtx); + CachePluginProvider provider = cachePluginCfg.createProvider(pluginCtx); - if (provider != null) { providersList.add(provider); providersMap.put(pluginCtx, provider); } @@@ -99,9 -97,9 +99,9 @@@ } /** {@inheritDoc} */ - @Override protected void stop0(boolean cancel) { + @Override protected void stop0(boolean cancel, boolean destroy) { - for (ListIterator<CachePluginProvider> iter = providersList.listIterator(); iter.hasPrevious();) - iter.previous().stop(cancel); + for (int i = providersList.size() - 1; i >= 0; i--) + providersList.get(i).stop(cancel); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index cf85a52,0ff6d8b..8b282ab --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@@ -62,9 -62,9 +62,10 @@@ import org.apache.ignite.configuration. import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryObjectEx; + import org.apache.ignite.internal.binary.BinaryObjectExImpl; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; @@@ -742,83 -685,40 +742,70 @@@ public class GridQueryProcessor extend if (log.isDebugEnabled()) log.debug("Store [space=" + space + ", key=" + key + ", val=" + val + "]"); -- CacheObjectContext coctx = null; - - if (ctx.indexing().enabled()) { - coctx = cacheObjectContext(space); - - Object key0 = unwrap(key, coctx); - - Object val0 = unwrap(val, coctx); - - ctx.indexing().store(space, key0, val0, expirationTime); - } -- if (idx == null) return; if (!busyLock.enterBusy()) - return; + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { -- if (coctx == null) -- coctx = cacheObjectContext(space); ++ CacheObjectContext coctx = cacheObjectContext(space); - Class<?> valCls = null; + TypeDescriptor desc = typeByValue(coctx, key, val, true); - TypeId id; + if (prevVal != null) { + TypeDescriptor prevValDesc = typeByValue(coctx, key, prevVal, false); - boolean binaryVal = ctx.cacheObjects().isBinaryObject(val); + if (prevValDesc != null && prevValDesc != desc) + idx.remove(space, prevValDesc, key, partId, prevVal, prevVer); + } - if (binaryVal) { - int typeId = ctx.cacheObjects().typeId(val); + if (desc == null) + return; - id = new TypeId(space, typeId); - } - else { - valCls = val.value(coctx, false).getClass(); + idx.store(space, desc, key, partId, val, ver, expirationTime, link); + } + finally { + busyLock.leaveBusy(); + } + } - id = new TypeId(space, valCls); - } + /** + * @param coctx Cache context. + * @param key Key. + * @param val Value. + * @param checkType If {@code true} checks that key and value type correspond to found TypeDescriptor. + * @return Type descriptor if found. + * @throws IgniteCheckedException If type check failed. + */ + @Nullable private TypeDescriptor typeByValue(CacheObjectContext coctx, + KeyCacheObject key, + CacheObject val, + boolean checkType) + throws IgniteCheckedException { + Class<?> valCls = null; - TypeDescriptor desc = types.get(id); + TypeId id; - if (desc == null || !desc.registered()) - return; + boolean binaryVal = ctx.cacheObjects().isBinaryObject(val); + + if (binaryVal) { + int typeId = ctx.cacheObjects().typeId(val); + id = new TypeId(coctx.cacheName(), typeId); + } + else { + valCls = val.value(coctx, false).getClass(); + + id = new TypeId(coctx.cacheName(), valCls); + } + + TypeDescriptor desc = types.get(id); + + if (desc == null || !desc.registered()) + return null; + + if (checkType) { if (!binaryVal && !desc.valueClass().isAssignableFrom(valCls)) throw new IgniteCheckedException("Failed to update index due to class name conflict" + "(multiple classes with same simple name are stored in the same cache) " + @@@ -1143,33 -1039,14 +1124,21 @@@ if (log.isDebugEnabled()) log.debug("Remove [space=" + space + ", key=" + key + ", val=" + val + "]"); - CacheObjectContext coctx = null; - - if (ctx.indexing().enabled()) { - coctx = cacheObjectContext(space); - - Object key0 = unwrap(key, coctx); - - ctx.indexing().remove(space, key0); - } - - // If val == null we only need to call SPI. - if (idx == null) + if (idx == null || val == null) return; if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to remove from index (grid is stopping)."); try { - if (coctx == null) - coctx = cacheObjectContext(space); - idx.remove(space, key, val); ++ CacheObjectContext coctx = cacheObjectContext(space); + + TypeDescriptor desc = typeByValue(coctx, key, val, false); + + if (desc == null) + return; + + idx.remove(space, desc, key, partId, val, ver); } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 914c3a3,986fff7..096c531 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@@ -1666,26 -1651,24 +1671,26 @@@ public class GridServiceProcessor exten onReassignmentFailed(topVer, retries); } + Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceAssignmentsPredicate.INSTANCE); + // Clean up zombie assignments. - for (Cache.Entry<Object, Object> e : - cache.entrySetx(CU.cachePrimary(ctx.grid().affinity(cache.name()), ctx.grid().localNode()))) { - if (!(e.getKey() instanceof GridServiceAssignmentsKey)) - continue; + while (it.hasNext()) { + Cache.Entry<Object, Object> e = it.next(); - if (cache.context().affinity().primary(ctx.grid().localNode(), e.getKey(), topVer)) { - String name = ((GridServiceAssignmentsKey)e.getKey()).name(); ++ if (cache.context().affinity().primaryByKey(ctx.grid().localNode(), e.getKey(), topVer)) { + String name = ((GridServiceAssignmentsKey)e.getKey()).name(); - try { - if (cache.get(new GridServiceDeploymentKey(name)) == null) { - if (log.isDebugEnabled()) - log.debug("Removed zombie assignments: " + e.getValue()); + try { + if (cache.get(new GridServiceDeploymentKey(name)) == null) { + if (log.isDebugEnabled()) + log.debug("Removed zombie assignments: " + e.getValue()); - cache.getAndRemove(e.getKey()); + cache.getAndRemove(e.getKey()); + } + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to clean up zombie assignments for service: " + name, ex); } - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to clean up zombie assignments for service: " + name, ex); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 389d058,1cca9d3..d2e0714 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@@ -74,7 -74,7 +74,8 @@@ import java.nio.channels.SelectionKey import java.nio.channels.Selector; import java.nio.charset.Charset; import java.nio.file.Files; +import java.nio.file.Path; + import java.nio.file.Paths; import java.security.AccessController; import java.security.KeyManagementException; import java.security.MessageDigest; http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 4b140be,4c89a7c..00ca409 mode 100644,100755..100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreAbstractMultithreadedSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 2c86322,7a0b713..1343151 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@@ -5869,14 -5869,15 +5869,14 @@@ public abstract class GridCacheAbstract @Override public void run(int idx) throws Exception { GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache().context(); - if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED) - return; - int size = 0; + if (ctx.isNear()) + ctx = ctx.near().dht().context(); + for (String key : keys) { - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) { + if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) { - GridCacheEntryEx e = - ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); + GridCacheEntryEx e = ctx.cache().entryEx(key); assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']'; assert !e.deleted() : "Entry is deleted: " + e; http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index be3933f,e76ab40..691100f --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@@ -411,9 -417,31 +410,28 @@@ public class GridCacheTestEntryEx exten } /** @inheritDoc */ - @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned( + @Override public void clearReserveForLoad(GridCacheVersion ver) { + assert false; + } + + /** @inheritDoc */ + @Override public EntryGetResult innerGetAndReserveForLoad( - boolean readSwap, + boolean updateMetrics, + boolean evt, + UUID subjId, + String taskName, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean keepBinary, + @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException { + assert false; + + return null; + } + + /** @inheritDoc */ + @Nullable @Override public EntryGetResult innerGetVersioned( @Nullable GridCacheVersion ver, IgniteInternalTx tx, - boolean readSwap, - boolean unmarshal, boolean updateMetrics, boolean evt, UUID subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java index b746883,e47a18d..a78d8ef --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java @@@ -5553,12 -5545,10 +5551,12 @@@ public class IgniteCacheConfigVariation int size = 0; + if (ctx.isNear()) + ctx = ctx.near().dht().context(); + for (String key : keys) { - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) { + if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) { - GridCacheEntryEx e = - ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); + GridCacheEntryEx e = ctx.cache().entryEx(key); assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']'; assert !e.deleted() : "Entry is deleted: " + e; http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePeekModesAbstractTest.java ----------------------------------------------------------------------