This is an automated email from the ASF dual-hosted git repository.
vldpyatkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 37e92c50d5d IGNITE-28610 Implement savepoint for Key-Value API for
pessimistic transactions (#13080)
37e92c50d5d is described below
commit 37e92c50d5dbd16f6d89574e8c667b5b246f3949
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri May 1 01:19:30 2026 +0300
IGNITE-28610 Implement savepoint for Key-Value API for pessimistic
transactions (#13080)
---
.../ignite/snippets/PerformingTransactions.java | 33 +
docs/_docs/key-value-api/transactions.adoc | 28 +
.../cache/distributed/GridNearUnlockRequest.java | 18 +
.../dht/GridDhtTransactionalCacheAdapter.java | 82 ++-
.../distributed/dht/GridDhtTxLocalAdapter.java | 31 +
.../near/GridNearTransactionalCache.java | 33 +-
.../cache/distributed/near/GridNearTxLocal.java | 348 ++++++++++
.../cache/store/GridCacheStoreManagerAdapter.java | 20 +
.../cache/transactions/IgniteTxEntry.java | 98 +++
.../transactions/TransactionEventProxyImpl.java | 20 +
.../cache/transactions/TransactionProxyImpl.java | 50 ++
.../TransactionProxyRollbackOnlyImpl.java | 20 +
.../apache/ignite/transactions/Transaction.java | 56 ++
.../transactions/TxSavepointParameterizedTest.java | 724 +++++++++++++++++++++
.../transactions/TxSavepointPessimisticTest.java | 198 ++++++
.../cache/GridAbstractCacheStoreSelfTest.java | 20 +
.../ignite/testsuites/IgniteCacheTestSuite12.java | 5 +
.../ignite/testsuites/IgniteCacheTestSuite6.java | 2 +
18 files changed, 1781 insertions(+), 5 deletions(-)
diff --git
a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/PerformingTransactions.java
b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/PerformingTransactions.java
index ce38d289013..8a22e81430e 100644
---
a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/PerformingTransactions.java
+++
b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/PerformingTransactions.java
@@ -42,6 +42,7 @@ public class PerformingTransactions {
public static void runAll() {
enablingTransactions();
executingTransactionsExample();
+ transactionSavepointsExample();
optimisticTransactionExample();
deadlockDetectionExample();
@@ -98,6 +99,38 @@ public class PerformingTransactions {
}
}
+ public static void transactionSavepointsExample() {
+ try (Ignite ignite = Ignition.start()) {
+ // tag::savepoints[]
+ CacheConfiguration<String, Integer> cfg = new
CacheConfiguration<>();
+ cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ cfg.setName("myCache");
+
+ IgniteCache<String, Integer> cache = ignite.getOrCreateCache(cfg);
+
+ try (Transaction tx =
ignite.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+ TransactionIsolation.REPEATABLE_READ)) {
+ cache.put("order:1", 10);
+
+ tx.savepoint("before-shipping");
+
+ cache.put("shipping:1", 5);
+ cache.put("order:1", 15);
+
+ tx.rollbackToSavepoint("before-shipping");
+
+ cache.put("order:1", 11);
+
+ tx.releaseSavepoint("before-shipping");
+
+ tx.commit();
+ }
+ // end::savepoints[]
+ System.out.println(cache.get("order:1"));
+ System.out.println(cache.get("shipping:1"));
+ }
+ }
+
public static void optimisticTransactionExample() {
try (Ignite ignite = Ignition.start()) {
// tag::optimistic[]
diff --git a/docs/_docs/key-value-api/transactions.adoc
b/docs/_docs/key-value-api/transactions.adoc
index 9aef1368a97..18368bfa640 100644
--- a/docs/_docs/key-value-api/transactions.adoc
+++ b/docs/_docs/key-value-api/transactions.adoc
@@ -90,6 +90,34 @@
include::code-snippets/cpp/src/transactions.cpp[tag=transactions-execution,inden
It is critical that an Ignite Transaction should be `closed` regardless of its
commit state or occurred exceptions. Using `try-with-resources` is the
recommended approach, but you can also explicitly call `rollback` when catching
exceptions. Calling `close`, `commit` or `rollback` ensures that all
resources are released and the transaction is no longer bound to the current
thread.
====
+== Transaction Savepoints
+
+Savepoints allow you to mark an intermediate state inside an explicit
transaction and later roll back only the changes made after that point.
+They are useful when a transaction contains several logical steps and one of
the later steps can be discarded without rolling back the whole transaction.
+
+Ignite supports savepoints only for explicit `PESSIMISTIC` transactions.
+Savepoints are local to the transaction that created them and are removed when
the transaction is committed or rolled back.
+
+Use `Transaction.savepoint(name)` to create a savepoint.
+If a savepoint with the same name already exists, Ignite throws a
`TransactionException`.
+Use `Transaction.savepoint(name, true)` to replace an existing savepoint.
+
+Use `Transaction.rollbackToSavepoint(name)` to roll back transaction changes
made after the savepoint was created.
+Savepoints created after the target savepoint are released as part of this
operation.
+
+Use `Transaction.releaseSavepoint(name)` to release a savepoint without
rolling back transaction changes.
+When a savepoint is released, all savepoints created after it are released as
well.
+Releasing a missing savepoint is a no-op.
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+include::{javaFile}[tags=savepoints,!exclude,indent=0]
+----
+--
+
////
== Two-Phase-Commit
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridNearUnlockRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridNearUnlockRequest.java
index 2c55bfd6425..7f46f967748 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridNearUnlockRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridNearUnlockRequest.java
@@ -36,6 +36,10 @@ public class GridNearUnlockRequest extends
GridDistributedBaseMessage {
@Order(0)
public List<KeyCacheObject> keys;
+ /** Savepoint rollback flag. */
+ @Order(1)
+ public boolean forSavepoint;
+
/**
* Empty constructor.
*/
@@ -60,6 +64,20 @@ public class GridNearUnlockRequest extends
GridDistributedBaseMessage {
return keys;
}
+ /**
+ * @return {@code True} if unlock request is sent during rollback to
savepoint.
+ */
+ public boolean forSavepoint() {
+ return forSavepoint;
+ }
+
+ /**
+ * @param forSavepoint Savepoint rollback flag.
+ */
+ public void forSavepoint(boolean forSavepoint) {
+ this.forSavepoint = forSavepoint;
+ }
+
/**
* @param key Key.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index c500bc9fcf7..8acb9ccb562 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -59,6 +59,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
+import
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -580,6 +581,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
private void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest
req) {
clearLocks(nodeId, req);
+ if (req.forSavepoint())
+ clearTxEntries(req.version(), req.keys());
+
if (isNearEnabled(cacheCfg))
near().clearLocks(nodeId, req);
}
@@ -1488,7 +1492,13 @@ public abstract class
GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
assert ctx.affinityNode();
assert nodeId != null;
- removeLocks(nodeId, req.version(), req.keys(), true);
+ removeLocks(
+ nodeId,
+ req.version(),
+ req.keys(),
+ true,
+ req.forSavepoint()
+ );
}
/**
@@ -1565,6 +1575,23 @@ public abstract class
GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param unmap Flag for un-mapping version.
*/
public void removeLocks(UUID nodeId, GridCacheVersion ver,
Iterable<KeyCacheObject> keys, boolean unmap) {
+ removeLocks(nodeId, ver, keys, unmap, false);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param ver Version.
+ * @param keys Keys.
+ * @param unmap Flag for un-mapping version.
+ * @param forSavepoint Savepoint rollback flag.
+ */
+ public void removeLocks(
+ UUID nodeId,
+ GridCacheVersion ver,
+ Iterable<KeyCacheObject> keys,
+ boolean unmap,
+ boolean forSavepoint
+ ) {
assert nodeId != null;
assert ver != null;
@@ -1574,7 +1601,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
// Remove mapped versions.
GridCacheVersion dhtVer = unmap ? ctx.mvcc().unmapVersion(ver) : ver;
- ctx.mvcc().addRemoved(ctx, ver);
+ if (!forSavepoint)
+ ctx.mvcc().addRemoved(ctx, ver);
Map<ClusterNode, List<KeyCacheObject>> dhtMap = new HashMap<>();
Map<ClusterNode, List<KeyCacheObject>> nearMap = new HashMap<>();
@@ -1636,6 +1664,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
// as there is no point to reorder relative to the version
// we are about to remove.
if (entry.removeLock(dhtVer)) {
+ if (forSavepoint)
+ clearTxEntry(dhtVer, key);
+
// Map to backups and near readers.
map(nodeId, topVer, entry, readers, dhtMap, nearMap);
@@ -1674,10 +1705,11 @@ public abstract class
GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
GridDhtUnlockRequest req = new GridDhtUnlockRequest(ctx.cacheId(),
keyBytes.size());
req.version(dhtVer);
+ req.forSavepoint(forSavepoint);
try {
- for (KeyCacheObject key : keyBytes)
- req.addKey(key);
+ for (int i = 0; i < keyBytes.size(); i++)
+ req.addKey(keyBytes.get(i));
keyBytes = nearMap.get(n);
@@ -1708,6 +1740,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K,
V> extends GridDhtCach
GridDhtUnlockRequest req = new
GridDhtUnlockRequest(ctx.cacheId(), keyBytes.size());
req.version(dhtVer);
+ req.forSavepoint(forSavepoint);
try {
for (KeyCacheObject key : keyBytes)
@@ -1728,6 +1761,47 @@ public abstract class
GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
}
}
+ /**
+ * @param ver Tx version.
+ * @param keys Keys to clear from remote tx.
+ */
+ private void clearTxEntries(GridCacheVersion ver, List<KeyCacheObject>
keys) {
+ if (F.isEmpty(keys))
+ return;
+
+ for (KeyCacheObject key : keys)
+ clearTxEntry(ver, key);
+ }
+
+ /**
+ * Drops transaction state kept for a key whose lock is released during rollback to savepoint.
+ * <p>
+ * When the version resolves to a {@code GridDhtTxLocal}, the key's entry is cleared from it and
+ * {@code rollbackDhtLocal()} is called once the transaction has no entries left. Otherwise, if a near
+ * configuration is set, the near entry is invalidated. Failures are logged and not propagated.
+ *
+ * @param ver Tx version.
+ * @param key Key.
+ */
+ private void clearTxEntry(GridCacheVersion ver, KeyCacheObject key) {
+ IgniteInternalTx tx = ctx.tm().tx(ver);
+
+ if (tx instanceof GridDhtTxLocal) {
+ ((GridDhtTxLocal)tx).clearEntry(ctx.txKey(key));
+
+ try {
+ if (tx.empty()) {
+ ((GridDhtTxLocal)tx).rollbackDhtLocal();
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to remove transaction container during
rollback to savepoint: " + tx, e);
+ }
+ }
+ else if (configuration().getNearConfiguration() != null) {
+ try {
+ invalidateNearEntry(key, ver);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to invalidate near entry during rollback
to savepoint: " + key, e);
+ }
+ }
+ }
+
/**
* @param key Key
* @param ver Version.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index e8dfb2b1f6f..af3655f3b2f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -41,6 +41,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.F0;
@@ -388,6 +389,23 @@ public abstract class GridDhtTxLocalAdapter extends
IgniteTxLocalAdapter {
return removeMapping(nodeId, entry, nearMap);
}
+ /**
+ * Removes tx entry from local DHT transaction state and all DHT/near
mappings.
+ *
+ * @param key Tx key.
+ */
+ public void clearEntry(IgniteTxKey key) {
+ IgniteTxEntry txEntry = entry(key);
+
+ if (txEntry == null)
+ return;
+
+ removeEntryFromMappings(txEntry, dhtMap);
+ removeEntryFromMappings(txEntry, nearMap);
+
+ txState().removeEntry(key);
+ }
+
/**
* @param nodeId Node ID.
* @param entry Entry to remove.
@@ -418,6 +436,19 @@ public abstract class GridDhtTxLocalAdapter extends
IgniteTxLocalAdapter {
return map.remove(nodeId) != null;
}
+ /**
+ * Removes the given tx entry from every mapping in {@code map} and drops the mappings that become empty.
+ * <p>
+ * Removal goes through the values view instead of calling {@code map.remove(...)} inside an
+ * {@code entrySet()} loop, so it is safe for any {@link Map} implementation and not only for
+ * concurrent maps that tolerate modification during iteration.
+ *
+ * @param txEntry Entry.
+ * @param map Mappings.
+ */
+ private void removeEntryFromMappings(IgniteTxEntry txEntry, Map<UUID, GridDistributedTxMapping> map) {
+ map.values().removeIf(mapping -> mapping.removeEntry(txEntry) && mapping.empty());
+ }
+
/**
* @param mappings Entry mappings.
* @param dst Transaction mappings.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 407a46ddee6..e6b26780659 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -476,6 +476,30 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
* @param keys Keys.
*/
public void removeLocks(GridCacheVersion ver, Collection<KeyCacheObject>
keys) {
+ removeLocks(ver, keys, false);
+ }
+
+ /**
+ * Removes locks regardless of whether they are owned or not for given
+ * version and keys. In savepoint mode lock version is not marked as
globally cancelled.
+ *
+ * @param ver Lock version.
+ * @param keys Keys.
+ */
+ public void removeLocksForSavepoint(GridCacheVersion ver,
Collection<KeyCacheObject> keys) {
+ removeLocks(ver, keys, true);
+ }
+
+ /**
+ * @param ver Lock version.
+ * @param keys Keys.
+ * @param forSavepoint Savepoint rollback flag.
+ */
+ private void removeLocks(
+ GridCacheVersion ver,
+ Collection<KeyCacheObject> keys,
+ boolean forSavepoint
+ ) {
if (keys.isEmpty())
return;
@@ -523,13 +547,20 @@ public class GridNearTransactionalCache<K, V> extends
GridNearCacheAdapter<K, V>
map.put(primary, req = new
GridNearUnlockRequest(ctx.cacheId(), keyCnt));
req.version(ver);
+ req.forSavepoint(forSavepoint);
}
}
// Remove candidate from local node first.
if (entry.removeLock(cand.version())) {
if (primary.isLocal()) {
- dht.removeLocks(primary.id(), ver,
F.asList(key), true);
+ dht.removeLocks(
+ primary.id(),
+ ver,
+ F.asList(key),
+ true,
+ forSavepoint
+ );
assert req == null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 92479aae7be..3d66d89fbde 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -24,6 +24,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -59,6 +62,7 @@ import
org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import
org.apache.ignite.internal.processors.cache.distributed.GridNearUnlockRequest;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
@@ -103,6 +107,7 @@ import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionException;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
import org.apache.ignite.transactions.TransactionTimeoutException;
@@ -219,6 +224,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter
implements GridTimeou
/** Tx label. */
@Nullable private final String lb;
+ /** Savepoints created for this transaction. Guarded by {@code this}. */
+ private List<TxSavepoint> savepoints;
+
/**
* @param ctx Cache registry.
* @param implicit Implicit flag.
@@ -3021,6 +3029,310 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
}
}
+ /**
+ * Creates savepoint for a pessimistic transaction.
+ * <p>
+ * The savepoint stores a snapshot of the current transaction entries and is appended to the end of the
+ * savepoint list. When an existing savepoint is overwritten, it is removed from its old position first.
+ *
+ * @param name Savepoint name.
+ * @param overwrite Whether to overwrite an existing savepoint with the same name.
+ * @throws IgniteCheckedException If failed, in particular if the transaction is implicit or is not pessimistic.
+ * @throws TransactionException If a savepoint with this name exists and {@code overwrite} is {@code false}.
+ */
+ public void savepoint(String name, boolean overwrite) throws
IgniteCheckedException {
+ A.notNull(name, "name");
+
+ synchronized (this) {
+ checkValid();
+
+ if (implicit())
+ throw new IgniteCheckedException("Savepoints can be used only
inside explicit transactions.");
+
+ if (!pessimistic())
+ throw new IgniteCheckedException("Savepoints are supported
only for PESSIMISTIC transactions.");
+
+ ListIterator<TxSavepoint> spIter = findSavepoint(name);
+
+ if (spIter != null) {
+ if (overwrite)
+ spIter.remove();
+ else {
+ throw new TransactionException("Savepoint \"" + name + "\"
already exists. " +
+ "Use savepoint(name, true) to overwrite it.");
+ }
+ }
+
+ if (savepoints == null)
+ savepoints = new ArrayList<>();
+
+ savepoints.add(new TxSavepoint(name, savepointState()));
+ }
+ }
+
+ /**
+ * Rolls back transaction changes to the specified savepoint.
+ * <p>
+ * The target savepoint itself is kept, so it can be rolled back to again. Savepoints that follow it in
+ * the savepoint list are removed.
+ *
+ * @param name Savepoint name.
+ * @throws IgniteCheckedException If failed, in particular if the transaction is implicit or is not pessimistic.
+ * @throws TransactionException If no savepoint with this name exists.
+ */
+ public void rollbackToSavepoint(String name) throws IgniteCheckedException
{
+ A.notNull(name, "name");
+
+ synchronized (this) {
+ checkValid();
+
+ if (implicit())
+ throw new IgniteCheckedException("Savepoints can be used only
inside explicit transactions.");
+
+ if (!pessimistic())
+ throw new IgniteCheckedException("Savepoints are supported
only for PESSIMISTIC transactions.");
+
+ ListIterator<TxSavepoint> spIter = findSavepoint(name);
+
+ if (spIter == null)
+ throw new TransactionException("Savepoint does not exist
[name=" + name + ']');
+
+ TxSavepoint savepoint = spIter.next();
+
+ rollbackToSavepoint(savepoint);
+
+ while (spIter.hasNext()) {
+ spIter.next();
+ spIter.remove();
+ }
+ }
+ }
+
+ /**
+ * Releases savepoint.
+ * <p>
+ * Removes the named savepoint and every savepoint that follows it in the savepoint list. Transaction
+ * entries are not touched. Does nothing if no savepoint with this name exists.
+ *
+ * @param name Savepoint name.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void releaseSavepoint(String name) throws IgniteCheckedException {
+ A.notNull(name, "name");
+
+ synchronized (this) {
+ checkValid();
+
+ if (savepoints == null)
+ return;
+
+ ListIterator<TxSavepoint> spIter = findSavepoint(name);
+
+ if (spIter != null) {
+ spIter.remove();
+
+ while (spIter.hasNext()) {
+ spIter.next();
+ spIter.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * @return Current transaction state snapshot.
+ */
+ private Map<IgniteTxKey, TxSavepointEntryState> savepointState() {
+ Map<IgniteTxKey, TxSavepointEntryState> res = new LinkedHashMap<>();
+
+ for (IgniteTxEntry entry : allEntries())
+ res.put(entry.txKey(), new TxSavepointEntryState(entry,
entry.copy()));
+
+ return res;
+ }
+
+ /**
+ * Searches the savepoint list backwards, starting from the last savepoint in the list.
+ * <p>
+ * The returned iterator has just returned the match from {@code previous()}. Calling {@code remove()}
+ * on it therefore deletes the match, while {@code next()} returns the match again and then the
+ * savepoints that follow it in the list.
+ *
+ * @param name Savepoint name.
+ * @return Iterator positioned on the found savepoint, or {@code null} if no savepoint exists.
+ */
+ @Nullable private ListIterator<TxSavepoint> findSavepoint(String name) {
+ if (savepoints == null)
+ return null;
+
+ ListIterator<TxSavepoint> iter =
savepoints.listIterator(savepoints.size());
+
+ while (iter.hasPrevious()) {
+ TxSavepoint sp = iter.previous();
+
+ if (sp.name.equals(name))
+ return iter;
+ }
+
+ return null;
+ }
+
+ /**
+ * Restores transaction entries to the state captured by the savepoint.
+ * <p>
+ * Entries that are absent from the snapshot are removed from the transaction state and from node
+ * mappings. Entries present in the snapshot are added back if they are missing and then restored from
+ * their snapshot copy. Removed entries, and entries that are locked now but were not locked in the
+ * snapshot, are passed to {@code unlockTxEntries}.
+ *
+ * @param savepoint Savepoint.
+ */
+ private void rollbackToSavepoint(TxSavepoint savepoint) {
+ assert savepoint != null;
+
+ Map<IgniteTxKey, TxSavepointEntryState> savepointState =
savepoint.entries;
+
+ Collection<IgniteTxEntry> curEntries = new ArrayList<>(allEntries());
+ Collection<IgniteTxEntry> entriesToUnlock = new ArrayList<>();
+
+ for (IgniteTxEntry curEntry : curEntries) {
+ if (!savepointState.containsKey(curEntry.txKey())) {
+ txState().removeEntry(curEntry.txKey());
+ removeEntryMappings(curEntry);
+ entriesToUnlock.add(curEntry);
+ }
+ }
+
+ for (Map.Entry<IgniteTxKey, TxSavepointEntryState> e :
savepointState.entrySet()) {
+ IgniteTxEntry curEntry = entry(e.getKey());
+ TxSavepointEntryState state = e.getValue();
+
+ if (curEntry == null) {
+ curEntry = state.entry;
+ txState().addEntry(curEntry);
+ }
+
+ boolean unlockAfterRestore = curEntry.locked() &&
!state.snapshot.locked();
+
+ curEntry.restoreFrom(state.snapshot);
+
+ if (unlockAfterRestore)
+ entriesToUnlock.add(curEntry);
+ }
+
+ unlockTxEntries(entriesToUnlock);
+ }
+
+ /**
+ * Removes entry from all node mappings used by the transaction.
+ *
+ * @param entry Entry.
+ */
+ private void removeEntryMappings(IgniteTxEntry entry) {
+ removeEntryFromMappings(entry);
+ removeEntryFromMappings(entry, dhtMap);
+ removeEntryFromMappings(entry, nearMap);
+ }
+
+ /**
+ * @param entry Entry.
+ */
+ private void removeEntryFromMappings(IgniteTxEntry entry) {
+ if (mappings.single()) {
+ GridDistributedTxMapping mapping = mappings.singleMapping();
+
+ if (mapping != null && mapping.removeEntry(entry) &&
mapping.empty())
+ mappings.remove(mapping.primary().id());
+
+ return;
+ }
+
+ Collection<GridDistributedTxMapping> mappings0 = mappings.mappings();
+
+ if (F.isEmpty(mappings0))
+ return;
+
+ for (GridDistributedTxMapping mapping : new ArrayList<>(mappings0)) {
+ if (mapping.removeEntry(entry) && mapping.empty())
+ mappings.remove(mapping.primary().id());
+ }
+ }
+
+ /**
+ * Removes the given tx entry from every mapping in {@code map} and drops the mappings that become empty.
+ * <p>
+ * Removal goes through the values view instead of calling {@code map.remove(...)} inside an
+ * {@code entrySet()} loop, so it is safe for any {@link Map} implementation and not only for
+ * concurrent maps that tolerate modification during iteration.
+ *
+ * @param entry Entry.
+ * @param map Mappings.
+ */
+ private void removeEntryFromMappings(IgniteTxEntry entry, Map<UUID, GridDistributedTxMapping> map) {
+ map.values().removeIf(mapping -> mapping.removeEntry(entry) && mapping.empty());
+ }
+
+ /**
+ * Unlocks entries if lock was acquired after the savepoint.
+ *
+ * @param entries Entries to unlock.
+ */
+ private void unlockTxEntries(Collection<IgniteTxEntry> entries) {
+ if (F.isEmpty(entries))
+ return;
+
+ Map<GridCacheContext<?, ?>, Collection<KeyCacheObject>> nearKeys = new
HashMap<>();
+ Map<GridCacheContext<?, ?>, Collection<KeyCacheObject>>
colocatedLocKeys = new HashMap<>();
+ Map<GridCacheContext<?, ?>, Map<UUID, Collection<KeyCacheObject>>>
colocatedRmtKeys = new HashMap<>();
+
+ for (IgniteTxEntry entry : entries) {
+ GridCacheContext<?, ?> cacheCtx = entry.context();
+
+ if (cacheCtx == null || entry.key() == null)
+ continue;
+
+ if (cacheCtx.cache().isNear())
+ nearKeys.computeIfAbsent(cacheCtx, k -> new
ArrayList<>()).add(entry.key());
+ else if (cacheCtx.cache().isColocated()) {
+ UUID nodeId = entry.nodeId();
+
+ if (nodeId == null || cctx.localNodeId().equals(nodeId))
+ colocatedLocKeys.computeIfAbsent(cacheCtx, k -> new
ArrayList<>()).add(entry.key());
+ else {
+ colocatedRmtKeys
+ .computeIfAbsent(cacheCtx, k -> new HashMap<>())
+ .computeIfAbsent(nodeId, k -> new ArrayList<>())
+ .add(entry.key());
+ }
+ }
+ }
+
+ for (Map.Entry<GridCacheContext<?, ?>, Collection<KeyCacheObject>> e :
nearKeys.entrySet()) {
+ e.getKey().nearTx().removeLocksForSavepoint(
+ xidVersion(),
+ e.getValue()
+ );
+ }
+
+ for (Map.Entry<GridCacheContext<?, ?>, Collection<KeyCacheObject>> e :
colocatedLocKeys.entrySet()) {
+ e.getKey().dhtTx().removeLocks(
+ cctx.localNodeId(),
+ xidVersion(),
+ e.getValue(),
+ false,
+ true
+ );
+ }
+
+ for (Map.Entry<GridCacheContext<?, ?>, Map<UUID,
Collection<KeyCacheObject>>> byCache : colocatedRmtKeys.entrySet()) {
+ GridCacheContext<?, ?> cacheCtx = byCache.getKey();
+
+ for (Map.Entry<UUID, Collection<KeyCacheObject>> byNode :
byCache.getValue().entrySet()) {
+ UUID nodeId = byNode.getKey();
+ Collection<KeyCacheObject> keys = byNode.getValue();
+
+ if (F.isEmpty(keys))
+ continue;
+
+ ClusterNode node = cctx.discovery().node(nodeId);
+
+ if (node == null)
+ continue;
+
+ GridNearUnlockRequest req = new
GridNearUnlockRequest(cacheCtx.cacheId(), keys.size());
+
+ req.version(xidVersion());
+ req.forSavepoint(true);
+
+ for (KeyCacheObject key : keys)
+ req.addKey(key);
+
+ try {
+ // Best-effort asynchronous unlock.
+ cacheCtx.io().send(node, req, cacheCtx.ioPolicy());
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(log, "Failed to send savepoint unlock request
[node=" + nodeId + ", keys=" + keys + ']', ex);
+ }
+ }
+ }
+ }
+
/**
* @param maps Mappings.
*/
@@ -4452,6 +4764,42 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
sysTime.addAndGet(System.nanoTime() - sysStartTime0);
}
+ /** Savepoint descriptor. */
+ private static class TxSavepoint {
+ /** Name. */
+ private final String name;
+
+ /** Entry states by key. */
+ private final Map<IgniteTxKey, TxSavepointEntryState> entries;
+
+ /**
+ * @param name Name.
+ * @param entries Entry states by key.
+ */
+ private TxSavepoint(String name, Map<IgniteTxKey,
TxSavepointEntryState> entries) {
+ this.name = name;
+ this.entries = entries;
+ }
+ }
+
+ /** Entry state captured in a savepoint. */
+ private static class TxSavepointEntryState {
+ /** Entry reference from transaction state at savepoint creation time.
*/
+ private final IgniteTxEntry entry;
+
+ /** Snapshot of entry state. */
+ private final IgniteTxEntry snapshot;
+
+ /**
+ * @param entry Entry reference.
+ * @param snapshot Snapshot.
+ */
+ private TxSavepointEntryState(IgniteTxEntry entry, IgniteTxEntry
snapshot) {
+ this.entry = entry;
+ this.snapshot = snapshot;
+ }
+ }
+
/**
* Post-lock closure.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 6221f9682dc..b175df09a09 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -1544,6 +1544,26 @@ public abstract class GridCacheStoreManagerAdapter
extends GridCacheManagerAdapt
throw new UnsupportedOperationException();
}
+ /** {@inheritDoc} */
+ @Override public void savepoint(String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void savepoint(String name, boolean overwrite) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rollbackToSavepoint(String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void releaseSavepoint(String name) {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override public IgniteAsyncSupport withAsync() {
throw new UnsupportedOperationException();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index ef1bbce28a9..c4f024b2d25 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -378,6 +378,104 @@ public class IgniteTxEntry implements
GridPeerDeployAware, Message {
return cp;
}
+ /**
+ * Creates a snapshot of this entry that can later be passed to {@link #restoreFrom(IgniteTxEntry)}.
+ * <p>
+ * The copy is produced by restoring a blank entry from this one, so the set of copied fields is defined
+ * in one place and cannot drift from the set of restored fields. Value holders and the entry processor
+ * collection are duplicated; all other fields are shared by reference.
+ *
+ * @return Deep-enough copy of this entry to restore its state later.
+ */
+ public IgniteTxEntry copy() {
+ IgniteTxEntry cp = new IgniteTxEntry();
+
+ cp.restoreFrom(this);
+
+ return cp;
+ }
+
+ /**
+ * Restores this entry from a previously captured snapshot.
+ *
+ * @param snapshot Snapshot.
+ */
+ public void restoreFrom(IgniteTxEntry snapshot) {
+ tx = snapshot.tx;
+ key = snapshot.key;
+ cacheId = snapshot.cacheId;
+ txKey = snapshot.txKey;
+ ctx = snapshot.ctx;
+ val = copyHolder(snapshot.val);
+ prevVal.value(
+ snapshot.prevVal.operation(),
+ snapshot.prevVal.value(),
+ snapshot.prevVal.hasWriteValue(),
+ snapshot.prevVal.hasReadValue()
+ );
+ oldVal = copyHolder(snapshot.oldVal);
+ entryProcessorsCol = snapshot.entryProcessorsCol == null ? null : new
LinkedList<>(snapshot.entryProcessorsCol);
+ entryProcessorCalcVal = snapshot.entryProcessorCalcVal;
+ transformClosBytes = snapshot.transformClosBytes;
+ ttl = snapshot.ttl;
+ conflictExpireTime = snapshot.conflictExpireTime;
+ conflictVer = snapshot.conflictVer;
+ explicitVer = snapshot.explicitVer;
+ dhtVer = snapshot.dhtVer;
+ filters = snapshot.filters;
+ filtersPassed = snapshot.filtersPassed;
+ filtersSet = snapshot.filtersSet;
+ entry = snapshot.entry;
+ prepared = snapshot.prepared;
+ locked = snapshot.locked;
+ nodeId = snapshot.nodeId;
+ locMapped = snapshot.locMapped;
+ expiryPlc = snapshot.expiryPlc;
+ transferExpiryPlc = snapshot.transferExpiryPlc;
+ expiryPlcBytes = snapshot.expiryPlcBytes;
+ flags = snapshot.flags;
+ partUpdateCntr = snapshot.partUpdateCntr;
+ serReadVer = snapshot.serReadVer;
+ }
+
+ /**
+ * @param holder Holder to copy.
+ * @return Copy.
+ */
+ private static @Nullable TxEntryValueHolder copyHolder(@Nullable
TxEntryValueHolder holder) {
+ if (holder == null)
+ return null;
+
+ TxEntryValueHolder cp = new TxEntryValueHolder();
+
+ cp.value(holder.operation(), holder.value(), holder.hasWriteValue(),
holder.hasReadValue());
+
+ return cp;
+ }
+
/**
* @return Node ID.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java
index f6bc6688f3d..a2968da2c62 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java
@@ -172,6 +172,26 @@ public class TransactionEventProxyImpl implements
TransactionProxy, Externalizab
throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this proxy: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void savepoint(String name) {
+ throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this proxy: always throws {@link UnsupportedOperationException}, regardless of {@code overwrite}.
+ */
+ @Override public void savepoint(String name, boolean overwrite) {
+ throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this proxy: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void rollbackToSavepoint(String name) {
+ throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this proxy: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void releaseSavepoint(String name) {
+ throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+ }
+
/** {@inheritDoc} */
@Nullable @Override public String label() {
return tx().label();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index c9dccdbea66..dfa8cc911e4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -482,6 +482,56 @@ public class TransactionProxyImpl<K, V> implements
TransactionProxy, Externaliza
}
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@link #savepoint(String, boolean)} with {@code overwrite} set to {@code false}.
+ */
+ @Override public void savepoint(String name) {
+ savepoint(name, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Forwards the call to the wrapped transaction between {@code enter()} and {@code leave()}. A checked exception
+ * raised by the transaction is rethrown as the exception returned by {@code U.convertException}.
+ */
+ @Override public void savepoint(String name, boolean overwrite) {
+ enter();
+
+ try {
+ tx.savepoint(name, overwrite);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leave(); // Always paired with enter(), including when the call above fails.
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Forwards the call to the wrapped transaction between {@code enter()} and {@code leave()}. A checked exception
+ * raised by the transaction is rethrown as the exception returned by {@code U.convertException}.
+ */
+ @Override public void rollbackToSavepoint(String name) {
+ enter();
+
+ try {
+ tx.rollbackToSavepoint(name);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leave(); // Always paired with enter(), including when the call above fails.
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Forwards the call to the wrapped transaction between {@code enter()} and {@code leave()}. A checked exception
+ * raised by the transaction is rethrown as the exception returned by {@code U.convertException}.
+ */
+ @Override public void releaseSavepoint(String name) {
+ enter();
+
+ try {
+ tx.releaseSavepoint(name);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leave(); // Always paired with enter(), including when the call above fails.
+ }
+ }
+
/**
* @param res Result to convert to finished future.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyRollbackOnlyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyRollbackOnlyImpl.java
index b2e9c0cb124..e260b84301b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyRollbackOnlyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyRollbackOnlyImpl.java
@@ -76,4 +76,24 @@ public class TransactionProxyRollbackOnlyImpl<K, V> extends
TransactionProxyImpl
@Override public long timeout(long timeout) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not available on this rollback-only proxy: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void savepoint(String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not available on this rollback-only proxy: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void savepoint(String name, boolean overwrite) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not available on this rollback-only proxy: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void rollbackToSavepoint(String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not available on this rollback-only proxy: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void releaseSavepoint(String name) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index 5b36dff2c71..fbab6c8c2ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -22,6 +22,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -290,6 +291,61 @@ public interface Transaction extends AutoCloseable,
IgniteAsyncSupport {
*/
public void suspend() throws IgniteException;
+ /**
+ * Creates a savepoint in the current transaction.
+ * <p>
+ * Savepoints are supported only for explicit transactions with
+ * {@link TransactionConcurrency#PESSIMISTIC} concurrency. The savepoint keeps the current transaction state and can
+ * later be used by {@link #rollbackToSavepoint(String)} to discard changes made after it was created.
+ * <p>
+ * This method never replaces an existing savepoint; use {@link #savepoint(String, boolean)} to overwrite one.
+ *
+ * @param name Savepoint name.
+ * @throws TransactionException If savepoint with the same name already exists.
+ * @throws IgniteException If savepoints are not supported for this transaction.
+ */
+ @IgniteExperimental
+ public void savepoint(String name);
+
+ /**
+ * Creates a savepoint in the current transaction, optionally replacing an existing savepoint with the same name.
+ * <p>
+ * Savepoints are supported only for explicit transactions with
+ * {@link TransactionConcurrency#PESSIMISTIC} concurrency. If {@code overwrite} is {@code true} and a savepoint with
+ * the same name exists, the existing savepoint is replaced with a snapshot of the current transaction state.
+ *
+ * @param name Savepoint name.
+ * @param overwrite Whether to overwrite an existing savepoint with the same name.
+ * @throws TransactionException If savepoint with the same name already exists and {@code overwrite} is
+ * {@code false}.
+ * @throws IgniteException If savepoints are not supported for this transaction.
+ */
+ @IgniteExperimental
+ public void savepoint(String name, boolean overwrite);
+
+ /**
+ * Rolls back transaction changes to the specified savepoint.
+ * <p>
+ * Changes made after the savepoint was created are discarded. Savepoints created after the specified savepoint are
+ * released. The transaction remains active and can be committed or rolled back after this method returns.
+ *
+ * @param name Savepoint name.
+ * @throws TransactionException If savepoint with the given name does not exist.
+ * @throws IgniteException If savepoints are not supported for this transaction.
+ */
+ @IgniteExperimental
+ public void rollbackToSavepoint(String name);
+
+ /**
+ * Releases a savepoint. If the savepoint does not exist, this operation does nothing.
+ * <p>
+ * Releasing a savepoint does not roll back transaction changes. It removes the specified savepoint and all
+ * savepoints created after it, so none of them can be used by {@link #rollbackToSavepoint(String)} anymore.
+ *
+ * @param name Savepoint name.
+ * @throws IgniteException If savepoints are not supported for this transaction.
+ */
+ @IgniteExperimental
+ public void releaseSavepoint(String name);
+
/**
* Returns transaction's label.
* <p>
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxSavepointParameterizedTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxSavepointParameterizedTest.java
new file mode 100644
index 00000000000..4e49523f485
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxSavepointParameterizedTest.java
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionException;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests for transaction savepoint API.
+ */
+@RunWith(Parameterized.class)
+public class TxSavepointParameterizedTest extends GridCommonAbstractTest {
+ /** Node started with {@code startGrid(0)}; used as the transaction initiator in most tests. */
+ private static Ignite ignite0;
+
+ /** Node started with {@code startGrid(1)}. */
+ private static Ignite ignite1;
+
+ /** Node started with {@code startGrid(2)}. */
+ private static Ignite ignite2;
+
+ /** Node started with {@code startGrid(3)}. */
+ private static Ignite ignite3;
+
+ /** Node started with {@code startClientGrid()}. */
+ private static Ignite client;
+
+ /** Whether the test keys are written with an initial value ({@code -1}) before the transaction starts. */
+ @Parameter(0)
+ public boolean initKeies;
+
+ /** Whether the test cache is created with a near cache configuration. */
+ @Parameter(1)
+ public boolean useNearCache;
+
+ /** Number of backups configured for the test cache. */
+ @Parameter(2)
+ public int backups;
+
+ /** Whether the transaction initiator is requested as the first backup when a key for another primary is picked. */
+ @Parameter(3)
+ public boolean spKeyOnTxInitiator;
+
+ /** Whether the test cache is {@code REPLICATED}; otherwise it is {@code PARTITIONED}. */
+ @Parameter(4)
+ public boolean replicated;
+
+ /** Isolation level of the transactions started by the tests. */
+ @Parameter(5)
+ public TransactionIsolation transactionIsolation;
+
+    /**
+     * Builds the parameter matrix. For each isolation level it lists every combination of the three boolean flags
+     * for partitioned caches with 0, 1 and 2 backups, followed by the replicated-cache rows, where the backup count
+     * is {@code 0} and the initiator flag is always {@code true}.
+     *
+     * @return Test parameters.
+     */
+    @Parameters(name = "initKeies={0}, useNearCache={1}, backups={2}, spKeyOnTxInitiator={3}, replicated={4}, txIsolation={5}")
+    public static Collection<Object[]> testData() {
+        TransactionIsolation[] isolations = {READ_COMMITTED, REPEATABLE_READ};
+        boolean[] flags = {true, false};
+
+        // Per isolation: 3 backup counts x 8 flag combinations (partitioned) + 4 replicated rows.
+        Object[][] rows = new Object[isolations.length * (3 * 8 + 4)][];
+        int idx = 0;
+
+        for (TransactionIsolation isolation : isolations) {
+            for (int backupCnt = 0; backupCnt <= 2; backupCnt++) {
+                for (boolean init : flags) {
+                    for (boolean near : flags) {
+                        for (boolean onInitiator : flags)
+                            rows[idx++] = new Object[] {init, near, backupCnt, onInitiator, false, isolation};
+                    }
+                }
+            }
+
+            // Replicated cache.
+            for (boolean init : flags) {
+                for (boolean near : flags)
+                    rows[idx++] = new Object[] {init, near, 0, true, true, isolation};
+            }
+        }
+
+        return List.of(rows);
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Starts grids 0-3 and one client grid once for the whole class and waits for partition map exchange.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ ignite0 = startGrid(0);
+ ignite1 = startGrid(1);
+ ignite2 = startGrid(2);
+ ignite3 = startGrid(3);
+ client = startClientGrid();
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops all grids started for this class.
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Destroys the default cache so that each test creates it again with its own parameters.
+ */
+ @Override protected void afterTest() throws Exception {
+ ignite0.destroyCache(DEFAULT_CACHE_NAME);
+
+ super.afterTest();
+ }
+
+    /**
+     * Checks that rolling back to a savepoint releases the lock taken after the savepoint on a key whose primary is
+     * another node, that the key can then be updated outside the transaction while the transaction is still open,
+     * and that the same transaction can write the key again before it commits.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRollbackToSavepointReleasesRemoteDhtLockAcquireAgain() throws Exception {
+        IgniteCache<Integer, Integer> cache0 = transactionalCache(ignite0);
+
+        int node0Key = primaryKey(cache0);
+        int node1Key = keyForPrimaryAndBackup(ignite0, ignite1);
+
+        if (initKeies) {
+            cache0.put(node0Key, -1);
+            cache0.put(node1Key, -1);
+        }
+
+        CountDownLatch savepointRolledBackLatch = new CountDownLatch(1);
+        CountDownLatch finishFirstTxLatch = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+            try (Transaction tx = startTransaction(ignite0, 2)) {
+                cache0.put(node0Key, 1);
+
+                tx.savepoint("sp");
+
+                cache0.put(node1Key, 1);
+
+                tx.rollbackToSavepoint("sp");
+
+                savepointRolledBackLatch.countDown();
+
+                assertTrue(finishFirstTxLatch.await(10, TimeUnit.SECONDS));
+
+                cache0.put(node1Key, 2);
+
+                tx.commit();
+            }
+        });
+
+        // Run the outside update only after the rollback has happened; otherwise it races with the transaction
+        // and the test does not deterministically exercise the lock release.
+        assertTrue(savepointRolledBackLatch.await(10, TimeUnit.SECONDS));
+
+        updateKeyFormPrimary(node1Key);
+
+        // The transaction is still parked on finishFirstTxLatch at this point.
+        assertFalse(fut.isDone());
+
+        // TODO: IGNITE-28612 Entry visibility violation in transactional replication cache with one backup and near.
+        if (initKeies && useNearCache && backups == 1 && spKeyOnTxInitiator && !replicated) {
+            assertTrue(GridTestUtils.waitForCondition(() ->
+                Integer.valueOf(42).equals(cache0.get(node1Key)), 10_000));
+        }
+
+        assertEquals(Integer.valueOf(42), cache0.get(node1Key));
+
+        finishFirstTxLatch.countDown();
+
+        fut.get(10_000);
+
+        assertEquals(Integer.valueOf(2), cache0.get(node1Key));
+    }
+
+ /**
+ * Starts a {@code PESSIMISTIC} transaction with the isolation level of the current test parameters
+ * ({@link #transactionIsolation}), a timeout argument of {@code 30_000} and the given size.
+ *
+ * @param ignite Transaction initiator.
+ * @param txSize Transaction size.
+ * @return Transaction.
+ */
+ private Transaction startTransaction(Ignite ignite, int txSize) {
+ return ignite.transactions().txStart(PESSIMISTIC,
transactionIsolation, 30_000, txSize);
+ }
+
+ /**
+ * Checks two things: rolling back to an overwritten savepoint restores the state captured at the overwrite
+ * (value {@code 2}, not the earlier {@code 1}), and rolling back to a released savepoint fails with
+ * {@link TransactionException}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testOverwriteAndReleaseSavepoint() throws Exception {
+ IgniteCache<Integer, Integer> cache0 = transactionalCache(ignite0);
+
+ int key = keyForPrimaryAndBackup(ignite0, ignite1);
+
+ if (initKeies) {
+ cache0.put(key, -1);
+ }
+
+ try (Transaction tx = startTransaction(ignite0, 1)) {
+ cache0.put(key, 1);
+
+ tx.savepoint("sp");
+
+ cache0.put(key, 2);
+
+ tx.savepoint("sp", true); // Replaces "sp": it now captures the state after the second put.
+
+ cache0.put(key, 3);
+
+ tx.rollbackToSavepoint("sp");
+
+ tx.commit();
+ }
+
+ assertEquals(Integer.valueOf(2), cache0.get(key));
+
+ try (Transaction tx = startTransaction(ignite0, 1)) {
+ tx.savepoint("sp");
+
+ tx.releaseSavepoint("sp");
+
+ GridTestUtils.assertThrowsAnyCause(log,
+ () -> {
+ tx.rollbackToSavepoint("sp");
+
+ return null;
+ },
+ TransactionException.class,
+ "Savepoint does not exist");
+ }
+ }
+
+ /**
+ * Checks that after a put followed by a remove of the same key is rolled back to a savepoint, the key can be
+ * updated outside the still-open transaction and that this outside value ({@code 42}) survives the commit.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRollbackToSavepointReleasesLockForPutRemoveEntry() throws
Exception {
+ IgniteCache<Integer, Integer> cache0 = transactionalCache(ignite0);
+
+ int key = keyForPrimaryAndBackup(ignite0, ignite1);
+
+ if (initKeies) {
+ cache0.put(key, -1);
+ }
+
+ CountDownLatch savepointRolledBackLatch = new CountDownLatch(1);
+ CountDownLatch finishFirstTxLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = startTransaction(ignite0, 1)) {
+ tx.savepoint("sp");
+
+ cache0.put(key, 1);
+ cache0.remove(key);
+
+ tx.rollbackToSavepoint("sp");
+
+ savepointRolledBackLatch.countDown();
+
+ assertTrue(finishFirstTxLatch.await(10, TimeUnit.SECONDS)); // Keeps the transaction open during the outside update.
+
+ tx.commit();
+ }
+ });
+
+ assertTrue(savepointRolledBackLatch.await(10, TimeUnit.SECONDS));
+
+ updateKeyFormPrimary(key);
+
+ finishFirstTxLatch.countDown();
+
+ fut.get(10_000);
+
+ assertEquals(Integer.valueOf(42), cache0.get(key));
+ }
+
+ /**
+ * Checks that a write made after a savepoint is discarded by a rollback to that savepoint, while writes made
+ * before the savepoint and after the rollback are committed. The rolled-back key keeps its initial value, or
+ * stays absent when keys were not initialized.
+ */
+ @Test
+ public void testRollbackToSavepoint() {
+ IgniteCache<Integer, Integer> cache0 = transactionalCache(ignite0);
+
+ int key1 = keyForPrimaryAndBackup(ignite0, ignite1);
+ int key2 = keyForPrimaryAndBackup(ignite0, ignite2);
+ int key3 = keyForPrimaryAndBackup(ignite0, ignite3);
+
+ if (initKeies) {
+ cache0.put(key1, -1);
+ cache0.put(key2, -1);
+ cache0.put(key3, -1);
+ }
+
+ try (Transaction tx = startTransaction(ignite0, 3)) {
+ cache0.put(key1, 1);
+
+ tx.savepoint("sp");
+
+ cache0.put(key2, 2);
+
+ tx.rollbackToSavepoint("sp");
+
+ cache0.put(key3, 3);
+
+ tx.commit();
+ }
+
+ assertEquals(Integer.valueOf(1), cache0.get(key1));
+
+ if (initKeies)
+ assertEquals(Integer.valueOf(-1), cache0.get(key2));
+ else
+ assertNull(cache0.get(key2));
+
+ assertEquals(Integer.valueOf(3), cache0.get(key3));
+ }
+
+ /**
+ * Checks savepoint rollback for a transaction started on the client node: values written before the savepoint
+ * are visible inside the transaction after the rollback and are the ones committed, and no transaction with
+ * the same near version remains active on the non-client nodes afterwards.
+ * <p>
+ * The test body is skipped when {@link #spKeyOnTxInitiator} is set.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testClientInitiator() throws Exception {
+ // A client node can not be a primary node for any key.
+ if (spKeyOnTxInitiator)
+ return;
+
+ IgniteCache<Integer, Integer> cache = transactionalCache(client);
+
+ int key1 = keyForPrimaryAndBackup(client, ignite0);
+ int key2 = keyForPrimaryAndBackup(client, ignite1);
+
+ if (initKeies) {
+ cache.put(key1, -1);
+ cache.put(key2, -1);
+ }
+
+ GridCacheVersion[] nearVerRef = new GridCacheVersion[1]; // Single-element holder written from the async closure.
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = startTransaction(client, 2)) {
+ nearVerRef[0] = ((TransactionProxyImpl<?,
?>)tx).tx().nearXidVersion();
+
+ Integer val1 = cache.get(key1);
+ Integer val2 = cache.get(key2);
+
+ if (initKeies) {
+ assertEquals(-1, val1.intValue());
+ assertEquals(-1, val2.intValue());
+ }
+ else {
+ assertNull(val1);
+ assertNull(val2);
+ }
+
+ cache.put(key1, 1);
+ cache.put(key2, 1);
+
+ tx.savepoint("sp");
+
+ cache.put(key1, 2);
+ cache.put(key2, 2);
+
+ tx.rollbackToSavepoint("sp");
+
+ val1 = cache.get(key1);
+ val2 = cache.get(key2);
+
+ assertEquals(1, val1.intValue());
+ assertEquals(1, val2.intValue());
+
+ tx.commit();
+ }
+ });
+
+ fut.get(10_000);
+
+ Integer val1 = cache.get(key1);
+ Integer val2 = cache.get(key2);
+
+ assertEquals(1, val1.intValue());
+ assertEquals(1, val2.intValue());
+
+ assertNoActiveTx(nearVerRef[0]);
+ }
+
+ /**
+ * Checks a rollback to a savepoint taken before any cache operation: afterwards neither key is present in the
+ * transaction state on any non-client node, both keys can be updated outside the transaction, and the
+ * still-open transaction then reads the outside values ({@code 42}).
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testRollbackAllKey() throws Exception {
+ IgniteCache<Integer, Integer> cache0 = transactionalCache(ignite0);
+
+ int keyOnInitiator = primaryKey(cache0);
+ int keyOnOtherNode = keyForPrimaryAndBackup(ignite0, ignite1);
+
+ if (initKeies) {
+ cache0.put(keyOnInitiator, -1);
+ cache0.put(keyOnOtherNode, -1);
+ }
+
+ CountDownLatch rollbackDoneLatch = new CountDownLatch(1);
+ CountDownLatch finishTxLatch = new CountDownLatch(1);
+ GridCacheVersion[] nearVerRef = new GridCacheVersion[1];
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = startTransaction(ignite0, 2)) {
+ nearVerRef[0] = ((TransactionProxyImpl<?,
?>)tx).tx().nearXidVersion();
+
+ tx.savepoint("sp");
+
+ Integer val1 = cache0.get(keyOnInitiator);
+ Integer val2 = cache0.get(keyOnOtherNode);
+
+ if (initKeies) {
+ assertEquals(-1, val1.intValue());
+ assertEquals(-1, val2.intValue());
+ }
+ else {
+ assertNull(val1);
+ assertNull(val2);
+ }
+
+ cache0.put(keyOnInitiator, 1);
+ cache0.put(keyOnOtherNode, 1);
+
+ tx.rollbackToSavepoint("sp");
+
+ rollbackDoneLatch.countDown();
+
+ assertTrue(finishTxLatch.await(10, TimeUnit.SECONDS)); // Released after the outside updates below.
+
+ val1 = cache0.get(keyOnInitiator);
+ val2 = cache0.get(keyOnOtherNode);
+
+ assertEquals(42, val1.intValue());
+ assertEquals(42, val2.intValue());
+
+ tx.commit();
+ }
+ });
+
+ assertTrue(rollbackDoneLatch.await(10, TimeUnit.SECONDS));
+
+ assertNoTxStateKeyOnNode(nearVerRef[0], keyOnInitiator);
+ assertNoTxStateKeyOnNode(nearVerRef[0], keyOnOtherNode);
+
+ updateKeyFormPrimary(keyOnInitiator);
+ updateKeyFormPrimary(keyOnOtherNode);
+
+ finishTxLatch.countDown();
+
+ fut.get(10_000);
+
+ assertNoActiveTx(nearVerRef[0]);
+ }
+
+ /**
+ * Checks that a key written after a savepoint disappears from the transaction state on every non-client node
+ * once the transaction is rolled back to that savepoint. The transaction itself stays open because of a key
+ * written before the savepoint. Also checks that no transaction with the same near version remains after commit.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDhtEntriesAfterRollbackToSavepoint() throws Exception {
+ IgniteCache<Integer, Integer> cache0 = transactionalCache(ignite0);
+
+ int keepTxAliveKey = primaryKey(cache0);
+ int dhtKey = keyForPrimaryAndBackup(ignite0, ignite1);
+
+ if (initKeies) {
+ cache0.put(keepTxAliveKey, -1);
+ cache0.put(dhtKey, -1);
+ }
+
+ CountDownLatch dhtKeyWrittenLatch = new CountDownLatch(1);
+ CountDownLatch proceedRollbackLatch = new CountDownLatch(1);
+ CountDownLatch rollbackDoneLatch = new CountDownLatch(1);
+ CountDownLatch finishTxLatch = new CountDownLatch(1);
+ GridCacheVersion[] nearVerRef = new GridCacheVersion[1];
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+ try (Transaction tx = startTransaction(ignite0, 2)) {
+ nearVerRef[0] = ((TransactionProxyImpl<?,
?>)tx).tx().nearXidVersion();
+
+ cache0.put(keepTxAliveKey, 1);
+
+ tx.savepoint("sp");
+
+ cache0.put(dhtKey, 1);
+
+ dhtKeyWrittenLatch.countDown();
+
+ assertTrue(proceedRollbackLatch.await(10, TimeUnit.SECONDS));
+
+ tx.rollbackToSavepoint("sp");
+
+ rollbackDoneLatch.countDown();
+
+ assertTrue(finishTxLatch.await(10, TimeUnit.SECONDS)); // Keeps the transaction open while the state is inspected.
+
+ tx.commit();
+ }
+ });
+
+ assertTrue(dhtKeyWrittenLatch.await(10, TimeUnit.SECONDS));
+ assertNotNull(nearVerRef[0]);
+
+ proceedRollbackLatch.countDown();
+
+ assertTrue(rollbackDoneLatch.await(10, TimeUnit.SECONDS));
+
+ assertNoTxStateKeyOnNode(nearVerRef[0], dhtKey);
+
+ finishTxLatch.countDown();
+
+ fut.get(10_000);
+
+ assertNoActiveTx(nearVerRef[0]);
+ }
+
+ /**
+ * Writes the value {@code 42} for the key through the default cache of the key's primary node, outside of any
+ * transaction started by the calling test.
+ *
+ * @param key Key to update.
+ */
+ private void updateKeyFormPrimary(int key) {
+ Ignite updateNode = primaryNode(key, DEFAULT_CACHE_NAME);
+
+ updateNode.cache(DEFAULT_CACHE_NAME).put(key, 42);
+ }
+
+    /**
+     * Waits, on every non-client node, until no active transaction with the given near version holds the key,
+     * either as a write key or as a transaction entry. Fails if a node does not reach that state within 10 seconds.
+     *
+     * @param nearVer Near transaction version.
+     * @param key Key that must be absent from tx state.
+     * @throws Exception If waiting fails.
+     */
+    private void assertNoTxStateKeyOnNode(GridCacheVersion nearVer, int key) throws Exception {
+        for (Ignite node : G.allGrids()) {
+            if (!node.configuration().isClientMode()) {
+                boolean keyGone = GridTestUtils.waitForCondition(() -> {
+                    GridCacheContext<?, ?> cctx = ((IgniteKernal)node).internalCache(DEFAULT_CACHE_NAME).context();
+                    IgniteTxKey txKey = cctx.txKey(cctx.toCacheKeyObject(key));
+
+                    return cctx.tm().activeTransactions().stream()
+                        .filter(tx -> nearVer.equals(tx.nearXidVersion()))
+                        .noneMatch(tx -> tx.hasWriteKey(txKey) || tx.entry(txKey) != null);
+                }, 10_000);
+
+                assertTrue(keyGone);
+            }
+        }
+    }
+
+    /**
+     * Waits, on every non-client node, until the list of active transactions contains none with the given near
+     * version. Fails if a node does not reach that state within 10 seconds.
+     *
+     * @param nearVer Near transaction version.
+     * @throws Exception If waiting fails.
+     */
+    private void assertNoActiveTx(GridCacheVersion nearVer) throws Exception {
+        for (Ignite node : G.allGrids()) {
+            if (!node.configuration().isClientMode()) {
+                boolean finished = GridTestUtils.waitForCondition(() -> {
+                    GridCacheContext<?, ?> cctx = ((IgniteKernal)node).internalCache(DEFAULT_CACHE_NAME).context();
+
+                    return cctx.tm().activeTransactions().stream()
+                        .noneMatch(tx -> nearVer.equals(tx.nearXidVersion()));
+                }, 10_000);
+
+                assertTrue(finished);
+            }
+        }
+    }
+
+    /**
+     * Creates the default test cache on the given node: transactional, {@code FULL_SYNC}, with near cache, cache
+     * mode and backup count taken from the current test parameters.
+     *
+     * @param ignite Node.
+     * @return Transactional cache.
+     */
+    private IgniteCache<Integer, Integer> transactionalCache(Ignite ignite) {
+        // The configuration is typed with the cache's key/value types so that createCache() returns the required
+        // type directly and no unchecked cast of the result is needed.
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setNearConfiguration(useNearCache ? new NearCacheConfiguration<>() : null)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setCacheMode(replicated ? CacheMode.REPLICATED : CacheMode.PARTITIONED)
+            .setBackups(backups);
+
+        return ignite.createCache(ccfg);
+    }
+
+    /**
+     * Finds a key whose affinity mapping has the requested primary node and, for partitioned caches with backups,
+     * the expected backup nodes in the expected order.
+     * <p>
+     * The first expected backup is the transaction initiator when {@link #spKeyOnTxInitiator} is set and the
+     * initiator is not the primary; otherwise it is another node that is neither the initiator nor the primary.
+     * The second expected backup is a further such node. Client nodes are never picked as expected backups: a
+     * client in that role would make the search below impossible to satisfy. For replicated caches only the
+     * primary is checked.
+     *
+     * @param txInitiator Transaction initiator.
+     * @param primaryNode Node that should be primary.
+     * @return Key mapped to given primary and backups.
+     */
+    private int keyForPrimaryAndBackup(Ignite txInitiator, Ignite primaryNode) {
+        Ignite backupNode1 = spKeyOnTxInitiator && primaryNode != txInitiator ?
+            txInitiator : G.allGrids().stream()
+                .filter(n -> !n.configuration().isClientMode() && n != txInitiator && n != primaryNode)
+                .findFirst().orElseThrow();
+        Ignite backupNode2 = G.allGrids().stream()
+            .filter(n -> !n.configuration().isClientMode() && n != txInitiator && n != primaryNode && n != backupNode1)
+            .findFirst().orElseThrow();
+
+        for (int key = 0; key < 50_000; key++) {
+            Collection<ClusterNode> mapping =
+                txInitiator.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key);
+
+            if (!replicated)
+                assertEquals(backups + 1, mapping.size());
+
+            ClusterNode primary = mapping.iterator().next();
+
+            if (replicated && primary.id().equals(primaryNode.cluster().localNode().id()))
+                return key;
+
+            ClusterNode backup1 = mapping.stream().skip(1).findFirst().orElse(null);
+            ClusterNode backup2 = mapping.stream().skip(2).findFirst().orElse(null);
+
+            if (primary.id().equals(primaryNode.cluster().localNode().id()) &&
+                (backups < 1 || backup1.id().equals(backupNode1.cluster().localNode().id())) &&
+                (backups < 2 || backup2.id().equals(backupNode2.cluster().localNode().id())))
+                return key;
+        }
+
+        throw new AssertionError("Failed to find key for requested primary/backup mapping.");
+    }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxSavepointPessimisticTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxSavepointPessimisticTest.java
new file mode 100644
index 00000000000..59d28384cfa
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxSavepointPessimisticTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionException;
+import org.junit.Test;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static
org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static
org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static
org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests transaction savepoint functionality of the key-value API on a single-node grid.
+ * Currently, savepoint API is supported only for pessimistic transactions.
+ */
+public class TxSavepointPessimisticTest extends GridCommonAbstractTest {
+    /** Node started once in {@link #beforeTestsStarted()} and shared by all test methods. */
+    private static Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setBackups(1)
+                .setCacheMode(CacheMode.PARTITIONED));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        ignite = startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        // Do not keep the stopped node reachable through the static field after the class has finished.
+        ignite = null;
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Every test works with the same key, so start each one from an empty cache.
+        ignite.cache(DEFAULT_CACHE_NAME).clear();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks that creating a savepoint inside an OPTIMISTIC transaction is rejected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSavepointRejectedForOptimisticTx() throws Exception {
+        try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
+            GridTestUtils.assertThrowsAnyCause(
+                log,
+                () -> {
+                    tx.savepoint("sp");
+
+                    return null;
+                },
+                IgniteCheckedException.class,
+                "Savepoints are supported only for PESSIMISTIC transactions."
+            );
+        }
+    }
+
+    /**
+     * Checks that rolling back to the earliest of several savepoints discards every update made after it:
+     * only the value written before {@code sp1} is committed.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRollbackSeveralSavepoints() throws Exception {
+        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        int key = 1;
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+            cache.put(key, 1);
+
+            tx.savepoint("sp1");
+
+            cache.put(key, 2);
+
+            tx.savepoint("sp2");
+
+            cache.put(key, 3);
+
+            tx.savepoint("sp3");
+
+            cache.put(key, 4);
+
+            tx.rollbackToSavepoint("sp1");
+
+            tx.commit();
+        }
+
+        assertEquals(Integer.valueOf(1), cache.get(key));
+    }
+
+    /**
+     * Checks that re-creating a savepoint with an existing name fails unless overwrite is requested.
+     * The transaction is not committed; it is finished by closing the try-with-resources block.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDuplicateSavepointWithoutOverwriteThrows() throws Exception {
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+            tx.savepoint("sp");
+
+            GridTestUtils.assertThrowsAnyCause(
+                log,
+                () -> {
+                    tx.savepoint("sp");
+
+                    return null;
+                },
+                TransactionException.class,
+                "already exists"
+            );
+
+            // The same name is accepted when overwrite is explicitly allowed.
+            tx.savepoint("sp", true);
+        }
+    }
+
+    /**
+     * Checks that releasing a savepoint also removes the savepoints created after it, while an earlier
+     * savepoint stays usable: after releasing {@code sp2} neither {@code sp3} nor {@code sp2} can be rolled
+     * back to, but rolling back to {@code sp1} succeeds and only the value written before it is committed.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReleaseSavepointReleasesNestedSavepoints() throws Exception {
+        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        int key = 1;
+
+        try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+            cache.put(key, 1);
+            tx.savepoint("sp1");
+
+            cache.put(key, 2);
+            tx.savepoint("sp2");
+
+            cache.put(key, 3);
+            tx.savepoint("sp3");
+
+            cache.put(key, 4);
+
+            tx.releaseSavepoint("sp2");
+
+            GridTestUtils.assertThrowsAnyCause(
+                log,
+                () -> {
+                    tx.rollbackToSavepoint("sp3");
+
+                    return null;
+                },
+                TransactionException.class,
+                "Savepoint does not exist"
+            );
+
+            GridTestUtils.assertThrowsAnyCause(
+                log,
+                () -> {
+                    tx.rollbackToSavepoint("sp2");
+
+                    return null;
+                },
+                TransactionException.class,
+                "Savepoint does not exist"
+            );
+
+            tx.rollbackToSavepoint("sp1");
+
+            tx.commit();
+        }
+
+        assertEquals(Integer.valueOf(1), cache.get(key));
+    }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
index c9dafe95dd6..414fbd0e5c8 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
@@ -599,6 +599,26 @@ public abstract class GridAbstractCacheStoreSelfTest<T
extends CacheStore<Object
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public void savepoint(String name) {
+ // No-op: this test implementation does not track savepoints.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void savepoint(String name, boolean overwrite) {
+ // No-op: this test implementation does not track savepoints, so the overwrite flag is ignored.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rollbackToSavepoint(String name) {
+ // No-op: this test implementation keeps no savepoints to roll back to.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void releaseSavepoint(String name) {
+ // No-op: this test implementation keeps no savepoints to release.
+ }
+
/** {@inheritDoc} */
@Override public IgniteFuture<Void> rollbackAsync() throws
IgniteException {
return null;
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
index 38ab77dcf7b..060434b06a9 100755
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite12.java
@@ -47,6 +47,8 @@ import
org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheMapO
import
org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheRemoteMultiplePartitionReservationTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRecoveryConcurrentTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRecoveryWithConcurrentRollbackTest;
+import
org.apache.ignite.internal.processors.cache.transactions.TxSavepointParameterizedTest;
+import
org.apache.ignite.internal.processors.cache.transactions.TxSavepointPessimisticTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxWithKeyContentionSelfTest;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -110,6 +112,9 @@ public class IgniteCacheTestSuite12 {
GridTestUtils.addTestIfNeeded(suite,
EvictionWhilePartitionGroupIsReservedTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
DelayedOwningDuringExchangeTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, TxSavepointPessimisticTest.class,
ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
TxSavepointParameterizedTest.class, ignoredTests);
+
return suite;
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 1f6bae5f4cd..0891cfdc8ca 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -71,6 +71,7 @@ import
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTime
import
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutOnePhaseCommitTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTopologyChangeTest;
+import
org.apache.ignite.internal.processors.cache.transactions.TxSavepointPessimisticTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxTimeoutOnInitializationTest;
import org.apache.ignite.testframework.GridTestUtils;
@@ -121,6 +122,7 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite, TxLabelTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
TxRollbackOnIncorrectParamsTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, TxSavepointPessimisticTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxStateChangeEventTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite, TxMultiCacheAsyncOpsTest.class,
ignoredTests);