This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5933b4e  IGNITE-12231 Fixed logging RollbackRecords to WAL
5933b4e is described below

commit 5933b4ec62de26ce25a8cc1a5be9ba4fcab7befc
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Wed Sep 25 16:21:29 2019 +0300

    IGNITE-12231 Fixed logging RollbackRecords to WAL
---
 .../dht/topology/GridDhtPartitionTopologyImpl.java | 16 +++-
 .../cache/transactions/IgniteTxHandler.java        | 91 ++++++++++++----------
 2 files changed, 65 insertions(+), 42 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index e5d5651..2078eab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -42,6 +42,7 @@ import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -2734,6 +2735,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         ctx.database().checkpointReadLock();
 
         try {
+            WALPointer ptr = null;
+
             lock.readLock().lock();
 
             try {
@@ -2764,7 +2767,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                         gapStart - 1, gapStop - gapStart + 1);
 
                                     try {
-                                        ctx.wal().log(rec);
+                                        ptr = ctx.wal().log(rec);
                                     }
                                     catch (IgniteCheckedException e) {
                                         throw new IgniteException(e);
@@ -2779,7 +2782,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
             finally {
-                lock.readLock().unlock();
+                try {
+                    if (ptr != null)
+                        ctx.wal().flush(ptr, false);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+                finally {
+                    lock.readLock().unlock();
+                }
             }
         }
         finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index e62665f..4055709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -31,9 +31,11 @@ import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -2288,65 +2290,74 @@ public class IgniteTxHandler {
         if (counters == null)
             return;
 
-        for (PartitionUpdateCountersMessage counter : counters) {
-            GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId());
+        WALPointer ptr = null;
 
-            GridDhtPartitionTopology top = ctx0.topology();
+        try {
+            for (PartitionUpdateCountersMessage counter : counters) {
+                GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId());
 
-            AffinityTopologyVersion topVer = top.readyTopologyVersion();
+                GridDhtPartitionTopology top = ctx0.topology();
 
-            assert top != null;
+                AffinityTopologyVersion topVer = top.readyTopologyVersion();
 
-            for (int i = 0; i < counter.size(); i++) {
-                boolean invalid = false;
+                assert top != null;
 
-                try {
-                    GridDhtLocalPartition part = top.localPartition(counter.partition(i));
+                for (int i = 0; i < counter.size(); i++) {
+                    boolean invalid = false;
 
-                    if (part != null && part.reserve()) {
-                        try {
-                            if (part.state() != GridDhtPartitionState.RENTING) { // Check is actual only for backup node.
-                                long start = counter.initialCounter(i);
-                                long delta = counter.updatesCount(i);
+                    try {
+                        GridDhtLocalPartition part = top.localPartition(counter.partition(i));
+
+                        if (part != null && part.reserve()) {
+                            try {
+                                if (part.state() != GridDhtPartitionState.RENTING) { // Check is actual only for backup node.
+                                    long start = counter.initialCounter(i);
+                                    long delta = counter.updatesCount(i);
 
-                                boolean updated = part.updateCounter(start, delta);
+                                    boolean updated = part.updateCounter(start, delta);
 
-                                // Need to log rolled back range for logical recovery.
-                                if (updated && rollback) {
-                                    if (part.group().persistenceEnabled() &&
-                                        part.group().walEnabled() &&
-                                        !part.group().mvccEnabled()) {
-                                        RollbackRecord rec = new RollbackRecord(part.group().groupId(), part.id(),
-                                            start, delta);
+                                    // Need to log rolled back range for logical recovery.
+                                    if (updated && rollback) {
+                                        CacheGroupContext grpCtx = part.group();
 
-                                        ctx.wal().log(rec);
-                                    }
+                                        if (grpCtx.persistenceEnabled() && grpCtx.walEnabled() && !grpCtx.mvccEnabled()) {
+                                            RollbackRecord rec =
+                                                new RollbackRecord(grpCtx.groupId(), part.id(), start, delta);
+
+                                            ptr = ctx.wal().log(rec);
+                                        }
 
-                                    for (int cntr = 1; cntr <= delta; cntr++) {
-                                        ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr,
-                                            topVer, rollbackOnPrimary);
+                                        for (int cntr = 1; cntr <= delta; cntr++) {
+                                            ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr,
+                                                topVer, rollbackOnPrimary);
+                                        }
                                     }
                                 }
+                                else
+                                    invalid = true;
+                            }
+                            finally {
+                                part.release();
                             }
-                            else
-                                invalid = true;
-                        }
-                        finally {
-                            part.release();
                         }
+                        else
+                            invalid = true;
                     }
-                    else
+                    catch (GridDhtInvalidPartitionException e) {
                         invalid = true;
-                }
-                catch (GridDhtInvalidPartitionException e) {
-                    invalid = true;
-                }
+                    }
 
-                if (invalid && log.isDebugEnabled())
-                    log.debug("Received partition update counters message for invalid partition, ignoring: " +
-                        "[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + "]");
+                    if (invalid && log.isDebugEnabled()) {
+                        log.debug("Received partition update counters message for invalid partition, ignoring: " +
+                            "[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + ']');
+                    }
+                }
             }
         }
+        finally {
+            if (ptr != null)
+                ctx.wal().flush(ptr, false);
+        }
     }
 
     /**

Reply via email to