http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index db4a4b8..434b6c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -68,9 +68,7 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.PREPARING; @@ -87,7 +85,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { private static final long serialVersionUID = 0L; /** DHT mappings. */ - private Map<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>(); + private IgniteTxMappings mappings; /** Future. */ @GridToStringExclude @@ -172,6 +170,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { subjId, taskNameHash); + mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl(); + initResult(); } @@ -208,13 +208,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ - @Override protected IgniteUuid nearMiniId() { - assert false : "nearMiniId should not be called for colocated transactions."; - - return null; - } - - /** {@inheritDoc} */ @Override protected IgniteInternalFuture<Boolean> addReader( long msgId, GridDhtCacheEntry cached, @@ -280,15 +273,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @return {@code True} if transaction is fully synchronous. */ private boolean sync() { - if (super.syncCommit()) - return true; - - for (int cacheId : activeCacheIds()) { - if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC) - return true; - } - - return false; + return super.syncCommit() || txState().sync(cctx); } /** @@ -471,7 +456,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** * @return DHT map. */ - Map<UUID, GridDistributedTxMapping> mappings() { + IgniteTxMappings mappings() { return mappings; } @@ -518,9 +503,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { GridDistributedTxMapping m = mappings.get(node.id()); if (m == null) - mappings.put(node.id(), m = new GridDistributedTxMapping(node)); + mappings.put(m = new GridDistributedTxMapping(node)); - IgniteTxEntry txEntry = txMap.get(key); + IgniteTxEntry txEntry = entry(key); assert txEntry != null; @@ -534,26 +519,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** - * Adds keys mapping. - * - * @param n Mapped node. - * @param mappedKeys Mapped keys. + * @return Non-null entry if tx has only one write entry. */ - private void addKeyMapping(ClusterNode n, Iterable<IgniteTxKey> mappedKeys) { - GridDistributedTxMapping m = mappings.get(n.id()); - - if (m == null) - mappings.put(n.id(), m = new GridDistributedTxMapping(n)); - - for (IgniteTxKey key : mappedKeys) { - IgniteTxEntry txEntry = txMap.get(key); - - assert txEntry != null; - - txEntry.nodeId(n.id()); - - m.add(txEntry); - } + @Nullable IgniteTxEntry singleWrite() { + return txState.singleWrite(); } /** @@ -567,7 +536,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { GridDistributedTxMapping m = mappings.get(n.id()); if (m == null) { - m = F.addIfAbsent(mappings, n.id(), new GridDistributedTxMapping(n)); + mappings.put(m = new GridDistributedTxMapping(n)); m.near(map.near()); @@ -575,8 +544,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { m.markExplicitLock(); } - assert m != null; - for (IgniteTxEntry entry : map.entries()) m.add(entry); } @@ -588,6 +555,25 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** + * @param map Mapping. + * @param entry Entry. + */ + void addSingleEntryMapping(GridDistributedTxMapping map, IgniteTxEntry entry) { + ClusterNode n = map.node(); + + GridDistributedTxMapping m = new GridDistributedTxMapping(n); + + mappings.put(m); + + m.near(map.near()); + + if (map.explicitLock()) + m.markExplicitLock(); + + m.add(entry); + } + + /** * @param nodeId Node ID to mark with explicit lock. * @return {@code True} if mapping was found. */ @@ -623,8 +609,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { - Collection<IgniteTxEntry> entries = F.concat(false, mapping.writes(), mapping.reads()); + readyNearLocks(mapping.writes(), mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers); + readyNearLocks(mapping.reads(), mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers); + } + /** + * @param entries Entries. + * @param dhtVer DHT version. + * @param pendingVers Pending versions. + * @param committedVers Committed versions. + * @param rolledbackVers Rolled back versions. + */ + void readyNearLocks(Collection<IgniteTxEntry> entries, + GridCacheVersion dhtVer, + Collection<GridCacheVersion> pendingVers, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers) + { for (IgniteTxEntry txEntry : entries) { while (true) { GridCacheContext cacheCtx = txEntry.cached().context(); @@ -637,8 +638,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { // Handle explicit locks. GridCacheVersion explicit = txEntry.explicitVersion(); - if (explicit == null) - entry.readyNearLock(xidVer, mapping.dhtVersion(), committedVers, rolledbackVers, pendingVers); + if (explicit == null) { + entry.readyNearLock(xidVer, + dhtVer, + committedVers, + rolledbackVers, + pendingVers); + } break; } @@ -871,7 +877,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @param writes Write entries. * @param txNodes Transaction nodes mapping. * @param last {@code True} if this is last prepare request. - * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ @SuppressWarnings("TypeMayBeWeakened") @@ -879,8 +884,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { @Nullable Collection<IgniteTxEntry> reads, @Nullable Collection<IgniteTxEntry> writes, Map<UUID, Collection<UUID>> txNodes, - boolean last, - Collection<UUID> lastBackups + boolean last ) { if (state() != PREPARING) { if (timedOut()) @@ -901,8 +905,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { IgniteUuid.randomUuid(), Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), last, - needReturnValue() && implicit(), - lastBackups); + needReturnValue() && implicit()); try { // At this point all the entries passed in must be enlisted in transaction because this is an @@ -1274,6 +1277,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString()); + return S.toString(GridNearTxLocal.class, this, "mappings", mappings, "super", super.toString()); } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 456d726..798635a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -99,7 +99,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param near {@code True} if mapping is for near caches. * @param txNodes Transaction nodes mapping. * @param last {@code True} if this last prepare request for node. - * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. * @param onePhaseCommit One phase commit flag. * @param retVal Return value flag. * @param implicitSingle Implicit single flag. @@ -118,7 +117,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean near, Map<UUID, Collection<UUID>> txNodes, boolean last, - Collection<UUID> lastBackups, boolean onePhaseCommit, boolean retVal, boolean implicitSingle, @@ -137,7 +135,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { this.topVer = topVer; this.near = near; this.last = last; - this.lastBackups = lastBackups; this.retVal = retVal; this.implicitSingle = implicitSingle; this.explicitLock = explicitLock; @@ -153,12 +150,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { return firstClientReq; } - /** - * @return IDs of backup nodes receiving last prepare request during this prepare. - */ - public Collection<UUID> lastBackups() { - return lastBackups; - } /** * @return {@code True} if this last prepare request for node. http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 87c68b2..d078df4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -34,10 +34,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteStateImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; @@ -74,7 +76,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { * @param ldr Class loader. * @param nodeId Node ID. * @param nearNodeId Near node ID. - * @param rmtThreadId Remote thread ID. * @param xidVer XID version. * @param commitVer Commit version. * @param sys System flag. @@ -92,7 +93,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { ClassLoader ldr, UUID nodeId, UUID nearNodeId, - long rmtThreadId, GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, @@ -109,8 +109,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { super( ctx, nodeId, - rmtThreadId, - xidVer, + xidVer, commitVer, sys, plc, @@ -127,10 +126,10 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { this.nearNodeId = nearNodeId; - readMap = Collections.emptyMap(); + int writeSize = writeEntries != null ? Math.max(txSize, writeEntries.size()) : txSize; - writeMap = new LinkedHashMap<>( - writeEntries != null ? Math.max(txSize, writeEntries.size()) : txSize, 1.0f); + txState = new IgniteTxRemoteStateImpl(Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), + U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(writeSize)); if (writeEntries != null) { for (IgniteTxEntry entry : writeEntries) { @@ -147,7 +146,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { * @param nodeId Node ID. * @param nearNodeId Near node ID. * @param nearXidVer Near transaction ID. - * @param rmtThreadId Remote thread ID. * @param xidVer XID version. * @param commitVer Commit version. * @param sys System flag. @@ -163,7 +161,6 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { UUID nodeId, UUID nearNodeId, GridCacheVersion nearXidVer, - long rmtThreadId, GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, @@ -179,8 +176,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { super( ctx, nodeId, - rmtThreadId, - xidVer, + xidVer, commitVer, sys, plc, @@ -198,8 +194,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { this.nearXidVer = nearXidVer; this.nearNodeId = nearNodeId; - readMap = new LinkedHashMap<>(1, 1.0f); - writeMap = new LinkedHashMap<>(txSize, 1.0f); + txState = new IgniteTxRemoteStateImpl(U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(1), + U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(txSize)); } /** {@inheritDoc} */ @@ -322,7 +318,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { // Initialize cache entry. entry.cached(cached); - writeMap.put(entry.txKey(), entry); + txState.addWriteEntry(entry.txKey(), entry); addExplicit(entry); @@ -391,7 +387,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { drVer, skipStore); - writeMap.put(key, txEntry); + txState.addWriteEntry(key, txEntry); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappings.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappings.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappings.java new file mode 100644 index 0000000..0465510 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappings.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public interface IgniteTxMappings { + /** + * Clears this mappings. + */ + public void clear(); + + /** + * @return {@code True} if there are no mappings. + */ + public boolean empty(); + + /** + * @param nodeId Node ID. + * @return Node mapping. + */ + @Nullable public GridDistributedTxMapping get(UUID nodeId); + + /** + * @param mapping Mapping. + */ + public void put(GridDistributedTxMapping mapping); + + /** + * @param nodeId Node ID. + * @return Removed mapping. + */ + @Nullable public GridDistributedTxMapping remove(UUID nodeId); + + /** + * @return Mapping for local node. + */ + @Nullable public GridDistributedTxMapping localMapping(); + + /** + * @return Non null instance if this mappings contain only one mapping. + */ + @Nullable public GridDistributedTxMapping singleMapping(); + + /** + * @return All mappings. + */ + public Collection<GridDistributedTxMapping> mappings(); + + /** + * @return {@code True} if this is single mapping. + */ + public boolean single(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java new file mode 100644 index 0000000..7dec7af --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsImpl.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +/** + * + */ +public class IgniteTxMappingsImpl implements IgniteTxMappings { + /** */ + private final Map<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Override public void clear() { + mappings.clear(); + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return mappings.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public GridDistributedTxMapping get(UUID nodeId) { + return mappings.get(nodeId); + } + + /** {@inheritDoc} */ + @Override public void put(GridDistributedTxMapping mapping) { + mappings.put(mapping.node().id(), mapping); + } + + /** {@inheritDoc} */ + @Override public GridDistributedTxMapping remove(UUID nodeId) { + return mappings.remove(nodeId); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDistributedTxMapping localMapping() { + for (GridDistributedTxMapping m : mappings.values()) { + if (m.node().isLocal()) + return m; + } + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean single() { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDistributedTxMapping singleMapping() { + assert mappings.size() == 1 : mappings; + + return F.firstValue(mappings); + } + + /** {@inheritDoc} */ + @Override public Collection<GridDistributedTxMapping> mappings() { + return mappings.values(); + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(IgniteTxMappingsImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java new file mode 100644 index 0000000..fc15592 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteTxMappingsSingleImpl.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class IgniteTxMappingsSingleImpl implements IgniteTxMappings { + /** */ + private volatile GridDistributedTxMapping mapping; + + /** {@inheritDoc} */ + @Override public void clear() { + mapping = null; + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return mapping == null; + } + + /** {@inheritDoc} */ + @Override public GridDistributedTxMapping get(UUID nodeId) { + GridDistributedTxMapping mapping0 = mapping; + + return (mapping0 != null && mapping0.node().id().equals(nodeId)) ? mapping0 : null; + } + + /** {@inheritDoc} */ + @Override public void put(GridDistributedTxMapping mapping) { + assert this.mapping == null; + + this.mapping = mapping; + } + + /** {@inheritDoc} */ + @Override public GridDistributedTxMapping remove(UUID nodeId) { + GridDistributedTxMapping mapping0 = mapping; + + if (mapping0 != null && mapping0.node().id().equals(nodeId)) { + this.mapping = null; + + return mapping0; + } + + return null; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDistributedTxMapping localMapping() { + GridDistributedTxMapping mapping0 = mapping; + + if (mapping0 != null && mapping0.node().isLocal()) + return mapping0; + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean single() { + return true; + } + + /** {@inheritDoc} */ + @Nullable @Override public GridDistributedTxMapping singleMapping() { + return mapping; + } + + /** {@inheritDoc} */ + @Override public Collection<GridDistributedTxMapping> mappings() { + assert false; + + return null; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(IgniteTxMappingsSingleImpl.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 94af6bb..f5f99f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -275,9 +275,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public boolean implicitSingle(); /** - * @return Collection of cache IDs involved in this transaction. + * @return Transaction state. */ - public Collection<Integer> activeCacheIds(); + public IgniteTxState txState(); /** * @return {@code true} or {@code false} if the deployment is enabled or disabled for all active caches involved @@ -669,14 +669,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public boolean serializable(); /** - * Checks whether given key has been removed within transaction. - * - * @param key Key to check. - * @return {@code True} if key has been removed. - */ - public boolean removed(IgniteTxKey key); - - /** * Gets allowed remaining time for this transaction. * * @return Remaining time. http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index eb2ca2c..6a0f8ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.ObjectStreamException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -50,7 +49,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; -import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -120,10 +118,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter @GridToStringInclude protected boolean implicit; - /** Implicit with one key flag. */ - @GridToStringInclude - protected boolean implicitSingle; - /** Local flag. */ @GridToStringInclude protected boolean loc; @@ -262,7 +256,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * @param cctx Cache registry. * @param xidVer Transaction ID. * @param implicit Implicit flag. - * @param implicitSingle Implicit with one key flag. * @param loc Local flag. * @param sys System transaction flag. * @param plc IO policy. @@ -275,7 +268,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter GridCacheSharedContext<?, ?> cctx, GridCacheVersion xidVer, boolean implicit, - boolean implicitSingle, boolean loc, boolean sys, byte plc, @@ -295,7 +287,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter this.cctx = cctx; this.xidVer = xidVer; this.implicit = implicit; - this.implicitSingle = implicitSingle; this.loc = loc; this.sys = sys; this.plc = plc; @@ -362,7 +353,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter this.taskNameHash = taskNameHash; implicit = false; - implicitSingle = false; loc = false; if (log == null) @@ -421,45 +411,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public boolean storeUsed() { - if (!storeEnabled()) - return false; - - Collection<Integer> cacheIds = activeCacheIds(); - - if (!cacheIds.isEmpty()) { - for (int cacheId : cacheIds) { - CacheStoreManager store = cctx.cacheContext(cacheId).store(); - - if (store.configured()) - return true; - } - } - - return false; - } - - /** - * Store manager for current transaction. - * - * @return Store manager. - */ - protected Collection<CacheStoreManager> stores() { - Collection<Integer> cacheIds = activeCacheIds(); - - if (!cacheIds.isEmpty()) { - Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size()); - - for (int cacheId : cacheIds) { - CacheStoreManager store = cctx.cacheContext(cacheId).store(); - - if (store.configured()) - stores.add(store); - } - - return stores; - } - - return null; + return storeEnabled() && txState().storeUsed(cctx); } /** @@ -645,7 +597,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public boolean implicitSingle() { - return implicitSingle; + return txState().implicitSingle(); } /** {@inheritDoc} */ @@ -1758,11 +1710,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public Collection<Integer> activeCacheIds() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ @Override public boolean activeCachesDeploymentEnabled() { return false; } @@ -1877,6 +1824,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public IgniteTxState txState() { + return null; + } + + /** {@inheritDoc} */ @Override public Collection<UUID> masterNodeIds() { return null; } @@ -2150,11 +2102,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public boolean removed(IgniteTxKey key) { - return false; - } - - /** {@inheritDoc} */ @Override public long remainingTime() throws IgniteTxTimeoutCheckedException { return 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 570aa48..0e5657b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -216,8 +216,7 @@ public class IgniteTxHandler { req.reads(), req.writes(), req.transactionNodes(), - req.last(), - req.lastBackups()); + req.last()); if (locTx.isRollbackOnly()) locTx.rollbackAsync(); @@ -398,7 +397,6 @@ public class IgniteTxHandler { if (req.onePhaseCommit()) { assert req.last(); - assert F.isEmpty(req.lastBackups()) || req.lastBackups().size() <= 1; tx.onePhaseCommit(true); } @@ -413,8 +411,7 @@ public class IgniteTxHandler { req.messageId(), req.miniId(), req.transactionNodes(), - req.last(), - req.lastBackups()); + req.last()); if (tx.isRollbackOnly()) { try { @@ -1091,12 +1088,13 @@ public class IgniteTxHandler { GridDhtTxRemote tx = ctx.tm().tx(req.version()); if (tx == null) { + boolean single = req.last() && req.writes().size() == 1; + tx = new GridDhtTxRemote( ctx, req.nearNodeId(), req.futureId(), nodeId, - req.threadId(), req.topologyVersion(), req.version(), null, @@ -1110,7 +1108,8 @@ public class IgniteTxHandler { req.nearXidVersion(), req.transactionNodes(), req.subjectId(), - req.taskNameHash()); + req.taskNameHash(), + single); tx.writeVersion(req.writeVersion()); @@ -1138,7 +1137,7 @@ public class IgniteTxHandler { tx.transactionNodes(req.transactionNodes()); } - if (!tx.isSystemInvalidate() && !F.isEmpty(req.writes())) { + if (!tx.isSystemInvalidate()) { int idx = 0; for (IgniteTxEntry entry : req.writes()) { @@ -1236,7 +1235,6 @@ public class IgniteTxHandler { ldr, nodeId, req.nearNodeId(), - req.threadId(), req.version(), null, req.system(), http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java new file mode 100644 index 0000000..5f48469 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { + /** */ + private GridCacheContext cacheCtx; + + /** */ + private IgniteTxEntry entry; + + /** */ + private boolean init; + + /** {@inheritDoc} */ + @Override public void addActiveCache(GridCacheContext ctx, IgniteTxLocalAdapter tx) + throws IgniteCheckedException { + assert cacheCtx == null : "Cache already set [cur=" + cacheCtx.name() + ", new=" + ctx.name() + ']'; + + this.cacheCtx = ctx; + + tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled()); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { + return cacheCtx; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer firstCacheId() { + return cacheCtx != null ? cacheCtx.cacheId() : null; + } + + /** {@inheritDoc} */ + @Override public void awaitLastFut(GridCacheSharedContext ctx) { + if (cacheCtx == null) + return; + + cacheCtx.cache().awaitLastFut(); + } + + /** {@inheritDoc} */ + @Override public boolean implicitSingle() { + return true; + } + + /** {@inheritDoc} */ + @Override public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut) { + if (cacheCtx == null) + return null; + + Throwable err = topFut.validateCache(cacheCtx); + + if (err != null) { + return new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + U.maskName(cacheCtx.name())); + } + + if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) { + return new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name()); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean sync(GridCacheSharedContext cctx) { + return cacheCtx != null && cacheCtx.config().getWriteSynchronizationMode() == FULL_SYNC; + } + + /** {@inheritDoc} */ + @Override public boolean hasNearCache(GridCacheSharedContext cctx) { + return cacheCtx != null && cacheCtx.isNear(); + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) { + if (cacheCtx == null || cacheCtx.isLocal()) + return cctx.exchange().lastTopologyFuture(); + + cacheCtx.topology().readLock(); + + if (cacheCtx.topology().stopping()) { + fut.onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cacheCtx.name())); + + return null; + } + + return cacheCtx.topology().topologyVersionFuture(); + } + + /** {@inheritDoc} */ + @Override public void topologyReadUnlock(GridCacheSharedContext cctx) { + if (cacheCtx == null || cacheCtx.isLocal()) + return; + + cacheCtx.topology().readUnlock(); + } + + /** {@inheritDoc} */ + @Override public boolean storeUsed(GridCacheSharedContext cctx) { + if (cacheCtx == null) + return false; + + CacheStoreManager store = cacheCtx.store(); + + return store.configured(); + } + + /** {@inheritDoc} */ + @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) { + if (cacheCtx == null) + return null; + + CacheStoreManager store = cacheCtx.store(); + + if (store.configured()) + return Collections.singleton(store); + + return null; + } + + /** {@inheritDoc} */ + @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) { + if (cacheCtx != null) + onTxEnd(cacheCtx, tx, commit); + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry entry(IgniteTxKey key) { + if (entry != null && entry.txKey().equals(key)) + return entry; + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean hasWriteKey(IgniteTxKey key) { + return entry != null && entry.txKey().equals(key); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> readSet() { + return Collections.emptySet(); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> writeSet() { + return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> writeEntries() { + return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> readEntries() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { + return entry != null ? Collections.singletonMap(entry.txKey(), entry) : + Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return entry == null; + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> allEntries() { + return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList(); + } + + /** {@inheritDoc} */ + @Override public boolean init(int txSize) { + if (!init) { + init = true; + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean initialized() { + return init; + } + + /** {@inheritDoc} */ + @Override public void addEntry(IgniteTxEntry entry) { + assert this.entry == null : "Entry already set [cur=" + this.entry + ", new=" + entry + ']'; + + this.entry = entry; + } + + /** {@inheritDoc} */ + @Override public void seal() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry singleWrite() { + return entry; + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(IgniteTxImplicitSingleStateImpl.class, this); + } +}