IGNITE-6846 Add metrics for entry processor invocations. - Fixes #3148.

Signed-off-by: Dmitriy Pavlov <dpav...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e8669af
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e8669af
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e8669af

Branch: refs/heads/ignite-8446
Commit: 5e8669af709e8a4bfd41997212ee471efe4e6f36
Parents: bcda7a1
Author: voipp <alkuznetsov...@gmail.com>
Authored: Tue Jul 31 16:01:39 2018 +0300
Committer: Dmitriy Pavlov <dpav...@apache.org>
Committed: Tue Jul 31 16:01:39 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/CacheMetrics.java   |  77 ++++
 .../cache/CacheClusterMetricsMXBeanImpl.java    |  55 +++
 .../cache/CacheLocalMetricsMXBeanImpl.java      |  55 +++
 .../processors/cache/CacheMetricsImpl.java      | 206 +++++++++
 .../processors/cache/CacheMetricsSnapshot.java  | 157 +++++++
 .../processors/cache/GridCacheAdapter.java      |  67 ++-
 .../processors/cache/GridCacheEntryEx.java      |   8 +-
 .../processors/cache/GridCacheMapEntry.java     | 115 +++--
 .../cache/GridCacheUpdateAtomicResult.java      |  15 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   5 +
 .../GridDhtAtomicAbstractUpdateFuture.java      |   8 +-
 .../GridDhtAtomicAbstractUpdateRequest.java     |  16 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  63 ++-
 .../GridDhtAtomicSingleUpdateRequest.java       |  11 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |   5 +-
 .../distributed/near/GridNearAtomicCache.java   |  14 +-
 .../local/atomic/GridLocalAtomicCache.java      | 116 ++++--
 .../cache/transactions/IgniteTxAdapter.java     |   2 +
 .../platform/cache/PlatformCache.java           |  11 +
 .../cache/GridCacheAbstractMetricsSelfTest.java | 417 +++++++++++++++++++
 .../processors/cache/GridCacheTestEntryEx.java  |   8 +-
 .../GridCacheNearAtomicMetricsSelfTest.java     |  35 ++
 .../near/GridCacheNearMetricsSelfTest.java      | 388 +++++++++++++++++
 .../platform/PlatformCacheWriteMetricsTask.java |  55 +++
 .../IgniteCacheMetricsSelfTestSuite.java        |   2 +
 .../Cache/CacheMetricsTest.cs                   |  11 +
 .../Apache.Ignite.Core/Cache/ICacheMetrics.cs   |  88 ++++
 .../Impl/Cache/CacheMetricsImpl.cs              |  79 +++-
 28 files changed, 1999 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java 
