This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 5933b4e IGNITE-12231 Fixed logging RollbackRecords to WAL 5933b4e is described below commit 5933b4ec62de26ce25a8cc1a5be9ba4fcab7befc Author: Andrey Gura <ag...@apache.org> AuthorDate: Wed Sep 25 16:21:29 2019 +0300 IGNITE-12231 Fixed logging RollbackRecords to WAL --- .../dht/topology/GridDhtPartitionTopologyImpl.java | 16 +++- .../cache/transactions/IgniteTxHandler.java | 91 ++++++++++++---------- 2 files changed, 65 insertions(+), 42 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java index e5d5651..2078eab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java @@ -42,6 +42,7 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -2734,6 +2735,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { ctx.database().checkpointReadLock(); try { + WALPointer ptr = null; + lock.readLock().lock(); try { @@ -2764,7 +2767,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { gapStart - 1, gapStop - gapStart + 1); try { - 
ctx.wal().log(rec); + ptr = ctx.wal().log(rec); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -2779,7 +2782,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } finally { - lock.readLock().unlock(); + try { + if (ptr != null) + ctx.wal().flush(ptr, false); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + lock.readLock().unlock(); + } } } finally { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index e62665f..4055709 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -31,9 +31,11 @@ import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -2288,65 +2290,74 @@ public class IgniteTxHandler { if (counters == null) return; - for (PartitionUpdateCountersMessage counter : counters) { - GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId()); + WALPointer ptr = null; - 
GridDhtPartitionTopology top = ctx0.topology(); + try { + for (PartitionUpdateCountersMessage counter : counters) { + GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId()); - AffinityTopologyVersion topVer = top.readyTopologyVersion(); + GridDhtPartitionTopology top = ctx0.topology(); - assert top != null; + AffinityTopologyVersion topVer = top.readyTopologyVersion(); - for (int i = 0; i < counter.size(); i++) { - boolean invalid = false; + assert top != null; - try { - GridDhtLocalPartition part = top.localPartition(counter.partition(i)); + for (int i = 0; i < counter.size(); i++) { + boolean invalid = false; - if (part != null && part.reserve()) { - try { - if (part.state() != GridDhtPartitionState.RENTING) { // Check is actual only for backup node. - long start = counter.initialCounter(i); - long delta = counter.updatesCount(i); + try { + GridDhtLocalPartition part = top.localPartition(counter.partition(i)); + + if (part != null && part.reserve()) { + try { + if (part.state() != GridDhtPartitionState.RENTING) { // Check is actual only for backup node. + long start = counter.initialCounter(i); + long delta = counter.updatesCount(i); - boolean updated = part.updateCounter(start, delta); + boolean updated = part.updateCounter(start, delta); - // Need to log rolled back range for logical recovery. - if (updated && rollback) { - if (part.group().persistenceEnabled() && - part.group().walEnabled() && - !part.group().mvccEnabled()) { - RollbackRecord rec = new RollbackRecord(part.group().groupId(), part.id(), - start, delta); + // Need to log rolled back range for logical recovery. 
+ if (updated && rollback) { + CacheGroupContext grpCtx = part.group(); - ctx.wal().log(rec); - } + if (grpCtx.persistenceEnabled() && grpCtx.walEnabled() && !grpCtx.mvccEnabled()) { + RollbackRecord rec = + new RollbackRecord(grpCtx.groupId(), part.id(), start, delta); + + ptr = ctx.wal().log(rec); + } - for (int cntr = 1; cntr <= delta; cntr++) { - ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr, - topVer, rollbackOnPrimary); + for (int cntr = 1; cntr <= delta; cntr++) { + ctx0.continuousQueries().skipUpdateCounter(null, part.id(), start + cntr, + topVer, rollbackOnPrimary); + } } } + else + invalid = true; + } + finally { + part.release(); } - else - invalid = true; - } - finally { - part.release(); } + else + invalid = true; } - else + catch (GridDhtInvalidPartitionException e) { invalid = true; - } - catch (GridDhtInvalidPartitionException e) { - invalid = true; - } + } - if (invalid && log.isDebugEnabled()) - log.debug("Received partition update counters message for invalid partition, ignoring: " + - "[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + "]"); + if (invalid && log.isDebugEnabled()) { + log.debug("Received partition update counters message for invalid partition, ignoring: " + + "[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + ']'); + } + } } } + finally { + if (ptr != null) + ctx.wal().flush(ptr, false); + } } /**