http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java new file mode 100644 index 0000000..22f04a8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter { + /** */ + private IgniteTxEntry entry; + + /** {@inheritDoc} */ + @Override public void addWriteEntry(IgniteTxKey key, IgniteTxEntry e) { + this.entry = e; + } + + /** {@inheritDoc} */ + @Override public void clearEntry(IgniteTxKey key) { + if (entry != null && entry.txKey().equals(key)) + entry = null; + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry entry(IgniteTxKey key) { + if (entry != null && entry.txKey().equals(key)) + return entry; + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean hasWriteKey(IgniteTxKey key) { + return entry != null && entry.txKey().equals(key); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> readSet() { + return Collections.emptySet(); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> writeSet() { + return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> writeEntries() { + return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> readEntries() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { + return entry != null ? 
Collections.singletonMap(entry.txKey(), entry) : + Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return entry == null; + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> allEntries() { + return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList(); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteTxEntry singleWrite() { + return entry; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(IgniteTxRemoteSingleStateImpl.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteState.java new file mode 100644 index 0000000..b8290a1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteState.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +/** + * Remote transaction state: an {@link IgniteTxState} that also accepts write entries and lets entries be cleared by key. + */ +public interface IgniteTxRemoteState extends IgniteTxState { + /** + * @param key Key of the write entry being added. + * @param e Write entry to add to this state. + */ + public void addWriteEntry(IgniteTxKey key, IgniteTxEntry e); + + /** + * @param key Key of the entry to remove from this state. 
+ */ + public void clearEntry(IgniteTxKey key); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java new file mode 100644 index 0000000..e7c4c96 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState { + /** {@inheritDoc} */ + @Override public boolean implicitSingle() { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer firstCacheId() { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public void awaitLastFut(GridCacheSharedContext cctx) { + assert false; + } + + /** {@inheritDoc} */ + @Override public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut) { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean sync(GridCacheSharedContext cctx) { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean hasNearCache(GridCacheSharedContext cctx) { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter tx) + throws IgniteCheckedException { + assert false; + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public void topologyReadUnlock(GridCacheSharedContext cctx) { + assert false; + } + + /** {@inheritDoc} */ + @Override public boolean storeUsed(GridCacheSharedContext cctx) { + 
return false; + } + + /** {@inheritDoc} */ + @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) { + assert false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java new file mode 100644 index 0000000..32bc646 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class IgniteTxRemoteStateImpl extends IgniteTxRemoteStateAdapter { + /** Read set. */ + @GridToStringInclude + protected Map<IgniteTxKey, IgniteTxEntry> readMap; + + /** Write map. */ + @GridToStringInclude + protected Map<IgniteTxKey, IgniteTxEntry> writeMap; + + /** + * @param readMap Read map. + * @param writeMap Write map. + */ + public IgniteTxRemoteStateImpl(Map<IgniteTxKey, IgniteTxEntry> readMap, + Map<IgniteTxKey, IgniteTxEntry> writeMap) { + this.readMap = readMap; + this.writeMap = writeMap; + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry entry(IgniteTxKey key) { + IgniteTxEntry e = writeMap == null ? null : writeMap.get(key); + + if (e == null) + e = readMap == null ? 
null : readMap.get(key); + + return e; + } + + /** {@inheritDoc} */ + @Override public boolean hasWriteKey(IgniteTxKey key) { + return writeMap.containsKey(key); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> readSet() { + return readMap.keySet(); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> writeSet() { + return writeMap.keySet(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> writeEntries() { + return writeMap.values(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> readEntries() { + return readMap.values(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { + return writeMap; + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { + return readMap; + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return readMap.isEmpty() && writeMap.isEmpty(); + } + + /** {@inheritDoc} */ + public void addWriteEntry(IgniteTxKey key, IgniteTxEntry e) { + writeMap.put(key, e); + } + + /** {@inheritDoc} */ + public void clearEntry(IgniteTxKey key) { + readMap.remove(key); + writeMap.remove(key); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> allEntries() { + return F.concat(false, writeEntries(), readEntries()); + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry singleWrite() { + return null; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(IgniteTxRemoteStateImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java 
new file mode 100644 index 0000000..81707ba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jetbrains.annotations.Nullable; + +/** + * Transaction state: exposes the caches enlisted in a transaction and the transaction's read and write entries. + */ +public interface IgniteTxState { + /** + * + * @return Flag indicating whether transaction is implicit with only one key. + */ + public boolean implicitSingle(); + + /** + * @return First tx cache id. + */ + @Nullable public Integer firstCacheId(); + + /** + * @param cctx Context. + * @return Non-null cache context if tx has only one active cache. 
+ */ + @Nullable public GridCacheContext singleCacheContext(GridCacheSharedContext cctx); + + /** + * @param cctx Context used to await completion of previous async operations on active caches. + */ + public void awaitLastFut(GridCacheSharedContext cctx); + + /** + * @param cctx Context. + * @param topFut Topology future. + * @return Error if validation failed. + */ + public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut); + + /** + * @param cctx Context. + * @return {@code True} if transaction is fully synchronous. + */ + public boolean sync(GridCacheSharedContext cctx); + + /** + * @param cctx Context. + * @return {@code True} if tx has active near cache. + */ + public boolean hasNearCache(GridCacheSharedContext cctx); + + /** + * @param cacheCtx Cache context. + * @param tx Transaction. + * @throws IgniteCheckedException If cache check failed. + */ + public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter tx) throws IgniteCheckedException; + + /** + * @param cctx Context. + * @param fut Future to finish with error if some cache is stopping. + * @return Topology future. + */ + public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut); + + /** + * @param cctx Context. + */ + public void topologyReadUnlock(GridCacheSharedContext cctx); + + /** + * @param cctx Context. + * @return {@code True} if transaction is allowed to use store and transaction spans one or more caches with + * store enabled. + */ + public boolean storeUsed(GridCacheSharedContext cctx); + + /** + * @param cctx Context. + * @return Configured stores for active caches, possibly {@code null}. + */ + public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx); + + /** + * @param cctx Context. + * @param tx Transaction. + * @param commit Commit flag. + */ + public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit); + + /** + * @param key Key. + * @return Entry for the key, or {@code null} if the transaction has no such entry. 
+ */ + @Nullable public IgniteTxEntry entry(IgniteTxKey key); + + /** + * @param key Key. + * @return {@code True} if tx has write key. + */ + public boolean hasWriteKey(IgniteTxKey key); + + /** + * @return Read entries keys. + */ + public Set<IgniteTxKey> readSet(); + + /** + * @return Write entries keys. + */ + public Set<IgniteTxKey> writeSet(); + + /** + * @return Write entries. + */ + public Collection<IgniteTxEntry> writeEntries(); + + /** + * @return Read entries. + */ + public Collection<IgniteTxEntry> readEntries(); + + /** + * @return Write entries map. + */ + public Map<IgniteTxKey, IgniteTxEntry> writeMap(); + + /** + * @return Read entries map. + */ + public Map<IgniteTxKey, IgniteTxEntry> readMap(); + + /** + * @return All entries. + */ + public Collection<IgniteTxEntry> allEntries(); + + /** + * @return Non-null entry if tx has only one write entry. + */ + @Nullable public IgniteTxEntry singleWrite(); + + /** + * @return {@code True} if transaction is empty. + */ + public boolean empty(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java new file mode 100644 index 0000000..c95fb19 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { + /** Active cache IDs. 
*/ + private Set<Integer> activeCacheIds = new HashSet<>(); + /** Per-transaction read map. */ + + @GridToStringInclude + protected Map<IgniteTxKey, IgniteTxEntry> txMap; + + /** Read view on transaction map. */ + @GridToStringExclude + protected IgniteTxMap readView; + + /** Write view on transaction map. */ + @GridToStringExclude + protected IgniteTxMap writeView; + + /** {@inheritDoc} */ + @Override public boolean implicitSingle() { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer firstCacheId() { + return F.first(activeCacheIds); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { + if (activeCacheIds.size() == 1) { + int cacheId = F.first(activeCacheIds); + + return cctx.cacheContext(cacheId); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public void awaitLastFut(GridCacheSharedContext cctx) { + for (Integer cacheId : activeCacheIds) + cctx.cacheContext(cacheId).cache().awaitLastFut(); + } + + /** {@inheritDoc} */ + @Override public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, + GridDhtTopologyFuture topFut) { + StringBuilder invalidCaches = null; + + for (Integer cacheId : activeCacheIds) { + GridCacheContext ctx = cctx.cacheContext(cacheId); + + assert ctx != null : cacheId; + + Throwable err = topFut.validateCache(ctx); + + if (err != null) { + if (invalidCaches != null) + invalidCaches.append(", "); + else + invalidCaches = new StringBuilder(); + + invalidCaches.append(U.maskName(ctx.name())); + } + } + + if (invalidCaches != null) { + return new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + invalidCaches.toString()); + } + + for (int cacheId : activeCacheIds) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) { + return new ClusterTopologyServerNotFoundException("Failed to map 
keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name()); + } + } + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean sync(GridCacheSharedContext cctx) { + for (int cacheId : activeCacheIds) { + if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC) + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean hasNearCache(GridCacheSharedContext cctx) { + for (Integer cacheId : activeCacheIds) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + if (cacheCtx.isNear()) + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter tx) + throws IgniteCheckedException { + GridCacheSharedContext cctx = cacheCtx.shared(); + + int cacheId = cacheCtx.cacheId(); + + // Check if we can enlist new cache to transaction. + if (!activeCacheIds.contains(cacheId)) { + String err = cctx.verifyTxCompatibility(tx, activeCacheIds, cacheCtx); + + if (err != null) { + StringBuilder cacheNames = new StringBuilder(); + + int idx = 0; + + for (Integer activeCacheId : activeCacheIds) { + cacheNames.append(cctx.cacheContext(activeCacheId).name()); + + if (idx++ < activeCacheIds.size() - 1) + cacheNames.append(", "); + } + + throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" + + err + + ") [activeCaches=[" + cacheNames + "]" + + ", cacheName=" + cacheCtx.name() + + ", cacheSystem=" + cacheCtx.systemTx() + + ", txSystem=" + tx.system() + ']'); + } + else + activeCacheIds.add(cacheId); + + if (activeCacheIds.size() == 1) + tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled()); + } + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) { + if (activeCacheIds.isEmpty()) + return cctx.exchange().lastTopologyFuture(); + + GridCacheContext<?, ?> nonLocCtx = null; + + 
for (int cacheId : activeCacheIds) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx == null) + return cctx.exchange().lastTopologyFuture(); + + nonLocCtx.topology().readLock(); + + if (nonLocCtx.topology().stopping()) { + fut.onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + + return nonLocCtx.topology().topologyVersionFuture(); + } + + /** {@inheritDoc} */ + @Override public void topologyReadUnlock(GridCacheSharedContext cctx) { + if (!activeCacheIds.isEmpty()) { + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : activeCacheIds) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx != null) + nonLocCtx.topology().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean storeUsed(GridCacheSharedContext cctx) { + if (!activeCacheIds.isEmpty()) { + for (int cacheId : activeCacheIds) { + CacheStoreManager store = cctx.cacheContext(cacheId).store(); + + if (store.configured()) + return true; + } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) { + Collection<Integer> cacheIds = activeCacheIds; + + if (!cacheIds.isEmpty()) { + Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size()); + + for (int cacheId : cacheIds) { + CacheStoreManager store = cctx.cacheContext(cacheId).store(); + + if (store.configured()) + stores.add(store); + } + + return stores; + } + + return null; + } + + /** {@inheritDoc} */ + @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) { + for (int cacheId : activeCacheIds) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + onTxEnd(cacheCtx, tx, commit); + } + } + + /** 
{@inheritDoc} */ + @Override public boolean init(int txSize) { + if (txMap == null) { + txMap = U.newLinkedHashMap(txSize > 0 ? txSize : 16); + + readView = new IgniteTxMap(txMap, CU.reads()); + writeView = new IgniteTxMap(txMap, CU.writes()); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean initialized() { + return txMap != null; + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> allEntries() { + return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values(); + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry entry(IgniteTxKey key) { + return txMap == null ? null : txMap.get(key); + } + + /** {@inheritDoc} */ + @Override public boolean hasWriteKey(IgniteTxKey key) { + return writeView.containsKey(key); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> readSet() { + return txMap == null ? Collections.<IgniteTxKey>emptySet() : readView.keySet(); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> writeSet() { + return txMap == null ? Collections.<IgniteTxKey>emptySet() : writeView.keySet(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> writeEntries() { + return writeView == null ? Collections.<IgniteTxEntry>emptyList() : writeView.values(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> readEntries() { + return readView == null ? Collections.<IgniteTxEntry>emptyList() : readView.values(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { + return writeView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : writeView; + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { + return readView == null ? 
Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : readView; + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return txMap.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public void addEntry(IgniteTxEntry entry) { + txMap.put(entry.txKey(), entry); + } + + /** {@inheritDoc} */ + @Override public void seal() { + if (readView != null) + readView.seal(); + + if (writeView != null) + writeView.seal(); + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry singleWrite() { + return writeView != null && writeView.size() == 1 ? F.firstValue(writeView) : null; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(IgniteTxStateImpl.class, this); + } +}