b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index e3e6446..d3a4c04 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -72,6 +72,83 @@ public interface CacheMetrics {
     public long getCachePuts();
 
     /**
+     * The total number of cache invocations, caused update.
+     *
+     * @return The number of invocation updates.
+     */
+    public long getEntryProcessorPuts();
+
+    /**
+     * The total number of cache invocations, caused removal.
+     *
+     * @return The number of invocation removals.
+     */
+    public long getEntryProcessorRemovals();
+
+    /**
+     * The total number of cache invocations, caused no updates.
+     *
+     * @return The number of read-only invocations.
+     */
+    public long getEntryProcessorReadOnlyInvocations();
+
+    /**
+     * The total number of cache invocations.
+     *
+     * @return The number of cache invocations.
+     */
+    public long getEntryProcessorInvocations();
+
+    /**
+     * The total number of invocations on keys, which exist in cache.
+     *
+     * @return The number of cache invocation hits.
+     */
+    public long getEntryProcessorHits();
+
+    /**
+     * The percentage of invocations on keys, which exist in cache.
+     *
+     * @return The percentage of successful invocation hits.
+     */
+    public float getEntryProcessorHitPercentage();
+
+    /**
+     * The total number of invocations on keys, which don't exist in cache.
+     *
+     * @return The number of cache invocation misses.
+     */
+    public long getEntryProcessorMisses();
+
+    /**
+     * The percentage of invocations on keys, which don't exist in cache.
+     *
+     * @return The percentage of invocation misses.
+     */
+    public float getEntryProcessorMissPercentage();
+
+    /**
+     * The mean time to execute cache invokes.
+     *
+     * @return The time in µs.
+     */
+    public float getEntryProcessorAverageInvocationTime();
+
+    /**
+     * So far, the minimum time to execute cache invokes.
+     *
+     * @return The time in µs.
+     */
+    public float getEntryProcessorMinInvocationTime();
+
+    /**
+     * So far, the maximum time to execute cache invokes.
+     *
+     * @return The time in µs.
+     */
+    public float getEntryProcessorMaxInvocationTime();
+
+    /**
      * The total number of removals from the cache. This does not include 
evictions,
      * where the cache itself initiates the removal to make space.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
index 8935a98..cbd0b57 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java
@@ -290,6 +290,61 @@ class CacheClusterMetricsMXBeanImpl implements 
CacheMetricsMXBean {
     }
 
     /** {@inheritDoc} */
+    @Override public long getEntryProcessorPuts() {
+        return cache.clusterMetrics().getEntryProcessorPuts();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorRemovals() {
+        return cache.clusterMetrics().getEntryProcessorRemovals();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorReadOnlyInvocations() {
+        return cache.clusterMetrics().getEntryProcessorReadOnlyInvocations();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorInvocations() {
+        return cache.clusterMetrics().getEntryProcessorInvocations();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorHits() {
+        return cache.clusterMetrics().getEntryProcessorHits();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorHitPercentage() {
+        return cache.clusterMetrics().getEntryProcessorHitPercentage();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMissPercentage() {
+        return cache.clusterMetrics().getEntryProcessorMissPercentage();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorMisses() {
+        return cache.clusterMetrics().getEntryProcessorMisses();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorAverageInvocationTime() {
+        return cache.clusterMetrics().getEntryProcessorAverageInvocationTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMinInvocationTime() {
+        return cache.clusterMetrics().getEntryProcessorMinInvocationTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMaxInvocationTime() {
+        return cache.clusterMetrics().getEntryProcessorMaxInvocationTime();
+    }
+
+    /** {@inheritDoc} */
     @Override public long getCacheRemovals() {
         return cache.clusterMetrics().getCacheRemovals();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
index 212c7a0..790fe00 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java
@@ -286,6 +286,61 @@ class CacheLocalMetricsMXBeanImpl implements 
CacheMetricsMXBean {
     }
 
     /** {@inheritDoc} */
+    @Override public long getEntryProcessorPuts() {
+        return cache.metrics0().getEntryProcessorPuts();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorRemovals() {
+        return cache.metrics0().getEntryProcessorRemovals();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorReadOnlyInvocations() {
+        return cache.metrics0().getEntryProcessorReadOnlyInvocations();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorInvocations() {
+        return cache.metrics0().getEntryProcessorInvocations();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorHits() {
+        return cache.metrics0().getEntryProcessorHits();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorHitPercentage() {
+        return cache.metrics0().getEntryProcessorHitPercentage();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMissPercentage() {
+        return cache.metrics0().getEntryProcessorMissPercentage();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorMisses() {
+        return cache.metrics0().getEntryProcessorMisses();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorAverageInvocationTime() {
+        return cache.metrics0().getEntryProcessorAverageInvocationTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMinInvocationTime() {
+        return cache.metrics0().getEntryProcessorMinInvocationTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMaxInvocationTime() {
+        return cache.metrics0().getEntryProcessorMaxInvocationTime();
+    }
+
+    /** {@inheritDoc} */
     @Override public long getCacheRemovals() {
         return cache.metrics0().getCacheRemovals();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 0f6d06f..e81e995 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -51,6 +51,30 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** Number of reads. */
     private AtomicLong reads = new AtomicLong();
 
+    /** Number of invocations caused update. */
+    private AtomicLong entryProcessorPuts = new AtomicLong();
+
+    /** Number of invocations caused removal. */
+    private AtomicLong entryProcessorRemovals = new AtomicLong();
+
+    /** Number of invocations caused update. */
+    private AtomicLong entryProcessorReadOnlyInvocations = new AtomicLong();
+
+    /** Entry processor invoke time taken nanos. */
+    private AtomicLong entryProcessorInvokeTimeNanos = new AtomicLong();
+
+    /** So far, the minimum time to execute cache invokes. */
+    private AtomicLong entryProcessorMinInvocationTime = new AtomicLong();
+
+    /** So far, the maximum time to execute cache invokes. */
+    private AtomicLong entryProcessorMaxInvocationTime = new AtomicLong();
+
+    /** Number of entry processor invokes on keys, which exist in cache. */
+    private AtomicLong entryProcessorHits = new AtomicLong();
+
+    /** Number of entry processor invokes on keys, which don't exist in cache. 
*/
+    private AtomicLong entryProcessorMisses = new AtomicLong();
+
     /** Number of writes. */
     private AtomicLong writes = new AtomicLong();
 
@@ -439,6 +463,15 @@ public class CacheMetricsImpl implements CacheMetrics {
         commitTimeNanos.set(0);
         rollbackTimeNanos.set(0);
 
+        entryProcessorPuts.set(0);
+        entryProcessorRemovals.set(0);
+        entryProcessorReadOnlyInvocations.set(0);
+        entryProcessorMisses.set(0);
+        entryProcessorHits.set(0);
+        entryProcessorInvokeTimeNanos.set(0);
+        entryProcessorMaxInvocationTime.set(0);
+        entryProcessorMinInvocationTime.set(0);
+
         offHeapGets.set(0);
         offHeapPuts.set(0);
         offHeapRemoves.set(0);
@@ -495,6 +528,79 @@ public class CacheMetricsImpl implements CacheMetrics {
     }
 
     /** {@inheritDoc} */
+    @Override public long getEntryProcessorPuts() {
+        return entryProcessorPuts.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorRemovals() {
+        return entryProcessorRemovals.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorReadOnlyInvocations() {
+        return entryProcessorReadOnlyInvocations.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorInvocations() {
+        return entryProcessorReadOnlyInvocations.get() + 
entryProcessorPuts.get() + entryProcessorRemovals.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorHits() {
+        return entryProcessorHits.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorHitPercentage() {
+        long hits = entryProcessorHits.get();
+        long totalInvocations = getEntryProcessorInvocations();
+
+        if (hits == 0)
+            return 0;
+
+        return (float) hits / totalInvocations * 100.0f;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorMisses() {
+        return entryProcessorMisses.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMissPercentage() {
+        long misses = entryProcessorMisses.get();
+        long totalInvocations = getEntryProcessorInvocations();
+
+        if (misses == 0)
+            return 0;
+
+        return (float) misses / totalInvocations * 100.0f;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorAverageInvocationTime() {
+        long totalInvokes = getEntryProcessorInvocations();
+        long timeNanos = entryProcessorInvokeTimeNanos.get();
+
+        if (timeNanos == 0 || totalInvokes == 0)
+            return 0;
+
+        return (1f * timeNanos) / totalInvokes / NANOS_IN_MICROSECOND;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMinInvocationTime() {
+        return (1f * entryProcessorMinInvocationTime.get()) / 
NANOS_IN_MICROSECOND;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMaxInvocationTime() {
+        return (1f * entryProcessorMaxInvocationTime.get()) / 
NANOS_IN_MICROSECOND;
+    }
+
+    /** {@inheritDoc} */
     @Override public long getCacheRemovals() {
         return rmCnt.get();
     }
@@ -554,6 +660,106 @@ public class CacheMetricsImpl implements CacheMetrics {
     }
 
     /**
+     * Cache invocations caused update callback.
+     *
+     * @param isHit Hit or miss flag.
+     */
+    public void onInvokeUpdate(boolean isHit) {
+        entryProcessorPuts.incrementAndGet();
+
+        if (isHit)
+            entryProcessorHits.incrementAndGet();
+        else
+            entryProcessorMisses.incrementAndGet();
+
+        if (delegate != null)
+            delegate.onInvokeUpdate(isHit);
+    }
+
+    /**
+     * Cache invocations caused removal callback.
+     *
+     * @param isHit Hit or miss flag.
+     */
+    public void onInvokeRemove(boolean isHit) {
+        entryProcessorRemovals.incrementAndGet();
+
+        if (isHit)
+            entryProcessorHits.incrementAndGet();
+        else
+            entryProcessorMisses.incrementAndGet();
+
+        if (delegate != null)
+            delegate.onInvokeRemove(isHit);
+    }
+
+    /**
+     * Read-only cache invocations.
+     *
+     * @param isHit Hit or miss flag.
+     */
+    public void onReadOnlyInvoke(boolean isHit) {
+        entryProcessorReadOnlyInvocations.incrementAndGet();
+
+        if (isHit)
+            entryProcessorHits.incrementAndGet();
+        else
+            entryProcessorMisses.incrementAndGet();
+
+        if (delegate != null)
+            delegate.onReadOnlyInvoke(isHit);
+    }
+
+    /**
+     * Increments invoke operation time nanos.
+     *
+     * @param duration Duration.
+     */
+    public void addInvokeTimeNanos(long duration) {
+        entryProcessorInvokeTimeNanos.addAndGet(duration);
+
+        recalculateInvokeMinTimeNanos(duration);
+
+        recalculateInvokeMaxTimeNanos(duration);
+
+        if (delegate != null)
+            delegate.addInvokeTimeNanos(duration);
+
+    }
+
+    /**
+     * Recalculates invoke operation minimum time nanos.
+     *
+     * @param duration Duration.
+     */
+    private void recalculateInvokeMinTimeNanos(long duration){
+        long minTime = entryProcessorMinInvocationTime.longValue();
+
+        while (minTime > duration || minTime == 0) {
+            if (entryProcessorMinInvocationTime.compareAndSet(minTime, 
duration))
+                break;
+            else
+                minTime = entryProcessorMinInvocationTime.longValue();
+        }
+    }
+
+    /**
+     * Recalculates invoke operation maximum time nanos.
+     *
+     * @param duration Duration.
+     */
+    private void recalculateInvokeMaxTimeNanos(long duration){
+        long maxTime = entryProcessorMaxInvocationTime.longValue();
+
+        while (maxTime < duration) {
+            if (entryProcessorMaxInvocationTime.compareAndSet(maxTime, 
duration))
+                break;
+            else
+                maxTime = entryProcessorMaxInvocationTime.longValue();
+        }
+    }
+
+    /**
      * Cache write callback.
      */
     public void onWrite() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index 8a0f0e4..5f3001c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -38,6 +38,57 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
     /** Number of puts. */
     private long puts = 0;
 
+    /** Number of invokes caused updates. */
+    private long entryProcessorPuts = 0;
+
+    /** Number of invokes caused no updates. */
+    private long entryProcessorReadOnlyInvocations = 0;
+
+    /**
+     * The mean time to execute cache invokes
+     */
+    private float entryProcessorAverageInvocationTime = 0;
+
+    /**
+     * The total number of cache invocations.
+     */
+    private long entryProcessorInvocations = 0;
+
+    /**
+     * The total number of cache invocations, caused removal.
+     */
+    private long entryProcessorRemovals = 0;
+
+    /**
+     * The total number of invocations on keys, which don't exist in cache.
+     */
+    private long entryProcessorMisses = 0;
+
+    /**
+     * The total number of invocations on keys, which exist in cache.
+     */
+    private long entryProcessorHits = 0;
+
+    /**
+     * The percentage of invocations on keys, which don't exist in cache.
+     */
+    private float entryProcessorMissPercentage = 0;
+
+    /**
+     * The percentage of invocations on keys, which exist in cache.
+     */
+    private float entryProcessorHitPercentage = 0;
+
+    /**
+     * So far, the maximum time to execute cache invokes.
+     */
+    private float entryProcessorMaxInvocationTime = 0;
+
+    /**
+     * So far, the minimum time to execute cache invokes.
+     */
+    private float entryProcessorMinInvocationTime = 0;
+
     /** Number of hits. */
     private long hits = 0;
 
@@ -270,6 +321,18 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
         evicts = m.getCacheEvictions();
         removes = m.getCacheRemovals();
 
+        entryProcessorPuts = m.getEntryProcessorPuts();
+        entryProcessorReadOnlyInvocations = 
m.getEntryProcessorReadOnlyInvocations();
+        entryProcessorInvocations = m.getEntryProcessorInvocations();
+        entryProcessorRemovals = m.getEntryProcessorRemovals();
+        entryProcessorMisses = m.getEntryProcessorMisses();
+        entryProcessorHits = m.getEntryProcessorHits();
+        entryProcessorMissPercentage = m.getEntryProcessorMissPercentage();
+        entryProcessorHitPercentage = m.getEntryProcessorHitPercentage();
+        entryProcessorAverageInvocationTime = 
m.getEntryProcessorAverageInvocationTime();
+        entryProcessorMaxInvocationTime = 
m.getEntryProcessorMaxInvocationTime();
+        entryProcessorMinInvocationTime = 
m.getEntryProcessorMinInvocationTime();
+
         putAvgTimeNanos = m.getAveragePutTime();
         getAvgTimeNanos = m.getAverageGetTime();
         rmvAvgTimeNanos = m.getAverageRemoveTime();
@@ -386,6 +449,18 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
             evicts += e.getCacheEvictions();
             removes += e.getCacheRemovals();
 
+            entryProcessorPuts = e.getEntryProcessorPuts();
+            entryProcessorReadOnlyInvocations = 
e.getEntryProcessorReadOnlyInvocations();
+            entryProcessorInvocations = e.getEntryProcessorInvocations();
+            entryProcessorRemovals = e.getEntryProcessorRemovals();
+            entryProcessorMisses = e.getEntryProcessorMisses();
+            entryProcessorHits = e.getEntryProcessorHits();
+            entryProcessorMissPercentage = e.getEntryProcessorMissPercentage();
+            entryProcessorHitPercentage = e.getEntryProcessorHitPercentage();
+            entryProcessorAverageInvocationTime = 
e.getEntryProcessorAverageInvocationTime();
+            entryProcessorMaxInvocationTime = 
e.getEntryProcessorMaxInvocationTime();
+            entryProcessorMinInvocationTime = 
e.getEntryProcessorMinInvocationTime();
+
             putAvgTimeNanos += e.getAveragePutTime();
             getAvgTimeNanos += e.getAverageGetTime();
             rmvAvgTimeNanos += e.getAverageRemoveTime();
@@ -524,6 +599,61 @@ public class CacheMetricsSnapshot implements CacheMetrics, 
Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public long getEntryProcessorPuts() {
+        return entryProcessorPuts;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorReadOnlyInvocations() {
+        return entryProcessorReadOnlyInvocations;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorInvocations() {
+        return entryProcessorInvocations;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorHits() {
+        return entryProcessorHits;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorHitPercentage() {
+        return entryProcessorHitPercentage;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMissPercentage() {
+        return entryProcessorMissPercentage;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorMisses() {
+        return entryProcessorMisses;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getEntryProcessorRemovals() {
+        return entryProcessorRemovals;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorAverageInvocationTime() {
+        return entryProcessorAverageInvocationTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMinInvocationTime() {
+        return entryProcessorMinInvocationTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getEntryProcessorMaxInvocationTime() {
+        return entryProcessorMaxInvocationTime;
+    }
+
+    /** {@inheritDoc} */
     @Override public long getCacheRemovals() {
         return removes;
     }
@@ -950,6 +1080,18 @@ public class CacheMetricsSnapshot implements 
CacheMetrics, Externalizable {
         out.writeLong(rebalanceStartTime);
         out.writeLong(rebalanceFinishTime);
         out.writeLong(rebalanceClearingPartitionsLeft);
+
+        out.writeLong(entryProcessorPuts);
+        out.writeFloat(entryProcessorAverageInvocationTime);
+        out.writeLong(entryProcessorInvocations);
+        out.writeFloat(entryProcessorMaxInvocationTime);
+        out.writeFloat(entryProcessorMinInvocationTime);
+        out.writeLong(entryProcessorReadOnlyInvocations);
+        out.writeFloat(entryProcessorHitPercentage);
+        out.writeLong(entryProcessorHits);
+        out.writeLong(entryProcessorMisses);
+        out.writeFloat(entryProcessorMissPercentage);
+        out.writeLong(entryProcessorRemovals);
     }
 
     /** {@inheritDoc} */
@@ -1011,5 +1153,20 @@ public class CacheMetricsSnapshot implements 
CacheMetrics, Externalizable {
         rebalanceStartTime = in.readLong();
         rebalanceFinishTime = in.readLong();
         rebalanceClearingPartitionsLeft = in.readLong();
+
+        // 11 long and 5 float values give 108 bytes in total.
+        if (in.available() >= 108) {
+            entryProcessorPuts = in.readLong();
+            entryProcessorAverageInvocationTime = in.readFloat();
+            entryProcessorInvocations = in.readLong();
+            entryProcessorMaxInvocationTime = in.readFloat();
+            entryProcessorMinInvocationTime = in.readFloat();
+            entryProcessorReadOnlyInvocations = in.readLong();
+            entryProcessorHitPercentage = in.readFloat();
+            entryProcessorHits = in.readLong();
+            entryProcessorMisses = in.readLong();
+            entryProcessorMissPercentage = in.readFloat();
+            entryProcessorRemovals = in.readLong();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index c59e84e..eeed4fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2542,6 +2542,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 if (topVer != null)
                     tx.topologyVersion(topVer);
 
+                final boolean statsEnabled = ctx.statisticsEnabled();
+
+                final long start = statsEnabled ? System.nanoTime() : 0L;
+
                 IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx,
                     null,
                     key,
@@ -2550,6 +2554,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
                 Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
 
+                if (statsEnabled)
+                    metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
                 EntryProcessorResult<T> res = null;
 
                 if (resMap != null) {
@@ -2572,6 +2579,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(keys);
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() 
== 1) {
             @Override public Map<K, EntryProcessorResult<T>> 
op(GridNearTxLocal tx)
                 throws IgniteCheckedException {
@@ -2586,6 +2597,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
                 Map<K, EntryProcessorResult<T>> res = fut.get().value();
 
+                if (statsEnabled)
+                    metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
                 return res != null ? res : Collections.<K, 
EntryProcessorResult<T>>emptyMap();
             }
         });
@@ -2602,6 +2616,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKey(key);
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() {
             @Override public IgniteInternalFuture op(GridNearTxLocal tx, 
AffinityTopologyVersion readyTopVer) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
@@ -2624,6 +2642,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 throws IgniteCheckedException {
                 GridCacheReturn ret = fut.get();
 
+                if (statsEnabled)
+                    metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
                 Map<K, EntryProcessorResult<T>> resMap = ret.value();
 
                 if (resMap != null) {
@@ -2647,6 +2668,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(keys);
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) {
             @Override public IgniteInternalFuture<GridCacheReturn> 
op(GridNearTxLocal tx,
                 AffinityTopologyVersion readyTopVer) {
@@ -2674,6 +2699,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 throws IgniteCheckedException {
                 GridCacheReturn ret = fut.get();
 
+                if (statsEnabled)
+                    metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
                 assert ret != null;
 
                 return ret.value() != null ? ret.<Map<K, 
EntryProcessorResult<T>>>value() : Collections.<K, 
EntryProcessorResult<T>>emptyMap();
@@ -2690,6 +2718,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(map.keySet());
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) {
             @Override public IgniteInternalFuture<GridCacheReturn> 
op(GridNearTxLocal tx,
                 AffinityTopologyVersion readyTopVer) {
@@ -2712,6 +2744,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 throws IgniteCheckedException {
                 GridCacheReturn ret = fut.get();
 
+                if (statsEnabled)
+                    metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
                 assert ret != null;
 
                 return ret.value() != null ? ret.<Map<K, 
EntryProcessorResult<T>>>value() : Collections.<K, 
EntryProcessorResult<T>>emptyMap();
@@ -2728,13 +2763,22 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(map.keySet());
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() 
== 1) {
             @Nullable @Override public Map<K, EntryProcessorResult<T>> 
op(GridNearTxLocal tx)
                 throws IgniteCheckedException {
                 IgniteInternalFuture<GridCacheReturn> fut =
                     tx.invokeAsync(ctx, null, (Map<? extends K, ? extends 
EntryProcessor<K, V, Object>>)map, args);
 
-                return fut.get().value();
+                Map<K, EntryProcessorResult<T>> value = fut.get().value();
+
+                if (statsEnabled)
+                    metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
+                return value;
             }
         });
     }
@@ -6192,6 +6236,27 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /**
+     *
+     */
+    protected static class InvokeAllTimeStatClosure<T> extends 
UpdateTimeStatClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param metrics Metrics.
+         * @param start Start time.
+         */
+        public InvokeAllTimeStatClosure(CacheMetricsImpl metrics, final long 
start) {
+            super(metrics, start);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void updateTimeStat() {
+            metrics.addInvokeTimeNanos(System.nanoTime() - start);
+        }
+    }
+
+    /**
      * Delayed callable class.
      */
     public static abstract class TopologyVersionAwareJob extends 
ComputeJobAdapter {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index fc374bb..aef38e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -450,6 +450,7 @@ public interface GridCacheEntryEx {
      * @param taskName Task name.
      * @param updateCntr Update counter.
      * @param fut Dht atomic future.
+     * @param transformOp {@code True} if transform operation caused update.
      * @return Tuple where first value is flag showing whether operation 
succeeded,
      *      second value is old entry value if return value is requested, 
third is updated entry value,
      *      fourth is the version to enqueue for deferred delete the fifth is 
DR conflict context
@@ -486,7 +487,8 @@ public interface GridCacheEntryEx {
         String taskName,
         @Nullable CacheObject prevVal,
         @Nullable Long updateCntr,
-        @Nullable GridDhtAtomicAbstractUpdateFuture fut
+        @Nullable GridDhtAtomicAbstractUpdateFuture fut,
+        boolean transformOp
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
@@ -506,6 +508,7 @@ public interface GridCacheEntryEx {
      * @param intercept If {@code true} then calls cache interceptor.
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
+     * @param transformOp {@code True} if transform operation caused update.
      * @return Tuple containing success flag, old value and result for invoke 
operation.
      * @throws IgniteCheckedException If update failed.
      * @throws GridCacheEntryRemovedException If entry is obsolete.
@@ -525,7 +528,8 @@ public interface GridCacheEntryEx {
         @Nullable CacheEntryPredicate[] filter,
         boolean intercept,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        boolean transformOp
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 03f6d11..9f9b5c5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -89,8 +90,11 @@ import static 
org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.INVOKE_NO_OP;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.REMOVE_NO_VAL;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 
 /**
@@ -1045,9 +1049,15 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             recordNodeId(affNodeId, topVer);
 
-            if (metrics && cctx.statisticsEnabled())
+            if (metrics && cctx.statisticsEnabled()) {
                 cctx.cache().metrics0().onWrite();
 
+                T2<GridCacheOperation, CacheObject> entryProcRes = 
tx.entry(txKey()).entryProcessorCalculatedValue();
+
+                if (entryProcRes != null && UPDATE.equals(entryProcRes.get1()))
+                    cctx.cache().metrics0().onInvokeUpdate(old != null);
+            }
+
             if (evt && newVer != null && 
cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
                 CacheObject evtOld = cctx.unwrapTemporary(old);
 
@@ -1230,9 +1240,15 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             drReplicate(drType, null, newVer, topVer);
 
-            if (metrics && cctx.statisticsEnabled())
+            if (metrics && cctx.statisticsEnabled()) {
                 cctx.cache().metrics0().onRemove();
 
+                T2<GridCacheOperation, CacheObject> entryProcRes = 
tx.entry(txKey()).entryProcessorCalculatedValue();
+
+                if (entryProcRes != null && DELETE.equals(entryProcRes.get1()))
+                    cctx.cache().metrics0().onInvokeRemove(old != null);
+            }
+
             if (tx == null)
                 obsoleteVer = newVer;
             else {
@@ -1358,7 +1374,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         @Nullable CacheEntryPredicate[] filter,
         boolean intercept,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        boolean transformOp
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert cctx.isLocal() && cctx.atomic();
 
@@ -1479,6 +1496,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 try {
                     Object computed = entryProcessor.process(entry, 
invokeArgs);
 
+                    transformOp = true;
+
                     if (entry.modified()) {
                         updated0 = cctx.unwrapTemporary(entry.getValue());
 
@@ -1503,6 +1522,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     if (expiryPlc != null && !readFromStore && 
hasValueUnlocked())
                         updateTtl(expiryPlc);
 
+                    updateMetrics(READ, metrics, transformOp, old != null);
+
                     return new GridTuple3<>(false, null, invokeRes);
                 }
             }
@@ -1564,9 +1585,13 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 }
             }
 
-            if (ttl == CU.TTL_ZERO)
+            if (ttl == CU.TTL_ZERO) {
                 op = GridCacheOperation.DELETE;
 
+                //If time expired no transformation needed.
+                transformOp = false;
+            }
+
             // Try write-through.
             if (op == GridCacheOperation.UPDATE) {
                 // Detach value before index update.
@@ -1634,7 +1659,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             }
 
             if (res)
-                updateMetrics(op, metrics);
+                updateMetrics(op, metrics, transformOp, old != null);
+            else if (op == DELETE && transformOp)
+                cctx.cache().metrics0().onInvokeRemove(old != null);
 
             if (lsnrCol != null) {
                 long updateCntr = 
nextPartitionCounter(AffinityTopologyVersion.NONE, true, null);
@@ -1705,7 +1732,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         final String taskName,
         @Nullable final CacheObject prevVal,
         @Nullable final Long updateCntr,
-        @Nullable final GridDhtAtomicAbstractUpdateFuture fut
+        @Nullable final GridDhtAtomicAbstractUpdateFuture fut,
+        boolean transformOp
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, 
GridClosureException {
         assert cctx.atomic() && !detached();
 
@@ -1769,13 +1797,20 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             CacheObject updateVal = null;
             GridCacheVersion updateVer = c.newVer;
 
+            boolean updateMetrics = metrics && cctx.statisticsEnabled();
+
             // Apply metrics.
-            if (metrics &&
+            if (updateMetrics &&
                 updateRes.outcome().updateReadMetrics() &&
-                cctx.statisticsEnabled() &&
                 needVal)
                     cctx.cache().metrics0().onRead(oldVal != null);
 
+            if (updateMetrics && INVOKE_NO_OP.equals(updateRes.outcome()) && 
(transformOp || updateRes.transformed()))
+                cctx.cache().metrics0().onReadOnlyInvoke(oldVal != null);
+            else if (updateMetrics && REMOVE_NO_VAL.equals(updateRes.outcome())
+                    && (transformOp || updateRes.transformed()))
+                cctx.cache().metrics0().onInvokeRemove(oldVal != null);
+
             switch (updateRes.outcome()) {
                 case VERSION_CHECK_FAILED: {
                     if (!cctx.isNear()) {
@@ -1913,7 +1948,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             }
 
             if (updateRes.success())
-                updateMetrics(c.op, metrics);
+                updateMetrics(c.op, metrics, transformOp || 
updateRes.transformed(), oldVal != null);
 
             // Continuous query filter should be perform under lock.
             if (lsnrs != null) {
@@ -4225,13 +4260,24 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
      *
      * @param op Operation.
      * @param metrics Update merics flag.
+     * @param transformed {@code True} if transform operation caused update.
+     * @param hasOldVal {@code True} if entry has old value.
      */
-    private void updateMetrics(GridCacheOperation op, boolean metrics) {
+    private void updateMetrics(GridCacheOperation op, boolean metrics, boolean 
transformed, boolean hasOldVal) {
         if (metrics && cctx.statisticsEnabled()) {
-            if (op == GridCacheOperation.DELETE)
+            if (op == GridCacheOperation.DELETE) {
                 cctx.cache().metrics0().onRemove();
-            else
+
+                if (transformed)
+                    cctx.cache().metrics0().onInvokeRemove(hasOldVal);
+            } else if (op == READ && transformed)
+                cctx.cache().metrics0().onReadOnlyInvoke(hasOldVal);
+            else {
                 cctx.cache().metrics0().onWrite();
+
+                if (transformed)
+                    cctx.cache().metrics0().onInvokeUpdate(hasOldVal);
+            }
         }
     }
 
@@ -4699,12 +4745,16 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             boolean invoke = op == TRANSFORM;
 
+            boolean transformed = false;
+
             if (invoke) {
                 invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, 
entry.ver, keepBinary, entry);
 
                 invokeRes = runEntryProcessor(invokeEntry);
 
                 op = writeObj == null ? DELETE : UPDATE;
+
+                transformed = true;
             }
 
             CacheObject newVal = (CacheObject)writeObj;
@@ -4747,7 +4797,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        0);
+                        0,
+                        false);
 
                     return;
                 }
@@ -4765,7 +4816,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        0);
+                        0,
+                        true);
 
                     return;
                 }
@@ -4784,7 +4836,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            0);
+                            0,
+                            false);
 
                         return;
                     }
@@ -4805,12 +4858,12 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (op == UPDATE) {
                 assert writeObj != null;
 
-                update(conflictCtx, invokeRes, storeLoadedVal != null);
+                update(conflictCtx, invokeRes, storeLoadedVal != null, 
transformed);
             }
             else {
                 assert op == DELETE && writeObj == null : op;
 
-                remove(conflictCtx, invokeRes, storeLoadedVal != null);
+                remove(conflictCtx, invokeRes, storeLoadedVal != null, 
transformed);
             }
 
             assert updateRes != null && treeOp != null;
@@ -4934,11 +4987,13 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
          * @param conflictCtx Conflict context.
          * @param invokeRes Entry processor result (for invoke operation).
          * @param readFromStore {@code True} if initial entry value was {@code 
null} and it was read from store.
+         * @param transformed {@code True} if update caused by transformation 
operation.
          * @throws IgniteCheckedException If failed.
          */
         private void update(@Nullable GridCacheVersionConflictContext<?, ?> 
conflictCtx,
             @Nullable IgniteBiTuple<Object, Exception> invokeRes,
-            boolean readFromStore)
+            boolean readFromStore,
+            boolean transformed)
             throws IgniteCheckedException
         {
             GridCacheContext cctx = entry.context();
@@ -4979,7 +5034,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                         writeObj = null;
 
-                        remove(conflictCtx, invokeRes, readFromStore);
+                        remove(conflictCtx, invokeRes, readFromStore, false);
 
                         return;
                     }
@@ -5018,7 +5073,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        0);
+                        0,
+                        false);
 
                     return;
                 }
@@ -5081,19 +5137,22 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 newSysExpireTime,
                 null,
                 conflictCtx,
-                updateCntr0);
+                updateCntr0,
+                transformed);
         }
 
         /**
          * @param conflictCtx Conflict context.
          * @param invokeRes Entry processor result (for invoke operation).
          * @param readFromStore {@code True} if initial entry value was {@code 
null} and it was read from store.
+         * @param transformed {@code True} if remove caused by tranformation 
operation.
          * @throws IgniteCheckedException If failed.
          */
         @SuppressWarnings("unchecked")
         private void remove(@Nullable GridCacheVersionConflictContext<?, ?> 
conflictCtx,
             @Nullable IgniteBiTuple<Object, Exception> invokeRes,
-            boolean readFromStore)
+            boolean readFromStore,
+            boolean transformed)
             throws IgniteCheckedException
         {
             GridCacheContext cctx = entry.context();
@@ -5123,7 +5182,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        0);
+                        0,
+                        false);
 
                     return;
                 }
@@ -5178,7 +5238,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                 CU.EXPIRE_TIME_CALCULATE,
                 enqueueVer,
                 conflictCtx,
-                updateCntr0);
+                updateCntr0,
+                transformed);
         }
 
         /**
@@ -5259,7 +5320,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        0);
+                        0,
+                        false);
                 }
                 // Will update something.
                 else {
@@ -5325,7 +5387,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        0);
+                        0,
+                        false);
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 97cb534..0bf1c57 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -57,6 +57,9 @@ public class GridCacheUpdateAtomicResult {
     /** */
     private final long updateCntr;
 
+    /** Flag indicating whether value is transformed. */
+    private boolean transformed;
+
     /** Value computed by entry processor. */
     private IgniteBiTuple<Object, Exception> res;
 
@@ -72,6 +75,7 @@ public class GridCacheUpdateAtomicResult {
      * @param rmvVer Version for deferred delete.
      * @param conflictRes DR resolution result.
      * @param updateCntr Partition update counter.
+     * @param transformed {@code True} if result was transformed.
      */
     GridCacheUpdateAtomicResult(UpdateOutcome outcome,
         @Nullable CacheObject oldVal,
@@ -81,7 +85,8 @@ public class GridCacheUpdateAtomicResult {
         long conflictExpireTime,
         @Nullable GridCacheVersion rmvVer,
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
-        long updateCntr) {
+        long updateCntr,
+        boolean transformed) {
         assert outcome != null;
 
         this.outcome = outcome;
@@ -93,6 +98,7 @@ public class GridCacheUpdateAtomicResult {
         this.rmvVer = rmvVer;
         this.conflictRes = conflictRes;
         this.updateCntr = updateCntr;
+        this.transformed = transformed;
     }
 
     /**
@@ -240,6 +246,13 @@ public class GridCacheUpdateAtomicResult {
         }
     }
 
+    /**
+     * @return {@code True} if transformed.
+     */
+    public boolean transformed() {
+        return transformed;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheUpdateAtomicResult.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d02b851..ba1210e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -440,6 +440,11 @@ public final class GridDhtTxPrepareFuture extends 
GridCacheCompoundFuture<Ignite
                             GridCacheOperation op = modified ? (val == null ? 
DELETE : UPDATE) : NOOP;
 
                             if (op == NOOP) {
+                                GridCacheAdapter<?, ?> cache = 
writeEntry.context().cache();
+
+                                if (cache.context().statisticsEnabled())
+                                    cache.metrics0().onReadOnlyInvoke(oldVal 
!= null);
+
                                 if (expiry != null) {
                                     long ttl = 
CU.toTtl(expiry.getExpiryForAccess());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index e8824e7..c04e1af 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -161,6 +162,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
      * @param addPrevVal If {@code true} sends previous value to backups.
      * @param prevVal Previous value.
      * @param updateCntr Partition update counter.
+     * @param cacheOp Corresponding cache operation.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
     final void addWriteEntry(
@@ -173,7 +175,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
         @Nullable CacheObject prevVal,
-        long updateCntr) {
+        long updateCntr,
+        GridCacheOperation cacheOp) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         List<ClusterNode> affNodes = affAssignment.get(entry.partition());
@@ -222,7 +225,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
                     conflictVer,
                     addPrevVal,
                     prevVal,
-                    updateCntr);
+                    updateCntr,
+                    cacheOp);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index a50b68c..a5e9feb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -28,6 +28,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -59,6 +60,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest 
extends GridCacheIdMess
     /** */
     protected static final int DHT_ATOMIC_OBSOLETE_NEAR_KEY_FLAG_MASK = 0x20;
 
+    /** Flag indicating transformation operation was performed. */
+    protected static final int DHT_ATOMIC_TRANSFORM_OP_FLAG_MASK = 0x40;
+
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
@@ -225,6 +229,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest 
extends GridCacheIdMess
     }
 
     /**
+     * @return {@code True} if transformation operation was performed.
+     */
+    public final boolean transformOperation() {
+        return isFlag(DHT_ATOMIC_TRANSFORM_OP_FLAG_MASK);
+    }
+
+    /**
      * @return {@code True} if on response flag changed.
      */
     public boolean onResponse() {
@@ -268,6 +279,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest 
extends GridCacheIdMess
      * @param addPrevVal If {@code true} adds previous value.
      * @param prevVal Previous value.
      * @param updateCntr Update counter.
+     * @param cacheOp Corresponding cache operation.
      */
     public abstract void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
@@ -277,8 +289,8 @@ public abstract class GridDhtAtomicAbstractUpdateRequest 
extends GridCacheIdMess
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
         @Nullable CacheObject prevVal,
-        long updateCntr
-    );
+        long updateCntr,
+        GridCacheOperation cacheOp);
 
     /**
      * @param key Key to add.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8408b32..c39842e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -820,6 +820,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (keyCheck)
             validateCacheKey(key);
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
@@ -838,6 +842,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 throws IgniteCheckedException {
                 Map<K, EntryProcessorResult<T>> resMap = fut.get();
 
+                if (statsEnabled)
+                    metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
                 if (resMap != null) {
                     assert resMap.isEmpty() || resMap.size() == 1 : 
resMap.size();
 
@@ -885,6 +892,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (keyCheck)
             validateCacheKeys(keys);
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new 
C1<K, EntryProcessor>() {
             @Override public EntryProcessor apply(K k) {
                 return entryProcessor;
@@ -912,6 +923,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 ) throws IgniteCheckedException {
                     Map<Object, EntryProcessorResult> resMap = (Map)fut.get();
 
+                    if (statsEnabled)
+                        metrics0().addInvokeTimeNanos(System.nanoTime() - 
start);
+
                     return ctx.unwrapInvokeResult(resMap, keepBinary);
                 }
             });
@@ -926,7 +940,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (keyCheck)
             validateCacheKeys(map.keySet());
 
-        return (Map<K, EntryProcessorResult<T>>)updateAll0(null,
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
+        Map<K, EntryProcessorResult<T>> updateResults = (Map<K, 
EntryProcessorResult<T>>) updateAll0(null,
             map,
             args,
             null,
@@ -935,6 +953,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             false,
             TRANSFORM,
             false).get();
+
+        if (statsEnabled)
+            metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
+        return updateResults;
     }
 
     /** {@inheritDoc} */
@@ -947,7 +970,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (keyCheck)
             validateCacheKeys(map.keySet());
 
-        return updateAll0(null,
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
+        IgniteInternalFuture updateResults = updateAll0(null,
             map,
             args,
             null,
@@ -956,6 +983,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             false,
             TRANSFORM,
             true);
+
+        if (statsEnabled)
+            updateResults.listen(new InvokeAllTimeStatClosure(metrics0(), 
start));
+
+        return updateResults;
     }
 
     /**
@@ -2077,9 +2109,12 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             curInvokeRes = 
CacheInvokeResult.fromResult(computed);
                         }
 
-                        if (!invokeEntry.modified())
+                        if (!invokeEntry.modified()) {
+                            if (ctx.statisticsEnabled())
+                                ctx.cache().metrics0().onReadOnlyInvoke(old != 
null);
+
                             continue;
-                        else {
+                        } else {
                             updatedVal = 
ctx.unwrapTemporary(invokeEntry.getValue());
 
                             updated = ctx.toCacheObject(updatedVal);
@@ -2461,7 +2496,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     taskName,
                     /*prevVal*/null,
                     /*updateCntr*/null,
-                    dhtFut);
+                    dhtFut,
+                    false);
 
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case 
of remove-remove scenarios.
@@ -2484,7 +2520,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             newConflictVer,
                             sndPrevVal,
                             updRes.oldValue(),
-                            updRes.updateCounter());
+                            updRes.updateCounter(),
+                            op);
 
                         if (readers != null)
                             dhtFut.addNearWriteEntries(
@@ -2687,6 +2724,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     // Get readers before innerUpdate (reader cleared after 
remove).
                     GridDhtCacheEntry.ReaderId[] readers = 
entry.readersLocked();
 
+                    EntryProcessor<Object, Object, Object> entryProcessor =
+                        entryProcessorMap == null ? null : 
entryProcessorMap.get(entry.key());
+
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
                         nearNode.id(),
@@ -2715,7 +2755,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                         taskName,
                         null,
                         null,
-                        dhtFut);
+                        dhtFut,
+                        entryProcessor != null);
 
                     assert !updRes.success() || updRes.newTtl() == 
CU.TTL_NOT_CHANGED || expiry != null :
                         "success=" + updRes.success() + ", newTtl=" + 
updRes.newTtl() + ", expiry=" + expiry;
@@ -2740,8 +2781,6 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                     batchRes.addDeleted(entry, updRes, entries);
 
                     if (dhtFut != null) {
-                        EntryProcessor<Object, Object, Object> entryProcessor =
-                            entryProcessorMap == null ? null : 
entryProcessorMap.get(entry.key());
 
                         dhtFut.addWriteEntry(
                             affAssignment,
@@ -2753,7 +2792,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             null,
                             sndPrevVal,
                             updRes.oldValue(),
-                            updRes.updateCounter());
+                            updRes.updateCounter(),
+                            op);
 
                         if (readers != null)
                             dhtFut.addNearWriteEntries(
@@ -3201,7 +3241,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                                 taskName,
                                 prevVal,
                                 updateIdx,
-                                null);
+                                null,
+                                req.transformOperation());
 
                             if (updRes.removeVersion() != null)
                                 ctx.onDeferredDelete(entry, 
updRes.removeVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 0ade243..19b24b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -38,6 +39,8 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
 /**
  *
  */
@@ -118,6 +121,7 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
      * @param addPrevVal If {@code true} adds previous value.
      * @param prevVal Previous value.
      * @param updateCntr Update counter.
+     * @param cacheOp Corresponding cache operation.
      */
     @Override public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
@@ -127,8 +131,8 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
         @Nullable CacheObject prevVal,
-        long updateCntr
-    ) {
+        long updateCntr,
+        GridCacheOperation cacheOp) {
         assert entryProcessor == null;
         assert ttl <= 0 : ttl;
         assert conflictExpireTime <= 0 : conflictExpireTime;
@@ -144,6 +148,9 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
             this.prevVal = prevVal;
 
         this.updateCntr = updateCntr;
+
+        if (cacheOp == TRANSFORM)
+            setFlag(true, DHT_ATOMIC_TRANSFORM_OP_FLAG_MASK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 6f3f530..31439d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -201,8 +202,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
         @Nullable GridCacheVersion conflictVer,
         boolean addPrevVal,
         @Nullable CacheObject prevVal,
-        long updateCntr
-    ) {
+        long updateCntr,
+        GridCacheOperation cacheOp) {
         assert key.partition() >= 0 : key;
 
         keys.add(key);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 2f832b4..23c2480 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -197,7 +197,8 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                     req.keepBinary(),
                     req.nodeId(),
                     req.subjectId(),
-                    taskName);
+                    taskName,
+                    req.operation() == TRANSFORM);
             }
             catch (IgniteCheckedException e) {
                 res.addFailedKey(key, new IgniteCheckedException("Failed to 
update key in near cache: " + key, e));
@@ -214,6 +215,7 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
      * @param nodeId Node ID.
      * @param subjId Subject ID.
      * @param taskName Task name.
+     * @param transformedValue {@code True} if transformed value.
      * @throws IgniteCheckedException If failed.
      */
     private void processNearAtomicUpdateResponse(
@@ -225,8 +227,8 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
         boolean keepBinary,
         UUID nodeId,
         UUID subjId,
-        String taskName
-    ) throws IgniteCheckedException {
+        String taskName,
+        boolean transformedValue) throws IgniteCheckedException {
         try {
             while (true) {
                 GridCacheEntryEx entry = null;
@@ -266,7 +268,8 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                         taskName,
                         null,
                         null,
-                        null);
+                        null,
+                        transformedValue);
 
                     if (updRes.removeVersion() != null)
                         ctx.onDeferredDelete(entry, updRes.removeVersion());
@@ -365,7 +368,8 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                             taskName,
                             null,
                             null,
-                            null);
+                            null,
+                            false);
 
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, 
updRes.removeVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index a9d7fa1..b96dbdc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -560,6 +560,10 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
         if (keyCheck)
             validateCacheKeys(keys);
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new 
C1<K, EntryProcessor>() {
             @Override public EntryProcessor apply(K k) {
                 return entryProcessor;
@@ -570,17 +574,23 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
 
         final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
 
-        return (Map<K, EntryProcessorResult<T>>)updateAllInternal(TRANSFORM,
-            invokeMap.keySet(),
-            invokeMap.values(),
-            args,
-            expiryPerCall(),
-            false,
-            false,
-            null,
-            ctx.writeThrough(),
-            ctx.readThrough(),
-            keepBinary);
+        Map<K, EntryProcessorResult<T>> entryProcessorRes = (Map<K, 
EntryProcessorResult<T>>) updateAllInternal(
+                TRANSFORM,
+                invokeMap.keySet(),
+                invokeMap.values(),
+                args,
+                expiryPerCall(),
+                false,
+                false,
+                null,
+                ctx.writeThrough(),
+                ctx.readThrough(),
+                keepBinary);
+
+        if (statsEnabled)
+            metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
+        return entryProcessorRes;
     }
 
     /** {@inheritDoc} */
@@ -593,6 +603,10 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
         if (keyCheck)
             validateCacheKey(key);
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         Map<? extends K, EntryProcessor> invokeMap =
             Collections.singletonMap(key, (EntryProcessor)entryProcessor);
 
@@ -608,6 +622,9 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
                 throws IgniteCheckedException {
                 Map<K, EntryProcessorResult<T>> resMap = fut.get();
 
+                if (statsEnabled)
+                    metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
                 if (resMap != null) {
                     assert resMap.isEmpty() || resMap.size() == 1 : 
resMap.size();
 
@@ -630,18 +647,27 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
         if (keyCheck)
             validateCacheKeys(keys);
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new 
C1<K, EntryProcessor>() {
             @Override public EntryProcessor apply(K k) {
                 return entryProcessor;
             }
         });
 
-        return updateAllAsync0(null,
+        IgniteInternalFuture fut = updateAllAsync0(null,
             invokeMap,
             args,
             true,
             false,
             null);
+
+        if (statsEnabled)
+            fut.listen(new InvokeAllTimeStatClosure(metrics0(), start));
+
+        return fut;
     }
 
     /** {@inheritDoc} */
@@ -654,19 +680,29 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
         if (keyCheck)
             validateCacheKeys(map.keySet());
 
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-        return (Map<K, EntryProcessorResult<T>>)updateAllInternal(TRANSFORM,
-            map.keySet(),
-            map.values(),
-            args,
-            expiryPerCall(),
-            false,
-            false,
-            null,
-            ctx.writeThrough(),
-            ctx.readThrough(),
-            opCtx != null && opCtx.isKeepBinary());
+        Map<K, EntryProcessorResult<T>> entryProcessorResult = (Map<K, 
EntryProcessorResult<T>>) updateAllInternal(
+                TRANSFORM,
+                map.keySet(),
+                map.values(),
+                args,
+                expiryPerCall(),
+                false,
+                false,
+                null,
+                ctx.writeThrough(),
+                ctx.readThrough(),
+                opCtx != null && opCtx.isKeepBinary());
+
+        if (statsEnabled)
+            metrics0().addInvokeTimeNanos(System.nanoTime() - start);
+
+        return entryProcessorResult;
     }
 
     /** {@inheritDoc} */
@@ -679,12 +715,21 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
         if (keyCheck)
             validateCacheKeys(map.keySet());
 
-        return updateAllAsync0(null,
+        final boolean statsEnabled = ctx.statisticsEnabled();
+
+        final long start = statsEnabled ? System.nanoTime() : 0L;
+
+        IgniteInternalFuture fut = updateAllAsync0(null,
             map,
             args,
             true,
             false,
             null);
+
+        if (statsEnabled)
+            fut.listen(new InvokeAllTimeStatClosure(metrics0(), start));
+
+        return fut;
     }
 
     /**
@@ -893,7 +938,8 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
                             filters,
                             intercept,
                             subjId,
-                            taskName);
+                            taskName,
+                            false);
 
                         if (op == TRANSFORM) {
                             if (t.get3() != null) {
@@ -1076,7 +1122,8 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
                                 validation = true;
 
                                 ctx.validateKeyAndValue(entry.key(), updated);
-                            }
+                            } else if (ctx.statisticsEnabled() && 
!invokeEntry.modified())
+                                ctx.cache().metrics0().onReadOnlyInvoke(old != 
null);
                         }
                         catch (Exception e) {
                             invokeRes = CacheInvokeResult.fromError(e);
@@ -1115,7 +1162,8 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
                                     keepBinary,
                                     err,
                                     subjId,
-                                    taskName);
+                                    taskName,
+                                    true);
 
                                 putMap = null;
                                 writeVals = null;
@@ -1153,7 +1201,8 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
                                     keepBinary,
                                     err,
                                     subjId,
-                                    taskName);
+                                    taskName,
+                                    true);
 
                                 rmvKeys = null;
 
@@ -1258,7 +1307,8 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
                     keepBinary,
                     err,
                     subjId,
-                    taskName);
+                    taskName,
+                    op == TRANSFORM);
             }
             else
                 assert filtered.isEmpty();
@@ -1283,6 +1333,7 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
      * @param err Optional partial update exception.
      * @param subjId Subject ID.
      * @param taskName Task name.
+     * @param transformed {@code True} if transform operation performed.
      * @return Partial update exception.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions", 
"ForLoopReplaceableByForEach"})
@@ -1296,8 +1347,8 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
         boolean keepBinary,
         @Nullable CachePartialUpdateCheckedException err,
         UUID subjId,
-        String taskName
-    ) {
+        String taskName,
+        boolean transformed) {
         assert putMap == null ^ rmvKeys == null;
         GridCacheOperation op;
 
@@ -1373,7 +1424,8 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
                     null,
                     false,
                     subjId,
-                    taskName);
+                    taskName,
+                    transformed);
 
                 if (intercept) {
                     if (op == UPDATE)

http://git-wip-us.apache.org/repos/asf/ignite/blob/5e8669af/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 3cf1146..15df637 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1578,6 +1578,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter implement
 
             GridCacheOperation op = modified ? (cacheVal == null ? DELETE : 
UPDATE) : NOOP;
 
+            txEntry.entryProcessorCalculatedValue(new T2<>(op, op == NOOP ? 
null : cacheVal));
+
             if (op == NOOP) {
                 ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
 

Reply via email to