Repository: ignite
Updated Branches:
  refs/heads/master d3760ed79 -> 309162be3


IGNITE-8446 Ability to check and completely fill transactions on creation

Signed-off-by: Anton Vinogradov <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/309162be
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/309162be
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/309162be

Branch: refs/heads/master
Commit: 309162be35d1c0864982ad9cd5d17da107cb464d
Parents: d3760ed
Author: Anton Vinogradov <[email protected]>
Authored: Mon Aug 6 12:56:22 2018 +0300
Committer: Anton Vinogradov <[email protected]>
Committed: Mon Aug 6 12:56:22 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/events/EventType.java     |  75 +++++
 .../events/TransactionStateChangedEvent.java    |  56 ++++
 .../cache/transactions/IgniteTxAdapter.java     |  59 +++-
 .../cache/transactions/IgniteTxManager.java     |   5 +-
 .../transactions/TransactionEventProxyImpl.java | 220 +++++++++++++
 .../TxRollbackOnIncorrectParamsTest.java        | 326 +++++++++++++++++++
 .../transactions/TxStateChangeEventTest.java    | 234 +++++++++++++
 .../testsuites/IgniteCacheTestSuite6.java       |   4 +
 8 files changed, 977 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java 
b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index b0b410a..a6ab962 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -781,6 +781,66 @@ public interface EventType {
     public static final int EVT_WAL_SEGMENT_ARCHIVED = 128;
 
     /**
+     * Built-in event type: Transaction has been started.
+     * <p>
+     * Fired for each started transaction except system transactions.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see TransactionStateChangedEvent
+     */
+    public static final int EVT_TX_STARTED = 129;
+
+    /**
+     * Built-in event type: Transaction has been committed.
+     * <p>
+     * Fired for each committed transaction except system transactions.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see TransactionStateChangedEvent
+     */
+    public static final int EVT_TX_COMMITTED = 130;
+
+    /**
+     * Built-in event type: Transaction has been rolled back.
+     * <p>
+     * Fired for each rolled back transaction except system transactions.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see TransactionStateChangedEvent
+     */
+    public static final int EVT_TX_ROLLED_BACK = 131;
+
+    /**
+     * Built-in event type: Transaction has been suspended.
+     * <p>
+     * Fired for each suspended transaction except system transactions.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see TransactionStateChangedEvent
+     */
+    public static final int EVT_TX_SUSPENDED = 132;
+
+    /**
+     * Built-in event type: Transaction has been resumed.
+     * <p>
+     * Fired for each resumed transaction except system transactions.
+     * <p>
+     * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
+     * internal Ignite events and should not be used by user-defined events.
+     *
+     * @see TransactionStateChangedEvent
+     */
+    public static final int EVT_TX_RESUMED = 133;
+
+    /**
      * All checkpoint events. This array can be directly passed into
      * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
      * subscribe to all checkpoint events.
@@ -979,6 +1039,21 @@ public interface EventType {
     };
 
     /**
+     * All Transaction events. This array can be directly passed into
+     * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to
+     * subscribe to all transaction events.
+     *
+     * @see TransactionStateChangedEvent
+     */
+    public static final int[] EVTS_TX = {
+        EVT_TX_STARTED,
+        EVT_TX_COMMITTED,
+        EVT_TX_ROLLED_BACK,
+        EVT_TX_SUSPENDED,
+        EVT_TX_RESUMED
+    };
+
+    /**
      * All Ignite events (<b>including</b> metric update event).
      */
     public static final int[] EVTS_ALL = U.gridEvents();

http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/events/TransactionStateChangedEvent.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/events/TransactionStateChangedEvent.java
 
b/modules/core/src/main/java/org/apache/ignite/events/TransactionStateChangedEvent.java
new file mode 100644
index 0000000..cf6e099
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/events/TransactionStateChangedEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.events;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.transactions.Transaction;
+
+/**
+ * Event indicates transaction state change.
+ *
+ * @see EventType#EVTS_TX
+ */
+public class TransactionStateChangedEvent extends EventAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Tx. */
+    private Transaction tx;
+
+    /**
+     * @param node Node.
+     * @param msg Message.
+     * @param type Type.
+     * @param tx Tx.
+     */
+    public TransactionStateChangedEvent(ClusterNode node, String msg, int 
type, Transaction tx) {
+        super(node, msg, type);
+
+        assert tx != null;
+
+        this.tx = tx;
+    }
+
+    /**
+     * Provides transaction proxy allows all 'get' operations such as {@link 
Transaction#label()}
+     * and also {@link Transaction#setRollbackOnly()} method.
+     */
+    public Transaction tx() {
+        return tx;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 15df637..6fdb046 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -43,8 +43,10 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.TransactionStateChangedEvent;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.discovery.ConsistentIdMapper;
+import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -60,13 +62,14 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
-import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
+import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridSetWrapper;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -88,6 +91,10 @@ import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
+import static org.apache.ignite.events.EventType.EVT_TX_COMMITTED;
+import static org.apache.ignite.events.EventType.EVT_TX_RESUMED;
+import static org.apache.ignite.events.EventType.EVT_TX_ROLLED_BACK;
+import static org.apache.ignite.events.EventType.EVT_TX_SUSPENDED;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP;
@@ -1095,6 +1102,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
                 if (log.isDebugEnabled())
                     log.debug("Changed transaction state [prev=" + prev + ", 
new=" + this.state + ", tx=" + this + ']');
 
+                recordStateChangedEvent(state);
+
                 notifyAll();
             }
             else {
@@ -1160,6 +1169,54 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
         return valid;
     }
 
+    /** */
+    private void recordStateChangedEvent(TransactionState state){
+        if (!near() || !local()) // Covers only GridNearTxLocal's state 
changes.
+            return;
+
+        switch (state) {
+            case ACTIVE: {
+                recordStateChangedEvent(EVT_TX_RESUMED);
+
+                break;
+            }
+
+            case COMMITTED: {
+                recordStateChangedEvent(EVT_TX_COMMITTED);
+
+                break;
+            }
+
+            case ROLLED_BACK: {
+                recordStateChangedEvent(EVT_TX_ROLLED_BACK);
+
+                break;
+            }
+
+            case SUSPENDED: {
+                recordStateChangedEvent(EVT_TX_SUSPENDED);
+
+                break;
+            }
+        }
+    }
+
+    /**
+     * @param type Event type.
+     */
+    protected void recordStateChangedEvent(int type){
+        assert near() && local();
+
+        GridEventStorageManager evtMgr = cctx.gridEvents();
+
+        if (!system() /* ignoring system tx */ && evtMgr.isRecordable(type))
+            evtMgr.record(new TransactionStateChangedEvent(
+                cctx.discovery().localNode(),
+                "Transaction state changed.",
+                type,
+                new TransactionEventProxyImpl((GridNearTxLocal)this)));
+    }
+
     /** {@inheritDoc} */
     @Override public void endVersion(GridCacheVersion endVer) {
         this.endVer = endVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ec9a5c4..ffaaf43 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -51,9 +51,9 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
 import 
org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
 import 
org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
@@ -101,6 +101,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.events.EventType.EVT_TX_STARTED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_TX;
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
@@ -532,6 +533,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                     else
                         sysThreadMap.put(new TxThreadKey(tx.threadId(), 
cacheCtx.cacheId()), tx);
                 }
+
+                ((GridNearTxLocal)tx).recordStateChangedEvent(EVT_TX_STARTED);
             }
 
             // Handle mapped versions.

http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java
new file mode 100644
index 0000000..f6bc668
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionEventProxyImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.events.TransactionStateChangedEvent;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.lang.IgniteAsyncSupport;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Transaction proxy used at tx events.
+ *
+ * @see TransactionStateChangedEvent
+ */
+public class TransactionEventProxyImpl implements TransactionProxy, 
Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Unsupported operation message. */
+    private static final String UNSUPPORTED_OPERATION_MESSAGE = "All 
operations changing transaction's params or " +
+        "state are restricted inside event listener. " +
+        "The only exception is setRollbackOnly(), use it in case transaction 
should be rolled back.";
+
+    /** Xid. */
+    private IgniteUuid xid;
+
+    /** Tx. */
+    private GridNearTxLocal tx;
+
+    /** Proxy. */
+    private TransactionProxy proxy;
+
+    /**
+     * Default constructor (required by Externalizable).
+     */
+    public TransactionEventProxyImpl() {
+    }
+
+    /**
+     * @param tx Tx proxy.
+     */
+    public TransactionEventProxyImpl(GridNearTxLocal tx) {
+        assert tx != null;
+
+        this.tx = tx;
+        this.xid = tx.xid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid xid() {
+        return xid;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID nodeId() {
+        return tx().nodeId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long threadId() {
+        return tx().threadId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long startTime() {
+        return tx().startTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public TransactionIsolation isolation() {
+        return tx().isolation();
+    }
+
+    /** {@inheritDoc} */
+    @Override public TransactionConcurrency concurrency() {
+        return tx().concurrency();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean implicit() {
+        return tx().implicit();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isInvalidate() {
+        return tx().isInvalidate();
+    }
+
+    /** {@inheritDoc} */
+    @Override public TransactionState state() {
+        return tx().state();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long timeout() {
+        return tx().timeout();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long timeout(long timeout) {
+        throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean setRollbackOnly() {
+        return tx().setRollbackOnly();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isRollbackOnly() {
+        return tx().isRollbackOnly();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commit() throws IgniteException {
+        throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> commitAsync() throws IgniteException {
+        throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteException {
+        throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rollback() throws IgniteException {
+        throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException 
{
+        throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resume() throws IgniteException {
+        throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void suspend() throws IgniteException {
+        throw new UnsupportedOperationException(UNSUPPORTED_OPERATION_MESSAGE);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String label() {
+        return tx().label();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteAsyncSupport withAsync() {
+        throw new UnsupportedOperationException("Operation deprecated.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync() {
+        throw new UnsupportedOperationException("Operation deprecated.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> IgniteFuture<R> future() {
+        throw new UnsupportedOperationException("Operation deprecated.");
+    }
+
+    /**
+     * @return local transaction
+     * @throws IgniteException in case tx was not found.
+     */
+    private TransactionProxy tx() throws IgniteException {
+        if (tx == null)
+            throw new IgniteException("Operation allowed only inside remote 
filter or " +
+                "inside local listener registered on originating node. " +
+                "Only xid() operation allowed in other case. ");
+
+        if (proxy == null)
+            proxy = tx.proxy();
+
+        return proxy;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(xid);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        xid = (IgniteUuid)in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java
new file mode 100644
index 0000000..8aafa8b
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnIncorrectParamsTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.TransactionStateChangedEvent;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.events.EventType.EVT_TX_STARTED;
+
+/**
+ * Tests transaction rollback on incorrect tx params.
+ */
+public class TxRollbackOnIncorrectParamsTest extends GridCommonAbstractTest {
+    /**
+     *
+     */
+    public void testTimeoutSetLocalGuarantee() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.events().localListen((IgnitePredicate<Event>)e -> {
+            assert e instanceof TransactionStateChangedEvent;
+
+            TransactionStateChangedEvent evt = (TransactionStateChangedEvent)e;
+
+            Transaction tx = evt.tx();
+
+            if (tx.timeout() < 200)
+                tx.setRollbackOnly();
+
+            return true;
+        }, EVT_TX_STARTED);
+
+        IgniteCache cache = 
ignite.getOrCreateCache(defaultCacheConfiguration());
+
+        try (Transaction tx = ignite.transactions().txStart(
+            TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.REPEATABLE_READ, 200, 2)) {
+            cache.put(1, 1);
+
+            tx.commit();
+        }
+
+        try (Transaction tx = ignite.transactions().txStart(
+            TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.REPEATABLE_READ, 100, 2)) {
+            cache.put(1, 2);
+
+            tx.commit();
+
+            fail("Should fail prior this line.");
+        }
+        catch (CacheException ignored) {
+            // No-op.
+        }
+
+        try (Transaction tx = ignite.transactions().txStart()) {
+            cache.put(1, 3);
+
+            tx.commit();
+
+            fail("Should fail prior this line.");
+        }
+        catch (CacheException ignored) {
+            // No-op.
+        }
+    }
+
+    /**
+     *
+     */
+    public void testLabelFilledLocalGuarantee() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        ignite.events().localListen((IgnitePredicate<Event>)e -> {
+            assert e instanceof TransactionStateChangedEvent;
+
+            TransactionStateChangedEvent evt = (TransactionStateChangedEvent)e;
+
+            Transaction tx = evt.tx();
+
+            if (tx.label() == null)
+                tx.setRollbackOnly();
+
+            return true;
+        }, EVT_TX_STARTED);
+
+        IgniteCache cache = 
ignite.getOrCreateCache(defaultCacheConfiguration());
+
+        try (Transaction tx = 
ignite.transactions().withLabel("test").txStart()) {
+            cache.put(1, 1);
+
+            tx.commit();
+        }
+
+        try (Transaction tx = ignite.transactions().txStart()) {
+            cache.put(1, 2);
+
+            tx.commit();
+
+            fail("Should fail prior this line.");
+        }
+        catch (CacheException ignored) {
+            // No-op.
+        }
+    }
+
+    /**
+     *
+     */
+    public void testLabelFilledRemoteGuarantee() throws Exception {
+        Ignite ignite = startGrid(0);
+        Ignite remote = startGrid(1);
+
+        IgniteCache cacheLocal = 
ignite.getOrCreateCache(defaultCacheConfiguration());
+        IgniteCache cacheRemote = 
remote.getOrCreateCache(defaultCacheConfiguration());
+
+        ignite.events().remoteListen(null,
+            (IgnitePredicate<Event>)e -> {
+                assert e instanceof TransactionStateChangedEvent;
+
+                TransactionStateChangedEvent evt = 
(TransactionStateChangedEvent)e;
+
+                Transaction tx = evt.tx();
+
+                if (tx.label() == null)
+                    tx.setRollbackOnly();
+
+                return true;
+            },
+            EVT_TX_STARTED);
+
+        try (Transaction tx = 
ignite.transactions().withLabel("test").txStart()) {
+            cacheLocal.put(1, 1);
+
+            tx.commit();
+        }
+
+        try (Transaction tx = 
remote.transactions().withLabel("test").txStart()) {
+            cacheRemote.put(1, 2);
+
+            tx.commit();
+        }
+
+        try (Transaction tx = ignite.transactions().txStart()) {
+            cacheLocal.put(1, 3);
+
+            tx.commit();
+
+            fail("Should fail prior this line.");
+        }
+        catch (CacheException ignored) {
+            // No-op.
+        }
+
+        try (Transaction tx = remote.transactions().txStart()) {
+            cacheRemote.put(1, 4);
+
+            tx.commit();
+
+            fail("Should fail prior this line.");
+        }
+        catch (CacheException ignored) {
+            // No-op.
+        }
+    }
+
+    /**
+     *
+     */
+    public void testTimeoutSetRemoteGuarantee() throws Exception {
+        Ignite ignite = startGrid(0);
+        Ignite remote = startGrid(1);
+
+        IgniteCache cacheLocal = 
ignite.getOrCreateCache(defaultCacheConfiguration());
+        IgniteCache cacheRemote = 
remote.getOrCreateCache(defaultCacheConfiguration());
+
+        ignite.events().remoteListen(null,
+            (IgnitePredicate<Event>)e -> {
+                assert e instanceof TransactionStateChangedEvent;
+
+                TransactionStateChangedEvent evt = 
(TransactionStateChangedEvent)e;
+
+                Transaction tx = evt.tx();
+
+                if (tx.timeout() == 0)
+                    tx.setRollbackOnly();
+
+                return true;
+            },
+            EVT_TX_STARTED);
+
+        try (Transaction tx = ignite.transactions().txStart(
+            TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.REPEATABLE_READ, 100, 2)) {
+            cacheLocal.put(1, 1);
+
+            tx.commit();
+        }
+
+        try (Transaction tx = remote.transactions().txStart(
+            TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.REPEATABLE_READ, 100, 2)) {
+            cacheRemote.put(1, 2);
+
+            tx.commit();
+        }
+
+        try (Transaction tx = ignite.transactions().txStart()) {
+            cacheLocal.put(1, 3);
+
+            tx.commit();
+
+            fail("Should fail prior this line.");
+        }
+        catch (CacheException ignored) {
+            // No-op.
+        }
+
+        try (Transaction tx = remote.transactions().txStart()) {
+            cacheRemote.put(1, 4);
+
+            tx.commit();
+
+            fail("Should fail prior this line.");
+        }
+        catch (CacheException ignored) {
+            // No-op.
+        }
+    }
+
+    /**
+     *
+     */
+    public void testRollbackInsideLocalListenerAfterRemoteFilter() throws 
Exception {
+        Ignite ignite = startGrid(0);
+        Ignite remote = startGrid(1);
+
+        IgniteCache cacheLocal = 
ignite.getOrCreateCache(defaultCacheConfiguration());
+        IgniteCache cacheRemote = 
remote.getOrCreateCache(defaultCacheConfiguration());
+
+        AtomicBoolean rollbackFailed = new AtomicBoolean();
+        AtomicBoolean alreadyRolledBack = new AtomicBoolean();
+
+        ignite.events().remoteListen(
+            (IgniteBiPredicate<UUID, Event>)(uuid, e) -> {
+                assert e instanceof TransactionStateChangedEvent;
+
+                TransactionStateChangedEvent evt = 
(TransactionStateChangedEvent)e;
+
+                Transaction tx = evt.tx();
+
+                try {
+                    tx.setRollbackOnly();
+                }
+                catch (IgniteException ignored) {
+                    alreadyRolledBack.set(rollbackFailed.getAndSet(true));
+                }
+
+                return true;
+            },
+            (IgnitePredicate<Event>)e -> {
+                assert e instanceof TransactionStateChangedEvent;
+
+                return true;
+            },
+            EVT_TX_STARTED);
+
+        assertFalse(rollbackFailed.get());
+        assertFalse(alreadyRolledBack.get());
+
+        try (Transaction tx = ignite.transactions().txStart()) {
+            cacheLocal.put(1, 1);
+
+            tx.commit();
+
+            fail("Should fail prior this line.");
+        }
+        catch (CacheException ignored) {
+            // No-op.
+        }
+
+        assertFalse(rollbackFailed.get());
+        assertFalse(alreadyRolledBack.get());
+
+        try (Transaction tx = remote.transactions().txStart()) {
+            cacheRemote.put(1, 2);
+
+            tx.commit();
+        }
+
+        assertTrue(GridTestUtils.waitForCondition(rollbackFailed::get, 5_000));
+
+        assertFalse(alreadyRolledBack.get());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxStateChangeEventTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxStateChangeEventTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxStateChangeEventTest.java
new file mode 100644
index 0000000..01c87ae
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxStateChangeEventTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.TransactionStateChangedEvent;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionState;
+
+import static org.apache.ignite.events.EventType.EVTS_TX;
+import static org.apache.ignite.events.EventType.EVT_TX_COMMITTED;
+import static org.apache.ignite.events.EventType.EVT_TX_RESUMED;
+import static org.apache.ignite.events.EventType.EVT_TX_ROLLED_BACK;
+import static org.apache.ignite.events.EventType.EVT_TX_STARTED;
+import static org.apache.ignite.events.EventType.EVT_TX_SUSPENDED;
+
+/**
+ * Tests transaction state change event.
+ */
+public class TxStateChangeEventTest extends GridCommonAbstractTest {
+    /** Label. */
+    private final String lb = "testLabel";
+
+    /** Timeout. */
+    private final long timeout = 404;
+
+    /** Creation. */
+    private static AtomicBoolean creation = new AtomicBoolean();
+
+    /** Commit. */
+    private static AtomicBoolean commit = new AtomicBoolean();
+
+    /** Rollback. */
+    private static AtomicBoolean rollback = new AtomicBoolean();
+
+    /** Suspend. */
+    private static AtomicBoolean suspend = new AtomicBoolean();
+
+    /** Resume. */
+    private static AtomicBoolean resume = new AtomicBoolean();
+
+    /**
+     *
+     */
+    public void testLocal() throws Exception {
+        test(true);
+    }
+
+    /**
+     *
+     */
+    public void testRemote() throws Exception {
+        test(false);
+    }
+
+    /**
+     *
+     */
+    private void test(boolean loc) throws Exception {
+        Ignite ignite = startGrids(5);
+
+        final IgniteEvents evts = loc ? ignite.events() : grid(3).events();
+
+        if (loc)
+            evts.localListen((IgnitePredicate<Event>)e -> {
+                assert e instanceof TransactionStateChangedEvent;
+
+                checkEvent((TransactionStateChangedEvent)e);
+
+                return true;
+            }, EVTS_TX);
+        else
+            evts.remoteListen(null,
+                (IgnitePredicate<Event>)e -> {
+                    assert e instanceof TransactionStateChangedEvent;
+
+                    checkEvent((TransactionStateChangedEvent)e);
+
+                    return false;
+                },
+                EVTS_TX);
+
+        IgniteCache cache = 
ignite.getOrCreateCache(defaultCacheConfiguration().setBackups(2));
+
+        // create & commit
+        try (Transaction tx = ignite.transactions().withLabel(lb).txStart(
+            TransactionConcurrency.PESSIMISTIC, 
TransactionIsolation.SERIALIZABLE, timeout, 3)) {
+            cache.put(1, 1);
+
+            tx.commit();
+        }
+
+        assertTrue(
+            creation.get() &&
+                commit.get() &&
+                !rollback.get() &&
+                !suspend.get() &&
+                !resume.get());
+
+        clear();
+
+        // create & suspend & resume & commit
+        try (Transaction tx = ignite.transactions().withLabel(lb).txStart(
+            TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.SERIALIZABLE, timeout, 3)) {
+            cache.put(2, 7);
+
+            tx.suspend();
+
+            U.sleep(100);
+
+            tx.resume();
+
+            tx.commit();
+        }
+
+        assertTrue(
+            creation.get() &&
+                commit.get() &&
+                !rollback.get() &&
+                suspend.get() &&
+                resume.get());
+
+        clear();
+
+        // create & rollback (pessimistic)
+        try (Transaction tx = ignite.transactions().withLabel(lb).txStart(
+            TransactionConcurrency.PESSIMISTIC, 
TransactionIsolation.SERIALIZABLE, timeout, 3)) {
+            cache.put(4, 5);
+        }
+
+        assertTrue(
+            creation.get() &&
+                !commit.get() &&
+                rollback.get() &&
+                !suspend.get() &&
+                !resume.get());
+    }
+
+    /**
+     *
+     */
+    private void clear() {
+        creation.set(false);
+        commit.set(false);
+        rollback.set(false);
+        suspend.set(false);
+        resume.set(false);
+    }
+
+    /**
+     * @param evt Event.
+     */
+    private void checkEvent(TransactionStateChangedEvent evt) {
+        Transaction tx = evt.tx();
+
+        assertEquals(timeout, tx.timeout());
+        assertEquals(lb, tx.label());
+
+        switch (evt.type()) {
+            case EVT_TX_STARTED: {
+                assertEquals(tx.state(), TransactionState.ACTIVE);
+
+                assertFalse(creation.getAndSet(true));
+
+                break;
+            }
+
+            case EVT_TX_COMMITTED: {
+                assertEquals(tx.state(), TransactionState.COMMITTED);
+
+                assertFalse(commit.getAndSet(true));
+
+                break;
+            }
+
+            case EVT_TX_ROLLED_BACK: {
+                assertEquals(tx.state(), TransactionState.ROLLED_BACK);
+
+                assertFalse(rollback.getAndSet(true));
+
+                break;
+            }
+
+            case EVT_TX_SUSPENDED: {
+                assertEquals(tx.state(), TransactionState.SUSPENDED);
+
+                assertFalse(suspend.getAndSet(true));
+
+                break;
+            }
+
+            case EVT_TX_RESUMED: {
+                assertEquals(tx.state(), TransactionState.ACTIVE);
+
+                assertFalse(resume.getAndSet(true));
+
+                break;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/309162be/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 7a5aa80..4af9e1f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -44,6 +44,8 @@ import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTime
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutNoDeadlockDetectionTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTimeoutTest;
 import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnTopologyChangeTest;
+import 
org.apache.ignite.internal.processors.cache.transactions.TxRollbackOnIncorrectParamsTest;
+import 
org.apache.ignite.internal.processors.cache.transactions.TxStateChangeEventTest;
 import org.apache.ignite.testframework.junits.GridAbstractTest;
 
 /**
@@ -79,6 +81,8 @@ public class IgniteCacheTestSuite6 extends TestSuite {
         suite.addTestSuite(TxOptimisticPrepareOnUnstableTopologyTest.class);
 
         suite.addTestSuite(TxLabelTest.class);
+        suite.addTestSuite(TxRollbackOnIncorrectParamsTest.class);
+        suite.addTestSuite(TxStateChangeEventTest.class);
 
         suite.addTestSuite(TxMultiCacheAsyncOpsTest.class);
 

Reply via email to